Remove default value

main
Bob Mottram 2024-01-31 22:23:57 +00:00
parent 0875975d71
commit 0e35d6a74c
2 changed files with 47 additions and 45 deletions

View File

@ -410,6 +410,7 @@ from cache import check_for_changed_actor
from cache import store_person_in_cache
from cache import get_person_from_cache
from cache import get_person_pub_key
from httpsig import signed_get_key_id
from httpsig import getheader_signature_input
from httpsig import verify_post_headers
from theme import reset_theme_designer_settings
@ -679,35 +680,6 @@ class PubServer(BaseHTTPRequestHandler):
else:
print('ERROR: unable to create vote')
def _signed_get_key_id(self, headers: {}, debug: bool) -> str:
"""Returns the actor from the signed GET key_id
"""
signature = None
if headers.get('signature'):
signature = headers['signature']
elif self.headers.get('Signature'):
signature = headers['Signature']
# check that the headers are signed
if not signature:
if debug:
print('AUTH: secure mode actor, ' +
'GET has no signature in headers')
return None
# get the key_id, which is typically the instance actor
key_id = None
signature_params = signature.split(',')
for signature_item in signature_params:
if signature_item.startswith('keyId='):
if '"' in signature_item:
key_id = signature_item.split('"')[1]
# remove #/main-key or #main-key
if '#' in key_id:
key_id = key_id.split('#')[0]
return key_id
return None
def _establish_session(self,
calling_function: str,
curr_session,
@ -726,14 +698,14 @@ class PubServer(BaseHTTPRequestHandler):
return None
def _secure_mode(self, curr_session, proxy_type: str,
force: bool = False) -> bool:
force: bool) -> bool:
"""http authentication of GET requests for json
aka authorized fetch
"""
if not self.server.secure_mode and not force:
return True
key_id = self._signed_get_key_id(self.headers, self.server.debug)
key_id = signed_get_key_id(self.headers, self.server.debug)
if not key_id:
if self.server.debug:
print('AUTH: secure mode, ' +
@ -12475,7 +12447,7 @@ class PubServer(BaseHTTPRequestHandler):
'_GET', '_show_replies_to_post',
debug)
else:
if self._secure_mode(curr_session, proxy_type):
if self._secure_mode(curr_session, proxy_type, False):
msg_str = json.dumps(replies_json, ensure_ascii=False)
msg_str = convert_domains(calling_domain,
referer_domain,
@ -12594,7 +12566,7 @@ class PubServer(BaseHTTPRequestHandler):
'_GET', '_show_replies_to_post',
debug)
else:
if self._secure_mode(curr_session, proxy_type):
if self._secure_mode(curr_session, proxy_type, False):
msg_str = json.dumps(replies_json, ensure_ascii=False)
msg_str = convert_domains(calling_domain,
referer_domain,
@ -12721,7 +12693,7 @@ class PubServer(BaseHTTPRequestHandler):
fitness_performance(getreq_start_time, self.server.fitness,
'_GET', '_show_roles', debug)
else:
if self._secure_mode(curr_session, proxy_type):
if self._secure_mode(curr_session, proxy_type, False):
roles_list = get_actor_roles_list(actor_json)
msg_str = json.dumps(roles_list, ensure_ascii=False)
msg_str = convert_domains(calling_domain,
@ -12862,7 +12834,7 @@ class PubServer(BaseHTTPRequestHandler):
self.server.debug)
else:
if self._secure_mode(curr_session,
proxy_type):
proxy_type, False):
actor_skills_list = \
get_occupation_skills(actor_json)
skills = \
@ -13309,7 +13281,7 @@ class PubServer(BaseHTTPRequestHandler):
'_GET', '_show_post_from_file',
debug)
else:
if self._secure_mode(curr_session, proxy_type):
if self._secure_mode(curr_session, proxy_type, False):
if not include_create_wrapper and \
post_json_object['type'] == 'Create' and \
has_object_dict(post_json_object):
@ -15282,7 +15254,7 @@ class PubServer(BaseHTTPRequestHandler):
'_GET', '_show_outbox_timeline',
debug)
else:
if self._secure_mode(curr_session, proxy_type):
if self._secure_mode(curr_session, proxy_type, False):
onion_domain = self.server.onion_domain
i2p_domain = self.server.i2p_domain
msg_str = json.dumps(outbox_feed,
@ -15603,7 +15575,7 @@ class PubServer(BaseHTTPRequestHandler):
self.server.getreq_busy = False
return True
else:
if self._secure_mode(curr_session, proxy_type):
if self._secure_mode(curr_session, proxy_type, False):
onion_domain = self.server.onion_domain
i2p_domain = self.server.i2p_domain
msg_str = json.dumps(shares,
@ -15761,7 +15733,7 @@ class PubServer(BaseHTTPRequestHandler):
debug)
return True
else:
if self._secure_mode(curr_session, proxy_type):
if self._secure_mode(curr_session, proxy_type, False):
if '/users/' in path:
nickname = path.split('/users/')[1]
if '/' in nickname:
@ -15918,7 +15890,7 @@ class PubServer(BaseHTTPRequestHandler):
debug)
return True
else:
if self._secure_mode(curr_session, proxy_type):
if self._secure_mode(curr_session, proxy_type, False):
msg_str = json.dumps(following,
ensure_ascii=False)
msg_str = convert_domains(calling_domain,
@ -16073,7 +16045,7 @@ class PubServer(BaseHTTPRequestHandler):
debug)
return True
else:
if self._secure_mode(curr_session, proxy_type):
if self._secure_mode(curr_session, proxy_type, False):
msg_str = json.dumps(following,
ensure_ascii=False)
msg_str = convert_domains(calling_domain,
@ -16230,7 +16202,7 @@ class PubServer(BaseHTTPRequestHandler):
debug)
return True
else:
if self._secure_mode(curr_session, proxy_type):
if self._secure_mode(curr_session, proxy_type, False):
if '/users/' in path:
nickname = path.split('/users/')[1]
if '/' in nickname:
@ -16425,7 +16397,7 @@ class PubServer(BaseHTTPRequestHandler):
if self.server.debug:
print('DEBUG: html actor sent')
else:
if self._secure_mode(curr_session, proxy_type):
if self._secure_mode(curr_session, proxy_type, False):
accept_str = self.headers['Accept']
msg_str = json.dumps(actor_json, ensure_ascii=False)
msg_str = convert_domains(calling_domain,
@ -17674,7 +17646,7 @@ class PubServer(BaseHTTPRequestHandler):
print('DEBUG: followers synchronization request ' +
self.path + ' ' + calling_domain)
# check authorized fetch
if self._secure_mode(curr_session, proxy_type):
if self._secure_mode(curr_session, proxy_type, False):
nickname = get_nickname_from_actor(self.path)
sync_cache = self.server.followers_sync_cache
sync_json, _ = \
@ -21582,7 +21554,7 @@ class PubServer(BaseHTTPRequestHandler):
return
if not self._secure_mode(curr_session,
proxy_type):
proxy_type, False):
if self.server.debug:
print('WARN: Unauthorized GET')
self._404()

View File

@ -583,3 +583,33 @@ def getheader_signature_input(headers: {}):
# Ye olde Masto http sig
return headers['signature']
return None
def signed_get_key_id(headers: {}, debug: bool) -> str:
"""Returns the actor from the signed GET key_id
"""
signature = None
if headers.get('signature'):
signature = headers['signature']
elif headers.get('Signature'):
signature = headers['Signature']
# check that the headers are signed
if not signature:
if debug:
print('AUTH: secure mode actor, ' +
'GET has no signature in headers')
return None
# get the key_id, which is typically the instance actor
key_id = None
signature_params = signature.split(',')
for signature_item in signature_params:
if signature_item.startswith('keyId='):
if '"' in signature_item:
key_id = signature_item.split('"')[1]
# remove #/main-key or #main-key
if '#' in key_id:
key_id = key_id.split('#')[0]
return key_id
return None