epicyon/httpsig.py

266 lines
10 KiB
Python
Raw Normal View History

__filename__ = "httpsig.py"
2020-04-03 12:05:30 +00:00
__author__ = "Bob Mottram"
__credits__ = ['lamia']
__license__ = "AGPL3+"
2021-01-26 10:07:42 +00:00
__version__ = "1.2.0"
2020-04-03 12:05:30 +00:00
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
2019-06-28 18:55:29 +00:00
2019-08-15 22:33:42 +00:00
# see https://tools.ietf.org/html/draft-cavage-http-signatures-06
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import utils as hazutils
2019-06-28 18:55:29 +00:00
import base64
2019-08-15 09:08:18 +00:00
from time import gmtime, strftime
2019-08-23 11:20:20 +00:00
import datetime
2020-12-16 11:38:40 +00:00
from utils import getFullDomain
from utils import getSHA256
def messageContentDigest(messageBodyJsonStr: str) -> str:
    """Returns the base64 encoded hash of the given message body.
    The body is encoded as utf-8, hashed with getSHA256 and the result
    is returned as a base64 string, which is the form placed after
    "SHA-256=" within the digest header
    """
    bodyBytes = messageBodyJsonStr.encode('utf-8')
    return base64.b64encode(getSHA256(bodyBytes)).decode('utf-8')
2020-04-03 12:05:30 +00:00
2020-04-03 12:05:30 +00:00
def signPostHeaders(dateStr: str, privateKeyPem: str,
                    nickname: str,
                    domain: str, port: int,
                    toDomain: str, toPort: int,
                    path: str,
                    httpPrefix: str,
                    messageBodyJsonStr: str) -> str:
    """Returns the value of a signature header for a POST to the given
    path on the destination (toDomain, toPort), signed using the private
    key of the sending account (nickname at domain, port).

    dateStr - date header value to be signed. If empty then the current
              GMT time is used
    messageBodyJsonStr - the body being posted. If supplied then digest,
                         content-type and content-length headers are
                         also signed

    The returned string contains comma separated keyId, algorithm,
    headers and signature fields
    """
    domain = getFullDomain(domain, port)
    toDomain = getFullDomain(toDomain, toPort)

    if not dateStr:
        dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())

    keyID = httpPrefix + '://' + domain + '/users/' + nickname + '#main-key'

    # The insertion order of this dictionary defines the order of the
    # signed headers, so the common headers come first
    headers = {
        '(request-target)': f'post {path}',
        'host': toDomain,
        'date': dateStr
    }
    if messageBodyJsonStr:
        headers['digest'] = \
            'SHA-256=' + messageContentDigest(messageBodyJsonStr)
        headers['content-type'] = 'application/activity+json'
        # NOTE(review): this counts characters rather than utf-8 bytes.
        # Confirm that it matches the Content-Length which is sent
        headers['content-length'] = str(len(messageBodyJsonStr))
    else:
        headers['content-type'] = 'application/json'

    privateKey = load_pem_private_key(privateKeyPem.encode('utf-8'),
                                      None, backend=default_backend())

    # build the text to be signed, with one "name: value" line per header
    signedHeaderNames = list(headers)
    signedLines = [f'{name}: {value}' for name, value in headers.items()]
    signedHeaderText = '\n'.join(signedLines).strip()
    headerDigest = getSHA256(signedHeaderText.encode('ascii'))

    # Sign the digest, which has already been hashed
    rawSignature = privateKey.sign(headerDigest,
                                   padding.PKCS1v15(),
                                   hazutils.Prehashed(hashes.SHA256()))
    signature = base64.b64encode(rawSignature).decode('ascii')

    # Put it into a valid HTTP signature format
    signatureFields = (
        ('keyId', keyID),
        ('algorithm', 'rsa-sha256'),
        ('headers', ' '.join(signedHeaderNames)),
        ('signature', signature)
    )
    return ','.join(f'{name}="{value}"' for name, value in signatureFields)
2020-04-03 12:05:30 +00:00
def createSignedHeader(privateKeyPem: str, nickname: str,
                       domain: str, port: int,
                       toDomain: str, toPort: int,
                       path: str, httpPrefix: str, withDigest: bool,
                       messageBodyJsonStr: str) -> dict:
    """Returns a dictionary of headers, including the signature header,
    for a POST to the given path.

    domain, port - the sending instance, passed to signPostHeaders
    toDomain, toPort - the destination instance, used as the host header
    withDigest - if True then digest, content-length and content-type
                 headers for messageBodyJsonStr are included and signed
    messageBodyJsonStr - the body to be posted (only used withDigest)

    The returned headers always contain every header which
    signPostHeaders lists as being signed, otherwise the receiving
    server would be unable to reproduce the signed text
    """
    headerDomain = getFullDomain(toDomain, toPort)

    # the same date string must be both sent and signed
    dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())

    headers = {
        '(request-target)': f'post {path}',
        'host': headerDomain,
        'date': dateStr
    }
    if not withDigest:
        # signPostHeaders signs this content type when there is no body,
        # so it has to be within the headers which get sent
        headers['content-type'] = 'application/json'
        signedBody = None
    else:
        bodyDigest = messageContentDigest(messageBodyJsonStr)
        headers['digest'] = f'SHA-256={bodyDigest}'
        headers['content-length'] = str(len(messageBodyJsonStr))
        headers['content-type'] = 'application/activity+json'
        signedBody = messageBodyJsonStr

    headers['signature'] = \
        signPostHeaders(dateStr, privateKeyPem, nickname,
                        domain, port,
                        toDomain, toPort,
                        path, httpPrefix, signedBody)
    return headers
