Replace file operations with functions

main
bashrc 2026-04-29 17:10:15 +01:00
parent 7e9d013dd5
commit 946c584b98
5 changed files with 403 additions and 423 deletions

14
git.py
View File

@ -17,6 +17,7 @@ from utils import get_attachment_property_value
from utils import remove_html
from utils import get_attributed_to
from utils import string_contains
from data import save_string
def _git_format_content(content: str) -> str:
@ -213,17 +214,14 @@ def receive_git_patch(base_dir: str, nickname: str, domain: str,
patch_str = \
_git_add_from_handle(patch_str,
'@' + from_nickname + '@' + from_domain)
try:
with open(patch_filename, 'w+', encoding='utf-8') as fp_patch:
fp_patch.write(patch_str)
if save_string(patch_str, patch_filename,
'EX: receive_git_patch ' + patch_filename + ' 1 [ex]'):
patch_notify_filename = \
acct_dir(base_dir, nickname, domain) + '/.newPatchContent'
with open(patch_notify_filename, 'w+',
encoding='utf-8') as fp_patch_notify:
fp_patch_notify.write(patch_str)
if save_string(patch_str, patch_notify_filename,
'EX: receive_git_patch ' +
patch_filename + ' 2 [ex]'):
return True
except OSError as ex:
print('EX: receive_git_patch ' + patch_filename + ' ' + str(ex))
return False

View File

@ -36,6 +36,7 @@ from context import get_individual_post_context
from session import get_method
from auth import create_basic_auth_header
from conversation import post_id_to_convthread_id
from data import load_list
from data import load_string
from data import save_string
from data import append_string
@ -298,9 +299,12 @@ def get_todays_events(base_dir: str, nickname: str, domain: str,
calendar_post_ids: list[str] = []
recreate_events_file: bool = False
try:
with open(calendar_filename, 'r', encoding='utf-8') as fp_events:
for post_id in fp_events:
calendar_list: list[str] = \
load_list(calendar_filename,
'EX: get_todays_events failed to read ' +
calendar_filename + ' [ex]')
if calendar_list is not None:
for post_id in calendar_list:
post_id = remove_eol(post_id)
post_filename = \
locate_post(base_dir, nickname, domain, post_id)
@ -378,19 +382,14 @@ def get_todays_events(base_dir: str, nickname: str, domain: str,
events[day_of_month].append(post_event)
events[day_of_month] = \
_sort_todays_events(events[day_of_month])
except OSError as exc:
print('EX: get_todays_events failed to read ' +
calendar_filename + ' ' + str(exc))
# if some posts have been deleted then regenerate the calendar file
if recreate_events_file:
try:
with open(calendar_filename, 'w+',
encoding='utf-8') as fp_calendar:
text: str = ''
for post_id in calendar_post_ids:
fp_calendar.write(post_id + '\n')
except OSError:
print('EX: unable to recreate events file 1 ' +
text += post_id + '\n'
save_string(text, calendar_filename,
'EX: unable to recreate events file 1 ' +
calendar_filename)
return events
@ -625,9 +624,11 @@ def day_events_check(base_dir: str, nickname: str, domain: str,
return False
events_exist: bool = False
try:
with open(calendar_filename, 'r', encoding='utf-8') as fp_events:
for post_id in fp_events:
calendar_list: list[str] = \
load_list(calendar_filename,
'EX: day_events_check failed to read ' + calendar_filename)
if calendar_list is not None:
for post_id in calendar_list:
post_id = remove_eol(post_id)
post_filename = \
locate_post(base_dir, nickname, domain, post_id)
@ -658,8 +659,6 @@ def day_events_check(base_dir: str, nickname: str, domain: str,
continue
events_exist = True
break
except OSError:
print('EX: day_events_check failed to read ' + calendar_filename)
return events_exist
@ -685,9 +684,12 @@ def get_this_weeks_events(base_dir: str, nickname: str, domain: str) -> {}:
calendar_post_ids: list[str] = []
recreate_events_file: bool = False
try:
with open(calendar_filename, 'r', encoding='utf-8') as fp_events:
for post_id in fp_events:
calendar_list: list[str] = \
load_list(calendar_filename,
'EX: get_this_weeks_events failed to read ' +
calendar_filename)
if calendar_list is not None:
for post_id in calendar_list:
post_id = remove_eol(post_id)
post_filename = \
locate_post(base_dir, nickname, domain, post_id)
@ -727,18 +729,14 @@ def get_this_weeks_events(base_dir: str, nickname: str, domain: str) -> {}:
if not events.get(week_day_index):
events[week_day_index]: list[dict] = []
events[week_day_index].append(post_event)
except OSError:
print('EX: get_this_weeks_events failed to read ' + calendar_filename)
# if some posts have been deleted then regenerate the calendar file
if recreate_events_file:
try:
with open(calendar_filename, 'w+',
encoding='utf-8') as fp_calendar:
text: str = ''
for post_id in calendar_post_ids:
fp_calendar.write(post_id + '\n')
except OSError:
print('EX: unable to recreate events file 2 ' +
text += post_id + '\n'
save_string(text, calendar_filename,
'EX: unable to recreate events file 2 ' +
calendar_filename)
return events
@ -762,9 +760,12 @@ def get_calendar_events(base_dir: str, nickname: str, domain: str,
calendar_post_ids: list[str] = []
recreate_events_file: bool = False
try:
with open(calendar_filename, 'r', encoding='utf-8') as fp_events:
for post_id in fp_events:
calendar_list: list[str] = \
load_list(calendar_filename,
'EX: get_calendar_events failed to read ' +
calendar_filename)
if calendar_list is not None:
for post_id in calendar_list:
post_id = remove_eol(post_id)
post_filename = \
locate_post(base_dir, nickname, domain, post_id)
@ -828,18 +829,14 @@ def get_calendar_events(base_dir: str, nickname: str, domain: str,
if not events.get(day_of_month):
events[day_of_month]: list[dict] = []
events[day_of_month].append(post_event)
except OSError:
print('EX: get_calendar_events failed to read ' + calendar_filename)
# if some posts have been deleted then regenerate the calendar file
if recreate_events_file:
try:
with open(calendar_filename, 'w+',
encoding='utf-8') as fp_calendar:
text: str = ''
for post_id in calendar_post_ids:
fp_calendar.write(post_id + '\n')
except OSError:
print('EX: unable to recreate events file 3 ' +
text += post_id + '\n'
save_string(text, calendar_filename,
'EX: unable to recreate events file 3 ' +
calendar_filename)
return events
@ -869,14 +866,13 @@ def remove_calendar_event(base_dir: str, nickname: str, domain: str,
return
lines = lines_str.split('\n')
print('Removing calendar event: ' + message_id)
try:
with open(calendar_filename, 'w+', encoding='utf-8') as fp_cal:
text: str = ''
for line in lines:
if message_id in line:
continue
fp_cal.write(line + '\n')
except OSError:
print('EX: unable to remove calendar event ' +
text += line + '\n'
save_string(text, calendar_filename,
'EX: unable to remove calendar event ' +
calendar_filename)

21
keys.py
View File

@ -8,6 +8,7 @@ __status__ = "Production"
__module_group__ = "ActivityPub"
import os
from data import load_string
def _get_local_private_key(base_dir: str, nickname: str, domain: str) -> str:
@ -19,11 +20,11 @@ def _get_local_private_key(base_dir: str, nickname: str, domain: str) -> str:
key_filename = base_dir + '/keys/private/' + handle.lower() + '.key'
if not os.path.isfile(key_filename):
return None
try:
with open(key_filename, 'r', encoding='utf-8') as fp_pem:
return fp_pem.read()
except OSError:
print('EX: _get_local_private_key unable to read ' + key_filename)
text = load_string(key_filename,
'EX: _get_local_private_key unable to read ' +
key_filename)
if text is not None:
return text
return None
@ -36,11 +37,11 @@ def _get_local_public_key(base_dir: str, nickname: str, domain: str) -> str:
key_filename = base_dir + '/keys/public/' + handle.lower() + '.key'
if not os.path.isfile(key_filename):
return None
try:
with open(key_filename, 'r', encoding='utf-8') as fp_pem:
return fp_pem.read()
except OSError:
print('EX: _get_local_public_key unable to read ' + key_filename)
text = load_string(key_filename,
'EX: _get_local_public_key unable to read ' +
key_filename)
if text is not None:
return text
return None

View File

@ -20,6 +20,8 @@ from utils import get_config_param
from utils import local_actor_url
from utils import resembles_url
from cache import get_person_from_cache
from data import load_string
from data import save_string
def get_actor_languages(actor_json: {}) -> str:
@ -354,12 +356,8 @@ def set_default_post_language(base_dir: str, nickname: str, domain: str,
"""
default_post_language_filename = \
acct_dir(base_dir, nickname, domain) + '/.new_post_language'
try:
with open(default_post_language_filename, 'w+',
encoding='utf-8') as fp_lang:
fp_lang.write(language)
except OSError:
print('EX: Unable to write default post language ' +
save_string(language, default_post_language_filename,
'EX: Unable to write default post language ' +
default_post_language_filename)
@ -379,13 +377,11 @@ def load_default_post_languages(base_dir: str) -> {}:
acct_dir(base_dir, nickname, domain) + '/.new_post_language'
if not os.path.isfile(default_post_language_filename):
continue
try:
with open(default_post_language_filename, 'r',
encoding='utf-8') as fp_lang:
result[nickname] = fp_lang.read()
except OSError:
print('EX: Unable to read default post language ' +
text = load_string(default_post_language_filename,
'EX: Unable to read default post language ' +
default_post_language_filename)
if text:
result[nickname] = text
break
return result

View File

@ -24,6 +24,10 @@ from utils import is_yggdrasil_address
from threads import thread_with_trace
from threads import begin_thread
from session import create_session
from data import save_string
from data import load_string
from data import append_string
from data import load_list
def manual_deny_follow_request2(session, session_onion, session_i2p,
@ -60,12 +64,8 @@ def manual_deny_follow_request2(session, session_onion, session_i2p,
remove_from_follow_requests(base_dir, nickname, domain, deny_handle, debug)
# Store rejected follows
try:
with open(rejected_follows_filename, 'a+',
encoding='utf-8') as fp_rejects:
fp_rejects.write(deny_handle + '\n')
except OSError:
print('EX: manual_deny_follow_request2 unable to append ' +
append_string(deny_handle + '\n', rejected_follows_filename,
'EX: manual_deny_follow_request2 unable to append ' +
rejected_follows_filename)
deny_nickname = deny_handle.split('@')[0]
@ -144,21 +144,13 @@ def _approve_follower_handle(account_dir: str, approve_handle: str) -> None:
approved_filename = account_dir + '/approved.txt'
if os.path.isfile(approved_filename):
if not text_in_file(approve_handle, approved_filename):
try:
with open(approved_filename, 'a+',
encoding='utf-8') as fp_approved:
fp_approved.write(approve_handle + '\n')
except OSError:
print('EX: _approve_follower_handle unable to append ' +
append_string(approve_handle + '\n', approved_filename,
'EX: _approve_follower_handle unable to append ' +
approved_filename)
return
try:
with open(approved_filename, 'w+',
encoding='utf-8') as fp_approved:
fp_approved.write(approve_handle + '\n')
except OSError:
print('EX: _approve_follower_handle unable to write ' +
save_string(approve_handle + '\n', approved_filename,
'EX: _approve_follower_handle unable to write ' +
approved_filename)
@ -194,12 +186,12 @@ def manual_approve_follow_request(session, session_onion, session_i2p,
# is the handle in the requests file?
approve_follows_str: str = ''
try:
with open(approve_follows_filename, 'r', encoding='utf-8') as fp_foll:
approve_follows_str = fp_foll.read()
except OSError:
print('EX: manual_approve_follow_request unable to read ' +
approve_follows_str2: str = \
load_string(approve_follows_filename,
'EX: manual_approve_follow_request unable to read ' +
approve_follows_filename)
if approve_follows_str2:
approve_follows_str = approve_follows_str2
exists: bool = False
approve_handle_full = approve_handle
if approve_handle in approve_follows_str:
@ -234,20 +226,22 @@ def manual_approve_follow_request(session, session_onion, session_i2p,
'" ' + approve_follows_filename)
return
try:
with open(approve_follows_filename + '.new', 'w+',
encoding='utf-8') as fp_approve_new:
approve_follows_text: str = ''
update_approved_followers: bool = False
follow_activity_filename = None
with open(approve_follows_filename, 'r',
encoding='utf-8') as fp_approve:
for handle_of_follow_requester in fp_approve:
approve_follows_list: list[str] = \
load_list(approve_follows_filename,
'EX: manual_approve_follow_request ' +
'unable to write ' + approve_follows_filename +
'.new [ex]')
if approve_follows_list is not None:
for handle_of_follow_requester in approve_follows_list:
# is this the approved follow?
appr_handl = approve_handle_full
if not handle_of_follow_requester.startswith(appr_handl):
# this isn't the approved follow so it will remain
# in the requests file
fp_approve_new.write(handle_of_follow_requester)
approve_follows_text += handle_of_follow_requester
continue
handle_of_follow_requester = \
@ -256,12 +250,10 @@ def manual_approve_follow_request(session, session_onion, session_i2p,
handle_of_follow_requester.replace('\r', '')
port2 = port
if ':' in handle_of_follow_requester:
port2 = \
get_port_from_domain(handle_of_follow_requester)
port2 = get_port_from_domain(handle_of_follow_requester)
requests_dir = account_dir + '/requests'
follow_activity_filename = \
requests_dir + '/' + \
handle_of_follow_requester + '.follow'
requests_dir + '/' + handle_of_follow_requester + '.follow'
if not os.path.isfile(follow_activity_filename):
update_approved_followers = True
continue
@ -343,9 +335,9 @@ def manual_approve_follow_request(session, session_onion, session_i2p,
system_language,
mitm_servers)
update_approved_followers = True
except OSError as exc:
print('EX: manual_approve_follow_request unable to write ' +
approve_follows_filename + '.new ' + str(exc))
save_string(approve_follows_text, approve_follows_filename + '.new',
'EX: manual_approve_follow_request unable to write ' +
approve_follows_filename + '.new [ex]')
followers_filename = account_dir + '/followers.txt'
if update_approved_followers:
@ -370,12 +362,9 @@ def manual_approve_follow_request(session, session_onion, session_i2p,
else:
print('Manual follow accept: first follower accepted for ' +
handle + ' is ' + approve_handle_full)
try:
with open(followers_filename, 'w+',
encoding='utf-8') as fp_followers:
fp_followers.write(approve_handle_full + '\n')
except OSError:
print('EX: manual_approve_follow_request unable to write ' +
save_string(approve_handle_full + '\n',
followers_filename,
'EX: manual_approve_follow_request unable to write ' +
followers_filename)
# only update the follow requests file if the follow is confirmed to be