From c10291fa121cceba9532680b3e3cd8c43dff2eb1 Mon Sep 17 00:00:00 2001 From: bashrc Date: Wed, 29 Apr 2026 14:23:57 +0100 Subject: [PATCH] Replace file operations with functions --- posts.py | 263 +++++++++++++++++++++++++++---------------------------- 1 file changed, 130 insertions(+), 133 deletions(-) diff --git a/posts.py b/posts.py index 5ae89015c..ba9f4c06f 100644 --- a/posts.py +++ b/posts.py @@ -185,23 +185,23 @@ def no_of_followers_on_domain(base_dir: str, handle: str, """Returns the number of followers of the given handle from the given domain """ - filename = acct_handle_dir(base_dir, handle) + '/' + follow_file + filename: str = acct_handle_dir(base_dir, handle) + '/' + follow_file if not os.path.isfile(filename): return 0 ctr: int = 0 - try: - with open(filename, 'r', encoding='utf-8') as fp_followers: - for follower_handle in fp_followers: - if '@' not in follower_handle: - continue - follower_domain = follower_handle.split('@')[1] - follower_domain = remove_eol(follower_domain) - if domain == follower_domain: - ctr += 1 - except OSError as exc: - print('EX: no_of_followers_on_domain unable to read ' + filename + - ' ' + str(exc)) + followers_list: list[str] = \ + load_list(filename, + 'EX: no_of_followers_on_domain unable to read ' + filename + + ' [ex]') + if followers_list is not None: + for follower_handle in followers_list: + if '@' not in follower_handle: + continue + follower_domain = follower_handle.split('@')[1] + follower_domain = remove_eol(follower_domain) + if domain == follower_domain: + ctr += 1 return ctr @@ -2274,13 +2274,10 @@ def regenerate_index_for_box(base_dir: str, index_lines.sort(reverse=True) result = '' - try: - with open(box_index_filename, 'w+', encoding='utf-8') as fp_box: - for line in index_lines: - result += line + '\n' - fp_box.write(line + '\n') - except OSError: - print('EX: unable to generate index for ' + box_name + ' ' + result) + for line in index_lines: + result += line + '\n' + save_string(result, box_index_filename, + 'EX: unable to generate index for ' + box_name + ' ' + result) print('Index generated for ' + box_name + '\n' + result) @@ -2883,37 +2880,38 @@ def create_report_post(base_dir: str, moderators_list: list[str] = [] moderators_file = data_dir(base_dir) + '/moderators.txt' if os.path.isfile(moderators_file): - try: - with open(moderators_file, 'r', encoding='utf-8') as fp_mod: - for line in fp_mod: - line = line.strip('\n').strip('\r') - if line.startswith('#'): - continue - if line.startswith('/users/'): - line = line.replace('users', '') - if line.startswith('@'): - line = line[1:] - if '@' in line: - nick = line.split('@')[0] + moderators_list2: list[str] = \ + load_list(moderators_file, + 'EX: create_report_post unable to read ' + + moderators_file) + if moderators_list2 is not None: + for line in moderators_list2: + line = line.strip('\n').strip('\r') + if line.startswith('#'): + continue + if line.startswith('/users/'): + line = line.replace('users', '') + if line.startswith('@'): + line = line[1:] + if '@' in line: + nick = line.split('@')[0] + moderator_actor = \ + local_actor_url(http_prefix, nick, domain_full) + if moderator_actor not in moderators_list: + moderators_list.append(moderator_actor) + continue + if string_starts_with(line, + ('http', 'ipfs', 'ipns', 'hyper')): + # must be a local address - no remote moderators + if '://' + domain_full + '/' in line: + if line not in moderators_list: + moderators_list.append(line) + else: + if '/' not in line: moderator_actor = \ - local_actor_url(http_prefix, nick, domain_full) + local_actor_url(http_prefix, line, domain_full) if moderator_actor not in moderators_list: moderators_list.append(moderator_actor) - continue - if string_starts_with(line, - ('http', 'ipfs', 'ipns', 'hyper')): - # must be a local address - no remote moderators - if '://' + domain_full + '/' in line: - if line not in moderators_list: - moderators_list.append(line) - else: - if '/' not in line: - moderator_actor = \ - local_actor_url(http_prefix, line, domain_full) - if moderator_actor not in moderators_list: - moderators_list.append(moderator_actor) - except OSError: - print('EX: create_report_post unable to read ' + moderators_file) if not moderators_list: # if there are no moderators then the admin becomes the moderator admin_nickname = get_config_param(base_dir, 'admin') @@ -3387,21 +3385,21 @@ def group_followers_by_domain(base_dir: str, nickname: str, domain: str) -> {}: if not os.path.isfile(followers_filename): return None grouped = {} - try: - with open(followers_filename, 'r', encoding='utf-8') as fp_foll: - for follower_handle in fp_foll: - if '@' not in follower_handle: - continue - fhandle1 = follower_handle.strip() - fhandle = remove_eol(fhandle1) - follower_domain = fhandle.split('@')[1] - if not grouped.get(follower_domain): - grouped[follower_domain] = [fhandle] - else: - grouped[follower_domain].append(fhandle) - except OSError: - print('EX: group_followers_by_domain unable to read ' + - followers_filename) + followers_list: list[str] = \ + load_list(followers_filename, + 'EX: group_followers_by_domain unable to read ' + + followers_filename) + if followers_list is not None: + for follower_handle in followers_list: + if '@' not in follower_handle: + continue + fhandle1 = follower_handle.strip() + fhandle = remove_eol(fhandle1) + follower_domain = fhandle.split('@')[1] + if not grouped.get(follower_domain): + grouped[follower_domain] = [fhandle] + else: + grouped[follower_domain].append(fhandle) return grouped @@ -5603,16 +5601,16 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str, index_ctr: int = 0 # get the existing index entries as a string new_index = '' - try: - with open(index_filename, 'r', encoding='utf-8') as fp_index: - for post_id in fp_index: - new_index += post_id - index_ctr += 1 - if index_ctr >= max_posts_in_box: - break - except OSError as ex: - print('EX: archive_posts_for_person unable to read ' + - index_filename + ' ' + str(ex)) + index_list: list[str] = \ + load_list(index_filename, + 'EX: archive_posts_for_person unable to read ' + + index_filename + ' [ex]') + if index_list is not None: + for post_id in index_list: + new_index += post_id + index_ctr += 1 + if index_ctr >= max_posts_in_box: + break # save the new index file if new_index: save_string(new_index, index_filename, @@ -6195,66 +6193,65 @@ def populate_replies_json(base_dir: str, nickname: str, domain: str, pub_str = 'https://www.w3.org/ns/activitystreams#Public' # populate the items list with replies replies_boxes = ('outbox', 'inbox') - try: - with open(post_replies_filename, 'r', - encoding='utf-8') as fp_replies: - for message_id in fp_replies: - reply_found: bool = False - # examine inbox and outbox - for boxname in replies_boxes: - message_id2 = remove_eol(message_id) - search_filename = \ - acct_dir(base_dir, nickname, domain) + '/' + \ - boxname + '/' + \ - message_id2.replace('/', '#') + '.json' - if os.path.isfile(search_filename): - if authorized or \ - text_in_file(pub_str, search_filename): - post_json_object = load_json(search_filename) - if post_json_object: - pjo = post_json_object - ordered_items = replies_json['orderedItems'] - if pjo['object'].get('cc'): - if (authorized or - (pub_str in pjo['object']['to'] or - pub_str in pjo['object']['cc'])): - ordered_items.append(pjo) - reply_found = True - else: - if authorized or \ - pub_str in pjo['object']['to']: - ordered_items.append(pjo) - reply_found = True - break - # if not in either inbox or outbox then examine the - # shared inbox - if not reply_found: - message_id2 = remove_eol(message_id) - search_filename = \ - data_dir(base_dir) + '/inbox@' + \ - domain + '/inbox/' + \ - message_id2.replace('/', '#') + '.json' - if os.path.isfile(search_filename): - if authorized or \ - text_in_file(pub_str, search_filename): - # get the json of the reply and append it to - # the collection - post_json_object = load_json(search_filename) - if post_json_object: - pjo = post_json_object - ordered_items = replies_json['orderedItems'] - if pjo['object'].get('cc'): - if (authorized or - (pub_str in pjo['object']['to'] or - pub_str in pjo['object']['cc'])): - ordered_items.append(pjo) - else: - if authorized or \ - pub_str in pjo['object']['to']: - ordered_items.append(pjo) - except OSError: - print('EX: populate_replies_json unable to read ' + - post_replies_filename) + replies_list: list[str] = \ + load_list(post_replies_filename, + 'EX: populate_replies_json unable to read ' + + post_replies_filename) + if replies_list is not None: + for message_id in replies_list: + reply_found: bool = False + # examine inbox and outbox + for boxname in replies_boxes: + message_id2 = remove_eol(message_id) + search_filename = \ + acct_dir(base_dir, nickname, domain) + '/' + \ + boxname + '/' + \ + message_id2.replace('/', '#') + '.json' + if os.path.isfile(search_filename): + if authorized or \ + text_in_file(pub_str, search_filename): + post_json_object = load_json(search_filename) + if post_json_object: + pjo = post_json_object + ordered_items = replies_json['orderedItems'] + if pjo['object'].get('cc'): + if (authorized or + (pub_str in pjo['object']['to'] or + pub_str in pjo['object']['cc'])): + ordered_items.append(pjo) + reply_found = True + else: + if authorized or \ + pub_str in pjo['object']['to']: + ordered_items.append(pjo) + reply_found = True + break + # if not in either inbox or outbox then examine the + # shared inbox + if not reply_found: + message_id2 = remove_eol(message_id) + search_filename = \ + data_dir(base_dir) + '/inbox@' + \ + domain + '/inbox/' + \ + message_id2.replace('/', '#') + '.json' + if os.path.isfile(search_filename): + if authorized or \ + text_in_file(pub_str, search_filename): + # get the json of the reply and append it to + # the collection + post_json_object = load_json(search_filename) + if post_json_object: + pjo = post_json_object + ordered_items = replies_json['orderedItems'] + if pjo['object'].get('cc'): + if (authorized or + (pub_str in pjo['object']['to'] or + pub_str in pjo['object']['cc'])): + ordered_items.append(pjo) + else: + if authorized or \ + pub_str in pjo['object']['to']: + ordered_items.append(pjo) def _reject_announce(announce_filename: str,