Move json signature check to its own function

main
Bob Mottram 2021-06-07 15:07:15 +01:00
parent 9ac74f9aba
commit 38beac7f7e
1 changed files with 64 additions and 51 deletions

115
inbox.py
View File

@ -2780,6 +2780,67 @@ def _inboxQuotaExceeded(queue: {}, queueFilename: str,
return False return False
def _checkJsonSignature(baseDir: str, queueJson: {}) -> (bool, bool):
"""check if a json signature exists on this post
"""
hasJsonSignature = False
jwebsigType = None
originalJson = queueJson['original']
if not originalJson.get('@context') or \
not originalJson.get('signature'):
return hasJsonSignature, jwebsigType
if not isinstance(originalJson['signature'], dict):
return hasJsonSignature, jwebsigType
# see https://tools.ietf.org/html/rfc7515
jwebsig = originalJson['signature']
# signature exists and is of the expected type
if not jwebsig.get('type') or \
not jwebsig.get('signatureValue'):
return hasJsonSignature, jwebsigType
jwebsigType = jwebsig['type']
if jwebsigType == 'RsaSignature2017':
if hasValidContext(originalJson):
hasJsonSignature = True
else:
unknownContextsFile = \
baseDir + '/accounts/unknownContexts.txt'
unknownContext = str(originalJson['@context'])
print('unrecognized @context: ' +
unknownContext)
alreadyUnknown = False
if os.path.isfile(unknownContextsFile):
if unknownContext in \
open(unknownContextsFile).read():
alreadyUnknown = True
if not alreadyUnknown:
unknownFile = open(unknownContextsFile, "a+")
if unknownFile:
unknownFile.write(unknownContext + '\n')
unknownFile.close()
else:
print('Unrecognized jsonld signature type: ' +
jwebsigType)
unknownSignaturesFile = \
baseDir + '/accounts/unknownJsonSignatures.txt'
alreadyUnknown = False
if os.path.isfile(unknownSignaturesFile):
if jwebsigType in \
open(unknownSignaturesFile).read():
alreadyUnknown = True
if not alreadyUnknown:
unknownFile = open(unknownSignaturesFile, "a+")
if unknownFile:
unknownFile.write(jwebsigType + '\n')
unknownFile.close()
return hasJsonSignature, jwebsigType
def runInboxQueue(recentPostsCache: {}, maxRecentPosts: int, def runInboxQueue(recentPostsCache: {}, maxRecentPosts: int,
projectVersion: str, projectVersion: str,
baseDir: str, httpPrefix: str, sendThreads: [], postLog: [], baseDir: str, httpPrefix: str, sendThreads: [], postLog: [],
@ -2987,57 +3048,7 @@ def runInboxQueue(recentPostsCache: {}, maxRecentPosts: int,
print('DEBUG: http header signature check success') print('DEBUG: http header signature check success')
# check if a json signature exists on this post # check if a json signature exists on this post
hasJsonSignature = False hasJsonSignature, jwebsigType = _checkJsonSignature(baseDir, queueJson)
jwebsigType = None
originalJson = queueJson['original']
if originalJson.get('@context') and \
originalJson.get('signature'):
if isinstance(originalJson['signature'], dict):
# see https://tools.ietf.org/html/rfc7515
jwebsig = originalJson['signature']
# signature exists and is of the expected type
if jwebsig.get('type') and jwebsig.get('signatureValue'):
jwebsigType = jwebsig['type']
if jwebsigType == 'RsaSignature2017':
if hasValidContext(originalJson):
hasJsonSignature = True
else:
unknownContextsFile = \
baseDir + '/accounts/unknownContexts.txt'
unknownContext = str(originalJson['@context'])
print('unrecognized @context: ' +
unknownContext)
alreadyUnknown = False
if os.path.isfile(unknownContextsFile):
if unknownContext in \
open(unknownContextsFile).read():
alreadyUnknown = True
if not alreadyUnknown:
unknownFile = open(unknownContextsFile, "a+")
if unknownFile:
unknownFile.write(unknownContext + '\n')
unknownFile.close()
else:
print('Unrecognized jsonld signature type: ' +
jwebsigType)
unknownSignaturesFile = \
baseDir + '/accounts/unknownJsonSignatures.txt'
alreadyUnknown = False
if os.path.isfile(unknownSignaturesFile):
if jwebsigType in \
open(unknownSignaturesFile).read():
alreadyUnknown = True
if not alreadyUnknown:
unknownFile = open(unknownSignaturesFile, "a+")
if unknownFile:
unknownFile.write(jwebsigType + '\n')
unknownFile.close()
# strict enforcement of json signatures # strict enforcement of json signatures
if not hasJsonSignature: if not hasJsonSignature:
@ -3053,6 +3064,7 @@ def runInboxQueue(recentPostsCache: {}, maxRecentPosts: int,
pprint(queueJson['httpHeaders']) pprint(queueJson['httpHeaders'])
if verifyAllSignatures: if verifyAllSignatures:
originalJson = queueJson['original']
print('Queue: inbox post does not have a jsonld signature ' + print('Queue: inbox post does not have a jsonld signature ' +
keyId + ' ' + str(originalJson)) keyId + ' ' + str(originalJson))
@ -3066,6 +3078,7 @@ def runInboxQueue(recentPostsCache: {}, maxRecentPosts: int,
if httpSignatureFailed or verifyAllSignatures: if httpSignatureFailed or verifyAllSignatures:
# use the original json message received, not one which # use the original json message received, not one which
# may have been modified along the way # may have been modified along the way
originalJson = queueJson['original']
if not verifyJsonSignature(originalJson, pubKey): if not verifyJsonSignature(originalJson, pubKey):
if debug: if debug:
print('WARN: jsonld inbox signature check failed ' + print('WARN: jsonld inbox signature check failed ' +