Inbox refactoring

main
bashrc 2026-04-08 19:28:00 +01:00
parent 1c5f050db6
commit fff2b1e9b7
1 changed files with 355 additions and 319 deletions

674
inbox.py
View File

@ -3358,6 +3358,312 @@ def _receive_follow_request(session, session_onion, session_i2p,
system_language, mitm_servers)
def _inbox_second_stage(curr_post_json: {},
server,
recent_posts_cache: {}, max_recent_posts: int,
project_version: str,
base_dir: str, http_prefix: str,
send_threads: [], post_log: [],
cached_webfingers: {}, person_cache: {},
domain: str,
onion_domain: str, i2p_domain: str,
yggdrasil_domain: str,
port: int,
federation_list: [], max_replies: int,
allow_deletion: bool, debug: bool, max_mentions: int,
max_emoji: int, translate: {}, unit_test: bool,
yt_replace_domain: str,
twitter_replacement_domain: str,
show_published_date_only: bool,
max_followers: int,
allow_local_network_access: bool,
peertube_instances: [],
theme_name: str, system_language: str,
max_like_count: int, signing_priv_key_pem: str,
default_reply_interval_hrs: int,
cw_lists: {}, max_hashtags: int,
key_id: str,
curr_session,
session,
session_onion,
session_i2p,
session_yggdrasil,
last_quote_request: int,
inbox_start_time,
curr_destination: str,
curr_nickname: str,
inbox_handle: str,
dogwhistles: {},
last_bounce_message: [],
mitm: bool,
queue_filename: str):
"""Second stage of the inbox queue
"""
# set the id to the same as the post filename
# This makes the filename and the id consistent
# if curr_post_json.get('id'):
# curr_post_json['id'] = queue_json['id']
if receive_undo(base_dir, curr_post_json,
debug, domain, onion_domain, i2p_domain,
yggdrasil_domain):
print('Queue: Undo accepted from ' + key_id)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_undo',
debug)
inbox_start_time = time.time()
return inbox_start_time, last_quote_request
if debug:
print('DEBUG: checking for follow requests')
if _receive_follow_request(curr_session, session_onion,
session_i2p, session_yggdrasil,
base_dir, http_prefix, port,
send_threads, post_log,
cached_webfingers,
person_cache,
curr_post_json,
federation_list,
debug, project_version,
max_followers, domain,
onion_domain, i2p_domain,
yggdrasil_domain,
signing_priv_key_pem, unit_test,
system_language,
server.followers_sync_cache,
server.sites_unavailable,
server.mitm_servers):
print('Queue: Follow activity for ' + key_id +
' removed from queue')
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_follow_request',
debug)
inbox_start_time = time.time()
return inbox_start_time, last_quote_request
if debug:
print('DEBUG: No follow requests')
if receive_accept_reject(base_dir, domain, curr_post_json,
federation_list, debug,
domain, onion_domain, i2p_domain,
yggdrasil_domain):
print('Queue: Accept/Reject received from ' + key_id)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'receive_accept_reject',
debug)
inbox_start_time = time.time()
return inbox_start_time, last_quote_request
if receive_quote_request(curr_post_json,
federation_list,
debug, domain,
session, session_onion, session_i2p,
session_yggdrasil,
base_dir,
http_prefix,
send_threads, post_log,
cached_webfingers,
person_cache, project_version,
signing_priv_key_pem,
onion_domain,
i2p_domain, yggdrasil_domain, {},
server.sites_unavailable,
system_language,
server.mitm_servers,
last_quote_request):
last_quote_request = int(time.time())
print('Queue: QuoteRequest received from ' + key_id)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'receive_quote_request',
debug)
inbox_start_time = time.time()
return inbox_start_time, last_quote_request
if receive_move_activity(curr_session, base_dir,
http_prefix, domain, port,
cached_webfingers,
person_cache,
curr_post_json,
curr_nickname,
debug,
signing_priv_key_pem,
send_threads,
post_log,
federation_list,
onion_domain,
i2p_domain,
yggdrasil_domain,
server.sites_unavailable,
server.blocked_cache,
server.block_federated,
server.system_language,
server.mitm_servers):
if debug:
print('Queue: _receive_move_activity ' + key_id)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_move_activity',
debug)
inbox_start_time = time.time()
return inbox_start_time, last_quote_request
if receive_update_activity(recent_posts_cache, curr_session,
base_dir, http_prefix,
domain, port,
cached_webfingers,
person_cache,
curr_post_json,
curr_nickname,
debug,
max_mentions, max_emoji,
allow_local_network_access,
system_language,
signing_priv_key_pem,
max_recent_posts, translate,
allow_deletion,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
peertube_instances,
theme_name, max_like_count,
cw_lists, dogwhistles,
server.min_images_for_accounts,
max_hashtags, server.buy_sites,
server.auto_cw_cache,
onion_domain,
i2p_domain, yggdrasil_domain,
server.mitm_servers,
server.instance_software,
server.block_military,
server.block_government,
server.block_bluesky,
server.block_nostr):
if debug:
print('Queue: Update accepted from ' + key_id)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_update_activity',
debug)
inbox_start_time = time.time()
return inbox_start_time, last_quote_request
# get recipients list
recipients_dict, recipients_dict_followers = \
_inbox_post_recipients(base_dir, curr_post_json,
domain, port, debug,
onion_domain, i2p_domain,
yggdrasil_domain)
if len(recipients_dict.items()) == 0 and \
len(recipients_dict_followers.items()) == 0:
if debug:
print('Queue: no recipients were resolved ' +
'for post arriving in inbox')
return inbox_start_time, last_quote_request
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_post_recipients',
debug)
inbox_start_time = time.time()
# if there are only a small number of followers then
# process them as if they were specifically
# addresses to particular accounts
no_of_follow_items = len(recipients_dict_followers.items())
if no_of_follow_items > 0:
# always deliver to individual inboxes
if no_of_follow_items < 999999:
if debug:
print('DEBUG: moving ' + str(no_of_follow_items) +
' inbox posts addressed to followers')
for handle, post_item in recipients_dict_followers.items():
recipients_dict[handle] = post_item
recipients_dict_followers = {}
# recipients_list = [recipients_dict, recipients_dict_followers]
if debug:
print('*************************************')
print('Resolved recipients list:')
pprint(recipients_dict)
print('Resolved followers list:')
pprint(recipients_dict_followers)
print('*************************************')
# Copy any posts addressed to followers into the shared inbox
# this avoid copying file multiple times to potentially many
# individual inboxes
if len(recipients_dict_followers.items()) > 0:
shared_inbox_post_filename = \
curr_destination.replace(inbox_handle, inbox_handle)
if not os.path.isfile(shared_inbox_post_filename):
save_json(curr_post_json, shared_inbox_post_filename)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'shared_inbox_save',
debug)
inbox_start_time = time.time()
lists_enabled = get_config_param(base_dir, "listsEnabled")
dm_license_url = ''
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'distribute_post',
debug)
inbox_start_time = time.time()
# for posts addressed to specific accounts
for handle, _ in recipients_dict.items():
destination = \
curr_destination.replace(inbox_handle, handle)
languages_understood: list[str] = []
bold_reading = False
bold_reading_filename = \
acct_handle_dir(base_dir, handle) + '/.boldReading'
if os.path.isfile(bold_reading_filename):
bold_reading = True
_inbox_after_initial(server, inbox_start_time,
recent_posts_cache,
max_recent_posts,
session, session_onion, session_i2p,
session_yggdrasil,
key_id, handle,
curr_post_json,
base_dir, http_prefix,
send_threads, post_log,
cached_webfingers,
person_cache, domain,
onion_domain, i2p_domain,
yggdrasil_domain,
port, federation_list,
debug,
queue_filename, destination,
max_replies, allow_deletion,
max_mentions, max_emoji,
translate, unit_test,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
allow_local_network_access,
peertube_instances,
last_bounce_message,
theme_name, system_language,
max_like_count,
signing_priv_key_pem,
default_reply_interval_hrs,
cw_lists, lists_enabled,
dm_license_url,
languages_understood, mitm,
bold_reading, dogwhistles,
max_hashtags, server.buy_sites,
server.sites_unavailable,
server.mitm_servers,
server.instance_software)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'handle_after_initial',
debug)
inbox_start_time = time.time()
if debug:
pprint(curr_post_json)
print('Queue: Queue post accepted')
return inbox_start_time, last_quote_request
def run_inbox_queue(server,
recent_posts_cache: {}, max_recent_posts: int,
project_version: str,
@ -3806,332 +4112,62 @@ def run_inbox_queue(server,
dogwhistles_filename = base_dir + '/default_dogwhistles.txt'
dogwhistles = load_dogwhistles(dogwhistles_filename)
# set the id to the same as the post filename
# This makes the filename and the id consistent
# if queue_json['post'].get('id'):
# queue_json['post']['id'] = queue_json['id']
curr_post_json = queue_json['post']
curr_destination = queue_json['destination']
curr_nickname = queue_json['postNickname']
mitm = False
if queue_json.get('mitm'):
mitm = True
if receive_undo(base_dir, queue_json['post'],
debug, domain, onion_domain, i2p_domain,
yggdrasil_domain):
print('Queue: Undo accepted from ' + key_id)
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 5 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_undo',
debug)
inbox_start_time = time.time()
continue
# TODO split up collections
if debug:
print('DEBUG: checking for follow requests')
if _receive_follow_request(curr_session, session_onion,
session_i2p, session_yggdrasil,
base_dir, http_prefix, port,
send_threads, post_log,
cached_webfingers,
person_cache,
queue_json['post'],
federation_list,
debug, project_version,
max_followers, domain,
onion_domain, i2p_domain,
yggdrasil_domain,
signing_priv_key_pem, unit_test,
system_language,
server.followers_sync_cache,
server.sites_unavailable,
server.mitm_servers):
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 6 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)
print('Queue: Follow activity for ' + key_id +
' removed from queue')
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_follow_request',
debug)
inbox_start_time = time.time()
continue
inbox_start_time, last_quote_request = \
_inbox_second_stage(curr_post_json,
server,
recent_posts_cache, max_recent_posts,
project_version,
base_dir, http_prefix,
send_threads, post_log,
cached_webfingers, person_cache,
domain,
onion_domain, i2p_domain,
yggdrasil_domain,
port,
federation_list, max_replies,
allow_deletion, debug, max_mentions,
max_emoji, translate, unit_test,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
max_followers,
allow_local_network_access,
peertube_instances,
theme_name, system_language,
max_like_count, signing_priv_key_pem,
default_reply_interval_hrs,
cw_lists, max_hashtags,
key_id,
curr_session,
session,
session_onion,
session_i2p,
session_yggdrasil,
last_quote_request,
inbox_start_time,
curr_destination,
curr_nickname,
inbox_handle,
dogwhistles,
last_bounce_message,
mitm,
queue_filename)
if debug:
print('DEBUG: No follow requests')
if receive_accept_reject(base_dir, domain, queue_json['post'],
federation_list, debug,
domain, onion_domain, i2p_domain,
yggdrasil_domain):
print('Queue: Accept/Reject received from ' + key_id)
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 7 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'receive_accept_reject',
debug)
inbox_start_time = time.time()
continue
if receive_quote_request(queue_json['post'],
federation_list,
debug, domain,
session, session_onion, session_i2p,
session_yggdrasil,
base_dir,
http_prefix,
send_threads, post_log,
cached_webfingers,
person_cache, project_version,
signing_priv_key_pem,
onion_domain,
i2p_domain, yggdrasil_domain, {},
server.sites_unavailable,
system_language,
server.mitm_servers,
last_quote_request):
last_quote_request = int(time.time())
print('Queue: QuoteRequest received from ' + key_id)
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 7 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'receive_quote_request',
debug)
inbox_start_time = time.time()
continue
if receive_move_activity(curr_session, base_dir,
http_prefix, domain, port,
cached_webfingers,
person_cache,
queue_json['post'],
queue_json['postNickname'],
debug,
signing_priv_key_pem,
send_threads,
post_log,
federation_list,
onion_domain,
i2p_domain,
yggdrasil_domain,
server.sites_unavailable,
server.blocked_cache,
server.block_federated,
server.system_language,
server.mitm_servers):
if debug:
print('Queue: _receive_move_activity ' + key_id)
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 8 unable to receive move ' +
str(queue_filename))
if queue:
queue.pop(0)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_move_activity',
debug)
inbox_start_time = time.time()
continue
if receive_update_activity(recent_posts_cache, curr_session,
base_dir, http_prefix,
domain, port,
cached_webfingers,
person_cache,
queue_json['post'],
queue_json['postNickname'],
debug,
max_mentions, max_emoji,
allow_local_network_access,
system_language,
signing_priv_key_pem,
max_recent_posts, translate,
allow_deletion,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
peertube_instances,
theme_name, max_like_count,
cw_lists, dogwhistles,
server.min_images_for_accounts,
max_hashtags, server.buy_sites,
server.auto_cw_cache,
onion_domain,
i2p_domain, yggdrasil_domain,
server.mitm_servers,
server.instance_software,
server.block_military,
server.block_government,
server.block_bluesky,
server.block_nostr):
if debug:
print('Queue: Update accepted from ' + key_id)
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 8 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_receive_update_activity',
debug)
inbox_start_time = time.time()
continue
# get recipients list
recipients_dict, recipients_dict_followers = \
_inbox_post_recipients(base_dir, queue_json['post'],
domain, port, debug,
onion_domain, i2p_domain,
yggdrasil_domain)
if len(recipients_dict.items()) == 0 and \
len(recipients_dict_followers.items()) == 0:
if debug:
print('Queue: no recipients were resolved ' +
'for post arriving in inbox')
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 9 unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)
continue
fitness_performance(inbox_start_time, server.fitness,
'INBOX', '_post_recipients',
debug)
inbox_start_time = time.time()
# if there are only a small number of followers then
# process them as if they were specifically
# addresses to particular accounts
no_of_follow_items = len(recipients_dict_followers.items())
if no_of_follow_items > 0:
# always deliver to individual inboxes
if no_of_follow_items < 999999:
if debug:
print('DEBUG: moving ' + str(no_of_follow_items) +
' inbox posts addressed to followers')
for handle, post_item in recipients_dict_followers.items():
recipients_dict[handle] = post_item
recipients_dict_followers = {}
# recipients_list = [recipients_dict, recipients_dict_followers]
if debug:
print('*************************************')
print('Resolved recipients list:')
pprint(recipients_dict)
print('Resolved followers list:')
pprint(recipients_dict_followers)
print('*************************************')
# Copy any posts addressed to followers into the shared inbox
# this avoid copying file multiple times to potentially many
# individual inboxes
if len(recipients_dict_followers.items()) > 0:
shared_inbox_post_filename = \
queue_json['destination'].replace(inbox_handle, inbox_handle)
if not os.path.isfile(shared_inbox_post_filename):
save_json(queue_json['post'], shared_inbox_post_filename)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'shared_inbox_save',
debug)
inbox_start_time = time.time()
lists_enabled = get_config_param(base_dir, "listsEnabled")
dm_license_url = ''
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'distribute_post',
debug)
inbox_start_time = time.time()
# for posts addressed to specific accounts
for handle, _ in recipients_dict.items():
destination = \
queue_json['destination'].replace(inbox_handle, handle)
languages_understood: list[str] = []
mitm = False
if queue_json.get('mitm'):
mitm = True
bold_reading = False
bold_reading_filename = \
acct_handle_dir(base_dir, handle) + '/.boldReading'
if os.path.isfile(bold_reading_filename):
bold_reading = True
_inbox_after_initial(server, inbox_start_time,
recent_posts_cache,
max_recent_posts,
session, session_onion, session_i2p,
session_yggdrasil,
key_id, handle,
queue_json['post'],
base_dir, http_prefix,
send_threads, post_log,
cached_webfingers,
person_cache, domain,
onion_domain, i2p_domain,
yggdrasil_domain,
port, federation_list,
debug,
queue_filename, destination,
max_replies, allow_deletion,
max_mentions, max_emoji,
translate, unit_test,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
allow_local_network_access,
peertube_instances,
last_bounce_message,
theme_name, system_language,
max_like_count,
signing_priv_key_pem,
default_reply_interval_hrs,
cw_lists, lists_enabled,
dm_license_url,
languages_understood, mitm,
bold_reading, dogwhistles,
max_hashtags, server.buy_sites,
server.sites_unavailable,
server.mitm_servers,
server.instance_software)
fitness_performance(inbox_start_time, server.fitness,
'INBOX', 'handle_after_initial',
debug)
inbox_start_time = time.time()
if debug:
pprint(queue_json['post'])
print('Queue: Queue post accepted')
# remove item from the queue
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)
except OSError:
print('EX: run_inbox_queue 10 unable to delete ' +
print('EX: run_inbox_queue unable to delete ' +
str(queue_filename))
if queue:
queue.pop(0)