Replace file operations with functions

main
bashrc 2026-04-26 20:33:19 +01:00
parent 2ff561c867
commit 733d4adee2
4 changed files with 117 additions and 174 deletions

22
mitm.py
View File

@ -16,6 +16,8 @@ __module_group__ = "Core"
import os import os
from utils import data_dir from utils import data_dir
from data import load_string
from data import save_string
def detect_mitm(self) -> bool: def detect_mitm(self) -> bool:
@ -55,15 +57,13 @@ def load_mitm_servers(base_dir: str) -> []:
mitm_servers_filename = data_dir(base_dir) + '/mitm_servers.txt' mitm_servers_filename = data_dir(base_dir) + '/mitm_servers.txt'
mitm_servers: list[str] = [] mitm_servers: list[str] = []
if os.path.isfile(mitm_servers_filename): if os.path.isfile(mitm_servers_filename):
try: mitm_servers_str = \
with open(mitm_servers_filename, 'r', load_string(mitm_servers_filename,
encoding='utf-8') as fp_mitm: 'EX: error while reading mitm_servers.txt')
mitm_servers = fp_mitm.read() if mitm_servers_str:
except OSError: mitm_servers = mitm_servers_str.split('\n')
print('EX: error while reading mitm_servers.txt')
if not mitm_servers: if not mitm_servers:
return [] return []
mitm_servers = mitm_servers.split('\n')
return mitm_servers return mitm_servers
@ -76,9 +76,5 @@ def save_mitm_servers(base_dir: str, mitm_servers: []) -> None:
mitm_servers_str += domain + '\n' mitm_servers_str += domain + '\n'
mitm_servers_filename = data_dir(base_dir) + '/mitm_servers.txt' mitm_servers_filename = data_dir(base_dir) + '/mitm_servers.txt'
try: save_string(mitm_servers_str, mitm_servers_filename,
with open(mitm_servers_filename, 'w+', 'EX: error while saving mitm_servers.txt')
encoding='utf-8') as fp_mitm:
fp_mitm.write(mitm_servers_str)
except OSError:
print('EX: error while saving mitm_servers.txt')

View File

