Improve support for hs2019 http signatures

main
Bob Mottram 2021-11-22 18:30:05 +00:00
parent e48e2a0fd1
commit 0ce7573104
3 changed files with 218 additions and 117 deletions

View File

@ -450,6 +450,8 @@ class PubServer(BaseHTTPRequestHandler):
# https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures-01
return self.headers['Signature-Input']
elif self.headers.get('signature-input'):
return self.headers['signature-input']
elif self.headers.get('signature'):
# Ye olde Masto http sig
return self.headers['signature']

View File

@ -11,7 +11,7 @@ __module_group__ = "Security"
# see https://tools.ietf.org/html/draft-cavage-http-signatures-06
#
# This might change in future
# see https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures-01
# see https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
@ -116,11 +116,11 @@ def signPostHeadersNew(dateStr: str, privateKeyPem: str,
path: str,
httpPrefix: str,
messageBodyJsonStr: str,
algorithm: str) -> (str, str):
algorithm: str, debug: bool) -> (str, str):
"""Returns a raw signature strings that can be plugged into a header
as "Signature-Input" and "Signature"
used to verify the authenticity of an HTTP transmission.
See https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures-01
See https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures
"""
domain = getFullDomain(domain, port)
@ -137,18 +137,17 @@ def signPostHeadersNew(dateStr: str, privateKeyPem: str,
keyID = localActorUrl(httpPrefix, nickname, domain) + '#main-key'
if not messageBodyJsonStr:
headers = {
'*request-target': f'post {path}',
'*created': str(secondsSinceEpoch),
'@request-target': f'get {path}',
'@created': str(secondsSinceEpoch),
'host': toDomain,
'date': dateStr,
'content-type': 'application/json'
'date': dateStr
}
else:
bodyDigest = messageContentDigest(messageBodyJsonStr)
contentLength = len(messageBodyJsonStr)
headers = {
'*request-target': f'post {path}',
'*created': str(secondsSinceEpoch),
'@request-target': f'post {path}',
'@created': str(secondsSinceEpoch),
'host': toDomain,
'date': dateStr,
'digest': f'SHA-256={bodyDigest}',
@ -164,6 +163,10 @@ def signPostHeadersNew(dateStr: str, privateKeyPem: str,
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
signedHeaderText = signedHeaderText.strip()
if debug:
print('\nsignPostHeadersNew signedHeaderText:\n' +
signedHeaderText + '\nEND\n')
# Sign the digest. Potentially other signing algorithms can be added here.
signature = ''
if algorithm == 'rsa-sha512':
@ -298,8 +301,11 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'),
backend=default_backend())
# Build a dictionary of the signature values
if headers.get('Signature-Input'):
signatureHeader = headers['Signature-Input']
if headers.get('Signature-Input') or headers.get('signature-input'):
if headers.get('Signature-Input'):
signatureHeader = headers['Signature-Input']
else:
signatureHeader = headers['signature-input']
fieldSep2 = ','
# split the signature input into separate fields
signatureDict = {
@ -342,15 +348,23 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
# original Mastodon http signature
appendStr = f'(request-target): {method.lower()} {path}'
signedHeaderList.append(appendStr)
elif '*request-target' in signedHeader:
elif '@request-target' in signedHeader:
# https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures-01
appendStr = f'*request-target: {method.lower()} {path}'
# remove ()
# if appendStr.startswith('('):
# appendStr = appendStr.split('(')[1]
# if ')' in appendStr:
# appendStr = appendStr.split(')')[0]
# draft-ietf-httpbis-message-signatures
appendStr = f'@request-target: {method.lower()} {path}'
signedHeaderList.append(appendStr)
elif '@created' in signedHeader:
if signatureDict.get('created'):
createdStr = str(signatureDict['created'])
appendStr = f'@created: {createdStr}'
signedHeaderList.append(appendStr)
elif '@expires' in signedHeader:
if signatureDict.get('expires'):
expiresStr = str(signatureDict['expires'])
appendStr = f'@expires: {expiresStr}'
signedHeaderList.append(appendStr)
elif '@method' in signedHeader:
appendStr = f'@expires: {method}'
signedHeaderList.append(appendStr)
elif signedHeader == 'algorithm':
if headers.get(signedHeader):
@ -430,14 +444,18 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
# Now we have our header data digest
signedHeaderText = '\n'.join(signedHeaderList)
if debug:
print('signedHeaderText:\n' + signedHeaderText + 'END')
print('\nverifyPostHeaders signedHeaderText:\n' +
signedHeaderText + '\nEND\n')
# Get the signature, verify with public key, return result
signature = None
if headers.get('Signature-Input') and headers.get('Signature'):
if (headers.get('Signature-Input') and headers.get('Signature')) or \
(headers.get('signature-input') and headers.get('signature')):
# https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures-01
headersSig = headers['Signature']
# draft-ietf-httpbis-message-signatures
if headers.get('Signature'):
headersSig = headers['Signature']
else:
headersSig = headers['signature']
# remove sig1=:
if requestTargetKey + '=:' in headersSig:
headersSig = headersSig.split(requestTargetKey + '=:')[1]
@ -445,10 +463,10 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
signature = base64.b64decode(headersSig)
else:
# Original Mastodon signature
signature = base64.b64decode(signatureDict['signature'])
if debug:
print('signature: ' + algorithm + ' ' +
signatureDict['signature'])
headersSig = signatureDict['signature']
signature = base64.b64decode(headersSig)
if debug:
print('signature: ' + algorithm + ' ' + headersSig)
# log unusual signing algorithms
if signatureDict.get('alg'):
@ -457,11 +475,12 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
# If extra signing algorithms need to be added then do it here
if not signatureDict.get('alg'):
alg = hazutils.Prehashed(hashes.SHA256())
elif signatureDict['alg'] == 'rsa-sha256':
elif (signatureDict['alg'] == 'rsa-sha256' or
signatureDict['alg'] == 'rsa-v1_5-sha256' or
signatureDict['alg'] == 'hs2019'):
alg = hazutils.Prehashed(hashes.SHA256())
elif signatureDict['alg'] == 'hs2019':
alg = hazutils.Prehashed(hashes.SHA256())
elif signatureDict['alg'] == 'rsa-sha512':
elif (signatureDict['alg'] == 'rsa-sha512' or
signatureDict['alg'] == 'rsa-pss-sha512'):
alg = hazutils.Prehashed(hashes.SHA512())
else:
alg = hazutils.Prehashed(hashes.SHA256())

250
tests.py
View File

@ -392,8 +392,20 @@ def _testSignAndVerify() -> None:
def _testHttpSigNew():
print('testHttpSigNew')
httpPrefix = 'https'
port = 443
debug = True
messageBodyJson = {"hello": "world"}
messageBodyJsonStr = json.dumps(messageBodyJson)
nickname = 'foo'
pathStr = "/" + nickname + "?param=value&pet=dog HTTP/1.1"
domain = 'example.com'
dateStr = 'Tue, 20 Apr 2021 02:07:55 GMT'
digestStr = 'SHA-256=X48E9qOokqqrvdts8nOJRJN3OWDUoyWxBf7kbu9DBPE='
bodyDigest = messageContentDigest(messageBodyJsonStr)
assert bodyDigest in digestStr
contentLength = 18
contentType = 'application/activity+json'
publicKeyPem = \
'-----BEGIN RSA PUBLIC KEY-----\n' + \
'MIIBCgKCAQEAhAKYdtoeoy8zcAcR874L8' + \
@ -462,101 +474,49 @@ def _testHttpSigNew():
'EQeNC8fHGg4UXU8mhHnSBt3EA10qQJfRD' + \
's15M38eG2cYwB1PZpDHScDnDA0=\n' + \
'-----END RSA PRIVATE KEY-----'
sigInput = \
'sig1=(date); alg=rsa-sha256; keyId="test-key-b"'
sig = \
'sig1=:HtXycCl97RBVkZi66ADKnC9c5eSSlb57GnQ4KFqNZplOpNfxqk62' + \
'JzZ484jXgLvoOTRaKfR4hwyxlcyb+BWkVasApQovBSdit9Ml/YmN2IvJDPncrlhPD' + \
'VDv36Z9/DiSO+RNHD7iLXugdXo1+MGRimW1RmYdenl/ITeb7rjfLZ4b9VNnLFtVWw' + \
'rjhAiwIqeLjodVImzVc5srrk19HMZNuUejK6I3/MyN3+3U8tIRW4LWzx6ZgGZUaEE' + \
'P0aBlBkt7Fj0Tt5/P5HNW/Sa/m8smxbOHnwzAJDa10PyjzdIbywlnWIIWtZKPPsoV' + \
'oKVopUWEU3TNhpWmaVhFrUL/O6SN3w==:'
# "hs2019", using RSASSA-PSS [RFC8017] and SHA-512 [RFC6234]
# sigInput = \
# 'sig1=(*request-target, *created, host, date, ' + \
# 'cache-control, x-empty-header, x-example); keyId="test-key-a"; ' + \
# 'alg=hs2019; created=1402170695; expires=1402170995'
# sig = \
# 'sig1=:K2qGT5srn2OGbOIDzQ6kYT+ruaycnDAAUpKv+ePFfD0RAxn/1BUe' + \
# 'Zx/Kdrq32DrfakQ6bPsvB9aqZqognNT6be4olHROIkeV879RrsrObury8L9SCEibe' + \
# 'oHyqU/yCjphSmEdd7WD+zrchK57quskKwRefy2iEC5S2uAH0EPyOZKWlvbKmKu5q4' + \
# 'CaB8X/I5/+HLZLGvDiezqi6/7p2Gngf5hwZ0lSdy39vyNMaaAT0tKo6nuVw0S1MVg' + \
# '1Q7MpWYZs0soHjttq0uLIA3DIbQfLiIvK6/l0BdWTU7+2uQj7lBkQAsFZHoA96ZZg' + \
# 'FquQrXRlmYOh+Hx5D9fJkXcXe5tmAg==:'
nickname = 'foo'
boxpath = '/' + nickname
# headers = {
# "*request-target": "get " + boxpath,
# "*created": "1402170695",
# "host": "example.org",
# "date": "Tue, 07 Jun 2014 20:51:35 GMT",
# "cache-control": "max-age=60, must-revalidate",
# "x-emptyheader": "",
# "x-example": "Example header with some whitespace.",
# "x-dictionary": "b=2",
# "x-dictionary": "a=1",
# "x-list": "(a, b, c)",
# "Signature-Input": sigInput,
# "Signature": sig
# }
dateStr = "Tue, 07 Jun 2014 20:51:35 GMT"
secondsSinceEpoch = 1402174295
domain = "example.com"
port = 443
headers = {
"*created": str(secondsSinceEpoch),
"*request-target": "post /foo?param=value&pet=dog",
"host": domain,
"date": dateStr,
"content-type": "application/json",
"digest": "SHA-256=X48E9qOokqqrvdts8nOJRJN3OWDUoyWxBf7kbu9DBPE=",
"content-length": "18",
"Signature-Input": sigInput,
"Signature": sig
}
httpPrefix = 'https'
debug = False
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, False, None,
messageBodyJsonStr, debug, True)
# make a deliberate mistake
headers['Signature'] = headers['Signature'].replace('V', 'B')
assert not verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, False, None,
messageBodyJsonStr, debug, True)
# test signing
bodyDigest = messageContentDigest(messageBodyJsonStr)
contentLength = len(messageBodyJsonStr)
headers = {
"host": domain,
"date": dateStr,
"digest": f'SHA-256={bodyDigest}',
"content-type": "application/json",
"content-type": contentType,
"content-length": str(contentLength)
}
signatureIndexHeader, signatureHeader = \
signPostHeadersNew(dateStr, privateKeyPem, nickname,
domain, port,
domain, port,
boxpath, httpPrefix, messageBodyJsonStr,
'rsa-sha256')
expectedIndexHeader = \
'keyId="https://example.com/users/foo#main-key"; ' + \
'alg=hs2019; created=' + str(secondsSinceEpoch) + '; ' + \
'sig1=(*request-target, *created, host, date, ' + \
'digest, content-type, content-length)'
if signatureIndexHeader != expectedIndexHeader:
print('Unexpected new http header: ' + signatureIndexHeader)
print('Should be: ' + expectedIndexHeader)
assert signatureIndexHeader == expectedIndexHeader
assert signatureHeader == \
'sig1=:euX3O1KSTYXN9/oR2qFezswWm9FbrjtRymK7xBpXNQvTs' + \
'XehtrNdD8nELZKzPXMvMz7PaJd6V+fjzpHoZ9upTdqqQLK2Iwml' + \
'p4BlHqW6Aopd7sZFCWFq7/Amm5oaizpp3e0jb5XISS5m3cRKuoi' + \
'LM0x+OudmAoYGi0TEEJk8bpnJAXfVCDfmOyL3XNqQeShQHeOANG' + \
'okiKktj8ff+KLYLaPTAJkob1k/EhoPIkbw/YzAY8IZjWQNMkf+F' + \
'JChApQ5HnDCQPwD5xV9eGzBpAf6D0G19xiTmQye4Hn6tAs3fy3V' + \
'/aYa/GhW2pSrctDnAKIi4imj9joppr3CB8gqgXZOPQ==:'
pathStr, httpPrefix, messageBodyJsonStr,
'rsa-sha256', debug)
print('signatureIndexHeader1: ' + str(signatureIndexHeader))
print('signatureHeader1: ' + str(signatureHeader))
sigInput = "keyId=\"https://example.com/users/foo#main-key\"; " + \
"alg=hs2019; created=1618884475; " + \
"sig1=(@request-target, @created, host, date, digest, " + \
"content-type, content-length)"
assert signatureIndexHeader == sigInput
sig = "sig1=:NXAQ7AtDMR2iwhmH1qCwiZw5PVTjOw5+5kSu0Tsx/3gqz0D" + \
"py7OQbWqFHrNB7MmS4TukX/vDyQOFdElY5yxnEhbgRwKACq0AP4QH9H" + \
"CiRyCE8UXDdAkY4VUd6jrWjRHKRoqQN7I+Q5tb2Fu5cDfifw/PQc86Z" + \
"NmMhPrg3OjUJ9Q2Gj29NhgJ+4el1ECg0cAy4yG1M9AQ3KvQooQFvlg1" + \
"vp0H2xfbJQjv8FsR/lKiRdaVHqGR2CKrvxvPRPaOsFANp2wzEtiMk3O" + \
"TrBTYU+Zb53mIspfEeLxsNtcGmBDmQKZ9Pud8f99XGJrP+uDd3zKtnr" + \
"f3fUnRRqy37yhB7WVwkg==:"
assert signatureHeader == sig
debug = True
headers['path'] = pathStr
headers['signature'] = sig
headers['signature-input'] = sigInput
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
pathStr, False, None,
messageBodyJsonStr, debug, True)
# make a deliberate mistake
debug = False
headers['signature'] = headers['signature'].replace('V', 'B')
assert not verifyPostHeaders(httpPrefix, publicKeyPem, headers,
pathStr, False, None,
messageBodyJsonStr, debug, True)
def _testHttpsigBase(withDigest: bool, baseDir: str):
@ -5920,6 +5880,124 @@ def _testValidEmojiContent() -> None:
assert validEmojiContent('😄')
def _testHttpsigBaseNew(withDigest: bool, baseDir: str):
print('testHttpsigNew(' + str(withDigest) + ')')
debug = True
path = baseDir + '/.testHttpsigBaseNew'
if os.path.isdir(path):
shutil.rmtree(path, ignore_errors=False, onerror=None)
os.mkdir(path)
os.chdir(path)
contentType = 'application/activity+json'
nickname = 'socrates'
hostDomain = 'someother.instance'
domain = 'argumentative.social'
httpPrefix = 'https'
port = 5576
password = 'SuperSecretPassword'
privateKeyPem, publicKeyPem, person, wfEndpoint = \
createPerson(path, nickname, domain, port, httpPrefix,
False, False, password)
assert privateKeyPem
if withDigest:
messageBodyJson = {
"a key": "a value",
"another key": "A string",
"yet another key": "Another string"
}
messageBodyJsonStr = json.dumps(messageBodyJson)
else:
messageBodyJsonStr = ''
headersDomain = getFullDomain(hostDomain, port)
dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
boxpath = '/inbox'
if not withDigest:
headers = {
'host': headersDomain,
'date': dateStr,
'accept': contentType
}
signatureIndexHeader, signatureHeader = \
signPostHeadersNew(dateStr, privateKeyPem, nickname,
domain, port,
hostDomain, port,
boxpath, httpPrefix, messageBodyJsonStr,
'rsa-sha256', debug)
else:
bodyDigest = messageContentDigest(messageBodyJsonStr)
contentLength = len(messageBodyJsonStr)
headers = {
'host': headersDomain,
'date': dateStr,
'digest': f'SHA-256={bodyDigest}',
'content-type': contentType,
'content-length': str(contentLength)
}
signatureIndexHeader, signatureHeader = \
signPostHeadersNew(dateStr, privateKeyPem, nickname,
domain, port,
hostDomain, port,
boxpath, httpPrefix, messageBodyJsonStr,
'rsa-sha256', debug)
headers['signature'] = signatureHeader
headers['signature-input'] = signatureIndexHeader
print('headers: ' + str(headers))
GETmethod = not withDigest
debug = True
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, GETmethod, None,
messageBodyJsonStr, debug)
debug = False
if withDigest:
# everything correct except for content-length
headers['content-length'] = str(contentLength + 2)
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, GETmethod, None,
messageBodyJsonStr, debug) is False
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
'/parambulator' + boxpath, GETmethod, None,
messageBodyJsonStr, debug) is False
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, not GETmethod, None,
messageBodyJsonStr, debug) is False
if not withDigest:
# fake domain
headers = {
'host': 'bogon.domain',
'date': dateStr,
'content-type': contentType
}
else:
# correct domain but fake message
messageBodyJsonStr = \
'{"a key": "a value", "another key": "Fake GNUs", ' + \
'"yet another key": "More Fake GNUs"}'
contentLength = len(messageBodyJsonStr)
bodyDigest = messageContentDigest(messageBodyJsonStr)
headers = {
'host': domain,
'date': dateStr,
'digest': f'SHA-256={bodyDigest}',
'content-type': contentType,
'content-length': str(contentLength)
}
headers['signature'] = signatureHeader
headers['signature-input'] = signatureIndexHeader
pprint(headers)
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, not GETmethod, None,
messageBodyJsonStr, False) is False
os.chdir(baseDir)
shutil.rmtree(path, ignore_errors=False, onerror=None)
def runAllTests():
baseDir = os.getcwd()
print('Running tests...')
@ -5991,6 +6069,8 @@ def runAllTests():
_testHttpsig(baseDir)
_testHttpSignedGET(baseDir)
_testHttpSigNew()
_testHttpsigBaseNew(True, baseDir)
_testHttpsigBaseNew(False, baseDir)
_testCache()
_testThreads()
_testCreatePerson(baseDir)