__filename__ = "httpsig.py" __author__ = "Bob Mottram" __credits__ = ['lamia'] __license__ = "AGPL3+" __version__ = "1.2.0" __maintainer__ = "Bob Mottram" __email__ = "bob@freedombone.net" __status__ = "Production" # see https://tools.ietf.org/html/draft-cavage-http-signatures-06 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.serialization import load_pem_public_key from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import utils as hazutils import base64 from time import gmtime, strftime import datetime from utils import getFullDomain from utils import getSHA256 def messageContentDigest(messageBodyJsonStr: str) -> str: msg = messageBodyJsonStr.encode('utf-8') hashResult = getSHA256(msg) return base64.b64encode(hashResult).decode('utf-8') def signPostHeaders(dateStr: str, privateKeyPem: str, nickname: str, domain: str, port: int, toDomain: str, toPort: int, path: str, httpPrefix: str, messageBodyJsonStr: str) -> str: """Returns a raw signature string that can be plugged into a header and used to verify the authenticity of an HTTP transmission. """ domain = getFullDomain(domain, port) toDomain = getFullDomain(toDomain, toPort) if not dateStr: dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) keyID = httpPrefix + '://' + domain + '/users/' + nickname + '#main-key' if not messageBodyJsonStr: headers = { '(request-target)': f'post {path}', 'host': toDomain, 'date': dateStr, 'content-type': 'application/json' } else: bodyDigest = messageContentDigest(messageBodyJsonStr) contentLength = len(messageBodyJsonStr) headers = { '(request-target)': f'post {path}', 'host': toDomain, 'date': dateStr, 'digest': f'SHA-256={bodyDigest}', 'content-type': 'application/activity+json', 'content-length': str(contentLength) } key = load_pem_private_key(privateKeyPem.encode('utf-8'), None, backend=default_backend()) # headers.update({ # '(request-target)': f'post {path}', # }) # build a digest for signing signedHeaderKeys = headers.keys() signedHeaderText = '' for headerKey in signedHeaderKeys: signedHeaderText += f'{headerKey}: {headers[headerKey]}\n' signedHeaderText = signedHeaderText.strip() # signedHeaderText.encode('ascii') matches headerDigest = getSHA256(signedHeaderText.encode('ascii')) # print('headerDigest2: ' + str(headerDigest)) # Sign the digest rawSignature = key.sign(headerDigest, padding.PKCS1v15(), hazutils.Prehashed(hashes.SHA256())) signature = base64.b64encode(rawSignature).decode('ascii') # Put it into a valid HTTP signature format signatureDict = { 'keyId': keyID, 'algorithm': 'rsa-sha256', 'headers': ' '.join(signedHeaderKeys), 'signature': signature } signatureHeader = ','.join( [f'{k}="{v}"' for k, v in signatureDict.items()]) return signatureHeader def createSignedHeader(privateKeyPem: str, nickname: str, domain: str, port: int, toDomain: str, toPort: int, path: str, httpPrefix: str, withDigest: bool, messageBodyJsonStr: str) -> {}: """Note that the domain is the destination, not the sender """ contentType = 'application/activity+json' headerDomain = getFullDomain(toDomain, toPort) dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) if not withDigest: headers = { '(request-target)': f'post {path}', 'host': headerDomain, 'date': dateStr } signatureHeader = \ signPostHeaders(dateStr, privateKeyPem, nickname, domain, port, toDomain, toPort, path, httpPrefix, None) else: bodyDigest = messageContentDigest(messageBodyJsonStr) contentLength = len(messageBodyJsonStr) headers = { '(request-target)': f'post {path}', 'host': headerDomain, 'date': dateStr, 'digest': f'SHA-256={bodyDigest}', 'content-length': str(contentLength), 'content-type': contentType } signatureHeader = \ signPostHeaders(dateStr, privateKeyPem, nickname, domain, port, toDomain, toPort, path, httpPrefix, messageBodyJsonStr) headers['signature'] = signatureHeader return headers def _verifyRecentSignature(signedDateStr: str) -> bool: """Checks whether the given time taken from the header is within 12 hours of the current time """ currDate = datetime.datetime.utcnow() dateFormat = "%a, %d %b %Y %H:%M:%S %Z" signedDate = datetime.datetime.strptime(signedDateStr, dateFormat) timeDiffSec = (currDate - signedDate).seconds # 12 hours tollerance if timeDiffSec > 43200: print('WARN: Header signed too long ago: ' + signedDateStr) print(str(timeDiffSec / (60 * 60)) + ' hours') return False if timeDiffSec < 0: print('WARN: Header signed in the future! ' + signedDateStr) print(str(timeDiffSec / (60 * 60)) + ' hours') return False return True def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict, path: str, GETmethod: bool, messageBodyDigest: str, messageBodyJsonStr: str, debug: bool) -> bool: """Returns true or false depending on if the key that we plugged in here validates against the headers, method, and path. publicKeyPem - the public key from an rsa key pair headers - should be a dictionary of request headers path - the relative url that was requested from this site GETmethod - GET or POST messageBodyJsonStr - the received request body (used for digest) """ if GETmethod: method = 'GET' else: method = 'POST' if debug: print('DEBUG: verifyPostHeaders ' + method) pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'), backend=default_backend()) # Build a dictionary of the signature values signatureHeader = headers['signature'] signatureDict = { k: v[1:-1] for k, v in [i.split('=', 1) for i in signatureHeader.split(',')] } # Unpack the signed headers and set values based on current headers and # body (if a digest was included) signedHeaderList = [] for signedHeader in signatureDict['headers'].split(' '): if debug: print('DEBUG: verifyPostHeaders signedHeader=' + signedHeader) if signedHeader == '(request-target)': appendStr = f'(request-target): {method.lower()} {path}' signedHeaderList.append(appendStr) elif signedHeader == 'digest': if messageBodyDigest: bodyDigest = messageBodyDigest else: bodyDigest = messageContentDigest(messageBodyJsonStr) signedHeaderList.append(f'digest: SHA-256={bodyDigest}') elif signedHeader == 'content-length': if headers.get(signedHeader): appendStr = f'content-length: {headers[signedHeader]}' signedHeaderList.append(appendStr) else: if headers.get('Content-Length'): contentLength = headers['Content-Length'] signedHeaderList.append(f'content-length: {contentLength}') else: if headers.get('Content-length'): contentLength = headers['Content-length'] appendStr = f'content-length: {contentLength}' signedHeaderList.append(appendStr) else: if debug: print('DEBUG: verifyPostHeaders ' + signedHeader + ' not found in ' + str(headers)) else: if headers.get(signedHeader): if signedHeader == 'date': if not _verifyRecentSignature(headers[signedHeader]): if debug: print('DEBUG: ' + 'verifyPostHeaders date is not recent ' + headers[signedHeader]) return False signedHeaderList.append( f'{signedHeader}: {headers[signedHeader]}') else: signedHeaderCap = signedHeader.capitalize() if signedHeaderCap == 'Date': if not _verifyRecentSignature(headers[signedHeaderCap]): if debug: print('DEBUG: ' + 'verifyPostHeaders date is not recent ' + headers[signedHeader]) return False if headers.get(signedHeaderCap): signedHeaderList.append( f'{signedHeader}: {headers[signedHeaderCap]}') if debug: print('DEBUG: signedHeaderList: ' + str(signedHeaderList)) # Now we have our header data digest signedHeaderText = '\n'.join(signedHeaderList) headerDigest = getSHA256(signedHeaderText.encode('ascii')) # Get the signature, verify with public key, return result signature = base64.b64decode(signatureDict['signature']) try: pubkey.verify( signature, headerDigest, padding.PKCS1v15(), hazutils.Prehashed(hashes.SHA256())) return True except BaseException: if debug: print('DEBUG: verifyPostHeaders pkcs1_15 verify failure') return False