@ -55,6 +55,7 @@ from filters import is_filtered
from session import download_image_any_mime_type from session import download_image_any_mime_type
from content import remove_script from content import remove_script
from data import load_list from data import load_list
from data import load_string
def _remove_cdata(text: str) -> str: def _remove_cdata(text: str) -> str:
@ -403,14 +404,11 @@ def load_hashtag_categories(base_dir: str, language: str) -> None:
if not os.path.isfile(hashtag_categories_filename): if not os.path.isfile(hashtag_categories_filename):
return return
try: xml_str = load_string(hashtag_categories_filename,
with open(hashtag_categories_filename, 'r', 'EX: load_hashtag_categories unable to read ' +
encoding='utf-8') as fp_cat:
xml_str = fp_cat.read()
_xml2str_to_hashtag_categories(base_dir, xml_str, 1024, True)
except OSError:
print('EX: load_hashtag_categories unable to read ' +
hashtag_categories_filename) hashtag_categories_filename)
if xml_str:
_xml2str_to_hashtag_categories(base_dir, xml_str, 1024, True)
def _xml2str_to_hashtag_categories(base_dir: str, xml_str: str, def _xml2str_to_hashtag_categories(base_dir: str, xml_str: str,

View File

@ -11,6 +11,8 @@ import os
from utils import remove_domain_port from utils import remove_domain_port
from utils import acct_dir from utils import acct_dir
from utils import text_in_file from utils import text_in_file
from data import load_string
from data import save_string
def _notify_on_post_arrival(base_dir: str, nickname: str, domain: str, def _notify_on_post_arrival(base_dir: str, nickname: str, domain: str,
@ -40,34 +42,28 @@ def _notify_on_post_arrival(base_dir: str, nickname: str, domain: str,
# get the contents of the notifyOnPost file, which is # get the contents of the notifyOnPost file, which is
# a set of handles # a set of handles
following_handles = '' following_handles: str = ''
if os.path.isfile(notify_on_post_filename): if os.path.isfile(notify_on_post_filename):
print('notify file exists') print('notify file exists')
try: following_handles = \
with open(notify_on_post_filename, 'r', load_string(notify_on_post_filename,
encoding='utf-8') as fp_calendar: 'EX: _notify_on_post_arrival unable to read 1 ' +
following_handles = fp_calendar.read()
except OSError:
print('EX: _notify_on_post_arrival unable to read 1 ' +
notify_on_post_filename) notify_on_post_filename)
if following_handles is None:
following_handles = ''
else: else:
# create a new notifyOnPost file from the following file # create a new notifyOnPost file from the following file
print('Creating notifyOnPost file ' + notify_on_post_filename) print('Creating notifyOnPost file ' + notify_on_post_filename)
following_handles = '' following_handles = \
try: load_string(following_filename,
with open(following_filename, 'r', 'EX: _notify_on_post_arrival unable to read 2 ' +
encoding='utf-8') as fp_following:
following_handles = fp_following.read()
except OSError:
print('EX: _notify_on_post_arrival unable to read 2 ' +
following_filename) following_filename)
if following_handles is None:
following_handles = ''
if add: if add:
try: save_string(following_handles + handle + '\n',
with open(notify_on_post_filename, 'w+', notify_on_post_filename,
encoding='utf-8') as fp_notify: 'EX: _notify_on_post_arrival unable to write 1' +
fp_notify.write(following_handles + handle + '\n')
except OSError:
print('EX: _notify_on_post_arrival unable to write 1' +
notify_on_post_filename) notify_on_post_filename)
# already in the notifyOnPost file? # already in the notifyOnPost file?
@ -86,12 +82,8 @@ def _notify_on_post_arrival(base_dir: str, nickname: str, domain: str,
new_following_handles += followed + '\n' new_following_handles += followed + '\n'
following_handles = new_following_handles following_handles = new_following_handles
try: save_string(following_handles, notify_on_post_filename,
with open(notify_on_post_filename, 'w+', 'EX: _notify_on_post_arrival unable to write 2' +
encoding='utf-8') as fp_notify:
fp_notify.write(following_handles)
except OSError:
print('EX: _notify_on_post_arrival unable to write 2' +
notify_on_post_filename) notify_on_post_filename)
else: else:
print(handle + ' not in notifyOnPost.txt') print(handle + ' not in notifyOnPost.txt')
@ -99,12 +91,8 @@ def _notify_on_post_arrival(base_dir: str, nickname: str, domain: str,
if add: if add:
# append to the list of handles # append to the list of handles
following_handles += handle + '\n' following_handles += handle + '\n'
try: save_string(following_handles, notify_on_post_filename,
with open(notify_on_post_filename, 'w+', 'EX: _notify_on_post_arrival unable to write 3' +
encoding='utf-8') as fp_notify:
fp_notify.write(following_handles)
except OSError:
print('EX: _notify_on_post_arrival unable to write 3' +
notify_on_post_filename) notify_on_post_filename)
@ -138,11 +126,7 @@ def notify_when_person_posts(base_dir: str, nickname: str, domain: str,
handle = following_nickname + '@' + following_domain handle = following_nickname + '@' + following_domain
if not os.path.isfile(notify_on_post_filename): if not os.path.isfile(notify_on_post_filename):
# create a new notifyOnPost file # create a new notifyOnPost file
try: save_string('\n', notify_on_post_filename,
with open(notify_on_post_filename, 'w+', 'EX: notify_when_person_posts unable to write ' +
encoding='utf-8') as fp_notify:
fp_notify.write('')
except OSError:
print('EX: notify_when_person_posts unable to write ' +
notify_on_post_filename) notify_on_post_filename)
return text_in_file(handle + '\n', notify_on_post_filename, False) return text_in_file(handle + '\n', notify_on_post_filename, False)

173
person.py
View File

@ -92,6 +92,9 @@ from cache import remove_person_from_cache
from filters import is_filtered_bio from filters import is_filtered_bio
from follow import is_following_actor from follow import is_following_actor
from data import load_list from data import load_list
from data import save_string
from data import load_string
from data import append_string
def generate_rsa_key() -> (str, str): def generate_rsa_key() -> (str, str):
@ -631,22 +634,16 @@ def _create_person_base(base_dir: str, nickname: str, domain: str, port: int,
if not os.path.isdir(base_dir + private_keys_subdir): if not os.path.isdir(base_dir + private_keys_subdir):
os.mkdir(base_dir + private_keys_subdir) os.mkdir(base_dir + private_keys_subdir)
filename = base_dir + private_keys_subdir + '/' + handle + '.key' filename = base_dir + private_keys_subdir + '/' + handle + '.key'
try: save_string(private_key_pem, filename,
with open(filename, 'w+', encoding='utf-8') as fp_text: 'EX: _create_person_base unable to save 1 ' + filename)
print(private_key_pem, file=fp_text)
except OSError:
print('EX: _create_person_base unable to save ' + filename)
# save the public key # save the public key
public_keys_subdir = '/keys/public' public_keys_subdir = '/keys/public'
if not os.path.isdir(base_dir + public_keys_subdir): if not os.path.isdir(base_dir + public_keys_subdir):
os.mkdir(base_dir + public_keys_subdir) os.mkdir(base_dir + public_keys_subdir)
filename = base_dir + public_keys_subdir + '/' + handle + '.pem' filename = base_dir + public_keys_subdir + '/' + handle + '.pem'
try: save_string(public_key_pem, filename,
with open(filename, 'w+', encoding='utf-8') as fp_text: 'EX: _create_person_base unable to save 2 ' + filename)
print(public_key_pem, file=fp_text)
except OSError:
print('EX: _create_person_base unable to save 2 ' + filename)
if password: if password:
password = remove_eol(password).strip() password = remove_eol(password).strip()
@ -788,33 +785,23 @@ def create_person(base_dir: str, nickname: str, domain: str, port: int,
if manual_follower_approval: if manual_follower_approval:
follow_dms_filename = \ follow_dms_filename = \
acct_dir(base_dir, nickname, domain) + '/.followDMs' acct_dir(base_dir, nickname, domain) + '/.followDMs'
try: save_string('\n', follow_dms_filename,
with open(follow_dms_filename, 'w+', encoding='utf-8') as fp_foll: 'EX: create_person unable to write ' + follow_dms_filename)
fp_foll.write('\n')
except OSError:
print('EX: create_person unable to write ' + follow_dms_filename)
# notify when posts are liked # notify when posts are liked
if nickname != 'news': if nickname != 'news':
notify_likes_filename = \ notify_likes_filename = \
acct_dir(base_dir, nickname, domain) + '/.notifyLikes' acct_dir(base_dir, nickname, domain) + '/.notifyLikes'
try: save_string('\n', notify_likes_filename,
with open(notify_likes_filename, 'w+', encoding='utf-8') as fp_lik: 'EX: create_person unable to write 2 ' +
fp_lik.write('\n')
except OSError:
print('EX: create_person unable to write 2 ' +
notify_likes_filename) notify_likes_filename)
# notify when posts have emoji reactions # notify when posts have emoji reactions
if nickname != 'news': if nickname != 'news':
notify_reactions_filename = \ notify_reactions_filename = \
acct_dir(base_dir, nickname, domain) + '/.notifyReactions' acct_dir(base_dir, nickname, domain) + '/.notifyReactions'
try: save_string('\n', notify_reactions_filename,
with open(notify_reactions_filename, 'w+', 'EX: create_person unable to write 3 ' +
encoding='utf-8') as fp_notify:
fp_notify.write('\n')
except OSError:
print('EX: create_person unable to write 3 ' +
notify_reactions_filename) notify_reactions_filename)
theme = get_config_param(base_dir, 'theme') theme = get_config_param(base_dir, 'theme')
@ -1310,13 +1297,12 @@ def _unsuspend_media_for_account(base_dir: str, account_dir: str) -> None:
if not os.path.isfile(account_media_log_filename): if not os.path.isfile(account_media_log_filename):
return return
media_log = [] media_log: list[str] = []
try: media_log_str = load_string(account_media_log_filename,
with open(account_media_log_filename, 'r', 'EX: suspend unable to read media log for ' +
encoding='utf-8') as fp_log: account_dir)
media_log = fp_log.read().split('\n') if media_log_str:
except OSError: media_log = media_log_str.split('\n')
print('EX: suspend unable to read media log for ' + account_dir)
for filename in media_log: for filename in media_log:
media_filename = base_dir + filename media_filename = base_dir + filename
@ -1356,13 +1342,12 @@ def _suspend_media_for_account(base_dir: str, account_dir: str) -> None:
if not os.path.isfile(account_media_log_filename): if not os.path.isfile(account_media_log_filename):
return return
media_log = [] media_log: list[str] = []
try: media_log_str = load_string(account_media_log_filename,
with open(account_media_log_filename, 'r', 'EX: suspend unable to read media log for ' +
encoding='utf-8') as fp_log: account_dir)
media_log = fp_log.read().split('\n') if media_log_str:
except OSError: media_log = media_log_str.split('\n')
print('EX: suspend unable to read media log for ' + account_dir)
for filename in media_log: for filename in media_log:
media_filename = base_dir + filename media_filename = base_dir + filename
@ -1418,17 +1403,13 @@ def suspend_account(base_dir: str, nickname: str, domain: str) -> None:
for suspended in lines: for suspended in lines:
if suspended.strip('\n').strip('\r') == nickname: if suspended.strip('\n').strip('\r') == nickname:
return return
try: append_string(nickname + '\n', suspended_filename,
with open(suspended_filename, 'a+', encoding='utf-8') as fp_sus: 'EX: suspend_account unable to append ' +
fp_sus.write(nickname + '\n') suspended_filename)
except OSError:
print('EX: suspend_account unable to append ' + suspended_filename)
else: else:
try: save_string(nickname + '\n', suspended_filename,
with open(suspended_filename, 'w+', encoding='utf-8') as fp_sus: 'EX: suspend_account unable to write ' +
fp_sus.write(nickname + '\n') suspended_filename)
except OSError:
print('EX: suspend_account unable to write ' + suspended_filename)
_suspend_media_for_account(base_dir, account_dir) _suspend_media_for_account(base_dir, account_dir)
@ -1505,14 +1486,14 @@ def _remove_account_media(base_dir: str, nickname: str, domain: str) -> None:
account_dir = acct_dir(base_dir, nickname, domain) account_dir = acct_dir(base_dir, nickname, domain)
account_media_log_filename = account_dir + '/media_log.txt' account_media_log_filename = account_dir + '/media_log.txt'
media_log = [] media_log: list[str] = []
if os.path.isfile(account_media_log_filename): if os.path.isfile(account_media_log_filename):
try: media_log_str = \
with open(account_media_log_filename, 'r', load_string(account_media_log_filename,
encoding='utf-8') as fp_log: 'EX: remove unable to read media log for ' +
media_log = fp_log.read().split('\n') nickname)
except OSError: if media_log_str:
print('EX: remove unable to read media log for ' + nickname) media_log = media_log_str.split('\n')
for filename in media_log: for filename in media_log:
media_filename = base_dir + filename media_filename = base_dir + filename
@ -1690,19 +1671,14 @@ def is_person_snoozed(base_dir: str, nickname: str, domain: str,
except OSError: except OSError:
print('EX: is_person_snoozed unable to read ' + snoozed_filename) print('EX: is_person_snoozed unable to read ' + snoozed_filename)
if replace_str: if replace_str:
content = None content = load_string(snoozed_filename,
try: 'EX: is_person_snoozed unable to read 2 ' +
with open(snoozed_filename, 'r', encoding='utf-8') as fp_snoozed: snoozed_filename)
content = fp_snoozed.read().replace(replace_str, '')
except OSError:
print('EX: is_person_snoozed unable to read 2 ' + snoozed_filename)
if content: if content:
try: content = content.replace(replace_str, '')
with open(snoozed_filename, 'w+', if content:
encoding='utf-8') as fp_snooze: save_string(content, snoozed_filename,
fp_snooze.write(content) 'EX: is_person_snoozed unable to write ' +
except OSError:
print('EX: is_person_snoozed unable to write ' +
snoozed_filename) snoozed_filename)
if text_in_file(snooze_actor + ' ', snoozed_filename): if text_in_file(snooze_actor + ' ', snoozed_filename):
@ -1722,12 +1698,9 @@ def person_snooze(base_dir: str, nickname: str, domain: str,
if os.path.isfile(snoozed_filename): if os.path.isfile(snoozed_filename):
if text_in_file(snooze_actor + ' ', snoozed_filename): if text_in_file(snooze_actor + ' ', snoozed_filename):
return return
try: text = snooze_actor + ' ' + str(int(time.time())) + '\n'
with open(snoozed_filename, 'a+', encoding='utf-8') as fp_snoozed: append_string(text, snoozed_filename,
fp_snoozed.write(snooze_actor + ' ' + 'EX: person_snooze unable to append ' + snoozed_filename)
str(int(time.time())) + '\n')
except OSError:
print('EX: person_snooze unable to append ' + snoozed_filename)
def person_unsnooze(base_dir: str, nickname: str, domain: str, def person_unsnooze(base_dir: str, nickname: str, domain: str,
@ -1753,19 +1726,15 @@ def person_unsnooze(base_dir: str, nickname: str, domain: str,
except OSError: except OSError:
print('EX: person_unsnooze unable to read ' + snoozed_filename) print('EX: person_unsnooze unable to read ' + snoozed_filename)
if replace_str: if replace_str:
content = None content = load_string(snoozed_filename,
try: 'EX: person_unsnooze unable to read 2 ' +
with open(snoozed_filename, 'r', encoding='utf-8') as fp_snoozed: snoozed_filename)
content = fp_snoozed.read().replace(replace_str, '') if content:
except OSError: content = content.replace(replace_str, '')
print('EX: person_unsnooze unable to read 2 ' + snoozed_filename)
if content is not None: if content is not None:
try: save_string(content, snoozed_filename,
with open(snoozed_filename, 'w+', 'EX: person_unsnooze unable to write ' +
encoding='utf-8') as fp_snooze: snoozed_filename)
fp_snooze.write(content)
except OSError:
print('EX: unable to write ' + snoozed_filename)
def set_person_notes(base_dir: str, nickname: str, domain: str, def set_person_notes(base_dir: str, nickname: str, domain: str,
@ -1780,11 +1749,9 @@ def set_person_notes(base_dir: str, nickname: str, domain: str,
if not os.path.isdir(notes_dir): if not os.path.isdir(notes_dir):
os.mkdir(notes_dir) os.mkdir(notes_dir)
notes_filename = notes_dir + '/' + handle + '.txt' notes_filename = notes_dir + '/' + handle + '.txt'
try: if not save_string(notes, notes_filename,
with open(notes_filename, 'w+', encoding='utf-8') as fp_notes: 'EX: set_person_notes unable to write ' +
fp_notes.write(notes) notes_filename):
except OSError:
print('EX: unable to write ' + notes_filename)
return False return False
return True return True
@ -1793,18 +1760,16 @@ def get_person_notes(base_dir: str, nickname: str, domain: str,
handle: str) -> str: handle: str) -> str:
"""Returns notes about a person """Returns notes about a person
""" """
person_notes = '' person_notes: str = ''
person_notes_filename = \ person_notes_filename = \
acct_dir(base_dir, nickname, domain) + \ acct_dir(base_dir, nickname, domain) + \
'/notes/' + handle + '.txt' '/notes/' + handle + '.txt'
if os.path.isfile(person_notes_filename): if os.path.isfile(person_notes_filename):
try: person_notes = load_string(person_notes_filename,
with open(person_notes_filename, 'r', 'EX: get_person_notes unable to read ' +
encoding='utf-8') as fp_notes:
person_notes = fp_notes.read()
except OSError:
print('EX: get_person_notes unable to read ' +
person_notes_filename) person_notes_filename)
if person_notes is None:
person_notes = ''
return person_notes return person_notes
@ -2117,12 +2082,12 @@ def get_person_avatar_url(base_dir: str, person_url: str,
continue continue
if ext != 'svg': if ext != 'svg':
return im_path return im_path
content: str = \
load_string(im_filename,
'EX: get_person_avatar_url unable to read ' +
im_filename)
if content is None:
content = '' content = ''
try:
with open(im_filename, 'r', encoding='utf-8') as fp_im:
content = fp_im.read()
except OSError:
print('EX: get_person_avatar_url unable to read ' + im_filename)
if not dangerous_svg(content, False): if not dangerous_svg(content, False):
return im_path return im_path