diff --git a/httpsig.py b/httpsig.py index 443cd9a2c..88fe0a895 100644 --- a/httpsig.py +++ b/httpsig.py @@ -9,26 +9,26 @@ __status__ = "Production" # see https://tools.ietf.org/html/draft-cavage-http-signatures-06 -try: - from Cryptodome.PublicKey import RSA - from Cryptodome.Hash import SHA256 - from Cryptodome.Signature import pkcs1_15 -except ImportError: - from Crypto.PublicKey import RSA - from Crypto.Hash import SHA256 - # from Crypto.Signature import PKCS1_v1_5 - from Crypto.Signature import pkcs1_15 - +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.serialization import load_pem_private_key +from cryptography.hazmat.primitives.serialization import load_pem_public_key +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives import hashes import base64 from time import gmtime, strftime import datetime from utils import getFullDomain +def _getSHA256(msg: str): + digest = hashes.Hash(hashes.SHA256(), backend=default_backend()) + digest.update(msg) + return digest.finalize() + + def messageContentDigest(messageBodyJsonStr: str) -> str: msg = messageBodyJsonStr.encode('utf-8') - digestStr = SHA256.new(msg).digest() - return base64.b64encode(digestStr).decode('utf-8') + return base64.b64encode(_getSHA256(msg)).decode('utf-8') def signPostHeaders(dateStr: str, privateKeyPem: str, @@ -66,7 +66,8 @@ def signPostHeaders(dateStr: str, privateKeyPem: str, 'content-type': 'application/activity+json', 'content-length': str(contentLength) } - privateKeyPem = RSA.import_key(privateKeyPem) + key = load_pem_private_key(privateKeyPem.encode('utf-8'), + None, backend=default_backend()) # headers.update({ # '(request-target)': f'post {path}', # }) @@ -76,10 +77,11 @@ def signPostHeaders(dateStr: str, privateKeyPem: str, for headerKey in signedHeaderKeys: signedHeaderText += f'{headerKey}: {headers[headerKey]}\n' signedHeaderText = signedHeaderText.strip() - headerDigest = SHA256.new(signedHeaderText.encode('ascii')) + headerDigest = _getSHA256(signedHeaderText.encode('ascii')) # Sign the digest - rawSignature = pkcs1_15.new(privateKeyPem).sign(headerDigest) + rawSignature = key.sign(headerDigest, + padding.PKCS1v15(), hashes.SHA256()) signature = base64.b64encode(rawSignature).decode('ascii') # Put it into a valid HTTP signature format @@ -176,7 +178,8 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict, if debug: print('DEBUG: verifyPostHeaders ' + method) - publicKeyPem = RSA.import_key(publicKeyPem) + pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'), + backend=default_backend()) # Build a dictionary of the signature values signatureHeader = headers['signature'] signatureDict = { @@ -244,16 +247,19 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict, print('DEBUG: signedHeaderList: ' + str(signedHeaderList)) # Now we have our header data digest signedHeaderText = '\n'.join(signedHeaderList) - headerDigest = SHA256.new(signedHeaderText.encode('ascii')) + headerDigest = _getSHA256(signedHeaderText.encode('ascii')) # Get the signature, verify with public key, return result signature = base64.b64decode(signatureDict['signature']) try: - pubKey = pkcs1_15.new(publicKeyPem) - pubKey.verify(headerDigest, signature) + pubkey.verify( + signature, + headerDigest, + padding.PKCS1v15(), + hashes.SHA256()) return True - except (ValueError, TypeError): + except BaseException: if debug: print('DEBUG: verifyPostHeaders pkcs1_15 verify failure') return False