__filename__ = "httpsig.py"
__author__ = "Bob Mottram"
__credits__ = ['lamia']
__license__ = "AGPL3+"
__version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Security"

# see https://tools.ietf.org/html/draft-cavage-http-signatures-06
#
# This might change in future
# see https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import utils as hazutils
import base64
from time import gmtime, strftime
import datetime
from utils import getFullDomain
from utils import getSHA256
from utils import getSHA512
from utils import localActorUrl


def messageContentDigest(messageBodyJsonStr: str,
                         digestAlgorithm: str) -> str:
    """Returns the base64 encoded digest for the message body.
    SHA-512 is used for the 'rsa-sha512' and 'rsa-pss-sha512' algorithm
    names, and SHA-256 for anything else
    """
    msg = messageBodyJsonStr.encode('utf-8')
    if digestAlgorithm in ('rsa-sha512', 'rsa-pss-sha512'):
        hashResult = getSHA512(msg)
    else:
        hashResult = getSHA256(msg)
    return base64.b64encode(hashResult).decode('utf-8')


def getDigestPrefix(digestAlgorithm: str) -> str:
    """Returns the prefix for the message body digest,
    which is either SHA-512 or SHA-256
    """
    if digestAlgorithm in ('rsa-sha512', 'rsa-pss-sha512'):
        return 'SHA-512'
    return 'SHA-256'


def getDigestAlgorithmFromHeaders(httpHeaders: dict) -> str:
    """Returns the digest algorithm from http headers.
    The 'digest' or 'Digest' header is inspected, and rsa-sha256 is
    returned unless its value begins with SHA-512
    """
    digestStr = httpHeaders.get('digest') or httpHeaders.get('Digest')
    if digestStr and digestStr.startswith('SHA-512'):
        return 'rsa-sha512'
    return 'rsa-sha256'


def signPostHeaders(dateStr: str, privateKeyPem: str,
                    nickname: str,
                    domain: str, port: int,
                    toDomain: str, toPort: int,
                    path: str,
                    httpPrefix: str,
                    messageBodyJsonStr: str,
                    contentType: str,
                    algorithm: str,
                    digestAlgorithm: str) -> str:
    """Returns a raw signature string that can be plugged into a header and
    used to verify the authenticity of an HTTP transmission.

    If there is no message body then a GET is signed, with contentType
    as the accept header. Otherwise a POST is signed, including a digest
    of the body.

    NOTE: for a POST the signed content-type is always
    application/activity+json and contentType is not used. The algorithm
    parameter is only written into the returned string; the signature
    itself is always RSA PKCS1v15 over a SHA-256 digest.
    """
    domain = getFullDomain(domain, port)
    toDomain = getFullDomain(toDomain, toPort)

    if not dateStr:
        dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())

    if nickname != domain and nickname.lower() != 'actor':
        keyID = localActorUrl(httpPrefix, nickname, domain)
    else:
        # instance actor
        keyID = httpPrefix + '://' + domain + '/actor'
    keyID += '#main-key'

    if not messageBodyJsonStr:
        headers = {
            '(request-target)': f'get {path}',
            'host': toDomain,
            'date': dateStr,
            'accept': contentType
        }
    else:
        bodyDigest = \
            messageContentDigest(messageBodyJsonStr, digestAlgorithm)
        digestPrefix = getDigestPrefix(digestAlgorithm)
        contentLength = len(messageBodyJsonStr)
        headers = {
            '(request-target)': f'post {path}',
            'host': toDomain,
            'date': dateStr,
            'digest': f'{digestPrefix}={bodyDigest}',
            'content-type': 'application/activity+json',
            'content-length': str(contentLength)
        }

    key = load_pem_private_key(privateKeyPem.encode('utf-8'),
                               None, backend=default_backend())

    # build the text to be signed, one "name: value" line per header,
    # in the same order as the header names listed in the signature
    signedHeaderKeys = list(headers)
    signedHeaderText = \
        '\n'.join(f'{headerKey}: {headers[headerKey]}'
                  for headerKey in signedHeaderKeys).rstrip('\n')
    headerDigest = getSHA256(signedHeaderText.encode('ascii'))

    # Sign the digest
    rawSignature = key.sign(headerDigest,
                            padding.PKCS1v15(),
                            hazutils.Prehashed(hashes.SHA256()))
    signature = base64.b64encode(rawSignature).decode('ascii')

    # Put it into a valid HTTP signature format
    signatureDict = {
        'keyId': keyID,
        'algorithm': algorithm,
        'headers': ' '.join(signedHeaderKeys),
        'signature': signature
    }
    return ','.join(f'{k}="{v}"' for k, v in signatureDict.items())


