flake8 format

main
Bob Mottram 2020-04-03 12:05:30 +00:00
parent 34abcb54c2
commit 844f898b94
1 changed files with 128 additions and 121 deletions

View File

@ -1,11 +1,11 @@
__filename__="posts.py" __filename__ = "posts.py"
__author__="Bob Mottram" __author__ = "Bob Mottram"
__credits__=['lamia'] __credits__ = ['lamia']
__license__="AGPL3+" __license__ = "AGPL3+"
__version__="1.1.0" __version__ = "1.1.0"
__maintainer__="Bob Mottram" __maintainer__ = "Bob Mottram"
__email__="bob@freedombone.net" __email__ = "bob@freedombone.net"
__status__="Production" __status__ = "Production"
# see https://tools.ietf.org/html/draft-cavage-http-signatures-06 # see https://tools.ietf.org/html/draft-cavage-http-signatures-06
@ -16,112 +16,119 @@ try:
except ImportError: except ImportError:
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256 from Crypto.Hash import SHA256
#from Crypto.Signature import PKCS1_v1_5 # from Crypto.Signature import PKCS1_v1_5
from Crypto.Signature import pkcs1_15 from Crypto.Signature import pkcs1_15
from requests.auth import AuthBase
import base64 import base64
import json
from time import gmtime, strftime from time import gmtime, strftime
import datetime import datetime
from pprint import pprint
def messageContentDigest(messageBodyJsonStr: str) -> str: def messageContentDigest(messageBodyJsonStr: str) -> str:
return base64.b64encode(SHA256.new(messageBodyJsonStr.encode('utf-8')).digest()).decode('utf-8') msg = messageBodyJsonStr.encode('utf-8')
digestStr = SHA256.new(msg).digest()
return base64.b64encode(digestStr).decode('utf-8')
def signPostHeaders(dateStr: str,privateKeyPem: str, \
nickname: str, \ def signPostHeaders(dateStr: str, privateKeyPem: str,
domain: str,port: int, \ nickname: str,
toDomain: str,toPort: int, \ domain: str, port: int,
path: str, \ toDomain: str, toPort: int,
httpPrefix: str, \ path: str,
httpPrefix: str,
messageBodyJsonStr: str) -> str: messageBodyJsonStr: str) -> str:
"""Returns a raw signature string that can be plugged into a header and """Returns a raw signature string that can be plugged into a header and
used to verify the authenticity of an HTTP transmission. used to verify the authenticity of an HTTP transmission.
""" """
if port: if port:
if port!=80 and port!=443: if port != 80 and port != 443:
if ':' not in domain: if ':' not in domain:
domain=domain+':'+str(port) domain = domain + ':' + str(port)
if toPort: if toPort:
if toPort!=80 and toPort!=443: if toPort != 80 and toPort != 443:
if ':' not in toDomain: if ':' not in toDomain:
toDomain=toDomain+':'+str(port) toDomain = toDomain + ':' + str(port)
if not dateStr: if not dateStr:
dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
keyID=httpPrefix+'://'+domain+'/users/'+nickname+'#main-key' keyID = httpPrefix + '://' + domain + '/users/' + nickname + '#main-key'
if not messageBodyJsonStr: if not messageBodyJsonStr:
headers={'(request-target)': f'post {path}','host': toDomain,'date': dateStr,'content-type': 'application/json'} headers = {
'(request-target)': f'post {path}',
'host': toDomain,
'date': dateStr,
'content-type': 'application/json'
}
else: else:
bodyDigest=messageContentDigest(messageBodyJsonStr) bodyDigest = messageContentDigest(messageBodyJsonStr)
contentLength=len(messageBodyJsonStr) contentLength = len(messageBodyJsonStr)
headers={'(request-target)': f'post {path}','host': toDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/activity+json','content-length': str(contentLength)} headers = {
privateKeyPem=RSA.import_key(privateKeyPem) '(request-target)': f'post {path}',
#headers.update({ 'host': toDomain,
# '(request-target)': f'post {path}', 'date': dateStr,
#}) 'digest': f'SHA-256={bodyDigest}',
'content-type': 'application/activity+json',
'content-length': str(contentLength)
}
privateKeyPem = RSA.import_key(privateKeyPem)
# headers.update({
# '(request-target)': f'post {path}',
# })
# build a digest for signing # build a digest for signing
signedHeaderKeys=headers.keys() signedHeaderKeys = headers.keys()
signedHeaderText='' signedHeaderText = ''
for headerKey in signedHeaderKeys: for headerKey in signedHeaderKeys:
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n' signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
#print(f'*********************signing: headerKey: {headerKey}: {headers[headerKey]}') signedHeaderText = signedHeaderText.strip()
signedHeaderText=signedHeaderText.strip() headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
#print('******************************Send: signedHeaderText: '+signedHeaderText)
headerDigest=SHA256.new(signedHeaderText.encode('ascii'))
# Sign the digest # Sign the digest
rawSignature=pkcs1_15.new(privateKeyPem).sign(headerDigest) rawSignature = pkcs1_15.new(privateKeyPem).sign(headerDigest)
signature=base64.b64encode(rawSignature).decode('ascii') signature = base64.b64encode(rawSignature).decode('ascii')
# Put it into a valid HTTP signature format # Put it into a valid HTTP signature format
signatureDict={ signatureDict = {
'keyId': keyID, 'keyId': keyID,
'algorithm': 'rsa-sha256', 'algorithm': 'rsa-sha256',
'headers': ' '.join(signedHeaderKeys), 'headers': ' '.join(signedHeaderKeys),
'signature': signature 'signature': signature
} }
signatureHeader=','.join( signatureHeader = ','.join(
[f'{k}="{v}"' for k, v in signatureDict.items()]) [f'{k}="{v}"' for k, v in signatureDict.items()])
return signatureHeader return signatureHeader
def createSignedHeader(privateKeyPem: str,nickname: str, \
domain: str,port: int, \ def createSignedHeader(privateKeyPem: str, nickname: str,
toDomain: str,toPort: int, \ domain: str, port: int,
path: str,httpPrefix: str,withDigest: bool, \ toDomain: str, toPort: int,
path: str, httpPrefix: str, withDigest: bool,
messageBodyJsonStr: str) -> {}: messageBodyJsonStr: str) -> {}:
"""Note that the domain is the destination, not the sender """Note that the domain is the destination, not the sender
""" """
contentType='application/activity+json' contentType = 'application/activity+json'
headerDomain=toDomain headerDomain = toDomain
if toPort: if toPort:
if toPort!=80 and toPort!=443: if toPort != 80 and toPort != 443:
if ':' not in headerDomain: if ':' not in headerDomain:
headerDomain=headerDomain+':'+str(toPort) headerDomain = headerDomain + ':' + str(toPort)
dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
if not withDigest: if not withDigest:
headers={ headers = {
'(request-target)': f'post {path}','host': headerDomain,'date': dateStr '(request-target)': f'post {path}',
'host': headerDomain,
'date': dateStr
} }
signatureHeader= \ signatureHeader = \
signPostHeaders(dateStr,privateKeyPem,nickname, \ signPostHeaders(dateStr, privateKeyPem, nickname,
domain,port,toDomain,toPort, \ domain, port, toDomain, toPort,
path,httpPrefix,None) path, httpPrefix, None)
else: else:
bodyDigest=messageContentDigest(messageBodyJsonStr) bodyDigest = messageContentDigest(messageBodyJsonStr)
contentLength=len(messageBodyJsonStr) contentLength = len(messageBodyJsonStr)
#print('***************************Send (request-target): post '+path) headers = {
#print('***************************Send host: '+headerDomain)
#print('***************************Send date: '+dateStr)
#print('***************************Send digest: '+bodyDigest)
#print('***************************Send Content-type: '+contentType)
#print('***************************Send Content-Length: '+str(len(messageBodyJsonStr)))
#print('***************************Send messageBodyJsonStr: '+messageBodyJsonStr)
headers={
'(request-target)': f'post {path}', '(request-target)': f'post {path}',
'host': headerDomain, 'host': headerDomain,
'date': dateStr, 'date': dateStr,
@ -129,36 +136,39 @@ def createSignedHeader(privateKeyPem: str,nickname: str, \
'content-length': str(contentLength), 'content-length': str(contentLength),
'content-type': contentType 'content-type': contentType
} }
signatureHeader= \ signatureHeader = \
signPostHeaders(dateStr,privateKeyPem,nickname, \ signPostHeaders(dateStr, privateKeyPem, nickname,
domain,port, \ domain, port,
toDomain,toPort, \ toDomain, toPort,
path,httpPrefix,messageBodyJsonStr) path, httpPrefix, messageBodyJsonStr)
headers['signature']=signatureHeader headers['signature'] = signatureHeader
return headers return headers
def verifyRecentSignature(signedDateStr: str) -> bool: def verifyRecentSignature(signedDateStr: str) -> bool:
"""Checks whether the given time taken from the header is within """Checks whether the given time taken from the header is within
12 hours of the current time 12 hours of the current time
""" """
currDate=datetime.datetime.utcnow() currDate = datetime.datetime.utcnow()
signedDate=datetime.datetime.strptime(signedDateStr,"%a, %d %b %Y %H:%M:%S %Z") dateFormat = "%a, %d %b %Y %H:%M:%S %Z"
timeDiffSec=(currDate-signedDate).seconds signedDate = datetime.datetime.strptime(signedDateStr, dateFormat)
timeDiffSec = (currDate - signedDate).seconds
# 12 hours tollerance # 12 hours tollerance
if timeDiffSec > 43200: if timeDiffSec > 43200:
print('WARN: Header signed too long ago: '+signedDateStr) print('WARN: Header signed too long ago: ' + signedDateStr)
print(str(timeDiffSec/(60*60))+' hours') print(str(timeDiffSec / (60 * 60)) + ' hours')
return False return False
if timeDiffSec < 0: if timeDiffSec < 0:
print('WARN: Header signed in the future! '+signedDateStr) print('WARN: Header signed in the future! ' + signedDateStr)
print(str(timeDiffSec/(60*60))+' hours') print(str(timeDiffSec / (60 * 60)) + ' hours')
return False return False
return True return True
def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
path: str,GETmethod: bool, \ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
messageBodyDigest: str, \ path: str, GETmethod: bool,
messageBodyJsonStr: str,debug: bool) -> bool: messageBodyDigest: str,
messageBodyJsonStr: str, debug: bool) -> bool:
"""Returns true or false depending on if the key that we plugged in here """Returns true or false depending on if the key that we plugged in here
validates against the headers, method, and path. validates against the headers, method, and path.
publicKeyPem - the public key from an rsa key pair publicKeyPem - the public key from an rsa key pair
@ -169,88 +179,85 @@ def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
""" """
if GETmethod: if GETmethod:
method='GET' method = 'GET'
else: else:
method='POST' method = 'POST'
if debug: if debug:
print('DEBUG: verifyPostHeaders '+method) print('DEBUG: verifyPostHeaders ' + method)
publicKeyPem=RSA.import_key(publicKeyPem) publicKeyPem = RSA.import_key(publicKeyPem)
# Build a dictionary of the signature values # Build a dictionary of the signature values
signatureHeader=headers['signature'] signatureHeader = headers['signature']
signatureDict={ signatureDict = {
k: v[1:-1] k: v[1:-1]
for k, v in [i.split('=', 1) for i in signatureHeader.split(',')] for k, v in [i.split('=', 1) for i in signatureHeader.split(',')]
} }
#print('********************signatureHeader: '+str(signatureHeader))
#print('********************signatureDict: '+str(signatureDict))
# Unpack the signed headers and set values based on current headers and # Unpack the signed headers and set values based on current headers and
# body (if a digest was included) # body (if a digest was included)
signedHeaderList=[] signedHeaderList = []
for signedHeader in signatureDict['headers'].split(' '): for signedHeader in signatureDict['headers'].split(' '):
if debug: if debug:
print('DEBUG: verifyPostHeaders signedHeader='+signedHeader) print('DEBUG: verifyPostHeaders signedHeader=' + signedHeader)
if signedHeader == '(request-target)': if signedHeader == '(request-target)':
signedHeaderList.append( appendStr = f'(request-target): {method.lower()} {path}'
f'(request-target): {method.lower()} {path}') signedHeaderList.append(appendStr)
#print('***************************Verify (request-target): '+method.lower()+' '+path)
elif signedHeader == 'digest': elif signedHeader == 'digest':
if messageBodyDigest: if messageBodyDigest:
bodyDigest=messageBodyDigest bodyDigest = messageBodyDigest
else: else:
bodyDigest=messageContentDigest(messageBodyJsonStr) bodyDigest = messageContentDigest(messageBodyJsonStr)
signedHeaderList.append(f'digest: SHA-256={bodyDigest}') signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
#print('***************************Verify digest: SHA-256='+bodyDigest)
#print('***************************Verify messageBodyJsonStr: '+messageBodyJsonStr)
elif signedHeader == 'content-length': elif signedHeader == 'content-length':
if headers.get(signedHeader): if headers.get(signedHeader):
signedHeaderList.append(f'content-length: {headers[signedHeader]}') appendStr = f'content-length: {headers[signedHeader]}'
signedHeaderList.append(appendStr)
else: else:
if headers.get('Content-Length'): if headers.get('Content-Length'):
contentLength=headers['Content-Length'] contentLength = headers['Content-Length']
signedHeaderList.append(f'content-length: {contentLength}') signedHeaderList.append(f'content-length: {contentLength}')
else: else:
if headers.get('Content-length'): if headers.get('Content-length'):
contentLength=headers['Content-length'] contentLength = headers['Content-length']
signedHeaderList.append(f'content-length: {contentLength}') appendStr = f'content-length: {contentLength}'
signedHeaderList.append(appendStr)
else: else:
if debug: if debug:
print('DEBUG: verifyPostHeaders '+signedHeader+' not found in '+str(headers)) print('DEBUG: verifyPostHeaders ' + signedHeader +
' not found in ' + str(headers))
else: else:
if headers.get(signedHeader): if headers.get(signedHeader):
if signedHeader=='date': if signedHeader == 'date':
if not verifyRecentSignature(headers[signedHeader]): if not verifyRecentSignature(headers[signedHeader]):
if debug: if debug:
print('DEBUG: verifyPostHeaders date is not recent '+headers[signedHeader]) print('DEBUG: ' +
'verifyPostHeaders date is not recent ' +
headers[signedHeader])
return False return False
#print('***************************Verify '+signedHeader+': '+headers[signedHeader])
signedHeaderList.append( signedHeaderList.append(
f'{signedHeader}: {headers[signedHeader]}') f'{signedHeader}: {headers[signedHeader]}')
else: else:
signedHeaderCap=signedHeader.capitalize() signedHeaderCap = signedHeader.capitalize()
if signedHeaderCap=='Date': if signedHeaderCap == 'Date':
if not verifyRecentSignature(headers[signedHeaderCap]): if not verifyRecentSignature(headers[signedHeaderCap]):
if debug: if debug:
print('DEBUG: verifyPostHeaders date is not recent '+headers[signedHeader]) print('DEBUG: ' +
'verifyPostHeaders date is not recent ' +
headers[signedHeader])
return False return False
#print('***************************Verify '+signedHeaderCap+': '+headers[signedHeaderCap])
if headers.get(signedHeaderCap): if headers.get(signedHeaderCap):
signedHeaderList.append( signedHeaderList.append(
f'{signedHeader}: {headers[signedHeaderCap]}') f'{signedHeader}: {headers[signedHeaderCap]}')
#print('***********************signedHeaderList: ')
#pprint(signedHeaderList)
if debug: if debug:
print('DEBUG: signedHeaderList: '+str(signedHeaderList)) print('DEBUG: signedHeaderList: ' + str(signedHeaderList))
# Now we have our header data digest # Now we have our header data digest
signedHeaderText='\n'.join(signedHeaderList) signedHeaderText = '\n'.join(signedHeaderList)
#print('***********************Verify: signedHeaderText: '+signedHeaderText) headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
headerDigest=SHA256.new(signedHeaderText.encode('ascii'))
# Get the signature, verify with public key, return result # Get the signature, verify with public key, return result
signature=base64.b64decode(signatureDict['signature']) signature = base64.b64decode(signatureDict['signature'])
try: try:
pkcs1_15.new(publicKeyPem).verify(headerDigest, signature) pkcs1_15.new(publicKeyPem).verify(headerDigest, signature)