Replace file operations with functions

main
bashrc 2026-04-29 15:30:27 +01:00
parent 5818c78c4e
commit 7e9d013dd5
5 changed files with 67 additions and 81 deletions

33
maps.py
View File

@ -24,6 +24,8 @@ from timeFunctions import date_epoch
from timeFunctions import date_from_string_format
from timeFunctions import date_utcnow
from session import get_resolved_url
from data import load_string
from data import save_string
def geocoords_to_osm_link(osm_domain: str, zoom: int,
@ -1030,20 +1032,20 @@ def add_tag_map_links(tag_maps_dir: str, tag_name: str,
"""Appends to a hashtag file containing map links
This is used to show a map for a particular hashtag
"""
tag_map_filename = tag_maps_dir + '/' + tag_name + '.txt'
tag_map_filename: str = tag_maps_dir + '/' + tag_name + '.txt'
post_url = post_url.replace('#', '/')
# read the existing map links
existing_map_links: list[str] = []
if os.path.isfile(tag_map_filename):
try:
with open(tag_map_filename, 'r', encoding='utf-8') as fp_tag:
existing_map_links = fp_tag.read().split('\n')
except OSError:
print('EX: error reading tag map ' + tag_map_filename)
existing_map_links_str = \
load_string(tag_map_filename,
'EX: error reading tag map ' + tag_map_filename)
if existing_map_links_str:
existing_map_links = existing_map_links_str.split('\n')
# combine map links with the existing list
secs_since_epoch = \
secs_since_epoch: int = \
int((date_from_string_format(published, ['%Y-%m-%dT%H:%M:%S%z']) -
date_epoch()).total_seconds())
links_changed: bool = False
@ -1070,11 +1072,8 @@ def add_tag_map_links(tag_maps_dir: str, tag_name: str,
break
# save the tag
try:
with open(tag_map_filename, 'w+', encoding='utf-8') as fp_tag:
fp_tag.write(map_links_str)
except OSError:
print('EX: error writing tag map ' + tag_map_filename)
save_string(map_links_str, tag_map_filename,
'EX: error writing tag map ' + tag_map_filename)
def _gpx_location(latitude: float, longitude: float, post_id: str) -> str:
@ -1127,11 +1126,11 @@ def _hashtag_map_to_format(base_dir: str, tag_name: str,
if os.path.isfile(tag_map_filename):
map_links: list[str] = []
try:
with open(tag_map_filename, 'r', encoding='utf-8') as fp_tag:
map_links = fp_tag.read().split('\n')
except OSError:
print('EX: unable to read tag map links ' + tag_map_filename)
map_links_str = \
load_string(tag_map_filename,
'EX: unable to read tag map links ' + tag_map_filename)
if map_links_str:
map_links = map_links_str.split('\n')
if map_links:
start_secs_since_epoch = int(start_hours_since_epoch * 60 * 60)
end_secs_since_epoch = int(end_hours_since_epoch * 60 * 60)

View File

@ -17,6 +17,7 @@ from utils import get_url_from_post
from utils import get_markdown_blog_filename
from utils import get_micron_blog_filename
from utils import get_gemini_blog_published
from data import save_string
def _markdown_get_sections(markdown: str) -> []:
@ -573,12 +574,10 @@ def blog_to_markdown(base_dir: str, nickname: str, domain: str,
for link_str in links:
content_text += link_str + '\n'
try:
with open(markdown_blog_filename, 'w+',
encoding='utf-8') as fp_markdown:
fp_markdown.write(published + '\n\n' + content_text)
except OSError:
print('EX: blog_to_markdown unable to write ' + markdown_blog_filename)
if not save_string(published + '\n\n' + content_text,
markdown_blog_filename,
'EX: blog_to_markdown unable to write ' +
markdown_blog_filename):
return False
return True
@ -651,12 +650,10 @@ def blog_to_micron(base_dir: str, nickname: str, domain: str,
for link_str in links:
content_text += link_str + '\n'
try:
with open(micron_blog_filename, 'w+',
encoding='utf-8') as fp_micron:
fp_micron.write(published + '\n\n' + content_text)
except OSError:
print('EX: blog_to_micron unable to write ' + micron_blog_filename)
if not save_string(published + '\n\n' + content_text,
micron_blog_filename,
'EX: blog_to_micron unable to write ' +
micron_blog_filename):
return False
return True

View File

@ -22,6 +22,7 @@ from utils import data_dir
from utils import account_is_indexable
from utils import is_yggdrasil_address
from data import load_list
from data import load_string
def _meta_data_instance_v1(show_accounts: bool,
@ -272,13 +273,10 @@ def _get_masto_api_v1account(base_dir: str, nickname: str, domain: str,
published_filename = \
acct_dir(base_dir, nickname, domain) + '/.last_published'
if os.path.isfile(published_filename):
try:
with open(published_filename, 'r',
encoding='utf-8') as fp_pub:
published = fp_pub.read()
except OSError:
print('EX: unable to read last published time 1 ' +
published_filename)
published: str = \
load_string(published_filename,
'EX: unable to read last published time 1 ' +
published_filename)
masto_account_json = {
"id": get_masto_api_v1id_from_nickname(nickname),

View File

@ -24,6 +24,7 @@ from formats import get_image_extensions
from formats import get_audio_extensions
from formats import get_video_extensions
from data import load_list
from data import load_string
def _get_masto_api_v2id_from_nickname(nickname: str) -> int:
@ -118,14 +119,11 @@ def _meta_data_instance_v2(show_accounts: bool,
published_filename = \
acct_dir(base_dir, admin_nickname, domain) + '/.last_published'
if os.path.isfile(published_filename):
try:
with open(published_filename, 'r',
encoding='utf-8') as fp_pub:
published = fp_pub.read()
except OSError:
print('EX: _meta_data_instance_v2 ' +
'unable to read last published time 2 ' +
published_filename)
published = \
load_string(published_filename,
'EX: _meta_data_instance_v2 ' +
'unable to read last published time 2 ' +
published_filename)
# get all supported mime types
supported_mime_types: list[str] = []

View File

@ -32,6 +32,9 @@ from shutil import rmtree
from shutil import move
from city import spoof_geolocation
from data import load_binary
from data import save_string
from data import load_string
from data import append_string
# music file ID3 v1 genres
@ -331,20 +334,18 @@ def _spoof_meta_data(base_dir: str, nickname: str, domain: str,
decoy_seed_filename = acct_dir(base_dir, nickname, domain) + '/decoyseed'
decoy_seed = 63725
if os.path.isfile(decoy_seed_filename):
try:
with open(decoy_seed_filename, 'r', encoding='utf-8') as fp_seed:
decoy_seed = int(fp_seed.read())
except OSError:
print('EX: _spoof_meta_data unable to read ' + decoy_seed_filename)
decoy_seed_str = \
load_string(decoy_seed_filename,
'EX: _spoof_meta_data unable to read ' +
decoy_seed_filename)
if decoy_seed_str:
decoy_seed = int(decoy_seed_str)
else:
decoy_seed = randint(10000, 10000000000000000)
try:
with open(decoy_seed_filename, 'w+',
encoding='utf-8') as fp_seed:
fp_seed.write(str(decoy_seed))
except OSError:
print('EX: _spoof_meta_data unable to write ' +
decoy_seed_filename)
text = str(decoy_seed)
save_string(text, decoy_seed_filename,
'EX: _spoof_meta_data unable to write ' +
decoy_seed_filename)
if os.path.isfile('/usr/bin/exiftool'):
print('Spoofing metadata in ' + output_filename + ' using exiftool')
@ -602,13 +603,9 @@ def _update_etag(media_filename: str) -> None:
# calculate hash
etag = sha1(data).hexdigest() # nosec
# save the hash
try:
with open(media_filename + '.etag', 'w+',
encoding='utf-8') as fp_media:
fp_media.write(etag)
except OSError:
print('EX: _update_etag unable to write ' +
str(media_filename) + '.etag')
save_string(etag, media_filename + '.etag',
'EX: _update_etag unable to write ' +
str(media_filename) + '.etag')
def _store_video_transcript(video_transcript: str,
@ -623,12 +620,10 @@ def _store_video_transcript(video_transcript: str,
print('WARN: does not look like a video transcript ' +
video_transcript)
return False
try:
with open(media_filename + '.vtt', 'w+', encoding='utf-8') as fp_vtt:
fp_vtt.write(video_transcript)
if save_string(video_transcript, media_filename + '.vtt',
'EX: unable to save video transcript ' +
media_filename + '.vtt'):
return True
except OSError:
print('EX: unable to save video transcript ' + media_filename + '.vtt')
return False
@ -641,25 +636,24 @@ def _log_uploaded_media(base_dir: str, nickname: str, domain: str,
media_log = []
write_type = 'w+'
if os.path.isfile(account_media_log_filename):
try:
with open(account_media_log_filename, 'r',
encoding='utf-8') as fp_log:
media_log = fp_log.read().split('\n')
write_type = 'a+'
except OSError:
print('EX: unable to read media log for ' + nickname)
media_log_str = \
load_string(account_media_log_filename,
'EX: unable to read media log for ' + nickname)
if media_log_str:
media_log = media_log_str.split('\n')
write_type = 'a+'
# don't include the base directory, in case the installation is later
# moved around
if base_dir in media_filename:
media_filename = media_filename.split(base_dir)[1]
if media_filename in media_log:
return
try:
with open(account_media_log_filename, write_type,
encoding='utf-8') as fp_log:
fp_log.write(media_filename + '\n')
except OSError:
print('EX: unable to write media log for ' + nickname)
if write_type == 'w+':
save_string(media_filename + '\n', account_media_log_filename,
'EX: unable to write media log for ' + nickname)
else:
append_string(media_filename + '\n', account_media_log_filename,
'EX: unable to append media log for ' + nickname)
def attach_media(base_dir: str, http_prefix: str,