Specify signing algorithm in tests

merge-requests/30/head
Bob Mottram 2021-11-23 11:41:40 +00:00
parent e732061e07
commit 1b9277e323
3 changed files with 94 additions and 28 deletions

View File

@ -28,12 +28,41 @@ from utils import getSHA512
from utils import localActorUrl
def messageContentDigest(messageBodyJsonStr: str) -> str:
def messageContentDigest(messageBodyJsonStr: str, digestAlgorithm: str) -> str:
"""Returns the digest for the message body
"""
msg = messageBodyJsonStr.encode('utf-8')
hashResult = getSHA256(msg)
if digestAlgorithm == 'rsa-sha512' or \
digestAlgorithm == 'rsa-pss-sha512':
hashResult = getSHA512(msg)
else:
hashResult = getSHA256(msg)
return base64.b64encode(hashResult).decode('utf-8')
def getDigestPrefix(digestAlgorithm: str) -> str:
"""Returns the prefix for the message body digest
"""
if digestAlgorithm == 'rsa-sha512' or \
digestAlgorithm == 'rsa-pss-sha512':
return 'SHA-512'
return 'SHA-256'
def getDigestAlgorithmFromHeaders(httpHeaders: {}) -> str:
"""Returns the digest algorithm from http headers
"""
digestStr = None
if httpHeaders.get('digest'):
digestStr = httpHeaders['digest']
elif httpHeaders.get('Digest'):
digestStr = httpHeaders['Digest']
if digestStr:
if digestStr.startswith('SHA-512'):
return 'rsa-sha512'
return 'rsa-sha256'
def signPostHeaders(dateStr: str, privateKeyPem: str,
nickname: str,
domain: str, port: int,
@ -41,10 +70,15 @@ def signPostHeaders(dateStr: str, privateKeyPem: str,
path: str,
httpPrefix: str,
messageBodyJsonStr: str,
contentType: str) -> str:
contentType: str,
algorithm: str) -> str:
"""Returns a raw signature string that can be plugged into a header and
used to verify the authenticity of an HTTP transmission.
"""
# it is assumed that the hash used for the digest will be the same
# as for the signature
digestAlgorithm = algorithm
domain = getFullDomain(domain, port)
toDomain = getFullDomain(toDomain, toPort)
@ -65,13 +99,15 @@ def signPostHeaders(dateStr: str, privateKeyPem: str,
'accept': contentType
}
else:
bodyDigest = messageContentDigest(messageBodyJsonStr)
bodyDigest = \
messageContentDigest(messageBodyJsonStr, digestAlgorithm)
digestPrefix = getDigestPrefix(digestAlgorithm)
contentLength = len(messageBodyJsonStr)
headers = {
'(request-target)': f'post {path}',
'host': toDomain,
'date': dateStr,
'digest': f'SHA-256={bodyDigest}',
'digest': f'{digestPrefix}={bodyDigest}',
'content-type': 'application/activity+json',
'content-length': str(contentLength)
}
@ -100,7 +136,7 @@ def signPostHeaders(dateStr: str, privateKeyPem: str,
# Put it into a valid HTTP signature format
signatureDict = {
'keyId': keyID,
'algorithm': 'rsa-sha256',
'algorithm': algorithm,
'headers': ' '.join(signedHeaderKeys),
'signature': signature
}
@ -122,6 +158,10 @@ def signPostHeadersNew(dateStr: str, privateKeyPem: str,
used to verify the authenticity of an HTTP transmission.
See https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures
"""
# it is assumed that the hash used for the digest will be the same
# as for the signature
digestAlgorithm = algorithm
domain = getFullDomain(domain, port)
toDomain = getFullDomain(toDomain, toPort)
@ -143,14 +183,15 @@ def signPostHeadersNew(dateStr: str, privateKeyPem: str,
'date': dateStr
}
else:
bodyDigest = messageContentDigest(messageBodyJsonStr)
bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
digestPrefix = getDigestPrefix(digestAlgorithm)
contentLength = len(messageBodyJsonStr)
headers = {
'@request-target': f'post {path}',
'@created': str(secondsSinceEpoch),
'host': toDomain,
'date': dateStr,
'digest': f'SHA-256={bodyDigest}',
'digest': f'{digestPrefix}={bodyDigest}',
'content-type': 'application/activity+json',
'content-length': str(contentLength)
}
@ -210,6 +251,8 @@ def createSignedHeader(dateStr: str, privateKeyPem: str, nickname: str,
contentType: str) -> {}:
"""Note that the domain is the destination, not the sender
"""
algorithm = 'rsa-sha256'
digestAlgorithm = 'rsa-sha256'
headerDomain = getFullDomain(toDomain, toPort)
# if no date is given then create one
@ -230,15 +273,17 @@ def createSignedHeader(dateStr: str, privateKeyPem: str, nickname: str,
signatureHeader = \
signPostHeaders(dateStr, privateKeyPem, nickname,
domain, port, toDomain, toPort,
path, httpPrefix, None, contentType)
path, httpPrefix, None, contentType,
algorithm)
else:
bodyDigest = messageContentDigest(messageBodyJsonStr)
bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
digestPrefix = getDigestPrefix(digestAlgorithm)
contentLength = len(messageBodyJsonStr)
headers = {
'(request-target)': f'post {path}',
'host': headerDomain,
'date': dateStr,
'digest': f'SHA-256={bodyDigest}',
'digest': f'{digestPrefix}={bodyDigest}',
'content-length': str(contentLength),
'content-type': contentType
}
@ -247,7 +292,7 @@ def createSignedHeader(dateStr: str, privateKeyPem: str, nickname: str,
domain, port,
toDomain, toPort,
path, httpPrefix, messageBodyJsonStr,
contentType)
contentType, algorithm)
headers['signature'] = signatureHeader
return headers
@ -341,6 +386,7 @@ def verifyPostHeaders(httpPrefix: str,
# body (if a digest was included)
signedHeaderList = []
algorithm = 'rsa-sha256'
digestAlgorithm = 'rsa-sha256'
for signedHeader in signatureDict[requestTargetKey].split(fieldSep2):
signedHeader = signedHeader.strip()
if debug:
@ -387,7 +433,8 @@ def verifyPostHeaders(httpPrefix: str,
if messageBodyDigest:
bodyDigest = messageBodyDigest
else:
bodyDigest = messageContentDigest(messageBodyJsonStr)
bodyDigest = \
messageContentDigest(messageBodyJsonStr, digestAlgorithm)
signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
elif signedHeader == 'content-length':
if headers.get(signedHeader):

View File

@ -60,6 +60,7 @@ from utils import localActorUrl
from utils import hasObjectStringType
from categories import getHashtagCategories
from categories import setHashtagCategory
from httpsig import getDigestAlgorithmFromHeaders
from httpsig import verifyPostHeaders
from session import createSession
from follow import followerApprovalActive
@ -549,7 +550,8 @@ def savePostToInboxQueue(baseDir: str, httpPrefix: str,
sharedInboxItem = True
digestStartTime = time.time()
digest = messageContentDigest(messageBytes)
digestAlgorithm = getDigestAlgorithmFromHeaders(httpHeaders)
digest = messageContentDigest(messageBytes, digestAlgorithm)
timeDiffStr = str(int((time.time() - digestStartTime) * 1000))
if debug:
while len(timeDiffStr) < 6:

View File

@ -23,6 +23,8 @@ from shutil import copyfile
from random import randint
from time import gmtime, strftime
from pprint import pprint
from httpsig import getDigestAlgorithmFromHeaders
from httpsig import getDigestPrefix
from httpsig import createSignedHeader
from httpsig import signPostHeaders
from httpsig import signPostHeadersNew
@ -401,8 +403,11 @@ def _testHttpSigNew():
pathStr = "/" + nickname + "?param=value&pet=dog HTTP/1.1"
domain = 'example.com'
dateStr = 'Tue, 20 Apr 2021 02:07:55 GMT'
digestStr = 'SHA-256=X48E9qOokqqrvdts8nOJRJN3OWDUoyWxBf7kbu9DBPE='
bodyDigest = messageContentDigest(messageBodyJsonStr)
algorithm = 'rsa-sha256'
digestAlgorithm = 'rsa-sha256'
digestPrefix = getDigestPrefix(digestAlgorithm)
digestStr = digestPrefix + '=X48E9qOokqqrvdts8nOJRJN3OWDUoyWxBf7kbu9DBPE='
bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
assert bodyDigest in digestStr
contentLength = 18
contentType = 'application/activity+json'
@ -477,7 +482,7 @@ def _testHttpSigNew():
headers = {
"host": domain,
"date": dateStr,
"digest": f'SHA-256={bodyDigest}',
"digest": f'{digestPrefix}={bodyDigest}',
"content-type": contentType,
"content-length": str(contentLength)
}
@ -486,7 +491,7 @@ def _testHttpSigNew():
domain, port,
domain, port,
pathStr, httpPrefix, messageBodyJsonStr,
'rsa-sha256', debug)
algorithm, debug)
print('signatureIndexHeader1: ' + str(signatureIndexHeader))
print('signatureHeader1: ' + str(signatureHeader))
sigInput = "keyId=\"https://example.com/users/foo#main-key\"; " + \
@ -528,6 +533,8 @@ def _testHttpsigBase(withDigest: bool, baseDir: str):
os.mkdir(path)
os.chdir(path)
algorithm = 'rsa-sha256'
digestAlgorithm = 'rsa-sha256'
contentType = 'application/activity+json'
nickname = 'socrates'
hostDomain = 'someother.instance'
@ -563,23 +570,26 @@ def _testHttpsigBase(withDigest: bool, baseDir: str):
signPostHeaders(dateStr, privateKeyPem, nickname,
domain, port,
hostDomain, port,
boxpath, httpPrefix, None, contentType)
boxpath, httpPrefix, None, contentType,
algorithm)
else:
bodyDigest = messageContentDigest(messageBodyJsonStr)
digestPrefix = getDigestPrefix(digestAlgorithm)
bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
contentLength = len(messageBodyJsonStr)
headers = {
'host': headersDomain,
'date': dateStr,
'digest': f'SHA-256={bodyDigest}',
'digest': f'{digestPrefix}={bodyDigest}',
'content-type': contentType,
'content-length': str(contentLength)
}
assert getDigestAlgorithmFromHeaders(headers) == digestAlgorithm
signatureHeader = \
signPostHeaders(dateStr, privateKeyPem, nickname,
domain, port,
hostDomain, port,
boxpath, httpPrefix, messageBodyJsonStr,
contentType)
contentType, algorithm)
headers['signature'] = signatureHeader
GETmethod = not withDigest
@ -612,14 +622,16 @@ def _testHttpsigBase(withDigest: bool, baseDir: str):
'{"a key": "a value", "another key": "Fake GNUs", ' + \
'"yet another key": "More Fake GNUs"}'
contentLength = len(messageBodyJsonStr)
bodyDigest = messageContentDigest(messageBodyJsonStr)
digestPrefix = getDigestPrefix(digestAlgorithm)
bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
headers = {
'host': domain,
'date': dateStr,
'digest': f'SHA-256={bodyDigest}',
'digest': f'{digestPrefix}={bodyDigest}',
'content-type': contentType,
'content-length': str(contentLength)
}
assert getDigestAlgorithmFromHeaders(headers) == digestAlgorithm
headers['signature'] = signatureHeader
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, not GETmethod, None,
@ -5891,6 +5903,7 @@ def _testHttpsigBaseNew(withDigest: bool, baseDir: str,
os.mkdir(path)
os.chdir(path)
digestAlgorithm = algorithm
contentType = 'application/activity+json'
nickname = 'socrates'
hostDomain = 'someother.instance'
@ -5929,15 +5942,17 @@ def _testHttpsigBaseNew(withDigest: bool, baseDir: str,
boxpath, httpPrefix, messageBodyJsonStr,
algorithm, debug)
else:
bodyDigest = messageContentDigest(messageBodyJsonStr)
digestPrefix = getDigestPrefix(digestAlgorithm)
bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
contentLength = len(messageBodyJsonStr)
headers = {
'host': headersDomain,
'date': dateStr,
'digest': f'SHA-256={bodyDigest}',
'digest': f'{digestPrefix}={bodyDigest}',
'content-type': contentType,
'content-length': str(contentLength)
}
assert getDigestAlgorithmFromHeaders(headers) == digestAlgorithm
signatureIndexHeader, signatureHeader = \
signPostHeadersNew(dateStr, privateKeyPem, nickname,
domain, port,
@ -5980,14 +5995,16 @@ def _testHttpsigBaseNew(withDigest: bool, baseDir: str,
'{"a key": "a value", "another key": "Fake GNUs", ' + \
'"yet another key": "More Fake GNUs"}'
contentLength = len(messageBodyJsonStr)
bodyDigest = messageContentDigest(messageBodyJsonStr)
digestPrefix = getDigestPrefix(digestAlgorithm)
bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
headers = {
'host': domain,
'date': dateStr,
'digest': f'SHA-256={bodyDigest}',
'digest': f'{digestPrefix}={bodyDigest}',
'content-type': contentType,
'content-length': str(contentLength)
}
assert getDigestAlgorithmFromHeaders(headers) == digestAlgorithm
headers['signature'] = signatureHeader
headers['signature-input'] = signatureIndexHeader
pprint(headers)