From 7e9d013dd580cd8b45875e5c807e0f68d18b2961 Mon Sep 17 00:00:00 2001 From: bashrc Date: Wed, 29 Apr 2026 15:30:27 +0100 Subject: [PATCH] Replace file operations with functions --- maps.py | 33 ++++++++++++------------- markdown.py | 21 +++++++--------- mastoapiv1.py | 12 ++++----- mastoapiv2.py | 14 +++++------ media.py | 68 +++++++++++++++++++++++---------------------------- 5 files changed, 67 insertions(+), 81 deletions(-) diff --git a/maps.py b/maps.py index ec20cf26f..1a4878c69 100644 --- a/maps.py +++ b/maps.py @@ -24,6 +24,8 @@ from timeFunctions import date_epoch from timeFunctions import date_from_string_format from timeFunctions import date_utcnow from session import get_resolved_url +from data import load_string +from data import save_string def geocoords_to_osm_link(osm_domain: str, zoom: int, @@ -1030,20 +1032,20 @@ def add_tag_map_links(tag_maps_dir: str, tag_name: str, """Appends to a hashtag file containing map links This is used to show a map for a particular hashtag """ - tag_map_filename = tag_maps_dir + '/' + tag_name + '.txt' + tag_map_filename: str = tag_maps_dir + '/' + tag_name + '.txt' post_url = post_url.replace('#', '/') # read the existing map links existing_map_links: list[str] = [] if os.path.isfile(tag_map_filename): - try: - with open(tag_map_filename, 'r', encoding='utf-8') as fp_tag: - existing_map_links = fp_tag.read().split('\n') - except OSError: - print('EX: error reading tag map ' + tag_map_filename) + existing_map_links_str = \ + load_string(tag_map_filename, + 'EX: error reading tag map ' + tag_map_filename) + if existing_map_links_str: + existing_map_links = existing_map_links_str.split('\n') # combine map links with the existing list - secs_since_epoch = \ + secs_since_epoch: int = \ int((date_from_string_format(published, ['%Y-%m-%dT%H:%M:%S%z']) - date_epoch()).total_seconds()) links_changed: bool = False @@ -1070,11 +1072,8 @@ def add_tag_map_links(tag_maps_dir: str, tag_name: str, break # save the tag - try: - with open(tag_map_filename, 'w+', encoding='utf-8') as fp_tag: - fp_tag.write(map_links_str) - except OSError: - print('EX: error writing tag map ' + tag_map_filename) + save_string(map_links_str, tag_map_filename, + 'EX: error writing tag map ' + tag_map_filename) def _gpx_location(latitude: float, longitude: float, post_id: str) -> str: @@ -1127,11 +1126,11 @@ def _hashtag_map_to_format(base_dir: str, tag_name: str, if os.path.isfile(tag_map_filename): map_links: list[str] = [] - try: - with open(tag_map_filename, 'r', encoding='utf-8') as fp_tag: - map_links = fp_tag.read().split('\n') - except OSError: - print('EX: unable to read tag map links ' + tag_map_filename) + map_links_str = \ + load_string(tag_map_filename, + 'EX: unable to read tag map links ' + tag_map_filename) + if map_links_str: + map_links = map_links_str.split('\n') if map_links: start_secs_since_epoch = int(start_hours_since_epoch * 60 * 60) end_secs_since_epoch = int(end_hours_since_epoch * 60 * 60) diff --git a/markdown.py b/markdown.py index d271d4349..58ced3b05 100644 --- a/markdown.py +++ b/markdown.py @@ -17,6 +17,7 @@ from utils import get_url_from_post from utils import get_markdown_blog_filename from utils import get_micron_blog_filename from utils import get_gemini_blog_published +from data import save_string def _markdown_get_sections(markdown: str) -> []: @@ -573,12 +574,10 @@ def blog_to_markdown(base_dir: str, nickname: str, domain: str, for link_str in links: content_text += link_str + '\n' - try: - with open(markdown_blog_filename, 'w+', - encoding='utf-8') as fp_markdown: - fp_markdown.write(published + '\n\n' + content_text) - except OSError: - print('EX: blog_to_markdown unable to write ' + markdown_blog_filename) + if not save_string(published + '\n\n' + content_text, + markdown_blog_filename, + 'EX: blog_to_markdown unable to write ' + + markdown_blog_filename): return False return True @@ -651,12 +650,10 @@ def blog_to_micron(base_dir: str, nickname: str, domain: str, for link_str in links: content_text += link_str + '\n' - try: - with open(micron_blog_filename, 'w+', - encoding='utf-8') as fp_micron: - fp_micron.write(published + '\n\n' + content_text) - except OSError: - print('EX: blog_to_micron unable to write ' + micron_blog_filename) + if not save_string(published + '\n\n' + content_text, + micron_blog_filename, + 'EX: blog_to_micron unable to write ' + + micron_blog_filename): return False return True diff --git a/mastoapiv1.py b/mastoapiv1.py index f5fef9ae7..29735980d 100644 --- a/mastoapiv1.py +++ b/mastoapiv1.py @@ -22,6 +22,7 @@ from utils import data_dir from utils import account_is_indexable from utils import is_yggdrasil_address from data import load_list +from data import load_string def _meta_data_instance_v1(show_accounts: bool, @@ -272,13 +273,10 @@ def _get_masto_api_v1account(base_dir: str, nickname: str, domain: str, published_filename = \ acct_dir(base_dir, nickname, domain) + '/.last_published' if os.path.isfile(published_filename): - try: - with open(published_filename, 'r', - encoding='utf-8') as fp_pub: - published = fp_pub.read() - except OSError: - print('EX: unable to read last published time 1 ' + - published_filename) + published: str = \ + load_string(published_filename, + 'EX: unable to read last published time 1 ' + + published_filename) masto_account_json = { "id": get_masto_api_v1id_from_nickname(nickname), diff --git a/mastoapiv2.py b/mastoapiv2.py index 8895f59ae..7dc6067e8 100644 --- a/mastoapiv2.py +++ b/mastoapiv2.py @@ -24,6 +24,7 @@ from formats import get_image_extensions from formats import get_audio_extensions from formats import get_video_extensions from data import load_list +from data import load_string def _get_masto_api_v2id_from_nickname(nickname: str) -> int: @@ -118,14 +119,11 @@ def _meta_data_instance_v2(show_accounts: bool, published_filename = \ acct_dir(base_dir, admin_nickname, domain) + '/.last_published' if os.path.isfile(published_filename): - try: - with open(published_filename, 'r', - encoding='utf-8') as fp_pub: - published = fp_pub.read() - except OSError: - print('EX: _meta_data_instance_v2 ' + - 'unable to read last published time 2 ' + - published_filename) + published = \ + load_string(published_filename, + 'EX: _meta_data_instance_v2 ' + + 'unable to read last published time 2 ' + + published_filename) # get all supported mime types supported_mime_types: list[str] = [] diff --git a/media.py b/media.py index 5b53222c7..6a21c2009 100644 --- a/media.py +++ b/media.py @@ -32,6 +32,9 @@ from shutil import rmtree from shutil import move from city import spoof_geolocation from data import load_binary +from data import save_string +from data import load_string +from data import append_string # music file ID3 v1 genres @@ -331,20 +334,18 @@ def _spoof_meta_data(base_dir: str, nickname: str, domain: str, decoy_seed_filename = acct_dir(base_dir, nickname, domain) + '/decoyseed' decoy_seed = 63725 if os.path.isfile(decoy_seed_filename): - try: - with open(decoy_seed_filename, 'r', encoding='utf-8') as fp_seed: - decoy_seed = int(fp_seed.read()) - except OSError: - print('EX: _spoof_meta_data unable to read ' + decoy_seed_filename) + decoy_seed_str = \ + load_string(decoy_seed_filename, + 'EX: _spoof_meta_data unable to read ' + + decoy_seed_filename) + if decoy_seed_str: + decoy_seed = int(decoy_seed_str) else: decoy_seed = randint(10000, 10000000000000000) - try: - with open(decoy_seed_filename, 'w+', - encoding='utf-8') as fp_seed: - fp_seed.write(str(decoy_seed)) - except OSError: - print('EX: _spoof_meta_data unable to write ' + - decoy_seed_filename) + text = str(decoy_seed) + save_string(text, decoy_seed_filename, + 'EX: _spoof_meta_data unable to write ' + + decoy_seed_filename) if os.path.isfile('/usr/bin/exiftool'): print('Spoofing metadata in ' + output_filename + ' using exiftool') @@ -602,13 +603,9 @@ def _update_etag(media_filename: str) -> None: # calculate hash etag = sha1(data).hexdigest() # nosec # save the hash - try: - with open(media_filename + '.etag', 'w+', - encoding='utf-8') as fp_media: - fp_media.write(etag) - except OSError: - print('EX: _update_etag unable to write ' + - str(media_filename) + '.etag') + save_string(etag, media_filename + '.etag', + 'EX: _update_etag unable to write ' + + str(media_filename) + '.etag') def _store_video_transcript(video_transcript: str, @@ -623,12 +620,10 @@ def _store_video_transcript(video_transcript: str, print('WARN: does not look like a video transcript ' + video_transcript) return False - try: - with open(media_filename + '.vtt', 'w+', encoding='utf-8') as fp_vtt: - fp_vtt.write(video_transcript) + if save_string(video_transcript, media_filename + '.vtt', + 'EX: unable to save video transcript ' + + media_filename + '.vtt'): return True - except OSError: - print('EX: unable to save video transcript ' + media_filename + '.vtt') return False @@ -641,25 +636,24 @@ def _log_uploaded_media(base_dir: str, nickname: str, domain: str, media_log = [] write_type = 'w+' if os.path.isfile(account_media_log_filename): - try: - with open(account_media_log_filename, 'r', - encoding='utf-8') as fp_log: - media_log = fp_log.read().split('\n') - write_type = 'a+' - except OSError: - print('EX: unable to read media log for ' + nickname) + media_log_str = \ + load_string(account_media_log_filename, + 'EX: unable to read media log for ' + nickname) + if media_log_str: + media_log = media_log_str.split('\n') + write_type = 'a+' # don't include the base directory, in case the installation is later # moved around if base_dir in media_filename: media_filename = media_filename.split(base_dir)[1] if media_filename in media_log: return - try: - with open(account_media_log_filename, write_type, - encoding='utf-8') as fp_log: - fp_log.write(media_filename + '\n') - except OSError: - print('EX: unable to write media log for ' + nickname) + if write_type == 'w+': + save_string(media_filename + '\n', account_media_log_filename, + 'EX: unable to write media log for ' + nickname) + else: + append_string(media_filename + '\n', account_media_log_filename, + 'EX: unable to append media log for ' + nickname) def attach_media(base_dir: str, http_prefix: str,