mirror of https://gitlab.com/bashrc2/epicyon
Revert "Test switching from pycryptodome to python3-cryptography"
This reverts commit bb28858c9e
.
merge-requests/30/head
parent
bb28858c9e
commit
22984cd16f
46
httpsig.py
46
httpsig.py
|
@ -9,26 +9,26 @@ __status__ = "Production"
|
||||||
|
|
||||||
# see https://tools.ietf.org/html/draft-cavage-http-signatures-06
|
# see https://tools.ietf.org/html/draft-cavage-http-signatures-06
|
||||||
|
|
||||||
from cryptography.hazmat.backends import default_backend
|
try:
|
||||||
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
from Cryptodome.PublicKey import RSA
|
||||||
from cryptography.hazmat.primitives.serialization import load_pem_public_key
|
from Cryptodome.Hash import SHA256
|
||||||
from cryptography.hazmat.primitives.asymmetric import padding
|
from Cryptodome.Signature import pkcs1_15
|
||||||
from cryptography.hazmat.primitives import hashes
|
except ImportError:
|
||||||
|
from Crypto.PublicKey import RSA
|
||||||
|
from Crypto.Hash import SHA256
|
||||||
|
# from Crypto.Signature import PKCS1_v1_5
|
||||||
|
from Crypto.Signature import pkcs1_15
|
||||||
|
|
||||||
import base64
|
import base64
|
||||||
from time import gmtime, strftime
|
from time import gmtime, strftime
|
||||||
import datetime
|
import datetime
|
||||||
from utils import getFullDomain
|
from utils import getFullDomain
|
||||||
|
|
||||||
|
|
||||||
def _getSHA256(msg: str):
|
|
||||||
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
|
|
||||||
digest.update(msg)
|
|
||||||
return digest.finalize()
|
|
||||||
|
|
||||||
|
|
||||||
def messageContentDigest(messageBodyJsonStr: str) -> str:
|
def messageContentDigest(messageBodyJsonStr: str) -> str:
|
||||||
msg = messageBodyJsonStr.encode('utf-8')
|
msg = messageBodyJsonStr.encode('utf-8')
|
||||||
return base64.b64encode(_getSHA256(msg)).decode('utf-8')
|
digestStr = SHA256.new(msg).digest()
|
||||||
|
return base64.b64encode(digestStr).decode('utf-8')
|
||||||
|
|
||||||
|
|
||||||
def signPostHeaders(dateStr: str, privateKeyPem: str,
|
def signPostHeaders(dateStr: str, privateKeyPem: str,
|
||||||
|
@ -66,8 +66,7 @@ def signPostHeaders(dateStr: str, privateKeyPem: str,
|
||||||
'content-type': 'application/activity+json',
|
'content-type': 'application/activity+json',
|
||||||
'content-length': str(contentLength)
|
'content-length': str(contentLength)
|
||||||
}
|
}
|
||||||
key = load_pem_private_key(privateKeyPem.encode('utf-8'),
|
privateKeyPem = RSA.import_key(privateKeyPem)
|
||||||
None, backend=default_backend())
|
|
||||||
# headers.update({
|
# headers.update({
|
||||||
# '(request-target)': f'post {path}',
|
# '(request-target)': f'post {path}',
|
||||||
# })
|
# })
|
||||||
|
@ -77,11 +76,10 @@ def signPostHeaders(dateStr: str, privateKeyPem: str,
|
||||||
for headerKey in signedHeaderKeys:
|
for headerKey in signedHeaderKeys:
|
||||||
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
|
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
|
||||||
signedHeaderText = signedHeaderText.strip()
|
signedHeaderText = signedHeaderText.strip()
|
||||||
headerDigest = _getSHA256(signedHeaderText.encode('ascii'))
|
headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
|
||||||
|
|
||||||
# Sign the digest
|
# Sign the digest
|
||||||
rawSignature = key.sign(headerDigest,
|
rawSignature = pkcs1_15.new(privateKeyPem).sign(headerDigest)
|
||||||
padding.PKCS1v15(), hashes.SHA256())
|
|
||||||
signature = base64.b64encode(rawSignature).decode('ascii')
|
signature = base64.b64encode(rawSignature).decode('ascii')
|
||||||
|
|
||||||
# Put it into a valid HTTP signature format
|
# Put it into a valid HTTP signature format
|
||||||
|
@ -178,8 +176,7 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
|
||||||
if debug:
|
if debug:
|
||||||
print('DEBUG: verifyPostHeaders ' + method)
|
print('DEBUG: verifyPostHeaders ' + method)
|
||||||
|
|
||||||
pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'),
|
publicKeyPem = RSA.import_key(publicKeyPem)
|
||||||
backend=default_backend())
|
|
||||||
# Build a dictionary of the signature values
|
# Build a dictionary of the signature values
|
||||||
signatureHeader = headers['signature']
|
signatureHeader = headers['signature']
|
||||||
signatureDict = {
|
signatureDict = {
|
||||||
|
@ -247,19 +244,16 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
|
||||||
print('DEBUG: signedHeaderList: ' + str(signedHeaderList))
|
print('DEBUG: signedHeaderList: ' + str(signedHeaderList))
|
||||||
# Now we have our header data digest
|
# Now we have our header data digest
|
||||||
signedHeaderText = '\n'.join(signedHeaderList)
|
signedHeaderText = '\n'.join(signedHeaderList)
|
||||||
headerDigest = _getSHA256(signedHeaderText.encode('ascii'))
|
headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
|
||||||
|
|
||||||
# Get the signature, verify with public key, return result
|
# Get the signature, verify with public key, return result
|
||||||
signature = base64.b64decode(signatureDict['signature'])
|
signature = base64.b64decode(signatureDict['signature'])
|
||||||
|
|
||||||
try:
|
try:
|
||||||
pubkey.verify(
|
pubKey = pkcs1_15.new(publicKeyPem)
|
||||||
signature,
|
pubKey.verify(headerDigest, signature)
|
||||||
headerDigest,
|
|
||||||
padding.PKCS1v15(),
|
|
||||||
hashes.SHA256())
|
|
||||||
return True
|
return True
|
||||||
except BaseException:
|
except (ValueError, TypeError):
|
||||||
if debug:
|
if debug:
|
||||||
print('DEBUG: verifyPostHeaders pkcs1_15 verify failure')
|
print('DEBUG: verifyPostHeaders pkcs1_15 verify failure')
|
||||||
return False
|
return False
|
||||||
|
|
Loading…
Reference in New Issue