Improve standards compliance for getting actor public key

main
Bob Mottram 2023-07-10 23:30:05 +01:00
parent 5554f8bbe7
commit 5160005509
5 changed files with 61 additions and 35 deletions

View File

@ -137,6 +137,34 @@ def get_webfinger_from_cache(handle: str, cached_webfingers: {}) -> {}:
return None
def get_actor_public_key_from_id(person_json: {}, key_id: str) -> (str, str):
"""Returns the public key referenced by the given id
https://codeberg.org/fediverse/fep/src/branch/main/fep/521a/fep-521a.md
"""
pub_key = None
pub_key_id = None
if person_json.get('publicKey'):
if person_json['publicKey'].get('publicKeyPem'):
pub_key = person_json['publicKey']['publicKeyPem']
if person_json['publicKey'].get('id'):
pub_key_id = person_json['publicKey']['id']
elif person_json.get('authentication'):
if isinstance(person_json['authentication'], list):
for key_dict in person_json['authentication']:
if not key_dict.get('id') or \
not key_dict.get('publicKeyMultibase'):
continue
if key_id is None or key_dict['id'] == key_id:
pub_key = key_dict['publicKeyMultibase']
pub_key_id = key_dict['id']
break
if not pub_key and person_json.get('publicKeyPem'):
pub_key = person_json['publicKeyPem']
if person_json.get('id'):
pub_key_id = person_json['id']
return pub_key, pub_key_id
def get_person_pub_key(base_dir: str, session, person_url: str,
person_cache: {}, debug: bool,
project_version: str, http_prefix: str,
@ -145,6 +173,7 @@ def get_person_pub_key(base_dir: str, session, person_url: str,
signing_priv_key_pem: str) -> str:
"""Get the public key for an actor
"""
original_person_url = person_url
if not person_url:
return None
if '#/publicKey' in person_url:
@ -185,14 +214,7 @@ def get_person_pub_key(base_dir: str, session, person_url: str,
project_version, http_prefix, person_domain)
if not person_json:
return None
pub_key = None
if person_json.get('publicKey'):
if person_json['publicKey'].get('publicKeyPem'):
pub_key = person_json['publicKey']['publicKeyPem']
else:
if person_json.get('publicKeyPem'):
pub_key = person_json['publicKeyPem']
pub_key, _ = get_actor_public_key_from_id(person_json, original_person_url)
if not pub_key:
if debug:
print('DEBUG: Public key not found for ' + person_url)

View File

@ -378,6 +378,7 @@ from content import add_html_tags
from content import extract_media_in_form_post
from content import save_media_in_form_post
from content import extract_text_fields_in_post
from cache import get_actor_public_key_from_id
from cache import check_for_changed_actor
from cache import store_person_in_cache
from cache import get_person_from_cache
@ -972,9 +973,14 @@ class PubServer(BaseHTTPRequestHandler):
return None
store_person_in_cache(base_dir, actor, actor_json,
person_cache, False)
if not actor_json.get('publicKey'):
if not actor_json.get('publicKey') and \
not actor_json.get('authentication'):
return None
return actor_json['publicKey']
original_person_url = \
self._get_instance_url(calling_domain) + path
pub_key, _ = \
get_actor_public_key_from_id(actor_json, original_person_url)
return pub_key
def _login_headers(self, file_format: str, length: int,
calling_domain: str) -> None:

View File

@ -90,6 +90,7 @@ from follow import no_of_follow_requests
from follow import get_no_of_followers
from follow import follow_approval_required
from pprint import pprint
from cache import get_actor_public_key_from_id
from cache import store_person_in_cache
from cache import get_person_pub_key
from acceptreject import receive_accept_reject
@ -1201,22 +1202,21 @@ def _person_receive_update(base_dir: str,
print('DEBUG: You can only receive actor updates ' +
'for domains other than your own')
return False
if not person_json.get('publicKey'):
person_pub_key, _ = \
get_actor_public_key_from_id(person_json, None)
if not person_pub_key:
if debug:
print('DEBUG: actor update does not contain a public key')
return False
if not person_json['publicKey'].get('publicKeyPem'):
if debug:
print('DEBUG: actor update does not contain a public key Pem')
return False
actor_filename = base_dir + '/cache/actors/' + \
person_json['id'].replace('/', '#') + '.json'
# check that the public keys match.
# If they don't then this may be a nefarious attempt to hack an account
idx = person_json['id']
if person_cache.get(idx):
if person_cache[idx]['actor']['publicKey']['publicKeyPem'] != \
person_json['publicKey']['publicKeyPem']:
cache_pub_key, _ = \
get_actor_public_key_from_id(person_cache[idx]['actor'], None)
if cache_pub_key != person_pub_key:
if debug:
print('WARN: Public key does not match when updating actor')
return False
@ -1224,26 +1224,27 @@ def _person_receive_update(base_dir: str,
if os.path.isfile(actor_filename):
existing_person_json = load_json(actor_filename)
if existing_person_json:
if existing_person_json['publicKey']['publicKeyPem'] != \
person_json['publicKey']['publicKeyPem']:
existing_pub_key, _ = \
get_actor_public_key_from_id(existing_person_json, None)
if existing_pub_key != person_pub_key:
if debug:
print('WARN: Public key does not match ' +
'cached actor when updating')
return False
# save to cache in memory
store_person_in_cache(base_dir, person_json['id'], person_json,
store_person_in_cache(base_dir, idx, person_json,
person_cache, True)
# save to cache on file
if save_json(person_json, actor_filename):
if debug:
print('actor updated for ' + person_json['id'])
print('actor updated for ' + idx)
if person_json.get('movedTo'):
prev_domain_full = None
prev_domain, prev_port = get_domain_from_actor(person_json['id'])
prev_domain, prev_port = get_domain_from_actor(idx)
if prev_domain:
prev_domain_full = get_full_domain(prev_domain, prev_port)
prev_nickname = get_nickname_from_actor(person_json['id'])
prev_nickname = get_nickname_from_actor(idx)
new_domain = None
new_domain, new_port = get_domain_from_actor(person_json['movedTo'])
if new_domain:

View File

@ -19,6 +19,7 @@ from time import gmtime, strftime
from collections import OrderedDict
from threads import thread_with_trace
from threads import begin_thread
from cache import get_actor_public_key_from_id
from cache import store_person_in_cache
from cache import get_person_from_cache
from cache import expire_person_cache
@ -392,13 +393,7 @@ def get_person_box(signing_priv_key_pem: str, origin_domain: str,
person_id = None
if person_json.get('id'):
person_id = person_json['id']
pub_key_id = None
pub_key = None
if person_json.get('publicKey'):
if person_json['publicKey'].get('id'):
pub_key_id = person_json['publicKey']['id']
if person_json['publicKey'].get('publicKeyPem'):
pub_key = person_json['publicKey']['publicKeyPem']
pub_key, pub_key_id = get_actor_public_key_from_id(person_json, None)
shared_inbox = None
if person_json.get('sharedInbox'):
shared_inbox = person_json['sharedInbox']

View File

@ -34,6 +34,7 @@ from utils import local_actor_url
from utils import text_in_file
from utils import remove_eol
from filters import is_filtered
from cache import get_actor_public_key_from_id
from cache import store_person_in_cache
from content import add_html_tags
from content import replace_emoji_from_tags
@ -422,16 +423,17 @@ def update_avatar_image_cache(signing_priv_key_pem: str,
if person_json:
if not person_json.get('id'):
return None
if not person_json.get('publicKey'):
return None
if not person_json['publicKey'].get('publicKeyPem'):
pub_key, _ = get_actor_public_key_from_id(person_json, None)
if not pub_key:
return None
if person_json['id'] != actor:
return None
if not person_cache.get(actor):
return None
if person_cache[actor]['actor']['publicKey']['publicKeyPem'] != \
person_json['publicKey']['publicKeyPem']:
cache_key, _ = \
get_actor_public_key_from_id(person_cache[actor]['actor'],
None)
if cache_key != pub_key:
print("ERROR: " +
"public keys don't match when downloading actor for " +
actor)