mirror of https://gitlab.com/bashrc2/epicyon
				
				
				
			
		
			
				
	
	
		
			640 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			640 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
| __filename__ = "webfinger.py"
 | ||
| __author__ = "Bob Mottram"
 | ||
| __license__ = "AGPL3+"
 | ||
| __version__ = "1.5.0"
 | ||
| __maintainer__ = "Bob Mottram"
 | ||
| __email__ = "bob@libreserver.org"
 | ||
| __status__ = "Production"
 | ||
| __module_group__ = "ActivityPub"
 | ||
| 
 | ||
| import os
 | ||
| import urllib.parse
 | ||
| from session import get_json
 | ||
| from session import get_json_valid
 | ||
| from cache import store_webfinger_in_cache
 | ||
| from cache import get_webfinger_from_cache
 | ||
| from utils import get_url_from_post
 | ||
| from utils import remove_html
 | ||
| from utils import acct_handle_dir
 | ||
| from utils import get_attachment_property_value
 | ||
| from utils import get_full_domain
 | ||
| from utils import load_json
 | ||
| from utils import load_json_onionify
 | ||
| from utils import save_json
 | ||
| from utils import get_protocol_prefixes
 | ||
| from utils import remove_domain_port
 | ||
| from utils import get_user_paths
 | ||
| from utils import get_group_paths
 | ||
| from utils import local_actor_url
 | ||
| from utils import get_nickname_from_actor
 | ||
| from utils import get_domain_from_actor
 | ||
| 
 | ||
| 
 | ||
| def _parse_handle(handle: str) -> (str, str, bool):
 | ||
|     """Parses a handle and returns nickname and domain
 | ||
|     """
 | ||
|     group_account = False
 | ||
|     if '.' not in handle:
 | ||
|         return None, None, False
 | ||
|     prefixes = get_protocol_prefixes()
 | ||
|     handle_str = handle
 | ||
|     for prefix in prefixes:
 | ||
|         handle_str = handle_str.replace(prefix, '')
 | ||
| 
 | ||
|     # try domain/@nick
 | ||
|     if '/@' in handle and '/@/' not in handle:
 | ||
|         domain, nickname = handle_str.split('/@')
 | ||
|         return nickname, domain, False
 | ||
| 
 | ||
|     # try nick@domain
 | ||
|     if '@' in handle:
 | ||
|         if handle.startswith('!'):
 | ||
|             handle = handle[1:]
 | ||
|             group_account = True
 | ||
|         nickname, domain = handle.split('@')
 | ||
|         return nickname, domain, group_account
 | ||
| 
 | ||
|     # try for different /users/ paths
 | ||
|     users_paths = get_user_paths()
 | ||
|     group_paths = get_group_paths()
 | ||
|     for possible_users_path in users_paths:
 | ||
|         if possible_users_path in handle:
 | ||
|             if possible_users_path in group_paths:
 | ||
|                 group_account = True
 | ||
|             domain, nickname = handle_str.split(possible_users_path)
 | ||
|             return nickname, domain, group_account
 | ||
| 
 | ||
|     return None, None, False
 | ||
| 
 | ||
| 
 | ||
| def webfinger_handle(session, handle: str, http_prefix: str,
 | ||
|                      cached_webfingers: {},
 | ||
|                      from_domain: str, project_version: str,
 | ||
|                      debug: bool, group_account: bool,
 | ||
|                      signing_priv_key_pem: str) -> {}:
 | ||
|     """Gets webfinger result for the given ActivityPub handle
 | ||
|     NOTE: in earlier implementations group_account modified the acct prefix.
 | ||
|     This has been left in, because currently there is still no consensus
 | ||
|     about how groups should be implemented.
 | ||
|     """
 | ||
|     if not session:
 | ||
|         print('WARN: No session specified for webfinger_handle')
 | ||
|         return None
 | ||
| 
 | ||
|     nickname, domain, _ = _parse_handle(handle)
 | ||
|     if not nickname:
 | ||
