Replace file operations with functions

main
bashrc 2026-04-26 22:08:48 +01:00
parent 733d4adee2
commit 664f13d3e2
3 changed files with 159 additions and 253 deletions

View File

@ -9,6 +9,9 @@ __module_group__ = "Core"
import os
from utils import acct_dir
from data import load_string
from data import save_string
from data import append_string
def set_pet_name(base_dir: str, nickname: str, domain: str,
@ -28,13 +31,11 @@ def set_pet_name(base_dir: str, nickname: str, domain: str,
# does this entry already exist?
if os.path.isfile(petnames_filename):
petnames_str: str = \
load_string(petnames_filename,
'EX: set_pet_name unable to read ' + petnames_filename)
if petnames_str is None:
petnames_str = ''
try:
with open(petnames_filename, 'r',
encoding='utf-8') as fp_petnames:
petnames_str = fp_petnames.read()
except OSError:
print('EX: set_pet_name unable to read ' + petnames_filename)
if entry in petnames_str:
return True
if ' ' + handle + '\n' in petnames_str:
@ -46,30 +47,22 @@ def set_pet_name(base_dir: str, nickname: str, domain: str,
else:
new_petnames_str += entry
# save the updated petnames file
try:
with open(petnames_filename, 'w+',
encoding='utf-8') as fp_petnames:
fp_petnames.write(new_petnames_str)
except OSError:
print('EX: set_pet_name unable to save ' + petnames_filename)
if not save_string(new_petnames_str, petnames_filename,
'EX: set_pet_name unable to save ' +
petnames_filename):
return False
return True
# entry does not exist in the petnames file
try:
with open(petnames_filename, 'a+',
encoding='utf-8') as fp_petnames:
fp_petnames.write(entry)
except OSError:
print('EX: set_pet_name unable to append ' + petnames_filename)
if not append_string(entry, petnames_filename,
'EX: set_pet_name unable to append ' +
petnames_filename):
return False
return True
# first entry
try:
with open(petnames_filename, 'w+', encoding='utf-8') as fp_petnames:
fp_petnames.write(entry)
except OSError:
print('EX: set_pet_name unable to write ' + petnames_filename)
if not save_string(entry, petnames_filename,
'EX: set_pet_name unable to write ' +
petnames_filename):
return False
return True
@ -86,12 +79,11 @@ def get_pet_name(base_dir: str, nickname: str, domain: str,
if not os.path.isfile(petnames_filename):
return ''
petnames_str: str = \
load_string(petnames_filename,
'EX: get_pet_name unable to read ' + petnames_filename)
if petnames_str is None:
petnames_str = ''
try:
with open(petnames_filename, 'r', encoding='utf-8') as fp_petnames:
petnames_str = fp_petnames.read()
except OSError:
print('EX: get_pet_name unable to read ' + petnames_filename)
if ' ' + handle + '\n' in petnames_str:
petnames_list = petnames_str.split('\n')
for pet in petnames_list:
@ -117,12 +109,12 @@ def _get_pet_name_handle(base_dir: str, nickname: str, domain: str,
if not os.path.isfile(petnames_filename):
return ''
petnames_str: str = \
load_string(petnames_filename,
'EX: _get_pet_name_handle unable to read ' +
petnames_filename)
if petnames_str is None:
petnames_str = ''
try:
with open(petnames_filename, 'r', encoding='utf-8') as fp_petnames:
petnames_str = fp_petnames.read()
except OSError:
print('EX: _get_pet_name_handle unable to read ' + petnames_filename)
if petname + ' ' in petnames_str:
petnames_list = petnames_str.split('\n')
for pet in petnames_list:

View File

@ -11,6 +11,7 @@ __module_group__ = "Core"
import os
import random
from random import randint
from data import load_string
common_nouns = (
"time",
@ -1979,11 +1980,10 @@ def load_dictionary(base_dir: str) -> []:
return []
words: list[str] = []
try:
with open(filename, 'r', encoding='utf-8') as fp_dict:
words = fp_dict.read().split('\n')
except OSError:
print('EX: unable to load dictionary ' + filename)
words_str = load_string(filename,
'EX: unable to load dictionary ' + filename)
if words_str:
words = words_str.split('\n')
return words
@ -1998,11 +1998,10 @@ def load_2grams(base_dir: str) -> {}:
twograms = {}
lines: list[str] = []
try:
with open(filename, 'r', encoding='utf-8') as fp_dict:
lines = fp_dict.read().split('\n')
except OSError:
print('EX: unable to load 2-grams ' + filename)
lines_str = load_string(filename,
'EX: unable to load 2-grams ' + filename)
if lines_str:
lines = lines_str.split('\n')
for line_str in lines:
words = line_str.split('\t')
if len(words) != 3:

273
posts.py
View File

@ -145,6 +145,9 @@ from conversation import conversation_tag_to_convthread_id
from conversation import post_id_to_convthread_id
from quote import quote_toots_allowed
from data import load_list
from data import load_string
from data import save_string
from data import append_string
def convert_post_content_to_html(message_json: {}) -> None:
@ -1000,11 +1003,8 @@ def _save_last_published(base_dir: str, nickname: str, domain: str,
published_filename = \
acct_dir(base_dir, nickname, domain) + '/.last_published'
try:
with open(published_filename, 'w+', encoding='utf-8') as fp_last:
fp_last.write(published)
except OSError:
print('EX: unable to save last published time ' +
save_string(published, published_filename,
'EX: unable to save last published time ' +
published_filename)
@ -1090,11 +1090,8 @@ def _update_hashtags_index(base_dir: str, tag: {}, new_post_id: str,
str(days_since_epoch) + ' ' + nickname + ' ' + \
new_post_id + '\n'
# create a new tags index file
try:
with open(tags_filename, 'w+', encoding='utf-8') as fp_tags:
fp_tags.write(tag_line)
except OSError:
print('EX: _update_hashtags_index unable to write tags file ' +
save_string(tag_line, tags_filename,
'EX: _update_hashtags_index unable to write tags file ' +
tags_filename)
return
@ -1142,13 +1139,9 @@ def _add_schedule_post(base_dir: str, nickname: str, domain: str,
schedule_index_filename + ' ' + str(ex))
return
try:
with open(schedule_index_filename, 'w+',
encoding='utf-8') as fp_schedule:
fp_schedule.write(index_str + '\n')
except OSError as ex:
print('EX: Failed to write entry to scheduled posts index2 ' +
schedule_index_filename + ' ' + str(ex))
save_string(index_str + '\n', schedule_index_filename,
'EX: Failed to write entry to scheduled posts index2 ' +
schedule_index_filename + ' [ex]')
def _create_post_cw_from_reply(base_dir: str, nickname: str, domain: str,
@ -1789,11 +1782,8 @@ def _create_post_mod_report(base_dir: str,
new_post['moderationStatus'] = 'pending'
# save to index file
moderation_index_file = data_dir(base_dir) + '/moderation.txt'
try:
with open(moderation_index_file, 'a+', encoding='utf-8') as fp_mod:
fp_mod.write(new_post_id + '\n')
except OSError:
print('EX: unable to write moderation index file ' +
append_string(new_post_id + '\n', moderation_index_file,
'EX: unable to write moderation index file ' +
moderation_index_file)
@ -2176,11 +2166,8 @@ def pin_post2(base_dir: str, nickname: str, domain: str,
"""
account_dir = acct_dir(base_dir, nickname, domain)
pinned_filename = account_dir + '/pinToProfile.txt'
try:
with open(pinned_filename, 'w+', encoding='utf-8') as fp_pin:
fp_pin.write(pinned_content)
except OSError:
print('EX: unable to write ' + pinned_filename)
save_string(pinned_content, pinned_filename,
'EX: unable to write ' + pinned_filename)
def undo_pinned_post(base_dir: str, nickname: str, domain: str) -> None:
@ -2206,12 +2193,9 @@ def get_pinned_post_as_json(base_dir: str, http_prefix: str,
pinned_post_json = {}
actor = local_actor_url(http_prefix, nickname, domain_full)
if os.path.isfile(pinned_filename):
pinned_content = None
try:
with open(pinned_filename, 'r', encoding='utf-8') as fp_pin:
pinned_content = fp_pin.read()
except OSError:
print('EX: get_pinned_post_as_json unable to read ' +
pinned_content = \
load_string(pinned_filename,
'EX: get_pinned_post_as_json unable to read ' +
pinned_filename)
if pinned_content:
pinned_post_json = {
@ -3003,11 +2987,9 @@ def create_report_post(base_dir: str,
new_report_file = acct_handle_dir(base_dir, handle) + '/.newReport'
if os.path.isfile(new_report_file):
continue
try:
with open(new_report_file, 'w+', encoding='utf-8') as fp_report:
fp_report.write(to_url + '/moderation')
except OSError:
print('EX: create_report_post unable to write ' + new_report_file)
save_string(to_url + '/moderation', new_report_file,
'EX: create_report_post unable to write ' +
new_report_file)
return post_json_object
@ -3024,22 +3006,14 @@ def _add_send_block(base_dir: str, nickname: str, domain: str,
inbox_url = inbox_url.replace('/inbox\n', '\n')
if not os.path.isfile(send_block_filename):
try:
with open(send_block_filename, 'w+',
encoding='utf-8') as fp_blocks:
fp_blocks.write(inbox_url)
except OSError:
print('EX: _add_send_block unable to create ' +
save_string(inbox_url, send_block_filename,
'EX: _add_send_block unable to create ' +
send_block_filename)
return
if not text_in_file(inbox_url, send_block_filename, False):
try:
with open(send_block_filename, 'a+',
encoding='utf-8') as fp_blocks:
fp_blocks.write(inbox_url)
except OSError:
print('EX: _add_send_block unable to write ' +
append_string(inbox_url, send_block_filename,
'EX: _add_send_block unable to write ' +
send_block_filename)
@ -3060,21 +3034,15 @@ def _remove_send_block(base_dir: str, nickname: str, domain: str,
if not text_in_file(inbox_url, send_block_filename, False):
return
send_blocks_str = ''
try:
with open(send_block_filename, 'r',
encoding='utf-8') as fp_blocks:
send_blocks_str = fp_blocks.read()
except OSError:
print('EX: _remove_send_block unable to read ' +
send_blocks_str = \
load_string(send_block_filename,
'EX: _remove_send_block unable to read ' +
send_block_filename)
if send_blocks_str is None:
send_blocks_str = ''
send_blocks_str = send_blocks_str.replace(inbox_url, '')
try:
with open(send_block_filename, 'w+',
encoding='utf-8') as fp_blocks:
fp_blocks.write(send_blocks_str)
except OSError:
print('EX: _remove_send_block unable to write ' +
save_string(send_blocks_str, send_block_filename,
'EX: _remove_send_block unable to write ' +
send_block_filename)
@ -3154,20 +3122,12 @@ def thread_send_post(session, post_json_str: str, federation_list: [],
# save the log file
post_log_filename = base_dir + '/post.log'
if os.path.isfile(post_log_filename):
try:
with open(post_log_filename, 'a+',
encoding='utf-8') as fp_log:
fp_log.write(log_str + '\n')
except OSError:
print('EX: thread_send_post unable to append ' +
append_string(log_str + '\n', post_log_filename,
'EX: thread_send_post unable to append ' +
post_log_filename)
else:
try:
with open(post_log_filename, 'w+',
encoding='utf-8') as fp_log:
fp_log.write(log_str + '\n')
except OSError:
print('EX: thread_send_post unable to write ' +
save_string(log_str + '\n', post_log_filename,
'EX: thread_send_post unable to write ' +
post_log_filename)
if post_result:
@ -4638,12 +4598,9 @@ def _add_post_to_timeline(file_path: str, boxname: str,
posts_in_box: [], box_actor: str) -> bool:
""" Reads a post from file and decides whether it is valid
"""
post_str = ''
try:
with open(file_path, 'r', encoding='utf-8') as fp_post:
post_str = fp_post.read()
except OSError:
print('EX: _add_post_to_timeline unable to read ' + file_path)
post_str = load_string(file_path,
'EX: _add_post_to_timeline unable to read ' +
file_path)
if not post_str:
return False
@ -4721,17 +4678,13 @@ def _locate_news_arrival(base_dir: str, domain: str,
account_dir = data_dir(base_dir) + '/news@' + domain + '/'
post_filename = account_dir + 'outbox/' + post_url
if os.path.isfile(post_filename):
try:
with open(post_filename, 'r', encoding='utf-8') as fp_arrival:
arrival = fp_arrival.read()
arrival = load_string(post_filename,
'EX: _locate_news_arrival unable to read ' +
post_filename)
if arrival:
arrival_date = \
date_from_string_format(arrival,
["%Y-%m-%dT%H:%M:%S%z"])
date_from_string_format(arrival, ["%Y-%m-%dT%H:%M:%S%z"])
return arrival_date
except OSError:
print('EX: _locate_news_arrival unable to read ' + post_filename)
return None
@ -5462,13 +5415,12 @@ def _expire_posts_for_person(http_prefix: str, nickname: str, domain: str,
full_filename = os.path.join(box_dir, post_filename)
if not os.path.isfile(full_filename):
continue
content = ''
try:
with open(full_filename, 'r', encoding='utf-8') as fp_content:
content = fp_content.read()
except OSError:
print('EX: expire_posts_for_person unable to open content ' +
content = \
load_string(full_filename,
'EX: expire_posts_for_person unable to open content ' +
full_filename)
if content is None:
content = ''
# Get time of publication
published_str = ''
@ -5540,11 +5492,8 @@ def set_post_expiry_keep_dms(base_dir: str, nickname: str, domain: str,
print('EX: unable to write set_post_expiry_keep_dms False ' +
expire_dms_filename)
return
try:
with open(expire_dms_filename, 'w+', encoding='utf-8') as fp_expire:
fp_expire.write('\n')
except OSError:
print('EX: unable to write set_post_expiry_keep_dms True ' +
save_string('\n', expire_dms_filename,
'EX: unable to write set_post_expiry_keep_dms True ' +
expire_dms_filename)
@ -5565,14 +5514,11 @@ def expire_posts(base_dir: str, http_prefix: str,
if not os.path.isfile(expire_posts_filename):
continue
keep_dms = get_post_expiry_keep_dms(base_dir, nickname, domain)
expire_days_str = None
try:
with open(expire_posts_filename, 'r',
encoding='utf-8') as fp_expire:
expire_days_str = fp_expire.read()
except OSError:
print('EX: expire_posts failed to read days file ' +
expire_days_str = \
load_string(expire_posts_filename,
'EX: expire_posts failed to read days file ' +
expire_posts_filename)
if expire_days_str is None:
continue
if not expire_days_str:
continue
@ -5599,12 +5545,8 @@ def get_post_expiry_days(base_dir: str, nickname: str, domain: str) -> int:
acct_handle_dir(base_dir, handle) + '/.expire_posts_days'
if not os.path.isfile(expire_posts_filename):
return 0
days_str = None
try:
with open(expire_posts_filename, 'r', encoding='utf-8') as fp_expire:
days_str = fp_expire.read()
except OSError:
print('EX: unable to write post expire days ' +
days_str = load_string(expire_posts_filename,
'EX: unable to write post expire days ' +
expire_posts_filename)
if not days_str:
return 0
@ -5620,11 +5562,9 @@ def set_post_expiry_days(base_dir: str, nickname: str, domain: str,
handle = nickname + '@' + domain
expire_posts_filename = \
acct_handle_dir(base_dir, handle) + '/.expire_posts_days'
try:
with open(expire_posts_filename, 'w+', encoding='utf-8') as fp_expire:
fp_expire.write(str(max_age_days))
except OSError:
print('EX: unable to write post expire days ' +
text = str(max_age_days)
save_string(text, expire_posts_filename,
'EX: unable to write post expire days ' +
expire_posts_filename)
@ -5671,12 +5611,8 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
index_filename + ' ' + str(ex))
# save the new index file
if new_index:
try:
with open(index_filename, 'w+',
encoding='utf-8') as fp_index:
fp_index.write(new_index)
except OSError:
print('EX: archive_posts_for_person unable to write ' +
save_string(new_index, index_filename,
'EX: archive_posts_for_person unable to write ' +
index_filename)
# remove any edits or replies
@ -5707,13 +5643,11 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
continue
edit_files_ctr += 1
if os.path.isfile(full_filename):
content = load_string(full_filename,
'EX: unable to open content 2 ' +
full_filename)
if content is None:
content = ''
try:
with open(full_filename, 'r',
encoding='utf-8') as fp_content:
content = fp_content.read()
except OSError:
print('EX: unable to open content 2 ' + full_filename)
if '"published":' in content:
published_str = content.split('"published":')[1]
if '"' in published_str:
@ -5779,12 +5713,11 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
# Get the published time
full_filename = os.path.join(box_dir, post_filename)
if os.path.isfile(full_filename):
content = load_string(full_filename,
'EX: unable to open content 1 ' +
full_filename)
if content is None:
content = ''
try:
with open(full_filename, 'r', encoding='utf-8') as fp_content:
content = fp_content.read()
except OSError:
print('EX: unable to open content 1 ' + full_filename)
if '"published":' in content:
published_str = content.split('"published":')[1]
if '"' in published_str:
@ -6132,13 +6065,12 @@ def get_public_post_domains_blocked(session, base_dir: str,
return []
# read the blocked domains as a single string
blocked_str = ''
try:
with open(blocking_filename, 'r', encoding='utf-8') as fp_block:
blocked_str = fp_block.read()
except OSError:
print('EX: get_public_post_domains_blocked unable to read ' +
blocked_str = \
load_string(blocking_filename,
'EX: get_public_post_domains_blocked unable to read ' +
blocking_filename)
if blocked_str is None:
blocked_str = ''
blocked_domains: list[str] = []
for domain_name in post_domains:
@ -6190,13 +6122,12 @@ def check_domains(session, base_dir: str,
update_follower_warnings = False
follower_warning_str = ''
if os.path.isfile(follower_warning_filename):
try:
with open(follower_warning_filename, 'r',
encoding='utf-8') as fp_warn:
follower_warning_str = fp_warn.read()
except OSError:
print('EX: check_domains unable to read ' +
follower_warning_str = \
load_string(follower_warning_filename,
'EX: check_domains unable to read ' +
follower_warning_filename)
if follower_warning_str is None:
follower_warning_str = ''
if single_check:
# checks a single random non-mutual
@ -6247,12 +6178,8 @@ def check_domains(session, base_dir: str,
update_follower_warnings = True
if update_follower_warnings and follower_warning_str:
try:
with open(follower_warning_filename, 'w+',
encoding='utf-8') as fp_warn:
fp_warn.write(follower_warning_str)
except OSError:
print('EX: check_domains unable to write ' +
save_string(follower_warning_str, follower_warning_filename,
'EX: check_domains unable to write ' +
follower_warning_filename)
if not single_check:
print(follower_warning_str)
@ -6339,12 +6266,8 @@ def _reject_announce(announce_filename: str,
if os.path.isfile(announce_filename + '.reject'):
return
try:
with open(announce_filename + '.reject', 'w+',
encoding='utf-8') as fp_reject_announce:
fp_reject_announce.write('\n')
except OSError:
print('EX: _reject_announce unable to write ' +
save_string('\n', announce_filename + '.reject',
'EX: _reject_announce unable to write ' +
announce_filename + '.reject')
@ -6972,13 +6895,11 @@ def edited_post_filename(base_dir: str, nickname: str, domain: str,
if not os.path.isfile(actor_filename):
return '', None
post_id = remove_id_ending(post_json_object['object']['id'])
lastpost_id = None
try:
with open(actor_filename, 'r',
encoding='utf-8') as fp_actor:
lastpost_id = fp_actor.read()
except OSError:
print('EX: edited_post_filename unable to read ' + actor_filename)
lastpost_id = \
load_string(actor_filename,
'EX: edited_post_filename unable to read ' +
actor_filename)
if lastpost_id is None:
return '', None
if not lastpost_id:
return '', None
@ -7099,15 +7020,13 @@ def get_max_profile_posts(base_dir: str, nickname: str, domain: str,
max_profile_posts = 4
if not os.path.isfile(max_posts_filename):
return max_profile_posts
try:
with open(max_posts_filename, 'r', encoding='utf-8') as fp_posts:
max_posts_str = fp_posts.read()
max_posts_str = \
load_string(max_posts_filename,
'EX: unable to read maximum profile posts ' +
max_posts_filename)
if max_posts_str:
if max_posts_str.isdigit():
max_profile_posts = int(max_posts_str)
except OSError:
print('EX: unable to read maximum profile posts ' +
max_posts_filename)
if max_profile_posts < 1:
max_profile_posts = 1
if max_profile_posts > max_recent_posts:
@ -7122,13 +7041,9 @@ def set_max_profile_posts(base_dir: str, nickname: str, domain: str,
max_posts_filename = \
acct_dir(base_dir, nickname, domain) + '/max_profile_posts.txt'
max_recent_posts_str = str(max_recent_posts)
try:
with open(max_posts_filename, 'w+',
encoding='utf-8') as fp_posts:
fp_posts.write(max_recent_posts_str)
except OSError:
print('EX: unable to save maximum profile posts ' +
max_posts_filename)
if not save_string(max_recent_posts_str, max_posts_filename,
'EX: unable to save maximum profile posts ' +
max_posts_filename):
return False
return True