diff --git a/daemon.py b/daemon.py index 2891c2649..d2029b346 100644 --- a/daemon.py +++ b/daemon.py @@ -300,6 +300,19 @@ def saveDomainQrcode(baseDir: str, httpPrefix: str, class PubServer(BaseHTTPRequestHandler): protocol_version = 'HTTP/1.1' + def _getheaderSignatureInput(self): + """There are different versions of http signatures with + different header styles + """ + if self.headers.get('Signature-Input'): + # https://tools.ietf.org/html/ + # draft-ietf-httpbis-message-signatures-01 + return self.headers['Signature-Input'] + elif self.headers.get('signature'): + # Ye olde Masto http sig + return self.headers['signature'] + return None + def _pathIsImage(self, path: str) -> bool: if path.endswith('.png') or \ path.endswith('.jpg') or \ @@ -13805,8 +13818,10 @@ class PubServer(BaseHTTPRequestHandler): self._benchmarkPOSTtimings(POSTstartTime, POSTtimings, 21) - if self.headers.get('signature'): - if 'keyId=' not in self.headers['signature']: + headerSignature = self._getheaderSignatureInput() + + if headerSignature: + if 'keyId=' not in headerSignature: if self.server.debug: print('DEBUG: POST to inbox has no keyId in ' + 'header signature parameter') diff --git a/httpsig.py b/httpsig.py index f61d0c7d2..e99766bad 100644 --- a/httpsig.py +++ b/httpsig.py @@ -184,21 +184,42 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict, pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'), backend=default_backend()) # Build a dictionary of the signature values - signatureHeader = headers['signature'] + if headers.get('Signature-Input'): + signatureHeader = headers['Signature-Input'] + fieldSep1 = ';' + fieldSep2 = ',' + else: + signatureHeader = headers['signature'] + fieldSep1 = ',' + fieldSep2 = ' ' + + # split the signature input into separate fields signatureDict = { k: v[1:-1] - for k, v in [i.split('=', 1) for i in signatureHeader.split(',')] + for k, v in [i.split('=', 1) for i in signatureHeader.split(fieldSep1)] } # Unpack the signed headers and set values based on current headers and # body (if a digest was included) signedHeaderList = [] - for signedHeader in signatureDict['headers'].split(' '): + for signedHeader in signatureDict['headers'].split(fieldSep2): + signedHeader = signedHeader.strip() if debug: print('DEBUG: verifyPostHeaders signedHeader=' + signedHeader) if signedHeader == '(request-target)': + # original Mastodon http signature appendStr = f'(request-target): {method.lower()} {path}' signedHeaderList.append(appendStr) + elif '*request-target' in signedHeader: + # https://tools.ietf.org/html/ + # draft-ietf-httpbis-message-signatures-01 + appendStr = f'*request-target: {method.lower()} {path}' + # remove sig1=( + if '=(' in appendStr: + appendStr = appendStr.split('=(')[1] + if ')' in appendStr: + appendStr = appendStr.split(')')[0] + signedHeaderList.append(appendStr) elif signedHeader == 'digest': if messageBodyDigest: bodyDigest = messageBodyDigest @@ -253,7 +274,19 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict, headerDigest = getSHA256(signedHeaderText.encode('ascii')) # Get the signature, verify with public key, return result - signature = base64.b64decode(signatureDict['signature']) + signature = None + if headers.get('Signature-Input') and headers.get('Signature'): + # https://tools.ietf.org/html/ + # draft-ietf-httpbis-message-signatures-01 + headersSig = headers['Signature'] + # remove sig1=: + if '=:' in headersSig: + headersSig = headersSig.split('=:')[1] + headersSig = headersSig[:len(headersSig)-1] + signature = base64.b64decode(headersSig) + else: + # Original Mastodon signature + signature = base64.b64decode(signatureDict['signature']) try: pubkey.verify(