|         print('WARN: No nickname found in handle ' + handle)
 | ||
|         return None
 | ||
|     wf_domain = remove_domain_port(domain)
 | ||
| 
 | ||
|     wf_handle = nickname + '@' + wf_domain
 | ||
|     if debug:
 | ||
|         print('Parsed webfinger handle: ' + handle + ' -> ' + wf_handle)
 | ||
|     wfg = get_webfinger_from_cache(wf_handle, cached_webfingers)
 | ||
|     if wfg:
 | ||
|         if debug:
 | ||
|             print('Webfinger from cache: ' + str(wfg))
 | ||
|         return wfg
 | ||
|     url = '{}://{}/.well-known/webfinger'.format(http_prefix, domain)
 | ||
|     hdr = {
 | ||
|         'Accept': 'application/jrd+json'
 | ||
|     }
 | ||
|     par = {
 | ||
|         'resource': 'acct:{}'.format(wf_handle)
 | ||
|     }
 | ||
|     try:
 | ||
|         result = \
 | ||
|             get_json(signing_priv_key_pem, session, url, hdr, par,
 | ||
|                      debug, project_version, http_prefix, from_domain)
 | ||
|     except BaseException as ex:
 | ||
|         print('ERROR: webfinger_handle ' + wf_handle + ' ' + str(ex))
 | ||
|         return None
 | ||
| 
 | ||
|     # if the first attempt fails then try specifying the webfinger
 | ||
|     # resource in a different way
 | ||
|     if not get_json_valid(result):
 | ||
|         resource = handle
 | ||
|         if handle == wf_handle:
 | ||
|             # reconstruct the actor
 | ||
|             resource = http_prefix + '://' + wf_domain + '/users/' + nickname
 | ||
|         # try again using the actor as the resource
 | ||
|         # See https://datatracker.ietf.org/doc/html/rfc7033 section 4.5
 | ||
|         par = {
 | ||
|             'resource': '{}'.format(resource)
 | ||
|         }
 | ||
|         try:
 | ||
|             result = \
 | ||
|                 get_json(signing_priv_key_pem, session, url, hdr, par,
 | ||
|                          debug, project_version, http_prefix, from_domain)
 | ||
|         except BaseException as ex:
 | ||
|             print('ERROR: webfinger_handle ' + wf_handle + ' ' + str(ex))
 | ||
|             return None
 | ||
| 
 | ||
|     if get_json_valid(result):
 | ||
|         store_webfinger_in_cache(wf_handle, result, cached_webfingers)
 | ||
|     else:
 | ||
|         print("WARN: Unable to webfinger " + str(url) + ' ' +
 | ||
|               'from_domain: ' + str(from_domain) + ' ' +
 | ||
|               'nickname: ' + str(nickname) + ' ' +
 | ||
|               'handle: ' + str(handle) + ' ' +
 | ||
|               'wf_handle: ' + str(wf_handle) + ' ' +
 | ||
|               'domain: ' + str(wf_domain) + ' ' +
 | ||
|               'headers: ' + str(hdr) + ' ' +
 | ||
|               'params: ' + str(par))
 | ||
| 
 | ||
|     return result
 | ||
| 
 | ||
| 
 | ||
| def store_webfinger_endpoint(nickname: str, domain: str, port: int,
 | ||
|                              base_dir: str, wf_json: {}) -> bool:
 | ||
|     """Stores webfinger endpoint for a user to a file
 | ||
|     """
 | ||
|     original_domain = domain
 | ||
|     domain = get_full_domain(domain, port)
 | ||
|     handle = nickname + '@' + domain
 | ||
|     wf_subdir = '/wfendpoints'
 | ||
|     if not os.path.isdir(base_dir + wf_subdir):
 | ||
|         os.mkdir(base_dir + wf_subdir)
 | ||
|     filename = base_dir + wf_subdir + '/' + handle + '.json'
 | ||
|     save_json(wf_json, filename)
 | ||
