diff --git a/inbox.py b/inbox.py index 2dd30de72..2141ff6cc 100644 --- a/inbox.py +++ b/inbox.py @@ -3358,6 +3358,312 @@ def _receive_follow_request(session, session_onion, session_i2p, system_language, mitm_servers) +def _inbox_second_stage(curr_post_json: {}, + server, + recent_posts_cache: {}, max_recent_posts: int, + project_version: str, + base_dir: str, http_prefix: str, + send_threads: [], post_log: [], + cached_webfingers: {}, person_cache: {}, + domain: str, + onion_domain: str, i2p_domain: str, + yggdrasil_domain: str, + port: int, + federation_list: [], max_replies: int, + allow_deletion: bool, debug: bool, max_mentions: int, + max_emoji: int, translate: {}, unit_test: bool, + yt_replace_domain: str, + twitter_replacement_domain: str, + show_published_date_only: bool, + max_followers: int, + allow_local_network_access: bool, + peertube_instances: [], + theme_name: str, system_language: str, + max_like_count: int, signing_priv_key_pem: str, + default_reply_interval_hrs: int, + cw_lists: {}, max_hashtags: int, + key_id: str, + curr_session, + session, + session_onion, + session_i2p, + session_yggdrasil, + last_quote_request: int, + inbox_start_time, + curr_destination: str, + curr_nickname: str, + inbox_handle: str, + dogwhistles: {}, + last_bounce_message: [], + mitm: bool, + queue_filename: str): + """Second stage of the inbox queue + """ + # set the id to the same as the post filename + # This makes the filename and the id consistent + # if curr_post_json.get('id'): + # curr_post_json['id'] = queue_json['id'] + + if receive_undo(base_dir, curr_post_json, + debug, domain, onion_domain, i2p_domain, + yggdrasil_domain): + print('Queue: Undo accepted from ' + key_id) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', '_receive_undo', + debug) + inbox_start_time = time.time() + return inbox_start_time, last_quote_request + + if debug: + print('DEBUG: checking for follow requests') + if _receive_follow_request(curr_session, session_onion, + session_i2p, session_yggdrasil, + base_dir, http_prefix, port, + send_threads, post_log, + cached_webfingers, + person_cache, + curr_post_json, + federation_list, + debug, project_version, + max_followers, domain, + onion_domain, i2p_domain, + yggdrasil_domain, + signing_priv_key_pem, unit_test, + system_language, + server.followers_sync_cache, + server.sites_unavailable, + server.mitm_servers): + print('Queue: Follow activity for ' + key_id + + ' removed from queue') + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', '_receive_follow_request', + debug) + inbox_start_time = time.time() + return inbox_start_time, last_quote_request + + if debug: + print('DEBUG: No follow requests') + + if receive_accept_reject(base_dir, domain, curr_post_json, + federation_list, debug, + domain, onion_domain, i2p_domain, + yggdrasil_domain): + print('Queue: Accept/Reject received from ' + key_id) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', 'receive_accept_reject', + debug) + inbox_start_time = time.time() + return inbox_start_time, last_quote_request + + if receive_quote_request(curr_post_json, + federation_list, + debug, domain, + session, session_onion, session_i2p, + session_yggdrasil, + base_dir, + http_prefix, + send_threads, post_log, + cached_webfingers, + person_cache, project_version, + signing_priv_key_pem, + onion_domain, + i2p_domain, yggdrasil_domain, {}, + server.sites_unavailable, + system_language, + server.mitm_servers, + last_quote_request): + last_quote_request = int(time.time()) + print('Queue: QuoteRequest received from ' + key_id) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', 'receive_quote_request', + debug) + inbox_start_time = time.time() + return inbox_start_time, last_quote_request + + if receive_move_activity(curr_session, base_dir, + http_prefix, domain, port, + cached_webfingers, + person_cache, + curr_post_json, + curr_nickname, + debug, + signing_priv_key_pem, + send_threads, + post_log, + federation_list, + onion_domain, + i2p_domain, + yggdrasil_domain, + server.sites_unavailable, + server.blocked_cache, + server.block_federated, + server.system_language, + server.mitm_servers): + if debug: + print('Queue: _receive_move_activity ' + key_id) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', '_receive_move_activity', + debug) + inbox_start_time = time.time() + return inbox_start_time, last_quote_request + + if receive_update_activity(recent_posts_cache, curr_session, + base_dir, http_prefix, + domain, port, + cached_webfingers, + person_cache, + curr_post_json, + curr_nickname, + debug, + max_mentions, max_emoji, + allow_local_network_access, + system_language, + signing_priv_key_pem, + max_recent_posts, translate, + allow_deletion, + yt_replace_domain, + twitter_replacement_domain, + show_published_date_only, + peertube_instances, + theme_name, max_like_count, + cw_lists, dogwhistles, + server.min_images_for_accounts, + max_hashtags, server.buy_sites, + server.auto_cw_cache, + onion_domain, + i2p_domain, yggdrasil_domain, + server.mitm_servers, + server.instance_software, + server.block_military, + server.block_government, + server.block_bluesky, + server.block_nostr): + if debug: + print('Queue: Update accepted from ' + key_id) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', '_receive_update_activity', + debug) + inbox_start_time = time.time() + return inbox_start_time, last_quote_request + + # get recipients list + recipients_dict, recipients_dict_followers = \ + _inbox_post_recipients(base_dir, curr_post_json, + domain, port, debug, + onion_domain, i2p_domain, + yggdrasil_domain) + if len(recipients_dict.items()) == 0 and \ + len(recipients_dict_followers.items()) == 0: + if debug: + print('Queue: no recipients were resolved ' + + 'for post arriving in inbox') + return inbox_start_time, last_quote_request + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', '_post_recipients', + debug) + inbox_start_time = time.time() + + # if there are only a small number of followers then + # process them as if they were specifically + # addresses to particular accounts + no_of_follow_items = len(recipients_dict_followers.items()) + if no_of_follow_items > 0: + # always deliver to individual inboxes + if no_of_follow_items < 999999: + if debug: + print('DEBUG: moving ' + str(no_of_follow_items) + + ' inbox posts addressed to followers') + for handle, post_item in recipients_dict_followers.items(): + recipients_dict[handle] = post_item + recipients_dict_followers = {} +# recipients_list = [recipients_dict, recipients_dict_followers] + + if debug: + print('*************************************') + print('Resolved recipients list:') + pprint(recipients_dict) + print('Resolved followers list:') + pprint(recipients_dict_followers) + print('*************************************') + + # Copy any posts addressed to followers into the shared inbox + # this avoid copying file multiple times to potentially many + # individual inboxes + if len(recipients_dict_followers.items()) > 0: + shared_inbox_post_filename = \ + curr_destination.replace(inbox_handle, inbox_handle) + if not os.path.isfile(shared_inbox_post_filename): + save_json(curr_post_json, shared_inbox_post_filename) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', 'shared_inbox_save', + debug) + inbox_start_time = time.time() + + lists_enabled = get_config_param(base_dir, "listsEnabled") + dm_license_url = '' + + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', 'distribute_post', + debug) + inbox_start_time = time.time() + + # for posts addressed to specific accounts + for handle, _ in recipients_dict.items(): + destination = \ + curr_destination.replace(inbox_handle, handle) + languages_understood: list[str] = [] + bold_reading = False + bold_reading_filename = \ + acct_handle_dir(base_dir, handle) + '/.boldReading' + if os.path.isfile(bold_reading_filename): + bold_reading = True + _inbox_after_initial(server, inbox_start_time, + recent_posts_cache, + max_recent_posts, + session, session_onion, session_i2p, + session_yggdrasil, + key_id, handle, + curr_post_json, + base_dir, http_prefix, + send_threads, post_log, + cached_webfingers, + person_cache, domain, + onion_domain, i2p_domain, + yggdrasil_domain, + port, federation_list, + debug, + queue_filename, destination, + max_replies, allow_deletion, + max_mentions, max_emoji, + translate, unit_test, + yt_replace_domain, + twitter_replacement_domain, + show_published_date_only, + allow_local_network_access, + peertube_instances, + last_bounce_message, + theme_name, system_language, + max_like_count, + signing_priv_key_pem, + default_reply_interval_hrs, + cw_lists, lists_enabled, + dm_license_url, + languages_understood, mitm, + bold_reading, dogwhistles, + max_hashtags, server.buy_sites, + server.sites_unavailable, + server.mitm_servers, + server.instance_software) + fitness_performance(inbox_start_time, server.fitness, + 'INBOX', 'handle_after_initial', + debug) + inbox_start_time = time.time() + if debug: + pprint(curr_post_json) + print('Queue: Queue post accepted') + return inbox_start_time, last_quote_request + + def run_inbox_queue(server, recent_posts_cache: {}, max_recent_posts: int, project_version: str, @@ -3806,332 +4112,62 @@ def run_inbox_queue(server, dogwhistles_filename = base_dir + '/default_dogwhistles.txt' dogwhistles = load_dogwhistles(dogwhistles_filename) - # set the id to the same as the post filename - # This makes the filename and the id consistent - # if queue_json['post'].get('id'): - # queue_json['post']['id'] = queue_json['id'] + curr_post_json = queue_json['post'] + curr_destination = queue_json['destination'] + curr_nickname = queue_json['postNickname'] + mitm = False + if queue_json.get('mitm'): + mitm = True - if receive_undo(base_dir, queue_json['post'], - debug, domain, onion_domain, i2p_domain, - yggdrasil_domain): - print('Queue: Undo accepted from ' + key_id) - if os.path.isfile(queue_filename): - try: - os.remove(queue_filename) - except OSError: - print('EX: run_inbox_queue 5 unable to delete ' + - str(queue_filename)) - if queue: - queue.pop(0) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', '_receive_undo', - debug) - inbox_start_time = time.time() - continue + # TODO split up collections - if debug: - print('DEBUG: checking for follow requests') - if _receive_follow_request(curr_session, session_onion, - session_i2p, session_yggdrasil, - base_dir, http_prefix, port, - send_threads, post_log, - cached_webfingers, - person_cache, - queue_json['post'], - federation_list, - debug, project_version, - max_followers, domain, - onion_domain, i2p_domain, - yggdrasil_domain, - signing_priv_key_pem, unit_test, - system_language, - server.followers_sync_cache, - server.sites_unavailable, - server.mitm_servers): - if os.path.isfile(queue_filename): - try: - os.remove(queue_filename) - except OSError: - print('EX: run_inbox_queue 6 unable to delete ' + - str(queue_filename)) - if queue: - queue.pop(0) - print('Queue: Follow activity for ' + key_id + - ' removed from queue') - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', '_receive_follow_request', - debug) - inbox_start_time = time.time() - continue + inbox_start_time, last_quote_request = \ + _inbox_second_stage(curr_post_json, + server, + recent_posts_cache, max_recent_posts, + project_version, + base_dir, http_prefix, + send_threads, post_log, + cached_webfingers, person_cache, + domain, + onion_domain, i2p_domain, + yggdrasil_domain, + port, + federation_list, max_replies, + allow_deletion, debug, max_mentions, + max_emoji, translate, unit_test, + yt_replace_domain, + twitter_replacement_domain, + show_published_date_only, + max_followers, + allow_local_network_access, + peertube_instances, + theme_name, system_language, + max_like_count, signing_priv_key_pem, + default_reply_interval_hrs, + cw_lists, max_hashtags, + key_id, + curr_session, + session, + session_onion, + session_i2p, + session_yggdrasil, + last_quote_request, + inbox_start_time, + curr_destination, + curr_nickname, + inbox_handle, + dogwhistles, + last_bounce_message, + mitm, + queue_filename) - if debug: - print('DEBUG: No follow requests') - - if receive_accept_reject(base_dir, domain, queue_json['post'], - federation_list, debug, - domain, onion_domain, i2p_domain, - yggdrasil_domain): - print('Queue: Accept/Reject received from ' + key_id) - if os.path.isfile(queue_filename): - try: - os.remove(queue_filename) - except OSError: - print('EX: run_inbox_queue 7 unable to delete ' + - str(queue_filename)) - if queue: - queue.pop(0) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', 'receive_accept_reject', - debug) - inbox_start_time = time.time() - continue - - if receive_quote_request(queue_json['post'], - federation_list, - debug, domain, - session, session_onion, session_i2p, - session_yggdrasil, - base_dir, - http_prefix, - send_threads, post_log, - cached_webfingers, - person_cache, project_version, - signing_priv_key_pem, - onion_domain, - i2p_domain, yggdrasil_domain, {}, - server.sites_unavailable, - system_language, - server.mitm_servers, - last_quote_request): - last_quote_request = int(time.time()) - print('Queue: QuoteRequest received from ' + key_id) - if os.path.isfile(queue_filename): - try: - os.remove(queue_filename) - except OSError: - print('EX: run_inbox_queue 7 unable to delete ' + - str(queue_filename)) - if queue: - queue.pop(0) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', 'receive_quote_request', - debug) - inbox_start_time = time.time() - continue - - if receive_move_activity(curr_session, base_dir, - http_prefix, domain, port, - cached_webfingers, - person_cache, - queue_json['post'], - queue_json['postNickname'], - debug, - signing_priv_key_pem, - send_threads, - post_log, - federation_list, - onion_domain, - i2p_domain, - yggdrasil_domain, - server.sites_unavailable, - server.blocked_cache, - server.block_federated, - server.system_language, - server.mitm_servers): - if debug: - print('Queue: _receive_move_activity ' + key_id) - if os.path.isfile(queue_filename): - try: - os.remove(queue_filename) - except OSError: - print('EX: run_inbox_queue 8 unable to receive move ' + - str(queue_filename)) - if queue: - queue.pop(0) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', '_receive_move_activity', - debug) - inbox_start_time = time.time() - continue - - if receive_update_activity(recent_posts_cache, curr_session, - base_dir, http_prefix, - domain, port, - cached_webfingers, - person_cache, - queue_json['post'], - queue_json['postNickname'], - debug, - max_mentions, max_emoji, - allow_local_network_access, - system_language, - signing_priv_key_pem, - max_recent_posts, translate, - allow_deletion, - yt_replace_domain, - twitter_replacement_domain, - show_published_date_only, - peertube_instances, - theme_name, max_like_count, - cw_lists, dogwhistles, - server.min_images_for_accounts, - max_hashtags, server.buy_sites, - server.auto_cw_cache, - onion_domain, - i2p_domain, yggdrasil_domain, - server.mitm_servers, - server.instance_software, - server.block_military, - server.block_government, - server.block_bluesky, - server.block_nostr): - if debug: - print('Queue: Update accepted from ' + key_id) - if os.path.isfile(queue_filename): - try: - os.remove(queue_filename) - except OSError: - print('EX: run_inbox_queue 8 unable to delete ' + - str(queue_filename)) - if queue: - queue.pop(0) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', '_receive_update_activity', - debug) - inbox_start_time = time.time() - continue - - # get recipients list - recipients_dict, recipients_dict_followers = \ - _inbox_post_recipients(base_dir, queue_json['post'], - domain, port, debug, - onion_domain, i2p_domain, - yggdrasil_domain) - if len(recipients_dict.items()) == 0 and \ - len(recipients_dict_followers.items()) == 0: - if debug: - print('Queue: no recipients were resolved ' + - 'for post arriving in inbox') - if os.path.isfile(queue_filename): - try: - os.remove(queue_filename) - except OSError: - print('EX: run_inbox_queue 9 unable to delete ' + - str(queue_filename)) - if queue: - queue.pop(0) - continue - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', '_post_recipients', - debug) - inbox_start_time = time.time() - - # if there are only a small number of followers then - # process them as if they were specifically - # addresses to particular accounts - no_of_follow_items = len(recipients_dict_followers.items()) - if no_of_follow_items > 0: - # always deliver to individual inboxes - if no_of_follow_items < 999999: - if debug: - print('DEBUG: moving ' + str(no_of_follow_items) + - ' inbox posts addressed to followers') - for handle, post_item in recipients_dict_followers.items(): - recipients_dict[handle] = post_item - recipients_dict_followers = {} -# recipients_list = [recipients_dict, recipients_dict_followers] - - if debug: - print('*************************************') - print('Resolved recipients list:') - pprint(recipients_dict) - print('Resolved followers list:') - pprint(recipients_dict_followers) - print('*************************************') - - # Copy any posts addressed to followers into the shared inbox - # this avoid copying file multiple times to potentially many - # individual inboxes - if len(recipients_dict_followers.items()) > 0: - shared_inbox_post_filename = \ - queue_json['destination'].replace(inbox_handle, inbox_handle) - if not os.path.isfile(shared_inbox_post_filename): - save_json(queue_json['post'], shared_inbox_post_filename) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', 'shared_inbox_save', - debug) - inbox_start_time = time.time() - - lists_enabled = get_config_param(base_dir, "listsEnabled") - dm_license_url = '' - - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', 'distribute_post', - debug) - inbox_start_time = time.time() - - # for posts addressed to specific accounts - for handle, _ in recipients_dict.items(): - destination = \ - queue_json['destination'].replace(inbox_handle, handle) - languages_understood: list[str] = [] - mitm = False - if queue_json.get('mitm'): - mitm = True - bold_reading = False - bold_reading_filename = \ - acct_handle_dir(base_dir, handle) + '/.boldReading' - if os.path.isfile(bold_reading_filename): - bold_reading = True - _inbox_after_initial(server, inbox_start_time, - recent_posts_cache, - max_recent_posts, - session, session_onion, session_i2p, - session_yggdrasil, - key_id, handle, - queue_json['post'], - base_dir, http_prefix, - send_threads, post_log, - cached_webfingers, - person_cache, domain, - onion_domain, i2p_domain, - yggdrasil_domain, - port, federation_list, - debug, - queue_filename, destination, - max_replies, allow_deletion, - max_mentions, max_emoji, - translate, unit_test, - yt_replace_domain, - twitter_replacement_domain, - show_published_date_only, - allow_local_network_access, - peertube_instances, - last_bounce_message, - theme_name, system_language, - max_like_count, - signing_priv_key_pem, - default_reply_interval_hrs, - cw_lists, lists_enabled, - dm_license_url, - languages_understood, mitm, - bold_reading, dogwhistles, - max_hashtags, server.buy_sites, - server.sites_unavailable, - server.mitm_servers, - server.instance_software) - fitness_performance(inbox_start_time, server.fitness, - 'INBOX', 'handle_after_initial', - debug) - inbox_start_time = time.time() - if debug: - pprint(queue_json['post']) - print('Queue: Queue post accepted') + # remove item from the queue if os.path.isfile(queue_filename): try: os.remove(queue_filename) except OSError: - print('EX: run_inbox_queue 10 unable to delete ' + + print('EX: run_inbox_queue unable to delete ' + str(queue_filename)) if queue: queue.pop(0)