mirror of https://gitlab.com/bashrc2/epicyon
Mitigate replay attacks
parent
d05a416998
commit
c13173ed75
28
httpsig.py
28
httpsig.py
|
@ -17,6 +17,7 @@ from requests.auth import AuthBase
|
||||||
import base64
|
import base64
|
||||||
import json
|
import json
|
||||||
from time import gmtime, strftime
|
from time import gmtime, strftime
|
||||||
|
import datetime
|
||||||
from pprint import pprint
|
from pprint import pprint
|
||||||
|
|
||||||
def messageContentDigest(messageBodyJsonStr: str) -> str:
|
def messageContentDigest(messageBodyJsonStr: str) -> str:
|
||||||
|
@ -59,9 +60,9 @@ def signPostHeaders(dateStr: str,privateKeyPem: str, \
|
||||||
signedHeaderText = ''
|
signedHeaderText = ''
|
||||||
for headerKey in signedHeaderKeys:
|
for headerKey in signedHeaderKeys:
|
||||||
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
|
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
|
||||||
print(f'*********************signing: headerKey: {headerKey}: {headers[headerKey]}')
|
#print(f'*********************signing: headerKey: {headerKey}: {headers[headerKey]}')
|
||||||
signedHeaderText = signedHeaderText.strip()
|
signedHeaderText = signedHeaderText.strip()
|
||||||
print('******************************Send: signedHeaderText: '+signedHeaderText)
|
#print('******************************Send: signedHeaderText: '+signedHeaderText)
|
||||||
headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
|
headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
|
||||||
|
|
||||||
# Sign the digest
|
# Sign the digest
|
||||||
|
@ -103,12 +104,12 @@ def createSignedHeader(privateKeyPem: str,nickname: str, \
|
||||||
path,httpPrefix,None)
|
path,httpPrefix,None)
|
||||||
else:
|
else:
|
||||||
bodyDigest=messageContentDigest(messageBodyJsonStr)
|
bodyDigest=messageContentDigest(messageBodyJsonStr)
|
||||||
print('***************************Send (request-target): post '+path)
|
#print('***************************Send (request-target): post '+path)
|
||||||
print('***************************Send host: '+headerDomain)
|
#print('***************************Send host: '+headerDomain)
|
||||||
print('***************************Send date: '+dateStr)
|
#print('***************************Send date: '+dateStr)
|
||||||
print('***************************Send digest: '+bodyDigest)
|
#print('***************************Send digest: '+bodyDigest)
|
||||||
print('***************************Send Content-type: '+contentType)
|
#print('***************************Send Content-type: '+contentType)
|
||||||
print('***************************Send messageBodyJsonStr: '+messageBodyJsonStr)
|
#print('***************************Send messageBodyJsonStr: '+messageBodyJsonStr)
|
||||||
headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': contentType}
|
headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': contentType}
|
||||||
signatureHeader = \
|
signatureHeader = \
|
||||||
signPostHeaders(dateStr,privateKeyPem,nickname, \
|
signPostHeaders(dateStr,privateKeyPem,nickname, \
|
||||||
|
@ -130,6 +131,7 @@ def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
|
||||||
GETmethod - GET or POST
|
GETmethod - GET or POST
|
||||||
messageBodyJsonStr - the received request body (used for digest)
|
messageBodyJsonStr - the received request body (used for digest)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if GETmethod:
|
if GETmethod:
|
||||||
method='GET'
|
method='GET'
|
||||||
else:
|
else:
|
||||||
|
@ -162,6 +164,16 @@ def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
|
||||||
#print('***************************Verify digest: SHA-256='+bodyDigest)
|
#print('***************************Verify digest: SHA-256='+bodyDigest)
|
||||||
#print('***************************Verify messageBodyJsonStr: '+messageBodyJsonStr)
|
#print('***************************Verify messageBodyJsonStr: '+messageBodyJsonStr)
|
||||||
else:
|
else:
|
||||||
|
if signedHeader=='date':
|
||||||
|
# mitigate replay attacks
|
||||||
|
currDate=datetime.datetime.utcnow()
|
||||||
|
signedDate=datetime.datetime.strptime(headers[signedHeader],"%a, %d %b %Y %H:%M:%S %Z")
|
||||||
|
# 12 hours tollerance
|
||||||
|
if (currDate-signedDate).seconds > 43200:
|
||||||
|
print('WARN: Header signed too long ago: '+headers[signedHeader])
|
||||||
|
print(str((currDate-signedDate).seconds/(60*60))+' hours')
|
||||||
|
return False
|
||||||
|
|
||||||
if headers.get(signedHeader):
|
if headers.get(signedHeader):
|
||||||
#print('***************************Verify '+signedHeader+': '+headers[signedHeader])
|
#print('***************************Verify '+signedHeader+': '+headers[signedHeader])
|
||||||
signedHeaderList.append(
|
signedHeaderList.append(
|
||||||
|
|
Loading…
Reference in New Issue