|     if nickname == 'inbox':
 | ||
|         handle = original_domain + '@' + domain
 | ||
|         filename = base_dir + wf_subdir + '/' + handle + '.json'
 | ||
|         save_json(wf_json, filename)
 | ||
|     return True
 | ||
| 
 | ||
| 
 | ||
| def create_webfinger_endpoint(nickname: str, domain: str, port: int,
 | ||
|                               http_prefix: str, public_key_pem: str,
 | ||
|                               group_account: bool) -> {}:
 | ||
|     """Creates a webfinger endpoint for a user
 | ||
|     NOTE: in earlier implementations group_account modified the acct prefix.
 | ||
|     This has been left in, because currently there is still no consensus
 | ||
|     about how groups should be implemented.
 | ||
|     """
 | ||
|     original_domain = domain
 | ||
|     domain = get_full_domain(domain, port)
 | ||
| 
 | ||
|     person_name = nickname
 | ||
|     person_id = local_actor_url(http_prefix, person_name, domain)
 | ||
|     subject_str = "acct:" + person_name + "@" + original_domain
 | ||
|     profile_page_href = http_prefix + "://" + domain + "/@" + nickname
 | ||
|     if nickname in ('inbox', original_domain):
 | ||
|         person_name = 'actor'
 | ||
|         person_id = http_prefix + "://" + domain + "/" + person_name
 | ||
|         subject_str = "acct:" + original_domain + "@" + original_domain
 | ||
|         profile_page_href = http_prefix + '://' + domain + \
 | ||
|             '/about/more?instance_actor=true'
 | ||
| 
 | ||
|     person_link = http_prefix + "://" + domain + "/@" + person_name
 | ||
|     blog_url = http_prefix + "://" + domain + "/blog/" + person_name
 | ||
|     account = {
 | ||
|         "aliases": [
 | ||
|             person_link,
 | ||
|             person_id
 | ||
|         ],
 | ||
|         "links": [
 | ||
|             {
 | ||
|                 "href": person_link + "/avatar.png",
 | ||
|                 "rel": "http://webfinger.net/rel/avatar",
 | ||
|                 "type": "image/png"
 | ||
|             },
 | ||
|             {
 | ||
|                 "href": blog_url,
 | ||
|                 "rel": "http://webfinger.net/rel/blog"
 | ||
|             },
 | ||
|             {
 | ||
|                 "href": profile_page_href,
 | ||
|                 "rel": "http://webfinger.net/rel/profile-page",
 | ||
|                 "type": "text/html"
 | ||
|             },
 | ||
|             {
 | ||
|                 "href": profile_page_href,
 | ||
|                 "rel": "http://webfinger.net/rel/profile-page",
 | ||
|                 "type": "text/vcard"
 | ||
|             },
 | ||
|             {
 | ||
|                 "href": person_id,
 | ||
|                 "rel": "self",
 | ||
|                 "type": "application/activity+json"
 | ||
|             }
 | ||
|         ],
 | ||
|         "subject": subject_str
 | ||
|     }
 | ||
|     return account
 | ||
| 
 | ||
| 
 | ||
| def webfinger_node_info(http_prefix: str, domain_full: str) -> {}:
 | ||
|     """ /.well-known/nodeinfo endpoint
 | ||
|     https://codeberg.org/fediverse/fep/src/branch/main/fep/2677/fep-2677.md
 | ||
|     """
 | ||
|     instance_url = http_prefix + '://' + domain_full
 | ||
|     nodeinfo = {
 | ||
|         'links': [
 | ||
|             {
 | ||
|                 'rel': 'http://nodeinfo.diaspora.software/ns/schema/2.0',
 | ||
|                 'href': instance_url + '/nodeinfo/2.0'
 | ||
|             },
 | ||
|             {
 | ||
|                 "rel": "https://www.w3.org/ns/activitystreams#Application",
 | ||
|                 "href": instance_url + '/actor'
 | ||
|             }
 | ||
|         ]
 | ||
|     }
 | ||
