epicyon/httpsig.py

144 lines
5.5 KiB
Python

# Module metadata.
# NOTE(review): __filename__ is "posts.py", but the functions in this module
# sign and verify HTTP signature headers and the file listing names it
# httpsig.py - this looks copied from another module; confirm and correct.
__filename__ = "posts.py"
__author__ = "Bob Mottram"
__credits__ = ['lamia']
__license__ = "AGPL3+"
__version__ = "0.0.1"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
from person import createPerson
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
#from Crypto.Signature import PKCS1_v1_5
from Crypto.Signature import pkcs1_15
from requests.auth import AuthBase
import base64
import json
def signPostHeaders(privateKeyPem: str, username: str, domain: str, path: str, https: bool, messageBodyJson) -> str:
    """Builds the value of an HTTP signature header for a POST request.

    The signed text is made of the host, an optional body digest and the
    (request-target) line, in that order, and is signed with RSA-SHA256
    (PKCS#1 v1.5).
    privateKeyPem - the private key of the sender, in a form accepted by
                    RSA.import_key
    username - the sending account; used to build the keyId
    domain - the sender's domain; used for the keyId and the host line
    path - the relative url being posted to
    https - True to use https in the keyId, otherwise http
    messageBodyJson - the request body as a string, or a falsy value to
                      leave the digest out of the signature
    Returns a string of comma separated key="value" fields: keyId,
    algorithm, headers and signature.
    """
    scheme = 'https' if https else 'http'
    keyID = scheme + '://' + domain + '/' + username + '#main-key'

    # Insertion order matters here: it is both the order of the lines
    # which get signed and the order advertised in the "headers" field
    toSign = {'host': domain}
    if messageBodyJson:
        bodyHash = SHA256.new(messageBodyJson.encode()).digest()
        bodyDigest = base64.b64encode(bodyHash)
        # NOTE(review): b64encode returns bytes, so this renders as
        # SHA-256=b'...' rather than plain base64 text - confirm that
        # this is what receivers of the signature expect
        toSign['digest'] = f'SHA-256={bodyDigest}'

    rsaKey = RSA.import_key(privateKeyPem)
    toSign['(request-target)'] = f'post {path}'

    # One "name: value" line per signed header, with no trailing newline
    signingText = '\n'.join(
        f'{name}: {value}' for name, value in toSign.items()).strip()
    signingHash = SHA256.new(signingText.encode('ascii'))

    # Sign the hash and make the result printable
    encodedSignature = base64.b64encode(
        pkcs1_15.new(rsaKey).sign(signingHash)).decode('ascii')

    # Lay the fields out in HTTP signature format
    fields = (
        ('keyId', keyID),
        ('algorithm', 'rsa-sha256'),
        ('headers', ' '.join(toSign)),
        ('signature', encodedSignature),
    )
    return ','.join(f'{name}="{value}"' for name, value in fields)
def _buildSigningText(signedHeaderNames: list, headers: dict, method: str, path: str, digestValue) -> str:
    """Rebuilds the text which the sender signed, one line per signed header.
    Returns None if a signed header is not present in the request headers.
    """
    signedHeaderList = []
    for signedHeader in signedHeaderNames:
        if signedHeader == '(request-target)':
            signedHeaderList.append(f'(request-target): {method} {path}')
        elif signedHeader == 'digest':
            signedHeaderList.append(f'digest: SHA-256={digestValue}')
        else:
            headerValue = headers.get(signedHeader)
            if headerValue is None:
                return None
            signedHeaderList.append(f'{signedHeader}: {headerValue}')
    return '\n'.join(signedHeaderList)


def verifyPostHeaders(https: bool, publicKeyPem: str, headers: dict, path: str, GETmethod: bool, messageBodyJson: str) -> bool:
    """Returns true or false depending on if the key that we plugged in here
    validates against the headers, method, and path.

    The headers, body and key can all originate from a remote party, so
    anything missing or malformed results in False rather than an exception.
    https - not used by the verification; kept for existing callers
    publicKeyPem - the public key from an rsa key pair
    headers - should be a dictionary of request headers, containing
              'signature' and every header named within the signature
    path - the relative url that was requested from this site
    GETmethod - GET or POST
    messageBodyJson - the received request body (used for digest)
    """
    method = 'get' if GETmethod else 'post'

    signatureHeader = headers.get('signature')
    if not signatureHeader:
        return False

    # Build a dictionary of the signature values. Whitespace around the
    # comma separated key="value" items is tolerated
    signatureDict = {}
    for item in signatureHeader.split(','):
        key, separator, quotedValue = item.strip().partition('=')
        if not separator:
            return False
        signatureDict[key.strip()] = quotedValue.strip()[1:-1]

    signedHeaderNames = signatureDict.get('headers')
    encodedSignature = signatureDict.get('signature')
    if not signedHeaderNames or not encodedSignature:
        return False
    signedHeaderNames = signedHeaderNames.split(' ')

    # The digest is always recalculated from the received body, so the
    # signature is tied to the body which actually arrived
    if 'digest' in signedHeaderNames:
        if messageBodyJson is None:
            return False
        bodyDigest = base64.b64encode(SHA256.new(messageBodyJson.encode()).digest())
        # Try the plain base64 text first. The second form is the bytes
        # representation (b'...') which signPostHeaders in this module
        # puts into its signed text, so those signatures keep verifying
        digestValues = (bodyDigest.decode('ascii'), repr(bodyDigest))
    else:
        digestValues = (None,)

    try:
        publicKey = RSA.import_key(publicKeyPem)
        signature = base64.b64decode(encodedSignature)
    except (ValueError, TypeError, IndexError):
        # unusable key or signature which is not valid base64
        return False
    verifier = pkcs1_15.new(publicKey)

    for digestValue in digestValues:
        signedHeaderText = _buildSigningText(signedHeaderNames, headers,
                                             method, path, digestValue)
        if signedHeaderText is None:
            return False
        try:
            # Now we have our header data digest
            headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
            verifier.verify(headerDigest, signature)
        except (ValueError, TypeError):
            # signature mismatch or non-ascii header text
            continue
        return True
    return False
def testHttpsigBase(withDigest):
    """Round trip test of signPostHeaders and verifyPostHeaders.

    Signs a POST to /inbox, then checks that the signature verifies and
    that a different path, a different method or tampered content all
    cause verification to fail.
    withDigest - if True then the message body is signed as well, so that
                 its digest is covered by the signature
    """
    print('testHttpsig(' + str(withDigest) + ')')
    username = 'socrates'
    domain = 'argumentative.social'
    https = True
    path = '/inbox'
    privateKeyPem, publicKeyPem, _person, _wfEndpoint = \
        createPerson(username, domain, https, False)
    messageBodyJson = '{"a key": "a value", "another key": "A string"}'

    if withDigest:
        bodyDigest = base64.b64encode(SHA256.new(messageBodyJson.encode()).digest())
        headers = {'host': domain, 'digest': f'SHA-256={bodyDigest}'}
        signedBody = messageBodyJson
    else:
        headers = {'host': domain}
        signedBody = None

    # The body has to be given to the signer, otherwise the digest never
    # becomes part of the signature and the digest case tests nothing
    signatureHeader = signPostHeaders(privateKeyPem, username, domain, path, https, signedBody)
    headers['signature'] = signatureHeader

    assert verifyPostHeaders(https, publicKeyPem, headers, path, False, messageBodyJson)
    # wrong path
    assert not verifyPostHeaders(https, publicKeyPem, headers, '/parambulator/inbox', False, messageBodyJson)
    # wrong method
    assert not verifyPostHeaders(https, publicKeyPem, headers, path, True, messageBodyJson)

    if withDigest:
        # correct domain but fake message
        messageBodyJson = '{"a key": "a value", "another key": "Fake GNUs"}'
        bodyDigest = base64.b64encode(SHA256.new(messageBodyJson.encode()).digest())
        headers = {'host': domain, 'digest': f'SHA-256={bodyDigest}'}
    else:
        # fake domain
        headers = {'host': 'bogon.domain'}
    headers['signature'] = signatureHeader

    # Same method and path as were signed, so that only the tampered
    # host or body can be the reason for the failure
    assert not verifyPostHeaders(https, publicKeyPem, headers, path, False, messageBodyJson)
def testHttpsig():
    """Runs the signature test first without and then with a body digest.
    """
    for withDigest in (False, True):
        testHttpsigBase(withDigest)