Support for newer http signature specification

merge-requests/30/head
Bob Mottram 2021-02-22 11:13:27 +00:00
parent 4edc3af8f7
commit 4e2487aa27
2 changed files with 54 additions and 6 deletions

View File

@ -300,6 +300,19 @@ def saveDomainQrcode(baseDir: str, httpPrefix: str,
class PubServer(BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
def _getheaderSignatureInput(self):
"""There are different versions of http signatures with
different header styles
"""
if self.headers.get('Signature-Input'):
# https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures-01
return self.headers['Signature-Input']
elif self.headers.get('signature'):
# Ye olde Masto http sig
return self.headers['signature']
return None
def _pathIsImage(self, path: str) -> bool:
if path.endswith('.png') or \
path.endswith('.jpg') or \
@ -13805,8 +13818,10 @@ class PubServer(BaseHTTPRequestHandler):
self._benchmarkPOSTtimings(POSTstartTime, POSTtimings, 21)
if self.headers.get('signature'):
if 'keyId=' not in self.headers['signature']:
headerSignature = self._getheaderSignatureInput()
if headerSignature:
if 'keyId=' not in headerSignature:
if self.server.debug:
print('DEBUG: POST to inbox has no keyId in ' +
'header signature parameter')

View File

@ -184,21 +184,42 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'),
backend=default_backend())
# Build a dictionary of the signature values
signatureHeader = headers['signature']
if headers.get('Signature-Input'):
signatureHeader = headers['Signature-Input']
fieldSep1 = ';'
fieldSep2 = ','
else:
signatureHeader = headers['signature']
fieldSep1 = ','
fieldSep2 = ' '
# split the signature input into separate fields
signatureDict = {
k: v[1:-1]
for k, v in [i.split('=', 1) for i in signatureHeader.split(',')]
for k, v in [i.split('=', 1) for i in signatureHeader.split(fieldSep1)]
}
# Unpack the signed headers and set values based on current headers and
# body (if a digest was included)
signedHeaderList = []
for signedHeader in signatureDict['headers'].split(' '):
for signedHeader in signatureDict['headers'].split(fieldSep2):
signedHeader = signedHeader.strip()
if debug:
print('DEBUG: verifyPostHeaders signedHeader=' + signedHeader)
if signedHeader == '(request-target)':
# original Mastodon http signature
appendStr = f'(request-target): {method.lower()} {path}'
signedHeaderList.append(appendStr)
elif '*request-target' in signedHeader:
# https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures-01
appendStr = f'*request-target: {method.lower()} {path}'
# remove sig1=(
if '=(' in appendStr:
appendStr = appendStr.split('=(')[1]
if ')' in appendStr:
appendStr = appendStr.split(')')[0]
signedHeaderList.append(appendStr)
elif signedHeader == 'digest':
if messageBodyDigest:
bodyDigest = messageBodyDigest
@ -253,7 +274,19 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
headerDigest = getSHA256(signedHeaderText.encode('ascii'))
# Get the signature, verify with public key, return result
signature = base64.b64decode(signatureDict['signature'])
signature = None
if headers.get('Signature-Input') and headers.get('Signature'):
# https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures-01
headersSig = headers['Signature']
# remove sig1=:
if '=:' in headersSig:
headersSig = headersSig.split('=:')[1]
headersSig = headersSig[:len(headersSig)-1]
signature = base64.b64decode(headersSig)
else:
# Original Mastodon signature
signature = base64.b64decode(signatureDict['signature'])
try:
pubkey.verify(