Merge branch 'main' of gitlab.com:bashrc2/epicyon

merge-requests/30/head
Bob Mottram 2021-11-22 23:07:05 +00:00
commit ff3fba4c79
6 changed files with 269 additions and 133 deletions

View File

@ -242,7 +242,6 @@ from like import updateLikesCollection
from reaction import updateReactionCollection from reaction import updateReactionCollection
from utils import undoReactionCollectionEntry from utils import undoReactionCollectionEntry
from utils import getNewPostEndpoints from utils import getNewPostEndpoints
from utils import malformedCiphertext
from utils import hasActor from utils import hasActor
from utils import setReplyIntervalHours from utils import setReplyIntervalHours
from utils import canReplyTo from utils import canReplyTo
@ -451,6 +450,8 @@ class PubServer(BaseHTTPRequestHandler):
# https://tools.ietf.org/html/ # https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures-01 # draft-ietf-httpbis-message-signatures-01
return self.headers['Signature-Input'] return self.headers['Signature-Input']
elif self.headers.get('signature-input'):
return self.headers['signature-input']
elif self.headers.get('signature'): elif self.headers.get('signature'):
# Ye olde Masto http sig # Ye olde Masto http sig
return self.headers['signature'] return self.headers['signature']
@ -728,7 +729,9 @@ class PubServer(BaseHTTPRequestHandler):
return False return False
# verify the GET request without any digest # verify the GET request without any digest
if verifyPostHeaders(self.server.httpPrefix, pubKey, self.headers, if verifyPostHeaders(self.server.httpPrefix,
self.server.domainFull,
pubKey, self.headers,
self.path, True, None, '', self.server.debug): self.path, True, None, '', self.server.debug):
return True return True
@ -1503,11 +1506,6 @@ class PubServer(BaseHTTPRequestHandler):
# save the json for later queue processing # save the json for later queue processing
messageBytesDecoded = messageBytes.decode('utf-8') messageBytesDecoded = messageBytes.decode('utf-8')
if malformedCiphertext(messageBytesDecoded):
print('WARN: post contains malformed ciphertext ' +
str(originalMessageJson))
return 4
if containsInvalidLocalLinks(messageBytesDecoded): if containsInvalidLocalLinks(messageBytesDecoded):
print('WARN: post contains invalid local links ' + print('WARN: post contains invalid local links ' +
str(originalMessageJson)) str(originalMessageJson))

View File

