From edd183ed2c3340b13bc3e6786a843574b1c86a97 Mon Sep 17 00:00:00 2001 From: bashrc Date: Wed, 29 Apr 2026 17:42:08 +0100 Subject: [PATCH] Replace file operations with functions --- filters.py | 90 ++++++++++++++++++++++++++------------------- follow.py | 106 ++++++++++++++++++++++++++--------------------------- 2 files changed, 105 insertions(+), 91 deletions(-) diff --git a/filters.py b/filters.py index 65de793c7..d99dc696a 100644 --- a/filters.py +++ b/filters.py @@ -17,6 +17,8 @@ from unicodetext import standardize_text from unicodetext import remove_inverted_text from unicodetext import remove_square_capitals from data import append_string +from data import save_string +from data import load_list def add_filter(base_dir: str, nickname: str, domain: str, words: str) -> bool: @@ -61,17 +63,23 @@ def remove_filter(base_dir: str, nickname: str, domain: str, if not text_in_file(words, filters_filename): return False new_filters_filename = filters_filename + '.new' - try: - with open(filters_filename, 'r', encoding='utf-8') as fp_filt: - with open(new_filters_filename, 'w+', encoding='utf-8') as fp_new: - for line in fp_filt: - line = remove_eol(line) - if line != words: - fp_new.write(line + '\n') - except OSError as ex: - print('EX: unable to remove filter ' + - filters_filename + ' ' + str(ex)) + + filters_list: list[str] = \ + load_list(filters_filename, + 'EX: unable to remove filter ' + + filters_filename + ' 1 [ex]') + if filters_list is None: return False + + text: str = '' + for line in filters_list: + line = remove_eol(line) + if line != words: + text += line + '\n' + save_string(text, new_filters_filename, + 'EX: unable to remove filter ' + + filters_filename + ' 2 [ex]') + if os.path.isfile(new_filters_filename): try: os.rename(new_filters_filename, filters_filename) @@ -91,17 +99,23 @@ def remove_global_filter(base_dir: str, words: str) -> bool: if not text_in_file(words, filters_filename): return False new_filters_filename = filters_filename + '.new' - try: - with open(filters_filename, 'r', encoding='utf-8') as fp_filt: - with open(new_filters_filename, 'w+', encoding='utf-8') as fp_new: - for line in fp_filt: - line = remove_eol(line) - if line != words: - fp_new.write(line + '\n') - except OSError as ex: - print('EX: unable to remove global filter ' + - filters_filename + ' ' + str(ex)) + + global_list: list[str] = \ + load_list(filters_filename, + 'EX: unable to remove global filter ' + + filters_filename + ' [ex]') + if global_list is None: return False + + text: str = '' + for line in global_list: + line = remove_eol(line) + if line != words: + text += line + '\n' + save_string(text, new_filters_filename, + 'EX: unable to remove global filter ' + + filters_filename + ' 2 [ex]') + if os.path.isfile(new_filters_filename): try: os.rename(new_filters_filename, filters_filename) @@ -152,25 +166,25 @@ def _is_filtered_base(filename: str, content: str, # convert any fancy characters to ordinary ones content = standardize_text(content) - try: - with open(filename, 'r', encoding='utf-8') as fp_filt: - for line in fp_filt: - filter_str = remove_eol(line) - if not filter_str: - continue - if len(filter_str) < 2: - continue - if '+' not in filter_str: - if filtered_match(filter_str, content): - return True - else: - filter_words = filter_str.replace('"', '').split('+') - for filter_wrd in filter_words: - if not filtered_match(filter_wrd, content): - return False + filtered_list: list[str] = \ + load_list(filename, + 'EX: _is_filtered_base ' + filename + ' [ex]') + if filtered_list is not None: + for line in filtered_list: + filter_str = remove_eol(line) + if not filter_str: + continue + if len(filter_str) < 2: + continue + if '+' not in filter_str: + if filtered_match(filter_str, content): return True - except OSError as ex: - print('EX: _is_filtered_base ' + filename + ' ' + str(ex)) + else: + filter_words = filter_str.replace('"', '').split('+') + for filter_wrd in filter_words: + if not filtered_match(filter_wrd, content): + return False + return True return False diff --git a/follow.py b/follow.py index 227264c13..66d7fd653 100644 --- a/follow.py +++ b/follow.py @@ -144,23 +144,25 @@ def _remove_from_follow_base(base_dir: str, actor_found = True if not actor_found: return - try: - with open(approve_follows_filename + '.new', 'w+', - encoding='utf-8') as fp_approve_new: - with open(approve_follows_filename, 'r', - encoding='utf-8') as fp_approve: - if not accept_deny_actor: - for approve_handle in fp_approve: - accept_deny_handle = accept_or_deny_handle - if not approve_handle.startswith(accept_deny_handle): - fp_approve_new.write(approve_handle) - else: - for approve_handle in fp_approve: - if accept_deny_actor not in approve_handle: - fp_approve_new.write(approve_handle) - except OSError as ex: - print('EX: _remove_from_follow_base ' + - approve_follows_filename + ' ' + str(ex)) + + text: str = '' + approve_follows_list: list[str] = \ + load_list(approve_follows_filename, + 'EX: _remove_from_follow_base ' + + approve_follows_filename + ' 2 [ex]') + if approve_follows_list is not None: + if not accept_deny_actor: + for approve_handle in approve_follows_list: + accept_deny_handle = accept_or_deny_handle + if not approve_handle.startswith(accept_deny_handle): + text += approve_handle + else: + for approve_handle in approve_follows_list: + if accept_deny_actor not in approve_handle: + text += approve_handle + save_string(text, approve_follows_filename + '.new', + 'EX: _remove_from_follow_base ' + + approve_follows_filename + ' 1 [ex]') try: os.rename(approve_follows_filename + '.new', @@ -337,16 +339,15 @@ def unfollow_account(base_dir: str, nickname: str, domain: str, return False if lines: - try: - with open(filename, 'w+', encoding='utf-8') as fp_unfoll: - for line in lines: - check_handle = line.strip("\n").strip("\r").lower() - if check_handle not in (handle_to_unfollow_lower, - '!' + handle_to_unfollow_lower): - fp_unfoll.write(line) - except OSError as ex: - print('EX: unfollow_account unable to write ' + - filename + ' ' + str(ex)) + text: str = '' + for line in lines: + check_handle = line.strip("\n").strip("\r").lower() + if check_handle not in (handle_to_unfollow_lower, + '!' + handle_to_unfollow_lower): + text += line + save_string(text, filename, + 'EX: unfollow_account unable to write ' + + filename + ' [ex]') # write to an unfollowed file so that if a follow accept # later arrives then it can be ignored @@ -1593,30 +1594,29 @@ def pending_followers_timeline_json(actor: str, base_dir: str, follow_requests_filename = \ acct_dir(base_dir, nickname, domain) + '/followrequests.txt' if os.path.isfile(follow_requests_filename): - try: - with open(follow_requests_filename, 'r', - encoding='utf-8') as fp_req: - for follower_handle in fp_req: - if len(follower_handle) == 0: - continue - follower_handle = remove_eol(follower_handle) - foll_domain, _ = get_domain_from_actor(follower_handle) - if not foll_domain: - continue - foll_nickname = get_nickname_from_actor(follower_handle) - if not foll_nickname: - continue - follow_activity_filename = \ - acct_dir(base_dir, nickname, domain) + \ - '/requests/' + \ - foll_nickname + '@' + foll_domain + '.follow' - if not os.path.isfile(follow_activity_filename): - continue - follow_json = load_json(follow_activity_filename) - if not follow_json: - continue - result_json['orderedItems'].append(follow_json) - except OSError as exc: - print('EX: unable to read follow requests ' + - follow_requests_filename + ' ' + str(exc)) + follow_requests_list: list[str] = \ + load_list(follow_requests_filename, + 'EX: unable to read follow requests ' + + follow_requests_filename + ' [ex]') + if follow_requests_list is not None: + for follower_handle in follow_requests_list: + if len(follower_handle) == 0: + continue + follower_handle = remove_eol(follower_handle) + foll_domain, _ = get_domain_from_actor(follower_handle) + if not foll_domain: + continue + foll_nickname = get_nickname_from_actor(follower_handle) + if not foll_nickname: + continue + follow_activity_filename = \ + acct_dir(base_dir, nickname, domain) + \ + '/requests/' + \ + foll_nickname + '@' + foll_domain + '.follow' + if not os.path.isfile(follow_activity_filename): + continue + follow_json = load_json(follow_activity_filename) + if not follow_json: + continue + result_json['orderedItems'].append(follow_json) return result_json