mirror of https://gitlab.com/bashrc2/epicyon
Debug in http signature verification
parent
fd8f696f2e
commit
127a60280f
|
@ -244,7 +244,8 @@ class PubServer(BaseHTTPRequestHandler):
|
||||||
if verifyPostHeaders(self.server.httpPrefix, \
|
if verifyPostHeaders(self.server.httpPrefix, \
|
||||||
pubKey,self.headers, \
|
pubKey,self.headers, \
|
||||||
self.path,True, \
|
self.path,True, \
|
||||||
GETrequestDigest,GETrequestBody):
|
GETrequestDigest, \
|
||||||
|
GETrequestBody,debug):
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
19
httpsig.py
19
httpsig.py
|
@ -141,7 +141,7 @@ def verifyRecentSignature(signedDateStr: str) -> bool:
|
||||||
def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
|
def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
|
||||||
path: str,GETmethod: bool, \
|
path: str,GETmethod: bool, \
|
||||||
messageBodyDigest: str, \
|
messageBodyDigest: str, \
|
||||||
messageBodyJsonStr: str) -> bool:
|
messageBodyJsonStr: str,debug: bool) -> bool:
|
||||||
"""Returns true or false depending on if the key that we plugged in here
|
"""Returns true or false depending on if the key that we plugged in here
|
||||||
validates against the headers, method, and path.
|
validates against the headers, method, and path.
|
||||||
publicKeyPem - the public key from an rsa key pair
|
publicKeyPem - the public key from an rsa key pair
|
||||||
|
@ -155,6 +155,9 @@ def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
|
||||||
method='GET'
|
method='GET'
|
||||||
else:
|
else:
|
||||||
method='POST'
|
method='POST'
|
||||||
|
|
||||||
|
if debug:
|
||||||
|
print('DEBUG: verifyPostHeaders '+method)
|
||||||
|
|
||||||
publicKeyPem = RSA.import_key(publicKeyPem)
|
publicKeyPem = RSA.import_key(publicKeyPem)
|
||||||
# Build a dictionary of the signature values
|
# Build a dictionary of the signature values
|
||||||
|
@ -170,7 +173,11 @@ def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
|
||||||
# body (if a digest was included)
|
# body (if a digest was included)
|
||||||
signedHeaderList = []
|
signedHeaderList = []
|
||||||
contentLength=len(messageBodyJsonStr)
|
contentLength=len(messageBodyJsonStr)
|
||||||
|
if debug:
|
||||||
|
print('DEBUG: verifyPostHeaders contentLength='+str(contentLength))
|
||||||
for signedHeader in signatureDict['headers'].split(' '):
|
for signedHeader in signatureDict['headers'].split(' '):
|
||||||
|
if debug:
|
||||||
|
print('DEBUG: verifyPostHeaders signedHeader='+signedHeader)
|
||||||
if signedHeader == '(request-target)':
|
if signedHeader == '(request-target)':
|
||||||
signedHeaderList.append(
|
signedHeaderList.append(
|
||||||
f'(request-target): {method.lower()} {path}')
|
f'(request-target): {method.lower()} {path}')
|
||||||
|
@ -187,9 +194,13 @@ def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
|
||||||
if headers.get(signedHeader):
|
if headers.get(signedHeader):
|
||||||
if signedHeader=='content-length':
|
if signedHeader=='content-length':
|
||||||
if int(headers[signedHeader])!=contentLength:
|
if int(headers[signedHeader])!=contentLength:
|
||||||
|
if debug:
|
||||||
|
print('DEBUG: verifyPostHeaders content-length does not match '+headers[signedHeader]+' != '+str(contentLength))
|
||||||
return False
|
return False
|
||||||
if signedHeader=='date':
|
if signedHeader=='date':
|
||||||
if not verifyRecentSignature(headers[signedHeader]):
|
if not verifyRecentSignature(headers[signedHeader]):
|
||||||
|
if debug:
|
||||||
|
print('DEBUG: verifyPostHeaders date is not recent '+headers[signedHeader])
|
||||||
return False
|
return False
|
||||||
#print('***************************Verify '+signedHeader+': '+headers[signedHeader])
|
#print('***************************Verify '+signedHeader+': '+headers[signedHeader])
|
||||||
signedHeaderList.append(
|
signedHeaderList.append(
|
||||||
|
@ -198,9 +209,13 @@ def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
|
||||||
signedHeaderCap=signedHeader.capitalize()
|
signedHeaderCap=signedHeader.capitalize()
|
||||||
if signedHeaderCap=='Content-Length':
|
if signedHeaderCap=='Content-Length':
|
||||||
if int(headers[signedHeader])!=contentLength:
|
if int(headers[signedHeader])!=contentLength:
|
||||||
|
if debug:
|
||||||
|
print('DEBUG: verifyPostHeaders Content-Length does not match '+headers[signedHeader]+' != '+str(contentLength))
|
||||||
return False
|
return False
|
||||||
if signedHeaderCap=='Date':
|
if signedHeaderCap=='Date':
|
||||||
if not verifyRecentSignature(headers[signedHeaderCap]):
|
if not verifyRecentSignature(headers[signedHeaderCap]):
|
||||||
|
if debug:
|
||||||
|
print('DEBUG: verifyPostHeaders date is not recent '+headers[signedHeader])
|
||||||
return False
|
return False
|
||||||
#print('***************************Verify '+signedHeaderCap+': '+headers[signedHeaderCap])
|
#print('***************************Verify '+signedHeaderCap+': '+headers[signedHeaderCap])
|
||||||
if headers.get(signedHeaderCap):
|
if headers.get(signedHeaderCap):
|
||||||
|
@ -221,4 +236,6 @@ def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
|
||||||
pkcs1_15.new(publicKeyPem).verify(headerDigest, signature)
|
pkcs1_15.new(publicKeyPem).verify(headerDigest, signature)
|
||||||
return True
|
return True
|
||||||
except (ValueError, TypeError):
|
except (ValueError, TypeError):
|
||||||
|
if debug:
|
||||||
|
print('DEBUG: verifyPostHeaders pkcs1_15 verify failure')
|
||||||
return False
|
return False
|
||||||
|
|
3
inbox.py
3
inbox.py
|
@ -1854,7 +1854,8 @@ def runInboxQueue(projectVersion: str, \
|
||||||
queueJson['httpHeaders'], \
|
queueJson['httpHeaders'], \
|
||||||
queueJson['path'],False, \
|
queueJson['path'],False, \
|
||||||
queueJson['digest'], \
|
queueJson['digest'], \
|
||||||
json.dumps(queueJson['post'])):
|
json.dumps(queueJson['post']), \
|
||||||
|
debug):
|
||||||
if debug:
|
if debug:
|
||||||
print('DEBUG: Header signature check failed')
|
print('DEBUG: Header signature check failed')
|
||||||
if os.path.isfile(queueFilename):
|
if os.path.isfile(queueFilename):
|
||||||
|
|
8
tests.py
8
tests.py
|
@ -124,13 +124,13 @@ def testHttpsigBase(withDigest):
|
||||||
headers['signature'] = signatureHeader
|
headers['signature'] = signatureHeader
|
||||||
assert verifyPostHeaders(httpPrefix,publicKeyPem,headers, \
|
assert verifyPostHeaders(httpPrefix,publicKeyPem,headers, \
|
||||||
boxpath,False,None, \
|
boxpath,False,None, \
|
||||||
messageBodyJsonStr)
|
messageBodyJsonStr,False)
|
||||||
assert verifyPostHeaders(httpPrefix,publicKeyPem,headers, \
|
assert verifyPostHeaders(httpPrefix,publicKeyPem,headers, \
|
||||||
'/parambulator'+boxpath,False,None, \
|
'/parambulator'+boxpath,False,None, \
|
||||||
messageBodyJsonStr) == False
|
messageBodyJsonStr,False) == False
|
||||||
assert verifyPostHeaders(httpPrefix,publicKeyPem,headers, \
|
assert verifyPostHeaders(httpPrefix,publicKeyPem,headers, \
|
||||||
boxpath,True,None, \
|
boxpath,True,None, \
|
||||||
messageBodyJsonStr) == False
|
messageBodyJsonStr,False) == False
|
||||||
if not withDigest:
|
if not withDigest:
|
||||||
# fake domain
|
# fake domain
|
||||||
headers = {'host': 'bogon.domain','date': dateStr,'content-type': 'application/json'}
|
headers = {'host': 'bogon.domain','date': dateStr,'content-type': 'application/json'}
|
||||||
|
@ -142,7 +142,7 @@ def testHttpsigBase(withDigest):
|
||||||
headers['signature'] = signatureHeader
|
headers['signature'] = signatureHeader
|
||||||
assert verifyPostHeaders(httpPrefix,publicKeyPem,headers, \
|
assert verifyPostHeaders(httpPrefix,publicKeyPem,headers, \
|
||||||
boxpath,True,None, \
|
boxpath,True,None, \
|
||||||
messageBodyJsonStr) == False
|
messageBodyJsonStr,False) == False
|
||||||
os.chdir(baseDir)
|
os.chdir(baseDir)
|
||||||
shutil.rmtree(path)
|
shutil.rmtree(path)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue