From 733d4adee2334252d4ce9aad2e37ffa07f38b713 Mon Sep 17 00:00:00 2001 From: bashrc Date: Sun, 26 Apr 2026 20:33:19 +0100 Subject: [PATCH] Replace file operations with functions --- mitm.py | 22 +++--- newswire.py | 14 ++-- notifyOnPost.py | 72 ++++++++----------- person.py | 183 ++++++++++++++++++++---------------------------- 4 files changed, 117 insertions(+), 174 deletions(-) diff --git a/mitm.py b/mitm.py index f2db520f1..5a636b1a3 100644 --- a/mitm.py +++ b/mitm.py @@ -16,6 +16,8 @@ __module_group__ = "Core" import os from utils import data_dir +from data import load_string +from data import save_string def detect_mitm(self) -> bool: @@ -55,15 +57,13 @@ def load_mitm_servers(base_dir: str) -> []: mitm_servers_filename = data_dir(base_dir) + '/mitm_servers.txt' mitm_servers: list[str] = [] if os.path.isfile(mitm_servers_filename): - try: - with open(mitm_servers_filename, 'r', - encoding='utf-8') as fp_mitm: - mitm_servers = fp_mitm.read() - except OSError: - print('EX: error while reading mitm_servers.txt') + mitm_servers_str = \ + load_string(mitm_servers_filename, + 'EX: error while reading mitm_servers.txt') + if mitm_servers_str: + mitm_servers = mitm_servers_str.split('\n') if not mitm_servers: return [] - mitm_servers = mitm_servers.split('\n') return mitm_servers @@ -76,9 +76,5 @@ def save_mitm_servers(base_dir: str, mitm_servers: []) -> None: mitm_servers_str += domain + '\n' mitm_servers_filename = data_dir(base_dir) + '/mitm_servers.txt' - try: - with open(mitm_servers_filename, 'w+', - encoding='utf-8') as fp_mitm: - fp_mitm.write(mitm_servers_str) - except OSError: - print('EX: error while saving mitm_servers.txt') + save_string(mitm_servers_str, mitm_servers_filename, + 'EX: error while saving mitm_servers.txt') diff --git a/newswire.py b/newswire.py index 836131599..0ade44230 100644 --- a/newswire.py +++ b/newswire.py @@ -55,6 +55,7 @@ from filters import is_filtered from session import download_image_any_mime_type from content import remove_script from data import load_list +from data import load_string def _remove_cdata(text: str) -> str: @@ -403,14 +404,11 @@ def load_hashtag_categories(base_dir: str, language: str) -> None: if not os.path.isfile(hashtag_categories_filename): return - try: - with open(hashtag_categories_filename, 'r', - encoding='utf-8') as fp_cat: - xml_str = fp_cat.read() - _xml2str_to_hashtag_categories(base_dir, xml_str, 1024, True) - except OSError: - print('EX: load_hashtag_categories unable to read ' + - hashtag_categories_filename) + xml_str = load_string(hashtag_categories_filename, + 'EX: load_hashtag_categories unable to read ' + + hashtag_categories_filename) + if xml_str: + _xml2str_to_hashtag_categories(base_dir, xml_str, 1024, True) def _xml2str_to_hashtag_categories(base_dir: str, xml_str: str, diff --git a/notifyOnPost.py b/notifyOnPost.py index 7c867b457..0f6758b09 100644 --- a/notifyOnPost.py +++ b/notifyOnPost.py @@ -11,6 +11,8 @@ import os from utils import remove_domain_port from utils import acct_dir from utils import text_in_file +from data import load_string +from data import save_string def _notify_on_post_arrival(base_dir: str, nickname: str, domain: str, @@ -40,35 +42,29 @@ def _notify_on_post_arrival(base_dir: str, nickname: str, domain: str, # get the contents of the notifyOnPost file, which is # a set of handles - following_handles = '' + following_handles: str = '' if os.path.isfile(notify_on_post_filename): print('notify file exists') - try: - with open(notify_on_post_filename, 'r', - encoding='utf-8') as fp_calendar: - following_handles = fp_calendar.read() - except OSError: - print('EX: _notify_on_post_arrival unable to read 1 ' + - notify_on_post_filename) + following_handles = \ + load_string(notify_on_post_filename, + 'EX: _notify_on_post_arrival unable to read 1 ' + + notify_on_post_filename) + if following_handles is None: + following_handles = '' else: # create a new notifyOnPost file from the following file print('Creating notifyOnPost file ' + notify_on_post_filename) - following_handles = '' - try: - with open(following_filename, 'r', - encoding='utf-8') as fp_following: - following_handles = fp_following.read() - except OSError: - print('EX: _notify_on_post_arrival unable to read 2 ' + - following_filename) + following_handles = \ + load_string(following_filename, + 'EX: _notify_on_post_arrival unable to read 2 ' + + following_filename) + if following_handles is None: + following_handles = '' if add: - try: - with open(notify_on_post_filename, 'w+', - encoding='utf-8') as fp_notify: - fp_notify.write(following_handles + handle + '\n') - except OSError: - print('EX: _notify_on_post_arrival unable to write 1' + - notify_on_post_filename) + save_string(following_handles + handle + '\n', + notify_on_post_filename, + 'EX: _notify_on_post_arrival unable to write 1' + + notify_on_post_filename) # already in the notifyOnPost file? if handle + '\n' in following_handles or \ @@ -86,26 +82,18 @@ def _notify_on_post_arrival(base_dir: str, nickname: str, domain: str, new_following_handles += followed + '\n' following_handles = new_following_handles - try: - with open(notify_on_post_filename, 'w+', - encoding='utf-8') as fp_notify: - fp_notify.write(following_handles) - except OSError: - print('EX: _notify_on_post_arrival unable to write 2' + - notify_on_post_filename) + save_string(following_handles, notify_on_post_filename, + 'EX: _notify_on_post_arrival unable to write 2' + + notify_on_post_filename) else: print(handle + ' not in notifyOnPost.txt') # not already in the notifyOnPost file if add: # append to the list of handles following_handles += handle + '\n' - try: - with open(notify_on_post_filename, 'w+', - encoding='utf-8') as fp_notify: - fp_notify.write(following_handles) - except OSError: - print('EX: _notify_on_post_arrival unable to write 3' + - notify_on_post_filename) + save_string(following_handles, notify_on_post_filename, + 'EX: _notify_on_post_arrival unable to write 3' + + notify_on_post_filename) def add_notify_on_post(base_dir: str, nickname: str, domain: str, @@ -138,11 +126,7 @@ def notify_when_person_posts(base_dir: str, nickname: str, domain: str, handle = following_nickname + '@' + following_domain if not os.path.isfile(notify_on_post_filename): # create a new notifyOnPost file - try: - with open(notify_on_post_filename, 'w+', - encoding='utf-8') as fp_notify: - fp_notify.write('') - except OSError: - print('EX: notify_when_person_posts unable to write ' + - notify_on_post_filename) + save_string('\n', notify_on_post_filename, + 'EX: notify_when_person_posts unable to write ' + + notify_on_post_filename) return text_in_file(handle + '\n', notify_on_post_filename, False) diff --git a/person.py b/person.py index 2ed8dd26e..6a1f08548 100644 --- a/person.py +++ b/person.py @@ -92,6 +92,9 @@ from cache import remove_person_from_cache from filters import is_filtered_bio from follow import is_following_actor from data import load_list +from data import save_string +from data import load_string +from data import append_string def generate_rsa_key() -> (str, str): @@ -631,22 +634,16 @@ def _create_person_base(base_dir: str, nickname: str, domain: str, port: int, if not os.path.isdir(base_dir + private_keys_subdir): os.mkdir(base_dir + private_keys_subdir) filename = base_dir + private_keys_subdir + '/' + handle + '.key' - try: - with open(filename, 'w+', encoding='utf-8') as fp_text: - print(private_key_pem, file=fp_text) - except OSError: - print('EX: _create_person_base unable to save ' + filename) + save_string(private_key_pem, filename, + 'EX: _create_person_base unable to save 1 ' + filename) # save the public key public_keys_subdir = '/keys/public' if not os.path.isdir(base_dir + public_keys_subdir): os.mkdir(base_dir + public_keys_subdir) filename = base_dir + public_keys_subdir + '/' + handle + '.pem' - try: - with open(filename, 'w+', encoding='utf-8') as fp_text: - print(public_key_pem, file=fp_text) - except OSError: - print('EX: _create_person_base unable to save 2 ' + filename) + save_string(public_key_pem, filename, + 'EX: _create_person_base unable to save 2 ' + filename) if password: password = remove_eol(password).strip() @@ -788,34 +785,24 @@ def create_person(base_dir: str, nickname: str, domain: str, port: int, if manual_follower_approval: follow_dms_filename = \ acct_dir(base_dir, nickname, domain) + '/.followDMs' - try: - with open(follow_dms_filename, 'w+', encoding='utf-8') as fp_foll: - fp_foll.write('\n') - except OSError: - print('EX: create_person unable to write ' + follow_dms_filename) + save_string('\n', follow_dms_filename, + 'EX: create_person unable to write ' + follow_dms_filename) # notify when posts are liked if nickname != 'news': notify_likes_filename = \ acct_dir(base_dir, nickname, domain) + '/.notifyLikes' - try: - with open(notify_likes_filename, 'w+', encoding='utf-8') as fp_lik: - fp_lik.write('\n') - except OSError: - print('EX: create_person unable to write 2 ' + - notify_likes_filename) + save_string('\n', notify_likes_filename, + 'EX: create_person unable to write 2 ' + + notify_likes_filename) # notify when posts have emoji reactions if nickname != 'news': notify_reactions_filename = \ acct_dir(base_dir, nickname, domain) + '/.notifyReactions' - try: - with open(notify_reactions_filename, 'w+', - encoding='utf-8') as fp_notify: - fp_notify.write('\n') - except OSError: - print('EX: create_person unable to write 3 ' + - notify_reactions_filename) + save_string('\n', notify_reactions_filename, + 'EX: create_person unable to write 3 ' + + notify_reactions_filename) theme = get_config_param(base_dir, 'theme') if not theme: @@ -1310,13 +1297,12 @@ def _unsuspend_media_for_account(base_dir: str, account_dir: str) -> None: if not os.path.isfile(account_media_log_filename): return - media_log = [] - try: - with open(account_media_log_filename, 'r', - encoding='utf-8') as fp_log: - media_log = fp_log.read().split('\n') - except OSError: - print('EX: suspend unable to read media log for ' + account_dir) + media_log: list[str] = [] + media_log_str = load_string(account_media_log_filename, + 'EX: suspend unable to read media log for ' + + account_dir) + if media_log_str: + media_log = media_log_str.split('\n') for filename in media_log: media_filename = base_dir + filename @@ -1356,13 +1342,12 @@ def _suspend_media_for_account(base_dir: str, account_dir: str) -> None: if not os.path.isfile(account_media_log_filename): return - media_log = [] - try: - with open(account_media_log_filename, 'r', - encoding='utf-8') as fp_log: - media_log = fp_log.read().split('\n') - except OSError: - print('EX: suspend unable to read media log for ' + account_dir) + media_log: list[str] = [] + media_log_str = load_string(account_media_log_filename, + 'EX: suspend unable to read media log for ' + + account_dir) + if media_log_str: + media_log = media_log_str.split('\n') for filename in media_log: media_filename = base_dir + filename @@ -1418,17 +1403,13 @@ def suspend_account(base_dir: str, nickname: str, domain: str) -> None: for suspended in lines: if suspended.strip('\n').strip('\r') == nickname: return - try: - with open(suspended_filename, 'a+', encoding='utf-8') as fp_sus: - fp_sus.write(nickname + '\n') - except OSError: - print('EX: suspend_account unable to append ' + suspended_filename) + append_string(nickname + '\n', suspended_filename, + 'EX: suspend_account unable to append ' + + suspended_filename) else: - try: - with open(suspended_filename, 'w+', encoding='utf-8') as fp_sus: - fp_sus.write(nickname + '\n') - except OSError: - print('EX: suspend_account unable to write ' + suspended_filename) + save_string(nickname + '\n', suspended_filename, + 'EX: suspend_account unable to write ' + + suspended_filename) _suspend_media_for_account(base_dir, account_dir) @@ -1505,14 +1486,14 @@ def _remove_account_media(base_dir: str, nickname: str, domain: str) -> None: account_dir = acct_dir(base_dir, nickname, domain) account_media_log_filename = account_dir + '/media_log.txt' - media_log = [] + media_log: list[str] = [] if os.path.isfile(account_media_log_filename): - try: - with open(account_media_log_filename, 'r', - encoding='utf-8') as fp_log: - media_log = fp_log.read().split('\n') - except OSError: - print('EX: remove unable to read media log for ' + nickname) + media_log_str = \ + load_string(account_media_log_filename, + 'EX: remove unable to read media log for ' + + nickname) + if media_log_str: + media_log = media_log_str.split('\n') for filename in media_log: media_filename = base_dir + filename @@ -1690,20 +1671,15 @@ def is_person_snoozed(base_dir: str, nickname: str, domain: str, except OSError: print('EX: is_person_snoozed unable to read ' + snoozed_filename) if replace_str: - content = None - try: - with open(snoozed_filename, 'r', encoding='utf-8') as fp_snoozed: - content = fp_snoozed.read().replace(replace_str, '') - except OSError: - print('EX: is_person_snoozed unable to read 2 ' + snoozed_filename) + content = load_string(snoozed_filename, + 'EX: is_person_snoozed unable to read 2 ' + + snoozed_filename) if content: - try: - with open(snoozed_filename, 'w+', - encoding='utf-8') as fp_snooze: - fp_snooze.write(content) - except OSError: - print('EX: is_person_snoozed unable to write ' + - snoozed_filename) + content = content.replace(replace_str, '') + if content: + save_string(content, snoozed_filename, + 'EX: is_person_snoozed unable to write ' + + snoozed_filename) if text_in_file(snooze_actor + ' ', snoozed_filename): return True @@ -1722,12 +1698,9 @@ def person_snooze(base_dir: str, nickname: str, domain: str, if os.path.isfile(snoozed_filename): if text_in_file(snooze_actor + ' ', snoozed_filename): return - try: - with open(snoozed_filename, 'a+', encoding='utf-8') as fp_snoozed: - fp_snoozed.write(snooze_actor + ' ' + - str(int(time.time())) + '\n') - except OSError: - print('EX: person_snooze unable to append ' + snoozed_filename) + text = snooze_actor + ' ' + str(int(time.time())) + '\n' + append_string(text, snoozed_filename, + 'EX: person_snooze unable to append ' + snoozed_filename) def person_unsnooze(base_dir: str, nickname: str, domain: str, @@ -1753,19 +1726,15 @@ def person_unsnooze(base_dir: str, nickname: str, domain: str, except OSError: print('EX: person_unsnooze unable to read ' + snoozed_filename) if replace_str: - content = None - try: - with open(snoozed_filename, 'r', encoding='utf-8') as fp_snoozed: - content = fp_snoozed.read().replace(replace_str, '') - except OSError: - print('EX: person_unsnooze unable to read 2 ' + snoozed_filename) + content = load_string(snoozed_filename, + 'EX: person_unsnooze unable to read 2 ' + + snoozed_filename) + if content: + content = content.replace(replace_str, '') if content is not None: - try: - with open(snoozed_filename, 'w+', - encoding='utf-8') as fp_snooze: - fp_snooze.write(content) - except OSError: - print('EX: unable to write ' + snoozed_filename) + save_string(content, snoozed_filename, + 'EX: person_unsnooze unable to write ' + + snoozed_filename) def set_person_notes(base_dir: str, nickname: str, domain: str, @@ -1780,11 +1749,9 @@ def set_person_notes(base_dir: str, nickname: str, domain: str, if not os.path.isdir(notes_dir): os.mkdir(notes_dir) notes_filename = notes_dir + '/' + handle + '.txt' - try: - with open(notes_filename, 'w+', encoding='utf-8') as fp_notes: - fp_notes.write(notes) - except OSError: - print('EX: unable to write ' + notes_filename) + if not save_string(notes, notes_filename, + 'EX: set_person_notes unable to write ' + + notes_filename): return False return True @@ -1793,18 +1760,16 @@ def get_person_notes(base_dir: str, nickname: str, domain: str, handle: str) -> str: """Returns notes about a person """ - person_notes = '' + person_notes: str = '' person_notes_filename = \ acct_dir(base_dir, nickname, domain) + \ '/notes/' + handle + '.txt' if os.path.isfile(person_notes_filename): - try: - with open(person_notes_filename, 'r', - encoding='utf-8') as fp_notes: - person_notes = fp_notes.read() - except OSError: - print('EX: get_person_notes unable to read ' + - person_notes_filename) + person_notes = load_string(person_notes_filename, + 'EX: get_person_notes unable to read ' + + person_notes_filename) + if person_notes is None: + person_notes = '' return person_notes @@ -2117,12 +2082,12 @@ def get_person_avatar_url(base_dir: str, person_url: str, continue if ext != 'svg': return im_path - content = '' - try: - with open(im_filename, 'r', encoding='utf-8') as fp_im: - content = fp_im.read() - except OSError: - print('EX: get_person_avatar_url unable to read ' + im_filename) + content: str = \ + load_string(im_filename, + 'EX: get_person_avatar_url unable to read ' + + im_filename) + if content is None: + content = '' if not dangerous_svg(content, False): return im_path