Replace file operations with functions

main
bashrc 2026-04-29 14:23:57 +01:00
parent 721907ca4d
commit c10291fa12
1 changed files with 130 additions and 133 deletions

263
posts.py
View File

@ -185,23 +185,23 @@ def no_of_followers_on_domain(base_dir: str, handle: str,
"""Returns the number of followers of the given handle from the """Returns the number of followers of the given handle from the
given domain given domain
""" """
filename = acct_handle_dir(base_dir, handle) + '/' + follow_file filename: str = acct_handle_dir(base_dir, handle) + '/' + follow_file
if not os.path.isfile(filename): if not os.path.isfile(filename):
return 0 return 0
ctr: int = 0 ctr: int = 0
try: followers_list: list[str] = \
with open(filename, 'r', encoding='utf-8') as fp_followers: load_list(filename,
for follower_handle in fp_followers: 'EX: no_of_followers_on_domain unable to read ' + filename +
if '@' not in follower_handle: ' [ex]')
continue if followers_list is not None:
follower_domain = follower_handle.split('@')[1] for follower_handle in followers_list:
follower_domain = remove_eol(follower_domain) if '@' not in follower_handle:
if domain == follower_domain: continue
ctr += 1 follower_domain = follower_handle.split('@')[1]
except OSError as exc: follower_domain = remove_eol(follower_domain)
print('EX: no_of_followers_on_domain unable to read ' + filename + if domain == follower_domain:
' ' + str(exc)) ctr += 1
return ctr return ctr
@ -2274,13 +2274,10 @@ def regenerate_index_for_box(base_dir: str,
index_lines.sort(reverse=True) index_lines.sort(reverse=True)
result = '' result = ''
try: for line in index_lines:
with open(box_index_filename, 'w+', encoding='utf-8') as fp_box: result += line + '\n'
for line in index_lines: save_string(result, box_index_filename,
result += line + '\n' 'EX: unable to generate index for ' + box_name + ' ' + result)
fp_box.write(line + '\n')
except OSError:
print('EX: unable to generate index for ' + box_name + ' ' + result)
print('Index generated for ' + box_name + '\n' + result) print('Index generated for ' + box_name + '\n' + result)
@ -2883,37 +2880,38 @@ def create_report_post(base_dir: str,
moderators_list: list[str] = [] moderators_list: list[str] = []
moderators_file = data_dir(base_dir) + '/moderators.txt' moderators_file = data_dir(base_dir) + '/moderators.txt'
if os.path.isfile(moderators_file): if os.path.isfile(moderators_file):
try: moderators_list2: list[str] = \
with open(moderators_file, 'r', encoding='utf-8') as fp_mod: load_list(moderators_file,
for line in fp_mod: 'EX: create_report_post unable to read ' +
line = line.strip('\n').strip('\r') moderators_file)
if line.startswith('#'): if moderators_list2 is not None:
continue for line in moderators_list2:
if line.startswith('/users/'): line = line.strip('\n').strip('\r')
line = line.replace('users', '') if line.startswith('#'):
if line.startswith('@'): continue
line = line[1:] if line.startswith('/users/'):
if '@' in line: line = line.replace('users', '')
nick = line.split('@')[0] if line.startswith('@'):
line = line[1:]
if '@' in line:
nick = line.split('@')[0]
moderator_actor = \
local_actor_url(http_prefix, nick, domain_full)
if moderator_actor not in moderators_list:
moderators_list.append(moderator_actor)
continue
if string_starts_with(line,
('http', 'ipfs', 'ipns', 'hyper')):
# must be a local address - no remote moderators
if '://' + domain_full + '/' in line:
if line not in moderators_list:
moderators_list.append(line)
else:
if '/' not in line:
moderator_actor = \ moderator_actor = \
local_actor_url(http_prefix, nick, domain_full) local_actor_url(http_prefix, line, domain_full)
if moderator_actor not in moderators_list: if moderator_actor not in moderators_list:
moderators_list.append(moderator_actor) moderators_list.append(moderator_actor)
continue
if string_starts_with(line,
('http', 'ipfs', 'ipns', 'hyper')):
# must be a local address - no remote moderators
if '://' + domain_full + '/' in line:
if line not in moderators_list:
moderators_list.append(line)
else:
if '/' not in line:
moderator_actor = \
local_actor_url(http_prefix, line, domain_full)
if moderator_actor not in moderators_list:
moderators_list.append(moderator_actor)
except OSError:
print('EX: create_report_post unable to read ' + moderators_file)
if not moderators_list: if not moderators_list:
# if there are no moderators then the admin becomes the moderator # if there are no moderators then the admin becomes the moderator
admin_nickname = get_config_param(base_dir, 'admin') admin_nickname = get_config_param(base_dir, 'admin')
@ -3387,21 +3385,21 @@ def group_followers_by_domain(base_dir: str, nickname: str, domain: str) -> {}:
if not os.path.isfile(followers_filename): if not os.path.isfile(followers_filename):
return None return None
grouped = {} grouped = {}
try: followers_list: list[str] = \
with open(followers_filename, 'r', encoding='utf-8') as fp_foll: load_list(followers_filename,
for follower_handle in fp_foll: 'EX: group_followers_by_domain unable to read ' +
if '@' not in follower_handle: followers_filename)
continue if followers_list is not None:
fhandle1 = follower_handle.strip() for follower_handle in followers_list:
fhandle = remove_eol(fhandle1) if '@' not in follower_handle:
follower_domain = fhandle.split('@')[1] continue
if not grouped.get(follower_domain): fhandle1 = follower_handle.strip()
grouped[follower_domain] = [fhandle] fhandle = remove_eol(fhandle1)
else: follower_domain = fhandle.split('@')[1]
grouped[follower_domain].append(fhandle) if not grouped.get(follower_domain):
except OSError: grouped[follower_domain] = [fhandle]
print('EX: group_followers_by_domain unable to read ' + else:
followers_filename) grouped[follower_domain].append(fhandle)
return grouped return grouped
@ -5603,16 +5601,16 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
index_ctr: int = 0 index_ctr: int = 0
# get the existing index entries as a string # get the existing index entries as a string
new_index = '' new_index = ''
try: index_list: list[str] = \
with open(index_filename, 'r', encoding='utf-8') as fp_index: load_list(index_filename,
for post_id in fp_index: 'EX: archive_posts_for_person unable to read ' +
new_index += post_id index_filename + ' [ex]')
index_ctr += 1 if index_list is not None:
if index_ctr >= max_posts_in_box: for post_id in index_list:
break new_index += post_id
except OSError as ex: index_ctr += 1
print('EX: archive_posts_for_person unable to read ' + if index_ctr >= max_posts_in_box:
index_filename + ' ' + str(ex)) break
# save the new index file # save the new index file
if new_index: if new_index:
save_string(new_index, index_filename, save_string(new_index, index_filename,
@ -6195,66 +6193,65 @@ def populate_replies_json(base_dir: str, nickname: str, domain: str,
pub_str = 'https://www.w3.org/ns/activitystreams#Public' pub_str = 'https://www.w3.org/ns/activitystreams#Public'
# populate the items list with replies # populate the items list with replies
replies_boxes = ('outbox', 'inbox') replies_boxes = ('outbox', 'inbox')
try: replies_list: list[str] = \
with open(post_replies_filename, 'r', load_list(post_replies_filename,
encoding='utf-8') as fp_replies: 'EX: populate_replies_json unable to read ' +
for message_id in fp_replies: post_replies_filename)
reply_found: bool = False if replies_list is not None:
# examine inbox and outbox for message_id in replies_list:
for boxname in replies_boxes: reply_found: bool = False
message_id2 = remove_eol(message_id) # examine inbox and outbox
search_filename = \ for boxname in replies_boxes:
acct_dir(base_dir, nickname, domain) + '/' + \ message_id2 = remove_eol(message_id)
boxname + '/' + \ search_filename = \
message_id2.replace('/', '#') + '.json' acct_dir(base_dir, nickname, domain) + '/' + \
if os.path.isfile(search_filename): boxname + '/' + \
if authorized or \ message_id2.replace('/', '#') + '.json'
text_in_file(pub_str, search_filename): if os.path.isfile(search_filename):
post_json_object = load_json(search_filename) if authorized or \
if post_json_object: text_in_file(pub_str, search_filename):
pjo = post_json_object post_json_object = load_json(search_filename)
ordered_items = replies_json['orderedItems'] if post_json_object:
if pjo['object'].get('cc'): pjo = post_json_object
if (authorized or ordered_items = replies_json['orderedItems']
(pub_str in pjo['object']['to'] or if pjo['object'].get('cc'):
pub_str in pjo['object']['cc'])): if (authorized or
ordered_items.append(pjo) (pub_str in pjo['object']['to'] or
reply_found = True pub_str in pjo['object']['cc'])):
else: ordered_items.append(pjo)
if authorized or \ reply_found = True
pub_str in pjo['object']['to']: else:
ordered_items.append(pjo) if authorized or \
reply_found = True pub_str in pjo['object']['to']:
break ordered_items.append(pjo)
# if not in either inbox or outbox then examine the reply_found = True
# shared inbox break
if not reply_found: # if not in either inbox or outbox then examine the
message_id2 = remove_eol(message_id) # shared inbox
search_filename = \ if not reply_found:
data_dir(base_dir) + '/inbox@' + \ message_id2 = remove_eol(message_id)
domain + '/inbox/' + \ search_filename = \
message_id2.replace('/', '#') + '.json' data_dir(base_dir) + '/inbox@' + \
if os.path.isfile(search_filename): domain + '/inbox/' + \
if authorized or \ message_id2.replace('/', '#') + '.json'
text_in_file(pub_str, search_filename): if os.path.isfile(search_filename):
# get the json of the reply and append it to if authorized or \
# the collection text_in_file(pub_str, search_filename):
post_json_object = load_json(search_filename) # get the json of the reply and append it to
if post_json_object: # the collection
pjo = post_json_object post_json_object = load_json(search_filename)
ordered_items = replies_json['orderedItems'] if post_json_object:
if pjo['object'].get('cc'): pjo = post_json_object
if (authorized or ordered_items = replies_json['orderedItems']
(pub_str in pjo['object']['to'] or if pjo['object'].get('cc'):
pub_str in pjo['object']['cc'])): if (authorized or
ordered_items.append(pjo) (pub_str in pjo['object']['to'] or
else: pub_str in pjo['object']['cc'])):
if authorized or \ ordered_items.append(pjo)
pub_str in pjo['object']['to']: else:
ordered_items.append(pjo) if authorized or \
except OSError: pub_str in pjo['object']['to']:
print('EX: populate_replies_json unable to read ' + ordered_items.append(pjo)
post_replies_filename)
def _reject_announce(announce_filename: str, def _reject_announce(announce_filename: str,