2020-04-03 12:05:30 +00:00
def _verifyRecentSignature(signedDateStr: str) -> bool:
    """Checks whether the given time taken from the header is no more
    than 12 hours before the current UTC time, and is not in the future.
    Returns False if the date is outside of that range or if it can't
    be parsed
    """
    dateFormat = "%a, %d %b %Y %H:%M:%S %Z"
    try:
        signedDate = datetime.datetime.strptime(signedDateStr, dateFormat)
    except (TypeError, ValueError):
        # the date comes from a remote server and so may be malformed
        print('WARN: Unable to parse signed date: ' + str(signedDateStr))
        return False

    # naive UTC time, comparable with the naive result of strptime
    currDate = \
        datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # total_seconds includes whole days and is negative for future dates,
    # whereas the seconds attribute is always between 0 and 86399
    timeDiffSec = (currDate - signedDate).total_seconds()

    # 12 hours tolerance
    if timeDiffSec > 43200:
        print('WARN: Header signed too long ago: ' + signedDateStr)
        print(str(timeDiffSec / (60 * 60)) + ' hours')
        return False
    if timeDiffSec < 0:
        print('WARN: Header signed in the future! ' + signedDateStr)
        print(str(timeDiffSec / (60 * 60)) + ' hours')
        return False
    return True
2020-04-03 12:05:30 +00:00
def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
                      path: str, GETmethod: bool,
                      messageBodyDigest: str,
                      messageBodyJsonStr: str, debug: bool) -> bool:
    """Returns true or false depending on if the key that we plugged in here
    validates against the headers, method, and path.
    publicKeyPem - the public key from an rsa key pair
    headers - should be a dictionary of request headers
    path - the relative url that was requested from this site
    GETmethod - GET or POST
    messageBodyDigest - digest of the request body, if already known
    messageBodyJsonStr - the received request body (used for digest
                         when messageBodyDigest is not supplied)
    debug - print the reason for a failed verification

    A missing or malformed signature header results in False being
    returned, rather than an exception
    """
    method = 'GET' if GETmethod else 'POST'
    if debug:
        print('DEBUG: verifyPostHeaders ' + method)

    pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'),
                                 backend=default_backend())

    signatureHeader = headers.get('signature')
    if not signatureHeader:
        if debug:
            print('DEBUG: verifyPostHeaders no signature header')
        return False

    # Build a dictionary of the signature values.
    # Each field has the form name="value"
    signatureDict = {}
    for field in signatureHeader.split(','):
        if '=' not in field:
            if debug:
                print('DEBUG: verifyPostHeaders ' +
                      'malformed signature field ' + field)
            return False
        fieldName, fieldValue = field.split('=', 1)
        # tolerate whitespace after the comma separator,
        # then remove the surrounding quotes
        signatureDict[fieldName.strip()] = fieldValue.strip()[1:-1]

    if not signatureDict.get('headers') or \
       not signatureDict.get('signature'):
        if debug:
            print('DEBUG: verifyPostHeaders ' +
                  'signature is missing headers or signature fields')
        return False

    # Unpack the signed headers and set values based on current headers and
    # body (if a digest was included)
    signedHeaderList = []
    for signedHeader in signatureDict['headers'].split(' '):
        if debug:
            print('DEBUG: verifyPostHeaders signedHeader=' + signedHeader)
        if signedHeader == '(request-target)':
            appendStr = f'(request-target): {method.lower()} {path}'
            signedHeaderList.append(appendStr)
        elif signedHeader == 'digest':
            bodyDigest = \
                messageBodyDigest or messageContentDigest(messageBodyJsonStr)
            signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
        elif signedHeader == 'content-length':
            # the header name may arrive with differing capitalization
            for headerName in ('content-length',
                               'Content-Length',
                               'Content-length'):
                contentLength = headers.get(headerName)
                if contentLength:
                    appendStr = f'content-length: {contentLength}'
                    signedHeaderList.append(appendStr)
                    break
            else:
                if debug:
                    print('DEBUG: verifyPostHeaders ' + signedHeader +
                          ' not found in ' + str(headers))
        else:
            headerValue = headers.get(signedHeader)
            if not headerValue:
                headerValue = headers.get(signedHeader.capitalize())
            if not headerValue:
                # A signed header which was not received is left out, and
                # so the signature check below will subsequently fail
                if debug:
                    print('DEBUG: verifyPostHeaders ' + signedHeader +
                          ' not found in ' + str(headers))
                continue
            # check the date however the sender capitalized its name
            if signedHeader.lower() == 'date':
                if not _verifyRecentSignature(headerValue):
                    if debug:
                        print('DEBUG: ' +
                              'verifyPostHeaders date is not recent ' +
                              headerValue)
                    return False
            signedHeaderList.append(f'{signedHeader}: {headerValue}')

    if debug:
        print('DEBUG: signedHeaderList: ' + str(signedHeaderList))

    # Now we have our header data digest
    signedHeaderText = '\n'.join(signedHeaderList)
    try:
        signedHeaderBytes = signedHeaderText.encode('ascii')
        signature = base64.b64decode(signatureDict['signature'])
    except ValueError:
        # non-ascii header values or invalid base64 within the signature
        if debug:
            print('DEBUG: verifyPostHeaders ' +
                  'unable to decode signed headers or signature')
        return False
    headerDigest = getSHA256(signedHeaderBytes)

    # Verify with the public key and return the result
    try:
        pubkey.verify(
            signature,
            headerDigest,
            padding.PKCS1v15(),
            hazutils.Prehashed(hashes.SHA256()))
        return True
    except Exception:
        # Exception rather than BaseException, so that KeyboardInterrupt
        # and SystemExit are not reported as a verification failure
        if debug:
            print('DEBUG: verifyPostHeaders pkcs1_15 verify failure')
    return False