Replace binary file operations with functions

main
bashrc 2026-04-27 15:46:03 +01:00
parent a0acf20a50
commit c855c86e4e
14 changed files with 158 additions and 205 deletions

View File

@ -31,6 +31,8 @@ from formats import get_image_extensions
from timeFunctions import date_from_string_format
from timeFunctions import date_utcnow
from content import remove_script
from data import save_binary
from data import load_binary
def remove_person_from_cache(base_dir: str, person_url: str,
@ -334,12 +336,8 @@ def cache_svg_images(session, base_dir: str, http_prefix: str,
continue
else:
image_filename = test_image_filename
image_data = None
try:
with open(image_filename, 'rb') as fp_svg:
image_data = fp_svg.read()
except OSError:
print('EX: unable to read svg file data')
image_data = load_binary(image_filename,
'EX: unable to read svg file data')
if not image_data:
continue
image_data = image_data.decode()
@ -347,15 +345,9 @@ def cache_svg_images(session, base_dir: str, http_prefix: str,
remove_script(image_data, log_filename, actor, url)
if cleaned_up != image_data:
# write the cleaned up svg image
svg_written = False
cleaned_up = cleaned_up.encode('utf-8')
try:
with open(image_filename, 'wb') as fp_im:
fp_im.write(cleaned_up)
svg_written = True
except OSError:
print('EX: unable to write cleaned up svg ' + url)
if svg_written:
if save_binary(cleaned_up, image_filename,
'EX: unable to write cleaned up svg ' + url):
# convert to list if needed
if isinstance(obj['attachment'], dict):
obj['attachment'] = [obj['attachment']]

View File

@ -53,6 +53,7 @@ from petnames import get_pet_name
from session import download_image
from data import load_string
from data import save_string
from data import save_binary
from data import append_string
MUSIC_SITES = ('soundcloud.com', 'bandcamp.com', 'resonate.coop')
@ -1816,11 +1817,8 @@ def save_media_in_form_post(media_bytes, debug: bool,
'image binary not recognized ' + filename)
return None, None
try:
with open(filename, 'wb') as fp_media:
fp_media.write(media_bytes[start_pos:])
except OSError:
print('EX: save_media_in_form_post unable to write media')
save_binary(media_bytes[start_pos:], filename,
'EX: save_media_in_form_post unable to write media')
if not os.path.isfile(filename):
if debug:

View File

@ -221,6 +221,7 @@ from daemon_get_login import redirect_to_login_screen
from daemon_get_login import show_login_screen
from poison import html_poisoned
from data import load_string
from data import load_binary
# Blogs can be longer, so don't show many per page
MAX_POSTS_IN_BLOGS_FEED = 4
@ -2642,15 +2643,12 @@ def daemon_http_get(self) -> None:
tries = 0
media_binary = None
while tries < 5:
try:
with open(media_filename, 'rb') as fp_av:
media_binary = fp_av.read()
break
except OSError as ex:
print('EX: manifest logo ' +
str(tries) + ' ' + str(ex))
time.sleep(1)
tries += 1
exc_str = 'EX: manifest logo ' + str(tries) + ' [ex]'
media_binary = load_binary(media_filename, exc_str)
if media_binary is not None:
break
time.sleep(1)
tries += 1
if media_binary:
mime_type = media_file_mime_type(media_filename)
set_headers_etag(self, media_filename, mime_type,
@ -2683,15 +2681,12 @@ def daemon_http_get(self) -> None:
tries = 0
media_binary = None
while tries < 5:
try:
with open(screen_filename, 'rb') as fp_av:
media_binary = fp_av.read()
break
except OSError as ex:
print('EX: manifest screenshot ' +
str(tries) + ' ' + str(ex))
time.sleep(1)
tries += 1
exc_str = 'EX: manifest screenshot ' + str(tries) + ' [ex]'
media_binary = load_binary(screen_filename, exc_str)
if media_binary is not None:
break
time.sleep(1)
tries += 1
if media_binary:
mime_type = media_file_mime_type(screen_filename)
set_headers_etag(self, screen_filename, mime_type,
@ -2724,15 +2719,12 @@ def daemon_http_get(self) -> None:
tries = 0
media_binary = None
while tries < 5:
try:
with open(icon_filename, 'rb') as fp_av:
media_binary = fp_av.read()
break
except OSError as ex:
print('EX: login screen image ' +
str(tries) + ' ' + str(ex))
time.sleep(1)
tries += 1
exc_str = 'EX: login screen image ' + str(tries) + ' [ex]'
media_binary = load_binary(icon_filename, exc_str)
if media_binary is not None:
break
time.sleep(1)
tries += 1
if media_binary:
mime_type_str = media_file_mime_type(icon_filename)
set_headers_etag(self, icon_filename,

View File

@ -18,6 +18,7 @@ from utils import string_ends_with
from utils import get_css
from fitnessFunctions import fitness_performance
from daemon_utils import etag_exists
from data import load_binary
def get_style_sheet(self, base_dir: str, calling_domain: str, path: str,
@ -101,12 +102,9 @@ def get_fonts(self, calling_domain: str, path: str,
debug)
return
if os.path.isfile(font_filename):
font_binary = None
try:
with open(font_filename, 'rb') as fp_font:
font_binary = fp_font.read()
except OSError:
print('EX: unable to load font ' + font_filename)
font_binary = load_binary(font_filename,
'EX: unable to load font ' +
font_filename)
if font_binary:
set_headers_etag(self, font_filename,
font_type,

View File

@ -14,6 +14,7 @@ from httpheaders import set_headers
from httpheaders import set_headers_etag
from utils import get_nickname_from_actor
from blocking import export_blocking_file
from data import load_binary
def get_exported_blocks(self, path: str, base_dir: str,
@ -43,12 +44,9 @@ def get_exported_theme(self, path: str, base_dir: str,
filename = path.split('/exports/', 1)[1]
filename = base_dir + '/exports/' + filename
if os.path.isfile(filename):
export_binary = None
try:
with open(filename, 'rb') as fp_exp:
export_binary = fp_exp.read()
except OSError:
print('EX: unable to read theme export ' + filename)
export_binary = load_binary(filename,
'EX: unable to read theme export ' +
filename)
if export_binary:
export_type = 'application/zip'
set_headers_etag(self, filename, export_type,

View File

@ -19,6 +19,7 @@ from daemon_utils import etag_exists
from utils import get_config_param
from utils import binary_is_image
from formats import media_file_mime_type
from data import load_binary
def get_favicon(self, calling_domain: str,
@ -81,12 +82,9 @@ def get_favicon(self, calling_domain: str,
print('Sent favicon from cache: ' + calling_domain)
return
if os.path.isfile(favicon_filename):
fav_binary = None
try:
with open(favicon_filename, 'rb') as fp_fav:
fav_binary = fp_fav.read()
except OSError:
print('EX: unable to read favicon ' + favicon_filename)
fav_binary = load_binary(favicon_filename,
'EX: unable to read favicon ' +
favicon_filename)
if fav_binary:
set_headers_etag(self, favicon_filename,
fav_type,
@ -131,12 +129,9 @@ def show_cached_favicon(self, referer_domain: str, path: str,
# The file has not changed
http_304(self)
return
media_binary = None
try:
with open(fav_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: unable to read cached favicon ' + fav_filename)
media_binary = load_binary(fav_filename,
'EX: unable to read cached favicon ' +
fav_filename)
if media_binary:
if binary_is_image(fav_filename, media_binary):
mime_type = media_file_mime_type(fav_filename)

View File

@ -30,6 +30,7 @@ from fitnessFunctions import fitness_performance
from person import save_person_qrcode
from lxmf import save_lxmf_qrcode
from data import load_string
from data import load_binary
def show_avatar_or_banner(self, referer_domain: str, path: str,
@ -109,12 +110,9 @@ def show_avatar_or_banner(self, referer_domain: str, path: str,
last_modified_time.strftime('%a, %d %b %Y %H:%M:%S GMT')
media_image_type = get_image_mime_type(avatar_file)
media_binary = None
try:
with open(avatar_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: unable to read avatar ' + avatar_filename)
media_binary = load_binary(avatar_filename,
'EX: unable to read avatar ' +
avatar_filename)
if media_binary:
set_headers_etag(self, avatar_filename, media_image_type,
media_binary, None,
@ -138,12 +136,9 @@ def show_cached_avatar(self, referer_domain: str, path: str,
# The file has not changed
http_304(self)
return
media_binary = None
try:
with open(media_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: unable to read cached avatar ' + media_filename)
media_binary = load_binary(media_filename,
'EX: unable to read cached avatar ' +
media_filename)
if media_binary:
mime_type = media_file_mime_type(media_filename)
set_headers_etag(self, media_filename,
@ -188,12 +183,9 @@ def show_help_screen_image(self, path: str,
http_304(self)
return
if os.path.isfile(media_filename):
media_binary = None
try:
with open(media_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: unable to read help image ' + media_filename)
media_binary = load_binary(media_filename,
'EX: unable to read help image ' +
media_filename)
if media_binary:
mime_type = media_file_mime_type(media_filename)
set_headers_etag(self, media_filename,
@ -239,13 +231,9 @@ def show_manual_image(self, path: str,
debug)
return
if os.path.isfile(media_filename):
media_binary = None
try:
with open(media_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: unable to read manual image ' +
media_filename)
media_binary = load_binary(media_filename,
'EX: unable to read manual image ' +
media_filename)
if media_binary:
mime_type = media_file_mime_type(media_filename)
set_headers_etag(self, media_filename,
@ -292,13 +280,9 @@ def show_specification_image(self, path: str,
debug)
return
if os.path.isfile(media_filename):
media_binary = None
try:
with open(media_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: unable to read specification image ' +
media_filename)
media_binary = load_binary(media_filename,
'EX: unable to read specification image ' +
media_filename)
if media_binary:
mime_type = media_file_mime_type(media_filename)
set_headers_etag(self, media_filename,
@ -337,12 +321,9 @@ def show_share_image(self, path: str,
return True
media_file_type = get_image_mime_type(media_filename)
media_binary = None
try:
with open(media_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: unable to read binary ' + media_filename)
media_binary = load_binary(media_filename,
'EX: unable to read binary ' +
media_filename)
if media_binary:
set_headers_etag(self, media_filename,
media_file_type,
@ -395,12 +376,9 @@ def show_icon(self, path: str,
'_GET', 'show_icon', debug)
return
if os.path.isfile(media_filename):
media_binary = None
try:
with open(media_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: unable to read icon image ' + media_filename)
media_binary = load_binary(media_filename,
'EX: unable to read icon image ' +
media_filename)
if media_binary:
mime_type = media_file_mime_type(media_filename)
set_headers_etag(self, media_filename,
@ -463,12 +441,9 @@ def show_media(self, path: str, base_dir: str,
http_404(self, 32)
return
media_binary = None
try:
with open(media_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: unable to read media binary ' + media_filename)
media_binary = load_binary(media_filename,
'EX: unable to read media binary ' +
media_filename)
if media_binary:
set_headers_etag(self, media_filename, media_file_type,
media_binary, None,
@ -522,14 +497,12 @@ def show_qrcode(self, calling_domain: str, path: str,
tries = 0
media_binary = None
while tries < 5:
try:
with open(qr_filename, 'rb') as fp_av:
media_binary = fp_av.read()
break
except OSError as ex:
print('EX: _show_qrcode ' + str(tries) + ' ' + str(ex))
time.sleep(1)
tries += 1
exc_str = 'EX: _show_qrcode ' + str(tries) + ' [ex]'
media_binary = load_binary(qr_filename, exc_str)
if media_binary is not None:
break
time.sleep(1)
tries += 1
if media_binary:
mime_type = media_file_mime_type(qr_filename)
set_headers_etag(self, qr_filename, mime_type,
@ -571,15 +544,12 @@ def search_screen_banner(self, path: str,
tries = 0
media_binary = None
while tries < 5:
try:
with open(banner_filename, 'rb') as fp_av:
media_binary = fp_av.read()
break
except OSError as ex:
print('EX: _search_screen_banner ' +
str(tries) + ' ' + str(ex))
time.sleep(1)
tries += 1
exc_str = 'EX: _search_screen_banner ' + str(tries) + ' [ex]'
media_binary = load_binary(banner_filename, exc_str)
if media_binary is not None:
break
time.sleep(1)
tries += 1
if media_binary:
mime_type = media_file_mime_type(banner_filename)
set_headers_etag(self, banner_filename, mime_type,
@ -616,14 +586,12 @@ def column_image(self, side: str, path: str, base_dir: str, domain: str,
tries = 0
media_binary = None
while tries < 5:
try:
with open(banner_filename, 'rb') as fp_av:
media_binary = fp_av.read()
break
except OSError as ex:
print('EX: _column_image ' + str(tries) + ' ' + str(ex))
time.sleep(1)
tries += 1
exc_str = 'EX: _column_image ' + str(tries) + ' [ex]'
media_binary = load_binary(banner_filename, exc_str)
if media_binary is not None:
break
time.sleep(1)
tries += 1
if media_binary:
mime_type = media_file_mime_type(banner_filename)
set_headers_etag(self, banner_filename, mime_type,
@ -659,15 +627,13 @@ def show_default_profile_background(self, base_dir: str, theme_name: str,
tries = 0
bg_binary = None
while tries < 5:
try:
with open(bg_filename, 'rb') as fp_av:
bg_binary = fp_av.read()
break
except OSError as ex:
print('EX: _show_default_profile_background ' +
str(tries) + ' ' + str(ex))
time.sleep(1)
tries += 1
exc_str = 'EX: _show_default_profile_background ' + \
str(tries) + ' [ex]'
bg_binary = load_binary(bg_filename, exc_str)
if bg_binary is not None:
break
time.sleep(1)
tries += 1
if bg_binary:
if ext == 'jpg':
ext = 'jpeg'
@ -711,15 +677,13 @@ def show_background_image(self, path: str,
tries = 0
bg_binary = None
while tries < 5:
try:
with open(bg_filename, 'rb') as fp_av:
bg_binary = fp_av.read()
break
except OSError as ex:
print('EX: _show_background_image ' +
str(tries) + ' ' + str(ex))
time.sleep(1)
tries += 1
exc_str = 'EX: _show_background_image ' + \
str(tries) + ' [ex]'
bg_binary = load_binary(bg_filename, exc_str)
if bg_binary is not None:
break
time.sleep(1)
tries += 1
if bg_binary:
if ext == 'jpg':
ext = 'jpeg'
@ -756,12 +720,9 @@ def show_emoji(self, path: str,
return
media_image_type = get_image_mime_type(emoji_filename)
media_binary = None
try:
with open(emoji_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: unable to read emoji image ' + emoji_filename)
media_binary = load_binary(emoji_filename,
'EX: unable to read emoji image ' +
emoji_filename)
if media_binary:
set_headers_etag(self, emoji_filename,
media_image_type,

View File

@ -24,6 +24,7 @@ from media import path_is_audio
from daemon_utils import get_user_agent
from daemon_utils import log_epicyon_instances
from data import load_string
from data import load_binary
from data import save_string
@ -126,13 +127,10 @@ def daemon_http_head(self) -> None:
if etag_str:
etag = etag_str
else:
media_binary = None
try:
with open(media_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: unable to read media binary ' +
media_filename)
media_binary = \
load_binary(media_filename,
'EX: unable to read media binary ' +
media_filename)
if media_binary:
etag = md5(media_binary).hexdigest() # nosec
save_string(etag, media_tag_filename,

View File

@ -14,6 +14,7 @@ from httpcodes import http_404
from utils import acct_dir
from utils import binary_is_image
from formats import get_image_extension_from_mime_type
from data import save_binary
def receive_image_attachment(self, length: int, path: str, base_dir: str,
@ -67,11 +68,9 @@ def receive_image_attachment(self, length: int, path: str, base_dir: str,
if not binary_is_image(media_filename, media_bytes):
print('WARN: _receive_image image binary is not recognized ' +
media_filename)
try:
with open(media_filename, 'wb') as fp_av:
fp_av.write(media_bytes)
except OSError:
print('EX: receive_image_attachment unable to write ' + media_filename)
save_binary(media_bytes, media_filename,
'EX: receive_image_attachment unable to write ' +
media_filename)
if debug:
print('DEBUG: image saved to ' + media_filename)
self.send_response(201)

28
data.py
View File

@ -37,6 +37,20 @@ def load_string(filename: str, exception_text: str) -> str:
return None
def load_binary(filename: str, exception_text: str) -> str:
"""Loads a binary from file
"""
try:
with open(filename, 'rb') as fp:
binary = fp.read()
return binary
except OSError as exc:
if '[ex]' in exception_text:
exception_text = exception_text.replace('[ex]', str(exc))
print(exception_text)
return None
def load_line(filename: str, exception_text: str) -> str:
"""Loads a line of text from file
"""
@ -72,6 +86,20 @@ def save_string(text: str, filename: str, exception_text: str) -> bool:
return _store_base(text, filename, exception_text, 'w+')
def save_binary(text: str, filename: str, exception_text: str) -> bool:
"""Saves a binary to file
"""
try:
with open(filename, 'wb') as fp:
fp.write(text)
return True
except OSError as exc:
if '[ex]' in exception_text:
exception_text = exception_text.replace('[ex]', str(exc))
print(exception_text)
return False
def append_string(text: str, filename: str, exception_text: str) -> bool:
"""Appends a string to file
"""

View File

@ -31,6 +31,7 @@ from shutil import copyfile
from shutil import rmtree
from shutil import move
from city import spoof_geolocation
from data import load_binary
# music file ID3 v1 genres
@ -593,13 +594,9 @@ def _update_etag(media_filename: str) -> None:
return
# read the binary data
data = None
try:
with open(media_filename, 'rb') as fp_media:
data = fp_media.read()
except OSError:
print('EX: _update_etag unable to read ' + str(media_filename))
data = load_binary(media_filename,
'EX: _update_etag unable to read ' +
media_filename)
if not data:
return
# calculate hash

View File

@ -56,6 +56,7 @@ from session import download_image_any_mime_type
from content import remove_script
from data import load_list
from data import load_string
from data import save_binary
def _remove_cdata(text: str) -> str:
@ -198,11 +199,8 @@ def _download_newswire_feed_favicon(session, base_dir: str,
fav_filename = get_fav_filename_from_url(base_dir, fav_url)
if os.path.isfile(fav_filename):
return True
try:
with open(fav_filename, 'wb+') as fp_fav:
fp_fav.write(image_data)
except OSError:
print('EX: failed writing favicon ' + fav_filename)
if not save_binary(image_data, fav_filename,
'EX: failed writing favicon ' + fav_filename):
return False
return True

View File

@ -28,6 +28,8 @@ from mitm import detect_mitm
from httpsig import create_signed_header
from data import append_string
from data import save_string
from data import save_binary
from data import load_binary
def create_session(proxy_type: str):
@ -765,14 +767,9 @@ def post_image(session, attach_image_filename: str, federation_list: [],
content_type = 'image/svg+xml'
headers['Content-type'] = content_type
media_binary = None
try:
with open(attach_image_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: post_image unable to read binary ' +
attach_image_filename)
media_binary = load_binary(attach_image_filename,
'EX: post_image unable to read binary ' +
attach_image_filename)
if media_binary:
_set_user_agent(session, http_prefix, domain_full)
@ -857,8 +854,8 @@ def download_image(session, url: str, image_filename: str, debug: bool,
else:
media_binary = result.content
if binary_is_image(image_filename, media_binary):
with open(image_filename, 'wb') as fp_im:
fp_im.write(media_binary)
if save_binary(media_binary, image_filename,
'EX: download image ' + url):
if debug:
print('Image downloaded from ' + url)
return True

View File

@ -61,6 +61,7 @@ from shares import vf_proposal_from_share
from webapp_pwa import get_pwa_theme_colors
from data import load_list
from data import save_string
from data import save_binary
from data import load_string
@ -431,8 +432,9 @@ def update_avatar_image_cache(signing_priv_key_pem: str,
else:
media_binary = result.content
if binary_is_image(avatar_image_filename, media_binary):
with open(avatar_image_filename, 'wb') as fp_av:
fp_av.write(media_binary)
if save_binary(media_binary, avatar_image_filename,
'EX: update_avatar_image_cache ' +
actor):
if debug:
print('avatar image downloaded for ' + actor)
return avatar_image_filename.replace(base_dir +