Replace save, load and append with functions

main
bashrc 2026-04-25 19:29:35 +01:00
parent 06ccf1b373
commit 412e053abd
1 changed files with 202 additions and 328 deletions

View File

@ -53,6 +53,9 @@ from conversation import mute_conversation
from conversation import unmute_conversation from conversation import unmute_conversation
from auth import create_basic_auth_header from auth import create_basic_auth_header
from session import get_json from session import get_json
from data import load_string
from data import save_string
from data import append_string
def get_global_block_reason(search_text: str, def get_global_block_reason(search_text: str,
@ -62,14 +65,10 @@ def get_global_block_reason(search_text: str,
if not text_in_file(search_text, blocking_reasons_filename): if not text_in_file(search_text, blocking_reasons_filename):
return '' return ''
reasons_str: str = '' reasons_str: str = \
try: load_string(blocking_reasons_filename,
with open(blocking_reasons_filename, 'r', 'WARN: Failed to read blocking reasons ' +
encoding='utf-8') as fp_reas: blocking_reasons_filename)
reasons_str = fp_reas.read()
except OSError:
print('WARN: Failed to read blocking reasons ' +
blocking_reasons_filename)
if not reasons_str: if not reasons_str:
return '' return ''
@ -97,13 +96,11 @@ def get_account_blocks(base_dir: str,
return '' return ''
blocked_accounts_textarea: str = '' blocked_accounts_textarea: str = ''
blocking_file_text: str = '' blocking_file_text: str = \
try: load_string(blocking_filename,
with open(blocking_filename, 'r', encoding='utf-8') as fp_block: 'EX: Failed to read account blocks ' +
blocking_file_text = fp_block.read() blocking_filename)
except OSError: if blocking_file_text is None:
if debug:
print('EX: Failed to read account blocks ' + blocking_filename)
return '' return ''
blocklist = blocking_file_text.split('\n') blocklist = blocking_file_text.split('\n')
@ -230,18 +227,10 @@ def add_account_blocks(base_dir: str,
blocking_reasons_filename) blocking_reasons_filename)
return True return True
try: save_string(blocking_file_text, blocking_filename,
with open(blocking_filename, 'w+', encoding='utf-8') as fp_block: 'EX: Failed to write ' + blocking_filename)
fp_block.write(blocking_file_text) save_string(blocking_reasons_file_text, blocking_reasons_filename,
except OSError: 'EX: Failed to write ' + blocking_reasons_filename)
print('EX: Failed to write ' + blocking_filename)
try:
with open(blocking_reasons_filename, 'w+',
encoding='utf-8') as fp_block:
fp_block.write(blocking_reasons_file_text)
except OSError:
print('EX: Failed to write ' + blocking_reasons_filename)
return True return True
@ -268,21 +257,15 @@ def _add_global_block_reason(base_dir: str,
if os.path.isfile(blocking_reasons_filename): if os.path.isfile(blocking_reasons_filename):
if not text_in_file(block_id, if not text_in_file(block_id,
blocking_reasons_filename): blocking_reasons_filename):
try: append_string(reason_line, blocking_reasons_filename,
with open(blocking_reasons_filename, 'a+', 'EX: unable to add blocking reason ' +
encoding='utf-8') as fp_reas: block_id)
fp_reas.write(reason_line)
except OSError:
print('EX: unable to add blocking reason ' +
block_id)
else: else:
reasons_str: str = '' reasons_str: str = \
try: load_string(blocking_reasons_filename,
with open(blocking_reasons_filename, 'r', 'EX: unable to read blocking reasons')
encoding='utf-8') as fp_reas: if reasons_str is None:
reasons_str = fp_reas.read() reasons_str = ''
except OSError:
print('EX: unable to read blocking reasons')
reasons_lines = reasons_str.split('\n') reasons_lines = reasons_str.split('\n')
new_reasons_str: str = '' new_reasons_str: str = ''
for line in reasons_lines: for line in reasons_lines:
@ -290,21 +273,13 @@ def _add_global_block_reason(base_dir: str,
new_reasons_str += line + '\n' new_reasons_str += line + '\n'
continue continue
new_reasons_str += reason_line new_reasons_str += reason_line
try: save_string(new_reasons_str, blocking_reasons_filename,
with open(blocking_reasons_filename, 'w+', 'EX: unable to save blocking reasons' +
encoding='utf-8') as fp_reas: blocking_reasons_filename)
fp_reas.write(new_reasons_str)
except OSError:
print('EX: unable to save blocking reasons' +
blocking_reasons_filename)
else: else:
try: save_string(reason_line, blocking_reasons_filename,
with open(blocking_reasons_filename, 'w+', 'EX: unable to save blocking reason ' +
encoding='utf-8') as fp_reas: block_id + ' ' + blocking_reasons_filename)
fp_reas.write(reason_line)
except OSError:
print('EX: unable to save blocking reason ' +
block_id + ' ' + blocking_reasons_filename)
return True return True
@ -325,11 +300,9 @@ def add_global_block(base_dir: str,
if text_in_file(block_handle, blocking_filename): if text_in_file(block_handle, blocking_filename):
return False return False
# block an account handle or domain # block an account handle or domain
try: if not append_string(block_handle + '\n', blocking_filename,
with open(blocking_filename, 'a+', encoding='utf-8') as fp_block: 'EX: unable to save blocked handle ' +
fp_block.write(block_handle + '\n') block_handle):
except OSError:
print('EX: unable to save blocked handle ' + block_handle)
return False return False
else: else:
block_hashtag = block_nickname block_hashtag = block_nickname
@ -338,11 +311,9 @@ def add_global_block(base_dir: str,
if text_in_file(block_hashtag + '\n', blocking_filename): if text_in_file(block_hashtag + '\n', blocking_filename):
return False return False
# block a hashtag # block a hashtag
try: if not append_string(block_hashtag + '\n', blocking_filename,
with open(blocking_filename, 'a+', encoding='utf-8') as fp_block: 'EX: unable to save blocked hashtag ' +
fp_block.write(block_hashtag + '\n') block_hashtag):
except OSError:
print('EX: unable to save blocked hashtag ' + block_hashtag)
return False return False
return True return True
@ -372,21 +343,15 @@ def _add_block_reason(base_dir: str,
if os.path.isfile(blocking_reasons_filename): if os.path.isfile(blocking_reasons_filename):
if not text_in_file(block_id, if not text_in_file(block_id,
blocking_reasons_filename): blocking_reasons_filename):
try: append_string(reason_line, blocking_reasons_filename,
with open(blocking_reasons_filename, 'a+', 'EX: unable to add blocking reason 2 ' +
encoding='utf-8') as fp_reas: block_id)
fp_reas.write(reason_line)
except OSError:
print('EX: unable to add blocking reason 2 ' +
block_id)
else: else:
reasons_str: str = '' reasons_str: str = \
try: load_string(blocking_reasons_filename,
with open(blocking_reasons_filename, 'r', 'EX: unable to read blocking reasons 2')
encoding='utf-8') as fp_reas: if reasons_str is None:
reasons_str = fp_reas.read() reasons_str = ''
except OSError:
print('EX: unable to read blocking reasons 2')
reasons_lines = reasons_str.split('\n') reasons_lines = reasons_str.split('\n')
new_reasons_str: str = '' new_reasons_str: str = ''
for line in reasons_lines: for line in reasons_lines:
@ -394,21 +359,13 @@ def _add_block_reason(base_dir: str,
new_reasons_str += line + '\n' new_reasons_str += line + '\n'
continue continue
new_reasons_str += reason_line new_reasons_str += reason_line
try: save_string(new_reasons_str, blocking_reasons_filename,
with open(blocking_reasons_filename, 'w+', 'EX: unable to save blocking reasons 2' +
encoding='utf-8') as fp_reas: blocking_reasons_filename)
fp_reas.write(new_reasons_str)
except OSError:
print('EX: unable to save blocking reasons 2' +
blocking_reasons_filename)
else: else:
try: save_string(reason_line, blocking_reasons_filename,
with open(blocking_reasons_filename, 'w+', 'EX: unable to save blocking reason 2 ' +
encoding='utf-8') as fp_reas: block_id + ' ' + blocking_reasons_filename)
fp_reas.write(reason_line)
except OSError:
print('EX: unable to save blocking reason 2 ' +
block_id + ' ' + blocking_reasons_filename)
return True return True
@ -433,24 +390,19 @@ def add_block(base_dir: str, nickname: str, domain: str,
acct_dir(base_dir, nickname, domain) + '/following.txt' acct_dir(base_dir, nickname, domain) + '/following.txt'
if os.path.isfile(following_filename): if os.path.isfile(following_filename):
if text_in_file(block_handle + '\n', following_filename): if text_in_file(block_handle + '\n', following_filename):
following_str: str = '' following_str: str = \
try: load_string(following_filename,
with open(following_filename, 'r', 'EX: Unable to read following ' +
encoding='utf-8') as fp_foll: following_filename)
following_str = fp_foll.read() if following_str is None:
except OSError:
print('EX: Unable to read following ' + following_filename)
return False return False
if following_str: if following_str:
following_str = following_str.replace(block_handle + '\n', '') following_str = following_str.replace(block_handle + '\n', '')
try: if not save_string(following_str, following_filename,
with open(following_filename, 'w+', 'EX: Unable to write following ' +
encoding='utf-8') as fp_foll: following_str):
fp_foll.write(following_str)
except OSError:
print('EX: Unable to write following ' + following_str)
return False return False
# if they are a follower then remove them # if they are a follower then remove them
@ -458,31 +410,23 @@ def add_block(base_dir: str, nickname: str, domain: str,
acct_dir(base_dir, nickname, domain) + '/followers.txt' acct_dir(base_dir, nickname, domain) + '/followers.txt'
if os.path.isfile(followers_filename): if os.path.isfile(followers_filename):
if text_in_file(block_handle + '\n', followers_filename): if text_in_file(block_handle + '\n', followers_filename):
followers_str: str = '' followers_str: str = \
try: load_string(followers_filename,
with open(followers_filename, 'r', 'EX: Unable to read followers ' +
encoding='utf-8') as fp_foll: followers_filename)
followers_str = fp_foll.read() if followers_str is None:
except OSError:
print('EX: Unable to read followers ' + followers_filename)
return False return False
if followers_str: if followers_str:
followers_str = followers_str.replace(block_handle + '\n', '') followers_str = followers_str.replace(block_handle + '\n', '')
try: if not save_string(followers_str, followers_filename,
with open(followers_filename, 'w+', 'EX: Unable to write followers ' +
encoding='utf-8') as fp_foll: followers_str):
fp_foll.write(followers_str)
except OSError:
print('EX: Unable to write followers ' + followers_str)
return False return False
if not append_string(block_handle + '\n', blocking_filename,
try: 'EX: unable to append block handle ' +
with open(blocking_filename, 'a+', encoding='utf-8') as fp_block: block_handle):
fp_block.write(block_handle + '\n')
except OSError:
print('EX: unable to append block handle ' + block_handle)
return False return False
if reason: if reason:
@ -509,26 +453,20 @@ def _remove_global_block_reason(base_dir: str,
if not text_in_file(unblock_id + ' ', unblocking_filename): if not text_in_file(unblock_id + ' ', unblocking_filename):
return False return False
reasons_str: str = '' reasons_str: str = \
try: load_string(unblocking_filename,
with open(unblocking_filename, 'r', 'EX: unable to read blocking reasons 3')
encoding='utf-8') as fp_reas: if reasons_str is None:
reasons_str = fp_reas.read() reasons_str = ''
except OSError:
print('EX: unable to read blocking reasons 3')
reasons_lines = reasons_str.split('\n') reasons_lines = reasons_str.split('\n')
new_reasons_str: str = '' new_reasons_str: str = ''
for line in reasons_lines: for line in reasons_lines:
if line.startswith(unblock_id + ' '): if line.startswith(unblock_id + ' '):
continue continue
new_reasons_str += line + '\n' new_reasons_str += line + '\n'
try: save_string(new_reasons_str, unblocking_filename,
with open(unblocking_filename, 'w+', 'EX: unable to save blocking reasons 2' +
encoding='utf-8') as fp_reas: unblocking_filename)
fp_reas.write(new_reasons_str)
except OSError:
print('EX: unable to save blocking reasons 2' +
unblocking_filename)
return True return True
@ -719,13 +657,12 @@ def is_blocked_domain(base_dir: str, domain: str,
# instance block list # instance block list
global_blocking_filename = data_dir(base_dir) + '/blocking.txt' global_blocking_filename = data_dir(base_dir) + '/blocking.txt'
if os.path.isfile(global_blocking_filename): if os.path.isfile(global_blocking_filename):
try: blocked_cache_str = \
with open(global_blocking_filename, 'r', load_string(global_blocking_filename,
encoding='utf-8') as fp_blocked: 'EX: is_blocked_domain unable to read ' +
blocked_cache = fp_blocked.read().split('\n') global_blocking_filename + ' [ex]')
except OSError as ex: if blocked_cache_str:
print('EX: is_blocked_domain unable to read ' + blocked_cache = blocked_cache_str.split('\n')
global_blocking_filename + ' ' + str(ex))
if blocked_cache: if blocked_cache:
blocked_cache_list = blocked_cache blocked_cache_list = blocked_cache
else: else:
@ -783,13 +720,12 @@ def is_blocked_nickname(base_dir: str, nickname: str,
blocked_cache = [] blocked_cache = []
global_blocking_filename = data_dir(base_dir) + '/blocking.txt' global_blocking_filename = data_dir(base_dir) + '/blocking.txt'
if os.path.isfile(global_blocking_filename): if os.path.isfile(global_blocking_filename):
try: blocked_cache_str = \
with open(global_blocking_filename, 'r', load_string(global_blocking_filename,
encoding='utf-8') as fp_blocked: 'EX: is_blocked_nickname unable to read ' +
blocked_cache = fp_blocked.read().split('\n') global_blocking_filename + ' [ex]')
except OSError as ex: if blocked_cache_str:
print('EX: is_blocked_nickname unable to read ' + blocked_cache = blocked_cache_str.split('\n')
global_blocking_filename + ' ' + str(ex))
if blocked_cache: if blocked_cache:
search_str = nickname + '@*' search_str = nickname + '@*'
@ -877,13 +813,12 @@ def is_blocked(base_dir: str, nickname: str, domain: str,
data_dir(base_dir) + '/block_api.txt' data_dir(base_dir) + '/block_api.txt'
if os.path.isfile(federated_blocks_filename): if os.path.isfile(federated_blocks_filename):
block_federated: list[str] = [] block_federated: list[str] = []
try: block_federated_str = \
with open(federated_blocks_filename, 'r', load_string(federated_blocks_filename,
encoding='utf-8') as fp_fed: 'EX: is_blocked unable to load ' +
block_federated = fp_fed.read().split('\n') federated_blocks_filename)
except OSError: if block_federated_str:
print('EX: is_blocked unable to load ' + block_federated = block_federated_str.split('\n')
federated_blocks_filename)
if block_domain in block_federated: if block_domain in block_federated:
return True return True
if block_handle: if block_handle:
@ -997,14 +932,12 @@ def allowed_announce_add(base_dir: str, nickname: str, domain: str,
handle = following_nickname + '@' + following_domain handle = following_nickname + '@' + following_domain
if text_in_file(handle + '\n', blocking_filename, False): if text_in_file(handle + '\n', blocking_filename, False):
file_text: str = '' file_text: str = \
try: load_string(blocking_filename,
with open(blocking_filename, 'r', 'EX: unable to read noannounce add: ' +
encoding='utf-8') as fp_noannounce: blocking_filename + ' ' + handle)
file_text = fp_noannounce.read() if file_text is None:
except OSError: file_text = ''
print('EX: unable to read noannounce add: ' +
blocking_filename + ' ' + handle)
new_file_text: str = '' new_file_text: str = ''
file_text_list = file_text.split('\n') file_text_list = file_text.split('\n')
@ -1014,13 +947,9 @@ def allowed_announce_add(base_dir: str, nickname: str, domain: str,
new_file_text += allowed + '\n' new_file_text += allowed + '\n'
file_text = new_file_text file_text = new_file_text
try: save_string(file_text, blocking_filename,
with open(blocking_filename, 'w+', 'EX: unable to write noannounce add: ' +
encoding='utf-8') as fp_noannounce: blocking_filename + ' ' + handle)
fp_noannounce.write(file_text)
except OSError:
print('EX: unable to write noannounce add: ' +
blocking_filename + ' ' + handle)
def allowed_announce_remove(base_dir: str, nickname: str, domain: str, def allowed_announce_remove(base_dir: str, nickname: str, domain: str,
@ -1035,32 +964,22 @@ def allowed_announce_remove(base_dir: str, nickname: str, domain: str,
# if the noannounce.txt file doesn't yet exist # if the noannounce.txt file doesn't yet exist
if not os.path.isfile(blocking_filename): if not os.path.isfile(blocking_filename):
file_text = handle + '\n' file_text = handle + '\n'
try: save_string(file_text, blocking_filename,
with open(blocking_filename, 'w+', 'EX: unable to write initial noannounce remove: ' +
encoding='utf-8') as fp_noannounce: blocking_filename + ' ' + handle)
fp_noannounce.write(file_text)
except OSError:
print('EX: unable to write initial noannounce remove: ' +
blocking_filename + ' ' + handle)
return return
file_text: str = '' file_text: str = ''
if not text_in_file(handle + '\n', blocking_filename, False): if not text_in_file(handle + '\n', blocking_filename, False):
try: file_text = load_string(blocking_filename,
with open(blocking_filename, 'r', 'EX: unable to read noannounce remove: ' +
encoding='utf-8') as fp_noannounce: blocking_filename + ' ' + handle)
file_text = fp_noannounce.read() if file_text is None:
except OSError: file_text = ''
print('EX: unable to read noannounce remove: ' +
blocking_filename + ' ' + handle)
file_text += handle + '\n' file_text += handle + '\n'
try: save_string(file_text, blocking_filename,
with open(blocking_filename, 'w+', 'EX: unable to write noannounce: ' +
encoding='utf-8') as fp_noannounce: blocking_filename + ' ' + handle)
fp_noannounce.write(file_text)
except OSError:
print('EX: unable to write noannounce: ' +
blocking_filename + ' ' + handle)
def blocked_quote_toots_add(base_dir: str, nickname: str, domain: str, def blocked_quote_toots_add(base_dir: str, nickname: str, domain: str,
@ -1077,23 +996,17 @@ def blocked_quote_toots_add(base_dir: str, nickname: str, domain: str,
handle = following_nickname + '@' + following_domain handle = following_nickname + '@' + following_domain
if not text_in_file(handle + '\n', blocking_filename, False): if not text_in_file(handle + '\n', blocking_filename, False):
file_text: str = '' file_text: str = \
try: load_string(blocking_filename,
with open(blocking_filename, 'r', 'EX: unable to read quotesblocked add: ' +
encoding='utf-8') as fp_quotes: blocking_filename + ' ' + handle)
file_text = fp_quotes.read() if file_text is None:
except OSError: file_text = ''
print('EX: unable to read quotesblocked add: ' +
blocking_filename + ' ' + handle)
file_text += handle + '\n' file_text += handle + '\n'
try: save_string(file_text, blocking_filename,
with open(blocking_filename, 'w+', 'EX: unable to write quotesblocked add: ' +
encoding='utf-8') as fp_quotes: blocking_filename + ' ' + handle)
fp_quotes.write(file_text)
except OSError:
print('EX: unable to write quotesblocked add: ' +
blocking_filename + ' ' + handle)
def blocked_quote_toots_remove(base_dir: str, nickname: str, domain: str, def blocked_quote_toots_remove(base_dir: str, nickname: str, domain: str,
@ -1111,21 +1024,15 @@ def blocked_quote_toots_remove(base_dir: str, nickname: str, domain: str,
file_text: str = '' file_text: str = ''
if text_in_file(handle + '\n', blocking_filename, False): if text_in_file(handle + '\n', blocking_filename, False):
try: file_text = load_string(blocking_filename,
with open(blocking_filename, 'r', 'EX: unable to read quotesblocked remove: ' +
encoding='utf-8') as fp_quotes: blocking_filename + ' ' + handle)
file_text = fp_quotes.read() if file_text is None:
except OSError: file_text = ''
print('EX: unable to read quotesblocked remove: ' +
blocking_filename + ' ' + handle)
file_text = file_text.replace(handle + '\n', '') file_text = file_text.replace(handle + '\n', '')
try: save_string(file_text, blocking_filename,
with open(blocking_filename, 'w+', 'EX: unable to write quotesblocked remove: ' +
encoding='utf-8') as fp_quotes: blocking_filename + ' ' + handle)
fp_quotes.write(file_text)
except OSError:
print('EX: unable to write quotesblocked remove: ' +
blocking_filename + ' ' + handle)
def outbox_block(base_dir: str, nickname: str, domain: str, def outbox_block(base_dir: str, nickname: str, domain: str,
@ -1340,12 +1247,9 @@ def mute_post(base_dir: str, nickname: str, domain: str, port: int,
else: else:
print('MUTE: cached post not found ' + cached_post_filename) print('MUTE: cached post not found ' + cached_post_filename)
try: if not save_string('\n', post_filename + '.muted',
with open(post_filename + '.muted', 'w+', 'EX: Failed to save mute file ' +
encoding='utf-8') as fp_mute: post_filename + '.muted'):
fp_mute.write('\n')
except OSError:
print('EX: Failed to save mute file ' + post_filename + '.muted')
return return
print('MUTE: ' + post_filename + '.muted file added') print('MUTE: ' + post_filename + '.muted file added')
@ -1784,24 +1688,24 @@ def import_blocking_file(base_dir: str, nickname: str, domain: str,
existing_lines: list[str] = [] existing_lines: list[str] = []
if os.path.isfile(blocking_filename): if os.path.isfile(blocking_filename):
try: existing_lines_str = \
with open(blocking_filename, 'r', encoding='utf-8') as fp_blocks: load_string(blocking_filename,
existing_lines = fp_blocks.read().splitlines() 'EX: ' +
except OSError: 'unable to import existing ' +
print('EX: ' + 'blocked instances from file ' +
'unable to import existing blocked instances from file ' + blocking_filename)
blocking_filename) if existing_lines_str:
existing_lines = existing_lines_str.splitlines()
existing_reasons: list[str] = [] existing_reasons: list[str] = []
if os.path.isfile(blocking_reasons_filename): if os.path.isfile(blocking_reasons_filename):
try: existing_reasons_str = \
with open(blocking_reasons_filename, load_string(blocking_reasons_filename,
'r', encoding='utf-8') as fp_blocks: 'EX: ' +
existing_reasons = fp_blocks.read().splitlines() 'unable to import existing ' +
except OSError: 'blocked instance reasons from file ' +
print('EX: ' + blocking_reasons_filename)
'unable to import existing ' + if existing_reasons_str:
'blocked instance reasons from file ' + existing_reasons = existing_reasons_str.splitlines()
blocking_reasons_filename)
append_blocks: list[str] = [] append_blocks: list[str] = []
append_reasons: list[str] = [] append_reasons: list[str] = []
@ -1876,21 +1780,21 @@ def export_blocking_file(base_dir: str, nickname: str, domain: str) -> str:
blocking_lines: list[str] = [] blocking_lines: list[str] = []
if os.path.isfile(blocking_filename): if os.path.isfile(blocking_filename):
try: blocking_lines_str = \
with open(blocking_filename, 'r', encoding='utf-8') as fp_block: load_string(blocking_filename,
blocking_lines = fp_block.read().splitlines() 'EX: export_blocks failed to read ' +
except OSError: blocking_filename)
print('EX: export_blocks failed to read ' + blocking_filename) if blocking_lines_str:
blocking_lines = blocking_lines_str.splitlines()
blocking_reasons: list[str] = [] blocking_reasons: list[str] = []
if os.path.isfile(blocking_reasons_filename): if os.path.isfile(blocking_reasons_filename):
try: blocking_reasons_str = \
with open(blocking_reasons_filename, 'r', load_string(blocking_reasons_filename,
encoding='utf-8') as fp_block: 'EX: export_blocks failed to read ' +
blocking_reasons = fp_block.read().splitlines() blocking_reasons_filename)
except OSError: if blocking_reasons_str:
print('EX: export_blocks failed to read ' + blocking_reasons = blocking_reasons_str.splitlines()
blocking_reasons_filename)
blocks_str = blocks_header blocks_str = blocks_header
for blocked_domain in blocking_lines: for blocked_domain in blocking_lines:
@ -1954,15 +1858,13 @@ def load_blocked_military(base_dir: str) -> {}:
block_military_filename = data_dir(base_dir) + '/block_military.txt' block_military_filename = data_dir(base_dir) + '/block_military.txt'
nicknames_list: list[str] = [] nicknames_list: list[str] = []
if os.path.isfile(block_military_filename): if os.path.isfile(block_military_filename):
try: nicknames_list_str = \
with open(block_military_filename, 'r', load_string(block_military_filename,
encoding='utf-8') as fp_mil: 'EX: error while reading block military file')
nicknames_list = fp_mil.read() if nicknames_list_str:
except OSError: nicknames_list = nicknames_list_str.split('\n')
print('EX: error while reading block military file')
if not nicknames_list: if not nicknames_list:
return {} return {}
nicknames_list = nicknames_list.split('\n')
nicknames_dict = {} nicknames_dict = {}
for nickname in nicknames_list: for nickname in nicknames_list:
nicknames_dict[nickname] = True nicknames_dict[nickname] = True
@ -1975,15 +1877,13 @@ def load_blocked_government(base_dir: str) -> {}:
block_government_filename = data_dir(base_dir) + '/block_government.txt' block_government_filename = data_dir(base_dir) + '/block_government.txt'
nicknames_list: list[str] = [] nicknames_list: list[str] = []
if os.path.isfile(block_government_filename): if os.path.isfile(block_government_filename):
try: nicknames_list_str = \
with open(block_government_filename, 'r', load_string(block_government_filename,
encoding='utf-8') as fp_gov: 'EX: error while reading block government file')
nicknames_list = fp_gov.read() if nicknames_list_str:
except OSError: nicknames_list = nicknames_list_str.split('\n')
print('EX: error while reading block government file')
if not nicknames_list: if not nicknames_list:
return {} return {}
nicknames_list = nicknames_list.split('\n')
nicknames_dict = {} nicknames_dict = {}
for nickname in nicknames_list: for nickname in nicknames_list:
nicknames_dict[nickname] = True nicknames_dict[nickname] = True
@ -1996,15 +1896,13 @@ def load_blocked_bluesky(base_dir: str) -> {}:
block_bluesky_filename = data_dir(base_dir) + '/block_bluesky.txt' block_bluesky_filename = data_dir(base_dir) + '/block_bluesky.txt'
nicknames_list: list[str] = [] nicknames_list: list[str] = []
if os.path.isfile(block_bluesky_filename): if os.path.isfile(block_bluesky_filename):
try: nicknames_list_str = \
with open(block_bluesky_filename, 'r', load_string(block_bluesky_filename,
encoding='utf-8') as fp_bsky: 'EX: error while reading block bluesky file')
nicknames_list = fp_bsky.read() if nicknames_list_str:
except OSError: nicknames_list = nicknames_list_str.split('\n')
print('EX: error while reading block bluesky file')
if not nicknames_list: if not nicknames_list:
return {} return {}
nicknames_list = nicknames_list.split('\n')
nicknames_dict = {} nicknames_dict = {}
for nickname in nicknames_list: for nickname in nicknames_list:
nicknames_dict[nickname] = True nicknames_dict[nickname] = True
@ -2017,15 +1915,13 @@ def load_blocked_nostr(base_dir: str) -> {}:
block_nostr_filename = data_dir(base_dir) + '/block_nostr.txt' block_nostr_filename = data_dir(base_dir) + '/block_nostr.txt'
nicknames_list: list[str] = [] nicknames_list: list[str] = []
if os.path.isfile(block_nostr_filename): if os.path.isfile(block_nostr_filename):
try: nicknames_list_str = \
with open(block_nostr_filename, 'r', load_string(block_nostr_filename,
encoding='utf-8') as fp_nostr: 'EX: error while reading block nostr file')
nicknames_list = fp_nostr.read() if nicknames_list_str:
except OSError: nicknames_list = nicknames_list_str.split('\n')
print('EX: error while reading block nostr file')
if not nicknames_list: if not nicknames_list:
return {} return {}
nicknames_list = nicknames_list.split('\n')
nicknames_dict = {} nicknames_dict = {}
for nickname in nicknames_list: for nickname in nicknames_list:
nicknames_dict[nickname] = True nicknames_dict[nickname] = True
@ -2040,12 +1936,8 @@ def save_blocked_military(base_dir: str, block_military: {}) -> None:
nicknames_str += nickname + '\n' nicknames_str += nickname + '\n'
block_military_filename = data_dir(base_dir) + '/block_military.txt' block_military_filename = data_dir(base_dir) + '/block_military.txt'
try: save_string(nicknames_str, block_military_filename,
with open(block_military_filename, 'w+', 'EX: error while saving block military file')
encoding='utf-8') as fp_mil:
fp_mil.write(nicknames_str)
except OSError:
print('EX: error while saving block military file')
def save_blocked_government(base_dir: str, block_government: {}) -> None: def save_blocked_government(base_dir: str, block_government: {}) -> None:
@ -2056,12 +1948,8 @@ def save_blocked_government(base_dir: str, block_government: {}) -> None:
nicknames_str += nickname + '\n' nicknames_str += nickname + '\n'
block_government_filename = data_dir(base_dir) + '/block_government.txt' block_government_filename = data_dir(base_dir) + '/block_government.txt'
try: save_string(nicknames_str, block_government_filename,
with open(block_government_filename, 'w+', 'EX: error while saving block government file')
encoding='utf-8') as fp_gov:
fp_gov.write(nicknames_str)
except OSError:
print('EX: error while saving block government file')
def save_blocked_bluesky(base_dir: str, block_bluesky: {}) -> None: def save_blocked_bluesky(base_dir: str, block_bluesky: {}) -> None:
@ -2072,12 +1960,8 @@ def save_blocked_bluesky(base_dir: str, block_bluesky: {}) -> None:
nicknames_str += nickname + '\n' nicknames_str += nickname + '\n'
block_bluesky_filename = data_dir(base_dir) + '/block_bluesky.txt' block_bluesky_filename = data_dir(base_dir) + '/block_bluesky.txt'
try: save_string(nicknames_str, block_bluesky_filename,
with open(block_bluesky_filename, 'w+', 'EX: error while saving block bluesky file')
encoding='utf-8') as fp_bsky:
fp_bsky.write(nicknames_str)
except OSError:
print('EX: error while saving block bluesky file')
def save_blocked_nostr(base_dir: str, block_nostr: {}) -> None: def save_blocked_nostr(base_dir: str, block_nostr: {}) -> None:
@ -2088,12 +1972,8 @@ def save_blocked_nostr(base_dir: str, block_nostr: {}) -> None:
nicknames_str += nickname + '\n' nicknames_str += nickname + '\n'
block_nostr_filename = data_dir(base_dir) + '/block_nostr.txt' block_nostr_filename = data_dir(base_dir) + '/block_nostr.txt'
try: save_string(nicknames_str, block_nostr_filename,
with open(block_nostr_filename, 'w+', 'EX: error while saving block nostr file')
encoding='utf-8') as fp_nostr:
fp_nostr.write(nicknames_str)
except OSError:
print('EX: error while saving block nostr file')
def get_mil_domains_list() -> []: def get_mil_domains_list() -> []:
@ -2183,12 +2063,12 @@ def load_federated_blocks_endpoints(base_dir: str) -> []:
data_dir(base_dir) + '/block_api_endpoints.txt' data_dir(base_dir) + '/block_api_endpoints.txt'
if os.path.isfile(block_api_endpoints_filename): if os.path.isfile(block_api_endpoints_filename):
new_block_federated_endpoints: list[str] = [] new_block_federated_endpoints: list[str] = []
try: new_block_federated_endpoints_str = \
with open(block_api_endpoints_filename, 'r', load_string(block_api_endpoints_filename,
encoding='utf-8') as fp_ep: 'EX: unable to load block_api_endpoints.txt')
new_block_federated_endpoints = fp_ep.read().split('\n') if new_block_federated_endpoints_str:
except OSError: new_block_federated_endpoints = \
print('EX: unable to load block_api_endpoints.txt') new_block_federated_endpoints_str.split('\n')
for endpoint in new_block_federated_endpoints: for endpoint in new_block_federated_endpoints:
if endpoint: if endpoint:
if '#' not in endpoint: if '#' not in endpoint:
@ -2305,11 +2185,8 @@ def _update_federated_blocks(session, base_dir: str,
print('EX: unable to remove block api: ' + block_api_filename) print('EX: unable to remove block api: ' + block_api_filename)
else: else:
print('DEBUG: federated blocklist loaded: ' + str(block_federated)) print('DEBUG: federated blocklist loaded: ' + str(block_federated))
try: save_string(new_block_api_str, block_api_filename,
with open(block_api_filename, 'w+', encoding='utf-8') as fp_api: 'EX: unable to write block_api.txt')
fp_api.write(new_block_api_str)
except OSError:
print('EX: unable to write block_api.txt')
return block_federated return block_federated
@ -2348,12 +2225,9 @@ def save_block_federated_endpoints(base_dir: str,
except OSError: except OSError:
print('EX: unable to delete block_api.txt') print('EX: unable to delete block_api.txt')
else: else:
try: save_string(block_federated_endpoints_str,
with open(block_api_endpoints_filename, 'w+', block_api_endpoints_filename,
encoding='utf-8') as fp_api: 'EX: unable to write block_api_endpoints.txt')
fp_api.write(block_federated_endpoints_str)
except OSError:
print('EX: unable to write block_api_endpoints.txt')
return result return result