def signPostHeadersNew(dateStr: str, privateKeyPem: str,
                       nickname: str,
                       domain: str, port: int,
                       toDomain: str, toPort: int,
                       path: str,
                       httpPrefix: str,
                       messageBodyJsonStr: str,
                       algorithm: str, digestAlgorithm: str,
                       debug: bool) -> (str, str):
    """Returns a raw signature strings that can be plugged into a header
    as "Signature-Input" and "Signature"
    used to verify the authenticity of an HTTP transmission.
    See https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures

    The first returned string is the Signature-Input value and the
    second is the Signature value. If algorithm is 'rsa-sha512' then the
    header text is hashed with SHA-512, otherwise with SHA-256.

    NOTE: the Signature-Input always contains alg=hs2019, whichever
    hash was used for signing.
    """
    domain = getFullDomain(domain, port)
    toDomain = getFullDomain(toDomain, toPort)

    timeFormat = "%a, %d %b %Y %H:%M:%S %Z"
    if not dateStr:
        dateStr = strftime(timeFormat, gmtime())
    # Always derive the created timestamp from the date string, so that
    # it is a datetime which can be subtracted from the epoch. Previously
    # a struct_time was used when no date was given, which raised a
    # TypeError
    currTime = datetime.datetime.strptime(dateStr, timeFormat)
    secondsSinceEpoch = \
        int((currTime - datetime.datetime(1970, 1, 1)).total_seconds())
    keyID = localActorUrl(httpPrefix, nickname, domain) + '#main-key'
    if not messageBodyJsonStr:
        headers = {
            '@request-target': f'get {path}',
            '@created': str(secondsSinceEpoch),
            'host': toDomain,
            'date': dateStr
        }
    else:
        bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
        digestPrefix = getDigestPrefix(digestAlgorithm)
        contentLength = len(messageBodyJsonStr)
        headers = {
            '@request-target': f'post {path}',
            '@created': str(secondsSinceEpoch),
            'host': toDomain,
            'date': dateStr,
            'digest': f'{digestPrefix}={bodyDigest}',
            'content-type': 'application/activity+json',
            'content-length': str(contentLength)
        }

    key = load_pem_private_key(privateKeyPem.encode('utf-8'),
                               None, backend=default_backend())

    # build the text to be signed
    signedHeaderKeys = list(headers)
    signedHeaderText = \
        '\n'.join(f'{headerKey}: {headers[headerKey]}'
                  for headerKey in signedHeaderKeys).strip()

    if debug:
        print('\nsignPostHeadersNew signedHeaderText:\n' +
              signedHeaderText + '\nEND\n')

    # Sign the digest. Potentially other signing algorithms can be added here.
    encodedHeaderText = signedHeaderText.encode('ascii')
    if algorithm == 'rsa-sha512':
        headerDigest = getSHA512(encodedHeaderText)
        hashAlgorithm = hashes.SHA512()
    else:
        # default rsa-sha256
        headerDigest = getSHA256(encodedHeaderText)
        hashAlgorithm = hashes.SHA256()
    rawSignature = key.sign(headerDigest,
                            padding.PKCS1v15(),
                            hazutils.Prehashed(hashAlgorithm))
    signature = base64.b64encode(rawSignature).decode('ascii')

    sigKey = 'sig1'
    # Put it into a valid HTTP signature format
    signatureInputDict = {
        'keyId': keyID,
    }
    signatureIndexHeader = '; '.join(
        [f'{k}="{v}"' for k, v in signatureInputDict.items()])
    signatureIndexHeader += '; alg=hs2019'
    signatureIndexHeader += '; created=' + str(secondsSinceEpoch)
    signatureIndexHeader += \
        '; ' + sigKey + '=(' + ', '.join(signedHeaderKeys) + ')'
    signatureDict = {
        sigKey: signature
    }
    signatureHeader = '; '.join(
        [f'{k}=:{v}:' for k, v in signatureDict.items()])
    return signatureIndexHeader, signatureHeader