@ -11,7 +11,7 @@ __module_group__ = "Security"
# see https://tools.ietf.org/html/draft-cavage-http-signatures-06 # see https://tools.ietf.org/html/draft-cavage-http-signatures-06
# #
# This might change in future # This might change in future
# see https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures-01 # see https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.serialization import load_pem_private_key
@ -116,11 +116,11 @@ def signPostHeadersNew(dateStr: str, privateKeyPem: str,
path: str, path: str,
httpPrefix: str, httpPrefix: str,
messageBodyJsonStr: str, messageBodyJsonStr: str,
algorithm: str) -> (str, str): algorithm: str, debug: bool) -> (str, str):
"""Returns a raw signature strings that can be plugged into a header """Returns a raw signature strings that can be plugged into a header
as "Signature-Input" and "Signature" as "Signature-Input" and "Signature"
used to verify the authenticity of an HTTP transmission. used to verify the authenticity of an HTTP transmission.
See https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures-01 See https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures
""" """
domain = getFullDomain(domain, port) domain = getFullDomain(domain, port)
@ -137,18 +137,17 @@ def signPostHeadersNew(dateStr: str, privateKeyPem: str,
keyID = localActorUrl(httpPrefix, nickname, domain) + '#main-key' keyID = localActorUrl(httpPrefix, nickname, domain) + '#main-key'
if not messageBodyJsonStr: if not messageBodyJsonStr:
headers = { headers = {
'*request-target': f'post {path}', '@request-target': f'get {path}',
'*created': str(secondsSinceEpoch), '@created': str(secondsSinceEpoch),
'host': toDomain, 'host': toDomain,
'date': dateStr, 'date': dateStr
'content-type': 'application/json'
} }
else: else:
bodyDigest = messageContentDigest(messageBodyJsonStr) bodyDigest = messageContentDigest(messageBodyJsonStr)
contentLength = len(messageBodyJsonStr) contentLength = len(messageBodyJsonStr)
headers = { headers = {
'*request-target': f'post {path}', '@request-target': f'post {path}',
'*created': str(secondsSinceEpoch), '@created': str(secondsSinceEpoch),
'host': toDomain, 'host': toDomain,
'date': dateStr, 'date': dateStr,
'digest': f'SHA-256={bodyDigest}', 'digest': f'SHA-256={bodyDigest}',
@ -164,6 +163,10 @@ def signPostHeadersNew(dateStr: str, privateKeyPem: str,
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n' signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
signedHeaderText = signedHeaderText.strip() signedHeaderText = signedHeaderText.strip()
if debug:
print('\nsignPostHeadersNew signedHeaderText:\n' +
signedHeaderText + '\nEND\n')
# Sign the digest. Potentially other signing algorithms can be added here. # Sign the digest. Potentially other signing algorithms can be added here.
signature = '' signature = ''
if algorithm == 'rsa-sha512': if algorithm == 'rsa-sha512':
@ -269,7 +272,8 @@ def _verifyRecentSignature(signedDateStr: str) -> bool:
return True return True
def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict, def verifyPostHeaders(httpPrefix: str,
publicKeyPem: str, headers: dict,
path: str, GETmethod: bool, path: str, GETmethod: bool,
messageBodyDigest: str, messageBodyDigest: str,
messageBodyJsonStr: str, debug: bool, messageBodyJsonStr: str, debug: bool,
@ -298,8 +302,11 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'), pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'),
backend=default_backend()) backend=default_backend())
# Build a dictionary of the signature values # Build a dictionary of the signature values
if headers.get('Signature-Input') or headers.get('signature-input'):
if headers.get('Signature-Input'): if headers.get('Signature-Input'):
signatureHeader = headers['Signature-Input'] signatureHeader = headers['Signature-Input']
else:
signatureHeader = headers['signature-input']
fieldSep2 = ',' fieldSep2 = ','
# split the signature input into separate fields # split the signature input into separate fields
signatureDict = { signatureDict = {
@ -312,7 +319,8 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
if v.startswith('('): if v.startswith('('):
requestTargetKey = k requestTargetKey = k
requestTargetStr = v[1:-1] requestTargetStr = v[1:-1]
break elif v.startswith('"'):
signatureDict[k] = v[1:-1]
if not requestTargetKey: if not requestTargetKey:
return False return False
signatureDict[requestTargetKey] = requestTargetStr signatureDict[requestTargetKey] = requestTargetStr
@ -341,19 +349,40 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
# original Mastodon http signature # original Mastodon http signature
appendStr = f'(request-target): {method.lower()} {path}' appendStr = f'(request-target): {method.lower()} {path}'
signedHeaderList.append(appendStr) signedHeaderList.append(appendStr)
elif '*request-target' in signedHeader: elif '@request-target' in signedHeader:
# https://tools.ietf.org/html/ # https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures-01 # draft-ietf-httpbis-message-signatures
appendStr = f'*request-target: {method.lower()} {path}' appendStr = f'@request-target: {method.lower()} {path}'
# remove () signedHeaderList.append(appendStr)
# if appendStr.startswith('('): elif '@created' in signedHeader:
# appendStr = appendStr.split('(')[1] if signatureDict.get('created'):
# if ')' in appendStr: createdStr = str(signatureDict['created'])
# appendStr = appendStr.split(')')[0] appendStr = f'@created: {createdStr}'
signedHeaderList.append(appendStr)
elif '@expires' in signedHeader:
if signatureDict.get('expires'):
expiresStr = str(signatureDict['expires'])
appendStr = f'@expires: {expiresStr}'
signedHeaderList.append(appendStr)
elif '@method' in signedHeader:
appendStr = f'@expires: {method}'
signedHeaderList.append(appendStr)
elif '@scheme' in signedHeader:
signedHeaderList.append('@scheme: http')
elif '@authority' in signedHeader:
authorityStr = None
if signatureDict.get('authority'):
authorityStr = str(signatureDict['authority'])
elif signatureDict.get('Authority'):
authorityStr = str(signatureDict['Authority'])
if authorityStr:
appendStr = f'@authority: {authorityStr}'
signedHeaderList.append(appendStr) signedHeaderList.append(appendStr)
elif signedHeader == 'algorithm': elif signedHeader == 'algorithm':
if headers.get(signedHeader): if headers.get(signedHeader):
algorithm = headers[signedHeader] algorithm = headers[signedHeader]
if debug:
print('http signature algorithm: ' + algorithm)
elif signedHeader == 'digest': elif signedHeader == 'digest':
if messageBodyDigest: if messageBodyDigest:
bodyDigest = messageBodyDigest bodyDigest = messageBodyDigest
@ -427,14 +456,18 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
# Now we have our header data digest # Now we have our header data digest
signedHeaderText = '\n'.join(signedHeaderList) signedHeaderText = '\n'.join(signedHeaderList)
if debug: if debug:
print('signedHeaderText:\n' + signedHeaderText + 'END') print('\nverifyPostHeaders signedHeaderText:\n' +
signedHeaderText + '\nEND\n')
# Get the signature, verify with public key, return result # Get the signature, verify with public key, return result
signature = None if (headers.get('Signature-Input') and headers.get('Signature')) or \
if headers.get('Signature-Input') and headers.get('Signature'): (headers.get('signature-input') and headers.get('signature')):
# https://tools.ietf.org/html/ # https://tools.ietf.org/html/
# draft-ietf-httpbis-message-signatures-01 # draft-ietf-httpbis-message-signatures
if headers.get('Signature'):
headersSig = headers['Signature'] headersSig = headers['Signature']
else:
headersSig = headers['signature']
# remove sig1=: # remove sig1=:
if requestTargetKey + '=:' in headersSig: if requestTargetKey + '=:' in headersSig:
headersSig = headersSig.split(requestTargetKey + '=:')[1] headersSig = headersSig.split(requestTargetKey + '=:')[1]
@ -442,25 +475,36 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
signature = base64.b64decode(headersSig) signature = base64.b64decode(headersSig)
else: else:
# Original Mastodon signature # Original Mastodon signature
signature = base64.b64decode(signatureDict['signature']) headersSig = signatureDict['signature']
signature = base64.b64decode(headersSig)
if debug: if debug:
print('signature: ' + algorithm + ' ' + print('signature: ' + algorithm + ' ' + headersSig)
signatureDict['signature'])
# log unusual signing algorithms
if signatureDict.get('alg'):
print('http signature algorithm: ' + signatureDict['alg'])
# If extra signing algorithms need to be added then do it here # If extra signing algorithms need to be added then do it here
if algorithm == 'rsa-sha256': if not signatureDict.get('alg'):
headerDigest = getSHA256(signedHeaderText.encode('ascii'))
paddingStr = padding.PKCS1v15()
alg = hazutils.Prehashed(hashes.SHA256()) alg = hazutils.Prehashed(hashes.SHA256())
elif algorithm == 'rsa-sha512': elif (signatureDict['alg'] == 'rsa-sha256' or
headerDigest = getSHA512(signedHeaderText.encode('ascii')) signatureDict['alg'] == 'rsa-v1_5-sha256' or
paddingStr = padding.PKCS1v15() signatureDict['alg'] == 'hs2019'):
alg = hazutils.Prehashed(hashes.SHA256())
elif (signatureDict['alg'] == 'rsa-sha512' or
signatureDict['alg'] == 'rsa-pss-sha512'):
alg = hazutils.Prehashed(hashes.SHA512()) alg = hazutils.Prehashed(hashes.SHA512())
else: else:
print('Unknown http signature algorithm: ' + algorithm)
paddingStr = padding.PKCS1v15()
alg = hazutils.Prehashed(hashes.SHA256()) alg = hazutils.Prehashed(hashes.SHA256())
if algorithm == 'rsa-sha256' or algorithm == 'hs2019':
headerDigest = getSHA256(signedHeaderText.encode('ascii'))
elif algorithm == 'rsa-sha512':
headerDigest = getSHA512(signedHeaderText.encode('ascii'))
else:
print('Unknown http signature algorithm: ' + algorithm)
headerDigest = '' headerDigest = ''
paddingStr = padding.PKCS1v15()
try: try:
pubkey.verify(signature, headerDigest, paddingStr, alg) pubkey.verify(signature, headerDigest, paddingStr, alg)

View File

@ -17,6 +17,7 @@ from languages import understoodPostLanguage
from like import updateLikesCollection from like import updateLikesCollection
from reaction import updateReactionCollection from reaction import updateReactionCollection
from reaction import validEmojiContent from reaction import validEmojiContent
from utils import invalidCiphertext
from utils import removeHtml from utils import removeHtml
from utils import fileLastModified from utils import fileLastModified
from utils import hasObjectString from utils import hasObjectString
@ -2258,6 +2259,9 @@ def _validPostContent(baseDir: str, nickname: str, domain: str,
print('REJECT: reply to post which does not ' + print('REJECT: reply to post which does not ' +
'allow comments: ' + originalPostId) 'allow comments: ' + originalPostId)
return False return False
if invalidCiphertext(messageJson['object']['content']):
print('REJECT: malformed ciphertext in content')
return False
if debug: if debug:
print('ACCEPT: post content is valid') print('ACCEPT: post content is valid')
return True return True

View File

@ -32,6 +32,7 @@ from webfinger import webfingerHandle
from httpsig import createSignedHeader from httpsig import createSignedHeader
from siteactive import siteIsActive from siteactive import siteIsActive
from languages import understoodPostLanguage from languages import understoodPostLanguage
from utils import invalidCiphertext
from utils import hasObjectStringType from utils import hasObjectStringType
from utils import removeIdEnding from utils import removeIdEnding
from utils import replaceUsersWithAt from utils import replaceUsersWithAt
@ -4602,6 +4603,14 @@ def downloadAnnounce(session, baseDir: str, httpPrefix: str,
recentPostsCache) recentPostsCache)
return None return None
if invalidCiphertext(contentStr):
_rejectAnnounce(announceFilename,
baseDir, nickname, domain, postId,
recentPostsCache)
print('WARN: Invalid ciphertext within announce ' +
str(announcedJson))
return None
# remove any long words # remove any long words
contentStr = removeLongWords(contentStr, 40, []) contentStr = removeLongWords(contentStr, 40, [])

253
tests.py
View File

@ -392,8 +392,20 @@ def _testSignAndVerify() -> None:
def _testHttpSigNew(): def _testHttpSigNew():
print('testHttpSigNew') print('testHttpSigNew')
httpPrefix = 'https'
port = 443
debug = True
messageBodyJson = {"hello": "world"} messageBodyJson = {"hello": "world"}
messageBodyJsonStr = json.dumps(messageBodyJson) messageBodyJsonStr = json.dumps(messageBodyJson)
nickname = 'foo'
pathStr = "/" + nickname + "?param=value&pet=dog HTTP/1.1"
domain = 'example.com'
dateStr = 'Tue, 20 Apr 2021 02:07:55 GMT'
digestStr = 'SHA-256=X48E9qOokqqrvdts8nOJRJN3OWDUoyWxBf7kbu9DBPE='
bodyDigest = messageContentDigest(messageBodyJsonStr)
assert bodyDigest in digestStr
contentLength = 18
contentType = 'application/activity+json'
publicKeyPem = \ publicKeyPem = \
'-----BEGIN RSA PUBLIC KEY-----\n' + \ '-----BEGIN RSA PUBLIC KEY-----\n' + \
'MIIBCgKCAQEAhAKYdtoeoy8zcAcR874L8' + \ 'MIIBCgKCAQEAhAKYdtoeoy8zcAcR874L8' + \
@ -462,101 +474,49 @@ def _testHttpSigNew():
'EQeNC8fHGg4UXU8mhHnSBt3EA10qQJfRD' + \ 'EQeNC8fHGg4UXU8mhHnSBt3EA10qQJfRD' + \
's15M38eG2cYwB1PZpDHScDnDA0=\n' + \ 's15M38eG2cYwB1PZpDHScDnDA0=\n' + \
'-----END RSA PRIVATE KEY-----' '-----END RSA PRIVATE KEY-----'
sigInput = \
'sig1=(date); alg=rsa-sha256; keyId="test-key-b"'
sig = \
'sig1=:HtXycCl97RBVkZi66ADKnC9c5eSSlb57GnQ4KFqNZplOpNfxqk62' + \
'JzZ484jXgLvoOTRaKfR4hwyxlcyb+BWkVasApQovBSdit9Ml/YmN2IvJDPncrlhPD' + \
'VDv36Z9/DiSO+RNHD7iLXugdXo1+MGRimW1RmYdenl/ITeb7rjfLZ4b9VNnLFtVWw' + \
'rjhAiwIqeLjodVImzVc5srrk19HMZNuUejK6I3/MyN3+3U8tIRW4LWzx6ZgGZUaEE' + \
'P0aBlBkt7Fj0Tt5/P5HNW/Sa/m8smxbOHnwzAJDa10PyjzdIbywlnWIIWtZKPPsoV' + \
'oKVopUWEU3TNhpWmaVhFrUL/O6SN3w==:'
# "hs2019", using RSASSA-PSS [RFC8017] and SHA-512 [RFC6234]
# sigInput = \
# 'sig1=(*request-target, *created, host, date, ' + \
# 'cache-control, x-empty-header, x-example); keyId="test-key-a"; ' + \
# 'alg=hs2019; created=1402170695; expires=1402170995'
# sig = \
# 'sig1=:K2qGT5srn2OGbOIDzQ6kYT+ruaycnDAAUpKv+ePFfD0RAxn/1BUe' + \
# 'Zx/Kdrq32DrfakQ6bPsvB9aqZqognNT6be4olHROIkeV879RrsrObury8L9SCEibe' + \
# 'oHyqU/yCjphSmEdd7WD+zrchK57quskKwRefy2iEC5S2uAH0EPyOZKWlvbKmKu5q4' + \
# 'CaB8X/I5/+HLZLGvDiezqi6/7p2Gngf5hwZ0lSdy39vyNMaaAT0tKo6nuVw0S1MVg' + \
# '1Q7MpWYZs0soHjttq0uLIA3DIbQfLiIvK6/l0BdWTU7+2uQj7lBkQAsFZHoA96ZZg' + \
# 'FquQrXRlmYOh+Hx5D9fJkXcXe5tmAg==:'
nickname = 'foo'
boxpath = '/' + nickname
# headers = {
# "*request-target": "get " + boxpath,
# "*created": "1402170695",
# "host": "example.org",
# "date": "Tue, 07 Jun 2014 20:51:35 GMT",
# "cache-control": "max-age=60, must-revalidate",
# "x-emptyheader": "",
# "x-example": "Example header with some whitespace.",
# "x-dictionary": "b=2",
# "x-dictionary": "a=1",
# "x-list": "(a, b, c)",
# "Signature-Input": sigInput,
# "Signature": sig
# }
dateStr = "Tue, 07 Jun 2014 20:51:35 GMT"
secondsSinceEpoch = 1402174295
domain = "example.com"
port = 443
headers = {
"*created": str(secondsSinceEpoch),
"*request-target": "post /foo?param=value&pet=dog",
"host": domain,
"date": dateStr,
"content-type": "application/json",
"digest": "SHA-256=X48E9qOokqqrvdts8nOJRJN3OWDUoyWxBf7kbu9DBPE=",
"content-length": "18",
"Signature-Input": sigInput,
"Signature": sig
}
httpPrefix = 'https'
debug = False
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, False, None,
messageBodyJsonStr, debug, True)
# make a deliberate mistake
headers['Signature'] = headers['Signature'].replace('V', 'B')
assert not verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, False, None,
messageBodyJsonStr, debug, True)
# test signing
bodyDigest = messageContentDigest(messageBodyJsonStr)
contentLength = len(messageBodyJsonStr)
headers = { headers = {
"host": domain, "host": domain,
"date": dateStr, "date": dateStr,
"digest": f'SHA-256={bodyDigest}', "digest": f'SHA-256={bodyDigest}',
"content-type": "application/json", "content-type": contentType,
"content-length": str(contentLength) "content-length": str(contentLength)
} }
signatureIndexHeader, signatureHeader = \ signatureIndexHeader, signatureHeader = \
signPostHeadersNew(dateStr, privateKeyPem, nickname, signPostHeadersNew(dateStr, privateKeyPem, nickname,
domain, port, domain, port,
domain, port, domain, port,
boxpath, httpPrefix, messageBodyJsonStr, pathStr, httpPrefix, messageBodyJsonStr,
'rsa-sha256') 'rsa-sha256', debug)
expectedIndexHeader = \ print('signatureIndexHeader1: ' + str(signatureIndexHeader))
'keyId="https://example.com/users/foo#main-key"; ' + \ print('signatureHeader1: ' + str(signatureHeader))
'alg=hs2019; created=' + str(secondsSinceEpoch) + '; ' + \ sigInput = "keyId=\"https://example.com/users/foo#main-key\"; " + \
'sig1=(*request-target, *created, host, date, ' + \ "alg=hs2019; created=1618884475; " + \
'digest, content-type, content-length)' "sig1=(@request-target, @created, host, date, digest, " + \
if signatureIndexHeader != expectedIndexHeader: "content-type, content-length)"
print('Unexpected new http header: ' + signatureIndexHeader) assert signatureIndexHeader == sigInput
print('Should be: ' + expectedIndexHeader) sig = "sig1=:NXAQ7AtDMR2iwhmH1qCwiZw5PVTjOw5+5kSu0Tsx/3gqz0D" + \
assert signatureIndexHeader == expectedIndexHeader "py7OQbWqFHrNB7MmS4TukX/vDyQOFdElY5yxnEhbgRwKACq0AP4QH9H" + \
assert signatureHeader == \ "CiRyCE8UXDdAkY4VUd6jrWjRHKRoqQN7I+Q5tb2Fu5cDfifw/PQc86Z" + \
'sig1=:euX3O1KSTYXN9/oR2qFezswWm9FbrjtRymK7xBpXNQvTs' + \ "NmMhPrg3OjUJ9Q2Gj29NhgJ+4el1ECg0cAy4yG1M9AQ3KvQooQFvlg1" + \
'XehtrNdD8nELZKzPXMvMz7PaJd6V+fjzpHoZ9upTdqqQLK2Iwml' + \ "vp0H2xfbJQjv8FsR/lKiRdaVHqGR2CKrvxvPRPaOsFANp2wzEtiMk3O" + \
'p4BlHqW6Aopd7sZFCWFq7/Amm5oaizpp3e0jb5XISS5m3cRKuoi' + \ "TrBTYU+Zb53mIspfEeLxsNtcGmBDmQKZ9Pud8f99XGJrP+uDd3zKtnr" + \
'LM0x+OudmAoYGi0TEEJk8bpnJAXfVCDfmOyL3XNqQeShQHeOANG' + \ "f3fUnRRqy37yhB7WVwkg==:"
'okiKktj8ff+KLYLaPTAJkob1k/EhoPIkbw/YzAY8IZjWQNMkf+F' + \ assert signatureHeader == sig
'JChApQ5HnDCQPwD5xV9eGzBpAf6D0G19xiTmQye4Hn6tAs3fy3V' + \
'/aYa/GhW2pSrctDnAKIi4imj9joppr3CB8gqgXZOPQ==:' debug = True
headers['path'] = pathStr
headers['signature'] = sig
headers['signature-input'] = sigInput
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
pathStr, False, None,
messageBodyJsonStr, debug, True)
# make a deliberate mistake
debug = False
headers['signature'] = headers['signature'].replace('V', 'B')
assert not verifyPostHeaders(httpPrefix, publicKeyPem, headers,
pathStr, False, None,
messageBodyJsonStr, debug, True)
def _testHttpsigBase(withDigest: bool, baseDir: str): def _testHttpsigBase(withDigest: bool, baseDir: str):
@ -623,9 +583,10 @@ def _testHttpsigBase(withDigest: bool, baseDir: str):
headers['signature'] = signatureHeader headers['signature'] = signatureHeader
GETmethod = not withDigest GETmethod = not withDigest
debug = True
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, GETmethod, None, boxpath, GETmethod, None,
messageBodyJsonStr, False) messageBodyJsonStr, debug)
if withDigest: if withDigest:
# everything correct except for content-length # everything correct except for content-length
headers['content-length'] = str(contentLength + 2) headers['content-length'] = str(contentLength + 2)
@ -5919,6 +5880,124 @@ def _testValidEmojiContent() -> None:
assert validEmojiContent('😄') assert validEmojiContent('😄')
def _testHttpsigBaseNew(withDigest: bool, baseDir: str):
print('testHttpsigNew(' + str(withDigest) + ')')
debug = True
path = baseDir + '/.testHttpsigBaseNew'
if os.path.isdir(path):
shutil.rmtree(path, ignore_errors=False, onerror=None)
os.mkdir(path)
os.chdir(path)
contentType = 'application/activity+json'
nickname = 'socrates'
hostDomain = 'someother.instance'
domain = 'argumentative.social'
httpPrefix = 'https'
port = 5576
password = 'SuperSecretPassword'
privateKeyPem, publicKeyPem, person, wfEndpoint = \
createPerson(path, nickname, domain, port, httpPrefix,
False, False, password)
assert privateKeyPem
if withDigest:
messageBodyJson = {
"a key": "a value",
"another key": "A string",
"yet another key": "Another string"
}
messageBodyJsonStr = json.dumps(messageBodyJson)
else:
messageBodyJsonStr = ''
headersDomain = getFullDomain(hostDomain, port)
dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
boxpath = '/inbox'
if not withDigest:
headers = {
'host': headersDomain,
'date': dateStr,
'accept': contentType
}
signatureIndexHeader, signatureHeader = \
signPostHeadersNew(dateStr, privateKeyPem, nickname,
domain, port,
hostDomain, port,
boxpath, httpPrefix, messageBodyJsonStr,
'rsa-sha256', debug)
else:
bodyDigest = messageContentDigest(messageBodyJsonStr)
contentLength = len(messageBodyJsonStr)
headers = {
'host': headersDomain,
'date': dateStr,
'digest': f'SHA-256={bodyDigest}',
'content-type': contentType,
'content-length': str(contentLength)
}
signatureIndexHeader, signatureHeader = \
signPostHeadersNew(dateStr, privateKeyPem, nickname,
domain, port,
hostDomain, port,
boxpath, httpPrefix, messageBodyJsonStr,
'rsa-sha256', debug)
headers['signature'] = signatureHeader
headers['signature-input'] = signatureIndexHeader
print('headers: ' + str(headers))
GETmethod = not withDigest
debug = True
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, GETmethod, None,
messageBodyJsonStr, debug)
debug = False
if withDigest:
# everything correct except for content-length
headers['content-length'] = str(contentLength + 2)
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, GETmethod, None,
messageBodyJsonStr, debug) is False
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
'/parambulator' + boxpath, GETmethod, None,
messageBodyJsonStr, debug) is False
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, not GETmethod, None,
messageBodyJsonStr, debug) is False
if not withDigest:
# fake domain
headers = {
'host': 'bogon.domain',
'date': dateStr,
'content-type': contentType
}
else:
# correct domain but fake message
messageBodyJsonStr = \
'{"a key": "a value", "another key": "Fake GNUs", ' + \
'"yet another key": "More Fake GNUs"}'
contentLength = len(messageBodyJsonStr)
bodyDigest = messageContentDigest(messageBodyJsonStr)
headers = {
'host': domain,
'date': dateStr,
'digest': f'SHA-256={bodyDigest}',
'content-type': contentType,
'content-length': str(contentLength)
}
headers['signature'] = signatureHeader
headers['signature-input'] = signatureIndexHeader
pprint(headers)
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, not GETmethod, None,
messageBodyJsonStr, False) is False
os.chdir(baseDir)
shutil.rmtree(path, ignore_errors=False, onerror=None)
def runAllTests(): def runAllTests():
baseDir = os.getcwd() baseDir = os.getcwd()
print('Running tests...') print('Running tests...')
@ -5990,6 +6069,8 @@ def runAllTests():
_testHttpsig(baseDir) _testHttpsig(baseDir)
_testHttpSignedGET(baseDir) _testHttpSignedGET(baseDir)
_testHttpSigNew() _testHttpSigNew()
_testHttpsigBaseNew(True, baseDir)
_testHttpsigBaseNew(False, baseDir)
_testCache() _testCache()
_testThreads() _testThreads()
_testCreatePerson(baseDir) _testCreatePerson(baseDir)

View File

@ -2653,8 +2653,8 @@ def isPGPEncrypted(content: str) -> bool:
return False return False
def malformedCiphertext(content: str) -> bool: def invalidCiphertext(content: str) -> bool:
"""Returns true if the given content contains a malformed key """Returns true if the given content contains an invalid key
""" """
if '----BEGIN ' in content or '----END ' in content: if '----BEGIN ' in content or '----END ' in content:
if not containsPGPPublicKey(content) and \ if not containsPGPPublicKey(content) and \