|     return nodeinfo
 | ||
| 
 | ||
| 
 | ||
| def webfinger_meta(http_prefix: str, domain_full: str) -> str:
 | ||
|     """Return /.well-known/host-meta
 | ||
|     """
 | ||
|     meta_str = \
 | ||
|         "<?xml version=’1.0' encoding=’UTF-8'?>" + \
 | ||
|         "<XRD xmlns=’http://docs.oasis-open.org/ns/xri/xrd-1.0'" + \
 | ||
|         " xmlns:hm=’http://host-meta.net/xrd/1.0'>" + \
 | ||
|         "" + \
 | ||
|         "<hm:Host>" + domain_full + "</hm:Host>" + \
 | ||
|         "" + \
 | ||
|         "<Link rel=’lrdd’" + \
 | ||
|         " template=’" + http_prefix + "://" + domain_full + \
 | ||
|         "/describe?uri={uri}'>" + \
 | ||
|         " <Title>Resource Descriptor</Title>" + \
 | ||
|         " </Link>" + \
 | ||
|         "</XRD>"
 | ||
|     return meta_str
 | ||
| 
 | ||
| 
 | ||
| def wellknown_protocol_handler(path: str, http_prefix: str,
 | ||
|                                domain_full: str) -> ({}, str):
 | ||
|     """See https://fedi-to.github.io/protocol-handler.html
 | ||
|     """
 | ||
|     if not path.startswith('/.well-known/protocol-handler?'):
 | ||
|         return None, None
 | ||
| 
 | ||
|     if 'target=' in path:
 | ||
|         path = urllib.parse.unquote(path)
 | ||
|         target = path.split('target=')[1]
 | ||
|         if ';' in target:
 | ||
|             target = target.split(';')[0]
 | ||
|         if not target:
 | ||
|             return None, None
 | ||
|         if not target.startswith('web+epicyon:') and \
 | ||
|            not target.startswith('web+mastodon:') and \
 | ||
|            not target.startswith('web+ap:'):
 | ||
|             return None, None
 | ||
|         handle = target.split(':', 1)[1].strip()
 | ||
|         if handle.startswith('//'):
 | ||
|             handle = handle[2:]
 | ||
|         if handle.startswith('@'):
 | ||
|             handle = handle[1:]
 | ||
|         if '@' in handle:
 | ||
|             nickname = handle.split('@')[0]
 | ||
|             domain_and_path = handle.split('@')[1]
 | ||
|         else:
 | ||
|             nickname = handle
 | ||
|             domain_and_path = domain_full
 | ||
|         # not an open redirect
 | ||
|         if domain_and_path.startswith(domain_full):
 | ||
|             command = ''
 | ||
|             if '/' in nickname:
 | ||
|                 command = nickname.split('/')[0]
 | ||
|                 nickname = nickname.split('/')[1]
 | ||
|             domain_length = len(domain_full)
 | ||
|             path_str = domain_and_path[domain_length:]
 | ||
|             return http_prefix + '://' + domain_full + \
 | ||
|                 '/users/' + nickname + path_str, command
 | ||
|     return None, None
 | ||
| 
 | ||
| 
 | ||
| def webfinger_lookup(path: str, base_dir: str,
 | ||
|                      domain: str, onion_domain: str, i2p_domain: str,
 | ||
|                      port: int, debug: bool) -> {}:
 | ||
|     """Lookup the webfinger endpoint for an account
 | ||
|     GET /.well-known/webfinger?resource=acct:user@domain
 | ||
|     """
 | ||
|     if not path.startswith('/.well-known/webfinger?'):
 | ||
|         return None
 | ||
|     handle = None
 | ||
|     res_type = 'acct'
 | ||
|     if 'resource=' + res_type + ':http' in path:
 | ||
|         # GET /.well-known/webfinger?resource=acct:https://domain/users/nick
 | ||
|         actor = path.split('resource=' + res_type + ':')[1]
 | ||