def createSignedHeader(dateStr: str, privateKeyPem: str, nickname: str,
                       domain: str, port: int,
                       toDomain: str, toPort: int,
                       path: str, httpPrefix: str, withDigest: bool,
                       messageBodyJsonStr: str,
                       contentType: str) -> {}:
    """Returns a dictionary of http headers, including a 'signature'
    entry created by signPostHeaders.
    Note that the domain is the destination, not the sender

    Without a digest the headers describe a GET with contentType as the
    accept header. With a digest they describe a POST of
    messageBodyJsonStr. rsa-sha256 is used throughout.
    """
    algorithm = 'rsa-sha256'
    digestAlgorithm = 'rsa-sha256'
    headerDomain = getFullDomain(toDomain, toPort)

    # if no date is given then create one
    if not dateStr:
        dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())

    # Content-Type or Accept header
    if not contentType:
        contentType = 'application/activity+json'

    if not withDigest:
        headers = {
            '(request-target)': f'get {path}',
            'host': headerDomain,
            'date': dateStr,
            'accept': contentType
        }
        signatureHeader = \
            signPostHeaders(dateStr, privateKeyPem, nickname,
                            domain, port, toDomain, toPort,
                            path, httpPrefix, None, contentType,
                            algorithm, None)
    else:
        bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
        digestPrefix = getDigestPrefix(digestAlgorithm)
        contentLength = len(messageBodyJsonStr)
        headers = {
            '(request-target)': f'post {path}',
            'host': headerDomain,
            'date': dateStr,
            'digest': f'{digestPrefix}={bodyDigest}',
            'content-length': str(contentLength),
            'content-type': contentType
        }
        signatureHeader = \
            signPostHeaders(dateStr, privateKeyPem, nickname,
                            domain, port,
                            toDomain, toPort,
                            path, httpPrefix, messageBodyJsonStr,
                            contentType, algorithm, digestAlgorithm)
    headers['signature'] = signatureHeader
    return headers


def _verifyRecentSignature(signedDateStr: str) -> bool:
    """Checks whether the given time taken from the header is within
    12 hours of the current time.
    Returns False if the date is more than 12 hours old, is in the
    future, or cannot be parsed
    """
    currDate = datetime.datetime.utcnow()
    dateFormat = "%a, %d %b %Y %H:%M:%S %Z"
    try:
        signedDate = datetime.datetime.strptime(signedDateStr, dateFormat)
    except (ValueError, TypeError):
        print('WARN: Header date could not be parsed: ' +
              str(signedDateStr))
        return False
    # total_seconds is needed here. The seconds attribute of a timedelta
    # is always between 0 and 86399 and ignores whole days, so it can
    # neither be negative nor show that a date is several days old
    timeDiffSec = (currDate - signedDate).total_seconds()
    # 12 hours tolerance
    if timeDiffSec > 43200:
        print('WARN: Header signed too long ago: ' + signedDateStr)
        print(str(timeDiffSec / (60 * 60)) + ' hours')
        return False
    if timeDiffSec < 0:
        print('WARN: Header signed in the future! ' + signedDateStr)
        print(str(timeDiffSec / (60 * 60)) + ' hours')
        return False
    return True


