From 8df259eff94f15bf6a04aa416799b996e16561c6 Mon Sep 17 00:00:00 2001 From: bashrc Date: Wed, 8 Apr 2026 19:56:24 +0100 Subject: [PATCH] Revert "Inbox refactoring" This reverts commit fff2b1e9b741782625ff1fe4755a693d94e40494. --- inbox.py | 674 ++++++++++++++++++++++++++----------------------------- 1 file changed, 319 insertions(+), 355 deletions(-) diff --git a/inbox.py b/inbox.py index 2141ff6cc..2dd30de72 100644 --- a/inbox.py +++ b/inbox.py @@ -3358,312 +3358,6 @@ def _receive_follow_request(session, session_onion, session_i2p, system_language, mitm_servers) -def _inbox_second_stage(curr_post_json: {}, - server, - recent_posts_cache: {}, max_recent_posts: int, - project_version: str, - base_dir: str, http_prefix: str, - send_threads: [], post_log: [], - cached_webfingers: {}, person_cache: {}, - domain: str, - onion_domain: str, i2p_domain: str, - yggdrasil_domain: str, - port: int, - federation_list: [], max_replies: int, - allow_deletion: bool, debug: bool, max_mentions: int, - max_emoji: int, translate: {}, unit_test: bool, - yt_replace_domain: str, - twitter_replacement_domain: str, - show_published_date_only: bool, - max_followers: int, - allow_local_network_access: bool, - peertube_instances: [], - theme_name: str, system_language: str, - max_like_count: int, signing_priv_key_pem: str, - default_reply_interval_hrs: int, - cw_lists: {}, max_hashtags: int, - key_id: str, - curr_session, - session, - session_onion, - session_i2p, - session_yggdrasil, - last_quote_request: int, - inbox_start_time, - curr_destination: str, - curr_nickname: str, - inbox_handle: str, - dogwhistles: {}, - last_bounce_message: [], - mitm: bool, - queue_filename: str): - """Second stage of the inbox queue - """ - # set the id to the same as the post filename - # This makes the filename and the id consistent - # if curr_post_json.get('id'): - # curr_post_json['id'] = queue_json['id'] - - if receive_undo(base_dir, curr_post_json, - debug, domain, onion_domain, i2p_domain, - yggdrasil_domain): - print('Queue: Undo accepted from ' + key_id) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', '_receive_undo', - debug) - inbox_start_time = time.time() - return inbox_start_time, last_quote_request - - if debug: - print('DEBUG: checking for follow requests') - if _receive_follow_request(curr_session, session_onion, - session_i2p, session_yggdrasil, - base_dir, http_prefix, port, - send_threads, post_log, - cached_webfingers, - person_cache, - curr_post_json, - federation_list, - debug, project_version, - max_followers, domain, - onion_domain, i2p_domain, - yggdrasil_domain, - signing_priv_key_pem, unit_test, - system_language, - server.followers_sync_cache, - server.sites_unavailable, - server.mitm_servers): - print('Queue: Follow activity for ' + key_id + - ' removed from queue') - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', '_receive_follow_request', - debug) - inbox_start_time = time.time() - return inbox_start_time, last_quote_request - - if debug: - print('DEBUG: No follow requests') - - if receive_accept_reject(base_dir, domain, curr_post_json, - federation_list, debug, - domain, onion_domain, i2p_domain, - yggdrasil_domain): - print('Queue: Accept/Reject received from ' + key_id) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', 'receive_accept_reject', - debug) - inbox_start_time = time.time() - return inbox_start_time, last_quote_request - - if receive_quote_request(curr_post_json, - federation_list, - debug, domain, - session, session_onion, session_i2p, - session_yggdrasil, - base_dir, - http_prefix, - send_threads, post_log, - cached_webfingers, - person_cache, project_version, - signing_priv_key_pem, - onion_domain, - i2p_domain, yggdrasil_domain, {}, - server.sites_unavailable, - system_language, - server.mitm_servers, - last_quote_request): - last_quote_request = int(time.time()) - print('Queue: QuoteRequest received from ' + key_id) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', 'receive_quote_request', - debug) - inbox_start_time = time.time() - return inbox_start_time, last_quote_request - - if receive_move_activity(curr_session, base_dir, - http_prefix, domain, port, - cached_webfingers, - person_cache, - curr_post_json, - curr_nickname, - debug, - signing_priv_key_pem, - send_threads, - post_log, - federation_list, - onion_domain, - i2p_domain, - yggdrasil_domain, - server.sites_unavailable, - server.blocked_cache, - server.block_federated, - server.system_language, - server.mitm_servers): - if debug: - print('Queue: _receive_move_activity ' + key_id) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', '_receive_move_activity', - debug) - inbox_start_time = time.time() - return inbox_start_time, last_quote_request - - if receive_update_activity(recent_posts_cache, curr_session, - base_dir, http_prefix, - domain, port, - cached_webfingers, - person_cache, - curr_post_json, - curr_nickname, - debug, - max_mentions, max_emoji, - allow_local_network_access, - system_language, - signing_priv_key_pem, - max_recent_posts, translate, - allow_deletion, - yt_replace_domain, - twitter_replacement_domain, - show_published_date_only, - peertube_instances, - theme_name, max_like_count, - cw_lists, dogwhistles, - server.min_images_for_accounts, - max_hashtags, server.buy_sites, - server.auto_cw_cache, - onion_domain, - i2p_domain, yggdrasil_domain, - server.mitm_servers, - server.instance_software, - server.block_military, - server.block_government, - server.block_bluesky, - server.block_nostr): - if debug: - print('Queue: Update accepted from ' + key_id) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', '_receive_update_activity', - debug) - inbox_start_time = time.time() - return inbox_start_time, last_quote_request - - # get recipients list - recipients_dict, recipients_dict_followers = \ - _inbox_post_recipients(base_dir, curr_post_json, - domain, port, debug, - onion_domain, i2p_domain, - yggdrasil_domain) - if len(recipients_dict.items()) == 0 and \ - len(recipients_dict_followers.items()) == 0: - if debug: - print('Queue: no recipients were resolved ' + - 'for post arriving in inbox') - return inbox_start_time, last_quote_request - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', '_post_recipients', - debug) - inbox_start_time = time.time() - - # if there are only a small number of followers then - # process them as if they were specifically - # addresses to particular accounts - no_of_follow_items = len(recipients_dict_followers.items()) - if no_of_follow_items > 0: - # always deliver to individual inboxes - if no_of_follow_items < 999999: - if debug: - print('DEBUG: moving ' + str(no_of_follow_items) + - ' inbox posts addressed to followers') - for handle, post_item in recipients_dict_followers.items(): - recipients_dict[handle] = post_item - recipients_dict_followers = {} -# recipients_list = [recipients_dict, recipients_dict_followers] - - if debug: - print('*************************************') - print('Resolved recipients list:') - pprint(recipients_dict) - print('Resolved followers list:') - pprint(recipients_dict_followers) - print('*************************************') - - # Copy any posts addressed to followers into the shared inbox - # this avoid copying file multiple times to potentially many - # individual inboxes - if len(recipients_dict_followers.items()) > 0: - shared_inbox_post_filename = \ - curr_destination.replace(inbox_handle, inbox_handle) - if not os.path.isfile(shared_inbox_post_filename): - save_json(curr_post_json, shared_inbox_post_filename) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', 'shared_inbox_save', - debug) - inbox_start_time = time.time() - - lists_enabled = get_config_param(base_dir, "listsEnabled") - dm_license_url = '' - - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', 'distribute_post', - debug) - inbox_start_time = time.time() - - # for posts addressed to specific accounts - for handle, _ in recipients_dict.items(): - destination = \ - curr_destination.replace(inbox_handle, handle) - languages_understood: list[str] = [] - bold_reading = False - bold_reading_filename = \ - acct_handle_dir(base_dir, handle) + '/.boldReading' - if os.path.isfile(bold_reading_filename): - bold_reading = True - _inbox_after_initial(server, inbox_start_time, - recent_posts_cache, - max_recent_posts, - session, session_onion, session_i2p, - session_yggdrasil, - key_id, handle, - curr_post_json, - base_dir, http_prefix, - send_threads, post_log, - cached_webfingers, - person_cache, domain, - onion_domain, i2p_domain, - yggdrasil_domain, - port, federation_list, - debug, - queue_filename, destination, - max_replies, allow_deletion, - max_mentions, max_emoji, - translate, unit_test, - yt_replace_domain, - twitter_replacement_domain, - show_published_date_only, - allow_local_network_access, - peertube_instances, - last_bounce_message, - theme_name, system_language, - max_like_count, - signing_priv_key_pem, - default_reply_interval_hrs, - cw_lists, lists_enabled, - dm_license_url, - languages_understood, mitm, - bold_reading, dogwhistles, - max_hashtags, server.buy_sites, - server.sites_unavailable, - server.mitm_servers, - server.instance_software) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', 'handle_after_initial', - debug) - inbox_start_time = time.time() - if debug: - pprint(curr_post_json) - print('Queue: Queue post accepted') - return inbox_start_time, last_quote_request - - def run_inbox_queue(server, recent_posts_cache: {}, max_recent_posts: int, project_version: str, @@ -4112,62 +3806,332 @@ def run_inbox_queue(server, dogwhistles_filename = base_dir + '/default_dogwhistles.txt' dogwhistles = load_dogwhistles(dogwhistles_filename) - curr_post_json = queue_json['post'] - curr_destination = queue_json['destination'] - curr_nickname = queue_json['postNickname'] - mitm = False - if queue_json.get('mitm'): - mitm = True + # set the id to the same as the post filename + # This makes the filename and the id consistent + # if queue_json['post'].get('id'): + # queue_json['post']['id'] = queue_json['id'] - # TODO split up collections + if receive_undo(base_dir, queue_json['post'], + debug, domain, onion_domain, i2p_domain, + yggdrasil_domain): + print('Queue: Undo accepted from ' + key_id) + if os.path.isfile(queue_filename): + try: + os.remove(queue_filename) + except OSError: + print('EX: run_inbox_queue 5 unable to delete ' + + str(queue_filename)) + if queue: + queue.pop(0) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', '_receive_undo', + debug) + inbox_start_time = time.time() + continue - inbox_start_time, last_quote_request = \ - _inbox_second_stage(curr_post_json, - server, - recent_posts_cache, max_recent_posts, - project_version, - base_dir, http_prefix, - send_threads, post_log, - cached_webfingers, person_cache, - domain, - onion_domain, i2p_domain, - yggdrasil_domain, - port, - federation_list, max_replies, - allow_deletion, debug, max_mentions, - max_emoji, translate, unit_test, - yt_replace_domain, - twitter_replacement_domain, - show_published_date_only, - max_followers, - allow_local_network_access, - peertube_instances, - theme_name, system_language, - max_like_count, signing_priv_key_pem, - default_reply_interval_hrs, - cw_lists, max_hashtags, - key_id, - curr_session, - session, - session_onion, - session_i2p, - session_yggdrasil, - last_quote_request, - inbox_start_time, - curr_destination, - curr_nickname, - inbox_handle, - dogwhistles, - last_bounce_message, - mitm, - queue_filename) + if debug: + print('DEBUG: checking for follow requests') + if _receive_follow_request(curr_session, session_onion, + session_i2p, session_yggdrasil, + base_dir, http_prefix, port, + send_threads, post_log, + cached_webfingers, + person_cache, + queue_json['post'], + federation_list, + debug, project_version, + max_followers, domain, + onion_domain, i2p_domain, + yggdrasil_domain, + signing_priv_key_pem, unit_test, + system_language, + server.followers_sync_cache, + server.sites_unavailable, + server.mitm_servers): + if os.path.isfile(queue_filename): + try: + os.remove(queue_filename) + except OSError: + print('EX: run_inbox_queue 6 unable to delete ' + + str(queue_filename)) + if queue: + queue.pop(0) + print('Queue: Follow activity for ' + key_id + + ' removed from queue') + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', '_receive_follow_request', + debug) + inbox_start_time = time.time() + continue - # remove item from the queue + if debug: + print('DEBUG: No follow requests') + + if receive_accept_reject(base_dir, domain, queue_json['post'], + federation_list, debug, + domain, onion_domain, i2p_domain, + yggdrasil_domain): + print('Queue: Accept/Reject received from ' + key_id) + if os.path.isfile(queue_filename): + try: + os.remove(queue_filename) + except OSError: + print('EX: run_inbox_queue 7 unable to delete ' + + str(queue_filename)) + if queue: + queue.pop(0) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', 'receive_accept_reject', + debug) + inbox_start_time = time.time() + continue + + if receive_quote_request(queue_json['post'], + federation_list, + debug, domain, + session, session_onion, session_i2p, + session_yggdrasil, + base_dir, + http_prefix, + send_threads, post_log, + cached_webfingers, + person_cache, project_version, + signing_priv_key_pem, + onion_domain, + i2p_domain, yggdrasil_domain, {}, + server.sites_unavailable, + system_language, + server.mitm_servers, + last_quote_request): + last_quote_request = int(time.time()) + print('Queue: QuoteRequest received from ' + key_id) + if os.path.isfile(queue_filename): + try: + os.remove(queue_filename) + except OSError: + print('EX: run_inbox_queue 7 unable to delete ' + + str(queue_filename)) + if queue: + queue.pop(0) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', 'receive_quote_request', + debug) + inbox_start_time = time.time() + continue + + if receive_move_activity(curr_session, base_dir, + http_prefix, domain, port, + cached_webfingers, + person_cache, + queue_json['post'], + queue_json['postNickname'], + debug, + signing_priv_key_pem, + send_threads, + post_log, + federation_list, + onion_domain, + i2p_domain, + yggdrasil_domain, + server.sites_unavailable, + server.blocked_cache, + server.block_federated, + server.system_language, + server.mitm_servers): + if debug: + print('Queue: _receive_move_activity ' + key_id) + if os.path.isfile(queue_filename): + try: + os.remove(queue_filename) + except OSError: + print('EX: run_inbox_queue 8 unable to receive move ' + + str(queue_filename)) + if queue: + queue.pop(0) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', '_receive_move_activity', + debug) + inbox_start_time = time.time() + continue + + if receive_update_activity(recent_posts_cache, curr_session, + base_dir, http_prefix, + domain, port, + cached_webfingers, + person_cache, + queue_json['post'], + queue_json['postNickname'], + debug, + max_mentions, max_emoji, + allow_local_network_access, + system_language, + signing_priv_key_pem, + max_recent_posts, translate, + allow_deletion, + yt_replace_domain, + twitter_replacement_domain, + show_published_date_only, + peertube_instances, + theme_name, max_like_count, + cw_lists, dogwhistles, + server.min_images_for_accounts, + max_hashtags, server.buy_sites, + server.auto_cw_cache, + onion_domain, + i2p_domain, yggdrasil_domain, + server.mitm_servers, + server.instance_software, + server.block_military, + server.block_government, + server.block_bluesky, + server.block_nostr): + if debug: + print('Queue: Update accepted from ' + key_id) + if os.path.isfile(queue_filename): + try: + os.remove(queue_filename) + except OSError: + print('EX: run_inbox_queue 8 unable to delete ' + + str(queue_filename)) + if queue: + queue.pop(0) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', '_receive_update_activity', + debug) + inbox_start_time = time.time() + continue + + # get recipients list + recipients_dict, recipients_dict_followers = \ + _inbox_post_recipients(base_dir, queue_json['post'], + domain, port, debug, + onion_domain, i2p_domain, + yggdrasil_domain) + if len(recipients_dict.items()) == 0 and \ + len(recipients_dict_followers.items()) == 0: + if debug: + print('Queue: no recipients were resolved ' + + 'for post arriving in inbox') + if os.path.isfile(queue_filename): + try: + os.remove(queue_filename) + except OSError: + print('EX: run_inbox_queue 9 unable to delete ' + + str(queue_filename)) + if queue: + queue.pop(0) + continue + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', '_post_recipients', + debug) + inbox_start_time = time.time() + + # if there are only a small number of followers then + # process them as if they were specifically + # addresses to particular accounts + no_of_follow_items = len(recipients_dict_followers.items()) + if no_of_follow_items > 0: + # always deliver to individual inboxes + if no_of_follow_items < 999999: + if debug: + print('DEBUG: moving ' + str(no_of_follow_items) + + ' inbox posts addressed to followers') + for handle, post_item in recipients_dict_followers.items(): + recipients_dict[handle] = post_item + recipients_dict_followers = {} +# recipients_list = [recipients_dict, recipients_dict_followers] + + if debug: + print('*************************************') + print('Resolved recipients list:') + pprint(recipients_dict) + print('Resolved followers list:') + pprint(recipients_dict_followers) + print('*************************************') + + # Copy any posts addressed to followers into the shared inbox + # this avoid copying file multiple times to potentially many + # individual inboxes + if len(recipients_dict_followers.items()) > 0: + shared_inbox_post_filename = \ + queue_json['destination'].replace(inbox_handle, inbox_handle) + if not os.path.isfile(shared_inbox_post_filename): + save_json(queue_json['post'], shared_inbox_post_filename) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', 'shared_inbox_save', + debug) + inbox_start_time = time.time() + + lists_enabled = get_config_param(base_dir, "listsEnabled") + dm_license_url = '' + + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', 'distribute_post', + debug) + inbox_start_time = time.time() + + # for posts addressed to specific accounts + for handle, _ in recipients_dict.items(): + destination = \ + queue_json['destination'].replace(inbox_handle, handle) + languages_understood: list[str] = [] + mitm = False + if queue_json.get('mitm'): + mitm = True + bold_reading = False + bold_reading_filename = \ + acct_handle_dir(base_dir, handle) + '/.boldReading' + if os.path.isfile(bold_reading_filename): + bold_reading = True + _inbox_after_initial(server, inbox_start_time, + recent_posts_cache, + max_recent_posts, + session, session_onion, session_i2p, + session_yggdrasil, + key_id, handle, + queue_json['post'], + base_dir, http_prefix, + send_threads, post_log, + cached_webfingers, + person_cache, domain, + onion_domain, i2p_domain, + yggdrasil_domain, + port, federation_list, + debug, + queue_filename, destination, + max_replies, allow_deletion, + max_mentions, max_emoji, + translate, unit_test, + yt_replace_domain, + twitter_replacement_domain, + show_published_date_only, + allow_local_network_access, + peertube_instances, + last_bounce_message, + theme_name, system_language, + max_like_count, + signing_priv_key_pem, + default_reply_interval_hrs, + cw_lists, lists_enabled, + dm_license_url, + languages_understood, mitm, + bold_reading, dogwhistles, + max_hashtags, server.buy_sites, + server.sites_unavailable, + server.mitm_servers, + server.instance_software) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', 'handle_after_initial', + debug) + inbox_start_time = time.time() + if debug: + pprint(queue_json['post']) + print('Queue: Queue post accepted') if os.path.isfile(queue_filename): try: os.remove(queue_filename) except OSError: - print('EX: run_inbox_queue unable to delete ' + + print('EX: run_inbox_queue 10 unable to delete ' + str(queue_filename)) if queue: queue.pop(0)