Fix digest encoding

master
Bob Mottram 2019-08-16 11:36:41 +01:00
parent cf69c4568b
commit 174e166769
2 changed files with 28 additions and 17 deletions

View File

@ -32,8 +32,9 @@ def signPostHeaders(privateKeyPem: str, nickname: str, domain: str, \
if not messageBodyJson: if not messageBodyJson:
headers = {'(request-target)': f'post {path}','host': domain,'date': dateStr,'content-type': 'application/json'} headers = {'(request-target)': f'post {path}','host': domain,'date': dateStr,'content-type': 'application/json'}
else: else:
messageBodyJsonStr=json.dumps(messageBodyJson)
bodyDigest = \ bodyDigest = \
base64.b64encode(SHA256.new(messageBodyJson.encode()).digest()).decode('utf-8') base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()).decode('utf-8')
headers = {'(request-target)': f'post {path}','host': domain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/activity+json'} headers = {'(request-target)': f'post {path}','host': domain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/activity+json'}
privateKeyPem = RSA.import_key(privateKeyPem) privateKeyPem = RSA.import_key(privateKeyPem)
#headers.update({ #headers.update({
@ -74,15 +75,20 @@ def createSignedHeader(privateKeyPem: str,nickname: str,domain: str,port: int, \
dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
path='/inbox' path='/inbox'
print('Testing 123 '+str(withDigest))
if not withDigest: if not withDigest:
headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr} headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr}
signatureHeader = \
signPostHeaders(privateKeyPem, nickname, domain, port, \
path, httpPrefix, None)
else: else:
messageBodyJsonStr=json.dumps(messageBodyJson) messageBodyJsonStr=json.dumps(messageBodyJson)
bodyDigest = \ bodyDigest = \
base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()).decode('utf-8') base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()).decode('utf-8')
headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/activity+json'} headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/activity+json'}
signatureHeader = signPostHeaders(privateKeyPem, nickname, domain, port, \ signatureHeader = \
path, httpPrefix, None) signPostHeaders(privateKeyPem, nickname, domain, port, \
path, httpPrefix, messageBodyJson)
headers['signature'] = signatureHeader headers['signature'] = signatureHeader
return headers return headers
@ -121,7 +127,7 @@ def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
f'(request-target): {method.lower()} {path}') f'(request-target): {method.lower()} {path}')
elif signedHeader == 'digest': elif signedHeader == 'digest':
bodyDigest = \ bodyDigest = \
base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()).decode('utf-8').decode('utf-8') base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()).decode('utf-8')
signedHeaderList.append(f'digest: SHA-256={bodyDigest}') signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
else: else:
if headers.get(signedHeader): if headers.get(signedHeader):
@ -133,7 +139,7 @@ def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
signedHeaderList.append( signedHeaderList.append(
f'{signedHeader}: {headers[signedHeaderCap]}') f'{signedHeader}: {headers[signedHeaderCap]}')
print('signedHeaderList: '+str(signedHeaderList)) #print('signedHeaderList: '+str(signedHeaderList))
# Now we have our header data digest # Now we have our header data digest
signedHeaderText = '\n'.join(signedHeaderList) signedHeaderText = '\n'.join(signedHeaderList)
headerDigest = SHA256.new(signedHeaderText.encode('ascii')) headerDigest = SHA256.new(signedHeaderText.encode('ascii'))

View File

@ -11,6 +11,7 @@ import time
import os, os.path import os, os.path
import shutil import shutil
import commentjson import commentjson
import json
from time import gmtime, strftime from time import gmtime, strftime
from pprint import pprint from pprint import pprint
from person import createPerson from person import createPerson
@ -74,6 +75,7 @@ def testHttpsigBase(withDigest):
os.mkdir(path) os.mkdir(path)
os.chdir(path) os.chdir(path)
contentType='application/activity+json'
nickname='socrates' nickname='socrates'
domain='argumentative.social' domain='argumentative.social'
httpPrefix='https' httpPrefix='https'
@ -82,32 +84,35 @@ def testHttpsigBase(withDigest):
privateKeyPem,publicKeyPem,person,wfEndpoint= \ privateKeyPem,publicKeyPem,person,wfEndpoint= \
createPerson(path,nickname,domain,port,httpPrefix,False,password) createPerson(path,nickname,domain,port,httpPrefix,False,password)
assert privateKeyPem assert privateKeyPem
messageBodyJsonStr = '{"a key": "a value", "another key": "A string","yet another key": "Another string"}' messageBodyJson = {"a key": "a value", "another key": "A string","yet another key": "Another string"}
messageBodyJsonStr=json.dumps(messageBodyJson)
headersDomain=domain headersDomain=domain
if port!=80 and port !=443: if port!=80 and port !=443:
headersDomain=domain+':'+str(port) headersDomain=domain+':'+str(port)
dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
boxpath='/inbox'
if not withDigest: if not withDigest:
headers = {'host': headersDomain,'date': dateStr,'content-type': 'application/json'} headers = {'host': headersDomain,'date': dateStr,'content-type': 'application/json'}
signatureHeader = \
signPostHeaders(privateKeyPem, nickname, domain, port, boxpath, httpPrefix, None)
else: else:
bodyDigest = \ bodyDigest = \
base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()) base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()).decode('utf-8')
headers = {'host': headersDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/json'} headers = {'host': headersDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': contentType}
signatureHeader = \
signPostHeaders(privateKeyPem, nickname, domain, port, boxpath, httpPrefix, messageBodyJson)
boxpath='/inbox'
signatureHeader = \
signPostHeaders(privateKeyPem, nickname, domain, port, boxpath, httpPrefix, None)
headers['signature'] = signatureHeader headers['signature'] = signatureHeader
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, \ assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, \
'/inbox' ,False, \ boxpath,False, \
messageBodyJsonStr) messageBodyJsonStr)
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, \ assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, \
'/parambulator/inbox',False, \ '/parambulator'+boxpath,False, \
messageBodyJsonStr) == False messageBodyJsonStr) == False
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, \ assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, \
'/inbox',True, \ boxpath,True, \
messageBodyJsonStr) == False messageBodyJsonStr) == False
if not withDigest: if not withDigest:
# fake domain # fake domain
@ -115,11 +120,11 @@ def testHttpsigBase(withDigest):
else: else:
# correct domain but fake message # correct domain but fake message
messageBodyJsonStr = '{"a key": "a value", "another key": "Fake GNUs", "yet another key": "More Fake GNUs"}' messageBodyJsonStr = '{"a key": "a value", "another key": "Fake GNUs", "yet another key": "More Fake GNUs"}'
bodyDigest = base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()) bodyDigest = base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()).decode('utf-8')
headers = {'host': domain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/json'} headers = {'host': domain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': contentType}
headers['signature'] = signatureHeader headers['signature'] = signatureHeader
assert verifyPostHeaders(httpPrefix,publicKeyPem,headers, \ assert verifyPostHeaders(httpPrefix,publicKeyPem,headers, \
'/inbox',True, \ boxpath,True, \
messageBodyJsonStr) == False messageBodyJsonStr) == False
os.chdir(baseDir) os.chdir(baseDir)
shutil.rmtree(path) shutil.rmtree(path)