Revert "Replace file operations with functions"

This reverts commit 73a5c0705a.
main
bashrc 2026-04-26 17:05:16 +01:00
parent 3491644e3a
commit a9e9cbe515
7 changed files with 92 additions and 65 deletions

View File

@ -28,7 +28,6 @@ from languages import get_understood_languages
from posts import create_direct_message_post
from daemon_utils import post_to_outbox
from inbox import populate_replies
from data import append_string
def receive_vote(self, calling_domain: str, cookie: str,
@ -290,9 +289,13 @@ def _send_reply_to_question(self, base_dir: str,
max_replies,
debug)
# record the vote
append_string(message_id + '\n', votes_filename,
'EX: unable to write vote ' +
votes_filename)
try:
with open(votes_filename, 'a+',
encoding='utf-8') as fp_votes:
fp_votes.write(message_id + '\n')
except OSError:
print('EX: unable to write vote ' +
votes_filename)
# ensure that the cached post is removed if it exists,
# so that it then will be recreated

View File

@ -65,7 +65,6 @@ from cache import get_person_from_cache
from shares import add_shares_to_actor
from person import get_actor_update_json
from maps import geocoords_to_osm_link
from data import save_string
NEW_POST_SUCCESS = 1
NEW_POST_FAILED = -1
@ -2029,10 +2028,13 @@ def _receive_new_post_process(self, post_type: str, path: str, headers: {},
# This is then used for active monthly users counts
last_used_filename = \
acct_dir(base_dir, nickname, domain) + '/.lastUsed'
text = str(int(time.time()))
save_string(text, last_used_filename,
'EX: _receive_new_post_process unable to write ' +
last_used_filename)
try:
with open(last_used_filename, 'w+',
encoding='utf-8') as fp_last:
fp_last.write(str(int(time.time())))
except OSError:
print('EX: _receive_new_post_process unable to write ' +
last_used_filename)
mentions_str = ''
if fields.get('mentions'):

View File

@ -78,7 +78,6 @@ from httpheaders import redirect_headers
from httpheaders import set_headers
from fitnessFunctions import fitness_performance
from siteactive import is_online
from data import load_string
def post_to_outbox(self, message_json: {}, version: str,
@ -905,13 +904,17 @@ def etag_exists(self, media_filename: str) -> bool:
old_etag = self.headers[etag_header].replace('"', '')
if os.path.isfile(media_filename + '.etag'):
# load the etag from file
exc_str = 'EX: _etag_exists unable to read ' + \
str(media_filename)
curr_etag = load_string(media_filename + '.etag', exc_str)
if curr_etag:
if old_etag == curr_etag:
# The file has not changed
return True
curr_etag = ''
try:
with open(media_filename + '.etag', 'r',
encoding='utf-8') as fp_media:
curr_etag = fp_media.read()
except OSError:
print('EX: _etag_exists unable to read ' +
str(media_filename))
if curr_etag and old_etag == curr_etag:
# The file has not changed
return True
return False

View File

@ -72,8 +72,6 @@ from bookmarks import send_undo_bookmark_via_server
from delete import send_delete_via_server
from person import get_actor_json
from cache import get_person_from_cache
from data import save_string
from data import load_string
def _desktop_help() -> None:
@ -200,8 +198,12 @@ def _mark_post_as_read(actor: str, post_id: str, post_category: str) -> None:
except OSError as ex:
print('EX: Failed to mark post as read 1 ' + str(ex))
else:
save_string(post_id + '\n', read_posts_filename,
'EX: Failed to mark post as read 2 [ex]')
try:
with open(read_posts_filename, 'w+',
encoding='utf-8') as fp_read:
fp_read.write(post_id + '\n')
except OSError as ex:
print('EX: Failed to mark post as read 2 ' + str(ex))
def _has_read_post(actor: str, post_id: str, post_category: str) -> bool:
@ -330,10 +332,13 @@ def _desktop_show_banner() -> None:
banner_filename = 'theme/' + banner_theme + '/banner.txt'
if not os.path.isfile(banner_filename):
return
banner = load_string(banner_filename,
'EX: unable to read banner file ' + banner_filename)
if banner:
print(banner + '\n')
try:
with open(banner_filename, 'r', encoding='utf-8') as fp_banner:
banner = fp_banner.read()
if banner:
print(banner + '\n')
except OSError:
print('EX: unable to read banner file ' + banner_filename)
def _desktop_wait_for_cmd(timeout: int, debug: bool) -> str:

View File

@ -135,7 +135,6 @@ from poison import load_2grams
from webapp_post import get_instance_software
from siteactive import site_is_active
from siteactive import is_online
from data import save_string
def str2bool(value_str) -> bool:
@ -1298,9 +1297,12 @@ def _command_options() -> None:
http_prefix, debug,
__version__, argb.language,
signing_priv_key_pem, mitm_servers)
if save_string(dot_graph, 'socnet.dot',
'EX: commandline unable to write socnet.dot'):
print('Saved to socnet.dot')
try:
with open('socnet.dot', 'w+', encoding='utf-8') as fp_soc:
fp_soc.write(dot_graph)
print('Saved to socnet.dot')
except OSError:
print('EX: commandline unable to write socnet.dot')
sys.exit()
if argb.postsraw:

View File

@ -16,7 +16,6 @@ from utils import remove_eol
from unicodetext import standardize_text
from unicodetext import remove_inverted_text
from unicodetext import remove_square_capitals
from data import append_string
def add_filter(base_dir: str, nickname: str, domain: str, words: str) -> bool:
@ -27,8 +26,12 @@ def add_filter(base_dir: str, nickname: str, domain: str, words: str) -> bool:
if os.path.isfile(filters_filename):
if text_in_file(words, filters_filename):
return False
if not append_string(words + '\n', filters_filename,
'EX: unable to append filters ' + filters_filename):
try:
with open(filters_filename, 'a+',
encoding='utf-8') as fp_filters:
fp_filters.write(words + '\n')
except OSError:
print('EX: unable to append filters ' + filters_filename)
return False
return True
@ -45,8 +48,11 @@ def add_global_filter(base_dir: str, words: str) -> bool:
if os.path.isfile(filters_filename):
if text_in_file(words, filters_filename):
return False
if not append_string(words + '\n', filters_filename,
'EX: unable to append filters ' + filters_filename):
try:
with open(filters_filename, 'a+', encoding='utf-8') as fp_filters:
fp_filters.write(words + '\n')
except OSError:
print('EX: unable to append filters ' + filters_filename)
return False
return True

View File

@ -31,7 +31,6 @@ from utils import text_in_file
from utils import get_group_paths
from formats import get_image_extensions
from quote import get_quote_toot_url
from data import load_string
def is_featured_writer(base_dir: str, nickname: str, domain: str) -> bool:
@ -54,10 +53,13 @@ def is_dormant(base_dir: str, nickname: str, domain: str, actor: str,
if not os.path.isfile(last_seen_filename):
return False
days_since_epoch_str = \
load_string(last_seen_filename, 'EX: failed to read last seen ' +
last_seen_filename)
if days_since_epoch_str is None:
days_since_epoch_str = None
try:
with open(last_seen_filename, 'r',
encoding='utf-8') as fp_last_seen:
days_since_epoch_str = fp_last_seen.read()
except OSError:
print('EX: failed to read last seen ' + last_seen_filename)
return False
if days_since_epoch_str:
@ -83,11 +85,12 @@ def is_editor(base_dir: str, nickname: str) -> bool:
return True
return False
lines: list[str] = \
load_string(editors_file,
'EX: is_editor unable to read ' + editors_file)
if lines is not None:
lines = lines.split('\n')
lines: list[str] = []
try:
with open(editors_file, 'r', encoding='utf-8') as fp_editors:
lines = fp_editors.readlines()
except OSError:
print('EX: is_editor unable to read ' + editors_file)
if not lines:
admin_name = get_config_param(base_dir, 'admin')
@ -113,11 +116,12 @@ def is_artist(base_dir: str, nickname: str) -> bool:
return True
return False
lines: list[str] = \
load_string(artists_file,
'EX: is_artist unable to read ' + artists_file)
if lines is not None:
lines = lines.split('\n')
lines: list[str] = []
try:
with open(artists_file, 'r', encoding='utf-8') as fp_artists:
lines = fp_artists.readlines()
except OSError:
print('EX: is_artist unable to read ' + artists_file)
if not lines:
admin_name = get_config_param(base_dir, 'admin')
@ -154,11 +158,12 @@ def is_memorial_account(base_dir: str, nickname: str) -> bool:
memorial_file = data_dir(base_dir) + '/memorial'
if not os.path.isfile(memorial_file):
return False
memorial_list: list[str] = \
load_string(memorial_file,
'EX: unable to read ' + memorial_file)
if memorial_list is not None:
memorial_list = memorial_list.split('\n')
memorial_list: list[str] = []
try:
with open(memorial_file, 'r', encoding='utf-8') as fp_memorial:
memorial_list = fp_memorial.read().split('\n')
except OSError:
print('EX: unable to read ' + memorial_file)
if nickname in memorial_list:
return True
return False
@ -175,12 +180,12 @@ def is_suspended(base_dir: str, nickname: str) -> bool:
suspended_filename = data_dir(base_dir) + '/suspended.txt'
if os.path.isfile(suspended_filename):
lines: list[str] = \
load_string(suspended_filename,
'EX: is_suspended unable to read ' +
suspended_filename)
if lines is not None:
lines = lines.split('\n')
lines: list[str] = []
try:
with open(suspended_filename, 'r', encoding='utf-8') as fp_susp:
lines = fp_susp.readlines()
except OSError:
print('EX: is_suspended unable to read ' + suspended_filename)
for suspended in lines:
if suspended.strip('\n').strip('\r') == nickname:
@ -729,11 +734,12 @@ def is_moderator(base_dir: str, nickname: str) -> bool:
return True
return False
lines: list[str] = \
load_string(moderators_file,
'EX: is_moderator unable to read ' + moderators_file)
if lines is not None:
lines = lines.split('\n')
lines: list[str] = []
try:
with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
lines = fp_mod.readlines()
except OSError:
print('EX: is_moderator unable to read ' + moderators_file)
if not lines:
admin_name = get_config_param(base_dir, 'admin')