Snake case

merge-requests/26/head
Bob Mottram 2022-01-02 15:41:05 +00:00
parent 27c91490b0
commit d57c7eb6f2
1 changed files with 276 additions and 279 deletions

View File

@ -28,129 +28,124 @@ from utils import get_sha_512
from utils import local_actor_url from utils import local_actor_url
def message_content_digest(messageBodyJsonStr: str, def message_content_digest(message_body_json_str: str,
digestAlgorithm: str) -> str: digest_algorithm: str) -> str:
"""Returns the digest for the message body """Returns the digest for the message body
""" """
msg = messageBodyJsonStr.encode('utf-8') msg = message_body_json_str.encode('utf-8')
if digestAlgorithm == 'rsa-sha512' or \ if digest_algorithm in ('rsa-sha512', 'rsa-pss-sha512'):
digestAlgorithm == 'rsa-pss-sha512': hash_result = get_sha_512(msg)
hashResult = get_sha_512(msg)
else: else:
hashResult = get_sha_256(msg) hash_result = get_sha_256(msg)
return base64.b64encode(hashResult).decode('utf-8') return base64.b64encode(hash_result).decode('utf-8')
def get_digest_prefix(digestAlgorithm: str) -> str: def get_digest_prefix(digest_algorithm: str) -> str:
"""Returns the prefix for the message body digest """Returns the prefix for the message body digest
""" """
if digestAlgorithm == 'rsa-sha512' or \ if digest_algorithm in ('rsa-sha512', 'rsa-pss-sha512'):
digestAlgorithm == 'rsa-pss-sha512':
return 'SHA-512' return 'SHA-512'
return 'SHA-256' return 'SHA-256'
def get_digest_algorithm_from_headers(httpHeaders: {}) -> str: def get_digest_algorithm_from_headers(http_headers: {}) -> str:
"""Returns the digest algorithm from http headers """Returns the digest algorithm from http headers
""" """
digestStr = None digest_str = None
if httpHeaders.get('digest'): if http_headers.get('digest'):
digestStr = httpHeaders['digest'] digest_str = http_headers['digest']
elif httpHeaders.get('Digest'): elif http_headers.get('Digest'):
digestStr = httpHeaders['Digest'] digest_str = http_headers['Digest']
if digestStr: if digest_str:
if digestStr.startswith('SHA-512'): if digest_str.startswith('SHA-512'):
return 'rsa-sha512' return 'rsa-sha512'
return 'rsa-sha256' return 'rsa-sha256'
def sign_post_headers(dateStr: str, privateKeyPem: str, def sign_post_headers(dateStr: str, private_key_pem: str,
nickname: str, nickname: str, domain: str, port: int,
domain: str, port: int, to_domain: str, to_port: int,
toDomain: str, toPort: int, path: str, http_prefix: str,
path: str, message_body_json_str: str,
http_prefix: str, content_type: str, algorithm: str,
messageBodyJsonStr: str, digest_algorithm: str) -> str:
content_type: str,
algorithm: str,
digestAlgorithm: str) -> str:
"""Returns a raw signature string that can be plugged into a header and """Returns a raw signature string that can be plugged into a header and
used to verify the authenticity of an HTTP transmission. used to verify the authenticity of an HTTP transmission.
""" """
domain = get_full_domain(domain, port) domain = get_full_domain(domain, port)
toDomain = get_full_domain(toDomain, toPort) to_domain = get_full_domain(to_domain, to_port)
if not dateStr: if not dateStr:
dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
if nickname != domain and nickname.lower() != 'actor': if nickname != domain and nickname.lower() != 'actor':
keyID = local_actor_url(http_prefix, nickname, domain) key_id = local_actor_url(http_prefix, nickname, domain)
else: else:
# instance actor # instance actor
keyID = http_prefix + '://' + domain + '/actor' key_id = http_prefix + '://' + domain + '/actor'
keyID += '#main-key' key_id += '#main-key'
if not messageBodyJsonStr: if not message_body_json_str:
headers = { headers = {
'(request-target)': f'get {path}', '(request-target)': f'get {path}',
'host': toDomain, 'host': to_domain,
'date': dateStr, 'date': dateStr,
'accept': content_type 'accept': content_type
} }
else: else:
bodyDigest = \ body_digest = \
message_content_digest(messageBodyJsonStr, digestAlgorithm) message_content_digest(message_body_json_str, digest_algorithm)
digestPrefix = get_digest_prefix(digestAlgorithm) digest_prefix = get_digest_prefix(digest_algorithm)
contentLength = len(messageBodyJsonStr) content_length = len(message_body_json_str)
headers = { headers = {
'(request-target)': f'post {path}', '(request-target)': f'post {path}',
'host': toDomain, 'host': to_domain,
'date': dateStr, 'date': dateStr,
'digest': f'{digestPrefix}={bodyDigest}', 'digest': f'{digest_prefix}={body_digest}',
'content-type': 'application/activity+json', 'content-type': 'application/activity+json',
'content-length': str(contentLength) 'content-length': str(content_length)
} }
key = load_pem_private_key(privateKeyPem.encode('utf-8'), key = load_pem_private_key(private_key_pem.encode('utf-8'),
None, backend=default_backend()) None, backend=default_backend())
# headers.update({ # headers.update({
# '(request-target)': f'post {path}', # '(request-target)': f'post {path}',
# }) # })
# build a digest for signing # build a digest for signing
signedHeaderKeys = headers.keys() signed_header_keys = headers.keys()
signedHeaderText = '' signed_header_text = ''
for headerKey in signedHeaderKeys: for header_key in signed_header_keys:
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n' signed_header_text += f'{header_key}: {headers[header_key]}\n'
# strip the trailing linefeed # strip the trailing linefeed
signedHeaderText = signedHeaderText.rstrip('\n') signed_header_text = signed_header_text.rstrip('\n')
# signedHeaderText.encode('ascii') matches # signed_header_text.encode('ascii') matches
headerDigest = get_sha_256(signedHeaderText.encode('ascii')) header_digest = get_sha_256(signed_header_text.encode('ascii'))
# print('headerDigest2: ' + str(headerDigest)) # print('header_digest2: ' + str(header_digest))
# Sign the digest # Sign the digest
rawSignature = key.sign(headerDigest, raw_signature = key.sign(header_digest,
padding.PKCS1v15(), padding.PKCS1v15(),
hazutils.Prehashed(hashes.SHA256())) hazutils.Prehashed(hashes.SHA256()))
signature = base64.b64encode(rawSignature).decode('ascii') signature = base64.b64encode(raw_signature).decode('ascii')
# Put it into a valid HTTP signature format # Put it into a valid HTTP signature format
signatureDict = { signature_dict = {
'keyId': keyID, 'keyId': key_id,
'algorithm': algorithm, 'algorithm': algorithm,
'headers': ' '.join(signedHeaderKeys), 'headers': ' '.join(signed_header_keys),
'signature': signature 'signature': signature
} }
signatureHeader = ','.join( signature_header = ','.join(
[f'{k}="{v}"' for k, v in signatureDict.items()]) [f'{k}="{v}"' for k, v in signature_dict.items()])
return signatureHeader return signature_header
def sign_post_headers_new(dateStr: str, privateKeyPem: str, def sign_post_headers_new(dateStr: str, private_key_pem: str,
nickname: str, nickname: str,
domain: str, port: int, domain: str, port: int,
toDomain: str, toPort: int, to_domain: str, to_port: int,
path: str, path: str,
http_prefix: str, http_prefix: str,
messageBodyJsonStr: str, message_body_json_str: str,
algorithm: str, digestAlgorithm: str, algorithm: str, digest_algorithm: str,
debug: bool) -> (str, str): debug: bool) -> (str, str):
"""Returns a raw signature strings that can be plugged into a header """Returns a raw signature strings that can be plugged into a header
as "Signature-Input" and "Signature" as "Signature-Input" and "Signature"
@ -159,97 +154,97 @@ def sign_post_headers_new(dateStr: str, privateKeyPem: str,
""" """
domain = get_full_domain(domain, port) domain = get_full_domain(domain, port)
toDomain = get_full_domain(toDomain, toPort) to_domain = get_full_domain(to_domain, to_port)
timeFormat = "%a, %d %b %Y %H:%M:%S %Z" time_format = "%a, %d %b %Y %H:%M:%S %Z"
if not dateStr: if not dateStr:
curr_time = gmtime() curr_time = gmtime()
dateStr = strftime(timeFormat, curr_time) dateStr = strftime(time_format, curr_time)
else: else:
curr_time = datetime.datetime.strptime(dateStr, timeFormat) curr_time = datetime.datetime.strptime(dateStr, time_format)
secondsSinceEpoch = \ seconds_since_epoch = \
int((curr_time - datetime.datetime(1970, 1, 1)).total_seconds()) int((curr_time - datetime.datetime(1970, 1, 1)).total_seconds())
keyID = local_actor_url(http_prefix, nickname, domain) + '#main-key' key_id = local_actor_url(http_prefix, nickname, domain) + '#main-key'
if not messageBodyJsonStr: if not message_body_json_str:
headers = { headers = {
'@request-target': f'get {path}', '@request-target': f'get {path}',
'@created': str(secondsSinceEpoch), '@created': str(seconds_since_epoch),
'host': toDomain, 'host': to_domain,
'date': dateStr 'date': dateStr
} }
else: else:
bodyDigest = message_content_digest(messageBodyJsonStr, body_digest = message_content_digest(message_body_json_str,
digestAlgorithm) digest_algorithm)
digestPrefix = get_digest_prefix(digestAlgorithm) digest_prefix = get_digest_prefix(digest_algorithm)
contentLength = len(messageBodyJsonStr) content_length = len(message_body_json_str)
headers = { headers = {
'@request-target': f'post {path}', '@request-target': f'post {path}',
'@created': str(secondsSinceEpoch), '@created': str(seconds_since_epoch),
'host': toDomain, 'host': to_domain,
'date': dateStr, 'date': dateStr,
'digest': f'{digestPrefix}={bodyDigest}', 'digest': f'{digest_prefix}={body_digest}',
'content-type': 'application/activity+json', 'content-type': 'application/activity+json',
'content-length': str(contentLength) 'content-length': str(content_length)
} }
key = load_pem_private_key(privateKeyPem.encode('utf-8'), key = load_pem_private_key(private_key_pem.encode('utf-8'),
None, backend=default_backend()) None, backend=default_backend())
# build a digest for signing # build a digest for signing
signedHeaderKeys = headers.keys() signed_header_keys = headers.keys()
signedHeaderText = '' signed_header_text = ''
for headerKey in signedHeaderKeys: for header_key in signed_header_keys:
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n' signed_header_text += f'{header_key}: {headers[header_key]}\n'
signedHeaderText = signedHeaderText.strip() signed_header_text = signed_header_text.strip()
if debug: if debug:
print('\nsign_post_headers_new signedHeaderText:\n' + print('\nsign_post_headers_new signed_header_text:\n' +
signedHeaderText + '\nEND\n') signed_header_text + '\nEND\n')
# Sign the digest. Potentially other signing algorithms can be added here. # Sign the digest. Potentially other signing algorithms can be added here.
signature = '' signature = ''
if algorithm == 'rsa-sha512': if algorithm == 'rsa-sha512':
headerDigest = get_sha_512(signedHeaderText.encode('ascii')) header_digest = get_sha_512(signed_header_text.encode('ascii'))
rawSignature = key.sign(headerDigest, raw_signature = key.sign(header_digest,
padding.PKCS1v15(), padding.PKCS1v15(),
hazutils.Prehashed(hashes.SHA512())) hazutils.Prehashed(hashes.SHA512()))
signature = base64.b64encode(rawSignature).decode('ascii') signature = base64.b64encode(raw_signature).decode('ascii')
else: else:
# default rsa-sha256 # default rsa-sha256
headerDigest = get_sha_256(signedHeaderText.encode('ascii')) header_digest = get_sha_256(signed_header_text.encode('ascii'))
rawSignature = key.sign(headerDigest, raw_signature = key.sign(header_digest,
padding.PKCS1v15(), padding.PKCS1v15(),
hazutils.Prehashed(hashes.SHA256())) hazutils.Prehashed(hashes.SHA256()))
signature = base64.b64encode(rawSignature).decode('ascii') signature = base64.b64encode(raw_signature).decode('ascii')
sigKey = 'sig1' sig_key = 'sig1'
# Put it into a valid HTTP signature format # Put it into a valid HTTP signature format
signatureInputDict = { signature_input_dict = {
'keyId': keyID, 'keyId': key_id,
} }
signatureIndexHeader = '; '.join( signature_index_header = '; '.join(
[f'{k}="{v}"' for k, v in signatureInputDict.items()]) [f'{k}="{v}"' for k, v in signature_input_dict.items()])
signatureIndexHeader += '; alg=hs2019' signature_index_header += '; alg=hs2019'
signatureIndexHeader += '; created=' + str(secondsSinceEpoch) signature_index_header += '; created=' + str(seconds_since_epoch)
signatureIndexHeader += \ signature_index_header += \
'; ' + sigKey + '=(' + ', '.join(signedHeaderKeys) + ')' '; ' + sig_key + '=(' + ', '.join(signed_header_keys) + ')'
signatureDict = { signature_dict = {
sigKey: signature sig_key: signature
} }
signatureHeader = '; '.join( signature_header = '; '.join(
[f'{k}=:{v}:' for k, v in signatureDict.items()]) [f'{k}=:{v}:' for k, v in signature_dict.items()])
return signatureIndexHeader, signatureHeader return signature_index_header, signature_header
def create_signed_header(dateStr: str, privateKeyPem: str, nickname: str, def create_signed_header(dateStr: str, private_key_pem: str, nickname: str,
domain: str, port: int, domain: str, port: int,
toDomain: str, toPort: int, to_domain: str, to_port: int,
path: str, http_prefix: str, withDigest: bool, path: str, http_prefix: str, withDigest: bool,
messageBodyJsonStr: str, message_body_json_str: str,
content_type: str) -> {}: content_type: str) -> {}:
"""Note that the domain is the destination, not the sender """Note that the domain is the destination, not the sender
""" """
algorithm = 'rsa-sha256' algorithm = 'rsa-sha256'
digestAlgorithm = 'rsa-sha256' digest_algorithm = 'rsa-sha256'
headerDomain = get_full_domain(toDomain, toPort) header_domain = get_full_domain(to_domain, to_port)
# if no date is given then create one # if no date is given then create one
if not dateStr: if not dateStr:
@ -262,54 +257,54 @@ def create_signed_header(dateStr: str, privateKeyPem: str, nickname: str,
if not withDigest: if not withDigest:
headers = { headers = {
'(request-target)': f'get {path}', '(request-target)': f'get {path}',
'host': headerDomain, 'host': header_domain,
'date': dateStr, 'date': dateStr,
'accept': content_type 'accept': content_type
} }
signatureHeader = \ signature_header = \
sign_post_headers(dateStr, privateKeyPem, nickname, sign_post_headers(dateStr, private_key_pem, nickname,
domain, port, toDomain, toPort, domain, port, to_domain, to_port,
path, http_prefix, None, content_type, path, http_prefix, None, content_type,
algorithm, None) algorithm, None)
else: else:
bodyDigest = message_content_digest(messageBodyJsonStr, body_digest = message_content_digest(message_body_json_str,
digestAlgorithm) digest_algorithm)
digestPrefix = get_digest_prefix(digestAlgorithm) digest_prefix = get_digest_prefix(digest_algorithm)
contentLength = len(messageBodyJsonStr) content_length = len(message_body_json_str)
headers = { headers = {
'(request-target)': f'post {path}', '(request-target)': f'post {path}',
'host': headerDomain, 'host': header_domain,
'date': dateStr, 'date': dateStr,
'digest': f'{digestPrefix}={bodyDigest}', 'digest': f'{digest_prefix}={body_digest}',
'content-length': str(contentLength), 'content-length': str(content_length),
'content-type': content_type 'content-type': content_type
} }
signatureHeader = \ signature_header = \
sign_post_headers(dateStr, privateKeyPem, nickname, sign_post_headers(dateStr, private_key_pem, nickname,
domain, port, domain, port,
toDomain, toPort, to_domain, to_port,
path, http_prefix, messageBodyJsonStr, path, http_prefix, message_body_json_str,
content_type, algorithm, digestAlgorithm) content_type, algorithm, digest_algorithm)
headers['signature'] = signatureHeader headers['signature'] = signature_header
return headers return headers
def _verify_recent_signature(signedDateStr: str) -> bool: def _verify_recent_signature(signed_date_str: str) -> bool:
"""Checks whether the given time taken from the header is within """Checks whether the given time taken from the header is within
12 hours of the current time 12 hours of the current time
""" """
currDate = datetime.datetime.utcnow() curr_date = datetime.datetime.utcnow()
dateFormat = "%a, %d %b %Y %H:%M:%S %Z" date_format = "%a, %d %b %Y %H:%M:%S %Z"
signedDate = datetime.datetime.strptime(signedDateStr, dateFormat) signed_date = datetime.datetime.strptime(signed_date_str, date_format)
time_diffSec = (currDate - signedDate).seconds time_diff_sec = (curr_date - signed_date).seconds
# 12 hours tollerance # 12 hours tollerance
if time_diffSec > 43200: if time_diff_sec > 43200:
print('WARN: Header signed too long ago: ' + signedDateStr) print('WARN: Header signed too long ago: ' + signed_date_str)
print(str(time_diffSec / (60 * 60)) + ' hours') print(str(time_diff_sec / (60 * 60)) + ' hours')
return False return False
if time_diffSec < 0: if time_diff_sec < 0:
print('WARN: Header signed in the future! ' + signedDateStr) print('WARN: Header signed in the future! ' + signed_date_str)
print(str(time_diffSec / (60 * 60)) + ' hours') print(str(time_diff_sec / (60 * 60)) + ' hours')
return False return False
return True return True
@ -318,7 +313,7 @@ def verify_post_headers(http_prefix: str,
publicKeyPem: str, headers: dict, publicKeyPem: str, headers: dict,
path: str, GETmethod: bool, path: str, GETmethod: bool,
messageBodyDigest: str, messageBodyDigest: str,
messageBodyJsonStr: str, debug: bool, message_body_json_str: str, debug: bool,
noRecencyCheck: bool = False) -> bool: noRecencyCheck: bool = False) -> bool:
"""Returns true or false depending on if the key that we plugged in here """Returns true or false depending on if the key that we plugged in here
validates against the headers, method, and path. validates against the headers, method, and path.
@ -326,7 +321,7 @@ def verify_post_headers(http_prefix: str,
headers - should be a dictionary of request headers headers - should be a dictionary of request headers
path - the relative url that was requested from this site path - the relative url that was requested from this site
GETmethod - GET or POST GETmethod - GET or POST
messageBodyJsonStr - the received request body (used for digest) message_body_json_str - the received request body (used for digest)
""" """
if GETmethod: if GETmethod:
@ -338,170 +333,172 @@ def verify_post_headers(http_prefix: str,
print('DEBUG: verify_post_headers ' + method) print('DEBUG: verify_post_headers ' + method)
print('verify_post_headers publicKeyPem: ' + str(publicKeyPem)) print('verify_post_headers publicKeyPem: ' + str(publicKeyPem))
print('verify_post_headers headers: ' + str(headers)) print('verify_post_headers headers: ' + str(headers))
print('verify_post_headers messageBodyJsonStr: ' + print('verify_post_headers message_body_json_str: ' +
str(messageBodyJsonStr)) str(message_body_json_str))
pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'), pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'),
backend=default_backend()) backend=default_backend())
# Build a dictionary of the signature values # Build a dictionary of the signature values
if headers.get('Signature-Input') or headers.get('signature-input'): if headers.get('Signature-Input') or headers.get('signature-input'):
if headers.get('Signature-Input'): if headers.get('Signature-Input'):
signatureHeader = headers['Signature-Input'] signature_header = headers['Signature-Input']
else: else:
signatureHeader = headers['signature-input'] signature_header = headers['signature-input']
fieldSep2 = ',' field_sep2 = ','
# split the signature input into separate fields # split the signature input into separate fields
signatureDict = { signature_dict = {
k.strip(): v.strip() k.strip(): v.strip()
for k, v in [i.split('=', 1) for i in signatureHeader.split(';')] for k, v in [i.split('=', 1) for i in signature_header.split(';')]
} }
requestTargetKey = None request_target_key = None
requestTargetStr = None request_target_str = None
for k, v in signatureDict.items(): for key_str, value_str in signature_dict.items():
if v.startswith('('): if value_str.startswith('('):
requestTargetKey = k request_target_key = key_str
requestTargetStr = v[1:-1] request_target_str = value_str[1:-1]
elif v.startswith('"'): elif value_str.startswith('"'):
signatureDict[k] = v[1:-1] signature_dict[key_str] = value_str[1:-1]
if not requestTargetKey: if not request_target_key:
return False return False
signatureDict[requestTargetKey] = requestTargetStr signature_dict[request_target_key] = request_target_str
else: else:
requestTargetKey = 'headers' request_target_key = 'headers'
signatureHeader = headers['signature'] signature_header = headers['signature']
fieldSep2 = ' ' field_sep2 = ' '
# split the signature input into separate fields # split the signature input into separate fields
signatureDict = { signature_dict = {
k: v[1:-1] k: v[1:-1]
for k, v in [i.split('=', 1) for i in signatureHeader.split(',')] for k, v in [i.split('=', 1) for i in signature_header.split(',')]
} }
if debug: if debug:
print('signatureDict: ' + str(signatureDict)) print('signature_dict: ' + str(signature_dict))
# Unpack the signed headers and set values based on current headers and # Unpack the signed headers and set values based on current headers and
# body (if a digest was included) # body (if a digest was included)
signedHeaderList = [] signed_header_list = []
algorithm = 'rsa-sha256' algorithm = 'rsa-sha256'
digestAlgorithm = 'rsa-sha256' digest_algorithm = 'rsa-sha256'
for signedHeader in signatureDict[requestTargetKey].split(fieldSep2): for signed_header in signature_dict[request_target_key].split(field_sep2):
signedHeader = signedHeader.strip() signed_header = signed_header.strip()
if debug: if debug:
print('DEBUG: verify_post_headers signedHeader=' + signedHeader) print('DEBUG: verify_post_headers signed_header=' + signed_header)
if signedHeader == '(request-target)': if signed_header == '(request-target)':
# original Mastodon http signature # original Mastodon http signature
appendStr = f'(request-target): {method.lower()} {path}' append_str = f'(request-target): {method.lower()} {path}'
signedHeaderList.append(appendStr) signed_header_list.append(append_str)
elif '@request-target' in signedHeader: elif '@request-target' in signed_header:
# https://tools.ietf.org/html/ # https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures # draft-ietf-httpbis-message-signatures
appendStr = f'@request-target: {method.lower()} {path}' append_str = f'@request-target: {method.lower()} {path}'
signedHeaderList.append(appendStr) signed_header_list.append(append_str)
elif '@created' in signedHeader: elif '@created' in signed_header:
if signatureDict.get('created'): if signature_dict.get('created'):
createdStr = str(signatureDict['created']) created_str = str(signature_dict['created'])
appendStr = f'@created: {createdStr}' append_str = f'@created: {created_str}'
signedHeaderList.append(appendStr) signed_header_list.append(append_str)
elif '@expires' in signedHeader: elif '@expires' in signed_header:
if signatureDict.get('expires'): if signature_dict.get('expires'):
expiresStr = str(signatureDict['expires']) expires_str = str(signature_dict['expires'])
appendStr = f'@expires: {expiresStr}' append_str = f'@expires: {expires_str}'
signedHeaderList.append(appendStr) signed_header_list.append(append_str)
elif '@method' in signedHeader: elif '@method' in signed_header:
appendStr = f'@expires: {method}' append_str = f'@expires: {method}'
signedHeaderList.append(appendStr) signed_header_list.append(append_str)
elif '@scheme' in signedHeader: elif '@scheme' in signed_header:
signedHeaderList.append('@scheme: http') signed_header_list.append('@scheme: http')
elif '@authority' in signedHeader: elif '@authority' in signed_header:
authorityStr = None authority_str = None
if signatureDict.get('authority'): if signature_dict.get('authority'):
authorityStr = str(signatureDict['authority']) authority_str = str(signature_dict['authority'])
elif signatureDict.get('Authority'): elif signature_dict.get('Authority'):
authorityStr = str(signatureDict['Authority']) authority_str = str(signature_dict['Authority'])
if authorityStr: if authority_str:
appendStr = f'@authority: {authorityStr}' append_str = f'@authority: {authority_str}'
signedHeaderList.append(appendStr) signed_header_list.append(append_str)
elif signedHeader == 'algorithm': elif signed_header == 'algorithm':
if headers.get(signedHeader): if headers.get(signed_header):
algorithm = headers[signedHeader] algorithm = headers[signed_header]
if debug: if debug:
print('http signature algorithm: ' + algorithm) print('http signature algorithm: ' + algorithm)
elif signedHeader == 'digest': elif signed_header == 'digest':
if messageBodyDigest: if messageBodyDigest:
bodyDigest = messageBodyDigest body_digest = messageBodyDigest
else: else:
bodyDigest = \ body_digest = \
message_content_digest(messageBodyJsonStr, digestAlgorithm) message_content_digest(message_body_json_str,
signedHeaderList.append(f'digest: SHA-256={bodyDigest}') digest_algorithm)
elif signedHeader == 'content-length': signed_header_list.append(f'digest: SHA-256={body_digest}')
if headers.get(signedHeader): elif signed_header == 'content-length':
appendStr = f'content-length: {headers[signedHeader]}' if headers.get(signed_header):
signedHeaderList.append(appendStr) append_str = f'content-length: {headers[signed_header]}'
signed_header_list.append(append_str)
elif headers.get('Content-Length'): elif headers.get('Content-Length'):
contentLength = headers['Content-Length'] content_length = headers['Content-Length']
signedHeaderList.append(f'content-length: {contentLength}') signed_header_list.append(f'content-length: {content_length}')
elif headers.get('Content-length'): elif headers.get('Content-length'):
contentLength = headers['Content-length'] content_length = headers['Content-length']
appendStr = f'content-length: {contentLength}' append_str = f'content-length: {content_length}'
signedHeaderList.append(appendStr) signed_header_list.append(append_str)
else: else:
if debug: if debug:
print('DEBUG: verify_post_headers ' + signedHeader + print('DEBUG: verify_post_headers ' + signed_header +
' not found in ' + str(headers)) ' not found in ' + str(headers))
else: else:
if headers.get(signedHeader): if headers.get(signed_header):
if signedHeader == 'date' and not noRecencyCheck: if signed_header == 'date' and not noRecencyCheck:
if not _verify_recent_signature(headers[signedHeader]): if not _verify_recent_signature(headers[signed_header]):
if debug: if debug:
print('DEBUG: ' + print('DEBUG: ' +
'verify_post_headers date is not recent ' + 'verify_post_headers date is not recent ' +
headers[signedHeader]) headers[signed_header])
return False return False
signedHeaderList.append( signed_header_list.append(
f'{signedHeader}: {headers[signedHeader]}') f'{signed_header}: {headers[signed_header]}')
else: else:
if '-' in signedHeader: if '-' in signed_header:
# capitalise with dashes # capitalise with dashes
# my-header becomes My-Header # my-header becomes My-Header
headerParts = signedHeader.split('-') header_parts = signed_header.split('-')
signedHeaderCap = None signed_header_cap = None
for part in headerParts: for part in header_parts:
if signedHeaderCap: if signed_header_cap:
signedHeaderCap += '-' + part.capitalize() signed_header_cap += '-' + part.capitalize()
else: else:
signedHeaderCap = part.capitalize() signed_header_cap = part.capitalize()
else: else:
# header becomes Header # header becomes Header
signedHeaderCap = signedHeader.capitalize() signed_header_cap = signed_header.capitalize()
if debug: if debug:
print('signedHeaderCap: ' + signedHeaderCap) print('signed_header_cap: ' + signed_header_cap)
# if this is the date header then check it is recent # if this is the date header then check it is recent
if signedHeaderCap == 'Date': if signed_header_cap == 'Date':
if not _verify_recent_signature(headers[signedHeaderCap]): signed_hdr_cap = headers[signed_header_cap]
if not _verify_recent_signature(signed_hdr_cap):
if debug: if debug:
print('DEBUG: ' + print('DEBUG: ' +
'verify_post_headers date is not recent ' + 'verify_post_headers date is not recent ' +
headers[signedHeader]) headers[signed_header])
return False return False
# add the capitalised header # add the capitalised header
if headers.get(signedHeaderCap): if headers.get(signed_header_cap):
signedHeaderList.append( signed_header_list.append(
f'{signedHeader}: {headers[signedHeaderCap]}') f'{signed_header}: {headers[signed_header_cap]}')
elif '-' in signedHeader: elif '-' in signed_header:
# my-header becomes My-header # my-header becomes My-header
signedHeaderCap = signedHeader.capitalize() signed_header_cap = signed_header.capitalize()
if headers.get(signedHeaderCap): if headers.get(signed_header_cap):
signedHeaderList.append( signed_header_list.append(
f'{signedHeader}: {headers[signedHeaderCap]}') f'{signed_header}: {headers[signed_header_cap]}')
# Now we have our header data digest # Now we have our header data digest
signedHeaderText = '\n'.join(signedHeaderList) signed_header_text = '\n'.join(signed_header_list)
if debug: if debug:
print('\nverify_post_headers signedHeaderText:\n' + print('\nverify_post_headers signed_header_text:\n' +
signedHeaderText + '\nEND\n') signed_header_text + '\nEND\n')
# Get the signature, verify with public key, return result # Get the signature, verify with public key, return result
if (headers.get('Signature-Input') and headers.get('Signature')) or \ if (headers.get('Signature-Input') and headers.get('Signature')) or \
@ -509,49 +506,49 @@ def verify_post_headers(http_prefix: str,
# https://tools.ietf.org/html/ # https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures # draft-ietf-httpbis-message-signatures
if headers.get('Signature'): if headers.get('Signature'):
headersSig = headers['Signature'] headers_sig = headers['Signature']
else: else:
headersSig = headers['signature'] headers_sig = headers['signature']
# remove sig1=: # remove sig1=:
if requestTargetKey + '=:' in headersSig: if request_target_key + '=:' in headers_sig:
headersSig = headersSig.split(requestTargetKey + '=:')[1] headers_sig = headers_sig.split(request_target_key + '=:')[1]
headersSig = headersSig[:len(headersSig)-1] headers_sig = headers_sig[:len(headers_sig)-1]
signature = base64.b64decode(headersSig) signature = base64.b64decode(headers_sig)
else: else:
# Original Mastodon signature # Original Mastodon signature
headersSig = signatureDict['signature'] headers_sig = signature_dict['signature']
signature = base64.b64decode(headersSig) signature = base64.b64decode(headers_sig)
if debug: if debug:
print('signature: ' + algorithm + ' ' + headersSig) print('signature: ' + algorithm + ' ' + headers_sig)
# log unusual signing algorithms # log unusual signing algorithms
if signatureDict.get('alg'): if signature_dict.get('alg'):
print('http signature algorithm: ' + signatureDict['alg']) print('http signature algorithm: ' + signature_dict['alg'])
# If extra signing algorithms need to be added then do it here # If extra signing algorithms need to be added then do it here
if not signatureDict.get('alg'): if not signature_dict.get('alg'):
alg = hazutils.Prehashed(hashes.SHA256()) alg = hazutils.Prehashed(hashes.SHA256())
elif (signatureDict['alg'] == 'rsa-sha256' or elif (signature_dict['alg'] == 'rsa-sha256' or
signatureDict['alg'] == 'rsa-v1_5-sha256' or signature_dict['alg'] == 'rsa-v1_5-sha256' or
signatureDict['alg'] == 'hs2019'): signature_dict['alg'] == 'hs2019'):
alg = hazutils.Prehashed(hashes.SHA256()) alg = hazutils.Prehashed(hashes.SHA256())
elif (signatureDict['alg'] == 'rsa-sha512' or elif (signature_dict['alg'] == 'rsa-sha512' or
signatureDict['alg'] == 'rsa-pss-sha512'): signature_dict['alg'] == 'rsa-pss-sha512'):
alg = hazutils.Prehashed(hashes.SHA512()) alg = hazutils.Prehashed(hashes.SHA512())
else: else:
alg = hazutils.Prehashed(hashes.SHA256()) alg = hazutils.Prehashed(hashes.SHA256())
if digestAlgorithm == 'rsa-sha256': if digest_algorithm == 'rsa-sha256':
headerDigest = get_sha_256(signedHeaderText.encode('ascii')) header_digest = get_sha_256(signed_header_text.encode('ascii'))
elif digestAlgorithm == 'rsa-sha512': elif digest_algorithm == 'rsa-sha512':
headerDigest = get_sha_512(signedHeaderText.encode('ascii')) header_digest = get_sha_512(signed_header_text.encode('ascii'))
else: else:
print('Unknown http digest algorithm: ' + digestAlgorithm) print('Unknown http digest algorithm: ' + digest_algorithm)
headerDigest = '' header_digest = ''
paddingStr = padding.PKCS1v15() padding_str = padding.PKCS1v15()
try: try:
pubkey.verify(signature, headerDigest, paddingStr, alg) pubkey.verify(signature, header_digest, padding_str, alg)
return True return True
except BaseException: except BaseException:
if debug: if debug: