From 844f898b9424cd9d07d8308105ad52ee3c7f54e4 Mon Sep 17 00:00:00 2001 From: Bob Mottram Date: Fri, 3 Apr 2020 12:05:30 +0000 Subject: [PATCH] flake8 format --- httpsig.py | 249 +++++++++++++++++++++++++++-------------------------- 1 file changed, 128 insertions(+), 121 deletions(-) diff --git a/httpsig.py b/httpsig.py index 33286e4c..97a001d6 100644 --- a/httpsig.py +++ b/httpsig.py @@ -1,11 +1,11 @@ -__filename__="posts.py" -__author__="Bob Mottram" -__credits__=['lamia'] -__license__="AGPL3+" -__version__="1.1.0" -__maintainer__="Bob Mottram" -__email__="bob@freedombone.net" -__status__="Production" +__filename__ = "posts.py" +__author__ = "Bob Mottram" +__credits__ = ['lamia'] +__license__ = "AGPL3+" +__version__ = "1.1.0" +__maintainer__ = "Bob Mottram" +__email__ = "bob@freedombone.net" +__status__ = "Production" # see https://tools.ietf.org/html/draft-cavage-http-signatures-06 @@ -16,112 +16,119 @@ try: except ImportError: from Crypto.PublicKey import RSA from Crypto.Hash import SHA256 - #from Crypto.Signature import PKCS1_v1_5 + # from Crypto.Signature import PKCS1_v1_5 from Crypto.Signature import pkcs1_15 -from requests.auth import AuthBase import base64 -import json from time import gmtime, strftime import datetime -from pprint import pprint + def messageContentDigest(messageBodyJsonStr: str) -> str: - return base64.b64encode(SHA256.new(messageBodyJsonStr.encode('utf-8')).digest()).decode('utf-8') + msg = messageBodyJsonStr.encode('utf-8') + digestStr = SHA256.new(msg).digest() + return base64.b64encode(digestStr).decode('utf-8') -def signPostHeaders(dateStr: str,privateKeyPem: str, \ - nickname: str, \ - domain: str,port: int, \ - toDomain: str,toPort: int, \ - path: str, \ - httpPrefix: str, \ + +def signPostHeaders(dateStr: str, privateKeyPem: str, + nickname: str, + domain: str, port: int, + toDomain: str, toPort: int, + path: str, + httpPrefix: str, messageBodyJsonStr: str) -> str: """Returns a raw signature string that can be plugged into a header and used to verify the authenticity of an HTTP transmission. """ if port: - if port!=80 and port!=443: + if port != 80 and port != 443: if ':' not in domain: - domain=domain+':'+str(port) + domain = domain + ':' + str(port) if toPort: - if toPort!=80 and toPort!=443: + if toPort != 80 and toPort != 443: if ':' not in toDomain: - toDomain=toDomain+':'+str(port) + toDomain = toDomain + ':' + str(port) if not dateStr: - dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) - keyID=httpPrefix+'://'+domain+'/users/'+nickname+'#main-key' + dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) + keyID = httpPrefix + '://' + domain + '/users/' + nickname + '#main-key' if not messageBodyJsonStr: - headers={'(request-target)': f'post {path}','host': toDomain,'date': dateStr,'content-type': 'application/json'} + headers = { + '(request-target)': f'post {path}', + 'host': toDomain, + 'date': dateStr, + 'content-type': 'application/json' + } else: - bodyDigest=messageContentDigest(messageBodyJsonStr) - contentLength=len(messageBodyJsonStr) - headers={'(request-target)': f'post {path}','host': toDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/activity+json','content-length': str(contentLength)} - privateKeyPem=RSA.import_key(privateKeyPem) - #headers.update({ - # '(request-target)': f'post {path}', - #}) + bodyDigest = messageContentDigest(messageBodyJsonStr) + contentLength = len(messageBodyJsonStr) + headers = { + '(request-target)': f'post {path}', + 'host': toDomain, + 'date': dateStr, + 'digest': f'SHA-256={bodyDigest}', + 'content-type': 'application/activity+json', + 'content-length': str(contentLength) + } + privateKeyPem = RSA.import_key(privateKeyPem) + # headers.update({ + # '(request-target)': f'post {path}', + # }) # build a digest for signing - signedHeaderKeys=headers.keys() - signedHeaderText='' + signedHeaderKeys = headers.keys() + signedHeaderText = '' for headerKey in signedHeaderKeys: signedHeaderText += f'{headerKey}: {headers[headerKey]}\n' - #print(f'*********************signing: headerKey: {headerKey}: {headers[headerKey]}') - signedHeaderText=signedHeaderText.strip() - #print('******************************Send: signedHeaderText: '+signedHeaderText) - headerDigest=SHA256.new(signedHeaderText.encode('ascii')) + signedHeaderText = signedHeaderText.strip() + headerDigest = SHA256.new(signedHeaderText.encode('ascii')) # Sign the digest - rawSignature=pkcs1_15.new(privateKeyPem).sign(headerDigest) - signature=base64.b64encode(rawSignature).decode('ascii') + rawSignature = pkcs1_15.new(privateKeyPem).sign(headerDigest) + signature = base64.b64encode(rawSignature).decode('ascii') # Put it into a valid HTTP signature format - signatureDict={ + signatureDict = { 'keyId': keyID, 'algorithm': 'rsa-sha256', 'headers': ' '.join(signedHeaderKeys), 'signature': signature } - signatureHeader=','.join( + signatureHeader = ','.join( [f'{k}="{v}"' for k, v in signatureDict.items()]) return signatureHeader -def createSignedHeader(privateKeyPem: str,nickname: str, \ - domain: str,port: int, \ - toDomain: str,toPort: int, \ - path: str,httpPrefix: str,withDigest: bool, \ + +def createSignedHeader(privateKeyPem: str, nickname: str, + domain: str, port: int, + toDomain: str, toPort: int, + path: str, httpPrefix: str, withDigest: bool, messageBodyJsonStr: str) -> {}: """Note that the domain is the destination, not the sender """ - contentType='application/activity+json' - headerDomain=toDomain + contentType = 'application/activity+json' + headerDomain = toDomain if toPort: - if toPort!=80 and toPort!=443: + if toPort != 80 and toPort != 443: if ':' not in headerDomain: - headerDomain=headerDomain+':'+str(toPort) + headerDomain = headerDomain + ':' + str(toPort) - dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) + dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) if not withDigest: - headers={ - '(request-target)': f'post {path}','host': headerDomain,'date': dateStr + headers = { + '(request-target)': f'post {path}', + 'host': headerDomain, + 'date': dateStr } - signatureHeader= \ - signPostHeaders(dateStr,privateKeyPem,nickname, \ - domain,port,toDomain,toPort, \ - path,httpPrefix,None) + signatureHeader = \ + signPostHeaders(dateStr, privateKeyPem, nickname, + domain, port, toDomain, toPort, + path, httpPrefix, None) else: - bodyDigest=messageContentDigest(messageBodyJsonStr) - contentLength=len(messageBodyJsonStr) - #print('***************************Send (request-target): post '+path) - #print('***************************Send host: '+headerDomain) - #print('***************************Send date: '+dateStr) - #print('***************************Send digest: '+bodyDigest) - #print('***************************Send Content-type: '+contentType) - #print('***************************Send Content-Length: '+str(len(messageBodyJsonStr))) - #print('***************************Send messageBodyJsonStr: '+messageBodyJsonStr) - headers={ + bodyDigest = messageContentDigest(messageBodyJsonStr) + contentLength = len(messageBodyJsonStr) + headers = { '(request-target)': f'post {path}', 'host': headerDomain, 'date': dateStr, @@ -129,36 +136,39 @@ def createSignedHeader(privateKeyPem: str,nickname: str, \ 'content-length': str(contentLength), 'content-type': contentType } - signatureHeader= \ - signPostHeaders(dateStr,privateKeyPem,nickname, \ - domain,port, \ - toDomain,toPort, \ - path,httpPrefix,messageBodyJsonStr) - headers['signature']=signatureHeader + signatureHeader = \ + signPostHeaders(dateStr, privateKeyPem, nickname, + domain, port, + toDomain, toPort, + path, httpPrefix, messageBodyJsonStr) + headers['signature'] = signatureHeader return headers + def verifyRecentSignature(signedDateStr: str) -> bool: """Checks whether the given time taken from the header is within 12 hours of the current time """ - currDate=datetime.datetime.utcnow() - signedDate=datetime.datetime.strptime(signedDateStr,"%a, %d %b %Y %H:%M:%S %Z") - timeDiffSec=(currDate-signedDate).seconds + currDate = datetime.datetime.utcnow() + dateFormat = "%a, %d %b %Y %H:%M:%S %Z" + signedDate = datetime.datetime.strptime(signedDateStr, dateFormat) + timeDiffSec = (currDate - signedDate).seconds # 12 hours tollerance if timeDiffSec > 43200: - print('WARN: Header signed too long ago: '+signedDateStr) - print(str(timeDiffSec/(60*60))+' hours') + print('WARN: Header signed too long ago: ' + signedDateStr) + print(str(timeDiffSec / (60 * 60)) + ' hours') return False if timeDiffSec < 0: - print('WARN: Header signed in the future! '+signedDateStr) - print(str(timeDiffSec/(60*60))+' hours') + print('WARN: Header signed in the future! ' + signedDateStr) + print(str(timeDiffSec / (60 * 60)) + ' hours') return False return True -def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \ - path: str,GETmethod: bool, \ - messageBodyDigest: str, \ - messageBodyJsonStr: str,debug: bool) -> bool: + +def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict, + path: str, GETmethod: bool, + messageBodyDigest: str, + messageBodyJsonStr: str, debug: bool) -> bool: """Returns true or false depending on if the key that we plugged in here validates against the headers, method, and path. publicKeyPem - the public key from an rsa key pair @@ -169,88 +179,85 @@ def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \ """ if GETmethod: - method='GET' + method = 'GET' else: - method='POST' + method = 'POST' if debug: - print('DEBUG: verifyPostHeaders '+method) + print('DEBUG: verifyPostHeaders ' + method) - publicKeyPem=RSA.import_key(publicKeyPem) + publicKeyPem = RSA.import_key(publicKeyPem) # Build a dictionary of the signature values - signatureHeader=headers['signature'] - signatureDict={ + signatureHeader = headers['signature'] + signatureDict = { k: v[1:-1] for k, v in [i.split('=', 1) for i in signatureHeader.split(',')] } - #print('********************signatureHeader: '+str(signatureHeader)) - #print('********************signatureDict: '+str(signatureDict)) # Unpack the signed headers and set values based on current headers and # body (if a digest was included) - signedHeaderList=[] + signedHeaderList = [] for signedHeader in signatureDict['headers'].split(' '): if debug: - print('DEBUG: verifyPostHeaders signedHeader='+signedHeader) + print('DEBUG: verifyPostHeaders signedHeader=' + signedHeader) if signedHeader == '(request-target)': - signedHeaderList.append( - f'(request-target): {method.lower()} {path}') - #print('***************************Verify (request-target): '+method.lower()+' '+path) + appendStr = f'(request-target): {method.lower()} {path}' + signedHeaderList.append(appendStr) elif signedHeader == 'digest': if messageBodyDigest: - bodyDigest=messageBodyDigest + bodyDigest = messageBodyDigest else: - bodyDigest=messageContentDigest(messageBodyJsonStr) + bodyDigest = messageContentDigest(messageBodyJsonStr) signedHeaderList.append(f'digest: SHA-256={bodyDigest}') - #print('***************************Verify digest: SHA-256='+bodyDigest) - #print('***************************Verify messageBodyJsonStr: '+messageBodyJsonStr) elif signedHeader == 'content-length': if headers.get(signedHeader): - signedHeaderList.append(f'content-length: {headers[signedHeader]}') + appendStr = f'content-length: {headers[signedHeader]}' + signedHeaderList.append(appendStr) else: if headers.get('Content-Length'): - contentLength=headers['Content-Length'] + contentLength = headers['Content-Length'] signedHeaderList.append(f'content-length: {contentLength}') else: if headers.get('Content-length'): - contentLength=headers['Content-length'] - signedHeaderList.append(f'content-length: {contentLength}') + contentLength = headers['Content-length'] + appendStr = f'content-length: {contentLength}' + signedHeaderList.append(appendStr) else: if debug: - print('DEBUG: verifyPostHeaders '+signedHeader+' not found in '+str(headers)) + print('DEBUG: verifyPostHeaders ' + signedHeader + + ' not found in ' + str(headers)) else: if headers.get(signedHeader): - if signedHeader=='date': + if signedHeader == 'date': if not verifyRecentSignature(headers[signedHeader]): if debug: - print('DEBUG: verifyPostHeaders date is not recent '+headers[signedHeader]) + print('DEBUG: ' + + 'verifyPostHeaders date is not recent ' + + headers[signedHeader]) return False - #print('***************************Verify '+signedHeader+': '+headers[signedHeader]) signedHeaderList.append( f'{signedHeader}: {headers[signedHeader]}') else: - signedHeaderCap=signedHeader.capitalize() - if signedHeaderCap=='Date': + signedHeaderCap = signedHeader.capitalize() + if signedHeaderCap == 'Date': if not verifyRecentSignature(headers[signedHeaderCap]): if debug: - print('DEBUG: verifyPostHeaders date is not recent '+headers[signedHeader]) + print('DEBUG: ' + + 'verifyPostHeaders date is not recent ' + + headers[signedHeader]) return False - #print('***************************Verify '+signedHeaderCap+': '+headers[signedHeaderCap]) if headers.get(signedHeaderCap): signedHeaderList.append( f'{signedHeader}: {headers[signedHeaderCap]}') - #print('***********************signedHeaderList: ') - #pprint(signedHeaderList) if debug: - print('DEBUG: signedHeaderList: '+str(signedHeaderList)) + print('DEBUG: signedHeaderList: ' + str(signedHeaderList)) # Now we have our header data digest - signedHeaderText='\n'.join(signedHeaderList) - #print('***********************Verify: signedHeaderText: '+signedHeaderText) - headerDigest=SHA256.new(signedHeaderText.encode('ascii')) + signedHeaderText = '\n'.join(signedHeaderList) + headerDigest = SHA256.new(signedHeaderText.encode('ascii')) # Get the signature, verify with public key, return result - signature=base64.b64decode(signatureDict['signature']) + signature = base64.b64decode(signatureDict['signature']) try: pkcs1_15.new(publicKeyPem).verify(headerDigest, signature)