Replace file operations with functions

main
bashrc 2026-04-26 22:59:42 +01:00
parent 664f13d3e2
commit 54c759aea3
11 changed files with 140 additions and 173 deletions

View File

@ -17,6 +17,8 @@ from utils import dangerous_markup
from utils import get_reply_to from utils import get_reply_to
from utils import get_actor_from_post from utils import get_actor_from_post
from data import load_list from data import load_list
from data import save_string
from data import append_string
def is_vote(base_dir: str, nickname: str, domain: str, def is_vote(base_dir: str, nickname: str, domain: str,
@ -125,25 +127,17 @@ def question_update_votes(base_dir: str, nickname: str, domain: str,
actor_url = get_actor_from_post(reply_json) actor_url = get_actor_from_post(reply_json)
if not os.path.isfile(voters_filename): if not os.path.isfile(voters_filename):
# create a new voters file # create a new voters file
try: save_string(actor_url + voters_file_separator + reply_vote + '\n',
with open(voters_filename, 'w+', voters_filename,
encoding='utf-8') as fp_voters: 'EX: unable to write voters file ' + voters_filename)
fp_voters.write(actor_url +
voters_file_separator +
reply_vote + '\n')
except OSError:
print('EX: unable to write voters file ' + voters_filename)
else: else:
if not text_in_file(actor_url, voters_filename): if not text_in_file(actor_url, voters_filename):
# append to the voters file # append to the voters file
try: append_string(actor_url + voters_file_separator +
with open(voters_filename, 'a+', reply_vote + '\n',
encoding='utf-8') as fp_voters: voters_filename,
fp_voters.write(actor_url + 'EX: unable to append to voters file ' +
voters_file_separator + voters_filename)
reply_vote + '\n')
except OSError:
print('EX: unable to append to voters file ' + voters_filename)
else: else:
# change an entry in the voters file # change an entry in the voters file
lines: list[str] = \ lines: list[str] = \

View File

@ -39,6 +39,7 @@ from webfinger import webfinger_handle
from auth import create_basic_auth_header from auth import create_basic_auth_header
from posts import get_person_box from posts import get_person_box
from data import load_list from data import load_list
from data import save_string
# the maximum number of reactions from individual actors which can be # the maximum number of reactions from individual actors which can be
# added to a post. Hence an adversary can't bombard you with sockpuppet # added to a post. Hence an adversary can't bombard you with sockpuppet
@ -519,12 +520,8 @@ def _update_common_reactions(base_dir: str, emoji_content: str) -> None:
return return
else: else:
line = str(1).zfill(16) + ' ' + emoji_content + '\n' line = str(1).zfill(16) + ' ' + emoji_content + '\n'
try: if not save_string(line, common_reactions_filename,
with open(common_reactions_filename, 'w+', 'EX: error writing common reactions 2'):
encoding='utf-8') as fp_react:
fp_react.write(line)
except OSError:
print('EX: error writing common reactions 2')
return return

View File

@ -22,6 +22,8 @@ from utils import remove_html
from formats import get_image_extensions from formats import get_image_extensions
from timeFunctions import date_epoch from timeFunctions import date_epoch
from timeFunctions import date_from_string_format from timeFunctions import date_from_string_format
from data import save_string
from data import load_string
def get_book_link_from_content(content: str) -> str: def get_book_link_from_content(content: str) -> str:
@ -407,12 +409,8 @@ def _update_recent_books_list(base_dir: str, book_id: str,
print('WARN: Failed to write entry to recent books ' + print('WARN: Failed to write entry to recent books ' +
recent_books_filename + ' ' + str(ex)) recent_books_filename + ' ' + str(ex))
else: else:
try: save_string(book_id + '\n', recent_books_filename,
with open(recent_books_filename, 'w+', 'EX: unable to write recent books ' +
encoding='utf-8') as fp_recent:
fp_recent.write(book_id + '\n')
except OSError:
print('EX: unable to write recent books ' +
recent_books_filename) recent_books_filename)
@ -426,13 +424,12 @@ def _deduplicate_recent_books_list(base_dir: str,
# load recent books as a list # load recent books as a list
recent_lines: list[str] = [] recent_lines: list[str] = []
try: recent_lines_str = \
with open(recent_books_filename, 'r', load_string(recent_books_filename,
encoding='utf-8') as fp_recent: 'WARN: Failed to read recent books trim ' +
recent_lines = fp_recent.read().split('\n') recent_books_filename + ' [ex]')
except OSError as ex: if recent_lines_str:
print('WARN: Failed to read recent books trim ' + recent_lines = recent_lines_str.split('\n')
recent_books_filename + ' ' + str(ex))
# deduplicate the list # deduplicate the list
new_recent_lines: list[str] = [] new_recent_lines: list[str] = []
@ -444,12 +441,8 @@ def _deduplicate_recent_books_list(base_dir: str,
result = '' result = ''
for line in recent_lines: for line in recent_lines:
result += line + '\n' result += line + '\n'
try: save_string(result, recent_books_filename,
with open(recent_books_filename, 'w+', 'EX: unable to deduplicate recent books ' +
encoding='utf-8') as fp_recent:
fp_recent.write(result)
except OSError:
print('EX: unable to deduplicate recent books ' +
recent_books_filename) recent_books_filename)
# remove excess lines from the list # remove excess lines from the list
@ -457,12 +450,8 @@ def _deduplicate_recent_books_list(base_dir: str,
result = '' result = ''
for ctr in range(max_recent_books): for ctr in range(max_recent_books):
result += recent_lines[ctr] + '\n' result += recent_lines[ctr] + '\n'
try: save_string(result, recent_books_filename,
with open(recent_books_filename, 'w+', 'EX: unable to trim recent books ' +
encoding='utf-8') as fp_recent:
fp_recent.write(result)
except OSError:
print('EX: unable to trim recent books ' +
recent_books_filename) recent_books_filename)

View File

@ -22,6 +22,8 @@ from utils import is_account_dir
from utils import get_nickname_from_actor from utils import get_nickname_from_actor
from utils import get_domain_from_actor from utils import get_domain_from_actor
from utils import load_json from utils import load_json
from data import load_string
from data import save_string
def get_moved_accounts(base_dir: str, nickname: str, domain: str, def get_moved_accounts(base_dir: str, nickname: str, domain: str,
@ -31,27 +33,23 @@ def get_moved_accounts(base_dir: str, nickname: str, domain: str,
moved_accounts_filename = data_dir(base_dir) + '/actors_moved.txt' moved_accounts_filename = data_dir(base_dir) + '/actors_moved.txt'
if not os.path.isfile(moved_accounts_filename): if not os.path.isfile(moved_accounts_filename):
return {} return {}
refollow_str = '' refollow_str = \
try: load_string(moved_accounts_filename,
with open(moved_accounts_filename, 'r', 'EX: get_moved_accounts unable to read 1 ' +
encoding='utf-8') as fp_refollow:
refollow_str = fp_refollow.read()
except OSError:
print('EX: get_moved_accounts unable to read 1 ' +
moved_accounts_filename) moved_accounts_filename)
if refollow_str is None:
refollow_str = ''
refollow_list = refollow_str.split('\n') refollow_list = refollow_str.split('\n')
refollow_dict = {} refollow_dict = {}
follow_filename = \ follow_filename = \
acct_dir(base_dir, nickname, domain) + '/' + filename acct_dir(base_dir, nickname, domain) + '/' + filename
follow_str = '' follow_str = \
try: load_string(follow_filename,
with open(follow_filename, 'r', 'EX: get_moved_accounts unable to read 2 ' +
encoding='utf-8') as fp_follow:
follow_str = fp_follow.read()
except OSError:
print('EX: get_moved_accounts unable to read 2 ' +
follow_filename) follow_filename)
if follow_str is None:
follow_str = ''
follow_list = follow_str.split('\n') follow_list = follow_str.split('\n')
ctr = 0 ctr = 0
@ -248,14 +246,11 @@ def update_moved_actors(base_dir: str, debug: bool) -> None:
following_filename = dir_str + '/' + account + '/following.txt' following_filename = dir_str + '/' + account + '/following.txt'
if not os.path.isfile(following_filename): if not os.path.isfile(following_filename):
continue continue
following_str = '' following_str = \
try: load_string(following_filename,
with open(following_filename, 'r', 'EX: update_moved_actors unable to read ' +
encoding='utf-8') as fp_foll:
following_str = fp_foll.read()
except OSError:
print('EX: update_moved_actors unable to read ' +
following_filename) following_filename)
if following_str is None:
continue continue
following_list = following_str.split('\n') following_list = following_str.split('\n')
for handle in following_list: for handle in following_list:
@ -318,12 +313,8 @@ def update_moved_actors(base_dir: str, debug: bool) -> None:
moved_accounts_filename) moved_accounts_filename)
return return
try: save_string(moved_str, moved_accounts_filename,
with open(moved_accounts_filename, 'w+', 'EX: update_moved_actors unable to save ' +
encoding='utf-8') as fp_moved:
fp_moved.write(moved_str)
except OSError:
print('EX: update_moved_actors unable to save ' +
moved_accounts_filename) moved_accounts_filename)
@ -335,14 +326,12 @@ def _get_inactive_accounts(base_dir: str, nickname: str, domain: str,
# get the list of followers # get the list of followers
followers_filename = \ followers_filename = \
acct_dir(base_dir, nickname, domain) + '/followers.txt' acct_dir(base_dir, nickname, domain) + '/followers.txt'
followers_str = '' followers_str = \
try: load_string(followers_filename,
with open(followers_filename, 'r', 'EX: get_moved_accounts unable to read ' +
encoding='utf-8') as fp_follow:
followers_str = fp_follow.read()
except OSError:
print('EX: get_moved_accounts unable to read ' +
followers_filename) followers_filename)
if followers_str is None:
followers_str = ''
followers_list = followers_str.split('\n') followers_list = followers_str.split('\n')
result: list[str] = [] result: list[str] = []

View File

@ -17,6 +17,7 @@ from utils import text_in_file
from utils import get_config_param from utils import get_config_param
from status import get_status_number from status import get_status_number
from data import load_list from data import load_list
from data import save_string
def _clear_role_status(base_dir: str, role: str) -> None: def _clear_role_status(base_dir: str, role: str) -> None:
@ -78,13 +79,11 @@ def _add_role(base_dir: str, nickname: str, domain: str,
except OSError: except OSError:
print('EX: _add_role, failed to write roles file1 ' + role_file) print('EX: _add_role, failed to write roles file1 ' + role_file)
else: else:
try:
with open(role_file, 'w+', encoding='utf-8') as fp_role:
account_dir = acct_dir(base_dir, nickname, domain) account_dir = acct_dir(base_dir, nickname, domain)
if os.path.isdir(account_dir): if os.path.isdir(account_dir):
fp_role.write(nickname + '\n') save_string(nickname + '\n', role_file,
except OSError: 'EX: _add_role, failed to write roles file2 ' +
print('EX: _add_role, failed to write roles file2 ' + role_file) role_file)
def _remove_role(base_dir: str, nickname: str, role_filename: str) -> None: def _remove_role(base_dir: str, nickname: str, role_filename: str) -> None:

View File

@ -20,6 +20,8 @@ from utils import get_domain_from_actor
from utils import get_full_domain from utils import get_full_domain
from utils import get_followers_list from utils import get_followers_list
from utils import get_mutuals_of_person from utils import get_mutuals_of_person
from data import load_string
from data import save_string
def load_searchable_by_default(base_dir: str) -> {}: def load_searchable_by_default(base_dir: str) -> {}:
@ -34,11 +36,11 @@ def load_searchable_by_default(base_dir: str) -> {}:
nickname = account.split('@')[0] nickname = account.split('@')[0]
filename = os.path.join(dir_str, account) + '/.searchableByDefault' filename = os.path.join(dir_str, account) + '/.searchableByDefault'
if os.path.isfile(filename): if os.path.isfile(filename):
try: text = load_string(filename,
with open(filename, 'r', encoding='utf-8') as fp_search: 'EX: unable to load searchableByDefault ' +
result[nickname] = fp_search.read().strip() filename)
except OSError: if text:
print('EX: unable to load searchableByDefault ' + filename) result[nickname] = text.strip()
break break
return result return result
@ -58,11 +60,8 @@ def set_searchable_by(base_dir: str, nickname: str, domain: str,
return return
# write the new state # write the new state
try: save_string(searchable_by, filename,
with open(filename, 'w+', encoding='utf-8') as fp_search: 'EX: unable to write searchableByDropdown ' + filename)
fp_search.write(searchable_by)
except OSError:
print('EX: unable to write searchableByDropdown ' + filename)
def _actor_in_searchable_by(searchable_by: str, following_list: []) -> bool: def _actor_in_searchable_by(searchable_by: str, following_list: []) -> bool:

View File

@ -26,6 +26,8 @@ from utils import is_yggdrasil_url
from formats import image_mime_types_dict from formats import image_mime_types_dict
from mitm import detect_mitm from mitm import detect_mitm
from httpsig import create_signed_header from httpsig import create_signed_header
from data import append_string
from data import save_string
def create_session(proxy_type: str): def create_session(proxy_type: str):
@ -562,12 +564,13 @@ def site_is_verified(session, base_dir: str, http_prefix: str,
write_type = 'a+' write_type = 'a+'
if not verified_file_exists: if not verified_file_exists:
write_type = 'w+' write_type = 'w+'
try: if write_type == 'a+':
with open(verified_sites_filename, write_type, append_string(url + '\n', verified_sites_filename,
encoding='utf-8') as fp_verified: 'EX: Verified sites could not be updated 1 ' +
fp_verified.write(url + '\n') verified_sites_filename)
except OSError: else:
print('EX: Verified sites could not be updated ' + save_string(url + '\n', verified_sites_filename,
'EX: Verified sites could not be updated 2 ' +
verified_sites_filename) verified_sites_filename)
return verified return verified

View File

@ -60,6 +60,8 @@ from threads import begin_thread
from threads import thread_with_trace from threads import thread_with_trace
from cache import remove_person_from_cache from cache import remove_person_from_cache
from cache import store_person_in_cache from cache import store_person_in_cache
from data import save_string
from data import load_string
def _load_dfc_ids(base_dir: str, system_language: str, def _load_dfc_ids(base_dir: str, system_language: str,
@ -318,15 +320,15 @@ def _indicate_new_share_available(base_dir: str, http_prefix: str,
continue continue
local_actor = \ local_actor = \
local_actor_url(http_prefix, account_nickname, domain_full) local_actor_url(http_prefix, account_nickname, domain_full)
try: exc_text = \
with open(new_share_file, 'w+', encoding='utf-8') as fp_new: 'EX: _indicate_new_share_available unable to write ' + \
str(new_share_file)
if shares_file_type == 'shares': if shares_file_type == 'shares':
fp_new.write(local_actor + '/tlshares') save_string(local_actor + '/tlshares', new_share_file,
exc_text)
else: else:
fp_new.write(local_actor + '/tlwanted') save_string(local_actor + '/tlwanted', new_share_file,
except OSError: exc_text)
print('EX: _indicate_new_share_available unable to write ' +
str(new_share_file))
break break
@ -1828,15 +1830,14 @@ def _generate_next_shares_token_update(base_dir: str,
token_update_filename = token_update_dir + '/.tokenUpdate' token_update_filename = token_update_dir + '/.tokenUpdate'
next_update_sec = None next_update_sec = None
if os.path.isfile(token_update_filename): if os.path.isfile(token_update_filename):
try: next_update_str = \
with open(token_update_filename, 'r', encoding='utf-8') as fp_tok: load_string(token_update_filename,
next_update_str = fp_tok.read() 'EX: _generate_next_shares_token_update ' +
'unable to read ' +
token_update_filename)
if next_update_str: if next_update_str:
if next_update_str.isdigit(): if next_update_str.isdigit():
next_update_sec = int(next_update_str) next_update_sec = int(next_update_str)
except OSError:
print('EX: _generate_next_shares_token_update unable to read ' +
token_update_filename)
curr_time = get_current_time_int() curr_time = get_current_time_int()
updated = False updated = False
if next_update_sec: if next_update_sec:
@ -1851,11 +1852,9 @@ def _generate_next_shares_token_update(base_dir: str,
next_update_sec = curr_time + next_update_interval next_update_sec = curr_time + next_update_interval
updated = True updated = True
if updated: if updated:
try: text = str(next_update_sec)
with open(token_update_filename, 'w+', encoding='utf-8') as fp_tok: save_string(text, token_update_filename,
fp_tok.write(str(next_update_sec)) 'EX: _generate_next_shares_token_update unable to write' +
except OSError:
print('EX: _generate_next_shares_token_update unable to write' +
token_update_filename) token_update_filename)
@ -1880,15 +1879,13 @@ def _regenerate_shares_token(base_dir: str, domain_full: str,
if not os.path.isfile(token_update_filename): if not os.path.isfile(token_update_filename):
return return
next_update_sec = None next_update_sec = None
try: next_update_str = \
with open(token_update_filename, 'r', encoding='utf-8') as fp_tok: load_string(token_update_filename,
next_update_str = fp_tok.read() 'EX: _regenerate_shares_token unable to read ' +
token_update_filename)
if next_update_str: if next_update_str:
if next_update_str.isdigit(): if next_update_str.isdigit():
next_update_sec = int(next_update_str) next_update_sec = int(next_update_str)
except OSError:
print('EX: _regenerate_shares_token unable to read ' +
token_update_filename)
if not next_update_sec: if not next_update_sec:
return return
curr_time = get_current_time_int() curr_time = get_current_time_int()

View File

@ -14,6 +14,7 @@ import socket
from urllib.parse import urlparse from urllib.parse import urlparse
from utils import data_dir from utils import data_dir
from utils import string_starts_with from utils import string_starts_with
from data import load_string
class Result: class Result:
@ -177,13 +178,12 @@ def load_unavailable_sites(base_dir: str) -> []:
""" """
unavailable_sites_filename = data_dir(base_dir) + '/unavailable_sites.txt' unavailable_sites_filename = data_dir(base_dir) + '/unavailable_sites.txt'
sites_unavailable: list[str] = [] sites_unavailable: list[str] = []
try: sites_unavailable_str = \
with open(unavailable_sites_filename, 'r', load_string(unavailable_sites_filename,
encoding='utf-8') as fp_sites: 'EX: unable to read unavailable sites ' +
sites_unavailable = fp_sites.read().split('\n')
except OSError:
print('EX: unable to read unavailable sites ' +
unavailable_sites_filename) unavailable_sites_filename)
if sites_unavailable_str:
sites_unavailable = sites_unavailable_str.split('\n')
return sites_unavailable return sites_unavailable

View File

@ -33,6 +33,8 @@ from utils import get_actor_from_post
from content import html_replace_quote_marks from content import html_replace_quote_marks
from content import html_replace_inline_quotes from content import html_replace_inline_quotes
from data import load_list from data import load_list
from data import load_string
from data import save_string
SPEAKER_REMOVE_CHARS = ('.\n', '. ', ',', ';', '?', '!') SPEAKER_REMOVE_CHARS = ('.\n', '. ', ',', ';', '?', '!')
@ -560,12 +562,11 @@ def _post_to_speaker_json(base_dir: str, http_prefix: str,
liked_by = '' liked_by = ''
like_filename = accounts_dir + '/.newLike' like_filename = accounts_dir + '/.newLike'
if os.path.isfile(like_filename): if os.path.isfile(like_filename):
try: liked_by = load_string(like_filename,
with open(like_filename, 'r', encoding='utf-8') as fp_like: 'EX: _post_to_speaker_json unable to read 2 ' +
liked_by = fp_like.read()
except OSError:
print('EX: _post_to_speaker_json unable to read 2 ' +
like_filename) like_filename)
if liked_by is None:
liked_by = ''
calendar_filename = accounts_dir + '/.newCalendar' calendar_filename = accounts_dir + '/.newCalendar'
post_cal = os.path.isfile(calendar_filename) post_cal = os.path.isfile(calendar_filename)
share_filename = accounts_dir + '/.newShare' share_filename = accounts_dir + '/.newShare'
@ -622,8 +623,5 @@ def update_speaker(base_dir: str, http_prefix: str,
speaker_json['say'], speaker_json['say'],
system_language, system_language,
gender, box_name) gender, box_name)
try: save_string(ssml_str, cached_ssml_filename,
with open(cached_ssml_filename, 'w+', encoding='utf-8') as fp_ssml: 'EX: unable to write ssml ' + cached_ssml_filename)
fp_ssml.write(ssml_str)
except OSError:
print('EX: unable to write ssml ' + cached_ssml_filename)

View File

@ -10,6 +10,7 @@ __module_group__ = "Web Interface"
import os import os
from shutil import copyfile from shutil import copyfile
from utils import data_dir from utils import data_dir
from data import load_string
def text_mode_browser(ua_str: str) -> bool: def text_mode_browser(ua_str: str) -> bool:
@ -43,9 +44,9 @@ def get_text_mode_banner(base_dir: str) -> str:
""" """
text_mode_banner_filename = data_dir(base_dir) + '/banner.txt' text_mode_banner_filename = data_dir(base_dir) + '/banner.txt'
if os.path.isfile(text_mode_banner_filename): if os.path.isfile(text_mode_banner_filename):
with open(text_mode_banner_filename, 'r', banner_str = load_string(text_mode_banner_filename,
encoding='utf-8') as fp_text: 'EX: unable to load text mode banner ' +
banner_str = fp_text.read() text_mode_banner_filename)
if banner_str: if banner_str:
return banner_str.replace('\n', '<br>') return banner_str.replace('\n', '<br>')
return None return None
@ -58,8 +59,9 @@ def get_text_mode_logo(base_dir: str) -> str:
if not os.path.isfile(text_mode_logo_filename): if not os.path.isfile(text_mode_logo_filename):
text_mode_logo_filename = base_dir + '/img/logo.txt' text_mode_logo_filename = base_dir + '/img/logo.txt'
with open(text_mode_logo_filename, 'r', encoding='utf-8') as fp_text: logo_str = load_string(text_mode_logo_filename,
logo_str = fp_text.read() 'EX: unable to load text mode logo ' +
text_mode_logo_filename)
if logo_str: if logo_str:
return logo_str.replace('\n', '<br>') return logo_str.replace('\n', '<br>')
return None return None