Revert "Replace save, load and append with functions"

This reverts commit 6073d29035.
main
bashrc 2026-04-26 17:13:28 +01:00
parent e5166d9636
commit c7e945567b
1 changed files with 328 additions and 202 deletions

View File

@ -53,9 +53,6 @@ from conversation import mute_conversation
from conversation import unmute_conversation from conversation import unmute_conversation
from auth import create_basic_auth_header from auth import create_basic_auth_header
from session import get_json from session import get_json
from data import load_string
from data import save_string
from data import append_string
def get_global_block_reason(search_text: str, def get_global_block_reason(search_text: str,
@ -65,10 +62,14 @@ def get_global_block_reason(search_text: str,
if not text_in_file(search_text, blocking_reasons_filename): if not text_in_file(search_text, blocking_reasons_filename):
return '' return ''
reasons_str: str = \ reasons_str: str = ''
load_string(blocking_reasons_filename, try:
'WARN: Failed to read blocking reasons ' + with open(blocking_reasons_filename, 'r',
blocking_reasons_filename) encoding='utf-8') as fp_reas:
reasons_str = fp_reas.read()
except OSError:
print('WARN: Failed to read blocking reasons ' +
blocking_reasons_filename)
if not reasons_str: if not reasons_str:
return '' return ''
@ -96,11 +97,13 @@ def get_account_blocks(base_dir: str,
return '' return ''
blocked_accounts_textarea: str = '' blocked_accounts_textarea: str = ''
blocking_file_text: str = \ blocking_file_text: str = ''
load_string(blocking_filename, try:
'EX: Failed to read account blocks ' + with open(blocking_filename, 'r', encoding='utf-8') as fp_block:
blocking_filename) blocking_file_text = fp_block.read()
if blocking_file_text is None: except OSError:
if debug:
print('EX: Failed to read account blocks ' + blocking_filename)
return '' return ''
blocklist = blocking_file_text.split('\n') blocklist = blocking_file_text.split('\n')
@ -227,10 +230,18 @@ def add_account_blocks(base_dir: str,
blocking_reasons_filename) blocking_reasons_filename)
return True return True
save_string(blocking_file_text, blocking_filename, try:
'EX: Failed to write ' + blocking_filename) with open(blocking_filename, 'w+', encoding='utf-8') as fp_block:
save_string(blocking_reasons_file_text, blocking_reasons_filename, fp_block.write(blocking_file_text)
'EX: Failed to write ' + blocking_reasons_filename) except OSError:
print('EX: Failed to write ' + blocking_filename)
try:
with open(blocking_reasons_filename, 'w+',
encoding='utf-8') as fp_block:
fp_block.write(blocking_reasons_file_text)
except OSError:
print('EX: Failed to write ' + blocking_reasons_filename)
return True return True
@ -257,15 +268,21 @@ def _add_global_block_reason(base_dir: str,
if os.path.isfile(blocking_reasons_filename): if os.path.isfile(blocking_reasons_filename):
if not text_in_file(block_id, if not text_in_file(block_id,
blocking_reasons_filename): blocking_reasons_filename):
append_string(reason_line, blocking_reasons_filename, try:
'EX: unable to add blocking reason ' + with open(blocking_reasons_filename, 'a+',
block_id) encoding='utf-8') as fp_reas:
fp_reas.write(reason_line)
except OSError:
print('EX: unable to add blocking reason ' +
block_id)
else: else:
reasons_str: str = \ reasons_str: str = ''
load_string(blocking_reasons_filename, try:
'EX: unable to read blocking reasons') with open(blocking_reasons_filename, 'r',
if reasons_str is None: encoding='utf-8') as fp_reas:
reasons_str = '' reasons_str = fp_reas.read()
except OSError:
print('EX: unable to read blocking reasons')
reasons_lines = reasons_str.split('\n') reasons_lines = reasons_str.split('\n')
new_reasons_str: str = '' new_reasons_str: str = ''
for line in reasons_lines: for line in reasons_lines:
@ -273,13 +290,21 @@ def _add_global_block_reason(base_dir: str,
new_reasons_str += line + '\n' new_reasons_str += line + '\n'
continue continue
new_reasons_str += reason_line new_reasons_str += reason_line
save_string(new_reasons_str, blocking_reasons_filename, try:
'EX: unable to save blocking reasons' + with open(blocking_reasons_filename, 'w+',
blocking_reasons_filename) encoding='utf-8') as fp_reas:
fp_reas.write(new_reasons_str)
except OSError:
print('EX: unable to save blocking reasons' +
blocking_reasons_filename)
else: else:
save_string(reason_line, blocking_reasons_filename, try:
'EX: unable to save blocking reason ' + with open(blocking_reasons_filename, 'w+',
block_id + ' ' + blocking_reasons_filename) encoding='utf-8') as fp_reas:
fp_reas.write(reason_line)
except OSError:
print('EX: unable to save blocking reason ' +
block_id + ' ' + blocking_reasons_filename)
return True return True
@ -300,9 +325,11 @@ def add_global_block(base_dir: str,
if text_in_file(block_handle, blocking_filename): if text_in_file(block_handle, blocking_filename):
return False return False
# block an account handle or domain # block an account handle or domain
if not append_string(block_handle + '\n', blocking_filename, try:
'EX: unable to save blocked handle ' + with open(blocking_filename, 'a+', encoding='utf-8') as fp_block:
block_handle): fp_block.write(block_handle + '\n')
except OSError:
print('EX: unable to save blocked handle ' + block_handle)
return False return False
else: else:
block_hashtag = block_nickname block_hashtag = block_nickname
@ -311,9 +338,11 @@ def add_global_block(base_dir: str,
if text_in_file(block_hashtag + '\n', blocking_filename): if text_in_file(block_hashtag + '\n', blocking_filename):
return False return False
# block a hashtag # block a hashtag
if not append_string(block_hashtag + '\n', blocking_filename, try:
'EX: unable to save blocked hashtag ' + with open(blocking_filename, 'a+', encoding='utf-8') as fp_block:
block_hashtag): fp_block.write(block_hashtag + '\n')
except OSError:
print('EX: unable to save blocked hashtag ' + block_hashtag)
return False return False
return True return True
@ -343,15 +372,21 @@ def _add_block_reason(base_dir: str,
if os.path.isfile(blocking_reasons_filename): if os.path.isfile(blocking_reasons_filename):
if not text_in_file(block_id, if not text_in_file(block_id,
blocking_reasons_filename): blocking_reasons_filename):
append_string(reason_line, blocking_reasons_filename, try:
'EX: unable to add blocking reason 2 ' + with open(blocking_reasons_filename, 'a+',
block_id) encoding='utf-8') as fp_reas:
fp_reas.write(reason_line)
except OSError:
print('EX: unable to add blocking reason 2 ' +
block_id)
else: else:
reasons_str: str = \ reasons_str: str = ''
load_string(blocking_reasons_filename, try:
'EX: unable to read blocking reasons 2') with open(blocking_reasons_filename, 'r',
if reasons_str is None: encoding='utf-8') as fp_reas:
reasons_str = '' reasons_str = fp_reas.read()
except OSError:
print('EX: unable to read blocking reasons 2')
reasons_lines = reasons_str.split('\n') reasons_lines = reasons_str.split('\n')
new_reasons_str: str = '' new_reasons_str: str = ''
for line in reasons_lines: for line in reasons_lines:
@ -359,13 +394,21 @@ def _add_block_reason(base_dir: str,
new_reasons_str += line + '\n' new_reasons_str += line + '\n'
continue continue
new_reasons_str += reason_line new_reasons_str += reason_line
save_string(new_reasons_str, blocking_reasons_filename, try:
'EX: unable to save blocking reasons 2' + with open(blocking_reasons_filename, 'w+',
blocking_reasons_filename) encoding='utf-8') as fp_reas:
fp_reas.write(new_reasons_str)
except OSError:
print('EX: unable to save blocking reasons 2' +
blocking_reasons_filename)
else: else:
save_string(reason_line, blocking_reasons_filename, try:
'EX: unable to save blocking reason 2 ' + with open(blocking_reasons_filename, 'w+',
block_id + ' ' + blocking_reasons_filename) encoding='utf-8') as fp_reas:
fp_reas.write(reason_line)
except OSError:
print('EX: unable to save blocking reason 2 ' +
block_id + ' ' + blocking_reasons_filename)
return True return True
@ -390,19 +433,24 @@ def add_block(base_dir: str, nickname: str, domain: str,
acct_dir(base_dir, nickname, domain) + '/following.txt' acct_dir(base_dir, nickname, domain) + '/following.txt'
if os.path.isfile(following_filename): if os.path.isfile(following_filename):
if text_in_file(block_handle + '\n', following_filename): if text_in_file(block_handle + '\n', following_filename):
following_str: str = \ following_str: str = ''
load_string(following_filename, try:
'EX: Unable to read following ' + with open(following_filename, 'r',
following_filename) encoding='utf-8') as fp_foll:
if following_str is None: following_str = fp_foll.read()
except OSError:
print('EX: Unable to read following ' + following_filename)
return False return False
if following_str: if following_str:
following_str = following_str.replace(block_handle + '\n', '') following_str = following_str.replace(block_handle + '\n', '')
if not save_string(following_str, following_filename, try:
'EX: Unable to write following ' + with open(following_filename, 'w+',
following_str): encoding='utf-8') as fp_foll:
fp_foll.write(following_str)
except OSError:
print('EX: Unable to write following ' + following_str)
return False return False
# if they are a follower then remove them # if they are a follower then remove them
@ -410,23 +458,31 @@ def add_block(base_dir: str, nickname: str, domain: str,
acct_dir(base_dir, nickname, domain) + '/followers.txt' acct_dir(base_dir, nickname, domain) + '/followers.txt'
if os.path.isfile(followers_filename): if os.path.isfile(followers_filename):
if text_in_file(block_handle + '\n', followers_filename): if text_in_file(block_handle + '\n', followers_filename):
followers_str: str = \ followers_str: str = ''
load_string(followers_filename, try:
'EX: Unable to read followers ' + with open(followers_filename, 'r',
followers_filename) encoding='utf-8') as fp_foll:
if followers_str is None: followers_str = fp_foll.read()
except OSError:
print('EX: Unable to read followers ' + followers_filename)
return False return False
if followers_str: if followers_str:
followers_str = followers_str.replace(block_handle + '\n', '') followers_str = followers_str.replace(block_handle + '\n', '')
if not save_string(followers_str, followers_filename, try:
'EX: Unable to write followers ' + with open(followers_filename, 'w+',
followers_str): encoding='utf-8') as fp_foll:
fp_foll.write(followers_str)
except OSError:
print('EX: Unable to write followers ' + followers_str)
return False return False
if not append_string(block_handle + '\n', blocking_filename,
'EX: unable to append block handle ' + try:
block_handle): with open(blocking_filename, 'a+', encoding='utf-8') as fp_block:
fp_block.write(block_handle + '\n')
except OSError:
print('EX: unable to append block handle ' + block_handle)
return False return False
if reason: if reason:
@ -453,20 +509,26 @@ def _remove_global_block_reason(base_dir: str,
if not text_in_file(unblock_id + ' ', unblocking_filename): if not text_in_file(unblock_id + ' ', unblocking_filename):
return False return False
reasons_str: str = \ reasons_str: str = ''
load_string(unblocking_filename, try:
'EX: unable to read blocking reasons 3') with open(unblocking_filename, 'r',
if reasons_str is None: encoding='utf-8') as fp_reas:
reasons_str = '' reasons_str = fp_reas.read()
except OSError:
print('EX: unable to read blocking reasons 3')
reasons_lines = reasons_str.split('\n') reasons_lines = reasons_str.split('\n')
new_reasons_str: str = '' new_reasons_str: str = ''
for line in reasons_lines: for line in reasons_lines:
if line.startswith(unblock_id + ' '): if line.startswith(unblock_id + ' '):
continue continue
new_reasons_str += line + '\n' new_reasons_str += line + '\n'
save_string(new_reasons_str, unblocking_filename, try:
'EX: unable to save blocking reasons 2' + with open(unblocking_filename, 'w+',
unblocking_filename) encoding='utf-8') as fp_reas:
fp_reas.write(new_reasons_str)
except OSError:
print('EX: unable to save blocking reasons 2' +
unblocking_filename)
return True return True
@ -657,12 +719,13 @@ def is_blocked_domain(base_dir: str, domain: str,
# instance block list # instance block list
global_blocking_filename = data_dir(base_dir) + '/blocking.txt' global_blocking_filename = data_dir(base_dir) + '/blocking.txt'
if os.path.isfile(global_blocking_filename): if os.path.isfile(global_blocking_filename):
blocked_cache_str = \ try:
load_string(global_blocking_filename, with open(global_blocking_filename, 'r',
'EX: is_blocked_domain unable to read ' + encoding='utf-8') as fp_blocked:
global_blocking_filename + ' [ex]') blocked_cache = fp_blocked.read().split('\n')
if blocked_cache_str: except OSError as ex:
blocked_cache = blocked_cache_str.split('\n') print('EX: is_blocked_domain unable to read ' +
global_blocking_filename + ' ' + str(ex))
if blocked_cache: if blocked_cache:
blocked_cache_list = blocked_cache blocked_cache_list = blocked_cache
else: else:
@ -720,12 +783,13 @@ def is_blocked_nickname(base_dir: str, nickname: str,
blocked_cache = [] blocked_cache = []
global_blocking_filename = data_dir(base_dir) + '/blocking.txt' global_blocking_filename = data_dir(base_dir) + '/blocking.txt'
if os.path.isfile(global_blocking_filename): if os.path.isfile(global_blocking_filename):
blocked_cache_str = \ try:
load_string(global_blocking_filename, with open(global_blocking_filename, 'r',
'EX: is_blocked_nickname unable to read ' + encoding='utf-8') as fp_blocked:
global_blocking_filename + ' [ex]') blocked_cache = fp_blocked.read().split('\n')
if blocked_cache_str: except OSError as ex:
blocked_cache = blocked_cache_str.split('\n') print('EX: is_blocked_nickname unable to read ' +
global_blocking_filename + ' ' + str(ex))
if blocked_cache: if blocked_cache:
search_str = nickname + '@*' search_str = nickname + '@*'
@ -813,12 +877,13 @@ def is_blocked(base_dir: str, nickname: str, domain: str,
data_dir(base_dir) + '/block_api.txt' data_dir(base_dir) + '/block_api.txt'
if os.path.isfile(federated_blocks_filename): if os.path.isfile(federated_blocks_filename):
block_federated: list[str] = [] block_federated: list[str] = []
block_federated_str = \ try:
load_string(federated_blocks_filename, with open(federated_blocks_filename, 'r',
'EX: is_blocked unable to load ' + encoding='utf-8') as fp_fed:
federated_blocks_filename) block_federated = fp_fed.read().split('\n')
if block_federated_str: except OSError:
block_federated = block_federated_str.split('\n') print('EX: is_blocked unable to load ' +
federated_blocks_filename)
if block_domain in block_federated: if block_domain in block_federated:
return True return True
if block_handle: if block_handle:
@ -932,12 +997,14 @@ def allowed_announce_add(base_dir: str, nickname: str, domain: str,
handle = following_nickname + '@' + following_domain handle = following_nickname + '@' + following_domain
if text_in_file(handle + '\n', blocking_filename, False): if text_in_file(handle + '\n', blocking_filename, False):
file_text: str = \ file_text: str = ''
load_string(blocking_filename, try:
'EX: unable to read noannounce add: ' + with open(blocking_filename, 'r',
blocking_filename + ' ' + handle) encoding='utf-8') as fp_noannounce:
if file_text is None: file_text = fp_noannounce.read()
file_text = '' except OSError:
print('EX: unable to read noannounce add: ' +
blocking_filename + ' ' + handle)
new_file_text: str = '' new_file_text: str = ''
file_text_list = file_text.split('\n') file_text_list = file_text.split('\n')
@ -947,9 +1014,13 @@ def allowed_announce_add(base_dir: str, nickname: str, domain: str,
new_file_text += allowed + '\n' new_file_text += allowed + '\n'
file_text = new_file_text file_text = new_file_text
save_string(file_text, blocking_filename, try:
'EX: unable to write noannounce add: ' + with open(blocking_filename, 'w+',
blocking_filename + ' ' + handle) encoding='utf-8') as fp_noannounce:
fp_noannounce.write(file_text)
except OSError:
print('EX: unable to write noannounce add: ' +
blocking_filename + ' ' + handle)
def allowed_announce_remove(base_dir: str, nickname: str, domain: str, def allowed_announce_remove(base_dir: str, nickname: str, domain: str,
@ -964,22 +1035,32 @@ def allowed_announce_remove(base_dir: str, nickname: str, domain: str,
# if the noannounce.txt file doesn't yet exist # if the noannounce.txt file doesn't yet exist
if not os.path.isfile(blocking_filename): if not os.path.isfile(blocking_filename):
file_text = handle + '\n' file_text = handle + '\n'
save_string(file_text, blocking_filename, try:
'EX: unable to write initial noannounce remove: ' + with open(blocking_filename, 'w+',
blocking_filename + ' ' + handle) encoding='utf-8') as fp_noannounce:
fp_noannounce.write(file_text)
except OSError:
print('EX: unable to write initial noannounce remove: ' +
blocking_filename + ' ' + handle)
return return
file_text: str = '' file_text: str = ''
if not text_in_file(handle + '\n', blocking_filename, False): if not text_in_file(handle + '\n', blocking_filename, False):
file_text = load_string(blocking_filename, try:
'EX: unable to read noannounce remove: ' + with open(blocking_filename, 'r',
blocking_filename + ' ' + handle) encoding='utf-8') as fp_noannounce:
if file_text is None: file_text = fp_noannounce.read()
file_text = '' except OSError:
print('EX: unable to read noannounce remove: ' +
blocking_filename + ' ' + handle)
file_text += handle + '\n' file_text += handle + '\n'
save_string(file_text, blocking_filename, try:
'EX: unable to write noannounce: ' + with open(blocking_filename, 'w+',
blocking_filename + ' ' + handle) encoding='utf-8') as fp_noannounce:
fp_noannounce.write(file_text)
except OSError:
print('EX: unable to write noannounce: ' +
blocking_filename + ' ' + handle)
def blocked_quote_toots_add(base_dir: str, nickname: str, domain: str, def blocked_quote_toots_add(base_dir: str, nickname: str, domain: str,
@ -996,17 +1077,23 @@ def blocked_quote_toots_add(base_dir: str, nickname: str, domain: str,
handle = following_nickname + '@' + following_domain handle = following_nickname + '@' + following_domain
if not text_in_file(handle + '\n', blocking_filename, False): if not text_in_file(handle + '\n', blocking_filename, False):
file_text: str = \ file_text: str = ''
load_string(blocking_filename, try:
'EX: unable to read quotesblocked add: ' + with open(blocking_filename, 'r',
blocking_filename + ' ' + handle) encoding='utf-8') as fp_quotes:
if file_text is None: file_text = fp_quotes.read()
file_text = '' except OSError:
print('EX: unable to read quotesblocked add: ' +
blocking_filename + ' ' + handle)
file_text += handle + '\n' file_text += handle + '\n'
save_string(file_text, blocking_filename, try:
'EX: unable to write quotesblocked add: ' + with open(blocking_filename, 'w+',
blocking_filename + ' ' + handle) encoding='utf-8') as fp_quotes:
fp_quotes.write(file_text)
except OSError:
print('EX: unable to write quotesblocked add: ' +
blocking_filename + ' ' + handle)
def blocked_quote_toots_remove(base_dir: str, nickname: str, domain: str, def blocked_quote_toots_remove(base_dir: str, nickname: str, domain: str,
@ -1024,15 +1111,21 @@ def blocked_quote_toots_remove(base_dir: str, nickname: str, domain: str,
file_text: str = '' file_text: str = ''
if text_in_file(handle + '\n', blocking_filename, False): if text_in_file(handle + '\n', blocking_filename, False):
file_text = load_string(blocking_filename, try:
'EX: unable to read quotesblocked remove: ' + with open(blocking_filename, 'r',
blocking_filename + ' ' + handle) encoding='utf-8') as fp_quotes:
if file_text is None: file_text = fp_quotes.read()
file_text = '' except OSError:
print('EX: unable to read quotesblocked remove: ' +
blocking_filename + ' ' + handle)
file_text = file_text.replace(handle + '\n', '') file_text = file_text.replace(handle + '\n', '')
save_string(file_text, blocking_filename, try:
'EX: unable to write quotesblocked remove: ' + with open(blocking_filename, 'w+',
blocking_filename + ' ' + handle) encoding='utf-8') as fp_quotes:
fp_quotes.write(file_text)
except OSError:
print('EX: unable to write quotesblocked remove: ' +
blocking_filename + ' ' + handle)
def outbox_block(base_dir: str, nickname: str, domain: str, def outbox_block(base_dir: str, nickname: str, domain: str,
@ -1247,9 +1340,12 @@ def mute_post(base_dir: str, nickname: str, domain: str, port: int,
else: else:
print('MUTE: cached post not found ' + cached_post_filename) print('MUTE: cached post not found ' + cached_post_filename)
if not save_string('\n', post_filename + '.muted', try:
'EX: Failed to save mute file ' + with open(post_filename + '.muted', 'w+',
post_filename + '.muted'): encoding='utf-8') as fp_mute:
fp_mute.write('\n')
except OSError:
print('EX: Failed to save mute file ' + post_filename + '.muted')
return return
print('MUTE: ' + post_filename + '.muted file added') print('MUTE: ' + post_filename + '.muted file added')
@ -1688,24 +1784,24 @@ def import_blocking_file(base_dir: str, nickname: str, domain: str,
existing_lines: list[str] = [] existing_lines: list[str] = []
if os.path.isfile(blocking_filename): if os.path.isfile(blocking_filename):
existing_lines_str = \ try:
load_string(blocking_filename, with open(blocking_filename, 'r', encoding='utf-8') as fp_blocks:
'EX: ' + existing_lines = fp_blocks.read().splitlines()
'unable to import existing ' + except OSError:
'blocked instances from file ' + print('EX: ' +
blocking_filename) 'unable to import existing blocked instances from file ' +
if existing_lines_str: blocking_filename)
existing_lines = existing_lines_str.splitlines()
existing_reasons: list[str] = [] existing_reasons: list[str] = []
if os.path.isfile(blocking_reasons_filename): if os.path.isfile(blocking_reasons_filename):
existing_reasons_str = \ try:
load_string(blocking_reasons_filename, with open(blocking_reasons_filename,
'EX: ' + 'r', encoding='utf-8') as fp_blocks:
'unable to import existing ' + existing_reasons = fp_blocks.read().splitlines()
'blocked instance reasons from file ' + except OSError:
blocking_reasons_filename) print('EX: ' +
if existing_reasons_str: 'unable to import existing ' +
existing_reasons = existing_reasons_str.splitlines() 'blocked instance reasons from file ' +
blocking_reasons_filename)
append_blocks: list[str] = [] append_blocks: list[str] = []
append_reasons: list[str] = [] append_reasons: list[str] = []
@ -1780,21 +1876,21 @@ def export_blocking_file(base_dir: str, nickname: str, domain: str) -> str:
blocking_lines: list[str] = [] blocking_lines: list[str] = []
if os.path.isfile(blocking_filename): if os.path.isfile(blocking_filename):
blocking_lines_str = \ try:
load_string(blocking_filename, with open(blocking_filename, 'r', encoding='utf-8') as fp_block:
'EX: export_blocks failed to read ' + blocking_lines = fp_block.read().splitlines()
blocking_filename) except OSError:
if blocking_lines_str: print('EX: export_blocks failed to read ' + blocking_filename)
blocking_lines = blocking_lines_str.splitlines()
blocking_reasons: list[str] = [] blocking_reasons: list[str] = []
if os.path.isfile(blocking_reasons_filename): if os.path.isfile(blocking_reasons_filename):
blocking_reasons_str = \ try:
load_string(blocking_reasons_filename, with open(blocking_reasons_filename, 'r',
'EX: export_blocks failed to read ' + encoding='utf-8') as fp_block:
blocking_reasons_filename) blocking_reasons = fp_block.read().splitlines()
if blocking_reasons_str: except OSError:
blocking_reasons = blocking_reasons_str.splitlines() print('EX: export_blocks failed to read ' +
blocking_reasons_filename)
blocks_str = blocks_header blocks_str = blocks_header
for blocked_domain in blocking_lines: for blocked_domain in blocking_lines:
@ -1858,13 +1954,15 @@ def load_blocked_military(base_dir: str) -> {}:
block_military_filename = data_dir(base_dir) + '/block_military.txt' block_military_filename = data_dir(base_dir) + '/block_military.txt'
nicknames_list: list[str] = [] nicknames_list: list[str] = []
if os.path.isfile(block_military_filename): if os.path.isfile(block_military_filename):
nicknames_list_str = \ try:
load_string(block_military_filename, with open(block_military_filename, 'r',
'EX: error while reading block military file') encoding='utf-8') as fp_mil:
if nicknames_list_str: nicknames_list = fp_mil.read()
nicknames_list = nicknames_list_str.split('\n') except OSError:
print('EX: error while reading block military file')
if not nicknames_list: if not nicknames_list:
return {} return {}
nicknames_list = nicknames_list.split('\n')
nicknames_dict = {} nicknames_dict = {}
for nickname in nicknames_list: for nickname in nicknames_list:
nicknames_dict[nickname] = True nicknames_dict[nickname] = True
@ -1877,13 +1975,15 @@ def load_blocked_government(base_dir: str) -> {}:
block_government_filename = data_dir(base_dir) + '/block_government.txt' block_government_filename = data_dir(base_dir) + '/block_government.txt'
nicknames_list: list[str] = [] nicknames_list: list[str] = []
if os.path.isfile(block_government_filename): if os.path.isfile(block_government_filename):
nicknames_list_str = \ try:
load_string(block_government_filename, with open(block_government_filename, 'r',
'EX: error while reading block government file') encoding='utf-8') as fp_gov:
if nicknames_list_str: nicknames_list = fp_gov.read()
nicknames_list = nicknames_list_str.split('\n') except OSError:
print('EX: error while reading block government file')
if not nicknames_list: if not nicknames_list:
return {} return {}
nicknames_list = nicknames_list.split('\n')
nicknames_dict = {} nicknames_dict = {}
for nickname in nicknames_list: for nickname in nicknames_list:
nicknames_dict[nickname] = True nicknames_dict[nickname] = True
@ -1896,13 +1996,15 @@ def load_blocked_bluesky(base_dir: str) -> {}:
block_bluesky_filename = data_dir(base_dir) + '/block_bluesky.txt' block_bluesky_filename = data_dir(base_dir) + '/block_bluesky.txt'
nicknames_list: list[str] = [] nicknames_list: list[str] = []
if os.path.isfile(block_bluesky_filename): if os.path.isfile(block_bluesky_filename):
nicknames_list_str = \ try:
load_string(block_bluesky_filename, with open(block_bluesky_filename, 'r',
'EX: error while reading block bluesky file') encoding='utf-8') as fp_bsky:
if nicknames_list_str: nicknames_list = fp_bsky.read()
nicknames_list = nicknames_list_str.split('\n') except OSError:
print('EX: error while reading block bluesky file')
if not nicknames_list: if not nicknames_list:
return {} return {}
nicknames_list = nicknames_list.split('\n')
nicknames_dict = {} nicknames_dict = {}
for nickname in nicknames_list: for nickname in nicknames_list:
nicknames_dict[nickname] = True nicknames_dict[nickname] = True
@ -1915,13 +2017,15 @@ def load_blocked_nostr(base_dir: str) -> {}:
block_nostr_filename = data_dir(base_dir) + '/block_nostr.txt' block_nostr_filename = data_dir(base_dir) + '/block_nostr.txt'
nicknames_list: list[str] = [] nicknames_list: list[str] = []
if os.path.isfile(block_nostr_filename): if os.path.isfile(block_nostr_filename):
nicknames_list_str = \ try:
load_string(block_nostr_filename, with open(block_nostr_filename, 'r',
'EX: error while reading block nostr file') encoding='utf-8') as fp_nostr:
if nicknames_list_str: nicknames_list = fp_nostr.read()
nicknames_list = nicknames_list_str.split('\n') except OSError:
print('EX: error while reading block nostr file')
if not nicknames_list: if not nicknames_list:
return {} return {}
nicknames_list = nicknames_list.split('\n')
nicknames_dict = {} nicknames_dict = {}
for nickname in nicknames_list: for nickname in nicknames_list:
nicknames_dict[nickname] = True nicknames_dict[nickname] = True
@ -1936,8 +2040,12 @@ def save_blocked_military(base_dir: str, block_military: {}) -> None:
nicknames_str += nickname + '\n' nicknames_str += nickname + '\n'
block_military_filename = data_dir(base_dir) + '/block_military.txt' block_military_filename = data_dir(base_dir) + '/block_military.txt'
save_string(nicknames_str, block_military_filename, try:
'EX: error while saving block military file') with open(block_military_filename, 'w+',
encoding='utf-8') as fp_mil:
fp_mil.write(nicknames_str)
except OSError:
print('EX: error while saving block military file')
def save_blocked_government(base_dir: str, block_government: {}) -> None: def save_blocked_government(base_dir: str, block_government: {}) -> None:
@ -1948,8 +2056,12 @@ def save_blocked_government(base_dir: str, block_government: {}) -> None:
nicknames_str += nickname + '\n' nicknames_str += nickname + '\n'
block_government_filename = data_dir(base_dir) + '/block_government.txt' block_government_filename = data_dir(base_dir) + '/block_government.txt'
save_string(nicknames_str, block_government_filename, try:
'EX: error while saving block government file') with open(block_government_filename, 'w+',
encoding='utf-8') as fp_gov:
fp_gov.write(nicknames_str)
except OSError:
print('EX: error while saving block government file')
def save_blocked_bluesky(base_dir: str, block_bluesky: {}) -> None: def save_blocked_bluesky(base_dir: str, block_bluesky: {}) -> None:
@ -1960,8 +2072,12 @@ def save_blocked_bluesky(base_dir: str, block_bluesky: {}) -> None:
nicknames_str += nickname + '\n' nicknames_str += nickname + '\n'
block_bluesky_filename = data_dir(base_dir) + '/block_bluesky.txt' block_bluesky_filename = data_dir(base_dir) + '/block_bluesky.txt'
save_string(nicknames_str, block_bluesky_filename, try:
'EX: error while saving block bluesky file') with open(block_bluesky_filename, 'w+',
encoding='utf-8') as fp_bsky:
fp_bsky.write(nicknames_str)
except OSError:
print('EX: error while saving block bluesky file')
def save_blocked_nostr(base_dir: str, block_nostr: {}) -> None: def save_blocked_nostr(base_dir: str, block_nostr: {}) -> None:
@ -1972,8 +2088,12 @@ def save_blocked_nostr(base_dir: str, block_nostr: {}) -> None:
nicknames_str += nickname + '\n' nicknames_str += nickname + '\n'
block_nostr_filename = data_dir(base_dir) + '/block_nostr.txt' block_nostr_filename = data_dir(base_dir) + '/block_nostr.txt'
save_string(nicknames_str, block_nostr_filename, try:
'EX: error while saving block nostr file') with open(block_nostr_filename, 'w+',
encoding='utf-8') as fp_nostr:
fp_nostr.write(nicknames_str)
except OSError:
print('EX: error while saving block nostr file')
def get_mil_domains_list() -> []: def get_mil_domains_list() -> []:
@ -2063,12 +2183,12 @@ def load_federated_blocks_endpoints(base_dir: str) -> []:
data_dir(base_dir) + '/block_api_endpoints.txt' data_dir(base_dir) + '/block_api_endpoints.txt'
if os.path.isfile(block_api_endpoints_filename): if os.path.isfile(block_api_endpoints_filename):
new_block_federated_endpoints: list[str] = [] new_block_federated_endpoints: list[str] = []
new_block_federated_endpoints_str = \ try:
load_string(block_api_endpoints_filename, with open(block_api_endpoints_filename, 'r',
'EX: unable to load block_api_endpoints.txt') encoding='utf-8') as fp_ep:
if new_block_federated_endpoints_str: new_block_federated_endpoints = fp_ep.read().split('\n')
new_block_federated_endpoints = \ except OSError:
new_block_federated_endpoints_str.split('\n') print('EX: unable to load block_api_endpoints.txt')
for endpoint in new_block_federated_endpoints: for endpoint in new_block_federated_endpoints:
if endpoint: if endpoint:
if '#' not in endpoint: if '#' not in endpoint:
@ -2185,8 +2305,11 @@ def _update_federated_blocks(session, base_dir: str,
print('EX: unable to remove block api: ' + block_api_filename) print('EX: unable to remove block api: ' + block_api_filename)
else: else:
print('DEBUG: federated blocklist loaded: ' + str(block_federated)) print('DEBUG: federated blocklist loaded: ' + str(block_federated))
save_string(new_block_api_str, block_api_filename, try:
'EX: unable to write block_api.txt') with open(block_api_filename, 'w+', encoding='utf-8') as fp_api:
fp_api.write(new_block_api_str)
except OSError:
print('EX: unable to write block_api.txt')
return block_federated return block_federated
@ -2225,9 +2348,12 @@ def save_block_federated_endpoints(base_dir: str,
except OSError: except OSError:
print('EX: unable to delete block_api.txt') print('EX: unable to delete block_api.txt')
else: else:
save_string(block_federated_endpoints_str, try:
block_api_endpoints_filename, with open(block_api_endpoints_filename, 'w+',
'EX: unable to write block_api_endpoints.txt') encoding='utf-8') as fp_api:
fp_api.write(block_federated_endpoints_str)
except OSError:
print('EX: unable to write block_api_endpoints.txt')
return result return result