Handle inbox posts containing collections of subposts

main
bashrc 2026-04-08 22:24:47 +01:00
parent 83e7397c3a
commit 101ea22e89
1 changed files with 296 additions and 309 deletions

605
inbox.py
View File

@ -3358,6 +3358,34 @@ def _receive_follow_request(session, session_onion, session_i2p,
system_language, mitm_servers)
def _split_post_collection(collection_post_json: {}) -> []:
"""Splits a collection post up into separate posts
https://codeberg.org/fediverse/fep/src/branch/main/fep/1a11/fep-1a11.md
"""
if not has_object_dict(collection_post_json):
return [collection_post_json]
if not collection_post_json['object'].get('type'):
return [collection_post_json]
if not isinstance(collection_post_json['object']['type'], str):
return [collection_post_json]
if collection_post_json['object']['type'] != 'OrderedCollection':
return [collection_post_json]
if not collection_post_json['object'].get('orderedItems'):
return [collection_post_json]
if not isinstance(collection_post_json['object']['orderedItems'], list):
return [collection_post_json]
posts_list = []
for post_dict in collection_post_json['object']['orderedItems']:
if not isinstance(post_dict, dict):
continue
post_json = collection_post_json.copy()
post_json['object'] = post_dict
posts_list.append(post_json)
if not posts_list:
return [collection_post_json]
return posts_list
def run_inbox_queue(server,
recent_posts_cache: {}, max_recent_posts: int,
project_version: str,
@ -3811,331 +3839,290 @@ def run_inbox_queue(server,
# if queue_json['post'].get('id'):
# queue_json['post']['id'] = queue_json['id']
curr_post_json = queue_json['post']
curr_nickname = queue_json['postNickname']
curr_destination = queue_json['destination']
if receive_undo(base_dir, curr_post_json,
debug, domain, onion_domain, i2p_domain,
yggdrasil_domain):
print('Queue: Undo accepted from ' + key_id)
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 5 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_undo',
debug)
inbox_start_time = time.time()
continue
# if the post contains a collection of posts then split it up
remove_queue_item = False
posts_list_json = _split_post_collection(queue_json['post'])
for curr_post_json in posts_list_json:
if debug:
print('DEBUG: checking for follow requests')
if _receive_follow_request(curr_session, session_onion,
session_i2p, session_yggdrasil,
base_dir, http_prefix, port,
send_threads, post_log,
cached_webfingers,
person_cache,
curr_post_json,
federation_list,
debug, project_version,
max_followers, domain,
onion_domain, i2p_domain,
yggdrasil_domain,
signing_priv_key_pem, unit_test,
system_language,
server.followers_sync_cache,
server.sites_unavailable,
server.mitm_servers):
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 6 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)
print('Queue: Follow activity for ' + key_id +
' removed from queue')
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_follow_request',
debug)
inbox_start_time = time.time()
continue
if receive_undo(base_dir, curr_post_json,
debug, domain, onion_domain, i2p_domain,
yggdrasil_domain):
print('Queue: Undo accepted from ' + key_id)
remove_queue_item = True
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_undo',
debug)
inbox_start_time = time.time()
continue
if debug:
print('DEBUG: No follow requests')
if receive_accept_reject(base_dir, domain, curr_post_json,
federation_list, debug,
domain, onion_domain, i2p_domain,
yggdrasil_domain):
print('Queue: Accept/Reject received from ' + key_id)
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 7 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'receive_accept_reject',
debug)
inbox_start_time = time.time()
continue
if receive_quote_request(curr_post_json,
federation_list,
debug, domain,
session, session_onion, session_i2p,
session_yggdrasil,
base_dir,
http_prefix,
send_threads, post_log,
cached_webfingers,
person_cache, project_version,
signing_priv_key_pem,
onion_domain,
i2p_domain, yggdrasil_domain, {},
server.sites_unavailable,
system_language,
server.mitm_servers,
last_quote_request):
last_quote_request = int(time.time())
print('Queue: QuoteRequest received from ' + key_id)
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 7 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'receive_quote_request',
debug)
inbox_start_time = time.time()
continue
if receive_move_activity(curr_session, base_dir,
http_prefix, domain, port,
cached_webfingers,
person_cache,
curr_post_json,
curr_nickname,
debug,
signing_priv_key_pem,
send_threads,
post_log,
federation_list,
onion_domain,
i2p_domain,
yggdrasil_domain,
server.sites_unavailable,
server.blocked_cache,
server.block_federated,
server.system_language,
server.mitm_servers):
if debug:
print('Queue: _receive_move_activity ' + key_id)
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 8 unable to receive move ' +
str(queue_filename))
if queue:
queue.pop(0)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_move_activity',
debug)
inbox_start_time = time.time()
continue
print('DEBUG: checking for follow requests')
if _receive_follow_request(curr_session, session_onion,
session_i2p, session_yggdrasil,
base_dir, http_prefix, port,
send_threads, post_log,
cached_webfingers,
person_cache,
curr_post_json,
federation_list,
debug, project_version,
max_followers, domain,
onion_domain, i2p_domain,
yggdrasil_domain,
signing_priv_key_pem, unit_test,
system_language,
server.followers_sync_cache,
server.sites_unavailable,
server.mitm_servers):
remove_queue_item = True
print('Queue: Follow activity for ' + key_id +
' removed from queue')
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_follow_request',
debug)
inbox_start_time = time.time()
continue
if receive_update_activity(recent_posts_cache, curr_session,
base_dir, http_prefix,
domain, port,
cached_webfingers,
person_cache,
curr_post_json,
curr_nickname,
debug,
max_mentions, max_emoji,
allow_local_network_access,
system_language,
signing_priv_key_pem,
max_recent_posts, translate,
allow_deletion,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
peertube_instances,
theme_name, max_like_count,
cw_lists, dogwhistles,
server.min_images_for_accounts,
max_hashtags, server.buy_sites,
server.auto_cw_cache,
onion_domain,
i2p_domain, yggdrasil_domain,
server.mitm_servers,
server.instance_software,
server.block_military,
server.block_government,
server.block_bluesky,
server.block_nostr):
if debug:
print('Queue: Update accepted from ' + key_id)
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 8 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_update_activity',
debug)
inbox_start_time = time.time()
continue
print('DEBUG: No follow requests')
# get recipients list
recipients_dict, recipients_dict_followers = \
_inbox_post_recipients(base_dir, curr_post_json,
domain, port, debug,
onion_domain, i2p_domain,
yggdrasil_domain)
if len(recipients_dict.items()) == 0 and \
len(recipients_dict_followers.items()) == 0:
if debug:
print('Queue: no recipients were resolved ' +
'for post arriving in inbox')
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 9 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)
continue
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_post_recipients',
debug)
inbox_start_time = time.time()
if receive_accept_reject(base_dir, domain, curr_post_json,
federation_list, debug,
domain, onion_domain, i2p_domain,
yggdrasil_domain):
print('Queue: Accept/Reject received from ' + key_id)
remove_queue_item = True
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'receive_accept_reject',
debug)
inbox_start_time = time.time()
continue
# if there are only a small number of followers then
# process them as if they were specifically
# addresses to particular accounts
no_of_follow_items = len(recipients_dict_followers.items())
if no_of_follow_items > 0:
# always deliver to individual inboxes
if no_of_follow_items < 999999:
if receive_quote_request(curr_post_json,
federation_list,
debug, domain,
session, session_onion, session_i2p,
session_yggdrasil,
base_dir,
http_prefix,
send_threads, post_log,
cached_webfingers,
person_cache, project_version,
signing_priv_key_pem,
onion_domain,
i2p_domain, yggdrasil_domain, {},
server.sites_unavailable,
system_language,
server.mitm_servers,
last_quote_request):
last_quote_request = int(time.time())
print('Queue: QuoteRequest received from ' + key_id)
remove_queue_item = True
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'receive_quote_request',
debug)
inbox_start_time = time.time()
continue
if receive_move_activity(curr_session, base_dir,
http_prefix, domain, port,
cached_webfingers,
person_cache,
curr_post_json,
curr_nickname,
debug,
signing_priv_key_pem,
send_threads,
post_log,
federation_list,
onion_domain,
i2p_domain,
yggdrasil_domain,
server.sites_unavailable,
server.blocked_cache,
server.block_federated,
server.system_language,
server.mitm_servers):
if debug:
print('DEBUG: moving ' + str(no_of_follow_items) +
' inbox posts addressed to followers')
for handle, post_item in recipients_dict_followers.items():
recipients_dict[handle] = post_item
recipients_dict_followers = {}
# recipients_list = [recipients_dict, recipients_dict_followers]
print('Queue: _receive_move_activity ' + key_id)
remove_queue_item = True
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_move_activity',
debug)
inbox_start_time = time.time()
continue
if debug:
print('*************************************')
print('Resolved recipients list:')
pprint(recipients_dict)
print('Resolved followers list:')
pprint(recipients_dict_followers)
print('*************************************')
if receive_update_activity(recent_posts_cache, curr_session,
base_dir, http_prefix,
domain, port,
cached_webfingers,
person_cache,
curr_post_json,
curr_nickname,
debug,
max_mentions, max_emoji,
allow_local_network_access,
system_language,
signing_priv_key_pem,
max_recent_posts, translate,
allow_deletion,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
peertube_instances,
theme_name, max_like_count,
cw_lists, dogwhistles,
server.min_images_for_accounts,
max_hashtags, server.buy_sites,
server.auto_cw_cache,
onion_domain,
i2p_domain, yggdrasil_domain,
server.mitm_servers,
server.instance_software,
server.block_military,
server.block_government,
server.block_bluesky,
server.block_nostr):
if debug:
print('Queue: Update accepted from ' + key_id)
remove_queue_item = True
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_update_activity',
debug)
inbox_start_time = time.time()
continue
# Copy any posts addressed to followers into the shared inbox
# this avoid copying file multiple times to potentially many
# individual inboxes
if len(recipients_dict_followers.items()) > 0:
shared_inbox_post_filename = \
curr_destination.replace(inbox_handle, inbox_handle)
if not os.path.isfile(shared_inbox_post_filename):
save_json(curr_post_json, shared_inbox_post_filename)
# get recipients list
recipients_dict, recipients_dict_followers = \
_inbox_post_recipients(base_dir, curr_post_json,
domain, port, debug,
onion_domain, i2p_domain,
yggdrasil_domain)
if len(recipients_dict.items()) == 0 and \
len(recipients_dict_followers.items()) == 0:
if debug:
print('Queue: no recipients were resolved ' +
'for post arriving in inbox')
remove_queue_item = True
continue
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'shared_inbox_save',
'INBOX', '_post_recipients',
debug)
inbox_start_time = time.time()
lists_enabled = get_config_param(base_dir, "listsEnabled")
dm_license_url = ''
# if there are only a small number of followers then
# process them as if they were specifically
# addresses to particular accounts
no_of_follow_items = len(recipients_dict_followers.items())
if no_of_follow_items > 0:
# always deliver to individual inboxes
if no_of_follow_items < 999999:
if debug:
print('DEBUG: moving ' + str(no_of_follow_items) +
' inbox posts addressed to followers')
for handle, post_item in recipients_dict_followers.items():
recipients_dict[handle] = post_item
recipients_dict_followers = {}
# recipients_list = [recipients_dict, recipients_dict_followers]
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'distribute_post',
debug)
inbox_start_time = time.time()
# for posts addressed to specific accounts
for handle, _ in recipients_dict.items():
destination = \
curr_destination.replace(inbox_handle, handle)
languages_understood: list[str] = []
mitm = False
if queue_json.get('mitm'):
mitm = True
bold_reading = False
bold_reading_filename = \
acct_handle_dir(base_dir, handle) + '/.boldReading'
if os.path.isfile(bold_reading_filename):
bold_reading = True
_inbox_after_initial(server, inbox_start_time,
recent_posts_cache,
max_recent_posts,
session, session_onion, session_i2p,
session_yggdrasil,
key_id, handle,
curr_post_json,
base_dir, http_prefix,
send_threads, post_log,
cached_webfingers,
person_cache, domain,
onion_domain, i2p_domain,
yggdrasil_domain,
port, federation_list,
debug,
queue_filename, destination,
max_replies, allow_deletion,
max_mentions, max_emoji,
translate, unit_test,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
allow_local_network_access,
peertube_instances,
last_bounce_message,
theme_name, system_language,
max_like_count,
signing_priv_key_pem,
default_reply_interval_hrs,
cw_lists, lists_enabled,
dm_license_url,
languages_understood, mitm,
bold_reading, dogwhistles,
max_hashtags, server.buy_sites,
server.sites_unavailable,
server.mitm_servers,
server.instance_software)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'handle_after_initial',
debug)
inbox_start_time = time.time()
if debug:
pprint(curr_post_json)
print('Queue: Queue post accepted')
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 10 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)
print('*************************************')
print('Resolved recipients list:')
pprint(recipients_dict)
print('Resolved followers list:')
pprint(recipients_dict_followers)
print('*************************************')
# Copy any posts addressed to followers into the shared inbox
# this avoid copying file multiple times to potentially many
# individual inboxes
if len(recipients_dict_followers.items()) > 0:
shared_inbox_post_filename = \
curr_destination.replace(inbox_handle, inbox_handle)
if not os.path.isfile(shared_inbox_post_filename):
save_json(curr_post_json, shared_inbox_post_filename)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'shared_inbox_save',
debug)
inbox_start_time = time.time()
lists_enabled = get_config_param(base_dir, "listsEnabled")
dm_license_url = ''
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'distribute_post',
debug)
inbox_start_time = time.time()
# for posts addressed to specific accounts
for handle, _ in recipients_dict.items():
destination = \
curr_destination.replace(inbox_handle, handle)
languages_understood: list[str] = []
mitm = False
if queue_json.get('mitm'):
mitm = True
bold_reading = False
bold_reading_filename = \
acct_handle_dir(base_dir, handle) + '/.boldReading'
if os.path.isfile(bold_reading_filename):
bold_reading = True
_inbox_after_initial(server, inbox_start_time,
recent_posts_cache,
max_recent_posts,
session, session_onion, session_i2p,
session_yggdrasil,
key_id, handle,
curr_post_json,
base_dir, http_prefix,
send_threads, post_log,
cached_webfingers,
person_cache, domain,
onion_domain, i2p_domain,
yggdrasil_domain,
port, federation_list,
debug,
queue_filename, destination,
max_replies, allow_deletion,
max_mentions, max_emoji,
translate, unit_test,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
allow_local_network_access,
peertube_instances,
last_bounce_message,
theme_name, system_language,
max_like_count,
signing_priv_key_pem,
default_reply_interval_hrs,
cw_lists, lists_enabled,
dm_license_url,
languages_understood, mitm,
bold_reading, dogwhistles,
max_hashtags, server.buy_sites,
server.sites_unavailable,
server.mitm_servers,
server.instance_software)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'handle_after_initial',
debug)
inbox_start_time = time.time()
if debug:
pprint(curr_post_json)
print('Queue: Queue post accepted')
remove_queue_item = True
# should the current queue item be removed?
if remove_queue_item:
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 10 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)