Snake case

merge-requests/30/head
Bob Mottram 2022-01-03 21:17:44 +00:00
parent 752a44c5d5
commit d728148709
1 changed files with 125 additions and 122 deletions

View File

@ -30,13 +30,13 @@ def _parse_handle(handle: str) -> (str, str, bool):
if '.' not in handle: if '.' not in handle:
return None, None, False return None, None, False
prefixes = get_protocol_prefixes() prefixes = get_protocol_prefixes()
handleStr = handle handle_str = handle
for prefix in prefixes: for prefix in prefixes:
handleStr = handleStr.replace(prefix, '') handle_str = handle_str.replace(prefix, '')
# try domain/@nick # try domain/@nick
if '/@' in handle: if '/@' in handle:
domain, nickname = handleStr.split('/@') domain, nickname = handle_str.split('/@')
return nickname, domain, False return nickname, domain, False
# try nick@domain # try nick@domain
@ -48,13 +48,13 @@ def _parse_handle(handle: str) -> (str, str, bool):
return nickname, domain, group_account return nickname, domain, group_account
# try for different /users/ paths # try for different /users/ paths
usersPaths = get_user_paths() users_paths = get_user_paths()
groupPaths = get_group_paths() group_paths = get_group_paths()
for possibleUsersPath in usersPaths: for possible_users_path in users_paths:
if possibleUsersPath in handle: if possible_users_path in handle:
if possibleUsersPath in groupPaths: if possible_users_path in group_paths:
group_account = True group_account = True
domain, nickname = handleStr.split(possibleUsersPath) domain, nickname = handle_str.split(possible_users_path)
return nickname, domain, group_account return nickname, domain, group_account
return None, None, False return None, None, False
@ -62,7 +62,7 @@ def _parse_handle(handle: str) -> (str, str, bool):
def webfinger_handle(session, handle: str, http_prefix: str, def webfinger_handle(session, handle: str, http_prefix: str,
cached_webfingers: {}, cached_webfingers: {},
fromDomain: str, project_version: str, from_domain: str, project_version: str,
debug: bool, group_account: bool, debug: bool, group_account: bool,
signing_priv_key_pem: str) -> {}: signing_priv_key_pem: str) -> {}:
"""Gets webfinger result for the given ActivityPub handle """Gets webfinger result for the given ActivityPub handle
@ -72,39 +72,39 @@ def webfinger_handle(session, handle: str, http_prefix: str,
print('WARN: No session specified for webfinger_handle') print('WARN: No session specified for webfinger_handle')
return None return None
nickname, domain, grpAccount = _parse_handle(handle) nickname, domain, _ = _parse_handle(handle)
if not nickname: if not nickname:
return None return None
wfDomain = remove_domain_port(domain) wf_domain = remove_domain_port(domain)
wfHandle = nickname + '@' + wfDomain wf_handle = nickname + '@' + wf_domain
wf = get_webfinger_from_cache(wfHandle, cached_webfingers) wfg = get_webfinger_from_cache(wf_handle, cached_webfingers)
if wf: if wfg:
if debug: if debug:
print('Webfinger from cache: ' + str(wf)) print('Webfinger from cache: ' + str(wfg))
return wf return wfg
url = '{}://{}/.well-known/webfinger'.format(http_prefix, domain) url = '{}://{}/.well-known/webfinger'.format(http_prefix, domain)
hdr = { hdr = {
'Accept': 'application/jrd+json' 'Accept': 'application/jrd+json'
} }
par = { par = {
'resource': 'acct:{}'.format(wfHandle) 'resource': 'acct:{}'.format(wf_handle)
} }
try: try:
result = \ result = \
get_json(signing_priv_key_pem, session, url, hdr, par, get_json(signing_priv_key_pem, session, url, hdr, par,
debug, project_version, http_prefix, fromDomain) debug, project_version, http_prefix, from_domain)
except Exception as ex: except Exception as ex:
print('ERROR: webfinger_handle ' + str(ex)) print('ERROR: webfinger_handle ' + str(ex))
return None return None
if result: if result:
store_webfinger_in_cache(wfHandle, result, cached_webfingers) store_webfinger_in_cache(wf_handle, result, cached_webfingers)
else: else:
if debug: if debug:
print("WARN: Unable to webfinger " + url + ' ' + print("WARN: Unable to webfinger " + url + ' ' +
'nickname: ' + str(nickname) + ' ' + 'nickname: ' + str(nickname) + ' ' +
'domain: ' + str(wfDomain) + ' ' + 'domain: ' + str(wf_domain) + ' ' +
'headers: ' + str(hdr) + ' ' + 'headers: ' + str(hdr) + ' ' +
'params: ' + str(par)) 'params: ' + str(par))
@ -112,71 +112,72 @@ def webfinger_handle(session, handle: str, http_prefix: str,
def store_webfinger_endpoint(nickname: str, domain: str, port: int, def store_webfinger_endpoint(nickname: str, domain: str, port: int,
base_dir: str, wfJson: {}) -> bool: base_dir: str, wf_json: {}) -> bool:
"""Stores webfinger endpoint for a user to a file """Stores webfinger endpoint for a user to a file
""" """
originalDomain = domain original_domain = domain
domain = get_full_domain(domain, port) domain = get_full_domain(domain, port)
handle = nickname + '@' + domain handle = nickname + '@' + domain
wfSubdir = '/wfendpoints' wf_subdir = '/wfendpoints'
if not os.path.isdir(base_dir + wfSubdir): if not os.path.isdir(base_dir + wf_subdir):
os.mkdir(base_dir + wfSubdir) os.mkdir(base_dir + wf_subdir)
filename = base_dir + wfSubdir + '/' + handle + '.json' filename = base_dir + wf_subdir + '/' + handle + '.json'
save_json(wfJson, filename) save_json(wf_json, filename)
if nickname == 'inbox': if nickname == 'inbox':
handle = originalDomain + '@' + domain handle = original_domain + '@' + domain
filename = base_dir + wfSubdir + '/' + handle + '.json' filename = base_dir + wf_subdir + '/' + handle + '.json'
save_json(wfJson, filename) save_json(wf_json, filename)
return True return True
def create_webfinger_endpoint(nickname: str, domain: str, port: int, def create_webfinger_endpoint(nickname: str, domain: str, port: int,
http_prefix: str, publicKeyPem: str, http_prefix: str, public_key_pem: str,
group_account: bool) -> {}: group_account: bool) -> {}:
"""Creates a webfinger endpoint for a user """Creates a webfinger endpoint for a user
""" """
originalDomain = domain original_domain = domain
domain = get_full_domain(domain, port) domain = get_full_domain(domain, port)
personName = nickname person_name = nickname
personId = local_actor_url(http_prefix, personName, domain) person_id = local_actor_url(http_prefix, person_name, domain)
subjectStr = "acct:" + personName + "@" + originalDomain subject_str = "acct:" + person_name + "@" + original_domain
profilePageHref = http_prefix + "://" + domain + "/@" + nickname profile_page_href = http_prefix + "://" + domain + "/@" + nickname
if nickname == 'inbox' or nickname == originalDomain: if nickname in ('inbox', original_domain):
personName = 'actor' person_name = 'actor'
personId = http_prefix + "://" + domain + "/" + personName person_id = http_prefix + "://" + domain + "/" + person_name
subjectStr = "acct:" + originalDomain + "@" + originalDomain subject_str = "acct:" + original_domain + "@" + original_domain
profilePageHref = http_prefix + '://' + domain + \ profile_page_href = http_prefix + '://' + domain + \
'/about/more?instance_actor=true' '/about/more?instance_actor=true'
personLink = http_prefix + "://" + domain + "/@" + personName person_link = http_prefix + "://" + domain + "/@" + person_name
blog_url = http_prefix + "://" + domain + "/blog/" + person_name
account = { account = {
"aliases": [ "aliases": [
personLink, person_link,
personId person_id
], ],
"links": [ "links": [
{ {
"href": personLink + "/avatar.png", "href": person_link + "/avatar.png",
"rel": "http://webfinger.net/rel/avatar", "rel": "http://webfinger.net/rel/avatar",
"type": "image/png" "type": "image/png"
}, },
{ {
"href": http_prefix + "://" + domain + "/blog/" + personName, "href": blog_url,
"rel": "http://webfinger.net/rel/blog" "rel": "http://webfinger.net/rel/blog"
}, },
{ {
"href": profilePageHref, "href": profile_page_href,
"rel": "http://webfinger.net/rel/profile-page", "rel": "http://webfinger.net/rel/profile-page",
"type": "text/html" "type": "text/html"
}, },
{ {
"href": personId, "href": person_id,
"rel": "self", "rel": "self",
"type": "application/activity+json" "type": "application/activity+json"
} }
], ],
"subject": subjectStr "subject": subject_str
} }
return account return account
@ -198,7 +199,7 @@ def webfinger_node_info(http_prefix: str, domain_full: str) -> {}:
def webfinger_meta(http_prefix: str, domain_full: str) -> str: def webfinger_meta(http_prefix: str, domain_full: str) -> str:
"""Return /.well-known/host-meta """Return /.well-known/host-meta
""" """
metaStr = \ meta_str = \
"<?xml version=1.0' encoding=UTF-8'?>" + \ "<?xml version=1.0' encoding=UTF-8'?>" + \
"<XRD xmlns=http://docs.oasis-open.org/ns/xri/xrd-1.0'" + \ "<XRD xmlns=http://docs.oasis-open.org/ns/xri/xrd-1.0'" + \
" xmlns:hm=http://host-meta.net/xrd/1.0'>" + \ " xmlns:hm=http://host-meta.net/xrd/1.0'>" + \
@ -211,7 +212,7 @@ def webfinger_meta(http_prefix: str, domain_full: str) -> str:
" <Title>Resource Descriptor</Title>" + \ " <Title>Resource Descriptor</Title>" + \
" </Link>" + \ " </Link>" + \
"</XRD>" "</XRD>"
return metaStr return meta_str
def webfinger_lookup(path: str, base_dir: str, def webfinger_lookup(path: str, base_dir: str,
@ -222,14 +223,14 @@ def webfinger_lookup(path: str, base_dir: str,
if not path.startswith('/.well-known/webfinger?'): if not path.startswith('/.well-known/webfinger?'):
return None return None
handle = None handle = None
resType = 'acct' res_type = 'acct'
if 'resource=' + resType + ':' in path: if 'resource=' + res_type + ':' in path:
handle = path.split('resource=' + resType + ':')[1].strip() handle = path.split('resource=' + res_type + ':')[1].strip()
handle = urllib.parse.unquote(handle) handle = urllib.parse.unquote(handle)
if debug: if debug:
print('DEBUG: WEBFINGER handle ' + handle) print('DEBUG: WEBFINGER handle ' + handle)
elif 'resource=' + resType + '%3A' in path: elif 'resource=' + res_type + '%3A' in path:
handle = path.split('resource=' + resType + '%3A')[1] handle = path.split('resource=' + res_type + '%3A')[1]
handle = urllib.parse.unquote(handle.strip()) handle = urllib.parse.unquote(handle.strip())
if debug: if debug:
print('DEBUG: WEBFINGER handle ' + handle) print('DEBUG: WEBFINGER handle ' + handle)
@ -247,9 +248,9 @@ def webfinger_lookup(path: str, base_dir: str,
handle = get_full_domain(handle, port) handle = get_full_domain(handle, port)
# convert @domain@domain to inbox@domain # convert @domain@domain to inbox@domain
if '@' in handle: if '@' in handle:
handleDomain = handle.split('@')[1] handle_domain = handle.split('@')[1]
if handle.startswith(handleDomain + '@'): if handle.startswith(handle_domain + '@'):
handle = 'inbox@' + handleDomain handle = 'inbox@' + handle_domain
# if this is a lookup for a handle using its onion domain # if this is a lookup for a handle using its onion domain
# then swap the onion domain for the clearnet version # then swap the onion domain for the clearnet version
onionify = False onionify = False
@ -270,74 +271,74 @@ def webfinger_lookup(path: str, base_dir: str,
print('DEBUG: WEBFINGER filename not found ' + filename) print('DEBUG: WEBFINGER filename not found ' + filename)
return None return None
if not onionify: if not onionify:
wfJson = load_json(filename) wf_json = load_json(filename)
else: else:
print('Webfinger request for onionified ' + handle) print('Webfinger request for onionified ' + handle)
wfJson = load_json_onionify(filename, domain, onion_domain) wf_json = load_json_onionify(filename, domain, onion_domain)
if not wfJson: if not wf_json:
wfJson = {"nickname": "unknown"} wf_json = {"nickname": "unknown"}
return wfJson return wf_json
def _webfinger_updateAvatar(wfJson: {}, actor_json: {}) -> bool: def _webfinger_updateAvatar(wf_json: {}, actor_json: {}) -> bool:
"""Updates the avatar image link """Updates the avatar image link
""" """
found = False found = False
avatarUrl = actor_json['icon']['url'] avatar_url = actor_json['icon']['url']
mediaType = actor_json['icon']['mediaType'] media_type = actor_json['icon']['mediaType']
for link in wfJson['links']: for link in wf_json['links']:
if not link.get('rel'): if not link.get('rel'):
continue continue
if not link['rel'].endswith('://webfinger.net/rel/avatar'): if not link['rel'].endswith('://webfinger.net/rel/avatar'):
continue continue
found = True found = True
if link['href'] != avatarUrl or link['type'] != mediaType: if link['href'] != avatar_url or link['type'] != media_type:
link['href'] = avatarUrl link['href'] = avatar_url
link['type'] = mediaType link['type'] = media_type
return True return True
break break
if found: if found:
return False return False
wfJson['links'].append({ wf_json['links'].append({
"href": avatarUrl, "href": avatar_url,
"rel": "http://webfinger.net/rel/avatar", "rel": "http://webfinger.net/rel/avatar",
"type": mediaType "type": media_type
}) })
return True return True
def _webfinger_add_blog_link(wfJson: {}, actor_json: {}) -> bool: def _webfinger_add_blog_link(wf_json: {}, actor_json: {}) -> bool:
"""Adds a blog link to webfinger if needed """Adds a blog link to webfinger if needed
""" """
found = False found = False
if '/users/' in actor_json['id']: if '/users/' in actor_json['id']:
blogUrl = \ blog_url = \
actor_json['id'].split('/users/')[0] + '/blog/' + \ actor_json['id'].split('/users/')[0] + '/blog/' + \
actor_json['id'].split('/users/')[1] actor_json['id'].split('/users/')[1]
else: else:
blogUrl = \ blog_url = \
actor_json['id'].split('/@')[0] + '/blog/' + \ actor_json['id'].split('/@')[0] + '/blog/' + \
actor_json['id'].split('/@')[1] actor_json['id'].split('/@')[1]
for link in wfJson['links']: for link in wf_json['links']:
if not link.get('rel'): if not link.get('rel'):
continue continue
if not link['rel'].endswith('://webfinger.net/rel/blog'): if not link['rel'].endswith('://webfinger.net/rel/blog'):
continue continue
found = True found = True
if link['href'] != blogUrl: if link['href'] != blog_url:
link['href'] = blogUrl link['href'] = blog_url
return True return True
break break
if found: if found:
return False return False
wfJson['links'].append({ wf_json['links'].append({
"href": blogUrl, "href": blog_url,
"rel": "http://webfinger.net/rel/blog" "rel": "http://webfinger.net/rel/blog"
}) })
return True return True
def _webfinger_updateFromProfile(wfJson: {}, actor_json: {}) -> bool: def _webfinger_updateFromProfile(wf_json: {}, actor_json: {}) -> bool:
"""Updates webfinger Email/blog/xmpp links from profile """Updates webfinger Email/blog/xmpp links from profile
Returns true if one or more tags has been changed Returns true if one or more tags has been changed
""" """
@ -346,7 +347,7 @@ def _webfinger_updateFromProfile(wfJson: {}, actor_json: {}) -> bool:
changed = False changed = False
webfingerPropertyName = { webfinger_property_name = {
"xmpp": "xmpp", "xmpp": "xmpp",
"matrix": "matrix", "matrix": "matrix",
"email": "mailto", "email": "mailto",
@ -357,19 +358,19 @@ def _webfinger_updateFromProfile(wfJson: {}, actor_json: {}) -> bool:
"tox": "toxId" "tox": "toxId"
} }
aliasesNotFound = [] aliases_not_found = []
for name, alias in webfingerPropertyName.items(): for name, alias in webfinger_property_name.items():
aliasesNotFound.append(alias) aliases_not_found.append(alias)
for property_value in actor_json['attachment']: for property_value in actor_json['attachment']:
if not property_value.get('name'): if not property_value.get('name'):
continue continue
property_name = property_value['name'].lower() property_name = property_value['name'].lower()
found = False found = False
for name, alias in webfingerPropertyName.items(): for name, alias in webfinger_property_name.items():
if name == property_name: if name == property_name:
if alias in aliasesNotFound: if alias in aliases_not_found:
aliasesNotFound.remove(alias) aliases_not_found.remove(alias)
found = True found = True
break break
if not found: if not found:
@ -381,40 +382,40 @@ def _webfinger_updateFromProfile(wfJson: {}, actor_json: {}) -> bool:
if property_value['type'] != 'PropertyValue': if property_value['type'] != 'PropertyValue':
continue continue
newValue = property_value['value'].strip() new_value = property_value['value'].strip()
if '://' in newValue: if '://' in new_value:
newValue = newValue.split('://')[1] new_value = new_value.split('://')[1]
aliasIndex = 0 alias_index = 0
found = False found = False
for alias in wfJson['aliases']: for alias in wf_json['aliases']:
if alias.startswith(webfingerPropertyName[property_name] + ':'): if alias.startswith(webfinger_property_name[property_name] + ':'):
found = True found = True
break break
aliasIndex += 1 alias_index += 1
newAlias = webfingerPropertyName[property_name] + ':' + newValue new_alias = webfinger_property_name[property_name] + ':' + new_value
if found: if found:
if wfJson['aliases'][aliasIndex] != newAlias: if wf_json['aliases'][alias_index] != new_alias:
changed = True changed = True
wfJson['aliases'][aliasIndex] = newAlias wf_json['aliases'][alias_index] = new_alias
else: else:
wfJson['aliases'].append(newAlias) wf_json['aliases'].append(new_alias)
changed = True changed = True
# remove any aliases which are no longer in the actor profile # remove any aliases which are no longer in the actor profile
removeAlias = [] remove_alias = []
for alias in aliasesNotFound: for alias in aliases_not_found:
for fullAlias in wfJson['aliases']: for full_alias in wf_json['aliases']:
if fullAlias.startswith(alias + ':'): if full_alias.startswith(alias + ':'):
removeAlias.append(fullAlias) remove_alias.append(full_alias)
for fullAlias in removeAlias: for full_alias in remove_alias:
wfJson['aliases'].remove(fullAlias) wf_json['aliases'].remove(full_alias)
changed = True changed = True
if _webfinger_updateAvatar(wfJson, actor_json): if _webfinger_updateAvatar(wf_json, actor_json):
changed = True changed = True
if _webfinger_add_blog_link(wfJson, actor_json): if _webfinger_add_blog_link(wf_json, actor_json):
changed = True changed = True
return changed return changed
@ -423,29 +424,31 @@ def _webfinger_updateFromProfile(wfJson: {}, actor_json: {}) -> bool:
def webfinger_update(base_dir: str, nickname: str, domain: str, def webfinger_update(base_dir: str, nickname: str, domain: str,
onion_domain: str, onion_domain: str,
cached_webfingers: {}) -> None: cached_webfingers: {}) -> None:
"""Regenerates stored webfinger
"""
handle = nickname + '@' + domain handle = nickname + '@' + domain
wfSubdir = '/wfendpoints' wf_subdir = '/wfendpoints'
if not os.path.isdir(base_dir + wfSubdir): if not os.path.isdir(base_dir + wf_subdir):
return return
filename = base_dir + wfSubdir + '/' + handle + '.json' filename = base_dir + wf_subdir + '/' + handle + '.json'
onionify = False onionify = False
if onion_domain: if onion_domain:
if onion_domain in handle: if onion_domain in handle:
handle = handle.replace(onion_domain, domain) handle = handle.replace(onion_domain, domain)
onionify = True onionify = True
if not onionify: if not onionify:
wfJson = load_json(filename) wf_json = load_json(filename)
else: else:
wfJson = load_json_onionify(filename, domain, onion_domain) wf_json = load_json_onionify(filename, domain, onion_domain)
if not wfJson: if not wf_json:
return return
actorFilename = base_dir + '/accounts/' + handle + '.json' actor_filename = base_dir + '/accounts/' + handle + '.json'
actor_json = load_json(actorFilename) actor_json = load_json(actor_filename)
if not actor_json: if not actor_json:
return return
if _webfinger_updateFromProfile(wfJson, actor_json): if _webfinger_updateFromProfile(wf_json, actor_json):
if save_json(wfJson, filename): if save_json(wf_json, filename):
store_webfinger_in_cache(handle, wfJson, cached_webfingers) store_webfinger_in_cache(handle, wf_json, cached_webfingers)