Followers synchronization header

Branch: main
Author: Bob Mottram
Date: 2023-03-17 21:13:43 +00:00
Parent: 90a31c45ee
Commit: 9443212820
9 changed files with 199 additions and 142 deletions


@@ -203,6 +203,7 @@ def create_announce(session, base_dir: str, federation_list: [],
group_account = True
if announce_nickname and announce_domain:
extra_headers = {}
send_signed_json(new_announce, session, base_dir,
nickname, domain, port,
announce_nickname, announce_domain,
@@ -212,7 +213,8 @@ def create_announce(session, base_dir: str, federation_list: [],
person_cache,
debug, project_version, None, group_account,
signing_priv_key_pem, 639633,
curr_domain, onion_domain, i2p_domain)
curr_domain, onion_domain, i2p_domain,
extra_headers)
return new_announce


@@ -119,7 +119,7 @@ from inbox import run_inbox_queue_watchdog
from inbox import save_post_to_inbox_queue
from inbox import populate_replies
from inbox import receive_edit_to_post
from follow import update_followers_sync_cache
from followerSync import update_followers_sync_cache
from follow import follower_approval_active
from follow import is_following_actor
from follow import get_following_feed

follow.py (136 changed lines)

@@ -7,9 +7,8 @@ __email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "ActivityPub"
from pprint import pprint
import os
from hashlib import sha256
from pprint import pprint
from utils import get_user_paths
from utils import acct_handle_dir
from utils import has_object_string_object
@@ -789,6 +788,7 @@ def followed_account_accepts(session, base_dir: str, http_prefix: str,
if has_group_type(base_dir, follow_json['actor'], person_cache):
group_account = True
extra_headers = {}
return send_signed_json(accept_json, session, base_dir,
nickname_to_follow, domain_to_follow, port,
nickname, domain, from_port,
@@ -797,7 +797,8 @@ def followed_account_accepts(session, base_dir: str, http_prefix: str,
send_threads, post_log, cached_webfingers,
person_cache, debug, project_version, None,
group_account, signing_priv_key_pem,
7856837, curr_domain, onion_domain, i2p_domain)
7856837, curr_domain, onion_domain, i2p_domain,
extra_headers)
def followed_account_rejects(session, session_onion, session_i2p,
@@ -863,6 +864,7 @@ def followed_account_rejects(session, session_onion, session_i2p,
curr_session = session_onion
elif domain.endswith('.i2p') and session_i2p:
curr_session = session_i2p
extra_headers = {}
# send the reject activity
return send_signed_json(reject_json, curr_session, base_dir,
nickname_to_follow, domain_to_follow, port,
@@ -873,7 +875,8 @@ def followed_account_rejects(session, session_onion, session_i2p,
person_cache, debug, project_version, None,
group_account, signing_priv_key_pem,
6393063,
domain, onion_domain, i2p_domain)
domain, onion_domain, i2p_domain,
extra_headers)
def send_follow_request(session, base_dir: str,
@@ -962,7 +965,7 @@ def send_follow_request(session, base_dir: str,
_remove_from_follow_rejects(base_dir,
nickname, domain,
follow_handle, debug)
extra_headers = {}
send_signed_json(new_follow_json, session, base_dir,
nickname, sender_domain, sender_port,
follow_nickname, follow_domain, follow_port,
@@ -971,7 +974,8 @@ def send_follow_request(session, base_dir: str,
send_threads, post_log, cached_webfingers, person_cache,
debug, project_version, None, group_account,
signing_priv_key_pem, 8234389,
curr_domain, onion_domain, i2p_domain)
curr_domain, onion_domain, i2p_domain,
extra_headers)
return new_follow_json
@@ -1361,126 +1365,6 @@ def deny_follow_request_via_server(session,
return deny_html
def _get_followers_for_domain(base_dir: str,
nickname: str, domain: str,
search_domain: str) -> []:
"""Returns the followers for a given domain
this is used for followers synchronization
"""
followers_filename = \
acct_dir(base_dir, nickname, domain) + '/followers.txt'
if not os.path.isfile(followers_filename):
return []
lines = []
foll_text = ''
try:
with open(followers_filename, 'r', encoding='utf-8') as fp_foll:
foll_text = fp_foll.read()
except OSError:
print('EX: get_followers_for_domain unable to read followers ' +
followers_filename)
if search_domain not in foll_text:
return []
lines = foll_text.splitlines()
result = []
for line_str in lines:
if search_domain not in line_str:
continue
if line_str.endswith('@' + search_domain):
nick = line_str.split('@')[0]
paths_list = get_user_paths()
found = False
for prefix in ('https', 'http'):
if found:
break
for possible_path in paths_list:
url = prefix + '://' + search_domain + \
possible_path + nick
filename = base_dir + '/cache/actors/' + \
url.replace('/', '#') + '.json'
if not os.path.isfile(filename):
continue
if url not in result:
result.append(url)
found = True
break
elif '://' + search_domain in line_str:
result.append(line_str)
result.sort()
return result
def _get_followers_sync_json(base_dir: str,
nickname: str, domain: str,
http_prefix: str, domain_full: str,
search_domain: str) -> {}:
"""Returns a response for followers synchronization
See https://github.com/mastodon/mastodon/pull/14510
https://codeberg.org/fediverse/fep/src/branch/main/feps/fep-8fcf.md
"""
sync_list = \
_get_followers_for_domain(base_dir,
nickname, domain,
search_domain)
id_str = http_prefix + '://' + domain_full + \
'/users/' + nickname + '/followers?domain=' + search_domain
sync_json = {
'@context': 'https://www.w3.org/ns/activitystreams',
'id': id_str,
'orderedItems': sync_list,
'type': 'OrderedCollection'
}
return sync_json
def _get_followers_sync_hash(sync_json: {}) -> str:
"""Returns a hash used within the Collection-Synchronization http header
See https://github.com/mastodon/mastodon/pull/14510
https://codeberg.org/fediverse/fep/src/branch/main/feps/fep-8fcf.md
"""
if not sync_json:
return None
sync_hash = None
for actor in sync_json['orderedItems']:
actor_hash = sha256(actor.encode('utf-8'))
if sync_hash:
sync_hash = sync_hash ^ actor_hash
else:
sync_hash = actor_hash
if sync_hash:
sync_hash = sync_hash.hexdigest()
return sync_hash
def update_followers_sync_cache(base_dir: str,
nickname: str, domain: str,
http_prefix: str, domain_full: str,
calling_domain: str,
sync_cache: {}) -> ({}, str):
"""Updates the followers synchronization cache
See https://github.com/mastodon/mastodon/pull/14510
https://codeberg.org/fediverse/fep/src/branch/main/feps/fep-8fcf.md
"""
foll_sync_key = nickname + ':' + calling_domain
if sync_cache.get(foll_sync_key):
sync_hash = sync_cache[foll_sync_key]['hash']
sync_json = sync_cache[foll_sync_key]['response']
else:
sync_json = \
_get_followers_sync_json(base_dir,
nickname, domain,
http_prefix,
domain_full,
calling_domain)
sync_hash = _get_followers_sync_hash(sync_json)
if sync_hash:
sync_cache[foll_sync_key] = {
"hash": sync_hash,
"response": sync_json
}
return sync_json, sync_hash
def get_followers_of_actor(base_dir: str, actor: str, debug: bool) -> {}:
"""In a shared inbox if we receive a post we know who it's from
and if it's addressed to followers then we need to get a list of those.

followerSync.py (new file, mode 100644, 133 added lines)

@@ -0,0 +1,133 @@
__filename__ = "followerSync.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.4.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "ActivityPub"
import os
from hashlib import sha256
from utils import acct_dir
from utils import get_user_paths
def _get_followers_for_domain(base_dir: str,
nickname: str, domain: str,
search_domain: str) -> []:
"""Returns the followers for a given domain
this is used for followers synchronization
"""
followers_filename = \
acct_dir(base_dir, nickname, domain) + '/followers.txt'
if not os.path.isfile(followers_filename):
return []
lines = []
foll_text = ''
try:
with open(followers_filename, 'r', encoding='utf-8') as fp_foll:
foll_text = fp_foll.read()
except OSError:
print('EX: get_followers_for_domain unable to read followers ' +
followers_filename)
if search_domain not in foll_text:
return []
lines = foll_text.splitlines()
result = []
for line_str in lines:
if search_domain not in line_str:
continue
if line_str.endswith('@' + search_domain):
nick = line_str.split('@')[0]
paths_list = get_user_paths()
found = False
for prefix in ('https', 'http'):
if found:
break
for possible_path in paths_list:
url = prefix + '://' + search_domain + \
possible_path + nick
filename = base_dir + '/cache/actors/' + \
url.replace('/', '#') + '.json'
if not os.path.isfile(filename):
continue
if url not in result:
result.append(url)
found = True
break
elif '://' + search_domain in line_str:
result.append(line_str)
result.sort()
return result
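As a rough illustration of the handle matching above (the nickname, domain and path list here are assumptions, not values from the diff): a followers.txt line such as alice@remote.example is expanded into candidate actor URLs, only those already present in the actors cache are kept, and lines that are themselves actor URLs containing the domain are passed through unchanged. A minimal sketch of the expansion step:
def _candidate_actor_urls(handle: str) -> []:
    """Sketch only: expand 'nick@domain' into candidate actor ids,
    using an assumed (not exhaustive) set of user path prefixes
    """
    nick, domain = handle.split('@')
    paths = ('/users/', '/profile/', '/accounts/', '/channel/', '/u/')
    return [prefix + '://' + domain + path + nick
            for prefix in ('https', 'http')
            for path in paths]
# e.g. _candidate_actor_urls('alice@remote.example')[0] is
# 'https://remote.example/users/alice'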
def _get_followers_sync_json(base_dir: str,
nickname: str, domain: str,
http_prefix: str, domain_full: str,
search_domain: str) -> {}:
"""Returns a response for followers synchronization
See https://github.com/mastodon/mastodon/pull/14510
https://codeberg.org/fediverse/fep/src/branch/main/feps/fep-8fcf.md
"""
sync_list = \
_get_followers_for_domain(base_dir,
nickname, domain,
search_domain)
id_str = http_prefix + '://' + domain_full + \
'/users/' + nickname + '/followers?domain=' + search_domain
sync_json = {
'@context': 'https://www.w3.org/ns/activitystreams',
'id': id_str,
'orderedItems': sync_list,
'type': 'OrderedCollection'
}
return sync_json
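For a hypothetical account alice on dom.example being asked about its followers hosted on remote.example, the collection built above would look like this (URLs illustrative):
sync_json = {
    '@context': 'https://www.w3.org/ns/activitystreams',
    'id': 'https://dom.example/users/alice/followers?domain=remote.example',
    'orderedItems': ['https://remote.example/users/bob'],
    'type': 'OrderedCollection'
}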
def _get_followers_sync_hash(sync_json: {}) -> str:
"""Returns a hash used within the Collection-Synchronization http header
See https://github.com/mastodon/mastodon/pull/14510
https://codeberg.org/fediverse/fep/src/branch/main/feps/fep-8fcf.md
"""
if not sync_json:
return None
sync_hash = None
for actor in sync_json['orderedItems']:
actor_hash = sha256(actor.encode('utf-8'))
if sync_hash:
sync_hash = sync_hash ^ actor_hash
else:
sync_hash = actor_hash
if sync_hash:
sync_hash = sync_hash.hexdigest()
return sync_hash
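One caveat in the loop above: sha256() returns a hashlib object, and Python does not define ^ between two such objects, so the XOR branch raises a TypeError as soon as there is more than one follower (the single-follower case still works). A working variant of the same idea, assuming the byte-wise XOR of SHA-256 digests described in FEP-8fcf, might look like:
from hashlib import sha256

def _followers_sync_digest(actor_urls: []) -> str:
    """Sketch: XOR the SHA-256 digests of the actor ids byte-wise
    and return the result as lowercase hex (assumed FEP-8fcf scheme)
    """
    digest = bytes(32)
    for actor in actor_urls:
        actor_digest = sha256(actor.encode('utf-8')).digest()
        digest = bytes(acc ^ new
                       for acc, new in zip(digest, actor_digest))
    return digest.hex()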
def update_followers_sync_cache(base_dir: str,
nickname: str, domain: str,
http_prefix: str, domain_full: str,
calling_domain: str,
sync_cache: {}) -> ({}, str):
"""Updates the followers synchronization cache
See https://github.com/mastodon/mastodon/pull/14510
https://codeberg.org/fediverse/fep/src/branch/main/feps/fep-8fcf.md
"""
foll_sync_key = nickname + ':' + calling_domain
if sync_cache.get(foll_sync_key):
sync_hash = sync_cache[foll_sync_key]['hash']
sync_json = sync_cache[foll_sync_key]['response']
else:
sync_json = \
_get_followers_sync_json(base_dir,
nickname, domain,
http_prefix,
domain_full,
calling_domain)
sync_hash = _get_followers_sync_hash(sync_json)
if sync_hash:
sync_cache[foll_sync_key] = {
"hash": sync_hash,
"response": sync_json
}
return sync_json, sync_hash
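A usage sketch (account, domains and base directory are illustrative): the first call for a given nickname and calling domain builds the collection and its digest and caches them; later calls with the same key are answered from the cache.
from followerSync import update_followers_sync_cache

followers_sync_cache = {}  # held on the server object in the daemon
sync_json, sync_hash = \
    update_followers_sync_cache('/var/www/epicyon', 'alice', 'dom.example',
                                'https', 'dom.example', 'remote.example',
                                followers_sync_cache)
# if a digest was produced, later calls reuse the cached
# {'hash': ..., 'response': ...} entry under the key 'alice:remote.example'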


@@ -3831,6 +3831,7 @@ def _bounce_dm(sender_post_id: str, session, http_prefix: str,
if not post_json_object:
print('WARN: unable to create bounce message to ' + sending_handle)
return False
extra_headers = {}
# bounce DM goes back to the sender
print('Sending bounce DM to ' + sending_handle)
send_signed_json(post_json_object, session, base_dir,
@@ -3840,7 +3841,8 @@ def _bounce_dm(sender_post_id: str, session, http_prefix: str,
send_threads, post_log, cached_webfingers,
person_cache, debug, __version__, None, group_account,
signing_priv_key_pem, 7238634,
curr_domain, onion_domain, i2p_domain)
curr_domain, onion_domain, i2p_domain,
extra_headers)
return True


@@ -136,7 +136,7 @@ def _create_like(recent_posts_cache: {},
base_dir, post_filename, object_url,
new_like_json['actor'],
nickname, domain, debug, None)
extra_headers = {}
send_signed_json(new_like_json, session, base_dir,
nickname, domain, port,
liked_post_nickname, liked_post_domain,
@@ -146,7 +146,8 @@ def _create_like(recent_posts_cache: {},
person_cache,
debug, project_version, None, group_account,
signing_priv_key_pem, 7367374,
curr_domain, onion_domain, i2p_domain)
curr_domain, onion_domain, i2p_domain,
extra_headers)
return new_like_json


@@ -780,6 +780,7 @@ def post_message_to_outbox(session, translate: {},
shared_items_federated_domains,
shared_item_federation_tokens,
signing_priv_key_pem,
proxy_type)
proxy_type,
server.followers_sync_cache)
followers_threads.append(named_addresses_thread)
return True


@@ -104,6 +104,7 @@ from context import get_individual_post_context
from maps import geocoords_from_map_link
from keys import get_person_key
from markdown import markdown_to_html
from followerSync import update_followers_sync_cache
def convert_post_content_to_html(message_json: {}) -> None:
@@ -2913,7 +2914,8 @@ def send_signed_json(post_json_object: {}, session, base_dir: str,
shared_items_token: str, group_account: bool,
signing_priv_key_pem: str,
source_id: int, curr_domain: str,
onion_domain: str, i2p_domain: str) -> int:
onion_domain: str, i2p_domain: str,
extra_headers: {}) -> int:
"""Sends a signed json object to an inbox/outbox
"""
if debug:
@@ -3089,6 +3091,10 @@ def send_signed_json(post_json_object: {}, session, base_dir: str,
elif debug:
print('Not sending shared items federation token')
# add any extra headers
for header_title, header_text in extra_headers.items():
signature_header_json[header_title] = header_text
# Keep the number of threads being used small
while len(send_threads) > 1000:
print('WARN: Maximum threads reached - killing send thread')
@@ -3213,7 +3219,8 @@ def _send_to_named_addresses(server, session, session_onion, session_i2p,
shared_items_federated_domains: [],
shared_item_federation_tokens: {},
signing_priv_key_pem: str,
proxy_type: str) -> None:
proxy_type: str,
followers_sync_cache: {}) -> None:
"""sends a post to the specific named addresses in to/cc
"""
if not session:
@@ -3297,6 +3304,7 @@ def _send_to_named_addresses(server, session, session_onion, session_i2p,
if debug:
print('DEBUG: Sending individually addressed posts: ' +
str(recipients))
domain_full = get_full_domain(domain, port)
# randomize the recipients list order, so that we are not favoring
# any particular account in terms of delivery time
random.shuffle(recipients)
@@ -3309,10 +3317,9 @@ def _send_to_named_addresses(server, session, session_onion, session_i2p,
to_domain, to_port = get_domain_from_actor(address)
if not to_domain:
continue
to_domain_full = get_full_domain(to_domain, to_port)
# Don't send profile/actor updates to yourself
if is_profile_update:
domain_full = get_full_domain(domain, port)
to_domain_full = get_full_domain(to_domain, to_port)
if nickname == to_nickname and \
domain_full == to_domain_full:
if debug:
@@ -3352,6 +3359,25 @@ def _send_to_named_addresses(server, session, session_onion, session_i2p,
curr_proxy_type = 'i2p'
session_type = 'i2p'
extra_headers = {}
# followers synchronization header
# See https://github.com/mastodon/mastodon/pull/14510
# https://codeberg.org/fediverse/fep/src/branch/main/feps/fep-8fcf.md
sending_actor = \
from_http_prefix + '://' + from_domain_full + '/users/' + nickname
_, followers_sync_hash = \
update_followers_sync_cache(base_dir,
nickname, domain,
http_prefix, domain_full,
to_domain_full,
followers_sync_cache)
if followers_sync_hash:
collection_sync_str = \
'collectionId="' + sending_actor + '/followers", ' + \
'url="' + sending_actor + '/followers_synchronization", ' + \
'digest="' + followers_sync_hash + '"'
extra_headers["Collection-Synchronization"] = collection_sync_str
if debug:
to_domain_full = get_full_domain(to_domain, to_port)
print('DEBUG: Post sending s2s: ' +
@@ -3388,7 +3414,8 @@ def _send_to_named_addresses(server, session, session_onion, session_i2p,
person_cache, debug, project_version,
shared_items_token, group_account,
signing_priv_key_pem, 34436782,
domain, onion_domain, i2p_domain)
domain, onion_domain, i2p_domain,
extra_headers)
def send_to_named_addresses_thread(server, session, session_onion, session_i2p,
@@ -3403,7 +3430,8 @@ def send_to_named_addresses_thread(server, session, session_onion, session_i2p,
shared_items_federated_domains: [],
shared_item_federation_tokens: {},
signing_priv_key_pem: str,
proxy_type: str):
proxy_type: str,
followers_sync_cache: {}):
"""Returns a thread used to send a post to named addresses
"""
print('THREAD: _send_to_named_addresses')
@@ -3420,7 +3448,8 @@ def send_to_named_addresses_thread(server, session, session_onion, session_i2p,
shared_items_federated_domains,
shared_item_federation_tokens,
signing_priv_key_pem,
proxy_type), daemon=True)
proxy_type,
followers_sync_cache), daemon=True)
if not begin_thread(send_thread, 'send_to_named_addresses_thread'):
print('WARN: socket error while starting ' +
'thread to send to named addresses.')
@@ -3484,6 +3513,7 @@ def send_to_followers(server, session, session_onion, session_i2p,
return
print('Post is addressed to followers')
extra_headers = {}
grouped = group_followers_by_domain(base_dir, nickname, domain)
if not grouped:
if debug:
@@ -3642,7 +3672,8 @@ def send_to_followers(server, session, session_onion, session_i2p,
person_cache, debug, project_version,
shared_items_token, group_account,
signing_priv_key_pem, 639342,
domain, onion_domain, i2p_domain)
domain, onion_domain, i2p_domain,
extra_headers)
else:
# randomize the order of handles, so that we are not
# favoring any particular account in terms of its delivery time
@@ -3675,7 +3706,8 @@ def send_to_followers(server, session, session_onion, session_i2p,
person_cache, debug, project_version,
shared_items_token, group_account,
signing_priv_key_pem, 634219,
domain, onion_domain, i2p_domain)
domain, onion_domain, i2p_domain,
extra_headers)
time.sleep(4)


@@ -137,6 +137,7 @@ def _reactionpost(recent_posts_cache: {},
nickname, domain, debug, None,
emoji_content)
extra_headers = {}
send_signed_json(new_reaction_json, session, base_dir,
nickname, domain, port,
reaction_post_nickname,
@@ -146,7 +147,8 @@ def _reactionpost(recent_posts_cache: {},
person_cache,
debug, project_version, None, group_account,
signing_priv_key_pem, 7165392,
curr_domain, onion_domain, i2p_domain)
curr_domain, onion_domain, i2p_domain,
extra_headers)
return new_reaction_json