Replace file operations with functions

main
bashrc 2026-04-29 17:42:08 +01:00
parent 946c584b98
commit edd183ed2c
2 changed files with 105 additions and 91 deletions

View File

@ -17,6 +17,8 @@ from unicodetext import standardize_text
from unicodetext import remove_inverted_text from unicodetext import remove_inverted_text
from unicodetext import remove_square_capitals from unicodetext import remove_square_capitals
from data import append_string from data import append_string
from data import save_string
from data import load_list
def add_filter(base_dir: str, nickname: str, domain: str, words: str) -> bool: def add_filter(base_dir: str, nickname: str, domain: str, words: str) -> bool:
@ -61,17 +63,23 @@ def remove_filter(base_dir: str, nickname: str, domain: str,
if not text_in_file(words, filters_filename): if not text_in_file(words, filters_filename):
return False return False
new_filters_filename = filters_filename + '.new' new_filters_filename = filters_filename + '.new'
try:
with open(filters_filename, 'r', encoding='utf-8') as fp_filt: filters_list: list[str] = \
with open(new_filters_filename, 'w+', encoding='utf-8') as fp_new: load_list(filters_filename,
for line in fp_filt: 'EX: unable to remove filter ' +
line = remove_eol(line) filters_filename + ' 1 [ex]')
if line != words: if filters_list is None:
fp_new.write(line + '\n')
except OSError as ex:
print('EX: unable to remove filter ' +
filters_filename + ' ' + str(ex))
return False return False
text: str = ''
for line in filters_list:
line = remove_eol(line)
if line != words:
text += line + '\n'
save_string(text, new_filters_filename,
'EX: unable to remove filter ' +
filters_filename + ' 2 [ex]')
if os.path.isfile(new_filters_filename): if os.path.isfile(new_filters_filename):
try: try:
os.rename(new_filters_filename, filters_filename) os.rename(new_filters_filename, filters_filename)
@ -91,17 +99,23 @@ def remove_global_filter(base_dir: str, words: str) -> bool:
if not text_in_file(words, filters_filename): if not text_in_file(words, filters_filename):
return False return False
new_filters_filename = filters_filename + '.new' new_filters_filename = filters_filename + '.new'
try:
with open(filters_filename, 'r', encoding='utf-8') as fp_filt: global_list: list[str] = \
with open(new_filters_filename, 'w+', encoding='utf-8') as fp_new: load_list(filters_filename,
for line in fp_filt: 'EX: unable to remove global filter ' +
line = remove_eol(line) filters_filename + ' [ex]')
if line != words: if global_list is None:
fp_new.write(line + '\n')
except OSError as ex:
print('EX: unable to remove global filter ' +
filters_filename + ' ' + str(ex))
return False return False
text: str = ''
for line in global_list:
line = remove_eol(line)
if line != words:
text += line + '\n'
save_string(text, new_filters_filename,
'EX: unable to remove global filter ' +
filters_filename + ' 2 [ex]')
if os.path.isfile(new_filters_filename): if os.path.isfile(new_filters_filename):
try: try:
os.rename(new_filters_filename, filters_filename) os.rename(new_filters_filename, filters_filename)
@ -152,25 +166,25 @@ def _is_filtered_base(filename: str, content: str,
# convert any fancy characters to ordinary ones # convert any fancy characters to ordinary ones
content = standardize_text(content) content = standardize_text(content)
try: filtered_list: list[str] = \
with open(filename, 'r', encoding='utf-8') as fp_filt: load_list(filename,
for line in fp_filt: 'EX: _is_filtered_base ' + filename + ' [ex]')
filter_str = remove_eol(line) if filtered_list is not None:
if not filter_str: for line in filtered_list:
continue filter_str = remove_eol(line)
if len(filter_str) < 2: if not filter_str:
continue continue
if '+' not in filter_str: if len(filter_str) < 2:
if filtered_match(filter_str, content): continue
return True if '+' not in filter_str:
else: if filtered_match(filter_str, content):
filter_words = filter_str.replace('"', '').split('+')
for filter_wrd in filter_words:
if not filtered_match(filter_wrd, content):
return False
return True return True
except OSError as ex: else:
print('EX: _is_filtered_base ' + filename + ' ' + str(ex)) filter_words = filter_str.replace('"', '').split('+')
for filter_wrd in filter_words:
if not filtered_match(filter_wrd, content):
return False
return True
return False return False

106
follow.py
View File

@ -144,23 +144,25 @@ def _remove_from_follow_base(base_dir: str,
actor_found = True actor_found = True
if not actor_found: if not actor_found:
return return
try:
with open(approve_follows_filename + '.new', 'w+', text: str = ''
encoding='utf-8') as fp_approve_new: approve_follows_list: list[str] = \
with open(approve_follows_filename, 'r', load_list(approve_follows_filename,
encoding='utf-8') as fp_approve: 'EX: _remove_from_follow_base ' +
if not accept_deny_actor: approve_follows_filename + ' 2 [ex]')
for approve_handle in fp_approve: if approve_follows_list is not None:
accept_deny_handle = accept_or_deny_handle if not accept_deny_actor:
if not approve_handle.startswith(accept_deny_handle): for approve_handle in approve_follows_list:
fp_approve_new.write(approve_handle) accept_deny_handle = accept_or_deny_handle
else: if not approve_handle.startswith(accept_deny_handle):
for approve_handle in fp_approve: text += approve_handle
if accept_deny_actor not in approve_handle: else:
fp_approve_new.write(approve_handle) for approve_handle in approve_follows_list:
except OSError as ex: if accept_deny_actor not in approve_handle:
print('EX: _remove_from_follow_base ' + text += approve_handle
approve_follows_filename + ' ' + str(ex)) save_string(text, approve_follows_filename + '.new',
'EX: _remove_from_follow_base ' +
approve_follows_filename + ' 1 [ex]')
try: try:
os.rename(approve_follows_filename + '.new', os.rename(approve_follows_filename + '.new',
@ -337,16 +339,15 @@ def unfollow_account(base_dir: str, nickname: str, domain: str,
return False return False
if lines: if lines:
try: text: str = ''
with open(filename, 'w+', encoding='utf-8') as fp_unfoll: for line in lines:
for line in lines: check_handle = line.strip("\n").strip("\r").lower()
check_handle = line.strip("\n").strip("\r").lower() if check_handle not in (handle_to_unfollow_lower,
if check_handle not in (handle_to_unfollow_lower, '!' + handle_to_unfollow_lower):
'!' + handle_to_unfollow_lower): text += line
fp_unfoll.write(line) save_string(text, filename,
except OSError as ex: 'EX: unfollow_account unable to write ' +
print('EX: unfollow_account unable to write ' + filename + ' [ex]')
filename + ' ' + str(ex))
# write to an unfollowed file so that if a follow accept # write to an unfollowed file so that if a follow accept
# later arrives then it can be ignored # later arrives then it can be ignored
@ -1593,30 +1594,29 @@ def pending_followers_timeline_json(actor: str, base_dir: str,
follow_requests_filename = \ follow_requests_filename = \
acct_dir(base_dir, nickname, domain) + '/followrequests.txt' acct_dir(base_dir, nickname, domain) + '/followrequests.txt'
if os.path.isfile(follow_requests_filename): if os.path.isfile(follow_requests_filename):
try: follow_requests_list: list[str] = \
with open(follow_requests_filename, 'r', load_list(follow_requests_filename,
encoding='utf-8') as fp_req: 'EX: unable to read follow requests ' +
for follower_handle in fp_req: follow_requests_filename + ' [ex]')
if len(follower_handle) == 0: if follow_requests_list is not None:
continue for follower_handle in follow_requests_list:
follower_handle = remove_eol(follower_handle) if len(follower_handle) == 0:
foll_domain, _ = get_domain_from_actor(follower_handle) continue
if not foll_domain: follower_handle = remove_eol(follower_handle)
continue foll_domain, _ = get_domain_from_actor(follower_handle)
foll_nickname = get_nickname_from_actor(follower_handle) if not foll_domain:
if not foll_nickname: continue
continue foll_nickname = get_nickname_from_actor(follower_handle)
follow_activity_filename = \ if not foll_nickname:
acct_dir(base_dir, nickname, domain) + \ continue
'/requests/' + \ follow_activity_filename = \
foll_nickname + '@' + foll_domain + '.follow' acct_dir(base_dir, nickname, domain) + \
if not os.path.isfile(follow_activity_filename): '/requests/' + \
continue foll_nickname + '@' + foll_domain + '.follow'
follow_json = load_json(follow_activity_filename) if not os.path.isfile(follow_activity_filename):
if not follow_json: continue
continue follow_json = load_json(follow_activity_filename)
result_json['orderedItems'].append(follow_json) if not follow_json:
except OSError as exc: continue
print('EX: unable to read follow requests ' + result_json['orderedItems'].append(follow_json)
follow_requests_filename + ' ' + str(exc))
return result_json return result_json