def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
                      path: str, GETmethod: bool,
                      messageBodyDigest: str,
                      messageBodyJsonStr: str, debug: bool,
                      noRecencyCheck: bool = False) -> bool:
    """Returns true or false depending on if the key that we plugged in here
    validates against the headers, method, and path.
    publicKeyPem - the public key from an rsa key pair
    headers - should be a dictionary of request headers
    path - the relative url that was requested from this site
    GETmethod - GET or POST
    messageBodyDigest - a digest of the body, if already calculated
    messageBodyJsonStr - the received request body (used for digest)
    debug - whether to print debugging information
    noRecencyCheck - if true then the date header is not checked
    for recency

    Both the original Mastodon style 'signature' header and the newer
    'Signature-Input' plus 'Signature' headers are handled. Malformed
    or missing signature fields result in False being returned.
    """
    method = 'GET' if GETmethod else 'POST'

    if debug:
        print('DEBUG: verifyPostHeaders ' + method)
        print('verifyPostHeaders publicKeyPem: ' + str(publicKeyPem))
        print('verifyPostHeaders headers: ' + str(headers))
        print('verifyPostHeaders messageBodyJsonStr: ' +
              str(messageBodyJsonStr))

    pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'),
                                 backend=default_backend())

    # Build a dictionary of the signature values
    signatureInputHeader = \
        headers.get('Signature-Input') or headers.get('signature-input')
    if signatureInputHeader:
        fieldSep2 = ','
        # split the signature input into separate fields
        try:
            signatureDict = {
                k.strip(): v.strip()
                for k, v in [i.split('=', 1)
                             for i in signatureInputHeader.split(';')]
            }
        except ValueError:
            # a field without an equals sign
            if debug:
                print('DEBUG: verifyPostHeaders malformed Signature-Input')
            return False
        # the bracketed field lists the signed components, and its
        # name (eg. sig1) is also the label within the Signature header
        requestTargetKey = None
        requestTargetStr = None
        for k, v in signatureDict.items():
            if v.startswith('('):
                requestTargetKey = k
                requestTargetStr = v[1:-1]
            elif v.startswith('"'):
                signatureDict[k] = v[1:-1]
        if not requestTargetKey:
            return False
        signatureDict[requestTargetKey] = requestTargetStr
    else:
        requestTargetKey = 'headers'
        signatureHeader = \
            headers.get('signature') or headers.get('Signature')
        if not signatureHeader:
            if debug:
                print('DEBUG: verifyPostHeaders no signature header')
            return False
        fieldSep2 = ' '
        # split the signature into separate fields,
        # removing the quotes around each value
        try:
            signatureDict = {
                k.strip(): v.strip()[1:-1]
                for k, v in [i.split('=', 1)
                             for i in signatureHeader.split(',')]
            }
        except ValueError:
            # a field without an equals sign
            if debug:
                print('DEBUG: verifyPostHeaders malformed signature header')
            return False
        if signatureDict.get(requestTargetKey) is None:
            if debug:
                print('DEBUG: verifyPostHeaders ' +
                      'no headers field within signature')
            return False

    if debug:
        print('signatureDict: ' + str(signatureDict))

    # Unpack the signed headers and set values based on current headers and
    # body (if a digest was included)
    signedHeaderList = []
    algorithm = 'rsa-sha256'
    digestAlgorithm = 'rsa-sha256'
    for signedHeader in signatureDict[requestTargetKey].split(fieldSep2):
        signedHeader = signedHeader.strip()
        if debug:
            print('DEBUG: verifyPostHeaders signedHeader=' + signedHeader)
        if signedHeader == '(request-target)':
            # original Mastodon http signature
            appendStr = f'(request-target): {method.lower()} {path}'
            signedHeaderList.append(appendStr)
        elif '@request-target' in signedHeader:
            # https://tools.ietf.org/html/
            # draft-ietf-httpbis-message-signatures
            appendStr = f'@request-target: {method.lower()} {path}'
            signedHeaderList.append(appendStr)
        elif '@created' in signedHeader:
            if signatureDict.get('created'):
                createdStr = str(signatureDict['created'])
                appendStr = f'@created: {createdStr}'
                signedHeaderList.append(appendStr)
        elif '@expires' in signedHeader:
            if signatureDict.get('expires'):
                expiresStr = str(signatureDict['expires'])
                appendStr = f'@expires: {expiresStr}'
                signedHeaderList.append(appendStr)
        elif '@method' in signedHeader:
            # this was previously labelled as @expires, which could
            # never match the text which the sender had signed
            appendStr = f'@method: {method}'
            signedHeaderList.append(appendStr)
        elif '@scheme' in signedHeader:
            # NOTE(review): the scheme is fixed and httpPrefix is not
            # used here - confirm whether that is intended
            signedHeaderList.append('@scheme: http')
        elif '@authority' in signedHeader:
            authorityStr = None
            if signatureDict.get('authority'):
                authorityStr = str(signatureDict['authority'])
            elif signatureDict.get('Authority'):
                authorityStr = str(signatureDict['Authority'])
            if authorityStr:
                appendStr = f'@authority: {authorityStr}'
                signedHeaderList.append(appendStr)
        elif signedHeader == 'algorithm':
            if headers.get(signedHeader):
                algorithm = headers[signedHeader]
                if debug:
                    print('http signature algorithm: ' + algorithm)
        elif signedHeader == 'digest':
            if messageBodyDigest:
                bodyDigest = messageBodyDigest
            else:
                if messageBodyJsonStr is None:
                    # a digest was signed but there is no body to check
                    if debug:
                        print('DEBUG: verifyPostHeaders ' +
                              'digest without message body')
                    return False
                bodyDigest = \
                    messageContentDigest(messageBodyJsonStr, digestAlgorithm)
            # NOTE(review): the digest prefix is fixed as SHA-256
            signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
        elif signedHeader == 'content-length':
            # try the possible capitalisations of the header name
            contentLength = None
            for lengthKey in (signedHeader, 'Content-Length',
                              'Content-length'):
                if headers.get(lengthKey):
                    contentLength = headers[lengthKey]
                    break
            if contentLength:
                signedHeaderList.append(f'content-length: {contentLength}')
            else:
                if debug:
                    print('DEBUG: verifyPostHeaders ' + signedHeader +
                          ' not found in ' + str(headers))
        else:
            if headers.get(signedHeader):
                if signedHeader == 'date' and not noRecencyCheck:
                    if not _verifyRecentSignature(headers[signedHeader]):
                        if debug:
                            print('DEBUG: ' +
                                  'verifyPostHeaders date is not recent ' +
                                  headers[signedHeader])
                        return False
                signedHeaderList.append(
                    f'{signedHeader}: {headers[signedHeader]}')
            else:
                # capitalise with dashes
                # my-header becomes My-Header, header becomes Header
                signedHeaderCap = \
                    '-'.join(part.capitalize()
                             for part in signedHeader.split('-'))
                if debug:
                    print('signedHeaderCap: ' + signedHeaderCap)

                # if this is the date header then check it is recent.
                # This honours noRecencyCheck, in the same way as for
                # the lower case header name, and copes with the
                # header being absent
                if signedHeaderCap == 'Date' and not noRecencyCheck:
                    capDateStr = headers.get(signedHeaderCap)
                    if capDateStr and \
                       not _verifyRecentSignature(capDateStr):
                        if debug:
                            print('DEBUG: ' +
                                  'verifyPostHeaders date is not recent ' +
                                  str(capDateStr))
                        return False

                # add the capitalised header
                if headers.get(signedHeaderCap):
                    signedHeaderList.append(
                        f'{signedHeader}: {headers[signedHeaderCap]}')
                elif '-' in signedHeader:
                    # my-header becomes My-header
                    signedHeaderCap = signedHeader.capitalize()
                    if headers.get(signedHeaderCap):
                        signedHeaderList.append(
                            f'{signedHeader}: {headers[signedHeaderCap]}')

    # Now we have our header data digest
    signedHeaderText = '\n'.join(signedHeaderList)
    if debug:
        print('\nverifyPostHeaders signedHeaderText:\n' +
              signedHeaderText + '\nEND\n')

    # Get the signature, verify with public key, return result
    headersSig = None
    if signatureInputHeader:
        # https://tools.ietf.org/html/
        # draft-ietf-httpbis-message-signatures
        headersSig = headers.get('Signature') or headers.get('signature')
        if headersSig:
            # remove sig1=: and the trailing colon
            sigPrefix = requestTargetKey + '=:'
            if sigPrefix in headersSig:
                headersSig = headersSig.split(sigPrefix)[1]
                headersSig = headersSig[:len(headersSig)-1]
    if not headersSig:
        # Original Mastodon signature
        headersSig = signatureDict.get('signature')
    if not headersSig:
        if debug:
            print('DEBUG: verifyPostHeaders no signature found')
        return False
    try:
        signature = base64.b64decode(headersSig)
    except ValueError:
        # not valid base64
        if debug:
            print('DEBUG: verifyPostHeaders signature is not base64')
        return False
    if debug:
        print('signature: ' + algorithm + ' ' + headersSig)

    # log unusual signing algorithms
    signingAlg = signatureDict.get('alg')
    if signingAlg:
        print('http signature algorithm: ' + signingAlg)

    try:
        encodedHeaderText = signedHeaderText.encode('ascii')
    except UnicodeEncodeError:
        if debug:
            print('DEBUG: verifyPostHeaders ' +
                  'signed header text is not ascii')
        return False

    # If extra signing algorithms need to be added then do it here.
    # The hash of the header text has to be the same one which is given
    # to Prehashed, otherwise the digest length does not match and
    # verification can never succeed
    if signingAlg in ('rsa-sha512', 'rsa-pss-sha512'):
        headerDigest = getSHA512(encodedHeaderText)
        alg = hazutils.Prehashed(hashes.SHA512())
    else:
        # no alg, rsa-sha256, rsa-v1_5-sha256, hs2019 or unrecognised
        headerDigest = getSHA256(encodedHeaderText)
        alg = hazutils.Prehashed(hashes.SHA256())
    paddingStr = padding.PKCS1v15()

    try:
        pubkey.verify(
            signature,
            headerDigest,
            paddingStr,
            alg)
        return True
    except Exception:
        if debug:
            print('EX: verifyPostHeaders pkcs1_15 verify failure')
    return False