|         actor = urllib.parse.unquote(actor.strip())
 | ||
|         wf_nickname = get_nickname_from_actor(actor)
 | ||
|         wf_domain, port = get_domain_from_actor(actor)
 | ||
|         if wf_nickname and wf_domain:
 | ||
|             handle = wf_nickname + '@' + wf_domain
 | ||
|             if debug:
 | ||
|                 print('DEBUG: WEBFINGER handle ' + handle)
 | ||
|     elif 'resource=' + res_type + ':' in path:
 | ||
|         # GET /.well-known/webfinger?resource=acct:nick@domain
 | ||
|         handle = path.split('resource=' + res_type + ':')[1].strip()
 | ||
|         handle = urllib.parse.unquote(handle)
 | ||
|         if debug:
 | ||
|             print('DEBUG: WEBFINGER handle ' + handle)
 | ||
|     elif 'resource=' + res_type + '%3Ahttp' in path:
 | ||
|         # GET /.well-known/webfinger?resource=acct%3Ahttps://domain/users/nick
 | ||
|         actor = path.split('resource=' + res_type + '%3A')[1]
 | ||
|         actor = urllib.parse.unquote(actor.strip())
 | ||
|         wf_nickname = get_nickname_from_actor(actor)
 | ||
|         wf_domain, port = get_domain_from_actor(actor)
 | ||
|         if wf_nickname and wf_domain:
 | ||
|             handle = wf_nickname + '@' + wf_domain
 | ||
|             if debug:
 | ||
|                 print('DEBUG: WEBFINGER handle ' + handle)
 | ||
|     elif 'resource=' + res_type + '%3A' in path:
 | ||
|         # GET /.well-known/webfinger?resource=acct%3Anick@domain
 | ||
|         handle = path.split('resource=' + res_type + '%3A')[1]
 | ||
|         handle = urllib.parse.unquote(handle.strip())
 | ||
|         if debug:
 | ||
|             print('DEBUG: WEBFINGER handle ' + handle)
 | ||
|     elif 'resource=http' in path:
 | ||
|         # GET /.well-known/webfinger?resource=https://domain/users/nick
 | ||
|         actor = path.split('resource=')[1]
 | ||
|         actor = urllib.parse.unquote(actor.strip())
 | ||
|         wf_nickname = get_nickname_from_actor(actor)
 | ||
|         wf_domain, port = get_domain_from_actor(actor)
 | ||
|         if wf_nickname and wf_domain:
 | ||
|             handle = wf_nickname + '@' + wf_domain
 | ||
|             if debug:
 | ||
|                 print('DEBUG: WEBFINGER handle ' + handle)
 | ||
|     elif res_type + '=' in path:
 | ||
|         possible_handle = path.split(res_type + '=')[1]
 | ||
|         if '@' in possible_handle:
 | ||
|             if possible_handle.startswith('@'):
 | ||
|                 possible_handle = possible_handle[1:]
 | ||
|             if '@' in possible_handle:
 | ||
|                 possible_handle = possible_handle.strip()
 | ||
|                 wf_nickname = possible_handle.split('@')[0]
 | ||
|                 wf_domain = possible_handle.split('@')[1]
 | ||
|                 if wf_nickname and wf_domain:
 | ||
|                     handle = wf_nickname + '@' + wf_domain
 | ||
|         elif '%3A' in possible_handle or '/' in possible_handle:
 | ||
|             actor = urllib.parse.unquote(possible_handle.strip())
 | ||
|             wf_nickname = get_nickname_from_actor(actor)
 | ||
|             wf_domain, port = get_domain_from_actor(actor)
 | ||
|             if wf_nickname and wf_domain:
 | ||
|                 handle = wf_nickname + '@' + wf_domain
 | ||
|                 if debug:
 | ||
|                     print('DEBUG: WEBFINGER handle ' + handle)
 | ||
|     if not handle:
 | ||
|         if debug:
 | ||
|             print('DEBUG: WEBFINGER handle missing')
 | ||
