Snake case

merge-requests/30/head
Bob Mottram 2021-12-31 17:38:22 +00:00
parent 9f5ee4db18
commit 81c3fc8789
1 changed file with 256 additions and 268 deletions

posts.py

@@ -178,18 +178,18 @@ def _get_person_key(nickname: str, domain: str, base_dir: str,
"""Returns the public or private key of a person
"""
if key_type == 'private':
keyPem = _get_local_private_key(base_dir, nickname, domain)
key_pem = _get_local_private_key(base_dir, nickname, domain)
else:
keyPem = _get_local_public_key(base_dir, nickname, domain)
if not keyPem:
key_pem = _get_local_public_key(base_dir, nickname, domain)
if not key_pem:
if debug:
print('DEBUG: ' + key_type + ' key file not found')
return ''
if len(keyPem) < 20:
if len(key_pem) < 20:
if debug:
print('DEBUG: private key was too short: ' + keyPem)
print('DEBUG: private key was too short: ' + key_pem)
return ''
return keyPem
return key_pem
def _clean_html(raw_html: str) -> str:
@@ -265,7 +265,7 @@ def parse_user_feed(signing_priv_key_pem: str,
if 'orderedItems' in feed_json:
return feed_json['orderedItems']
elif 'items' in feed_json:
if 'items' in feed_json:
return feed_json['items']
next_url = None
@@ -280,19 +280,19 @@ def parse_user_feed(signing_priv_key_pem: str,
if next_url:
if isinstance(next_url, str):
if '?max_id=0' not in next_url:
userFeed = \
user_feed = \
parse_user_feed(signing_priv_key_pem,
session, next_url, as_header,
project_version, http_prefix,
origin_domain, debug, depth + 1)
if userFeed:
return userFeed
if user_feed:
return user_feed
elif isinstance(next_url, dict):
userFeed = next_url
if userFeed.get('orderedItems'):
return userFeed['orderedItems']
elif userFeed.get('items'):
return userFeed['items']
user_feed = next_url
if user_feed.get('orderedItems'):
return user_feed['orderedItems']
elif user_feed.get('items'):
return user_feed['items']
return None
@@ -562,11 +562,11 @@ def _get_posts(session, outbox_url: str, max_posts: int,
print('Returning the raw feed')
result = []
i = 0
userFeed = parse_user_feed(signing_priv_key_pem,
user_feed = parse_user_feed(signing_priv_key_pem,
session, outbox_url, as_header,
project_version, http_prefix,
origin_domain, debug)
for item in userFeed:
for item in user_feed:
result.append(item)
i += 1
if i == max_posts:
@@ -576,15 +576,15 @@ def _get_posts(session, outbox_url: str, max_posts: int,
if debug:
print('Returning a human readable version of the feed')
userFeed = parse_user_feed(signing_priv_key_pem,
user_feed = parse_user_feed(signing_priv_key_pem,
session, outbox_url, as_header,
project_version, http_prefix,
origin_domain, debug)
if not userFeed:
if not user_feed:
return person_posts
i = 0
for item in userFeed:
for item in user_feed:
if is_create_inside_announce(item):
item = item['object']
@@ -592,7 +592,8 @@ def _get_posts(session, outbox_url: str, max_posts: int,
continue
this_item = item
if item['type'] != 'Note' and item['type'] != 'Page':
this_item_type = item['type']
if this_item_type not in ('Note', 'Page'):
this_item = item['object']
content = get_base_content_from_post(item, system_language)
@@ -609,8 +610,8 @@ def _get_posts(session, outbox_url: str, max_posts: int,
for tag_item in this_item['tag']:
if not tag_item.get('type'):
continue
tagType = tag_item['type'].lower()
if tagType == 'emoji':
tag_type = tag_item['type'].lower()
if tag_type == 'emoji':
if tag_item.get('name') and tag_item.get('icon'):
if tag_item['icon'].get('url'):
# No emoji from non-permitted domains
@@ -623,7 +624,7 @@ def _get_posts(session, outbox_url: str, max_posts: int,
if debug:
print('url not permitted ' +
tag_item['icon']['url'])
if tagType == 'mention':
if tag_type == 'mention':
if tag_item.get('name'):
if tag_item['name'] not in mentions:
mentions.append(tag_item['name'])
@@ -779,10 +780,10 @@ def get_post_domains(session, outbox_url: str, max_posts: int,
post_domains = domain_list
i = 0
userFeed = parse_user_feed(signing_priv_key_pem,
user_feed = parse_user_feed(signing_priv_key_pem,
session, outbox_url, as_header,
project_version, http_prefix, domain, debug)
for item in userFeed:
for item in user_feed:
i += 1
if i > max_posts:
break
@@ -793,7 +794,7 @@ def get_post_domains(session, outbox_url: str, max_posts: int,
_update_word_frequency(content_str, word_frequency)
if item['object'].get('inReplyTo'):
if isinstance(item['object']['inReplyTo'], str):
post_domain, postPort = \
post_domain, post_port = \
get_domain_from_actor(item['object']['inReplyTo'])
if post_domain not in post_domains:
post_domains.append(post_domain)
@@ -802,10 +803,10 @@ def get_post_domains(session, outbox_url: str, max_posts: int,
for tag_item in item['object']['tag']:
if not tag_item.get('type'):
continue
tagType = tag_item['type'].lower()
if tagType == 'mention':
tag_type = tag_item['type'].lower()
if tag_type == 'mention':
if tag_item.get('href'):
post_domain, postPort = \
post_domain, post_port = \
get_domain_from_actor(tag_item['href'])
if post_domain not in post_domains:
post_domains.append(post_domain)
@@ -844,10 +845,10 @@ def _get_posts_for_blocked_domains(base_dir: str,
blocked_posts = {}
i = 0
userFeed = parse_user_feed(signing_priv_key_pem,
user_feed = parse_user_feed(signing_priv_key_pem,
session, outbox_url, as_header,
project_version, http_prefix, domain, debug)
for item in userFeed:
for item in user_feed:
i += 1
if i > max_posts:
break
@@ -855,7 +856,7 @@ def _get_posts_for_blocked_domains(base_dir: str,
continue
if item['object'].get('inReplyTo'):
if isinstance(item['object']['inReplyTo'], str):
post_domain, postPort = \
post_domain, post_port = \
get_domain_from_actor(item['object']['inReplyTo'])
if is_blocked_domain(base_dir, post_domain):
if item['object'].get('url'):
@@ -872,9 +873,9 @@ def _get_posts_for_blocked_domains(base_dir: str,
for tag_item in item['object']['tag']:
if not tag_item.get('type'):
continue
tagType = tag_item['type'].lower()
if tagType == 'mention' and tag_item.get('href'):
post_domain, postPort = \
tag_type = tag_item['type'].lower()
if tag_type == 'mention' and tag_item.get('href'):
post_domain, post_port = \
get_domain_from_actor(tag_item['href'])
if is_blocked_domain(base_dir, post_domain):
if item['object'].get('url'):
@@ -893,19 +894,18 @@ def delete_all_posts(base_dir: str,
nickname: str, domain: str, boxname: str) -> None:
"""Deletes all posts for a person from inbox or outbox
"""
if boxname != 'inbox' and boxname != 'outbox' and \
boxname != 'tlblogs' and boxname != 'tlnews':
if boxname not in ('inbox', 'outbox', 'tlblogs', 'tlnews'):
return
box_dir = create_person_dir(nickname, domain, base_dir, boxname)
for deleteFilename in os.scandir(box_dir):
deleteFilename = deleteFilename.name
file_path = os.path.join(box_dir, deleteFilename)
for delete_filename in os.scandir(box_dir):
delete_filename = delete_filename.name
file_path = os.path.join(box_dir, delete_filename)
try:
if os.path.isfile(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path, ignore_errors=False, onerror=None)
except Exception as ex:
except OSError as ex:
print('ERROR: delete_all_posts ' + str(ex))
@@ -915,15 +915,13 @@ def save_post_to_box(base_dir: str, http_prefix: str, post_id: str,
"""Saves the give json to the give box
Returns the filename
"""
if boxname != 'inbox' and boxname != 'outbox' and \
boxname != 'tlblogs' and boxname != 'tlnews' and \
boxname != 'scheduled':
if boxname not in ('inbox', 'outbox', 'tlblogs', 'tlnews', 'scheduled'):
return None
original_domain = domain
domain = remove_domain_port(domain)
if not post_id:
status_number, published = get_status_number()
status_number, _ = get_status_number()
post_id = \
local_actor_url(http_prefix, nickname, original_domain) + \
'/statuses/' + status_number
@@ -1039,13 +1037,13 @@ def _add_auto_cw(base_dir: str, nickname: str, domain: str,
"""
new_subject = subject
auto_cw_list = _load_auto_cw(base_dir, nickname, domain)
for cwRule in auto_cw_list:
if '->' not in cwRule:
for cw_rule in auto_cw_list:
if '->' not in cw_rule:
continue
rulematch = cwRule.split('->')[0].strip()
rulematch = cw_rule.split('->')[0].strip()
if rulematch not in content:
continue
cw_str = cwRule.split('->')[1].strip()
cw_str = cw_rule.split('->')[1].strip()
if new_subject:
if cw_str not in new_subject:
new_subject += ', ' + cw_str
@@ -1091,7 +1089,7 @@ def _create_post_s2s(base_dir: str, nickname: str, domain: str, port: int,
content_license_url: str) -> {}:
"""Creates a new server-to-server post
"""
actorUrl = local_actor_url(http_prefix, nickname, domain)
actor_url = local_actor_url(http_prefix, nickname, domain)
id_str = \
local_actor_url(http_prefix, nickname, domain) + \
'/statuses/' + status_number + '/replies'
@@ -1105,7 +1103,7 @@ def _create_post_s2s(base_dir: str, nickname: str, domain: str, port: int,
'@context': post_context,
'id': new_post_id + '/activity',
'type': 'Create',
'actor': actorUrl,
'actor': actor_url,
'published': published,
'to': to_recipients,
'cc': to_cc,
@@ -1290,8 +1288,8 @@ def _consolidate_actors_list(actors_list: []) -> None:
remove_actors = []
for cc_actor in possible_duplicate_actors:
for usr_path in u_paths:
cc_actorFull = cc_actor.replace('/@', usr_path)
if cc_actorFull in actors_list:
cc_actor_full = cc_actor.replace('/@', usr_path)
if cc_actor_full in actors_list:
if cc_actor not in remove_actors:
remove_actors.append(cc_actor)
break
@@ -1358,18 +1356,18 @@ def get_actor_from_in_reply_to(in_reply_to: str) -> str:
"""Tries to get the replied to actor from the inReplyTo post id
Note: this will not always be successful for some instance types
"""
replyNickname = get_nickname_from_actor(in_reply_to)
if not replyNickname:
reply_nickname = get_nickname_from_actor(in_reply_to)
if not reply_nickname:
return None
reply_actor = None
if '/' + replyNickname + '/' in in_reply_to:
if '/' + reply_nickname + '/' in in_reply_to:
reply_actor = \
in_reply_to.split('/' + replyNickname + '/')[0] + \
'/' + replyNickname
elif '#' + replyNickname + '#' in in_reply_to:
in_reply_to.split('/' + reply_nickname + '/')[0] + \
'/' + reply_nickname
elif '#' + reply_nickname + '#' in in_reply_to:
reply_actor = \
in_reply_to.split('#' + replyNickname + '#')[0] + \
'#' + replyNickname
in_reply_to.split('#' + reply_nickname + '#')[0] + \
'#' + reply_nickname
reply_actor = reply_actor.replace('#', '/')
if not reply_actor:
return None
@@ -1409,10 +1407,10 @@ def _create_post_base(base_dir: str,
subject = _add_auto_cw(base_dir, nickname, domain, subject, content)
if nickname != 'news':
mentionedRecipients = \
mentioned_recipients = \
get_mentioned_people(base_dir, http_prefix, content, domain, False)
else:
mentionedRecipients = ''
mentioned_recipients = ''
tags = []
hashtags_dict = {}
@@ -1424,7 +1422,7 @@ def _create_post_base(base_dir: str,
content = \
add_html_tags(base_dir, http_prefix,
nickname, domain, content,
mentionedRecipients,
mentioned_recipients,
hashtags_dict, True)
# replace emoji with unicode
@@ -1465,8 +1463,8 @@ def _create_post_base(base_dir: str,
to_recipients = [to_url]
# who to send to
if mentionedRecipients:
for mention in mentionedRecipients:
if mentioned_recipients:
for mention in mentioned_recipients:
if mention not in to_cc:
to_cc.append(mention)
@@ -1596,9 +1594,9 @@ def outbox_message_create_wrap(http_prefix: str,
new_post_id = \
local_actor_url(http_prefix, nickname, domain) + \
'/statuses/' + status_number
cc = []
cc_list = []
if message_json.get('cc'):
cc = message_json['cc']
cc_list = message_json['cc']
new_post = {
"@context": "https://www.w3.org/ns/activitystreams",
'id': new_post_id + '/activity',
@@ -1606,7 +1604,7 @@ def outbox_message_create_wrap(http_prefix: str,
'actor': local_actor_url(http_prefix, nickname, domain),
'published': published,
'to': message_json['to'],
'cc': cc,
'cc': cc_list,
'object': message_json
}
new_post['object']['id'] = new_post['id']
@@ -1661,8 +1659,8 @@ def pin_post(base_dir: str, nickname: str, domain: str,
account_dir = acct_dir(base_dir, nickname, domain)
pinned_filename = account_dir + '/pinToProfile.txt'
try:
with open(pinned_filename, 'w+') as pinFile:
pinFile.write(pinned_content)
with open(pinned_filename, 'w+') as pin_file:
pin_file.write(pinned_content)
except OSError:
print('EX: unable to write ' + pinned_filename)
@@ -1751,26 +1749,26 @@ def regenerate_index_for_box(base_dir: str,
Used by unit tests to artificially create an index
"""
box_dir = acct_dir(base_dir, nickname, domain) + '/' + box_name
boxIndexFilename = box_dir + '.index'
box_index_filename = box_dir + '.index'
if not os.path.isdir(box_dir):
return
if os.path.isfile(boxIndexFilename):
if os.path.isfile(box_index_filename):
return
index_lines = []
for subdir, dirs, files in os.walk(box_dir):
for f in files:
if ':##' not in f:
for _, _, files in os.walk(box_dir):
for fname in files:
if ':##' not in fname:
continue
index_lines.append(f)
index_lines.append(fname)
break
index_lines.sort(reverse=True)
result = ''
try:
with open(boxIndexFilename, 'w+') as fp_box:
with open(box_index_filename, 'w+') as fp_box:
for line in index_lines:
result += line + '\n'
fp_box.write(line + '\n')
@@ -1841,8 +1839,8 @@ def _append_citations_to_blog_post(base_dir: str,
if not os.path.isfile(citations_filename):
return
citations_separator = '#####'
with open(citations_filename, 'r') as f:
citations = f.readlines()
with open(citations_filename, 'r') as fp_cit:
citations = fp_cit.readlines()
for line in citations:
if citations_separator not in line:
continue
@@ -2240,7 +2238,7 @@ def thread_send_post(session, post_json_str: str, federation_list: [],
"""
tries = 0
send_interval_sec = 30
for attempt in range(20):
for _ in range(20):
post_result = None
unauthorized = False
if debug:
@@ -2347,8 +2345,8 @@ def send_post(signing_priv_key_pem: str, project_version: str,
# get the actor inbox for the To handle
origin_domain = domain
(inbox_url, pub_key_id, pub_key, to_person_id, shared_inbox, avatar_url,
display_name, _) = get_person_box(signing_priv_key_pem,
(inbox_url, _, pub_key, to_person_id, _, _,
_, _) = get_person_box(signing_priv_key_pem,
origin_domain,
base_dir, session, wf_request,
person_cache,
@@ -2496,8 +2494,8 @@ def send_post_via_server(signing_priv_key_pem: str, project_version: str,
# get the actor inbox for the To handle
origin_domain = from_domain
(inbox_url, pub_key_id, pub_key, from_person_id, shared_inbox, avatar_url,
display_name, _) = get_person_box(signing_priv_key_pem,
(inbox_url, _, _, from_person_id, _, _,
_, _) = get_person_box(signing_priv_key_pem,
origin_domain,
base_dir, session, wf_request,
person_cache,
@@ -2531,9 +2529,9 @@ def send_post_via_server(signing_priv_key_pem: str, project_version: str,
from_nickname, from_domain_full) + \
'/followers'
else:
to_domainFull = get_full_domain(to_domain, to_port)
to_domain_full = get_full_domain(to_domain, to_port)
to_person_id = \
local_actor_url(http_prefix, to_nickname, to_domainFull)
local_actor_url(http_prefix, to_nickname, to_domain_full)
post_json_object = \
_create_post_base(base_dir,
@@ -2602,13 +2600,13 @@ def group_followers_by_domain(base_dir: str, nickname: str, domain: str) -> {}:
for follower_handle in foll_file:
if '@' not in follower_handle:
continue
fHandle = \
fhandle = \
follower_handle.strip().replace('\n', '').replace('\r', '')
follower_domain = fHandle.split('@')[1]
follower_domain = fhandle.split('@')[1]
if not grouped.get(follower_domain):
grouped[follower_domain] = [fHandle]
grouped[follower_domain] = [fhandle]
else:
grouped[follower_domain].append(fHandle)
grouped[follower_domain].append(fhandle)
return grouped
@@ -2635,9 +2633,9 @@ def _add_followers_to_public_post(post_json_object: {}) -> None:
return
if len(post_json_object['object']['to']) > 1:
return
elif len(post_json_object['object']['to']) == 0:
if len(post_json_object['object']['to']) == 0:
return
elif not post_json_object['object']['to'][0].endswith('#Public'):
if not post_json_object['object']['to'][0].endswith('#Public'):
return
if post_json_object['object'].get('cc'):
return
@@ -2714,9 +2712,8 @@ def send_signed_json(post_json_object: {}, session, base_dir: str,
# get the actor inbox/outbox for the To handle
origin_domain = domain
(inbox_url, pub_key_id, pub_key, to_person_id,
shared_inbox_url, avatar_url,
display_name, _) = get_person_box(signing_priv_key_pem,
(inbox_url, _, pub_key, to_person_id, shared_inbox_url, _,
_, _) = get_person_box(signing_priv_key_pem,
origin_domain,
base_dir, session, wf_request,
person_cache,
@@ -2772,7 +2769,7 @@ def send_signed_json(post_json_object: {}, session, base_dir: str,
signed_post_json_object = post_json_object.copy()
generate_json_signature(signed_post_json_object, private_key_pem)
post_json_object = signed_post_json_object
except Exception as ex:
except BaseException as ex:
print('WARN: failed to JSON-LD sign post, ' + str(ex))
# convert json to string so that there are no
@@ -2839,10 +2836,10 @@ def add_to_field(activity_type: str, post_json_object: {},
is_same_type = True
if debug:
print('DEBUG: "to" field assigned to ' + activity_type)
toAddress = post_json_object['object']
if '/statuses/' in toAddress:
toAddress = toAddress.split('/statuses/')[0]
post_json_object['to'] = [toAddress]
to_address = post_json_object['object']
if '/statuses/' in to_address:
to_address = to_address.split('/statuses/')[0]
post_json_object['to'] = [to_address]
to_field_added = True
elif has_object_dict(post_json_object):
# add a to field to bookmark add or remove
@@ -2866,10 +2863,10 @@ def add_to_field(activity_type: str, post_json_object: {},
if debug:
print('DEBUG: "to" field assigned to ' +
activity_type)
toAddress = post_json_object['object']['object']
if '/statuses/' in toAddress:
toAddress = toAddress.split('/statuses/')[0]
post_json_object['object']['to'] = [toAddress]
to_address = post_json_object['object']['object']
if '/statuses/' in to_address:
to_address = to_address.split('/statuses/')[0]
post_json_object['object']['to'] = [to_address]
post_json_object['to'] = \
[post_json_object['object']['object']]
to_field_added = True
@@ -2997,18 +2994,18 @@ def _send_to_named_addresses(session, base_dir: str,
# Don't send profile/actor updates to yourself
if is_profile_update:
domain_full = get_full_domain(domain, port)
to_domainFull = get_full_domain(to_domain, to_port)
to_domain_full = get_full_domain(to_domain, to_port)
if nickname == to_nickname and \
domain_full == to_domainFull:
domain_full == to_domain_full:
if debug:
print('Not sending profile update to self. ' +
nickname + '@' + domain_full)
continue
if debug:
domain_full = get_full_domain(domain, port)
to_domainFull = get_full_domain(to_domain, to_port)
to_domain_full = get_full_domain(to_domain, to_port)
print('DEBUG: Post sending s2s: ' + nickname + '@' + domain_full +
' to ' + to_nickname + '@' + to_domainFull)
' to ' + to_nickname + '@' + to_domain_full)
# if we have an alt onion domain and we are sending to
# another onion domain then switch the clearnet
@@ -3026,7 +3023,7 @@ def _send_to_named_addresses(session, base_dir: str,
from_domain = i2p_domain
from_domain_full = i2p_domain
from_http_prefix = 'http'
cc = []
cc_list = []
# if the "to" domain is within the shared items
# federation list then send the token for this domain
@@ -3042,7 +3039,7 @@ def _send_to_named_addresses(session, base_dir: str,
send_signed_json(post_json_object, session, base_dir,
nickname, from_domain, port,
to_nickname, to_domain, to_port,
cc, from_http_prefix, True, client_to_server,
cc_list, from_http_prefix, True, client_to_server,
federation_list,
send_threads, post_log, cached_webfingers,
person_cache, debug, project_version,
@@ -3118,10 +3115,7 @@ def _sending_profile_update(post_json_object: {}) -> bool:
if not has_object_stringType(post_json_object, False):
return False
activity_type = post_json_object['object']['type']
if activity_type == 'Person' or \
activity_type == 'Application' or \
activity_type == 'Group' or \
activity_type == 'Service':
if activity_type in ('Person', 'Application', 'Group', 'Service'):
return True
return False
@@ -3212,7 +3206,7 @@ def send_to_followers(session, base_dir: str,
to_port = get_port_from_domain(to_domain)
to_domain = remove_domain_port(to_domain)
cc = ''
cc_list = ''
# if we are sending to an onion domain and we
# have an alt onion domain then use the alt
@@ -3253,8 +3247,8 @@ def send_to_followers(session, base_dir: str,
send_signed_json(post_json_object, session, base_dir,
nickname, from_domain, port,
to_nickname, to_domain, to_port,
cc, from_http_prefix, True, client_to_server,
federation_list,
cc_list, from_http_prefix, True,
client_to_server, federation_list,
send_threads, post_log, cached_webfingers,
person_cache, debug, project_version,
shared_items_token, group_account,
@@ -3282,8 +3276,8 @@ def send_to_followers(session, base_dir: str,
send_signed_json(post_json_object, session, base_dir,
nickname, from_domain, port,
to_nickname, to_domain, to_port,
cc, from_http_prefix, True, client_to_server,
federation_list,
cc_list, from_http_prefix, True,
client_to_server, federation_list,
send_threads, post_log, cached_webfingers,
person_cache, debug, project_version,
shared_items_token, group_account,
@@ -3453,7 +3447,7 @@ def create_moderation(base_dir: str, nickname: str, domain: str, port: int,
'totalItems': 0,
'type': 'OrderedCollection'
}
boxItems = {
box_items = {
'@context': 'https://www.w3.org/ns/activitystreams',
'id': box_url + page_str,
'orderedItems': [
@@ -3493,11 +3487,11 @@ def create_moderation(base_dir: str, nickname: str, domain: str, port: int,
if os.path.isfile(post_filename):
post_json_object = load_json(post_filename)
if post_json_object:
boxItems['orderedItems'].append(post_json_object)
box_items['orderedItems'].append(post_json_object)
if header_only:
return box_header
return boxItems
return box_items
def is_image_media(session, base_dir: str, http_prefix: str,
@@ -3514,7 +3508,7 @@ def is_image_media(session, base_dir: str, http_prefix: str,
"""
if post_json_object['type'] == 'Announce':
blocked_cache = {}
post_jsonAnnounce = \
post_json_announce = \
download_announce(session, base_dir, http_prefix,
nickname, domain, post_json_object,
__version__, translate,
@@ -3526,8 +3520,8 @@ def is_image_media(session, base_dir: str, http_prefix: str,
domain_full, person_cache,
signing_priv_key_pem,
blocked_cache)
if post_jsonAnnounce:
post_json_object = post_jsonAnnounce
if post_json_announce:
post_json_object = post_json_announce
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
@@ -3572,9 +3566,7 @@ def _add_post_string_to_timeline(post_str: str, boxname: str,
elif boxname == 'tlreplies':
if box_actor not in post_str:
return False
elif (boxname == 'tlblogs' or
boxname == 'tlnews' or
boxname == 'tlfeatures'):
elif boxname in ('tlblogs', 'tlnews', 'tlfeatures'):
if '"Create"' not in post_str:
return False
if '"Article"' not in post_str:
@@ -3600,8 +3592,8 @@ def _add_post_to_timeline(file_path: str, boxname: str,
post_str = post_file.read()
if file_path.endswith('.json'):
repliesFilename = file_path.replace('.json', '.replies')
if os.path.isfile(repliesFilename):
replies_filename = file_path.replace('.json', '.replies')
if os.path.isfile(replies_filename):
# append a replies identifier, which will later be removed
post_str += '<hasReplies>'
@@ -3713,12 +3705,9 @@ def _create_box_indexed(recent_posts_cache: {},
if not authorized or not page_number:
page_number = 1
if boxname != 'inbox' and boxname != 'dm' and \
boxname != 'tlreplies' and boxname != 'tlmedia' and \
boxname != 'tlblogs' and boxname != 'tlnews' and \
boxname != 'tlfeatures' and \
boxname != 'outbox' and boxname != 'tlbookmarks' and \
boxname != 'bookmarks':
if boxname not in ('inbox', 'dm', 'tlreplies', 'tlmedia',
'tlblogs', 'tlnews', 'tlfeatures', 'outbox',
'tlbookmarks', 'bookmarks'):
print('ERROR: invalid boxname ' + boxname)
return None
@@ -3757,7 +3746,7 @@ def _create_box_indexed(recent_posts_cache: {},
'totalItems': 0,
'type': 'OrderedCollection'
}
boxItems = {
box_items = {
'@context': 'https://www.w3.org/ns/activitystreams',
'id': box_url + page_str,
'orderedItems': [
@@ -3823,7 +3812,6 @@ def _create_box_indexed(recent_posts_cache: {},
posts_added_to_timeline += 1
post_urls_in_box.append(post_url)
continue
else:
print('Post not added to timeline')
# read the post from file
@@ -3904,9 +3892,9 @@ def _create_box_indexed(recent_posts_cache: {},
# remove the replies identifier
post_str = post_str.replace('<hasReplies>', '')
p = None
pst = None
try:
p = json.loads(post_str)
pst = json.loads(post_str)
except BaseException:
print('EX: _create_box_indexed unable to load json ' + post_str)
continue
@@ -3914,15 +3902,15 @@ def _create_box_indexed(recent_posts_cache: {},
# Does this post have replies?
# This will be used to indicate that replies exist within the html
# created by individual_post_as_html
p['hasReplies'] = has_replies
pst['hasReplies'] = has_replies
if not authorized:
if not remove_post_interactions(p, False):
if not remove_post_interactions(pst, False):
continue
boxItems['orderedItems'].append(p)
box_items['orderedItems'].append(pst)
return boxItems
return box_items
def expire_cache(base_dir: str, person_cache: {},
@@ -3955,7 +3943,7 @@ def archive_posts(base_dir: str, http_prefix: str, archive_dir: str,
if not os.path.isdir(archive_dir + '/accounts'):
os.mkdir(archive_dir + '/accounts')
for subdir, dirs, files in os.walk(base_dir + '/accounts'):
for _, dirs, _ in os.walk(base_dir + '/accounts'):
for handle in dirs:
if '@' in handle:
nickname = handle.split('@')[0]
@@ -3996,7 +3984,7 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
"""Retain a maximum number of posts within the given box
Move any others to an archive directory
"""
if boxname != 'inbox' and boxname != 'outbox':
if boxname not in ('inbox', 'outbox'):
return
if archive_dir:
if not os.path.isdir(archive_dir):
@@ -4004,7 +3992,7 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
box_dir = create_person_dir(nickname, domain, base_dir, boxname)
posts_in_box = os.scandir(box_dir)
no_of_posts = 0
for f in posts_in_box:
for _ in posts_in_box:
no_of_posts += 1
if no_of_posts <= max_posts_in_box:
print('Checked ' + str(no_of_posts) + ' ' + boxname +
@@ -4018,17 +4006,17 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
if os.path.isfile(index_filename):
index_ctr = 0
# get the existing index entries as a string
newIndex = ''
new_index = ''
with open(index_filename, 'r') as index_file:
for post_id in index_file:
newIndex += post_id
new_index += post_id
index_ctr += 1
if index_ctr >= max_posts_in_box:
break
# save the new index file
if len(newIndex) > 0:
if len(new_index) > 0:
with open(index_filename, 'w+') as index_file:
index_file.write(newIndex)
index_file.write(new_index)
posts_in_box_dict = {}
posts_ctr = 0
@@ -4038,9 +4026,9 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
if not post_filename.endswith('.json'):
continue
# Time of file creation
fullFilename = os.path.join(box_dir, post_filename)
if os.path.isfile(fullFilename):
content = open(fullFilename).read()
full_filename = os.path.join(box_dir, post_filename)
if os.path.isfile(full_filename):
content = open(full_filename).read()
if '"published":' in content:
published_str = content.split('"published":')[1]
if '"' in published_str:
@@ -4159,8 +4147,8 @@ def get_public_posts_of_person(base_dir: str, nickname: str, domain: str,
if debug:
print('Getting the outbox for ' + handle)
(person_url, pub_key_id, pub_key, personId, shaedInbox, avatar_url,
display_name, _) = get_person_box(signing_priv_key_pem,
(person_url, _, _, person_id, _, _,
_, _) = get_person_box(signing_priv_key_pem,
origin_domain,
base_dir, session, wf_request,
person_cache,
@@ -4168,8 +4156,8 @@ def get_public_posts_of_person(base_dir: str, nickname: str, domain: str,
nickname, domain, 'outbox',
62524)
if debug:
print('Actor url: ' + str(personId))
if not personId:
print('Actor url: ' + str(person_id))
if not person_id:
return
max_mentions = 10
@@ -4212,8 +4200,8 @@ def get_public_post_domains(session, base_dir: str, nickname: str, domain: str,
str(wf_request))
return domain_list
(person_url, pub_key_id, pub_key, personId, shared_inbox, avatar_url,
display_name, _) = get_person_box(signing_priv_key_pem,
(person_url, _, _, _, _, _,
_, _) = get_person_box(signing_priv_key_pem,
origin_domain,
base_dir, session, wf_request,
person_cache,
@@ -4309,8 +4297,8 @@ def get_public_post_info(session, base_dir: str, nickname: str, domain: str,
str(wf_request))
return {}
(person_url, pub_key_id, pub_key, personId, shared_inbox, avatar_url,
display_name, _) = get_person_box(signing_priv_key_pem,
(person_url, _, _, _, _, _,
_, _) = get_person_box(signing_priv_key_pem,
origin_domain,
base_dir, session, wf_request,
person_cache,
@@ -4331,9 +4319,9 @@ def get_public_post_info(session, base_dir: str, nickname: str, domain: str,
signing_priv_key_pem)
post_domains.sort()
domains_info = {}
for d in post_domains:
if not domains_info.get(d):
domains_info[d] = []
for pdomain in post_domains:
if not domains_info.get(pdomain):
domains_info[pdomain] = []
blocked_posts = \
_get_posts_for_blocked_domains(base_dir, session,
@@ -4383,16 +4371,16 @@ def get_public_post_domains_blocked(session, base_dir: str,
blocked_str = fp_block.read()
blocked_domains = []
for domainName in post_domains:
if '@' not in domainName:
for domain_name in post_domains:
if '@' not in domain_name:
continue
# get the domain after the @
domainName = domainName.split('@')[1].strip()
if is_evil(domainName):
blocked_domains.append(domainName)
domain_name = domain_name.split('@')[1].strip()
if is_evil(domain_name):
blocked_domains.append(domain_name)
continue
if domainName in blocked_str:
blocked_domains.append(domainName)
if domain_name in blocked_str:
blocked_domains.append(domain_name)
return blocked_domains
@@ -4474,8 +4462,8 @@ def check_domains(session, base_dir: str,
signing_priv_key_pem)
if blocked_domains:
print(handle)
for d in blocked_domains:
print(' ' + d)
for bdomain in blocked_domains:
print(' ' + bdomain)
if len(blocked_domains) > max_blocked_domains:
follower_warning_str += handle + '\n'
update_follower_warnings = True
@@ -4492,12 +4480,12 @@ def populate_replies_json(base_dir: str, nickname: str, domain: str,
replies_json: {}) -> None:
pub_str = 'https://www.w3.org/ns/activitystreams#Public'
# populate the items list with replies
repliesBoxes = ('outbox', 'inbox')
replies_boxes = ('outbox', 'inbox')
with open(post_replies_filename, 'r') as replies_file:
for message_id in replies_file:
replyFound = False
reply_found = False
# examine inbox and outbox
for boxname in repliesBoxes:
for boxname in replies_boxes:
message_id2 = message_id.replace('\n', '').replace('\r', '')
search_filename = \
acct_dir(base_dir, nickname, domain) + '/' + \
@@ -4514,16 +4502,16 @@ def populate_replies_json(base_dir: str, nickname: str, domain: str,
(pub_str in pjo['object']['to'] or
pub_str in pjo['object']['cc'])):
replies_json['orderedItems'].append(pjo)
replyFound = True
reply_found = True
else:
if authorized or \
pub_str in post_json_object['object']['to']:
pjo = post_json_object
replies_json['orderedItems'].append(pjo)
replyFound = True
reply_found = True
break
# if not in either inbox or outbox then examine the shared inbox
if not replyFound:
if not reply_found:
message_id2 = message_id.replace('\n', '').replace('\r', '')
search_filename = \
base_dir + \
@@ -4639,7 +4627,7 @@ def download_announce(session, base_dir: str, http_prefix: str,
actor_nickname + '@' + actor_domain)
return None
object_nickname = get_nickname_from_actor(post_json_object['object'])
object_domain, objectPort = \
object_domain, _ = \
get_domain_from_actor(post_json_object['object'])
if not object_domain:
print('Announce object does not contain a ' +
@@ -4868,8 +4856,8 @@ def send_block_via_server(base_dir: str, session,
# get the actor inbox for the To handle
origin_domain = from_domain
(inbox_url, pub_key_id, pub_key, from_person_id, shared_inbox, avatar_url,
display_name, _) = get_person_box(signing_priv_key_pem,
(inbox_url, _, _, from_person_id, _, _,
_, _) = get_person_box(signing_priv_key_pem,
origin_domain,
base_dir, session, wf_request,
person_cache,
@@ -4950,8 +4938,8 @@ def send_mute_via_server(base_dir: str, session,
# get the actor inbox for the To handle
origin_domain = from_domain
(inbox_url, pub_key_id, pub_key, from_person_id, shared_inbox, avatar_url,
display_name, _) = get_person_box(signing_priv_key_pem,
(inbox_url, _, _, from_person_id, _, _,
_, _) = get_person_box(signing_priv_key_pem,
origin_domain,
base_dir, session, wf_request,
person_cache,
@@ -5036,8 +5024,8 @@ def send_undo_mute_via_server(base_dir: str, session,
# get the actor inbox for the To handle
origin_domain = from_domain
(inbox_url, pub_key_id, pub_key, from_person_id, shared_inbox, avatar_url,
display_name, _) = get_person_box(signing_priv_key_pem,
(inbox_url, _, _, from_person_id, _, _,
_, _) = get_person_box(signing_priv_key_pem,
origin_domain,
base_dir, session, wf_request,
person_cache,
@@ -5126,8 +5114,8 @@ def send_undo_block_via_server(base_dir: str, session,
# get the actor inbox for the To handle
origin_domain = from_domain
(inbox_url, pub_key_id, pub_key, from_person_id, shared_inbox, avatar_url,
display_name, _) = get_person_box(signing_priv_key_pem,
(inbox_url, _, _, from_person_id, _, _,
_, _) = get_person_box(signing_priv_key_pem,
origin_domain,
base_dir, session, wf_request,
person_cache,
@@ -5273,18 +5261,18 @@ def edited_post_filename(base_dir: str, nickname: str, domain: str,
if not isinstance(post_json_object['object']['attributedTo'], str):
return ''
actor = post_json_object['object']['attributedTo']
actorFilename = \
actor_filename = \
acct_dir(base_dir, nickname, domain) + '/lastpost/' + \
actor.replace('/', '#')
if not os.path.isfile(actorFilename):
if not os.path.isfile(actor_filename):
return ''
post_id = remove_id_ending(post_json_object['object']['id'])
lastpost_id = None
try:
with open(actorFilename, 'r') as fp_actor:
with open(actor_filename, 'r') as fp_actor:
lastpost_id = fp_actor.read()
except OSError:
print('EX: edited_post_filename unable to read ' + actorFilename)
print('EX: edited_post_filename unable to read ' + actor_filename)
return ''
if not lastpost_id:
return ''
@@ -5371,7 +5359,7 @@ def get_original_post_from_announce_url(announce_url: str, base_dir: str,
if has_users_path(orig_post_id):
# get the actor from the original post url
orig_nick = get_nickname_from_actor(orig_post_id)
orig_domain, orig_port = get_domain_from_actor(orig_post_id)
orig_domain, _ = get_domain_from_actor(orig_post_id)
if orig_nick and orig_domain:
actor = \
orig_post_id.split('/' + orig_nick + '/')[0] + \