Exception handling when reading from file

main
Bob Mottram 2024-07-13 15:38:11 +01:00
parent 97a4beee60
commit 21e2095696
22 changed files with 1082 additions and 836 deletions
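The pattern applied throughout this commit is the same in every file: a bare `with open(...)` read either raised an unhandled exception into the caller or, where an except block already existed, swallowed the failure with `pass`; after the commit each read is wrapped in try/except OSError and logs a distinctive 'EX:'-prefixed message. A minimal sketch of the before and after, using a hypothetical settings_filename rather than any path from this diff:

import os

# hypothetical example file, not a path from this commit
settings_filename = 'settings.txt'

settings_str = ''
if os.path.isfile(settings_filename):
    try:
        # guarded read: a permissions problem or a file removed by
        # another thread no longer raises into the caller
        with open(settings_filename, 'r', encoding='utf-8') as fp_set:
            settings_str = fp_set.read()
    except OSError:
        # log instead of silently passing, so failures are visible
        print('EX: unable to read settings file ' + settings_filename)
if settings_str:
    print(settings_str)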

View File

@@ -286,7 +286,7 @@ def show_individual_post(self, ssml_getreq: bool, authorized: bool,
             with open(ssml_filename, 'r', encoding='utf-8') as fp_ssml:
                 ssml_str = fp_ssml.read()
         except OSError:
-            pass
+            print('EX: unable to read ssml file ' + ssml_filename)
         if ssml_str:
             msg = ssml_str.encode('utf-8')
             msglen = len(msg)
@@ -615,7 +615,7 @@ def show_individual_at_post(self, ssml_getreq: bool, authorized: bool,
             with open(ssml_filename, 'r', encoding='utf-8') as fp_ssml:
                 ssml_str = fp_ssml.read()
         except OSError:
-            pass
+            print('EX: unable to read ssml file 2 ' + ssml_filename)
         if ssml_str:
             msg = ssml_str.encode('utf-8')
             msglen = len(msg)

View File

@@ -330,10 +330,13 @@ def _desktop_show_banner() -> None:
     banner_filename = 'theme/' + banner_theme + '/banner.txt'
     if not os.path.isfile(banner_filename):
         return
-    with open(banner_filename, 'r', encoding='utf-8') as banner_file:
-        banner = banner_file.read()
-        if banner:
-            print(banner + '\n')
+    try:
+        with open(banner_filename, 'r', encoding='utf-8') as fp_banner:
+            banner = fp_banner.read()
+            if banner:
+                print(banner + '\n')
+    except OSError:
+        print('EX: unable to read banner file ' + banner_filename)


def _desktop_wait_for_cmd(timeout: int, debug: bool) -> str:

View File

@@ -1715,12 +1715,16 @@ def _command_options() -> None:
     approve_follows_filename = accounts_dir + '/followrequests.txt'
     approve_ctr = 0
     if os.path.isfile(approve_follows_filename):
-        with open(approve_follows_filename, 'r',
-                  encoding='utf-8') as approvefile:
-            for approve in approvefile:
-                approve1 = remove_eol(approve)
-                print(approve1)
-                approve_ctr += 1
+        try:
+            with open(approve_follows_filename, 'r',
+                      encoding='utf-8') as approvefile:
+                for approve in approvefile:
+                    approve1 = remove_eol(approve)
+                    print(approve1)
+                    approve_ctr += 1
+        except OSError:
+            print('EX: unable to read follow approvals file ' +
+                  approve_follows_filename)
     if approve_ctr == 0:
         print('There are no follow requests pending approval.')
         sys.exit()

View File

@@ -1598,26 +1598,30 @@ def pending_followers_timeline_json(actor: str, base_dir: str,
     follow_requests_filename = \
         acct_dir(base_dir, nickname, domain) + '/followrequests.txt'
     if os.path.isfile(follow_requests_filename):
-        with open(follow_requests_filename, 'r',
-                  encoding='utf-8') as req_file:
-            for follower_handle in req_file:
+        try:
+            with open(follow_requests_filename, 'r',
+                      encoding='utf-8') as fp_req:
+                for follower_handle in fp_req:
                     if len(follower_handle) == 0:
                         continue
                     follower_handle = remove_eol(follower_handle)
                     foll_domain, _ = get_domain_from_actor(follower_handle)
                     if not foll_domain:
                         continue
                     foll_nickname = get_nickname_from_actor(follower_handle)
                     if not foll_nickname:
                         continue
                     follow_activity_filename = \
                         acct_dir(base_dir, nickname, domain) + \
                         '/requests/' + \
                         foll_nickname + '@' + foll_domain + '.follow'
                     if not os.path.isfile(follow_activity_filename):
                         continue
                     follow_json = load_json(follow_activity_filename)
                     if not follow_json:
                         continue
                     result_json['orderedItems'].append(follow_json)
+        except OSError as exc:
+            print('EX: unable to read follow requests ' +
+                  follow_requests_filename + ' ' + str(exc))
     return result_json

View File

@@ -77,15 +77,23 @@ def _remove_event_from_timeline(event_id: str,
     """
     if not text_in_file(event_id + '\n', tl_events_filename):
         return
+    events_timeline = ''
     with open(tl_events_filename, 'r',
               encoding='utf-8') as fp_tl:
         events_timeline = fp_tl.read().replace(event_id + '\n', '')
+    if events_timeline:
         try:
             with open(tl_events_filename, 'w+',
                       encoding='utf-8') as fp2:
                 fp2.write(events_timeline)
         except OSError:
             print('EX: ERROR: unable to save events timeline')
+    elif os.path.isfile(tl_events_filename):
+        try:
+            os.remove(tl_events_filename)
+        except OSError:
+            print('EX: ERROR: unable to remove events timeline')


def save_event_post(base_dir: str, handle: str, post_id: str,
@@ -289,74 +297,78 @@ def get_todays_events(base_dir: str, nickname: str, domain: str,
     calendar_post_ids = []
     recreate_events_file = False
+    try:
         with open(calendar_filename, 'r', encoding='utf-8') as events_file:
             for post_id in events_file:
                 post_id = remove_eol(post_id)
-                post_filename = locate_post(base_dir, nickname, domain, post_id)
+                post_filename = \
+                    locate_post(base_dir, nickname, domain, post_id)
                 if not post_filename:
                     recreate_events_file = True
                     continue

                 post_json_object = load_json(post_filename)
                 if not _is_happening_post(post_json_object):
                     continue

                 content_language = system_language
                 if post_json_object.get('object'):
                     content = None
                     if post_json_object['object'].get('contentMap'):
                         sys_lang = system_language
-                        if post_json_object['object']['contentMap'].get(sys_lang):
-                            content = \
-                                post_json_object['object']['contentMap'][sys_lang]
-                            content_language = sys_lang
+                        content_map = post_json_object['object']['contentMap']
+                        if content_map.get(sys_lang):
+                            content = content_map[sys_lang]
+                            content_language = sys_lang
                     if not content:
                         if post_json_object['object'].get('content'):
                             content = post_json_object['object']['content']
                     if content:
                         if not _event_text_match(content, text_match):
                             continue

                 public_event = is_public_post(post_json_object)

                 post_event = []
                 day_of_month = None
                 for tag in post_json_object['object']['tag']:
                     if not _is_happening_event(tag):
                         continue
                     # this tag is an event or a place
                     if tag['type'] == 'Event':
                         # tag is an event
                         if not tag.get('startTime'):
                             continue
                         event_time = \
                             date_from_string_format(tag['startTime'],
                                                     ["%Y-%m-%dT%H:%M:%S%z"])
                         if int(event_time.strftime("%Y")) == year and \
                            int(event_time.strftime("%m")) == month_number and \
                            int(event_time.strftime("%d")) == day_number:
                             day_of_month = str(int(event_time.strftime("%d")))
                             if '#statuses#' in post_id:
                                 # link to the id so that the event can be
                                 # easily deleted
                                 tag['post_id'] = post_id.split('#statuses#')[1]
                                 tag['id'] = post_id.replace('#', '/')
                                 tag['sender'] = post_id.split('#statuses#')[0]
                                 tag['sender'] = tag['sender'].replace('#', '/')
                                 tag['public'] = public_event
                                 tag['language'] = content_language
                             post_event.append(tag)
                     else:
                         # tag is a place
                         post_event.append(tag)

                 if post_event and day_of_month:
                     calendar_post_ids.append(post_id)
                     if not events.get(day_of_month):
                         events[day_of_month] = []
                     events[day_of_month].append(post_event)
                     events[day_of_month] = \
                         _sort_todays_events(events[day_of_month])
+    except OSError:
+        print('EX: get_todays_events failed to read ' + calendar_filename)

     # if some posts have been deleted then regenerate the calendar file
     if recreate_events_file:
@@ -592,37 +604,41 @@ def day_events_check(base_dir: str, nickname: str, domain: str,
         return False

     events_exist = False
+    try:
         with open(calendar_filename, 'r', encoding='utf-8') as events_file:
             for post_id in events_file:
                 post_id = remove_eol(post_id)
-                post_filename = locate_post(base_dir, nickname, domain, post_id)
+                post_filename = \
+                    locate_post(base_dir, nickname, domain, post_id)
                 if not post_filename:
                     continue

                 post_json_object = load_json(post_filename)
                 if not _is_happening_post(post_json_object):
                     continue

                 for tag in post_json_object['object']['tag']:
                     if not _is_happening_event(tag):
                         continue
                     # this tag is an event or a place
                     if tag['type'] != 'Event':
                         continue
                     # tag is an event
                     if not tag.get('startTime'):
                         continue
                     event_time = \
                         date_from_string_format(tag['startTime'],
                                                 ["%Y-%m-%dT%H:%M:%S%z"])
                     if int(event_time.strftime("%d")) != day_number:
                         continue
                     if int(event_time.strftime("%m")) != month_number:
                         continue
                     if int(event_time.strftime("%Y")) != year:
                         continue
                     events_exist = True
                     break
+    except OSError:
+        print('EX: day_events_check failed to read ' + calendar_filename)

     return events_exist
@@ -648,42 +664,46 @@ def get_this_weeks_events(base_dir: str, nickname: str, domain: str) -> {}:
     calendar_post_ids = []
     recreate_events_file = False
+    try:
         with open(calendar_filename, 'r', encoding='utf-8') as events_file:
             for post_id in events_file:
                 post_id = remove_eol(post_id)
-                post_filename = locate_post(base_dir, nickname, domain, post_id)
+                post_filename = \
+                    locate_post(base_dir, nickname, domain, post_id)
                 if not post_filename:
                     recreate_events_file = True
                     continue

                 post_json_object = load_json(post_filename)
                 if not _is_happening_post(post_json_object):
                     continue

                 post_event = []
                 week_day_index = None
                 for tag in post_json_object['object']['tag']:
                     if not _is_happening_event(tag):
                         continue
                     # this tag is an event or a place
                     if tag['type'] == 'Event':
                         # tag is an event
                         if not tag.get('startTime'):
                             continue
                         event_time = \
                             date_from_string_format(tag['startTime'],
                                                     ["%Y-%m-%dT%H:%M:%S%z"])
                         if now <= event_time <= end_of_week:
                             week_day_index = (event_time - now).days()
                             post_event.append(tag)
                     else:
                         # tag is a place
                         post_event.append(tag)

                 if post_event and week_day_index:
                     calendar_post_ids.append(post_id)
                     if not events.get(week_day_index):
                         events[week_day_index] = []
                     events[week_day_index].append(post_event)
+    except OSError:
+        print('EX: get_this_weeks_events failed to read ' + calendar_filename)

     # if some posts have been deleted then regenerate the calendar file
     if recreate_events_file:
@@ -717,60 +737,64 @@ def get_calendar_events(base_dir: str, nickname: str, domain: str,
     calendar_post_ids = []
     recreate_events_file = False
+    try:
         with open(calendar_filename, 'r', encoding='utf-8') as events_file:
             for post_id in events_file:
                 post_id = remove_eol(post_id)
-                post_filename = locate_post(base_dir, nickname, domain, post_id)
+                post_filename = \
+                    locate_post(base_dir, nickname, domain, post_id)
                 if not post_filename:
                     recreate_events_file = True
                     continue

                 post_json_object = load_json(post_filename)
                 if not post_json_object:
                     continue
                 if not _is_happening_post(post_json_object):
                     continue

                 if only_show_reminders:
                     if not is_reminder(post_json_object):
                         continue

                 if post_json_object.get('object'):
                     if post_json_object['object'].get('content'):
                         content = post_json_object['object']['content']
                         if not _event_text_match(content, text_match):
                             continue

                 post_event = []
                 day_of_month = None
                 for tag in post_json_object['object']['tag']:
                     if not _is_happening_event(tag):
                         continue
                     # this tag is an event or a place
                     if tag['type'] == 'Event':
                         # tag is an event
                         if not tag.get('startTime'):
                             continue
                         event_time = \
                             date_from_string_format(tag['startTime'],
                                                     ["%Y-%m-%dT%H:%M:%S%z"])
                         if int(event_time.strftime("%Y")) == year and \
                            int(event_time.strftime("%m")) == month_number:
                             day_of_month = str(int(event_time.strftime("%d")))
                             if '#statuses#' in post_id:
                                 tag['post_id'] = post_id.split('#statuses#')[1]
                                 tag['id'] = post_id.replace('#', '/')
                                 tag['sender'] = post_id.split('#statuses#')[0]
                                 tag['sender'] = tag['sender'].replace('#', '/')
                             post_event.append(tag)
                     else:
                         # tag is a place
                         post_event.append(tag)

                 if post_event and day_of_month:
                     calendar_post_ids.append(post_id)
                     if not events.get(day_of_month):
                         events[day_of_month] = []
                     events[day_of_month].append(post_event)
+    except OSError:
+        print('EX: get_calendar_events failed to read ' + calendar_filename)

     # if some posts have been deleted then regenerate the calendar file
     if recreate_events_file:
@@ -807,7 +831,7 @@ def remove_calendar_event(base_dir: str, nickname: str, domain: str,
         with open(calendar_filename, 'r', encoding='utf-8') as fp_cal:
             lines_str = fp_cal.read()
     except OSError:
-        print('EX: unable to read calendar file ' +
+        print('EX: remove_calendar_event unable to read calendar file ' +
               calendar_filename)
     if not lines_str:
         return

View File

@@ -433,7 +433,7 @@ def store_hash_tags(base_dir: str, nickname: str, domain: str,
             with open(tags_filename, 'r', encoding='utf-8') as tags_file:
                 content = tags_file.read()
         except OSError:
-            pass
+            print('EX: store_hash_tags failed to read ' + tags_filename)
         if post_url not in content:
             content = tag_line + content
             try:
@@ -1226,11 +1226,14 @@ def _notify_moved(base_dir: str, domain_full: str,
             prev_actor_handle + ' ' + new_actor_handle + ' ' + url
         if os.path.isfile(moved_file):
+            try:
                 with open(moved_file, 'r',
                           encoding='utf-8') as fp_move:
                     prev_moved_str = fp_move.read()
                     if prev_moved_str == moved_str:
                         continue
+            except OSError:
+                print('EX: _notify_moved unable to read ' + moved_file)
         try:
             with open(moved_file, 'w+', encoding='utf-8') as fp_move:
                 fp_move.write(moved_str)
@@ -3920,10 +3923,13 @@ def _like_notify(base_dir: str, domain: str,
     # was there a previous like notification?
     if os.path.isfile(prev_like_file):
         # is it the same as the current notification ?
+        try:
             with open(prev_like_file, 'r', encoding='utf-8') as fp_like:
                 prev_like_str = fp_like.read()
                 if prev_like_str == like_str:
                     return
+        except OSError:
+            print('EX: _like_notify unable to read ' + prev_like_file)
     try:
         with open(prev_like_file, 'w+', encoding='utf-8') as fp_like:
             fp_like.write(like_str)
@@ -3985,10 +3991,13 @@ def _reaction_notify(base_dir: str, domain: str, onion_domain: str,
     # was there a previous reaction notification?
     if os.path.isfile(prev_reaction_file):
         # is it the same as the current notification ?
+        try:
             with open(prev_reaction_file, 'r', encoding='utf-8') as fp_react:
                 prev_reaction_str = fp_react.read()
                 if prev_reaction_str == reaction_str:
                     return
+        except OSError:
+            print('EX: _reaction_notify unable to read ' + prev_reaction_file)
     try:
         with open(prev_reaction_file, 'w+', encoding='utf-8') as fp_react:
             fp_react.write(reaction_str)
@@ -4015,10 +4024,13 @@ def _notify_post_arrival(base_dir: str, handle: str, url: str) -> None:
     notify_file = account_dir + '/.newNotifiedPost'
     if os.path.isfile(notify_file):
         # check that the same notification is not repeatedly sent
+        try:
             with open(notify_file, 'r', encoding='utf-8') as fp_notify:
                 existing_notification_message = fp_notify.read()
                 if url in existing_notification_message:
                     return
+        except OSError:
+            print('EX: _notify_post_arrival unable to read ' + notify_file)
     try:
         with open(notify_file, 'w+', encoding='utf-8') as fp_notify:
             fp_notify.write(url)
@@ -4297,12 +4309,16 @@ def _update_last_seen(base_dir: str, handle: str, actor: str) -> None:
     days_since_epoch = (curr_time - date_epoch()).days
     # has the value changed?
     if os.path.isfile(last_seen_filename):
+        try:
             with open(last_seen_filename, 'r',
                       encoding='utf-8') as last_seen_file:
                 days_since_epoch_file = last_seen_file.read()
                 if int(days_since_epoch_file) == days_since_epoch:
-                    # value hasn't changed, so we can save writing anything to file
+                    # value hasn't changed, so we can save writing
+                    # anything to file
                     return
+        except OSError:
+            print('EX: _update_last_seen unable to read ' + last_seen_filename)
     try:
         with open(last_seen_filename, 'w+',
                   encoding='utf-8') as last_seen_file:

keys.py
View File

@@ -19,8 +19,11 @@ def _get_local_private_key(base_dir: str, nickname: str, domain: str) -> str:
     key_filename = base_dir + '/keys/private/' + handle.lower() + '.key'
     if not os.path.isfile(key_filename):
         return None
-    with open(key_filename, 'r', encoding='utf-8') as pem_file:
-        return pem_file.read()
+    try:
+        with open(key_filename, 'r', encoding='utf-8') as fp_pem:
+            return fp_pem.read()
+    except OSError:
+        print('EX: _get_local_private_key unable to read ' + key_filename)
     return None
@@ -33,8 +36,11 @@ def _get_local_public_key(base_dir: str, nickname: str, domain: str) -> str:
     key_filename = base_dir + '/keys/public/' + handle.lower() + '.key'
     if not os.path.isfile(key_filename):
         return None
-    with open(key_filename, 'r', encoding='utf-8') as pem_file:
-        return pem_file.read()
+    try:
+        with open(key_filename, 'r', encoding='utf-8') as fp_pem:
+            return fp_pem.read()
+    except OSError:
+        print('EX: _get_local_public_key unable to read ' + key_filename)
     return None

View File

@@ -178,8 +178,12 @@ def manual_approve_follow_request(session, session_onion, session_i2p,
     # is the handle in the requests file?
     approve_follows_str = ''
-    with open(approve_follows_filename, 'r', encoding='utf-8') as fp_foll:
-        approve_follows_str = fp_foll.read()
+    try:
+        with open(approve_follows_filename, 'r', encoding='utf-8') as fp_foll:
+            approve_follows_str = fp_foll.read()
+    except OSError:
+        print('EX: manual_approve_follow_request unable to read ' +
+              approve_follows_filename)
     exists = False
     approve_handle_full = approve_handle
     if approve_handle in approve_follows_str:
@@ -213,101 +217,107 @@ def manual_approve_follow_request(session, session_onion, session_i2p,
               '" ' + approve_follows_filename)
         return

+    try:
         with open(approve_follows_filename + '.new', 'w+',
                   encoding='utf-8') as approvefilenew:
             update_approved_followers = False
             follow_activity_filename = None
             with open(approve_follows_filename, 'r',
                       encoding='utf-8') as approvefile:
                 for handle_of_follow_requester in approvefile:
                     # is this the approved follow?
-                    approve_handl = approve_handle_full
-                    if not handle_of_follow_requester.startswith(approve_handl):
+                    appr_handl = approve_handle_full
+                    if not handle_of_follow_requester.startswith(appr_handl):
                         # this isn't the approved follow so it will remain
                         # in the requests file
                         approvefilenew.write(handle_of_follow_requester)
                         continue

                     handle_of_follow_requester = \
                         remove_eol(handle_of_follow_requester)
                     handle_of_follow_requester = \
                         handle_of_follow_requester.replace('\r', '')
                     port2 = port
                     if ':' in handle_of_follow_requester:
-                        port2 = get_port_from_domain(handle_of_follow_requester)
+                        port2 = \
+                            get_port_from_domain(handle_of_follow_requester)
                     requests_dir = account_dir + '/requests'
-                    follow_activity_filename = \
-                        requests_dir + '/' + handle_of_follow_requester + '.follow'
+                    follow_activity_filename = \
+                        requests_dir + '/' + \
+                        handle_of_follow_requester + '.follow'
                     if not os.path.isfile(follow_activity_filename):
                         update_approved_followers = True
                         continue
                     follow_json = load_json(follow_activity_filename)
                     if not follow_json:
                         update_approved_followers = True
                         continue
                     approve_nickname = approve_handle.split('@')[0]
                     approve_domain = approve_handle.split('@')[1]
                     approve_domain = remove_eol(approve_domain)
                     approve_domain = approve_domain.replace('\r', '')
                     approve_port = port2
                     if ':' in approve_domain:
                         approve_port = get_port_from_domain(approve_domain)
                         approve_domain = remove_domain_port(approve_domain)
                     curr_domain = domain
                     curr_port = port
                     curr_session = session
                     curr_http_prefix = http_prefix
                     curr_proxy_type = proxy_type
                     if onion_domain and \
                        not curr_domain.endswith('.onion') and \
                        approve_domain.endswith('.onion'):
                         curr_domain = onion_domain
                         curr_port = 80
                         approve_port = 80
                         curr_session = session_onion
                         curr_http_prefix = 'http'
                         curr_proxy_type = 'tor'
                     elif (i2p_domain and
                           not curr_domain.endswith('.i2p') and
                           approve_domain.endswith('.i2p')):
                         curr_domain = i2p_domain
                         curr_port = 80
                         approve_port = 80
                         curr_session = session_i2p
                         curr_http_prefix = 'http'
                         curr_proxy_type = 'i2p'
                     if not curr_session:
                         curr_session = create_session(curr_proxy_type)
                     print('Manual follow accept: Sending Accept for ' +
                           handle + ' follow request from ' +
                           approve_nickname + '@' + approve_domain)
                     actor_url = get_actor_from_post(follow_json)
                     followed_account_accepts(curr_session, base_dir,
                                              curr_http_prefix,
                                              nickname,
                                              curr_domain, curr_port,
                                              approve_nickname,
                                              approve_domain,
                                              approve_port,
                                              actor_url,
                                              federation_list,
                                              follow_json,
                                              send_threads, post_log,
                                              cached_webfingers,
                                              person_cache,
                                              debug,
                                              project_version, False,
                                              signing_priv_key_pem,
                                              domain,
                                              onion_domain,
                                              i2p_domain,
                                              followers_sync_cache,
                                              sites_unavailable,
                                              system_language)
                     update_approved_followers = True
+    except OSError as exc:
+        print('EX: manual_approve_follow_request unable to write ' +
+              approve_follows_filename + '.new ' + str(exc))

     followers_filename = account_dir + '/followers.txt'
     if update_approved_followers:

View File

@@ -44,20 +44,24 @@ def _meta_data_instance_v1(show_accounts: bool,
     rules_list = []
     rules_filename = data_dir(base_dir) + '/tos.md'
     if os.path.isfile(rules_filename):
+        try:
             with open(rules_filename, 'r', encoding='utf-8') as fp_rules:
                 rules_lines = fp_rules.readlines()
                 rule_ctr = 1
                 for line in rules_lines:
                     line = line.strip()
                     if not line:
                         continue
                     if line.startswith('#'):
                         continue
                     rules_list.append({
                         'id': str(rule_ctr),
                         'text': line
                     })
                     rule_ctr += 1
+        except OSError:
+            print('EX: _meta_data_instance_v1 unable to read ' +
+                  rules_filename)

     is_bot = False
     is_group = False

View File

@@ -328,8 +328,11 @@ def _spoof_meta_data(base_dir: str, nickname: str, domain: str,
     decoy_seed_filename = acct_dir(base_dir, nickname, domain) + '/decoyseed'
     decoy_seed = 63725
     if os.path.isfile(decoy_seed_filename):
-        with open(decoy_seed_filename, 'r', encoding='utf-8') as fp_seed:
-            decoy_seed = int(fp_seed.read())
+        try:
+            with open(decoy_seed_filename, 'r', encoding='utf-8') as fp_seed:
+                decoy_seed = int(fp_seed.read())
+        except OSError:
+            print('EX: _spoof_meta_data unable to read ' + decoy_seed_filename)
     else:
         decoy_seed = randint(10000, 10000000000000000)
         try:
@@ -337,7 +340,8 @@ def _spoof_meta_data(base_dir: str, nickname: str, domain: str,
                      encoding='utf-8') as fp_seed:
                 fp_seed.write(str(decoy_seed))
         except OSError:
-            print('EX: unable to write ' + decoy_seed_filename)
+            print('EX: _spoof_meta_data unable to write ' +
+                  decoy_seed_filename)

     if os.path.isfile('/usr/bin/exiftool'):
         print('Spoofing metadata in ' + output_filename + ' using exiftool')

View File

@@ -36,16 +36,20 @@ def _move_following_handles_for_account(base_dir: str,
         acct_dir(base_dir, nickname, domain) + '/following.txt'
     if not os.path.isfile(following_filename):
         return ctr
+    try:
         with open(following_filename, 'r', encoding='utf-8') as fp_foll:
             following_handles = fp_foll.readlines()
             for follow_handle in following_handles:
                 follow_handle = follow_handle.strip("\n").strip("\r")
                 ctr += \
                     _update_moved_handle(base_dir, nickname, domain,
                                          follow_handle, session,
                                          http_prefix, cached_webfingers,
                                          debug, signing_priv_key_pem,
                                          block_federated)
+    except OSError:
+        print('EX: _move_following_handles_for_account unable to read ' +
+              following_filename)
     return ctr
@@ -135,8 +139,12 @@ def _update_moved_handle(base_dir: str, nickname: str, domain: str,
         acct_dir(base_dir, nickname, domain) + '/following.txt'
     if os.path.isfile(following_filename):
         following_handles = []
+        try:
             with open(following_filename, 'r', encoding='utf-8') as foll1:
                 following_handles = foll1.readlines()
+        except OSError:
+            print('EX: _update_moved_handle unable to read ' +
+                  following_filename)

         moved_to_handle = moved_to_nickname + '@' + moved_to_domain_full
         handle_lower = handle.lower()

View File

@@ -394,8 +394,12 @@ def _newswire_hashtag_processing(base_dir: str, post_json_object: {},
     if not os.path.isfile(rules_filename):
         return True
     rules = []
+    try:
         with open(rules_filename, 'r', encoding='utf-8') as fp_rules:
             rules = fp_rules.readlines()
+    except OSError:
+        print('EX: _newswire_hashtag_processing unable to read ' +
+              rules_filename)

     domain_full = get_full_domain(domain, port)
@@ -467,35 +471,44 @@ def _create_news_mirror(base_dir: str, domain: str,
             # no index for mirrors found
             return True
         removals = []
-        with open(mirror_index_filename, 'r', encoding='utf-8') as index_file:
+        try:
+            with open(mirror_index_filename, 'r',
+                      encoding='utf-8') as fp_index:
                 # remove the oldest directories
                 ctr = 0
                 while no_of_dirs > max_mirrored_articles:
                     ctr += 1
                     if ctr > 5000:
                         # escape valve
                         break

-                post_id = index_file.readline()
+                    post_id = fp_index.readline()
                     if not post_id:
                         continue
                     post_id = post_id.strip()
                     mirror_article_dir = mirror_dir + '/' + post_id
                     if os.path.isdir(mirror_article_dir):
                         rmtree(mirror_article_dir,
                                ignore_errors=False, onexc=None)
                         removals.append(post_id)
                         no_of_dirs -= 1
+        except OSError as exc:
+            print('EX: _create_news_mirror unable to read ' +
+                  mirror_index_filename + ' ' + str(exc))

         # remove the corresponding index entries
         if removals:
             index_content = ''
+            try:
                 with open(mirror_index_filename, 'r',
                           encoding='utf-8') as index_file:
                     index_content = index_file.read()
                     for remove_post_id in removals:
                         index_content = \
                             index_content.replace(remove_post_id + '\n', '')
+            except OSError:
+                print('EX: _create_news_mirror unable to read ' +
+                      mirror_index_filename)
             try:
                 with open(mirror_index_filename, 'w+',
                           encoding='utf-8') as index_file:

View File

@@ -385,9 +385,14 @@ def load_hashtag_categories(base_dir: str, language: str) -> None:
     if not os.path.isfile(hashtag_categories_filename):
         return

-    with open(hashtag_categories_filename, 'r', encoding='utf-8') as fp_cat:
-        xml_str = fp_cat.read()
-        _xml2str_to_hashtag_categories(base_dir, xml_str, 1024, True)
+    try:
+        with open(hashtag_categories_filename, 'r',
+                  encoding='utf-8') as fp_cat:
+            xml_str = fp_cat.read()
+            _xml2str_to_hashtag_categories(base_dir, xml_str, 1024, True)
+    except OSError:
+        print('EX: load_hashtag_categories unable to read ' +
+              hashtag_categories_filename)


def _xml2str_to_hashtag_categories(base_dir: str, xml_str: str,
@@ -1618,68 +1623,73 @@ def _add_account_blogs_to_newswire(base_dir: str, nickname: str, domain: str,
     if os.path.isfile(moderated_filename):
         moderated = True

+    try:
         with open(index_filename, 'r', encoding='utf-8') as index_file:
             post_filename = 'start'
             ctr = 0
             while post_filename:
                 post_filename = index_file.readline()
                 if post_filename:
                     # if this is a full path then remove the directories
                     if '/' in post_filename:
                         post_filename = post_filename.split('/')[-1]

                     # filename of the post without any extension or path
                     # This should also correspond to any index entry in
                     # the posts cache
                     post_url = remove_eol(post_filename)
                     post_url = post_url.replace('.json', '').strip()

                     # read the post from file
                     full_post_filename = \
                         locate_post(base_dir, nickname,
                                     domain, post_url, False)
                     if not full_post_filename:
                         print('Unable to locate post for newswire ' + post_url)
                         ctr += 1
                         if ctr >= max_blogs_per_account:
                             break
                         continue

                     post_json_object = None
                     if full_post_filename:
                         post_json_object = load_json(full_post_filename)
                     if _is_newswire_blog_post(post_json_object):
                         published = post_json_object['object']['published']
                         published = published.replace('T', ' ')
                         published = published.replace('Z', '+00:00')
                         votes = []
                         if os.path.isfile(full_post_filename + '.votes'):
                             votes = load_json(full_post_filename + '.votes')
                         content = \
                             get_base_content_from_post(post_json_object,
                                                        system_language)
                         description = first_paragraph_from_string(content)
                         description = remove_html(description)
-                        tags_from_post = _get_hashtags_from_post(post_json_object)
+                        tags_from_post = \
+                            _get_hashtags_from_post(post_json_object)
                         summary = post_json_object['object']['summary']
-                        url_str = \
-                            get_url_from_post(post_json_object['object']['url'])
-                        url2 = remove_html(url_str)
+                        url2 = post_json_object['object']['url']
+                        url_str = get_url_from_post(url2)
+                        url3 = remove_html(url_str)
                         fediverse_handle = ''
                         extra_links = []
                         _add_newswire_dict_entry(base_dir,
                                                  newswire, published,
-                                                 summary, url2,
+                                                 summary, url3,
                                                  votes, full_post_filename,
                                                  description, moderated, False,
                                                  tags_from_post,
                                                  max_tags, session, debug,
                                                  None, system_language,
                                                  fediverse_handle, extra_links)

                 ctr += 1
                 if ctr >= max_blogs_per_account:
                     break
+    except OSError as exc:
+        print('EX: _add_account_blogs_to_newswire unable to read ' +
+              index_filename + ' ' + str(exc))


def _add_blogs_to_newswire(base_dir: str, domain: str, newswire: {},
@@ -1755,8 +1765,12 @@ def get_dict_from_newswire(session, base_dir: str, domain: str,
     # add rss feeds
     rss_feed = []
-    with open(subscriptions_filename, 'r', encoding='utf-8') as fp_sub:
-        rss_feed = fp_sub.readlines()
+    try:
+        with open(subscriptions_filename, 'r', encoding='utf-8') as fp_sub:
+            rss_feed = fp_sub.readlines()
+    except OSError:
+        print('EX: get_dict_from_newswire unable to read ' +
+              subscriptions_filename)
     result = {}
     for url in rss_feed:
         url = url.strip()

person.py
View File

@@ -1273,8 +1273,11 @@ def reenable_account(base_dir: str, nickname: str) -> None:
     suspended_filename = data_dir(base_dir) + '/suspended.txt'
     if os.path.isfile(suspended_filename):
         lines = []
-        with open(suspended_filename, 'r', encoding='utf-8') as fp_sus:
-            lines = fp_sus.readlines()
+        try:
+            with open(suspended_filename, 'r', encoding='utf-8') as fp_sus:
+                lines = fp_sus.readlines()
+        except OSError:
+            print('EX: reenable_account unable to read ' + suspended_filename)
         try:
             with open(suspended_filename, 'w+', encoding='utf-8') as fp_sus:
                 for suspended in lines:
@@ -1298,8 +1301,11 @@ def suspend_account(base_dir: str, nickname: str, domain: str) -> None:
     # Don't suspend moderators
     moderators_file = data_dir(base_dir) + '/moderators.txt'
     if os.path.isfile(moderators_file):
-        with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
-            lines = fp_mod.readlines()
+        try:
+            with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
+                lines = fp_mod.readlines()
+        except OSError:
+            print('EX: suspend_account unable too read ' + moderators_file)
         for moderator in lines:
             if moderator.strip('\n').strip('\r') == nickname:
                 return
@@ -1319,8 +1325,11 @@ def suspend_account(base_dir: str, nickname: str, domain: str) -> None:
     suspended_filename = data_dir(base_dir) + '/suspended.txt'
     if os.path.isfile(suspended_filename):
-        with open(suspended_filename, 'r', encoding='utf-8') as fp_sus:
-            lines = fp_sus.readlines()
+        try:
+            with open(suspended_filename, 'r', encoding='utf-8') as fp_sus:
+                lines = fp_sus.readlines()
+        except OSError:
+            print('EX: suspend_account unable to read 2 ' + suspended_filename)
         for suspended in lines:
             if suspended.strip('\n').strip('\r') == nickname:
                 return
@@ -1356,8 +1365,12 @@ def can_remove_post(base_dir: str,
     # is the post by a moderator?
     moderators_file = data_dir(base_dir) + '/moderators.txt'
     if os.path.isfile(moderators_file):
-        with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
-            lines = fp_mod.readlines()
+        lines = []
+        try:
+            with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
+                lines = fp_mod.readlines()
+        except OSError:
+            print('EX: can_remove_post unable to read ' + moderators_file)
         for moderator in lines:
             if domain_full + '/users/' + \
                moderator.strip('\n') + '/' in post_id:
@@ -1389,8 +1402,12 @@ def _remove_tags_for_nickname(base_dir: str, nickname: str,
         if not text_in_file(match_str, tag_filename):
             continue
         lines = []
-        with open(tag_filename, 'r', encoding='utf-8') as fp_tag:
-            lines = fp_tag.readlines()
+        try:
+            with open(tag_filename, 'r', encoding='utf-8') as fp_tag:
+                lines = fp_tag.readlines()
+        except OSError:
+            print('EX: _remove_tags_for_nickname unable to read ' +
+                  tag_filename)
         try:
             with open(tag_filename, 'w+', encoding='utf-8') as tag_file:
                 for tagline in lines:
@@ -1415,8 +1432,12 @@ def remove_account(base_dir: str, nickname: str,
     # Don't remove moderators
     moderators_file = data_dir(base_dir) + '/moderators.txt'
     if os.path.isfile(moderators_file):
-        with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
-            lines = fp_mod.readlines()
+        lines = []
+        try:
+            with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
+                lines = fp_mod.readlines()
+        except OSError:
+            print('EX: remove_account unable to read ' + moderators_file)
         for moderator in lines:
             if moderator.strip('\n') == nickname:
                 return False
@@ -1542,26 +1563,32 @@ def is_person_snoozed(base_dir: str, nickname: str, domain: str,
         return False
     # remove the snooze entry if it has timed out
     replace_str = None
-    with open(snoozed_filename, 'r', encoding='utf-8') as snoozed_file:
-        for line in snoozed_file:
+    try:
+        with open(snoozed_filename, 'r', encoding='utf-8') as fp_snoozed:
+            for line in fp_snoozed:
                 # is this the entry for the actor?
                 if line.startswith(snooze_actor + ' '):
                     snoozed_time_str1 = line.split(' ')[1]
                     snoozed_time_str = remove_eol(snoozed_time_str1)
                     # is there a time appended?
                     if snoozed_time_str.isdigit():
                         snoozed_time = int(snoozed_time_str)
                         curr_time = int(time.time())
                         # has the snooze timed out?
                         if int(curr_time - snoozed_time) > 60 * 60 * 24:
                             replace_str = line
                     else:
                         replace_str = line
                     break
+    except OSError:
+        print('EX: is_person_snoozed unable to read ' + snoozed_filename)

     if replace_str:
         content = None
-        with open(snoozed_filename, 'r', encoding='utf-8') as snoozed_file:
-            content = snoozed_file.read().replace(replace_str, '')
+        try:
+            with open(snoozed_filename, 'r', encoding='utf-8') as fp_snoozed:
+                content = fp_snoozed.read().replace(replace_str, '')
+        except OSError:
+            print('EX: is_person_snoozed unable to read 2 ' + snoozed_filename)
     if content:
         try:
             with open(snoozed_filename, 'w+',
@@ -1610,15 +1637,21 @@ def person_unsnooze(base_dir: str, nickname: str, domain: str,
     if not text_in_file(snooze_actor + ' ', snoozed_filename):
         return
     replace_str = None
-    with open(snoozed_filename, 'r', encoding='utf-8') as snoozed_file:
-        for line in snoozed_file:
-            if line.startswith(snooze_actor + ' '):
-                replace_str = line
-                break
+    try:
+        with open(snoozed_filename, 'r', encoding='utf-8') as fp_snoozed:
+            for line in fp_snoozed:
+                if line.startswith(snooze_actor + ' '):
+                    replace_str = line
+                    break
+    except OSError:
+        print('EX: person_unsnooze unable to read ' + snoozed_filename)
     if replace_str:
         content = None
-        with open(snoozed_filename, 'r', encoding='utf-8') as snoozed_file:
-            content = snoozed_file.read().replace(replace_str, '')
+        try:
+            with open(snoozed_filename, 'r', encoding='utf-8') as fp_snoozed:
+                content = fp_snoozed.read().replace(replace_str, '')
+        except OSError:
+            print('EX: person_unsnooze unable to read 2 ' + snoozed_filename)
     if content is not None:
         try:
             with open(snoozed_filename, 'w+',
@@ -1658,9 +1691,13 @@ def get_person_notes(base_dir: str, nickname: str, domain: str,
         acct_dir(base_dir, nickname, domain) + \
         '/notes/' + handle + '.txt'
     if os.path.isfile(person_notes_filename):
-        with open(person_notes_filename, 'r',
-                  encoding='utf-8') as fp_notes:
-            person_notes = fp_notes.read()
+        try:
+            with open(person_notes_filename, 'r',
+                      encoding='utf-8') as fp_notes:
+                person_notes = fp_notes.read()
+        except OSError:
+            print('EX: get_person_notes unable to read ' +
+                  person_notes_filename)
     return person_notes
@@ -1907,8 +1944,11 @@ def get_person_avatar_url(base_dir: str, person_url: str,
         if ext != 'svg':
             return im_path
         content = ''
-        with open(im_filename, 'r', encoding='utf-8') as fp_im:
-            content = fp_im.read()
+        try:
+            with open(im_filename, 'r', encoding='utf-8') as fp_im:
+                content = fp_im.read()
+        except OSError:
+            print('EX: get_person_avatar_url unable to read ' + im_filename)
         if not dangerous_svg(content, False):
             return im_path

posts.py
View File

@@ -168,18 +168,23 @@ def is_moderator(base_dir: str, nickname: str) -> bool:
             return True
         return False

-    with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
-        lines = fp_mod.readlines()
-        if len(lines) == 0:
-            admin_name = get_config_param(base_dir, 'admin')
-            if not admin_name:
-                return False
-            if admin_name == nickname:
-                return True
-        for moderator in lines:
-            moderator = moderator.strip('\n').strip('\r')
-            if moderator == nickname:
-                return True
+    lines = []
+    try:
+        with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
+            lines = fp_mod.readlines()
+    except OSError:
+        print('EX: is_moderator unable to read ' + moderators_file)
+
+    if len(lines) == 0:
+        admin_name = get_config_param(base_dir, 'admin')
+        if not admin_name:
+            return False
+        if admin_name == nickname:
+            return True
+    for moderator in lines:
+        moderator = moderator.strip('\n').strip('\r')
+        if moderator == nickname:
+            return True
     return False
@@ -193,13 +198,16 @@ def no_of_followers_on_domain(base_dir: str, handle: str,
         return 0

     ctr = 0
+    try:
         with open(filename, 'r', encoding='utf-8') as followers_file:
             for follower_handle in followers_file:
                 if '@' in follower_handle:
                     follower_domain = follower_handle.split('@')[1]
                     follower_domain = remove_eol(follower_domain)
                     if domain == follower_domain:
                         ctr += 1
+    except OSError:
+        print('EX: no_of_followers_on_domain unable to read ' + filename)
     return ctr
@@ -1991,8 +1999,12 @@ def get_pinned_post_as_json(base_dir: str, http_prefix: str,
     actor = local_actor_url(http_prefix, nickname, domain_full)
     if os.path.isfile(pinned_filename):
         pinned_content = None
-        with open(pinned_filename, 'r', encoding='utf-8') as pin_file:
-            pinned_content = pin_file.read()
+        try:
+            with open(pinned_filename, 'r', encoding='utf-8') as fp_pin:
+                pinned_content = fp_pin.read()
+        except OSError:
+            print('EX: get_pinned_post_as_json unable to read ' +
+                  pinned_filename)
         if pinned_content:
             pinned_post_json = {
                 'atomUri': actor + '/pinned',
@@ -2214,23 +2226,28 @@ def _append_citations_to_blog_post(base_dir: str,
     if not os.path.isfile(citations_filename):
         return
     citations_separator = '#####'
-    with open(citations_filename, 'r', encoding='utf-8') as fp_cit:
-        citations = fp_cit.readlines()
-        for line in citations:
-            if citations_separator not in line:
-                continue
-            sections = line.strip().split(citations_separator)
-            if len(sections) != 3:
-                continue
-            # date_str = sections[0]
-            title = sections[1]
-            link = sections[2]
-            tag_json = {
-                "type": "Article",
-                "name": title,
-                "url": link
-            }
-            blog_json['object']['tag'].append(tag_json)
+    citations = []
+    try:
+        with open(citations_filename, 'r', encoding='utf-8') as fp_cit:
+            citations = fp_cit.readlines()
+    except OSError:
+        print('EX: _append_citations_to_blog_post unable to read ' +
+              citations_filename)
+    for line in citations:
+        if citations_separator not in line:
+            continue
+        sections = line.strip().split(citations_separator)
+        if len(sections) != 3:
+            continue
+        # date_str = sections[0]
+        title = sections[1]
+        link = sections[2]
+        tag_json = {
+            "type": "Article",
+            "name": title,
+            "url": link
+        }
+        blog_json['object']['tag'].append(tag_json)


def create_blog_post(base_dir: str,
@@ -2634,36 +2651,39 @@ def create_report_post(base_dir: str,
     moderators_list = []
     moderators_file = data_dir(base_dir) + '/moderators.txt'
     if os.path.isfile(moderators_file):
+        try:
             with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
                 for line in fp_mod:
                     line = line.strip('\n').strip('\r')
                     if line.startswith('#'):
                         continue
                     if line.startswith('/users/'):
                         line = line.replace('users', '')
                     if line.startswith('@'):
                         line = line[1:]
                     if '@' in line:
                         nick = line.split('@')[0]
                         moderator_actor = \
                             local_actor_url(http_prefix, nick, domain_full)
                         if moderator_actor not in moderators_list:
                             moderators_list.append(moderator_actor)
                         continue
                     if line.startswith('http') or \
                        line.startswith('ipfs') or \
                        line.startswith('ipns') or \
                        line.startswith('hyper'):
                         # must be a local address - no remote moderators
                         if '://' + domain_full + '/' in line:
                             if line not in moderators_list:
                                 moderators_list.append(line)
                     else:
                         if '/' not in line:
                             moderator_actor = \
                                 local_actor_url(http_prefix, line, domain_full)
                             if moderator_actor not in moderators_list:
                                 moderators_list.append(moderator_actor)
+        except OSError:
+            print('EX: create_report_post unable to read ' + moderators_file)
     if len(moderators_list) == 0:
         # if there are no moderators then the admin becomes the moderator
         admin_nickname = get_config_param(base_dir, 'admin')
@@ -3305,17 +3325,21 @@ def group_followers_by_domain(base_dir: str, nickname: str, domain: str) -> {}:
     if not os.path.isfile(followers_filename):
         return None
     grouped = {}
-    with open(followers_filename, 'r', encoding='utf-8') as foll_file:
-        for follower_handle in foll_file:
-            if '@' not in follower_handle:
-                continue
-            fhandle1 = follower_handle.strip()
-            fhandle = remove_eol(fhandle1)
-            follower_domain = fhandle.split('@')[1]
-            if not grouped.get(follower_domain):
-                grouped[follower_domain] = [fhandle]
-            else:
-                grouped[follower_domain].append(fhandle)
+    try:
+        with open(followers_filename, 'r', encoding='utf-8') as fp_foll:
+            for follower_handle in fp_foll:
+                if '@' not in follower_handle:
+                    continue
+                fhandle1 = follower_handle.strip()
+                fhandle = remove_eol(fhandle1)
+                follower_domain = fhandle.split('@')[1]
+                if not grouped.get(follower_domain):
+                    grouped[follower_domain] = [fhandle]
+                else:
+                    grouped[follower_domain].append(fhandle)
+    except OSError:
+        print('EX: group_followers_by_domain unable to read ' +
+              followers_filename)
     return grouped
@@ -4339,6 +4363,8 @@ def create_outbox(base_dir: str, nickname: str, domain: str,
 def create_moderation(base_dir: str, nickname: str, domain: str, port: int,
                       http_prefix: str, items_per_page: int, header_only: bool,
                       page_number: int) -> {}:
+    """ Creates a moderation timeline
+    """
     box_dir = create_person_dir(nickname, domain, base_dir, 'inbox')
     boxname = 'moderation'
@@ -4369,9 +4395,14 @@ def create_moderation(base_dir: str, nickname: str, domain: str, port: int,
     if is_moderator(base_dir, nickname):
         moderation_index_file = data_dir(base_dir) + '/moderation.txt'
         if os.path.isfile(moderation_index_file):
-            with open(moderation_index_file, 'r',
-                      encoding='utf-8') as index_file:
-                lines = index_file.readlines()
+            lines = []
+            try:
+                with open(moderation_index_file, 'r',
+                          encoding='utf-8') as index_file:
+                    lines = index_file.readlines()
+            except OSError:
+                print('EX: create_moderation unable to read ' +
+                      moderation_index_file)
             box_header['totalItems'] = len(lines)
             if header_only:
                 return box_header
@@ -4499,23 +4530,29 @@ def _add_post_to_timeline(file_path: str, boxname: str,
                           posts_in_box: [], box_actor: str) -> bool:
     """ Reads a post from file and decides whether it is valid
     """
-    with open(file_path, 'r', encoding='utf-8') as post_file:
-        post_str = post_file.read()
-
-        if file_path.endswith('.json'):
-            replies_filename = file_path.replace('.json', '.replies')
-            if os.path.isfile(replies_filename):
-                # append a replies identifier, which will later be removed
-                post_str += '<hasReplies>'
-
-            mitm_filename = file_path.replace('.json', '.mitm')
-            if os.path.isfile(mitm_filename):
-                # append a mitm identifier, which will later be removed
-                post_str += '<postmitm>'
-
-        return _add_post_string_to_timeline(post_str, boxname, posts_in_box,
-                                            box_actor)
-    return False
+    post_str = ''
+    try:
+        with open(file_path, 'r', encoding='utf-8') as fp_post:
+            post_str = fp_post.read()
+    except OSError:
+        print('EX: _add_post_to_timeline unable to read ' + file_path)
+
+    if not post_str:
+        return False
+
+    if file_path.endswith('.json'):
+        replies_filename = file_path.replace('.json', '.replies')
+        if os.path.isfile(replies_filename):
+            # append a replies identifier, which will later be removed
+            post_str += '<hasReplies>'
+
+        mitm_filename = file_path.replace('.json', '.mitm')
+        if os.path.isfile(mitm_filename):
+            # append a mitm identifier, which will later be removed
+            post_str += '<postmitm>'
+
+    return _add_post_string_to_timeline(post_str, boxname,
+                                        posts_in_box, box_actor)
 
 
 def remove_post_interactions(post_json_object: {}, force: bool) -> bool:
@@ -4641,111 +4678,115 @@ def _create_box_items(base_dir: str,
     first_post_id = first_post_id.replace('--', '#')
     first_post_id = first_post_id.replace('/', '#')
-    with open(index_filename, 'r', encoding='utf-8') as index_file:
-        posts_added_to_timeline = 0
-        while posts_added_to_timeline < items_per_page:
-            post_filename = index_file.readline()
-
-            if not post_filename:
-                break
-
-            # if a first post is specified then wait until it is found
-            # before starting to generate the timeline
-            if first_post_id and total_posts_count == 0:
-                if first_post_id not in post_filename:
-                    continue
-                total_posts_count = \
-                    int((page_number - 1) * items_per_page)
-
-            # Has this post passed through the newswire voting stage?
-            if not _passed_newswire_voting(newswire_votes_threshold,
-                                           base_dir, domain,
-                                           post_filename,
-                                           positive_voting,
-                                           voting_time_mins):
-                continue
-
-            # Skip through any posts previous to the current page
-            if not first_post_id:
-                if total_posts_count < \
-                        int((page_number - 1) * items_per_page):
-                    total_posts_count += 1
-                    continue
-
-            # if this is a full path then remove the directories
-            if '/' in post_filename:
-                post_filename = post_filename.split('/')[-1]
-
-            # filename of the post without any extension or path
-            # This should also correspond to any index entry in
-            # the posts cache
-            post_url = remove_eol(post_filename)
-            post_url = post_url.replace('.json', '').strip()
-
-            # is this a duplicate?
-            if post_url in post_urls_in_box:
-                continue
-
-            # is the post cached in memory?
-            if recent_posts_cache.get('index'):
-                if post_url in recent_posts_cache['index']:
-                    if recent_posts_cache['json'].get(post_url):
-                        url = recent_posts_cache['json'][post_url]
-                        if _add_post_string_to_timeline(url,
-                                                        boxname,
-                                                        posts_in_box,
-                                                        box_actor):
-                            total_posts_count += 1
-                            posts_added_to_timeline += 1
-                            post_urls_in_box.append(post_url)
-                            continue
-                        print('Post not added to timeline')
-
-            # read the post from file
-            full_post_filename = \
-                locate_post(base_dir, nickname,
-                            original_domain, post_url, False)
-            if full_post_filename:
-                # has the post been rejected?
-                if os.path.isfile(full_post_filename + '.reject'):
-                    continue
-
-                if _add_post_to_timeline(full_post_filename, boxname,
-                                         posts_in_box, box_actor):
-                    posts_added_to_timeline += 1
-                    total_posts_count += 1
-                    post_urls_in_box.append(post_url)
-                else:
-                    print('WARN: Unable to add post ' + post_url +
-                          ' nickname ' + nickname +
-                          ' timeline ' + boxname)
-            else:
-                if timeline_nickname != nickname:
-                    # if this is the features timeline
-                    full_post_filename = \
-                        locate_post(base_dir, timeline_nickname,
-                                    original_domain, post_url, False)
-                    if full_post_filename:
-                        if _add_post_to_timeline(full_post_filename,
-                                                 boxname,
-                                                 posts_in_box, box_actor):
-                            posts_added_to_timeline += 1
-                            total_posts_count += 1
-                            post_urls_in_box.append(post_url)
-                        else:
-                            print('WARN: Unable to add features post ' +
-                                  post_url + ' nickname ' + nickname +
-                                  ' timeline ' + boxname)
-                    else:
-                        print('WARN: features timeline. ' +
-                              'Unable to locate post ' + post_url)
-                else:
-                    if timeline_nickname == 'news':
-                        print('WARN: Unable to locate news post ' +
-                              post_url + ' nickname ' + nickname)
-                    else:
-                        print('WARN: Unable to locate post ' + post_url +
-                              ' nickname ' + nickname)
+    try:
+        with open(index_filename, 'r', encoding='utf-8') as index_file:
+            posts_added_to_timeline = 0
+            while posts_added_to_timeline < items_per_page:
+                post_filename = index_file.readline()
+
+                if not post_filename:
+                    break
+
+                # if a first post is specified then wait until it is found
+                # before starting to generate the timeline
+                if first_post_id and total_posts_count == 0:
+                    if first_post_id not in post_filename:
+                        continue
+                    total_posts_count = \
+                        int((page_number - 1) * items_per_page)
+
+                # Has this post passed through the newswire voting stage?
+                if not _passed_newswire_voting(newswire_votes_threshold,
+                                               base_dir, domain,
+                                               post_filename,
+                                               positive_voting,
+                                               voting_time_mins):
+                    continue
+
+                # Skip through any posts previous to the current page
+                if not first_post_id:
+                    if total_posts_count < \
+                            int((page_number - 1) * items_per_page):
+                        total_posts_count += 1
+                        continue
+
+                # if this is a full path then remove the directories
+                if '/' in post_filename:
+                    post_filename = post_filename.split('/')[-1]
+
+                # filename of the post without any extension or path
+                # This should also correspond to any index entry in
+                # the posts cache
+                post_url = remove_eol(post_filename)
+                post_url = post_url.replace('.json', '').strip()
+
+                # is this a duplicate?
+                if post_url in post_urls_in_box:
+                    continue
+
+                # is the post cached in memory?
+                if recent_posts_cache.get('index'):
+                    if post_url in recent_posts_cache['index']:
+                        if recent_posts_cache['json'].get(post_url):
+                            url = recent_posts_cache['json'][post_url]
+                            if _add_post_string_to_timeline(url,
+                                                            boxname,
+                                                            posts_in_box,
+                                                            box_actor):
+                                total_posts_count += 1
+                                posts_added_to_timeline += 1
+                                post_urls_in_box.append(post_url)
+                                continue
+                            print('Post not added to timeline')
+
+                # read the post from file
+                full_post_filename = \
+                    locate_post(base_dir, nickname,
+                                original_domain, post_url, False)
+                if full_post_filename:
+                    # has the post been rejected?
+                    if os.path.isfile(full_post_filename + '.reject'):
+                        continue
+
+                    if _add_post_to_timeline(full_post_filename, boxname,
+                                             posts_in_box, box_actor):
+                        posts_added_to_timeline += 1
+                        total_posts_count += 1
+                        post_urls_in_box.append(post_url)
+                    else:
+                        print('WARN: Unable to add post ' + post_url +
+                              ' nickname ' + nickname +
+                              ' timeline ' + boxname)
+                else:
+                    if timeline_nickname != nickname:
+                        # if this is the features timeline
+                        full_post_filename = \
+                            locate_post(base_dir, timeline_nickname,
+                                        original_domain, post_url, False)
+                        if full_post_filename:
+                            if _add_post_to_timeline(full_post_filename,
+                                                     boxname,
+                                                     posts_in_box, box_actor):
+                                posts_added_to_timeline += 1
+                                total_posts_count += 1
+                                post_urls_in_box.append(post_url)
+                            else:
+                                print('WARN: Unable to add features post ' +
+                                      post_url + ' nickname ' + nickname +
+                                      ' timeline ' + boxname)
+                        else:
+                            print('WARN: features timeline. ' +
+                                  'Unable to locate post ' + post_url)
+                    else:
+                        if timeline_nickname == 'news':
+                            print('WARN: Unable to locate news post ' +
+                                  post_url + ' nickname ' + nickname)
+                        else:
+                            print('WARN: Unable to locate post ' + post_url +
+                                  ' nickname ' + nickname)
+    except OSError as exc:
+        print('EX: _create_box_items unable to read ' + index_filename +
+              ' ' + str(exc))
     return total_posts_count, posts_added_to_timeline
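
The paging arithmetic in the loop above is worth spelling out: index entries are skipped until total_posts_count reaches (page_number - 1) * items_per_page, so the requested page only starts filling once the earlier pages' worth of entries has been passed over. With illustrative values:

    items_per_page = 10
    page_number = 3
    # entries skipped before page 3 begins to fill
    skip_before = int((page_number - 1) * items_per_page)
    print(skip_before)  # 20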
@@ -5732,8 +5773,12 @@ def get_public_post_domains_blocked(session, base_dir: str,
     # read the blocked domains as a single string
     blocked_str = ''
-    with open(blocking_filename, 'r', encoding='utf-8') as fp_block:
-        blocked_str = fp_block.read()
+    try:
+        with open(blocking_filename, 'r', encoding='utf-8') as fp_block:
+            blocked_str = fp_block.read()
+    except OSError:
+        print('EX: get_public_post_domains_blocked unable to read ' +
+              blocking_filename)
 
     blocked_domains = []
     for domain_name in post_domains:
@@ -5784,9 +5829,13 @@ def check_domains(session, base_dir: str,
     update_follower_warnings = False
     follower_warning_str = ''
     if os.path.isfile(follower_warning_filename):
-        with open(follower_warning_filename, 'r',
-                  encoding='utf-8') as fp_warn:
-            follower_warning_str = fp_warn.read()
+        try:
+            with open(follower_warning_filename, 'r',
+                      encoding='utf-8') as fp_warn:
+                follower_warning_str = fp_warn.read()
+        except OSError:
+            print('EX: check_domains unable to read ' +
+                  follower_warning_filename)
 
     if single_check:
         # checks a single random non-mutual
@@ -5852,61 +5901,66 @@ def populate_replies_json(base_dir: str, nickname: str, domain: str,
     pub_str = 'https://www.w3.org/ns/activitystreams#Public'
     # populate the items list with replies
     replies_boxes = ('outbox', 'inbox')
-    with open(post_replies_filename, 'r', encoding='utf-8') as replies_file:
-        for message_id in replies_file:
-            reply_found = False
-            # examine inbox and outbox
-            for boxname in replies_boxes:
-                message_id2 = remove_eol(message_id)
-                search_filename = \
-                    acct_dir(base_dir, nickname, domain) + '/' + \
-                    boxname + '/' + \
-                    message_id2.replace('/', '#') + '.json'
-                if os.path.isfile(search_filename):
-                    if authorized or \
-                       text_in_file(pub_str, search_filename):
-                        post_json_object = load_json(search_filename)
-                        if post_json_object:
-                            if post_json_object['object'].get('cc'):
-                                pjo = post_json_object
-                                if (authorized or
-                                    (pub_str in pjo['object']['to'] or
-                                     pub_str in pjo['object']['cc'])):
-                                    replies_json['orderedItems'].append(pjo)
-                                    reply_found = True
-                            else:
-                                if authorized or \
-                                   pub_str in post_json_object['object']['to']:
-                                    pjo = post_json_object
-                                    replies_json['orderedItems'].append(pjo)
-                                    reply_found = True
-                    break
-            # if not in either inbox or outbox then examine the shared inbox
-            if not reply_found:
-                message_id2 = remove_eol(message_id)
-                search_filename = \
-                    data_dir(base_dir) + '/inbox@' + \
-                    domain + '/inbox/' + \
-                    message_id2.replace('/', '#') + '.json'
-                if os.path.isfile(search_filename):
-                    if authorized or \
-                       text_in_file(pub_str, search_filename):
-                        # get the json of the reply and append it to
-                        # the collection
-                        post_json_object = load_json(search_filename)
-                        if post_json_object:
-                            if post_json_object['object'].get('cc'):
-                                pjo = post_json_object
-                                if (authorized or
-                                    (pub_str in pjo['object']['to'] or
-                                     pub_str in pjo['object']['cc'])):
-                                    pjo = post_json_object
-                                    replies_json['orderedItems'].append(pjo)
-                            else:
-                                if authorized or \
-                                   pub_str in post_json_object['object']['to']:
-                                    pjo = post_json_object
-                                    replies_json['orderedItems'].append(pjo)
+    try:
+        with open(post_replies_filename, 'r',
+                  encoding='utf-8') as replies_file:
+            for message_id in replies_file:
+                reply_found = False
+                # examine inbox and outbox
+                for boxname in replies_boxes:
+                    message_id2 = remove_eol(message_id)
+                    search_filename = \
+                        acct_dir(base_dir, nickname, domain) + '/' + \
+                        boxname + '/' + \
+                        message_id2.replace('/', '#') + '.json'
+                    if os.path.isfile(search_filename):
+                        if authorized or \
+                           text_in_file(pub_str, search_filename):
+                            post_json_object = load_json(search_filename)
+                            if post_json_object:
+                                pjo = post_json_object
+                                ordered_items = replies_json['orderedItems']
+                                if pjo['object'].get('cc'):
+                                    if (authorized or
+                                        (pub_str in pjo['object']['to'] or
+                                         pub_str in pjo['object']['cc'])):
+                                        ordered_items.append(pjo)
+                                        reply_found = True
+                                else:
+                                    if authorized or \
+                                       pub_str in pjo['object']['to']:
+                                        ordered_items.append(pjo)
+                                        reply_found = True
+                        break
+                # if not in either inbox or outbox then examine the
+                # shared inbox
+                if not reply_found:
+                    message_id2 = remove_eol(message_id)
+                    search_filename = \
+                        data_dir(base_dir) + '/inbox@' + \
+                        domain + '/inbox/' + \
+                        message_id2.replace('/', '#') + '.json'
+                    if os.path.isfile(search_filename):
+                        if authorized or \
+                           text_in_file(pub_str, search_filename):
+                            # get the json of the reply and append it to
+                            # the collection
+                            post_json_object = load_json(search_filename)
+                            if post_json_object:
+                                pjo = post_json_object
+                                ordered_items = replies_json['orderedItems']
+                                if pjo['object'].get('cc'):
+                                    if (authorized or
+                                        (pub_str in pjo['object']['to'] or
+                                         pub_str in pjo['object']['cc'])):
+                                        ordered_items.append(pjo)
+                                else:
+                                    if authorized or \
+                                       pub_str in pjo['object']['to']:
+                                        ordered_items.append(pjo)
+    except OSError:
+        print('EX: populate_replies_json unable to read ' +
+              post_replies_filename)
 
 
 def _reject_announce(announce_filename: str,
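
Both the inbox/outbox branch and the shared-inbox branch above apply the same visibility rule. Distilled into a hypothetical standalone helper (a sketch, not code from this commit), a reply is collected when the request is authorized or the post is addressed to the public collection:

    pub_str = 'https://www.w3.org/ns/activitystreams#Public'

    def reply_is_visible(post_json_object: {}, authorized: bool) -> bool:
        """Sketch of the reply visibility check"""
        if authorized:
            return True
        obj = post_json_object['object']
        if obj.get('cc'):
            return pub_str in obj['to'] or pub_str in obj['cc']
        return pub_str in obj['to']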

View File

@@ -145,33 +145,39 @@ def question_update_votes(base_dir: str, nickname: str, domain: str,
             print('EX: unable to append to voters file ' + voters_filename)
     else:
         # change an entry in the voters file
-        with open(voters_filename, 'r',
-                  encoding='utf-8') as voters_file:
-            lines = voters_file.readlines()
-            newlines = []
-            save_voters_file = False
-            for vote_line in lines:
-                if vote_line.startswith(actor_url +
-                                        voters_file_separator):
-                    new_vote_line = actor_url + \
-                        voters_file_separator + reply_vote + '\n'
-                    if vote_line == new_vote_line:
-                        break
-                    save_voters_file = True
-                    newlines.append(new_vote_line)
-                else:
-                    newlines.append(vote_line)
-            if save_voters_file:
-                try:
-                    with open(voters_filename, 'w+',
-                              encoding='utf-8') as voters_file:
-                        for vote_line in newlines:
-                            voters_file.write(vote_line)
-                except OSError:
-                    print('EX: unable to write voters file2 ' +
-                          voters_filename)
-            else:
-                return None, None
+        lines = []
+        try:
+            with open(voters_filename, 'r',
+                      encoding='utf-8') as voters_file:
+                lines = voters_file.readlines()
+        except OSError:
+            print('EX: question_update_votes unable to read ' +
+                  voters_filename)
+
+        newlines = []
+        save_voters_file = False
+        for vote_line in lines:
+            if vote_line.startswith(actor_url +
+                                    voters_file_separator):
+                new_vote_line = actor_url + \
+                    voters_file_separator + reply_vote + '\n'
+                if vote_line == new_vote_line:
+                    break
+                save_voters_file = True
+                newlines.append(new_vote_line)
+            else:
+                newlines.append(vote_line)
+        if save_voters_file:
+            try:
+                with open(voters_filename, 'w+',
+                          encoding='utf-8') as voters_file:
+                    for vote_line in newlines:
+                        voters_file.write(vote_line)
+            except OSError:
+                print('EX: unable to write voters file2 ' +
+                      voters_filename)
+        else:
+            return None, None
 
     # update the vote counts
     question_totals_changed = False
@@ -179,12 +185,17 @@ def question_update_votes(base_dir: str, nickname: str, domain: str,
         if not possible_answer.get('name'):
             continue
         total_items = 0
-        with open(voters_filename, 'r', encoding='utf-8') as voters_file:
-            lines = voters_file.readlines()
-            for vote_line in lines:
-                if vote_line.endswith(voters_file_separator +
-                                      possible_answer['name'] + '\n'):
-                    total_items += 1
+        lines = []
+        try:
+            with open(voters_filename, 'r', encoding='utf-8') as fp_voters:
+                lines = fp_voters.readlines()
+        except OSError:
+            print('EX: question_update_votes unable to read ' +
+                  voters_filename)
+        for vote_line in lines:
+            if vote_line.endswith(voters_file_separator +
+                                  possible_answer['name'] + '\n'):
+                total_items += 1
         if possible_answer['replies']['totalItems'] != total_items:
             possible_answer['replies']['totalItems'] = total_items
             question_totals_changed = True
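
Each line of the voters file pairs an actor with their chosen answer, joined by voters_file_separator, and an answer's total is simply the number of lines ending with that answer. A small sketch, assuming ';;;' as the separator value (the real constant is defined elsewhere in the module):

    voters_file_separator = ';;;'   # assumed value, for illustration
    lines = [
        'https://domain/users/alice;;;Yes\n',
        'https://domain/users/bob;;;No\n',
        'https://domain/users/carol;;;Yes\n'
    ]
    total_items = 0
    for vote_line in lines:
        if vote_line.endswith(voters_file_separator + 'Yes' + '\n'):
            total_items += 1
    print(total_items)  # 2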

View File

@@ -472,7 +472,8 @@ def _update_common_reactions(base_dir: str, emoji_content: str) -> None:
                       encoding='utf-8') as fp_react:
                 common_reactions = fp_react.readlines()
         except OSError:
-            print('EX: unable to load common reactions file')
+            print('EX: unable to load common reactions file ' +
+                  common_reactions_filename)
     if common_reactions:
         new_common_reactions = []
         reaction_found = False

View File

@@ -283,19 +283,23 @@ def is_devops(base_dir: str, nickname: str) -> bool:
             return True
         return False
-    with open(devops_file, 'r', encoding='utf-8') as fp_mod:
-        lines = fp_mod.readlines()
-        if len(lines) == 0:
-            # if there is nothing in the file
-            admin_name = get_config_param(base_dir, 'admin')
-            if not admin_name:
-                return False
-            if admin_name == nickname:
-                return True
-        for devops in lines:
-            devops = devops.strip('\n').strip('\r')
-            if devops == nickname:
-                return True
+    lines = []
+    try:
+        with open(devops_file, 'r', encoding='utf-8') as fp_mod:
+            lines = fp_mod.readlines()
+    except OSError:
+        print('EX: is_devops unable to read ' + devops_file)
+    if len(lines) == 0:
+        # if there is nothing in the file
+        admin_name = get_config_param(base_dir, 'admin')
+        if not admin_name:
+            return False
+        if admin_name == nickname:
+            return True
+    for devops in lines:
+        devops = devops.strip('\n').strip('\r')
+        if devops == nickname:
+            return True
     return False

View File

@@ -1770,11 +1770,15 @@ def _generate_next_shares_token_update(base_dir: str,
     token_update_filename = token_update_dir + '/.tokenUpdate'
     next_update_sec = None
     if os.path.isfile(token_update_filename):
-        with open(token_update_filename, 'r', encoding='utf-8') as fp_tok:
-            next_update_str = fp_tok.read()
-            if next_update_str:
-                if next_update_str.isdigit():
-                    next_update_sec = int(next_update_str)
+        try:
+            with open(token_update_filename, 'r', encoding='utf-8') as fp_tok:
+                next_update_str = fp_tok.read()
+                if next_update_str:
+                    if next_update_str.isdigit():
+                        next_update_sec = int(next_update_str)
+        except OSError:
+            print('EX: _generate_next_shares_token_update unable to read ' +
+                  token_update_filename)
     curr_time = int(time.time())
     updated = False
     if next_update_sec:
@@ -1818,11 +1822,15 @@ def _regenerate_shares_token(base_dir: str, domain_full: str,
     if not os.path.isfile(token_update_filename):
         return
     next_update_sec = None
-    with open(token_update_filename, 'r', encoding='utf-8') as fp_tok:
-        next_update_str = fp_tok.read()
-        if next_update_str:
-            if next_update_str.isdigit():
-                next_update_sec = int(next_update_str)
+    try:
+        with open(token_update_filename, 'r', encoding='utf-8') as fp_tok:
+            next_update_str = fp_tok.read()
+            if next_update_str:
+                if next_update_str.isdigit():
+                    next_update_sec = int(next_update_str)
+    except OSError:
+        print('EX: _regenerate_shares_token unable to read ' +
+              token_update_filename)
    if not next_update_sec:
        return
    curr_time = int(time.time())
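
In both functions the .tokenUpdate file stores the next regeneration time as epoch seconds; an unreadable or non-numeric file now simply leaves next_update_sec as None and the update is skipped. A condensed sketch of that guard, with a hypothetical file value:

    import time

    next_update_str = '1720000000'   # hypothetical file contents
    next_update_sec = None
    if next_update_str and next_update_str.isdigit():
        next_update_sec = int(next_update_str)
    if next_update_sec and int(time.time()) > next_update_sec:
        print('shares token update is due')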

View File

@@ -180,5 +180,6 @@ def load_unavailable_sites(base_dir: str) -> []:
                   encoding='utf-8') as fp_sites:
             sites_unavailable = fp_sites.read().split('\n')
     except OSError:
-        print('EX: unable to save unavailable sites')
+        print('EX: unable to read unavailable sites ' +
+              unavailable_sites_filename)
     return sites_unavailable

View File

@@ -150,24 +150,28 @@ def _speaker_pronounce(base_dir: str, say_text: str, translate: {}) -> str:
         ")": ","
     }
     if os.path.isfile(pronounce_filename):
-        with open(pronounce_filename, 'r', encoding='utf-8') as fp_pro:
-            pronounce_list = fp_pro.readlines()
-            for conversion in pronounce_list:
-                separator = None
-                if '->' in conversion:
-                    separator = '->'
-                elif ';' in conversion:
-                    separator = ';'
-                elif ':' in conversion:
-                    separator = ':'
-                elif ',' in conversion:
-                    separator = ','
-                if not separator:
-                    continue
-
-                text = conversion.split(separator)[0].strip()
-                converted = conversion.split(separator)[1].strip()
-                convert_dict[text] = converted
+        try:
+            with open(pronounce_filename, 'r', encoding='utf-8') as fp_pro:
+                pronounce_list = fp_pro.readlines()
+                for conversion in pronounce_list:
+                    separator = None
+                    if '->' in conversion:
+                        separator = '->'
+                    elif ';' in conversion:
+                        separator = ';'
+                    elif ':' in conversion:
+                        separator = ':'
+                    elif ',' in conversion:
+                        separator = ','
+                    if not separator:
+                        continue
+
+                    text = conversion.split(separator)[0].strip()
+                    converted = conversion.split(separator)[1].strip()
+                    convert_dict[text] = converted
+        except OSError:
+            print('EX: _speaker_pronounce unable to read ' +
+                  pronounce_filename)
     for text, converted in convert_dict.items():
         if text in say_text:
             say_text = say_text.replace(text, converted)
@@ -528,13 +532,18 @@ def _post_to_speaker_json(base_dir: str, http_prefix: str,
     accounts_dir = acct_dir(base_dir, nickname, domain_full)
     approve_follows_filename = accounts_dir + '/followrequests.txt'
     if os.path.isfile(approve_follows_filename):
-        with open(approve_follows_filename, 'r', encoding='utf-8') as fp_foll:
-            follows = fp_foll.readlines()
-            if len(follows) > 0:
-                follow_requests_exist = True
-                for i, _ in enumerate(follows):
-                    follows[i] = follows[i].strip()
-                follow_requests_list = follows
+        try:
+            with open(approve_follows_filename, 'r',
+                      encoding='utf-8') as fp_foll:
+                follows = fp_foll.readlines()
+                if len(follows) > 0:
+                    follow_requests_exist = True
+                    for i, _ in enumerate(follows):
+                        follows[i] = follows[i].strip()
+                    follow_requests_list = follows
+        except OSError:
+            print('EX: _post_to_speaker_json unable to read ' +
+                  approve_follows_filename)
     post_dm = False
     dm_filename = accounts_dir + '/.newDM'
     if os.path.isfile(dm_filename):
@@ -546,8 +555,12 @@ def _post_to_speaker_json(base_dir: str, http_prefix: str,
     liked_by = ''
     like_filename = accounts_dir + '/.newLike'
     if os.path.isfile(like_filename):
-        with open(like_filename, 'r', encoding='utf-8') as fp_like:
-            liked_by = fp_like.read()
+        try:
+            with open(like_filename, 'r', encoding='utf-8') as fp_like:
+                liked_by = fp_like.read()
+        except OSError:
+            print('EX: _post_to_speaker_json unable to read 2 ' +
+                  like_filename)
     calendar_filename = accounts_dir + '/.newCalendar'
     post_cal = os.path.isfile(calendar_filename)
     share_filename = accounts_dir + '/.newShare'

View File

@@ -46,25 +46,29 @@ def import_theme(base_dir: str, filename: str) -> bool:
               ' missing from imported theme')
         return False
     new_theme_name = None
-    with open(temp_theme_dir + '/name.txt', 'r',
-              encoding='utf-8') as fp_theme:
-        new_theme_name1 = fp_theme.read()
-        new_theme_name = remove_eol(new_theme_name1)
-        if len(new_theme_name) > 20:
-            print('WARN: Imported theme name is too long')
-            return False
-        if len(new_theme_name) < 2:
-            print('WARN: Imported theme name is too short')
-            return False
-        new_theme_name = new_theme_name.lower()
-        forbidden_chars = (
-            ' ', ';', '/', '\\', '?', '!', '#', '@',
-            ':', '%', '&', '"', '+', '<', '>', '$'
-        )
-        for char in forbidden_chars:
-            if char in new_theme_name:
-                print('WARN: theme name contains forbidden character')
-                return False
+    try:
+        with open(temp_theme_dir + '/name.txt', 'r',
+                  encoding='utf-8') as fp_theme:
+            new_theme_name1 = fp_theme.read()
+            new_theme_name = remove_eol(new_theme_name1)
+            if len(new_theme_name) > 20:
+                print('WARN: Imported theme name is too long')
+                return False
+            if len(new_theme_name) < 2:
+                print('WARN: Imported theme name is too short')
+                return False
+            new_theme_name = new_theme_name.lower()
+            forbidden_chars = (
+                ' ', ';', '/', '\\', '?', '!', '#', '@',
+                ':', '%', '&', '"', '+', '<', '>', '$'
+            )
+            for char in forbidden_chars:
+                if char in new_theme_name:
+                    print('WARN: theme name contains forbidden character')
+                    return False
+    except OSError:
+        print('EX: import_theme unable to read ' +
+              temp_theme_dir + '/name.txt')
     if not new_theme_name:
         return False
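
The name checks above amount to a small validation rule: between 2 and 20 characters, lowercased, with no separator or markup characters. As a sketch of the same rule gathered into one hypothetical function:

    def theme_name_is_valid(new_theme_name: str) -> bool:
        """Sketch of the theme name checks used during import"""
        if not 2 <= len(new_theme_name) <= 20:
            return False
        forbidden_chars = (
            ' ', ';', '/', '\\', '?', '!', '#', '@',
            ':', '%', '&', '"', '+', '<', '>', '$'
        )
        return not any(char in new_theme_name
                       for char in forbidden_chars)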