|         return None
 | ||
|     if '&' in handle:
 | ||
|         handle = handle.split('&')[0].strip()
 | ||
|         print('DEBUG: WEBFINGER handle with & removed ' + handle)
 | ||
|     if '@' not in handle:
 | ||
|         if debug:
 | ||
|             print('DEBUG: WEBFINGER no @ in handle ' + handle)
 | ||
|         return None
 | ||
|     handle = get_full_domain(handle, port)
 | ||
|     # convert @domain@domain to inbox@domain
 | ||
|     if '@' in handle:
 | ||
|         handle_domain = handle.split('@')[1]
 | ||
|         if handle.startswith(handle_domain + '@'):
 | ||
|             handle = 'inbox@' + handle_domain
 | ||
|     # if this is a lookup for a handle using its onion domain
 | ||
|     # then swap the onion domain for the clearnet version
 | ||
|     onionify = False
 | ||
|     if onion_domain:
 | ||
|         if onion_domain in handle:
 | ||
|             handle = handle.replace(onion_domain, domain)
 | ||
|             onionify = True
 | ||
|     i2pify = False
 | ||
|     if i2p_domain:
 | ||
|         if i2p_domain in handle:
 | ||
|             handle = handle.replace(i2p_domain, domain)
 | ||
|             i2pify = True
 | ||
|     # instance actor
 | ||
|     if handle.startswith('actor@'):
 | ||
|         handle = handle.replace('actor@', 'inbox@', 1)
 | ||
|     elif handle.startswith('Actor@'):
 | ||
|         handle = handle.replace('Actor@', 'inbox@', 1)
 | ||
|     filename = base_dir + '/wfendpoints/' + handle + '.json'
 | ||
|     if debug:
 | ||
|         print('DEBUG: WEBFINGER filename ' + filename)
 | ||
|     if not os.path.isfile(filename):
 | ||
|         if debug:
 | ||
|             print('DEBUG: WEBFINGER filename not found ' + filename)
 | ||
|         return None
 | ||
|     if not onionify and not i2pify:
 | ||
|         wf_json = load_json(filename)
 | ||
|     elif onionify:
 | ||
|         print('Webfinger request for onionified ' + handle)
 | ||
|         wf_json = load_json_onionify(filename, domain, onion_domain)
 | ||
|     else:
 | ||
|         print('Webfinger request for i2pified ' + handle)
 | ||
|         wf_json = load_json_onionify(filename, domain, i2p_domain)
 | ||
|     if not wf_json:
 | ||
|         wf_json = {"nickname": "unknown"}
 | ||
|     return wf_json
 | ||
| 
 | ||
| 
 | ||
| def _webfinger_update_avatar(wf_json: {}, actor_json: {}) -> bool:
 | ||
|     """Updates the avatar image link
 | ||
|     """
 | ||
|     found = False
 | ||
|     url_str = get_url_from_post(actor_json['icon']['url'])
 | ||
|     avatar_url = remove_html(url_str)
 | ||
|     media_type = actor_json['icon']['mediaType']
 | ||
|     for link in wf_json['links']:
 | ||
|         if not link.get('rel'):
 | ||
|             continue
 | ||
|         if not link['rel'].endswith('://webfinger.net/rel/avatar'):
 | ||
|             continue
 | ||
|         found = True
 | ||
|         if link['href'] != avatar_url or link['type'] != media_type:
 | ||
|             link['href'] = avatar_url
 | ||
|             link['type'] = media_type
 | ||
|             return True
 | ||
|         break
 | ||
|     if found:
 | ||
|         return False
 | ||
|     wf_json['links'].append({
 | ||
|         "href": avatar_url,
 | ||
|         "rel": "http://webfinger.net/rel/avatar",
 | ||
|         "type": media_type
 | ||
|     })
 | ||
|     return True
 | ||
| 
 | ||
| 
 | ||
| def _webfinger_update_vcard(wf_json: {}, actor_json: {}) -> bool:
 | ||
