Replace file operations with functions

main
bashrc 2026-04-26 18:47:27 +01:00
parent c5b9626bd8
commit 1e80710fb0
7 changed files with 87 additions and 141 deletions

View File

@ -12,6 +12,7 @@ import hashlib
from hashlib import sha256 from hashlib import sha256
from utils import acct_dir from utils import acct_dir
from utils import get_user_paths from utils import get_user_paths
from data import load_string
def remove_followers_sync(followers_sync_cache: {}, def remove_followers_sync(followers_sync_cache: {},
@ -37,13 +38,12 @@ def _get_followers_for_domain(base_dir: str,
if not os.path.isfile(followers_filename): if not os.path.isfile(followers_filename):
return [] return []
lines: list[str] = [] lines: list[str] = []
foll_text = '' foll_text: str = \
try: load_string(followers_filename,
with open(followers_filename, 'r', encoding='utf-8') as fp_foll: 'EX: get_followers_for_domain unable to read followers ' +
foll_text = fp_foll.read()
except OSError:
print('EX: get_followers_for_domain unable to read followers ' +
followers_filename) followers_filename)
if foll_text is None:
foll_text = ''
if search_domain not in foll_text: if search_domain not in foll_text:
return [] return []
lines = foll_text.splitlines() lines = foll_text.splitlines()

View File

@ -8,6 +8,8 @@ __status__ = "Production"
__module_group__ = "Calendar" __module_group__ = "Calendar"
import os import os
from data import load_string
from data import save_string
def _data_dir2(base_dir) -> str: def _data_dir2(base_dir) -> str:
@ -22,12 +24,9 @@ def _text_in_file2(text: str, filename: str,
""" """
if not case_sensitive: if not case_sensitive:
text = text.lower() text = text.lower()
content = '' content: str = \
try: load_string(filename,
with open(filename, 'r', encoding='utf-8') as fp_file: 'EX: unable to find text in missing file 2 ' + filename)
content = fp_file.read()
except OSError:
print('EX: unable to find text in missing file 2 ' + filename)
if content: if content:
if not case_sensitive: if not case_sensitive:
content = content.lower() content = content.lower()
@ -76,20 +75,12 @@ def receiving_calendar_events(base_dir: str, nickname: str, domain: str,
if not os.path.isfile(following_filename): if not os.path.isfile(following_filename):
return False return False
# create a new calendar file from the following file # create a new calendar file from the following file
following_handles = None following_handles = \
try: load_string(following_filename,
with open(following_filename, 'r', 'EX: receiving_calendar_events ' + following_filename)
encoding='utf-8') as fp_following:
following_handles = fp_following.read()
except OSError:
print('EX: receiving_calendar_events ' + following_filename)
if following_handles: if following_handles:
try: save_string(following_handles, calendar_filename,
with open(calendar_filename, 'w+', 'EX: receiving_calendar_events 2 ' + calendar_filename)
encoding='utf-8') as fp_cal:
fp_cal.write(following_handles)
except OSError:
print('EX: receiving_calendar_events 2 ' + calendar_filename)
return _text_in_file2(handle + '\n', calendar_filename, False) return _text_in_file2(handle + '\n', calendar_filename, False)
@ -120,32 +111,24 @@ def _receive_calendar_events(base_dir: str, nickname: str, domain: str,
# get the contents of the calendar file, which is # get the contents of the calendar file, which is
# a set of handles # a set of handles
following_handles = '' following_handles: str = ''
if os.path.isfile(calendar_filename): if os.path.isfile(calendar_filename):
print('Calendar file exists') print('Calendar file exists')
try: following_handles = \
with open(calendar_filename, 'r', load_string(calendar_filename,
encoding='utf-8') as fp_calendar: 'EX: _receive_calendar_events ' + calendar_filename)
following_handles = fp_calendar.read() if following_handles is None:
except OSError: following_handles = ''
print('EX: _receive_calendar_events ' + calendar_filename)
else: else:
# create a new calendar file from the following file # create a new calendar file from the following file
print('Creating calendar file ' + calendar_filename) print('Creating calendar file ' + calendar_filename)
following_handles = '' following_handles = \
try: load_string(following_filename,
with open(following_filename, 'r', 'EX: _receive_calendar_events 2 ' + calendar_filename)
encoding='utf-8') as fp_following:
following_handles = fp_following.read()
except OSError:
print('EX: _receive_calendar_events 2 ' + calendar_filename)
if add: if add:
try: save_string(following_handles + handle + '\n',
with open(calendar_filename, 'w+', calendar_filename,
encoding='utf-8') as fp_cal: 'EX: _receive_calendar_events unable to write ' +
fp_cal.write(following_handles + handle + '\n')
except OSError:
print('EX: _receive_calendar_events unable to write ' +
calendar_filename) calendar_filename)
# already in the calendar file? # already in the calendar file?
@ -164,24 +147,16 @@ def _receive_calendar_events(base_dir: str, nickname: str, domain: str,
new_following_handles += followed + '\n' new_following_handles += followed + '\n'
following_handles = new_following_handles following_handles = new_following_handles
# save the result # save the result
try: save_string(following_handles, calendar_filename,
with open(calendar_filename, 'w+', 'EX: _receive_calendar_events 3 ' + calendar_filename)
encoding='utf-8') as fp_cal:
fp_cal.write(following_handles)
except OSError:
print('EX: _receive_calendar_events 3 ' + calendar_filename)
else: else:
print(handle + ' not in followingCalendar.txt') print(handle + ' not in followingCalendar.txt')
# not already in the calendar file # not already in the calendar file
if add: if add:
# append to the list of handles # append to the list of handles
following_handles += handle + '\n' following_handles += handle + '\n'
try: save_string(following_handles, calendar_filename,
with open(calendar_filename, 'w+', 'EX: _receive_calendar_events 4 ' + calendar_filename)
encoding='utf-8') as fp_cal:
fp_cal.write(following_handles)
except OSError:
print('EX: _receive_calendar_events 4 ' + calendar_filename)
def add_person_to_calendar(base_dir: str, nickname: str, domain: str, def add_person_to_calendar(base_dir: str, nickname: str, domain: str,

View File

@ -17,6 +17,7 @@ from utils import get_url_from_post
from utils import get_gemini_blog_title from utils import get_gemini_blog_title
from utils import get_gemini_blog_published from utils import get_gemini_blog_published
from utils import get_gemini_blog_filename from utils import get_gemini_blog_filename
from data import save_string
def blog_to_gemini(base_dir: str, nickname: str, domain: str, def blog_to_gemini(base_dir: str, nickname: str, domain: str,
@ -120,14 +121,11 @@ def blog_to_gemini(base_dir: str, nickname: str, domain: str,
for link_str in links: for link_str in links:
content_text += '=> ' + link_str + '\n' content_text += '=> ' + link_str + '\n'
try: text = title_text + '\n\n' + \
with open(gemini_blog_filename, 'w+', published.replace('-', '/') + '\n\n' + content_text
encoding='utf-8') as fp_gemini: if not save_string(text, gemini_blog_filename,
fp_gemini.write(title_text + '\n\n' + 'EX: blog_to_gemini unable to write ' +
published.replace('-', '/') + '\n\n' + gemini_blog_filename):
content_text)
except OSError:
print('EX: blog_to_gemini unable to write ' + gemini_blog_filename)
return False return False
return True return True

View File

@ -36,6 +36,9 @@ from context import get_individual_post_context
from session import get_method from session import get_method
from auth import create_basic_auth_header from auth import create_basic_auth_header
from conversation import post_id_to_convthread_id from conversation import post_id_to_convthread_id
from data import load_string
from data import save_string
from data import append_string
def _strings_are_digits(strings_list: []) -> bool: def _strings_are_digits(strings_list: []) -> bool:
@ -86,18 +89,15 @@ def _remove_event_from_timeline(event_id: str,
""" """
if not text_in_file(event_id + '\n', tl_events_filename): if not text_in_file(event_id + '\n', tl_events_filename):
return return
events_timeline = '' events_timeline = \
with open(tl_events_filename, 'r', load_string(tl_events_filename,
encoding='utf-8') as fp_tl: 'EX: _remove_event_from_timeline ' + tl_events_filename)
events_timeline = fp_tl.read().replace(event_id + '\n', '')
if events_timeline: if events_timeline:
try: events_timeline = events_timeline.replace(event_id + '\n', '')
with open(tl_events_filename, 'w+', if events_timeline:
encoding='utf-8') as fp2: save_string(events_timeline, tl_events_filename,
fp2.write(events_timeline) 'EX: ERROR: unable to save events timeline')
except OSError:
print('EX: ERROR: unable to save events timeline')
elif os.path.isfile(tl_events_filename): elif os.path.isfile(tl_events_filename):
try: try:
os.remove(tl_events_filename) os.remove(tl_events_filename)
@ -170,12 +170,8 @@ def save_event_post(base_dir: str, handle: str, post_id: str,
tl_events_filename + ' ' + str(ex)) tl_events_filename + ' ' + str(ex))
return False return False
else: else:
try: save_string(event_id + '\n', tl_events_filename,
with open(tl_events_filename, 'w+', 'EX: save_event_post unable to write ' +
encoding='utf-8') as fp_tl_events:
fp_tl_events.write(event_id + '\n')
except OSError:
print('EX: save_event_post unable to write ' +
tl_events_filename) tl_events_filename)
# create a directory for the calendar year # create a directory for the calendar year
@ -193,11 +189,8 @@ def save_event_post(base_dir: str, handle: str, post_id: str,
return False return False
# append the post Id to the file for the calendar month # append the post Id to the file for the calendar month
try: append_string(post_id + '\n', calendar_filename,
with open(calendar_filename, 'a+', encoding='utf-8') as fp_calendar: 'EX: unable to append to calendar ' + calendar_filename)
fp_calendar.write(post_id + '\n')
except OSError:
print('EX: unable to append to calendar ' + calendar_filename)
# create a file which will trigger a notification that # create a file which will trigger a notification that
# a new event has been added # a new event has been added
@ -205,11 +198,9 @@ def save_event_post(base_dir: str, handle: str, post_id: str,
notify_str = \ notify_str = \
'/calendar?year=' + str(event_year) + '?month=' + \ '/calendar?year=' + str(event_year) + '?month=' + \
str(event_month_number) + '?day=' + str(event_day_of_month) str(event_month_number) + '?day=' + str(event_day_of_month)
try: if not save_string(notify_str, cal_notify_filename,
with open(cal_notify_filename, 'w+', encoding='utf-8') as fp_cal: 'EX: save_event_post unable to write ' +
fp_cal.write(notify_str) cal_notify_filename):
except OSError:
print('EX: save_event_post unable to write ' + cal_notify_filename)
return False return False
return True return True
@ -870,12 +861,9 @@ def remove_calendar_event(base_dir: str, nickname: str, domain: str,
message_id = message_id.replace('#', '/') message_id = message_id.replace('#', '/')
if not text_in_file(message_id, calendar_filename): if not text_in_file(message_id, calendar_filename):
return return
lines_str = '' lines_str: str = \
try: load_string(calendar_filename,
with open(calendar_filename, 'r', encoding='utf-8') as fp_cal: 'EX: remove_calendar_event unable to read calendar file ' +
lines_str = fp_cal.read()
except OSError:
print('EX: remove_calendar_event unable to read calendar file ' +
calendar_filename) calendar_filename)
if not lines_str: if not lines_str:
return return

View File

@ -17,6 +17,8 @@ from utils import save_json
from utils import remove_id_ending from utils import remove_id_ending
from utils import has_object_dict from utils import has_object_dict
from utils import get_attributed_to from utils import get_attributed_to
from data import load_string
from data import save_string
def login_headers(self, file_format: str, length: int, def login_headers(self, file_format: str, length: int,
@ -224,21 +226,13 @@ def set_headers_etag(self, media_filename: str, file_format: str,
permissive) permissive)
etag = None etag = None
if os.path.isfile(media_filename + '.etag'): if os.path.isfile(media_filename + '.etag'):
try: etag = load_string(media_filename + '.etag',
with open(media_filename + '.etag', 'r', 'EX: _set_headers_etag ' +
encoding='utf-8') as fp_media:
etag = fp_media.read()
except OSError:
print('EX: _set_headers_etag ' +
'unable to read ' + media_filename + '.etag') 'unable to read ' + media_filename + '.etag')
if not etag: if not etag:
etag = md5(data).hexdigest() # nosec etag = md5(data).hexdigest() # nosec
try: save_string(etag, media_filename,
with open(media_filename + '.etag', 'w+', 'EX: _set_headers_etag ' +
encoding='utf-8') as fp_media:
fp_media.write(etag)
except OSError:
print('EX: _set_headers_etag ' +
'unable to write ' + media_filename + '.etag') 'unable to write ' + media_filename + '.etag')
# if etag: # if etag:
# self.send_header('ETag', '"' + etag + '"') # self.send_header('ETag', '"' + etag + '"')

View File

@ -22,6 +22,8 @@ from session import create_session
from session import set_session_for_sender from session import set_session_for_sender
from threads import begin_thread from threads import begin_thread
from person import set_person_notes from person import set_person_notes
from data import load_string
from data import save_string
def _establish_import_session(httpd, def _establish_import_session(httpd,
@ -47,12 +49,10 @@ def _update_import_following(base_dir: str,
import_filename: str) -> bool: import_filename: str) -> bool:
"""Send out follow requests from the import csv file """Send out follow requests from the import csv file
""" """
following_str = '' following_str = \
try: load_string(import_filename,
with open(import_filename, 'r', encoding='utf-8') as fp_import: 'Ex: failed to load import file ' + import_filename)
following_str = fp_import.read() if following_str is None:
except OSError:
print('Ex: failed to load import file ' + import_filename)
return False return False
if following_str: if following_str:
main_session = None main_session = None
@ -97,12 +97,8 @@ def _update_import_following(base_dir: str,
following_handle_full): following_handle_full):
# remove the followed handle from the import list # remove the followed handle from the import list
following_str = following_str.replace(orig_line + '\n', '') following_str = following_str.replace(orig_line + '\n', '')
try: save_string(following_str, import_filename,
with open(import_filename, 'w+', 'EX: unable to remove import 1 ' + line +
encoding='utf-8') as fp_import:
fp_import.write(following_str)
except OSError:
print('EX: unable to remove import 1 ' + line +
' from ' + import_filename) ' from ' + import_filename)
continue continue
@ -185,12 +181,8 @@ def _update_import_following(base_dir: str,
# remove the followed handle from the import list # remove the followed handle from the import list
following_str = following_str.replace(orig_line + '\n', '') following_str = following_str.replace(orig_line + '\n', '')
try: save_string(following_str, import_filename,
with open(import_filename, 'w+', 'EX: unable to remove import 2 ' + line +
encoding='utf-8') as fp_import:
fp_import.write(following_str)
except OSError:
print('EX: unable to remove import 2 ' + line +
' from ' + import_filename) ' from ' + import_filename)
print('FOLLOW: import sent follow to ' + line + print('FOLLOW: import sent follow to ' + line +
' from ' + import_filename) ' from ' + import_filename)

View File

@ -86,6 +86,7 @@ from announce import is_self_announce
from speaker import update_speaker from speaker import update_speaker
from webapp_post import individual_post_as_html from webapp_post import individual_post_as_html
from webapp_hashtagswarm import store_hash_tags from webapp_hashtagswarm import store_hash_tags
from data import save_string
def inbox_update_index(boxname: str, base_dir: str, handle: str, def inbox_update_index(boxname: str, base_dir: str, handle: str,
@ -119,12 +120,10 @@ def inbox_update_index(boxname: str, base_dir: str, handle: str,
except OSError as ex: except OSError as ex:
print('EX: Failed to write entry to index ' + str(ex)) print('EX: Failed to write entry to index ' + str(ex))
else: else:
try: if save_string(destination_filename + '\n',
with open(index_filename, 'w+', encoding='utf-8') as fp_index: index_filename,
fp_index.write(destination_filename + '\n') 'EX: Failed to write initial entry to index [ex]'):
written = True written = True
except OSError as ex:
print('EX: Failed to write initial entry to index ' + str(ex))
return written return written