From c855c86e4e3445f729e1f8991c5df6291984df19 Mon Sep 17 00:00:00 2001 From: bashrc Date: Mon, 27 Apr 2026 15:46:03 +0100 Subject: [PATCH] Replace binary file operations with functions --- cache.py | 20 ++---- content.py | 8 +-- daemon_get.py | 46 +++++------- daemon_get_css.py | 10 ++- daemon_get_exports.py | 10 ++- daemon_get_favicon.py | 19 ++--- daemon_get_images.py | 159 ++++++++++++++++-------------------------- daemon_head.py | 12 ++-- daemon_post_image.py | 9 ++- data.py | 28 ++++++++ media.py | 11 ++- newswire.py | 8 +-- session.py | 17 ++--- webapp_utils.py | 6 +- 14 files changed, 158 insertions(+), 205 deletions(-) diff --git a/cache.py b/cache.py index 88d03f6e1..04a70fa49 100644 --- a/cache.py +++ b/cache.py @@ -31,6 +31,8 @@ from formats import get_image_extensions from timeFunctions import date_from_string_format from timeFunctions import date_utcnow from content import remove_script +from data import save_binary +from data import load_binary def remove_person_from_cache(base_dir: str, person_url: str, @@ -334,12 +336,8 @@ def cache_svg_images(session, base_dir: str, http_prefix: str, continue else: image_filename = test_image_filename - image_data = None - try: - with open(image_filename, 'rb') as fp_svg: - image_data = fp_svg.read() - except OSError: - print('EX: unable to read svg file data') + image_data = load_binary(image_filename, + 'EX: unable to read svg file data') if not image_data: continue image_data = image_data.decode() @@ -347,15 +345,9 @@ def cache_svg_images(session, base_dir: str, http_prefix: str, remove_script(image_data, log_filename, actor, url) if cleaned_up != image_data: # write the cleaned up svg image - svg_written = False cleaned_up = cleaned_up.encode('utf-8') - try: - with open(image_filename, 'wb') as fp_im: - fp_im.write(cleaned_up) - svg_written = True - except OSError: - print('EX: unable to write cleaned up svg ' + url) - if svg_written: + if save_binary(cleaned_up, image_filename, + 'EX: unable to write cleaned up svg ' + url): # convert to list if needed if isinstance(obj['attachment'], dict): obj['attachment'] = [obj['attachment']] diff --git a/content.py b/content.py index 0a251de0a..955fc50c3 100644 --- a/content.py +++ b/content.py @@ -53,6 +53,7 @@ from petnames import get_pet_name from session import download_image from data import load_string from data import save_string +from data import save_binary from data import append_string MUSIC_SITES = ('soundcloud.com', 'bandcamp.com', 'resonate.coop') @@ -1816,11 +1817,8 @@ def save_media_in_form_post(media_bytes, debug: bool, 'image binary not recognized ' + filename) return None, None - try: - with open(filename, 'wb') as fp_media: - fp_media.write(media_bytes[start_pos:]) - except OSError: - print('EX: save_media_in_form_post unable to write media') + save_binary(media_bytes[start_pos:], filename, + 'EX: save_media_in_form_post unable to write media') if not os.path.isfile(filename): if debug: diff --git a/daemon_get.py b/daemon_get.py index db2647c5f..7fe55c2e0 100644 --- a/daemon_get.py +++ b/daemon_get.py @@ -221,6 +221,7 @@ from daemon_get_login import redirect_to_login_screen from daemon_get_login import show_login_screen from poison import html_poisoned from data import load_string +from data import load_binary # Blogs can be longer, so don't show many per page MAX_POSTS_IN_BLOGS_FEED = 4 @@ -2642,15 +2643,12 @@ def daemon_http_get(self) -> None: tries = 0 media_binary = None while tries < 5: - try: - with open(media_filename, 'rb') as fp_av: - media_binary = fp_av.read() - break - except OSError as ex: - print('EX: manifest logo ' + - str(tries) + ' ' + str(ex)) - time.sleep(1) - tries += 1 + exc_str = 'EX: manifest logo ' + str(tries) + ' [ex]' + media_binary = load_binary(media_filename, exc_str) + if media_binary is not None: + break + time.sleep(1) + tries += 1 if media_binary: mime_type = media_file_mime_type(media_filename) set_headers_etag(self, media_filename, mime_type, @@ -2683,15 +2681,12 @@ def daemon_http_get(self) -> None: tries = 0 media_binary = None while tries < 5: - try: - with open(screen_filename, 'rb') as fp_av: - media_binary = fp_av.read() - break - except OSError as ex: - print('EX: manifest screenshot ' + - str(tries) + ' ' + str(ex)) - time.sleep(1) - tries += 1 + exc_str = 'EX: manifest screenshot ' + str(tries) + ' [ex]' + media_binary = load_binary(screen_filename, exc_str) + if media_binary is not None: + break + time.sleep(1) + tries += 1 if media_binary: mime_type = media_file_mime_type(screen_filename) set_headers_etag(self, screen_filename, mime_type, @@ -2724,15 +2719,12 @@ def daemon_http_get(self) -> None: tries = 0 media_binary = None while tries < 5: - try: - with open(icon_filename, 'rb') as fp_av: - media_binary = fp_av.read() - break - except OSError as ex: - print('EX: login screen image ' + - str(tries) + ' ' + str(ex)) - time.sleep(1) - tries += 1 + exc_str = 'EX: login screen image ' + str(tries) + ' [ex]' + media_binary = load_binary(icon_filename, exc_str) + if media_binary is not None: + break + time.sleep(1) + tries += 1 if media_binary: mime_type_str = media_file_mime_type(icon_filename) set_headers_etag(self, icon_filename, diff --git a/daemon_get_css.py b/daemon_get_css.py index cad890c94..4e01c5f60 100644 --- a/daemon_get_css.py +++ b/daemon_get_css.py @@ -18,6 +18,7 @@ from utils import string_ends_with from utils import get_css from fitnessFunctions import fitness_performance from daemon_utils import etag_exists +from data import load_binary def get_style_sheet(self, base_dir: str, calling_domain: str, path: str, @@ -101,12 +102,9 @@ def get_fonts(self, calling_domain: str, path: str, debug) return if os.path.isfile(font_filename): - font_binary = None - try: - with open(font_filename, 'rb') as fp_font: - font_binary = fp_font.read() - except OSError: - print('EX: unable to load font ' + font_filename) + font_binary = load_binary(font_filename, + 'EX: unable to load font ' + + font_filename) if font_binary: set_headers_etag(self, font_filename, font_type, diff --git a/daemon_get_exports.py b/daemon_get_exports.py index cf9853e7a..8a525e568 100644 --- a/daemon_get_exports.py +++ b/daemon_get_exports.py @@ -14,6 +14,7 @@ from httpheaders import set_headers from httpheaders import set_headers_etag from utils import get_nickname_from_actor from blocking import export_blocking_file +from data import load_binary def get_exported_blocks(self, path: str, base_dir: str, @@ -43,12 +44,9 @@ def get_exported_theme(self, path: str, base_dir: str, filename = path.split('/exports/', 1)[1] filename = base_dir + '/exports/' + filename if os.path.isfile(filename): - export_binary = None - try: - with open(filename, 'rb') as fp_exp: - export_binary = fp_exp.read() - except OSError: - print('EX: unable to read theme export ' + filename) + export_binary = load_binary(filename, + 'EX: unable to read theme export ' + + filename) if export_binary: export_type = 'application/zip' set_headers_etag(self, filename, export_type, diff --git a/daemon_get_favicon.py b/daemon_get_favicon.py index 80873b6a7..802d09bab 100644 --- a/daemon_get_favicon.py +++ b/daemon_get_favicon.py @@ -19,6 +19,7 @@ from daemon_utils import etag_exists from utils import get_config_param from utils import binary_is_image from formats import media_file_mime_type +from data import load_binary def get_favicon(self, calling_domain: str, @@ -81,12 +82,9 @@ def get_favicon(self, calling_domain: str, print('Sent favicon from cache: ' + calling_domain) return if os.path.isfile(favicon_filename): - fav_binary = None - try: - with open(favicon_filename, 'rb') as fp_fav: - fav_binary = fp_fav.read() - except OSError: - print('EX: unable to read favicon ' + favicon_filename) + fav_binary = load_binary(favicon_filename, + 'EX: unable to read favicon ' + + favicon_filename) if fav_binary: set_headers_etag(self, favicon_filename, fav_type, @@ -131,12 +129,9 @@ def show_cached_favicon(self, referer_domain: str, path: str, # The file has not changed http_304(self) return - media_binary = None - try: - with open(fav_filename, 'rb') as fp_av: - media_binary = fp_av.read() - except OSError: - print('EX: unable to read cached favicon ' + fav_filename) + media_binary = load_binary(fav_filename, + 'EX: unable to read cached favicon ' + + fav_filename) if media_binary: if binary_is_image(fav_filename, media_binary): mime_type = media_file_mime_type(fav_filename) diff --git a/daemon_get_images.py b/daemon_get_images.py index a6d3ea607..7ff015957 100644 --- a/daemon_get_images.py +++ b/daemon_get_images.py @@ -30,6 +30,7 @@ from fitnessFunctions import fitness_performance from person import save_person_qrcode from lxmf import save_lxmf_qrcode from data import load_string +from data import load_binary def show_avatar_or_banner(self, referer_domain: str, path: str, @@ -109,12 +110,9 @@ def show_avatar_or_banner(self, referer_domain: str, path: str, last_modified_time.strftime('%a, %d %b %Y %H:%M:%S GMT') media_image_type = get_image_mime_type(avatar_file) - media_binary = None - try: - with open(avatar_filename, 'rb') as fp_av: - media_binary = fp_av.read() - except OSError: - print('EX: unable to read avatar ' + avatar_filename) + media_binary = load_binary(avatar_filename, + 'EX: unable to read avatar ' + + avatar_filename) if media_binary: set_headers_etag(self, avatar_filename, media_image_type, media_binary, None, @@ -138,12 +136,9 @@ def show_cached_avatar(self, referer_domain: str, path: str, # The file has not changed http_304(self) return - media_binary = None - try: - with open(media_filename, 'rb') as fp_av: - media_binary = fp_av.read() - except OSError: - print('EX: unable to read cached avatar ' + media_filename) + media_binary = load_binary(media_filename, + 'EX: unable to read cached avatar ' + + media_filename) if media_binary: mime_type = media_file_mime_type(media_filename) set_headers_etag(self, media_filename, @@ -188,12 +183,9 @@ def show_help_screen_image(self, path: str, http_304(self) return if os.path.isfile(media_filename): - media_binary = None - try: - with open(media_filename, 'rb') as fp_av: - media_binary = fp_av.read() - except OSError: - print('EX: unable to read help image ' + media_filename) + media_binary = load_binary(media_filename, + 'EX: unable to read help image ' + + media_filename) if media_binary: mime_type = media_file_mime_type(media_filename) set_headers_etag(self, media_filename, @@ -239,13 +231,9 @@ def show_manual_image(self, path: str, debug) return if os.path.isfile(media_filename): - media_binary = None - try: - with open(media_filename, 'rb') as fp_av: - media_binary = fp_av.read() - except OSError: - print('EX: unable to read manual image ' + - media_filename) + media_binary = load_binary(media_filename, + 'EX: unable to read manual image ' + + media_filename) if media_binary: mime_type = media_file_mime_type(media_filename) set_headers_etag(self, media_filename, @@ -292,13 +280,9 @@ def show_specification_image(self, path: str, debug) return if os.path.isfile(media_filename): - media_binary = None - try: - with open(media_filename, 'rb') as fp_av: - media_binary = fp_av.read() - except OSError: - print('EX: unable to read specification image ' + - media_filename) + media_binary = load_binary(media_filename, + 'EX: unable to read specification image ' + + media_filename) if media_binary: mime_type = media_file_mime_type(media_filename) set_headers_etag(self, media_filename, @@ -337,12 +321,9 @@ def show_share_image(self, path: str, return True media_file_type = get_image_mime_type(media_filename) - media_binary = None - try: - with open(media_filename, 'rb') as fp_av: - media_binary = fp_av.read() - except OSError: - print('EX: unable to read binary ' + media_filename) + media_binary = load_binary(media_filename, + 'EX: unable to read binary ' + + media_filename) if media_binary: set_headers_etag(self, media_filename, media_file_type, @@ -395,12 +376,9 @@ def show_icon(self, path: str, '_GET', 'show_icon', debug) return if os.path.isfile(media_filename): - media_binary = None - try: - with open(media_filename, 'rb') as fp_av: - media_binary = fp_av.read() - except OSError: - print('EX: unable to read icon image ' + media_filename) + media_binary = load_binary(media_filename, + 'EX: unable to read icon image ' + + media_filename) if media_binary: mime_type = media_file_mime_type(media_filename) set_headers_etag(self, media_filename, @@ -463,12 +441,9 @@ def show_media(self, path: str, base_dir: str, http_404(self, 32) return - media_binary = None - try: - with open(media_filename, 'rb') as fp_av: - media_binary = fp_av.read() - except OSError: - print('EX: unable to read media binary ' + media_filename) + media_binary = load_binary(media_filename, + 'EX: unable to read media binary ' + + media_filename) if media_binary: set_headers_etag(self, media_filename, media_file_type, media_binary, None, @@ -522,14 +497,12 @@ def show_qrcode(self, calling_domain: str, path: str, tries = 0 media_binary = None while tries < 5: - try: - with open(qr_filename, 'rb') as fp_av: - media_binary = fp_av.read() - break - except OSError as ex: - print('EX: _show_qrcode ' + str(tries) + ' ' + str(ex)) - time.sleep(1) - tries += 1 + exc_str = 'EX: _show_qrcode ' + str(tries) + ' [ex]' + media_binary = load_binary(qr_filename, exc_str) + if media_binary is not None: + break + time.sleep(1) + tries += 1 if media_binary: mime_type = media_file_mime_type(qr_filename) set_headers_etag(self, qr_filename, mime_type, @@ -571,15 +544,12 @@ def search_screen_banner(self, path: str, tries = 0 media_binary = None while tries < 5: - try: - with open(banner_filename, 'rb') as fp_av: - media_binary = fp_av.read() - break - except OSError as ex: - print('EX: _search_screen_banner ' + - str(tries) + ' ' + str(ex)) - time.sleep(1) - tries += 1 + exc_str = 'EX: _search_screen_banner ' + str(tries) + ' [ex]' + media_binary = load_binary(banner_filename, exc_str) + if media_binary is not None: + break + time.sleep(1) + tries += 1 if media_binary: mime_type = media_file_mime_type(banner_filename) set_headers_etag(self, banner_filename, mime_type, @@ -616,14 +586,12 @@ def column_image(self, side: str, path: str, base_dir: str, domain: str, tries = 0 media_binary = None while tries < 5: - try: - with open(banner_filename, 'rb') as fp_av: - media_binary = fp_av.read() - break - except OSError as ex: - print('EX: _column_image ' + str(tries) + ' ' + str(ex)) - time.sleep(1) - tries += 1 + exc_str = 'EX: _column_image ' + str(tries) + ' [ex]' + media_binary = load_binary(banner_filename, exc_str) + if media_binary is not None: + break + time.sleep(1) + tries += 1 if media_binary: mime_type = media_file_mime_type(banner_filename) set_headers_etag(self, banner_filename, mime_type, @@ -659,15 +627,13 @@ def show_default_profile_background(self, base_dir: str, theme_name: str, tries = 0 bg_binary = None while tries < 5: - try: - with open(bg_filename, 'rb') as fp_av: - bg_binary = fp_av.read() - break - except OSError as ex: - print('EX: _show_default_profile_background ' + - str(tries) + ' ' + str(ex)) - time.sleep(1) - tries += 1 + exc_str = 'EX: _show_default_profile_background ' + \ + str(tries) + ' [ex]' + bg_binary = load_binary(bg_filename, exc_str) + if bg_binary is not None: + break + time.sleep(1) + tries += 1 if bg_binary: if ext == 'jpg': ext = 'jpeg' @@ -711,15 +677,13 @@ def show_background_image(self, path: str, tries = 0 bg_binary = None while tries < 5: - try: - with open(bg_filename, 'rb') as fp_av: - bg_binary = fp_av.read() - break - except OSError as ex: - print('EX: _show_background_image ' + - str(tries) + ' ' + str(ex)) - time.sleep(1) - tries += 1 + exc_str = 'EX: _show_background_image ' + \ + str(tries) + ' [ex]' + bg_binary = load_binary(bg_filename, exc_str) + if bg_binary is not None: + break + time.sleep(1) + tries += 1 if bg_binary: if ext == 'jpg': ext = 'jpeg' @@ -756,12 +720,9 @@ def show_emoji(self, path: str, return media_image_type = get_image_mime_type(emoji_filename) - media_binary = None - try: - with open(emoji_filename, 'rb') as fp_av: - media_binary = fp_av.read() - except OSError: - print('EX: unable to read emoji image ' + emoji_filename) + media_binary = load_binary(emoji_filename, + 'EX: unable to read emoji image ' + + emoji_filename) if media_binary: set_headers_etag(self, emoji_filename, media_image_type, diff --git a/daemon_head.py b/daemon_head.py index 82b92a945..e63fffde7 100644 --- a/daemon_head.py +++ b/daemon_head.py @@ -24,6 +24,7 @@ from media import path_is_audio from daemon_utils import get_user_agent from daemon_utils import log_epicyon_instances from data import load_string +from data import load_binary from data import save_string @@ -126,13 +127,10 @@ def daemon_http_head(self) -> None: if etag_str: etag = etag_str else: - media_binary = None - try: - with open(media_filename, 'rb') as fp_av: - media_binary = fp_av.read() - except OSError: - print('EX: unable to read media binary ' + - media_filename) + media_binary = \ + load_binary(media_filename, + 'EX: unable to read media binary ' + + media_filename) if media_binary: etag = md5(media_binary).hexdigest() # nosec save_string(etag, media_tag_filename, diff --git a/daemon_post_image.py b/daemon_post_image.py index c08d50479..165352344 100644 --- a/daemon_post_image.py +++ b/daemon_post_image.py @@ -14,6 +14,7 @@ from httpcodes import http_404 from utils import acct_dir from utils import binary_is_image from formats import get_image_extension_from_mime_type +from data import save_binary def receive_image_attachment(self, length: int, path: str, base_dir: str, @@ -67,11 +68,9 @@ def receive_image_attachment(self, length: int, path: str, base_dir: str, if not binary_is_image(media_filename, media_bytes): print('WARN: _receive_image image binary is not recognized ' + media_filename) - try: - with open(media_filename, 'wb') as fp_av: - fp_av.write(media_bytes) - except OSError: - print('EX: receive_image_attachment unable to write ' + media_filename) + save_binary(media_bytes, media_filename, + 'EX: receive_image_attachment unable to write ' + + media_filename) if debug: print('DEBUG: image saved to ' + media_filename) self.send_response(201) diff --git a/data.py b/data.py index 2535901ec..78905b4dc 100644 --- a/data.py +++ b/data.py @@ -37,6 +37,20 @@ def load_string(filename: str, exception_text: str) -> str: return None +def load_binary(filename: str, exception_text: str) -> str: + """Loads a binary from file + """ + try: + with open(filename, 'rb') as fp: + binary = fp.read() + return binary + except OSError as exc: + if '[ex]' in exception_text: + exception_text = exception_text.replace('[ex]', str(exc)) + print(exception_text) + return None + + def load_line(filename: str, exception_text: str) -> str: """Loads a line of text from file """ @@ -72,6 +86,20 @@ def save_string(text: str, filename: str, exception_text: str) -> bool: return _store_base(text, filename, exception_text, 'w+') +def save_binary(text: str, filename: str, exception_text: str) -> bool: + """Saves a binary to file + """ + try: + with open(filename, 'wb') as fp: + fp.write(text) + return True + except OSError as exc: + if '[ex]' in exception_text: + exception_text = exception_text.replace('[ex]', str(exc)) + print(exception_text) + return False + + def append_string(text: str, filename: str, exception_text: str) -> bool: """Appends a string to file """ diff --git a/media.py b/media.py index c6830d13c..207e25b67 100644 --- a/media.py +++ b/media.py @@ -31,6 +31,7 @@ from shutil import copyfile from shutil import rmtree from shutil import move from city import spoof_geolocation +from data import load_binary # music file ID3 v1 genres @@ -593,13 +594,9 @@ def _update_etag(media_filename: str) -> None: return # read the binary data - data = None - try: - with open(media_filename, 'rb') as fp_media: - data = fp_media.read() - except OSError: - print('EX: _update_etag unable to read ' + str(media_filename)) - + data = load_binary(media_filename, + 'EX: _update_etag unable to read ' + + media_filename) if not data: return # calculate hash diff --git a/newswire.py b/newswire.py index 0ade44230..87debb028 100644 --- a/newswire.py +++ b/newswire.py @@ -56,6 +56,7 @@ from session import download_image_any_mime_type from content import remove_script from data import load_list from data import load_string +from data import save_binary def _remove_cdata(text: str) -> str: @@ -198,11 +199,8 @@ def _download_newswire_feed_favicon(session, base_dir: str, fav_filename = get_fav_filename_from_url(base_dir, fav_url) if os.path.isfile(fav_filename): return True - try: - with open(fav_filename, 'wb+') as fp_fav: - fp_fav.write(image_data) - except OSError: - print('EX: failed writing favicon ' + fav_filename) + if not save_binary(image_data, fav_filename, + 'EX: failed writing favicon ' + fav_filename): return False return True diff --git a/session.py b/session.py index 38ed6e923..24564a0be 100644 --- a/session.py +++ b/session.py @@ -28,6 +28,8 @@ from mitm import detect_mitm from httpsig import create_signed_header from data import append_string from data import save_string +from data import save_binary +from data import load_binary def create_session(proxy_type: str): @@ -765,14 +767,9 @@ def post_image(session, attach_image_filename: str, federation_list: [], content_type = 'image/svg+xml' headers['Content-type'] = content_type - media_binary = None - try: - with open(attach_image_filename, 'rb') as fp_av: - media_binary = fp_av.read() - except OSError: - print('EX: post_image unable to read binary ' + - attach_image_filename) - + media_binary = load_binary(attach_image_filename, + 'EX: post_image unable to read binary ' + + attach_image_filename) if media_binary: _set_user_agent(session, http_prefix, domain_full) @@ -857,8 +854,8 @@ def download_image(session, url: str, image_filename: str, debug: bool, else: media_binary = result.content if binary_is_image(image_filename, media_binary): - with open(image_filename, 'wb') as fp_im: - fp_im.write(media_binary) + if save_binary(media_binary, image_filename, + 'EX: download image ' + url): if debug: print('Image downloaded from ' + url) return True diff --git a/webapp_utils.py b/webapp_utils.py index 51303c491..f77ae0492 100644 --- a/webapp_utils.py +++ b/webapp_utils.py @@ -61,6 +61,7 @@ from shares import vf_proposal_from_share from webapp_pwa import get_pwa_theme_colors from data import load_list from data import save_string +from data import save_binary from data import load_string @@ -431,8 +432,9 @@ def update_avatar_image_cache(signing_priv_key_pem: str, else: media_binary = result.content if binary_is_image(avatar_image_filename, media_binary): - with open(avatar_image_filename, 'wb') as fp_av: - fp_av.write(media_binary) + if save_binary(media_binary, avatar_image_filename, + 'EX: update_avatar_image_cache ' + + actor): if debug: print('avatar image downloaded for ' + actor) return avatar_image_filename.replace(base_dir +