|     """Updates the vcard link
 | ||
|     """
 | ||
|     for link in wf_json['links']:
 | ||
|         if link.get('type'):
 | ||
|             if link['type'] == 'text/vcard':
 | ||
|                 return False
 | ||
|     url_str = get_url_from_post(actor_json['url'])
 | ||
|     actor_url = remove_html(url_str)
 | ||
|     wf_json['links'].append({
 | ||
|         "href": actor_url,
 | ||
|         "rel": "http://webfinger.net/rel/profile-page",
 | ||
|         "type": "text/vcard"
 | ||
|     })
 | ||
|     return True
 | ||
| 
 | ||
| 
 | ||
| def _webfinger_add_blog_link(wf_json: {}, actor_json: {}) -> bool:
 | ||
|     """Adds a blog link to webfinger if needed
 | ||
|     """
 | ||
|     found = False
 | ||
|     if '/users/' in actor_json['id']:
 | ||
|         blog_url = \
 | ||
|             actor_json['id'].split('/users/')[0] + '/blog/' + \
 | ||
|             actor_json['id'].split('/users/')[1]
 | ||
|     else:
 | ||
|         blog_url = \
 | ||
|             actor_json['id'].split('/@')[0] + '/blog/' + \
 | ||
|             actor_json['id'].split('/@')[1]
 | ||
|     for link in wf_json['links']:
 | ||
|         if not link.get('rel'):
 | ||
|             continue
 | ||
|         if not link['rel'].endswith('://webfinger.net/rel/blog'):
 | ||
|             continue
 | ||
|         found = True
 | ||
|         if link['href'] != blog_url:
 | ||
|             link['href'] = blog_url
 | ||
|             return True
 | ||
|         break
 | ||
|     if found:
 | ||
|         return False
 | ||
|     wf_json['links'].append({
 | ||
|         "href": blog_url,
 | ||
|         "rel": "http://webfinger.net/rel/blog"
 | ||
|     })
 | ||
|     return True
 | ||
| 
 | ||
| 
 | ||
| def _webfinger_updateFromProfile(wf_json: {}, actor_json: {}) -> bool:
 | ||
|     """Updates webfinger Email/blog/xmpp links from profile
 | ||
|     Returns true if one or more tags has been changed
 | ||
|     """
 | ||
|     if not actor_json.get('attachment'):
 | ||
|         return False
 | ||
| 
 | ||
|     changed = False
 | ||
| 
 | ||
|     webfinger_property_name = {
 | ||
|         "xmpp": "xmpp",
 | ||
|         "matrix": "matrix",
 | ||
|         "email": "mailto",
 | ||
|         "ssb": "ssb",
 | ||
|         "briar": "briar",
 | ||
|         "cwtch": "cwtch",
 | ||
|         "tox": "toxId"
 | ||
|     }
 | ||
| 
 | ||
|     aliases_not_found = []
 | ||
|     for name, alias in webfinger_property_name.items():
 | ||
|         aliases_not_found.append(alias)
 | ||
| 
 | ||
|     for property_value in actor_json['attachment']:
 | ||
|         name_value = None
 | ||
|         if property_value.get('name'):
 | ||
|             name_value = property_value['name']
 | ||
|         elif property_value.get('schema:name'):
 | ||
|             name_value = property_value['schema:name']
 | ||
|         if not name_value:
 | ||
|             continue
 | ||
|         property_name = name_value.lower()
 | ||
|         found = False
 | ||
|         for name, alias in webfinger_property_name.items():
 | ||
|             if name == property_name:
 | ||
|                 if alias in aliases_not_found:
 | ||
|                     aliases_not_found.remove(alias)
 | ||
|                 found = True
 | ||
|                 break
 | ||
|         if not found:
 | ||
|             continue
 | ||
|         if not property_value.get('type'):
 | ||
|             continue
 | ||
|         prop_value_name, _ = \
 | ||
|             get_attachment_property_value(property_value)
 | ||
|         if not prop_value_name:
 | ||
|             continue
 | ||
|         if not property_value['type'].endswith('PropertyValue'):
 | ||
|             continue
 | ||
| 
 | ||
|         new_value = property_value[prop_value_name].strip()
 | ||
|         if '://' in new_value:
 | ||
|             new_value = new_value.split('://')[1]
 | ||
| 
 | ||
|         alias_index = 0
 | ||
|         found = False
 | ||
|         for alias in wf_json['aliases']:
 | ||
|             if alias.startswith(webfinger_property_name[property_name] + ':'):
 | ||
|                 found = True
 | ||
|                 break
 | ||
|             alias_index += 1
 | ||
|         new_alias = webfinger_property_name[property_name] + ':' + new_value
 | ||
|         if found:
 | ||
|             if wf_json['aliases'][alias_index] != new_alias:
 | ||
|                 changed = True
 | ||
|                 wf_json['aliases'][alias_index] = new_alias
 | ||
|         else:
 | ||
|             wf_json['aliases'].append(new_alias)
 | ||
|             changed = True
 | ||
| 
 | ||
|     # remove any aliases which are no longer in the actor profile
 | ||
|     remove_alias = []
 | ||
|     for alias in aliases_not_found:
 | ||
|         for full_alias in wf_json['aliases']:
 | ||
|             if full_alias.startswith(alias + ':'):
 | ||
|                 remove_alias.append(full_alias)
 | ||
|     for full_alias in remove_alias:
 | ||
|         wf_json['aliases'].remove(full_alias)
 | ||
|         changed = True
 | ||
| 
 | ||
|     if _webfinger_update_avatar(wf_json, actor_json):
 | ||
|         changed = True
 | ||
| 
 | ||
|     if _webfinger_update_vcard(wf_json, actor_json):
 | ||
|         changed = True
 | ||
| 
 | ||
|     if _webfinger_add_blog_link(wf_json, actor_json):
 | ||
|         changed = True
 | ||
| 
 | ||
|     return changed
 | ||
| 
 | ||
| 
 | ||
| def webfinger_update(base_dir: str, nickname: str, domain: str,
 | ||
|                      onion_domain: str, i2p_domain: str,
 | ||
|                      cached_webfingers: {}) -> None:
 | ||
|     """Regenerates stored webfinger
 | ||
|     """
 | ||
|     handle = nickname + '@' + domain
 | ||
|     wf_subdir = '/wfendpoints'
 | ||
|     if not os.path.isdir(base_dir + wf_subdir):
 | ||
|         return
 | ||
| 
 | ||
|     filename = base_dir + wf_subdir + '/' + handle + '.json'
 | ||
|     onionify = False
 | ||
|     i2pify = False
 | ||
|     if onion_domain:
 | ||
|         if onion_domain in handle:
 | ||
|             handle = handle.replace(onion_domain, domain)
 | ||
|             onionify = True
 | ||
|     elif i2p_domain:
 | ||
|         if i2p_domain in handle:
 | ||
|             handle = handle.replace(i2p_domain, domain)
 | ||
|             i2pify = True
 | ||
|     if not onionify:
 | ||
|         if not i2pify:
 | ||
|             wf_json = load_json(filename)
 | ||
|         else:
 | ||
|             wf_json = load_json_onionify(filename, domain, i2p_domain)
 | ||
|     else:
 | ||
|         wf_json = load_json_onionify(filename, domain, onion_domain)
 | ||
|     if not wf_json:
 | ||
|         return
 | ||
| 
 | ||
|     actor_filename = acct_handle_dir(base_dir, handle) + '.json'
 | ||
|     actor_json = load_json(actor_filename)
 | ||
|     if not actor_json:
 | ||
|         return
 | ||
| 
 | ||
|     if _webfinger_updateFromProfile(wf_json, actor_json):
 | ||
|         if save_json(wf_json, filename):
 | ||
|             store_webfinger_in_cache(handle, wf_json, cached_webfingers)
 |