Test switching from pycryptodome to python3-cryptography

merge-requests/30/head
Bob Mottram 2021-02-03 21:18:03 +00:00
parent 16cbe39d3b
commit bb28858c9e
1 changed files with 26 additions and 20 deletions

View File

@ -9,26 +9,26 @@ __status__ = "Production"
# see https://tools.ietf.org/html/draft-cavage-http-signatures-06 # see https://tools.ietf.org/html/draft-cavage-http-signatures-06
try: from cryptography.hazmat.backends import default_backend
from Cryptodome.PublicKey import RSA from cryptography.hazmat.primitives.serialization import load_pem_private_key
from Cryptodome.Hash import SHA256 from cryptography.hazmat.primitives.serialization import load_pem_public_key
from Cryptodome.Signature import pkcs1_15 from cryptography.hazmat.primitives.asymmetric import padding
except ImportError: from cryptography.hazmat.primitives import hashes
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
# from Crypto.Signature import PKCS1_v1_5
from Crypto.Signature import pkcs1_15
import base64 import base64
from time import gmtime, strftime from time import gmtime, strftime
import datetime import datetime
from utils import getFullDomain from utils import getFullDomain
def _getSHA256(msg: str):
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
digest.update(msg)
return digest.finalize()
def messageContentDigest(messageBodyJsonStr: str) -> str: def messageContentDigest(messageBodyJsonStr: str) -> str:
msg = messageBodyJsonStr.encode('utf-8') msg = messageBodyJsonStr.encode('utf-8')
digestStr = SHA256.new(msg).digest() return base64.b64encode(_getSHA256(msg)).decode('utf-8')
return base64.b64encode(digestStr).decode('utf-8')
def signPostHeaders(dateStr: str, privateKeyPem: str, def signPostHeaders(dateStr: str, privateKeyPem: str,
@ -66,7 +66,8 @@ def signPostHeaders(dateStr: str, privateKeyPem: str,
'content-type': 'application/activity+json', 'content-type': 'application/activity+json',
'content-length': str(contentLength) 'content-length': str(contentLength)
} }
privateKeyPem = RSA.import_key(privateKeyPem) key = load_pem_private_key(privateKeyPem.encode('utf-8'),
None, backend=default_backend())
# headers.update({ # headers.update({
# '(request-target)': f'post {path}', # '(request-target)': f'post {path}',
# }) # })
@ -76,10 +77,11 @@ def signPostHeaders(dateStr: str, privateKeyPem: str,
for headerKey in signedHeaderKeys: for headerKey in signedHeaderKeys:
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n' signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
signedHeaderText = signedHeaderText.strip() signedHeaderText = signedHeaderText.strip()
headerDigest = SHA256.new(signedHeaderText.encode('ascii')) headerDigest = _getSHA256(signedHeaderText.encode('ascii'))
# Sign the digest # Sign the digest
rawSignature = pkcs1_15.new(privateKeyPem).sign(headerDigest) rawSignature = key.sign(headerDigest,
padding.PKCS1v15(), hashes.SHA256())
signature = base64.b64encode(rawSignature).decode('ascii') signature = base64.b64encode(rawSignature).decode('ascii')
# Put it into a valid HTTP signature format # Put it into a valid HTTP signature format
@ -176,7 +178,8 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
if debug: if debug:
print('DEBUG: verifyPostHeaders ' + method) print('DEBUG: verifyPostHeaders ' + method)
publicKeyPem = RSA.import_key(publicKeyPem) pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'),
backend=default_backend())
# Build a dictionary of the signature values # Build a dictionary of the signature values
signatureHeader = headers['signature'] signatureHeader = headers['signature']
signatureDict = { signatureDict = {
@ -244,16 +247,19 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
print('DEBUG: signedHeaderList: ' + str(signedHeaderList)) print('DEBUG: signedHeaderList: ' + str(signedHeaderList))
# Now we have our header data digest # Now we have our header data digest
signedHeaderText = '\n'.join(signedHeaderList) signedHeaderText = '\n'.join(signedHeaderList)
headerDigest = SHA256.new(signedHeaderText.encode('ascii')) headerDigest = _getSHA256(signedHeaderText.encode('ascii'))
# Get the signature, verify with public key, return result # Get the signature, verify with public key, return result
signature = base64.b64decode(signatureDict['signature']) signature = base64.b64decode(signatureDict['signature'])
try: try:
pubKey = pkcs1_15.new(publicKeyPem) pubkey.verify(
pubKey.verify(headerDigest, signature) signature,
headerDigest,
padding.PKCS1v15(),
hashes.SHA256())
return True return True
except (ValueError, TypeError): except BaseException:
if debug: if debug:
print('DEBUG: verifyPostHeaders pkcs1_15 verify failure') print('DEBUG: verifyPostHeaders pkcs1_15 verify failure')
return False return False