diff --git a/httpsig.py b/httpsig.py index 6ca49c07..ab0ead87 100644 --- a/httpsig.py +++ b/httpsig.py @@ -32,8 +32,9 @@ def signPostHeaders(privateKeyPem: str, nickname: str, domain: str, \ if not messageBodyJson: headers = {'(request-target)': f'post {path}','host': domain,'date': dateStr,'content-type': 'application/json'} else: + messageBodyJsonStr=json.dumps(messageBodyJson) bodyDigest = \ - base64.b64encode(SHA256.new(messageBodyJson.encode()).digest()).decode('utf-8') + base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()).decode('utf-8') headers = {'(request-target)': f'post {path}','host': domain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/activity+json'} privateKeyPem = RSA.import_key(privateKeyPem) #headers.update({ @@ -74,15 +75,20 @@ def createSignedHeader(privateKeyPem: str,nickname: str,domain: str,port: int, \ dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) path='/inbox' + print('Testing 123 '+str(withDigest)) if not withDigest: headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr} + signatureHeader = \ + signPostHeaders(privateKeyPem, nickname, domain, port, \ + path, httpPrefix, None) else: messageBodyJsonStr=json.dumps(messageBodyJson) bodyDigest = \ base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()).decode('utf-8') headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/activity+json'} - signatureHeader = signPostHeaders(privateKeyPem, nickname, domain, port, \ - path, httpPrefix, None) + signatureHeader = \ + signPostHeaders(privateKeyPem, nickname, domain, port, \ + path, httpPrefix, messageBodyJson) headers['signature'] = signatureHeader return headers @@ -121,7 +127,7 @@ def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \ f'(request-target): {method.lower()} {path}') elif signedHeader == 'digest': bodyDigest = \ - base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()).decode('utf-8').decode('utf-8') + base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()).decode('utf-8') signedHeaderList.append(f'digest: SHA-256={bodyDigest}') else: if headers.get(signedHeader): @@ -133,7 +139,7 @@ def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \ signedHeaderList.append( f'{signedHeader}: {headers[signedHeaderCap]}') - print('signedHeaderList: '+str(signedHeaderList)) + #print('signedHeaderList: '+str(signedHeaderList)) # Now we have our header data digest signedHeaderText = '\n'.join(signedHeaderList) headerDigest = SHA256.new(signedHeaderText.encode('ascii')) diff --git a/tests.py b/tests.py index de5bc5dd..09a6113d 100644 --- a/tests.py +++ b/tests.py @@ -11,6 +11,7 @@ import time import os, os.path import shutil import commentjson +import json from time import gmtime, strftime from pprint import pprint from person import createPerson @@ -74,6 +75,7 @@ def testHttpsigBase(withDigest): os.mkdir(path) os.chdir(path) + contentType='application/activity+json' nickname='socrates' domain='argumentative.social' httpPrefix='https' @@ -82,32 +84,35 @@ def testHttpsigBase(withDigest): privateKeyPem,publicKeyPem,person,wfEndpoint= \ createPerson(path,nickname,domain,port,httpPrefix,False,password) assert privateKeyPem - messageBodyJsonStr = '{"a key": "a value", "another key": "A string","yet another key": "Another string"}' + messageBodyJson = {"a key": "a value", "another key": "A string","yet another key": "Another string"} + messageBodyJsonStr=json.dumps(messageBodyJson) headersDomain=domain if port!=80 and port !=443: headersDomain=domain+':'+str(port) dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) + boxpath='/inbox' if not withDigest: headers = {'host': headersDomain,'date': dateStr,'content-type': 'application/json'} + signatureHeader = \ + signPostHeaders(privateKeyPem, nickname, domain, port, boxpath, httpPrefix, None) else: bodyDigest = \ - base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()) - headers = {'host': headersDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/json'} + base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()).decode('utf-8') + headers = {'host': headersDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': contentType} + signatureHeader = \ + signPostHeaders(privateKeyPem, nickname, domain, port, boxpath, httpPrefix, messageBodyJson) - boxpath='/inbox' - signatureHeader = \ - signPostHeaders(privateKeyPem, nickname, domain, port, boxpath, httpPrefix, None) headers['signature'] = signatureHeader assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, \ - '/inbox' ,False, \ + boxpath,False, \ messageBodyJsonStr) assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, \ - '/parambulator/inbox',False, \ + '/parambulator'+boxpath,False, \ messageBodyJsonStr) == False assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, \ - '/inbox',True, \ + boxpath,True, \ messageBodyJsonStr) == False if not withDigest: # fake domain @@ -115,11 +120,11 @@ def testHttpsigBase(withDigest): else: # correct domain but fake message messageBodyJsonStr = '{"a key": "a value", "another key": "Fake GNUs", "yet another key": "More Fake GNUs"}' - bodyDigest = base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()) - headers = {'host': domain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/json'} + bodyDigest = base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest()).decode('utf-8') + headers = {'host': domain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': contentType} headers['signature'] = signatureHeader assert verifyPostHeaders(httpPrefix,publicKeyPem,headers, \ - '/inbox',True, \ + boxpath,True, \ messageBodyJsonStr) == False os.chdir(baseDir) shutil.rmtree(path)