epicyon/httpsig.py

557 lines
22 KiB
Python

__filename__ = "httpsig.py"
__author__ = "Bob Mottram"
__credits__ = ['lamia']
__license__ = "AGPL3+"
__version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Security"
# see https://tools.ietf.org/html/draft-cavage-http-signatures-06
#
# This might change in future
# see https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import utils as hazutils
import base64
from time import gmtime, strftime
import datetime
from utils import get_full_domain
from utils import get_sha_256
from utils import get_sha_512
from utils import local_actor_url
def messageContentDigest(messageBodyJsonStr: str, digestAlgorithm: str) -> str:
"""Returns the digest for the message body
"""
msg = messageBodyJsonStr.encode('utf-8')
if digestAlgorithm == 'rsa-sha512' or \
digestAlgorithm == 'rsa-pss-sha512':
hashResult = get_sha_512(msg)
else:
hashResult = get_sha_256(msg)
return base64.b64encode(hashResult).decode('utf-8')
def getDigestPrefix(digestAlgorithm: str) -> str:
"""Returns the prefix for the message body digest
"""
if digestAlgorithm == 'rsa-sha512' or \
digestAlgorithm == 'rsa-pss-sha512':
return 'SHA-512'
return 'SHA-256'
def getDigestAlgorithmFromHeaders(httpHeaders: {}) -> str:
"""Returns the digest algorithm from http headers
"""
digestStr = None
if httpHeaders.get('digest'):
digestStr = httpHeaders['digest']
elif httpHeaders.get('Digest'):
digestStr = httpHeaders['Digest']
if digestStr:
if digestStr.startswith('SHA-512'):
return 'rsa-sha512'
return 'rsa-sha256'
def signPostHeaders(dateStr: str, privateKeyPem: str,
nickname: str,
domain: str, port: int,
toDomain: str, toPort: int,
path: str,
http_prefix: str,
messageBodyJsonStr: str,
contentType: str,
algorithm: str,
digestAlgorithm: str) -> str:
"""Returns a raw signature string that can be plugged into a header and
used to verify the authenticity of an HTTP transmission.
"""
domain = get_full_domain(domain, port)
toDomain = get_full_domain(toDomain, toPort)
if not dateStr:
dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
if nickname != domain and nickname.lower() != 'actor':
keyID = local_actor_url(http_prefix, nickname, domain)
else:
# instance actor
keyID = http_prefix + '://' + domain + '/actor'
keyID += '#main-key'
if not messageBodyJsonStr:
headers = {
'(request-target)': f'get {path}',
'host': toDomain,
'date': dateStr,
'accept': contentType
}
else:
bodyDigest = \
messageContentDigest(messageBodyJsonStr, digestAlgorithm)
digestPrefix = getDigestPrefix(digestAlgorithm)
contentLength = len(messageBodyJsonStr)
headers = {
'(request-target)': f'post {path}',
'host': toDomain,
'date': dateStr,
'digest': f'{digestPrefix}={bodyDigest}',
'content-type': 'application/activity+json',
'content-length': str(contentLength)
}
key = load_pem_private_key(privateKeyPem.encode('utf-8'),
None, backend=default_backend())
# headers.update({
# '(request-target)': f'post {path}',
# })
# build a digest for signing
signedHeaderKeys = headers.keys()
signedHeaderText = ''
for headerKey in signedHeaderKeys:
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
# strip the trailing linefeed
signedHeaderText = signedHeaderText.rstrip('\n')
# signedHeaderText.encode('ascii') matches
headerDigest = get_sha_256(signedHeaderText.encode('ascii'))
# print('headerDigest2: ' + str(headerDigest))
# Sign the digest
rawSignature = key.sign(headerDigest,
padding.PKCS1v15(),
hazutils.Prehashed(hashes.SHA256()))
signature = base64.b64encode(rawSignature).decode('ascii')
# Put it into a valid HTTP signature format
signatureDict = {
'keyId': keyID,
'algorithm': algorithm,
'headers': ' '.join(signedHeaderKeys),
'signature': signature
}
signatureHeader = ','.join(
[f'{k}="{v}"' for k, v in signatureDict.items()])
return signatureHeader
def signPostHeadersNew(dateStr: str, privateKeyPem: str,
nickname: str,
domain: str, port: int,
toDomain: str, toPort: int,
path: str,
http_prefix: str,
messageBodyJsonStr: str,
algorithm: str, digestAlgorithm: str,
debug: bool) -> (str, str):
"""Returns a raw signature strings that can be plugged into a header
as "Signature-Input" and "Signature"
used to verify the authenticity of an HTTP transmission.
See https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures
"""
domain = get_full_domain(domain, port)
toDomain = get_full_domain(toDomain, toPort)
timeFormat = "%a, %d %b %Y %H:%M:%S %Z"
if not dateStr:
curr_time = gmtime()
dateStr = strftime(timeFormat, curr_time)
else:
curr_time = datetime.datetime.strptime(dateStr, timeFormat)
secondsSinceEpoch = \
int((curr_time - datetime.datetime(1970, 1, 1)).total_seconds())
keyID = local_actor_url(http_prefix, nickname, domain) + '#main-key'
if not messageBodyJsonStr:
headers = {
'@request-target': f'get {path}',
'@created': str(secondsSinceEpoch),
'host': toDomain,
'date': dateStr
}
else:
bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
digestPrefix = getDigestPrefix(digestAlgorithm)
contentLength = len(messageBodyJsonStr)
headers = {
'@request-target': f'post {path}',
'@created': str(secondsSinceEpoch),
'host': toDomain,
'date': dateStr,
'digest': f'{digestPrefix}={bodyDigest}',
'content-type': 'application/activity+json',
'content-length': str(contentLength)
}
key = load_pem_private_key(privateKeyPem.encode('utf-8'),
None, backend=default_backend())
# build a digest for signing
signedHeaderKeys = headers.keys()
signedHeaderText = ''
for headerKey in signedHeaderKeys:
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
signedHeaderText = signedHeaderText.strip()
if debug:
print('\nsignPostHeadersNew signedHeaderText:\n' +
signedHeaderText + '\nEND\n')
# Sign the digest. Potentially other signing algorithms can be added here.
signature = ''
if algorithm == 'rsa-sha512':
headerDigest = get_sha_512(signedHeaderText.encode('ascii'))
rawSignature = key.sign(headerDigest,
padding.PKCS1v15(),
hazutils.Prehashed(hashes.SHA512()))
signature = base64.b64encode(rawSignature).decode('ascii')
else:
# default rsa-sha256
headerDigest = get_sha_256(signedHeaderText.encode('ascii'))
rawSignature = key.sign(headerDigest,
padding.PKCS1v15(),
hazutils.Prehashed(hashes.SHA256()))
signature = base64.b64encode(rawSignature).decode('ascii')
sigKey = 'sig1'
# Put it into a valid HTTP signature format
signatureInputDict = {
'keyId': keyID,
}
signatureIndexHeader = '; '.join(
[f'{k}="{v}"' for k, v in signatureInputDict.items()])
signatureIndexHeader += '; alg=hs2019'
signatureIndexHeader += '; created=' + str(secondsSinceEpoch)
signatureIndexHeader += \
'; ' + sigKey + '=(' + ', '.join(signedHeaderKeys) + ')'
signatureDict = {
sigKey: signature
}
signatureHeader = '; '.join(
[f'{k}=:{v}:' for k, v in signatureDict.items()])
return signatureIndexHeader, signatureHeader
def createSignedHeader(dateStr: str, privateKeyPem: str, nickname: str,
domain: str, port: int,
toDomain: str, toPort: int,
path: str, http_prefix: str, withDigest: bool,
messageBodyJsonStr: str,
contentType: str) -> {}:
"""Note that the domain is the destination, not the sender
"""
algorithm = 'rsa-sha256'
digestAlgorithm = 'rsa-sha256'
headerDomain = get_full_domain(toDomain, toPort)
# if no date is given then create one
if not dateStr:
dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
# Content-Type or Accept header
if not contentType:
contentType = 'application/activity+json'
if not withDigest:
headers = {
'(request-target)': f'get {path}',
'host': headerDomain,
'date': dateStr,
'accept': contentType
}
signatureHeader = \
signPostHeaders(dateStr, privateKeyPem, nickname,
domain, port, toDomain, toPort,
path, http_prefix, None, contentType,
algorithm, None)
else:
bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
digestPrefix = getDigestPrefix(digestAlgorithm)
contentLength = len(messageBodyJsonStr)
headers = {
'(request-target)': f'post {path}',
'host': headerDomain,
'date': dateStr,
'digest': f'{digestPrefix}={bodyDigest}',
'content-length': str(contentLength),
'content-type': contentType
}
signatureHeader = \
signPostHeaders(dateStr, privateKeyPem, nickname,
domain, port,
toDomain, toPort,
path, http_prefix, messageBodyJsonStr,
contentType, algorithm, digestAlgorithm)
headers['signature'] = signatureHeader
return headers
def _verifyRecentSignature(signedDateStr: str) -> bool:
"""Checks whether the given time taken from the header is within
12 hours of the current time
"""
currDate = datetime.datetime.utcnow()
dateFormat = "%a, %d %b %Y %H:%M:%S %Z"
signedDate = datetime.datetime.strptime(signedDateStr, dateFormat)
timeDiffSec = (currDate - signedDate).seconds
# 12 hours tollerance
if timeDiffSec > 43200:
print('WARN: Header signed too long ago: ' + signedDateStr)
print(str(timeDiffSec / (60 * 60)) + ' hours')
return False
if timeDiffSec < 0:
print('WARN: Header signed in the future! ' + signedDateStr)
print(str(timeDiffSec / (60 * 60)) + ' hours')
return False
return True
def verifyPostHeaders(http_prefix: str,
publicKeyPem: str, headers: dict,
path: str, GETmethod: bool,
messageBodyDigest: str,
messageBodyJsonStr: str, debug: bool,
noRecencyCheck: bool = False) -> bool:
"""Returns true or false depending on if the key that we plugged in here
validates against the headers, method, and path.
publicKeyPem - the public key from an rsa key pair
headers - should be a dictionary of request headers
path - the relative url that was requested from this site
GETmethod - GET or POST
messageBodyJsonStr - the received request body (used for digest)
"""
if GETmethod:
method = 'GET'
else:
method = 'POST'
if debug:
print('DEBUG: verifyPostHeaders ' + method)
print('verifyPostHeaders publicKeyPem: ' + str(publicKeyPem))
print('verifyPostHeaders headers: ' + str(headers))
print('verifyPostHeaders messageBodyJsonStr: ' +
str(messageBodyJsonStr))
pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'),
backend=default_backend())
# Build a dictionary of the signature values
if headers.get('Signature-Input') or headers.get('signature-input'):
if headers.get('Signature-Input'):
signatureHeader = headers['Signature-Input']
else:
signatureHeader = headers['signature-input']
fieldSep2 = ','
# split the signature input into separate fields
signatureDict = {
k.strip(): v.strip()
for k, v in [i.split('=', 1) for i in signatureHeader.split(';')]
}
requestTargetKey = None
requestTargetStr = None
for k, v in signatureDict.items():
if v.startswith('('):
requestTargetKey = k
requestTargetStr = v[1:-1]
elif v.startswith('"'):
signatureDict[k] = v[1:-1]
if not requestTargetKey:
return False
signatureDict[requestTargetKey] = requestTargetStr
else:
requestTargetKey = 'headers'
signatureHeader = headers['signature']
fieldSep2 = ' '
# split the signature input into separate fields
signatureDict = {
k: v[1:-1]
for k, v in [i.split('=', 1) for i in signatureHeader.split(',')]
}
if debug:
print('signatureDict: ' + str(signatureDict))
# Unpack the signed headers and set values based on current headers and
# body (if a digest was included)
signedHeaderList = []
algorithm = 'rsa-sha256'
digestAlgorithm = 'rsa-sha256'
for signedHeader in signatureDict[requestTargetKey].split(fieldSep2):
signedHeader = signedHeader.strip()
if debug:
print('DEBUG: verifyPostHeaders signedHeader=' + signedHeader)
if signedHeader == '(request-target)':
# original Mastodon http signature
appendStr = f'(request-target): {method.lower()} {path}'
signedHeaderList.append(appendStr)
elif '@request-target' in signedHeader:
# https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures
appendStr = f'@request-target: {method.lower()} {path}'
signedHeaderList.append(appendStr)
elif '@created' in signedHeader:
if signatureDict.get('created'):
createdStr = str(signatureDict['created'])
appendStr = f'@created: {createdStr}'
signedHeaderList.append(appendStr)
elif '@expires' in signedHeader:
if signatureDict.get('expires'):
expiresStr = str(signatureDict['expires'])
appendStr = f'@expires: {expiresStr}'
signedHeaderList.append(appendStr)
elif '@method' in signedHeader:
appendStr = f'@expires: {method}'
signedHeaderList.append(appendStr)
elif '@scheme' in signedHeader:
signedHeaderList.append('@scheme: http')
elif '@authority' in signedHeader:
authorityStr = None
if signatureDict.get('authority'):
authorityStr = str(signatureDict['authority'])
elif signatureDict.get('Authority'):
authorityStr = str(signatureDict['Authority'])
if authorityStr:
appendStr = f'@authority: {authorityStr}'
signedHeaderList.append(appendStr)
elif signedHeader == 'algorithm':
if headers.get(signedHeader):
algorithm = headers[signedHeader]
if debug:
print('http signature algorithm: ' + algorithm)
elif signedHeader == 'digest':
if messageBodyDigest:
bodyDigest = messageBodyDigest
else:
bodyDigest = \
messageContentDigest(messageBodyJsonStr, digestAlgorithm)
signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
elif signedHeader == 'content-length':
if headers.get(signedHeader):
appendStr = f'content-length: {headers[signedHeader]}'
signedHeaderList.append(appendStr)
elif headers.get('Content-Length'):
contentLength = headers['Content-Length']
signedHeaderList.append(f'content-length: {contentLength}')
elif headers.get('Content-length'):
contentLength = headers['Content-length']
appendStr = f'content-length: {contentLength}'
signedHeaderList.append(appendStr)
else:
if debug:
print('DEBUG: verifyPostHeaders ' + signedHeader +
' not found in ' + str(headers))
else:
if headers.get(signedHeader):
if signedHeader == 'date' and not noRecencyCheck:
if not _verifyRecentSignature(headers[signedHeader]):
if debug:
print('DEBUG: ' +
'verifyPostHeaders date is not recent ' +
headers[signedHeader])
return False
signedHeaderList.append(
f'{signedHeader}: {headers[signedHeader]}')
else:
if '-' in signedHeader:
# capitalise with dashes
# my-header becomes My-Header
headerParts = signedHeader.split('-')
signedHeaderCap = None
for part in headerParts:
if signedHeaderCap:
signedHeaderCap += '-' + part.capitalize()
else:
signedHeaderCap = part.capitalize()
else:
# header becomes Header
signedHeaderCap = signedHeader.capitalize()
if debug:
print('signedHeaderCap: ' + signedHeaderCap)
# if this is the date header then check it is recent
if signedHeaderCap == 'Date':
if not _verifyRecentSignature(headers[signedHeaderCap]):
if debug:
print('DEBUG: ' +
'verifyPostHeaders date is not recent ' +
headers[signedHeader])
return False
# add the capitalised header
if headers.get(signedHeaderCap):
signedHeaderList.append(
f'{signedHeader}: {headers[signedHeaderCap]}')
elif '-' in signedHeader:
# my-header becomes My-header
signedHeaderCap = signedHeader.capitalize()
if headers.get(signedHeaderCap):
signedHeaderList.append(
f'{signedHeader}: {headers[signedHeaderCap]}')
# Now we have our header data digest
signedHeaderText = '\n'.join(signedHeaderList)
if debug:
print('\nverifyPostHeaders signedHeaderText:\n' +
signedHeaderText + '\nEND\n')
# Get the signature, verify with public key, return result
if (headers.get('Signature-Input') and headers.get('Signature')) or \
(headers.get('signature-input') and headers.get('signature')):
# https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures
if headers.get('Signature'):
headersSig = headers['Signature']
else:
headersSig = headers['signature']
# remove sig1=:
if requestTargetKey + '=:' in headersSig:
headersSig = headersSig.split(requestTargetKey + '=:')[1]
headersSig = headersSig[:len(headersSig)-1]
signature = base64.b64decode(headersSig)
else:
# Original Mastodon signature
headersSig = signatureDict['signature']
signature = base64.b64decode(headersSig)
if debug:
print('signature: ' + algorithm + ' ' + headersSig)
# log unusual signing algorithms
if signatureDict.get('alg'):
print('http signature algorithm: ' + signatureDict['alg'])
# If extra signing algorithms need to be added then do it here
if not signatureDict.get('alg'):
alg = hazutils.Prehashed(hashes.SHA256())
elif (signatureDict['alg'] == 'rsa-sha256' or
signatureDict['alg'] == 'rsa-v1_5-sha256' or
signatureDict['alg'] == 'hs2019'):
alg = hazutils.Prehashed(hashes.SHA256())
elif (signatureDict['alg'] == 'rsa-sha512' or
signatureDict['alg'] == 'rsa-pss-sha512'):
alg = hazutils.Prehashed(hashes.SHA512())
else:
alg = hazutils.Prehashed(hashes.SHA256())
if digestAlgorithm == 'rsa-sha256':
headerDigest = get_sha_256(signedHeaderText.encode('ascii'))
elif digestAlgorithm == 'rsa-sha512':
headerDigest = get_sha_512(signedHeaderText.encode('ascii'))
else:
print('Unknown http digest algorithm: ' + digestAlgorithm)
headerDigest = ''
paddingStr = padding.PKCS1v15()
try:
pubkey.verify(signature, headerDigest, paddingStr, alg)
return True
except BaseException:
if debug:
print('EX: verifyPostHeaders pkcs1_15 verify failure')
return False