Unit test for verifying new http signature type

merge-requests/30/head
Bob Mottram 2021-02-22 14:03:24 +00:00
parent 4e2487aa27
commit 56e9130287
2 changed files with 160 additions and 21 deletions

View File

@ -163,7 +163,8 @@ def _verifyRecentSignature(signedDateStr: str) -> bool:
def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict, def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
path: str, GETmethod: bool, path: str, GETmethod: bool,
messageBodyDigest: str, messageBodyDigest: str,
messageBodyJsonStr: str, debug: bool) -> bool: messageBodyJsonStr: str, debug: bool,
noRecencyCheck=False) -> bool:
"""Returns true or false depending on if the key that we plugged in here """Returns true or false depending on if the key that we plugged in here
validates against the headers, method, and path. validates against the headers, method, and path.
publicKeyPem - the public key from an rsa key pair publicKeyPem - the public key from an rsa key pair
@ -186,23 +187,36 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
# Build a dictionary of the signature values # Build a dictionary of the signature values
if headers.get('Signature-Input'): if headers.get('Signature-Input'):
signatureHeader = headers['Signature-Input'] signatureHeader = headers['Signature-Input']
fieldSep1 = ';'
fieldSep2 = ',' fieldSep2 = ','
# split the signature input into separate fields
signatureDict = {
k.strip(): v.strip()
for k, v in [i.split('=', 1) for i in signatureHeader.split(';')]
}
requestTargetKey = None
requestTargetStr = None
for k, v in signatureDict.items():
if v.startswith('('):
requestTargetKey = k
requestTargetStr = v[1:-1]
break
if not requestTargetKey:
return False
signatureDict[requestTargetKey] = requestTargetStr
else: else:
requestTargetKey = 'headers'
signatureHeader = headers['signature'] signatureHeader = headers['signature']
fieldSep1 = ','
fieldSep2 = ' ' fieldSep2 = ' '
# split the signature input into separate fields
# split the signature input into separate fields signatureDict = {
signatureDict = { k: v[1:-1]
k: v[1:-1] for k, v in [i.split('=', 1) for i in signatureHeader.split(',')]
for k, v in [i.split('=', 1) for i in signatureHeader.split(fieldSep1)] }
}
# Unpack the signed headers and set values based on current headers and # Unpack the signed headers and set values based on current headers and
# body (if a digest was included) # body (if a digest was included)
signedHeaderList = [] signedHeaderList = []
for signedHeader in signatureDict['headers'].split(fieldSep2): for signedHeader in signatureDict[requestTargetKey].split(fieldSep2):
signedHeader = signedHeader.strip() signedHeader = signedHeader.strip()
if debug: if debug:
print('DEBUG: verifyPostHeaders signedHeader=' + signedHeader) print('DEBUG: verifyPostHeaders signedHeader=' + signedHeader)
@ -214,11 +228,11 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
# https://tools.ietf.org/html/ # https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures-01 # draft-ietf-httpbis-message-signatures-01
appendStr = f'*request-target: {method.lower()} {path}' appendStr = f'*request-target: {method.lower()} {path}'
# remove sig1=( # remove ()
if '=(' in appendStr: # if appendStr.startswith('('):
appendStr = appendStr.split('=(')[1] # appendStr = appendStr.split('(')[1]
if ')' in appendStr: # if ')' in appendStr:
appendStr = appendStr.split(')')[0] # appendStr = appendStr.split(')')[0]
signedHeaderList.append(appendStr) signedHeaderList.append(appendStr)
elif signedHeader == 'digest': elif signedHeader == 'digest':
if messageBodyDigest: if messageBodyDigest:
@ -245,7 +259,7 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
' not found in ' + str(headers)) ' not found in ' + str(headers))
else: else:
if headers.get(signedHeader): if headers.get(signedHeader):
if signedHeader == 'date': if signedHeader == 'date' and not noRecencyCheck:
if not _verifyRecentSignature(headers[signedHeader]): if not _verifyRecentSignature(headers[signedHeader]):
if debug: if debug:
print('DEBUG: ' + print('DEBUG: ' +
@ -280,8 +294,8 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
# draft-ietf-httpbis-message-signatures-01 # draft-ietf-httpbis-message-signatures-01
headersSig = headers['Signature'] headersSig = headers['Signature']
# remove sig1=: # remove sig1=:
if '=:' in headersSig: if requestTargetKey + '=:' in headersSig:
headersSig = headersSig.split('=:')[1] headersSig = headersSig.split(requestTargetKey + '=:')[1]
headersSig = headersSig[:len(headersSig)-1] headersSig = headersSig[:len(headersSig)-1]
signature = base64.b64decode(headersSig) signature = base64.b64decode(headersSig)
else: else:

131
tests.py
View File

@ -106,6 +106,133 @@ thrBob = None
thrEve = None thrEve = None
def testHttpSigNew():
print('testHttpSigNew')
messageBodyJson = {"hello": "world"}
messageBodyJsonStr = json.dumps(messageBodyJson)
publicKeyPem = \
'-----BEGIN RSA PUBLIC KEY-----\n' + \
'MIIBCgKCAQEAhAKYdtoeoy8zcAcR874L8' + \
'cnZxKzAGwd7v36APp7Pv6Q2jdsPBRrw\n' + \
'WEBnez6d0UDKDwGbc6nxfEXAy5mbhgajz' + \
'rw3MOEt8uA5txSKobBpKDeBLOsdJKFq\n' + \
'MGmXCQvEG7YemcxDTRPxAleIAgYYRjTSd' + \
'/QBwVW9OwNFhekro3RtlinV0a75jfZg\n' + \
'kne/YiktSvLG34lw2zqXBDTC5NHROUqGT' + \
'lML4PlNZS5Ri2U4aCNx2rUPRcKIlE0P\n' + \
'uKxI4T+HIaFpv8+rdV6eUgOrB2xeI1dSF' + \
'Fn/nnv5OoZJEIB+VmuKn3DCUcCZSFlQ\n' + \
'PSXSfBDiUGhwOw76WuSSsf1D4b/vLoJ10wIDAQAB\n' + \
'-----END RSA PUBLIC KEY-----\n'
# privKey = \
# '-----BEGIN RSA PRIVATE KEY-----\n' + \
# 'MIIEqAIBAAKCAQEAhAKYdtoeoy8zcAcR8' + \
# '74L8cnZxKzAGwd7v36APp7Pv6Q2jdsP\n' + \
# 'BRrwWEBnez6d0UDKDwGbc6nxfEXAy5mbh' + \
# 'gajzrw3MOEt8uA5txSKobBpKDeBLOsd\n' + \
# 'JKFqMGmXCQvEG7YemcxDTRPxAleIAgYYR' + \
# 'jTSd/QBwVW9OwNFhekro3RtlinV0a75\n' + \
# 'jfZgkne/YiktSvLG34lw2zqXBDTC5NHRO' + \
# 'UqGTlML4PlNZS5Ri2U4aCNx2rUPRcKI\n' + \
# 'lE0PuKxI4T+HIaFpv8+rdV6eUgOrB2xeI' + \
# '1dSFFn/nnv5OoZJEIB+VmuKn3DCUcCZ\n' + \
# 'SFlQPSXSfBDiUGhwOw76WuSSsf1D4b/vL' + \
# 'oJ10wIDAQABAoIBAG/JZuSWdoVHbi56\n' + \
# 'vjgCgkjg3lkO1KrO3nrdm6nrgA9P9qaPj' + \
# 'xuKoWaKO1cBQlE1pSWp/cKncYgD5WxE\n' + \
# 'CpAnRUXG2pG4zdkzCYzAh1i+c34L6oZoH' + \
# 'sirK6oNcEnHveydfzJL5934egm6p8DW\n' + \
# '+m1RQ70yUt4uRc0YSor+q1LGJvGQHReF0' + \
# 'WmJBZHrhz5e63Pq7lE0gIwuBqL8SMaA\n' + \
# 'yRXtK+JGxZpImTq+NHvEWWCu09SCq0r83' + \
# '8ceQI55SvzmTkwqtC+8AT2zFviMZkKR\n' + \
# 'Qo6SPsrqItxZWRty2izawTF0Bf5S2VAx7' + \
# 'O+6t3wBsQ1sLptoSgX3QblELY5asI0J\n' + \
# 'YFz7LJECgYkAsqeUJmqXE3LP8tYoIjMIA' + \
# 'KiTm9o6psPlc8CrLI9CH0UbuaA2JCOM\n' + \
# 'cCNq8SyYbTqgnWlB9ZfcAm/cFpA8tYci9' + \
# 'm5vYK8HNxQr+8FS3Qo8N9RJ8d0U5Csw\n' + \
# 'DzMYfRghAfUGwmlWj5hp1pQzAuhwbOXFt' + \
# 'xKHVsMPhz1IBtF9Y8jvgqgYHLbmyiu1\n' + \
# 'mwJ5AL0pYF0G7x81prlARURwHo0Yf52kE' + \
# 'w1dxpx+JXER7hQRWQki5/NsUEtv+8RT\n' + \
# 'qn2m6qte5DXLyn83b1qRscSdnCCwKtKWU' + \
# 'ug5q2ZbwVOCJCtmRwmnP131lWRYfj67\n' + \
# 'B/xJ1ZA6X3GEf4sNReNAtaucPEelgR2ns' + \
# 'N0gKQKBiGoqHWbK1qYvBxX2X3kbPDkv\n' + \
# '9C+celgZd2PW7aGYLCHq7nPbmfDV0yHcW' + \
# 'jOhXZ8jRMjmANVR/eLQ2EfsRLdW69bn\n' + \
# 'f3ZD7JS1fwGnO3exGmHO3HZG+6AvberKY' + \
# 'VYNHahNFEw5TsAcQWDLRpkGybBcxqZo\n' + \
# '81YCqlqidwfeO5YtlO7etx1xLyqa2NsCe' + \
# 'G9A86UjG+aeNnXEIDk1PDK+EuiThIUa\n' + \
# '/2IxKzJKWl1BKr2d4xAfR0ZnEYuRrbeDQ' + \
# 'YgTImOlfW6/GuYIxKYgEKCFHFqJATAG\n' + \
# 'IxHrq1PDOiSwXd2GmVVYyEmhZnbcp8Cxa' + \
# 'EMQoevxAta0ssMK3w6UsDtvUvYvF22m\n' + \
# 'qQKBiD5GwESzsFPy3Ga0MvZpn3D6EJQLg' + \
# 'snrtUPZx+z2Ep2x0xc5orneB5fGyF1P\n' + \
# 'WtP+fG5Q6Dpdz3LRfm+KwBCWFKQjg7uTx' + \
# 'cjerhBWEYPmEMKYwTJF5PBG9/ddvHLQ\n' + \
# 'EQeNC8fHGg4UXU8mhHnSBt3EA10qQJfRD' + \
# 's15M38eG2cYwB1PZpDHScDnDA0=\n' + \
# '-----END RSA PRIVATE KEY-----'
sigInput = \
'sig1=(date); alg=rsa-sha256; keyId="test-key-b"'
sig = \
'sig1=:HtXycCl97RBVkZi66ADKnC9c5eSSlb57GnQ4KFqNZplOpNfxqk62' + \
'JzZ484jXgLvoOTRaKfR4hwyxlcyb+BWkVasApQovBSdit9Ml/YmN2IvJDPncrlhPD' + \
'VDv36Z9/DiSO+RNHD7iLXugdXo1+MGRimW1RmYdenl/ITeb7rjfLZ4b9VNnLFtVWw' + \
'rjhAiwIqeLjodVImzVc5srrk19HMZNuUejK6I3/MyN3+3U8tIRW4LWzx6ZgGZUaEE' + \
'P0aBlBkt7Fj0Tt5/P5HNW/Sa/m8smxbOHnwzAJDa10PyjzdIbywlnWIIWtZKPPsoV' + \
'oKVopUWEU3TNhpWmaVhFrUL/O6SN3w==:'
# "hs2019", using RSASSA-PSS [RFC8017] and SHA-512 [RFC6234]
# sigInput = \
# 'sig1=(*request-target, *created, host, date, ' + \
# 'cache-control, x-empty-header, x-example); keyId="test-key-a"; ' + \
# 'alg=hs2019; created=1402170695; expires=1402170995'
# sig = \
# 'sig1=:K2qGT5srn2OGbOIDzQ6kYT+ruaycnDAAUpKv+ePFfD0RAxn/1BUe' + \
# 'Zx/Kdrq32DrfakQ6bPsvB9aqZqognNT6be4olHROIkeV879RrsrObury8L9SCEibe' + \
# 'oHyqU/yCjphSmEdd7WD+zrchK57quskKwRefy2iEC5S2uAH0EPyOZKWlvbKmKu5q4' + \
# 'CaB8X/I5/+HLZLGvDiezqi6/7p2Gngf5hwZ0lSdy39vyNMaaAT0tKo6nuVw0S1MVg' + \
# '1Q7MpWYZs0soHjttq0uLIA3DIbQfLiIvK6/l0BdWTU7+2uQj7lBkQAsFZHoA96ZZg' + \
# 'FquQrXRlmYOh+Hx5D9fJkXcXe5tmAg==:'
boxpath = '/foo'
# headers = {
# "*request-target": "get " + boxpath,
# "*created": "1402170695",
# "host": "example.org",
# "date": "Tue, 07 Jun 2014 20:51:35 GMT",
# "cache-control": "max-age=60, must-revalidate",
# "x-emptyheader": "",
# "x-example": "Example header with some whitespace.",
# "x-dictionary": "b=2",
# "x-dictionary": "a=1",
# "x-list": "(a, b, c)",
# "Signature-Input": sigInput,
# "Signature": sig
# }
headers = {
"*created": "1402170695",
"*request-target": "post /foo?param=value&pet=dog",
"host": "example.com",
"date": "Tue, 07 Jun 2014 20:51:35 GMT",
"content-type": "application/json",
"digest": "SHA-256=X48E9qOokqqrvdts8nOJRJN3OWDUoyWxBf7kbu9DBPE=",
"content-length": "18",
"Signature-Input": sigInput,
"Signature": sig
}
httpPrefix = 'https'
debug = False
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, False, None,
messageBodyJsonStr, debug,
True)
def _testHttpsigBase(withDigest): def _testHttpsigBase(withDigest):
print('testHttpsig(' + str(withDigest) + ')') print('testHttpsig(' + str(withDigest) + ')')
@ -3116,9 +3243,6 @@ def testValidHashTag():
def runAllTests(): def runAllTests():
print('Running tests...') print('Running tests...')
testHttpsig()
return
testFunctions() testFunctions()
testValidHashTag() testValidHashTag()
testPrepareHtmlPostNickname() testPrepareHtmlPostNickname()
@ -3156,6 +3280,7 @@ def runAllTests():
testAddEmoji() testAddEmoji()
testActorParsing() testActorParsing()
testHttpsig() testHttpsig()
testHttpSigNew()
testCache() testCache()
testThreads() testThreads()
testCreatePerson() testCreatePerson()