mirror of https://gitlab.com/bashrc2/epicyon
Replace file operations with functions
parent
7c883cf255
commit
2b8f9e3ab7
|
|
@ -28,6 +28,7 @@ from languages import get_understood_languages
|
||||||
from posts import create_direct_message_post
|
from posts import create_direct_message_post
|
||||||
from daemon_utils import post_to_outbox
|
from daemon_utils import post_to_outbox
|
||||||
from inbox import populate_replies
|
from inbox import populate_replies
|
||||||
|
from data import append_string
|
||||||
|
|
||||||
|
|
||||||
def receive_vote(self, calling_domain: str, cookie: str,
|
def receive_vote(self, calling_domain: str, cookie: str,
|
||||||
|
|
@ -289,13 +290,9 @@ def _send_reply_to_question(self, base_dir: str,
|
||||||
max_replies,
|
max_replies,
|
||||||
debug)
|
debug)
|
||||||
# record the vote
|
# record the vote
|
||||||
try:
|
append_string(message_id + '\n', votes_filename,
|
||||||
with open(votes_filename, 'a+',
|
'EX: unable to write vote ' +
|
||||||
encoding='utf-8') as fp_votes:
|
votes_filename)
|
||||||
fp_votes.write(message_id + '\n')
|
|
||||||
except OSError:
|
|
||||||
print('EX: unable to write vote ' +
|
|
||||||
votes_filename)
|
|
||||||
|
|
||||||
# ensure that the cached post is removed if it exists,
|
# ensure that the cached post is removed if it exists,
|
||||||
# so that it then will be recreated
|
# so that it then will be recreated
|
||||||
|
|
|
||||||
|
|
@ -65,6 +65,7 @@ from cache import get_person_from_cache
|
||||||
from shares import add_shares_to_actor
|
from shares import add_shares_to_actor
|
||||||
from person import get_actor_update_json
|
from person import get_actor_update_json
|
||||||
from maps import geocoords_to_osm_link
|
from maps import geocoords_to_osm_link
|
||||||
|
from data import save_string
|
||||||
|
|
||||||
NEW_POST_SUCCESS = 1
|
NEW_POST_SUCCESS = 1
|
||||||
NEW_POST_FAILED = -1
|
NEW_POST_FAILED = -1
|
||||||
|
|
@ -2028,13 +2029,10 @@ def _receive_new_post_process(self, post_type: str, path: str, headers: {},
|
||||||
# This is then used for active monthly users counts
|
# This is then used for active monthly users counts
|
||||||
last_used_filename = \
|
last_used_filename = \
|
||||||
acct_dir(base_dir, nickname, domain) + '/.lastUsed'
|
acct_dir(base_dir, nickname, domain) + '/.lastUsed'
|
||||||
try:
|
text = str(int(time.time()))
|
||||||
with open(last_used_filename, 'w+',
|
save_string(text, last_used_filename,
|
||||||
encoding='utf-8') as fp_last:
|
'EX: _receive_new_post_process unable to write ' +
|
||||||
fp_last.write(str(int(time.time())))
|
last_used_filename)
|
||||||
except OSError:
|
|
||||||
print('EX: _receive_new_post_process unable to write ' +
|
|
||||||
last_used_filename)
|
|
||||||
|
|
||||||
mentions_str = ''
|
mentions_str = ''
|
||||||
if fields.get('mentions'):
|
if fields.get('mentions'):
|
||||||
|
|
|
||||||
|
|
@ -78,6 +78,7 @@ from httpheaders import redirect_headers
|
||||||
from httpheaders import set_headers
|
from httpheaders import set_headers
|
||||||
from fitnessFunctions import fitness_performance
|
from fitnessFunctions import fitness_performance
|
||||||
from siteactive import is_online
|
from siteactive import is_online
|
||||||
|
from data import load_string
|
||||||
|
|
||||||
|
|
||||||
def post_to_outbox(self, message_json: {}, version: str,
|
def post_to_outbox(self, message_json: {}, version: str,
|
||||||
|
|
@ -904,17 +905,13 @@ def etag_exists(self, media_filename: str) -> bool:
|
||||||
old_etag = self.headers[etag_header].replace('"', '')
|
old_etag = self.headers[etag_header].replace('"', '')
|
||||||
if os.path.isfile(media_filename + '.etag'):
|
if os.path.isfile(media_filename + '.etag'):
|
||||||
# load the etag from file
|
# load the etag from file
|
||||||
curr_etag = ''
|
exc_str = 'EX: _etag_exists unable to read ' + \
|
||||||
try:
|
str(media_filename)
|
||||||
with open(media_filename + '.etag', 'r',
|
curr_etag = load_string(media_filename + '.etag', exc_str)
|
||||||
encoding='utf-8') as fp_media:
|
if curr_etag:
|
||||||
curr_etag = fp_media.read()
|
if old_etag == curr_etag:
|
||||||
except OSError:
|
# The file has not changed
|
||||||
print('EX: _etag_exists unable to read ' +
|
return True
|
||||||
str(media_filename))
|
|
||||||
if curr_etag and old_etag == curr_etag:
|
|
||||||
# The file has not changed
|
|
||||||
return True
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -72,6 +72,8 @@ from bookmarks import send_undo_bookmark_via_server
|
||||||
from delete import send_delete_via_server
|
from delete import send_delete_via_server
|
||||||
from person import get_actor_json
|
from person import get_actor_json
|
||||||
from cache import get_person_from_cache
|
from cache import get_person_from_cache
|
||||||
|
from data import save_string
|
||||||
|
from data import load_string
|
||||||
|
|
||||||
|
|
||||||
def _desktop_help() -> None:
|
def _desktop_help() -> None:
|
||||||
|
|
@ -198,12 +200,8 @@ def _mark_post_as_read(actor: str, post_id: str, post_category: str) -> None:
|
||||||
except OSError as ex:
|
except OSError as ex:
|
||||||
print('EX: Failed to mark post as read 1 ' + str(ex))
|
print('EX: Failed to mark post as read 1 ' + str(ex))
|
||||||
else:
|
else:
|
||||||
try:
|
save_string(post_id + '\n', read_posts_filename,
|
||||||
with open(read_posts_filename, 'w+',
|
'EX: Failed to mark post as read 2 [ex]')
|
||||||
encoding='utf-8') as fp_read:
|
|
||||||
fp_read.write(post_id + '\n')
|
|
||||||
except OSError as ex:
|
|
||||||
print('EX: Failed to mark post as read 2 ' + str(ex))
|
|
||||||
|
|
||||||
|
|
||||||
def _has_read_post(actor: str, post_id: str, post_category: str) -> bool:
|
def _has_read_post(actor: str, post_id: str, post_category: str) -> bool:
|
||||||
|
|
@ -332,13 +330,10 @@ def _desktop_show_banner() -> None:
|
||||||
banner_filename = 'theme/' + banner_theme + '/banner.txt'
|
banner_filename = 'theme/' + banner_theme + '/banner.txt'
|
||||||
if not os.path.isfile(banner_filename):
|
if not os.path.isfile(banner_filename):
|
||||||
return
|
return
|
||||||
try:
|
banner = load_string(banner_filename,
|
||||||
with open(banner_filename, 'r', encoding='utf-8') as fp_banner:
|
'EX: unable to read banner file ' + banner_filename)
|
||||||
banner = fp_banner.read()
|
if banner:
|
||||||
if banner:
|
print(banner + '\n')
|
||||||
print(banner + '\n')
|
|
||||||
except OSError:
|
|
||||||
print('EX: unable to read banner file ' + banner_filename)
|
|
||||||
|
|
||||||
|
|
||||||
def _desktop_wait_for_cmd(timeout: int, debug: bool) -> str:
|
def _desktop_wait_for_cmd(timeout: int, debug: bool) -> str:
|
||||||
|
|
|
||||||
10
epicyon.py
10
epicyon.py
|
|
@ -135,6 +135,7 @@ from poison import load_2grams
|
||||||
from webapp_post import get_instance_software
|
from webapp_post import get_instance_software
|
||||||
from siteactive import site_is_active
|
from siteactive import site_is_active
|
||||||
from siteactive import is_online
|
from siteactive import is_online
|
||||||
|
from data import save_string
|
||||||
|
|
||||||
|
|
||||||
def str2bool(value_str) -> bool:
|
def str2bool(value_str) -> bool:
|
||||||
|
|
@ -1297,12 +1298,9 @@ def _command_options() -> None:
|
||||||
http_prefix, debug,
|
http_prefix, debug,
|
||||||
__version__, argb.language,
|
__version__, argb.language,
|
||||||
signing_priv_key_pem, mitm_servers)
|
signing_priv_key_pem, mitm_servers)
|
||||||
try:
|
if save_string(dot_graph, 'socnet.dot',
|
||||||
with open('socnet.dot', 'w+', encoding='utf-8') as fp_soc:
|
'EX: commandline unable to write socnet.dot'):
|
||||||
fp_soc.write(dot_graph)
|
print('Saved to socnet.dot')
|
||||||
print('Saved to socnet.dot')
|
|
||||||
except OSError:
|
|
||||||
print('EX: commandline unable to write socnet.dot')
|
|
||||||
sys.exit()
|
sys.exit()
|
||||||
|
|
||||||
if argb.postsraw:
|
if argb.postsraw:
|
||||||
|
|
|
||||||
16
filters.py
16
filters.py
|
|
@ -16,6 +16,7 @@ from utils import remove_eol
|
||||||
from unicodetext import standardize_text
|
from unicodetext import standardize_text
|
||||||
from unicodetext import remove_inverted_text
|
from unicodetext import remove_inverted_text
|
||||||
from unicodetext import remove_square_capitals
|
from unicodetext import remove_square_capitals
|
||||||
|
from data import append_string
|
||||||
|
|
||||||
|
|
||||||
def add_filter(base_dir: str, nickname: str, domain: str, words: str) -> bool:
|
def add_filter(base_dir: str, nickname: str, domain: str, words: str) -> bool:
|
||||||
|
|
@ -26,12 +27,8 @@ def add_filter(base_dir: str, nickname: str, domain: str, words: str) -> bool:
|
||||||
if os.path.isfile(filters_filename):
|
if os.path.isfile(filters_filename):
|
||||||
if text_in_file(words, filters_filename):
|
if text_in_file(words, filters_filename):
|
||||||
return False
|
return False
|
||||||
try:
|
if not append_string(words + '\n', filters_filename,
|
||||||
with open(filters_filename, 'a+',
|
'EX: unable to append filters ' + filters_filename):
|
||||||
encoding='utf-8') as fp_filters:
|
|
||||||
fp_filters.write(words + '\n')
|
|
||||||
except OSError:
|
|
||||||
print('EX: unable to append filters ' + filters_filename)
|
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
@ -48,11 +45,8 @@ def add_global_filter(base_dir: str, words: str) -> bool:
|
||||||
if os.path.isfile(filters_filename):
|
if os.path.isfile(filters_filename):
|
||||||
if text_in_file(words, filters_filename):
|
if text_in_file(words, filters_filename):
|
||||||
return False
|
return False
|
||||||
try:
|
if not append_string(words + '\n', filters_filename,
|
||||||
with open(filters_filename, 'a+', encoding='utf-8') as fp_filters:
|
'EX: unable to append filters ' + filters_filename):
|
||||||
fp_filters.write(words + '\n')
|
|
||||||
except OSError:
|
|
||||||
print('EX: unable to append filters ' + filters_filename)
|
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
|
||||||
68
flags.py
68
flags.py
|
|
@ -31,6 +31,7 @@ from utils import text_in_file
|
||||||
from utils import get_group_paths
|
from utils import get_group_paths
|
||||||
from formats import get_image_extensions
|
from formats import get_image_extensions
|
||||||
from quote import get_quote_toot_url
|
from quote import get_quote_toot_url
|
||||||
|
from data import load_string
|
||||||
|
|
||||||
|
|
||||||
def is_featured_writer(base_dir: str, nickname: str, domain: str) -> bool:
|
def is_featured_writer(base_dir: str, nickname: str, domain: str) -> bool:
|
||||||
|
|
@ -53,13 +54,10 @@ def is_dormant(base_dir: str, nickname: str, domain: str, actor: str,
|
||||||
if not os.path.isfile(last_seen_filename):
|
if not os.path.isfile(last_seen_filename):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
days_since_epoch_str = None
|
days_since_epoch_str = \
|
||||||
try:
|
load_string(last_seen_filename, 'EX: failed to read last seen ' +
|
||||||
with open(last_seen_filename, 'r',
|
last_seen_filename)
|
||||||
encoding='utf-8') as fp_last_seen:
|
if days_since_epoch_str is None:
|
||||||
days_since_epoch_str = fp_last_seen.read()
|
|
||||||
except OSError:
|
|
||||||
print('EX: failed to read last seen ' + last_seen_filename)
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if days_since_epoch_str:
|
if days_since_epoch_str:
|
||||||
|
|
@ -85,12 +83,11 @@ def is_editor(base_dir: str, nickname: str) -> bool:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
lines: list[str] = []
|
lines: list[str] = \
|
||||||
try:
|
load_string(editors_file,
|
||||||
with open(editors_file, 'r', encoding='utf-8') as fp_editors:
|
'EX: is_editor unable to read ' + editors_file)
|
||||||
lines = fp_editors.readlines()
|
if lines is not None:
|
||||||
except OSError:
|
lines = lines.split('\n')
|
||||||
print('EX: is_editor unable to read ' + editors_file)
|
|
||||||
|
|
||||||
if not lines:
|
if not lines:
|
||||||
admin_name = get_config_param(base_dir, 'admin')
|
admin_name = get_config_param(base_dir, 'admin')
|
||||||
|
|
@ -116,12 +113,11 @@ def is_artist(base_dir: str, nickname: str) -> bool:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
lines: list[str] = []
|
lines: list[str] = \
|
||||||
try:
|
load_string(artists_file,
|
||||||
with open(artists_file, 'r', encoding='utf-8') as fp_artists:
|
'EX: is_artist unable to read ' + artists_file)
|
||||||
lines = fp_artists.readlines()
|
if lines is not None:
|
||||||
except OSError:
|
lines = lines.split('\n')
|
||||||
print('EX: is_artist unable to read ' + artists_file)
|
|
||||||
|
|
||||||
if not lines:
|
if not lines:
|
||||||
admin_name = get_config_param(base_dir, 'admin')
|
admin_name = get_config_param(base_dir, 'admin')
|
||||||
|
|
@ -158,12 +154,11 @@ def is_memorial_account(base_dir: str, nickname: str) -> bool:
|
||||||
memorial_file = data_dir(base_dir) + '/memorial'
|
memorial_file = data_dir(base_dir) + '/memorial'
|
||||||
if not os.path.isfile(memorial_file):
|
if not os.path.isfile(memorial_file):
|
||||||
return False
|
return False
|
||||||
memorial_list: list[str] = []
|
memorial_list: list[str] = \
|
||||||
try:
|
load_string(memorial_file,
|
||||||
with open(memorial_file, 'r', encoding='utf-8') as fp_memorial:
|
'EX: unable to read ' + memorial_file)
|
||||||
memorial_list = fp_memorial.read().split('\n')
|
if memorial_list is not None:
|
||||||
except OSError:
|
memorial_list = memorial_list.split('\n')
|
||||||
print('EX: unable to read ' + memorial_file)
|
|
||||||
if nickname in memorial_list:
|
if nickname in memorial_list:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
@ -180,12 +175,12 @@ def is_suspended(base_dir: str, nickname: str) -> bool:
|
||||||
|
|
||||||
suspended_filename = data_dir(base_dir) + '/suspended.txt'
|
suspended_filename = data_dir(base_dir) + '/suspended.txt'
|
||||||
if os.path.isfile(suspended_filename):
|
if os.path.isfile(suspended_filename):
|
||||||
lines: list[str] = []
|
lines: list[str] = \
|
||||||
try:
|
load_string(suspended_filename,
|
||||||
with open(suspended_filename, 'r', encoding='utf-8') as fp_susp:
|
'EX: is_suspended unable to read ' +
|
||||||
lines = fp_susp.readlines()
|
suspended_filename)
|
||||||
except OSError:
|
if lines is not None:
|
||||||
print('EX: is_suspended unable to read ' + suspended_filename)
|
lines = lines.split('\n')
|
||||||
|
|
||||||
for suspended in lines:
|
for suspended in lines:
|
||||||
if suspended.strip('\n').strip('\r') == nickname:
|
if suspended.strip('\n').strip('\r') == nickname:
|
||||||
|
|
@ -734,12 +729,11 @@ def is_moderator(base_dir: str, nickname: str) -> bool:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
lines: list[str] = []
|
lines: list[str] = \
|
||||||
try:
|
load_string(moderators_file,
|
||||||
with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
|
'EX: is_moderator unable to read ' + moderators_file)
|
||||||
lines = fp_mod.readlines()
|
if lines is not None:
|
||||||
except OSError:
|
lines = lines.split('\n')
|
||||||
print('EX: is_moderator unable to read ' + moderators_file)
|
|
||||||
|
|
||||||
if not lines:
|
if not lines:
|
||||||
admin_name = get_config_param(base_dir, 'admin')
|
admin_name = get_config_param(base_dir, 'admin')
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue