Move function out of daemon

main
Bob Mottram 2024-01-29 21:45:04 +00:00
parent 7cf4871eec
commit bb43d669ba
2 changed files with 63 additions and 52 deletions

View File

@ -64,6 +64,7 @@ from donate import get_website
from donate import set_website
from donate import get_gemini_link
from donate import set_gemini_link
from person import get_account_pub_key
from person import update_memorial_flags
from person import get_featured_hashtags
from person import set_featured_hashtags
@ -405,7 +406,6 @@ from content import save_media_in_form_post
from content import extract_text_fields_in_post
from cache import remove_person_from_cache
from cache import clear_actor_cache
from cache import get_actor_public_key_from_id
from cache import check_for_changed_actor
from cache import store_person_in_cache
from cache import get_person_from_cache
@ -797,54 +797,6 @@ class PubServer(BaseHTTPRequestHandler):
print('AUTH: secure mode authorization failed for ' + key_id)
return False
def _get_account_pub_key(self, path: str, person_cache: {},
base_dir: str, domain: str,
calling_domain: str) -> str:
"""Returns the public key for an account
"""
if '/users/' not in path:
return None
nickname = path.split('/users/')[1]
if '#main-key' in nickname:
nickname = nickname.split('#main-key')[0]
elif '/main-key' in nickname:
nickname = nickname.split('/main-key')[0]
elif '#/publicKey' in nickname:
nickname = nickname.split('#/publicKey')[0]
else:
return None
actor = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
'/users/' + nickname
actor_json = get_person_from_cache(base_dir, actor, person_cache)
if not actor_json:
actor_filename = acct_dir(base_dir, nickname, domain) + '.json'
if not os.path.isfile(actor_filename):
return None
actor_json = load_json(actor_filename, 1, 1)
if not actor_json:
return None
store_person_in_cache(base_dir, actor, actor_json,
person_cache, False)
if not actor_json.get('publicKey') and \
not actor_json.get('assertionMethod'):
return None
original_person_url = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
path
pub_key, _ = \
get_actor_public_key_from_id(actor_json, original_person_url)
return pub_key
def _login_headers(self, file_format: str, length: int,
calling_domain: str) -> None:
self.send_response(200)
@ -17670,9 +17622,13 @@ class PubServer(BaseHTTPRequestHandler):
# getting the public key for an account
acct_pub_key_json = \
self._get_account_pub_key(self.path, self.server.person_cache,
get_account_pub_key(self.path, self.server.person_cache,
self.server.base_dir,
self.server.domain, calling_domain)
self.server.domain, calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain)
if acct_pub_key_json:
msg_str = json.dumps(acct_pub_key_json, ensure_ascii=False)
msg = msg_str.encode('utf-8')

View File

@ -37,6 +37,7 @@ from roles import set_role
from roles import actor_roles_from_list
from roles import get_actor_roles_list
from media import process_meta_data
from utils import get_instance_url
from utils import get_url_from_post
from utils import date_utcnow
from utils import get_memorials
@ -77,6 +78,7 @@ from session import create_session
from session import get_json
from webfinger import webfinger_handle
from pprint import pprint
from cache import get_actor_public_key_from_id
from cache import get_person_from_cache
from cache import store_person_in_cache
from cache import remove_person_from_cache
@ -2236,3 +2238,56 @@ def update_memorial_flags(base_dir: str, person_cache: {}) -> None:
store_person_in_cache(base_dir, actor, actor_json,
person_cache, True)
break
def get_account_pub_key(path: str, person_cache: {},
base_dir: str, domain: str,
calling_domain: str,
http_prefix: str,
domain_full: str,
onion_domain: str,
i2p_domain: str) -> str:
"""Returns the public key for an account
"""
if '/users/' not in path:
return None
nickname = path.split('/users/')[1]
if '#main-key' in nickname:
nickname = nickname.split('#main-key')[0]
elif '/main-key' in nickname:
nickname = nickname.split('/main-key')[0]
elif '#/publicKey' in nickname:
nickname = nickname.split('#/publicKey')[0]
else:
return None
actor = \
get_instance_url(calling_domain,
http_prefix,
domain_full,
onion_domain,
i2p_domain) + \
'/users/' + nickname
actor_json = get_person_from_cache(base_dir, actor, person_cache)
if not actor_json:
actor_filename = acct_dir(base_dir, nickname, domain) + '.json'
if not os.path.isfile(actor_filename):
return None
actor_json = load_json(actor_filename, 1, 1)
if not actor_json:
return None
store_person_in_cache(base_dir, actor, actor_json,
person_cache, False)
if not actor_json.get('publicKey') and \
not actor_json.get('assertionMethod'):
return None
original_person_url = \
get_instance_url(calling_domain,
http_prefix,
domain_full,
onion_domain,
i2p_domain) + \
path
pub_key, _ = \
get_actor_public_key_from_id(actor_json, original_person_url)
return pub_key