epicyon/httpsig.py

247 lines
11 KiB
Python
Raw Normal View History

2019-06-28 18:55:29 +00:00
__filename__ = "posts.py"
__author__ = "Bob Mottram"
__credits__ = ['lamia']
__license__ = "AGPL3+"
2019-12-14 10:52:19 +00:00
__version__ = "1.1.0"
2019-06-28 18:55:29 +00:00
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
2019-08-15 22:33:42 +00:00
# see https://tools.ietf.org/html/draft-cavage-http-signatures-06
2019-06-28 18:55:29 +00:00
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
#from Crypto.Signature import PKCS1_v1_5
from Crypto.Signature import pkcs1_15
from requests.auth import AuthBase
import base64
import json
2019-08-15 09:08:18 +00:00
from time import gmtime, strftime
2019-08-23 11:20:20 +00:00
import datetime
2019-08-16 13:47:01 +00:00
from pprint import pprint
2019-06-28 18:55:29 +00:00
def messageContentDigest(messageBodyJsonStr: str) -> str:
return base64.b64encode(SHA256.new(messageBodyJsonStr.encode('utf-8')).digest()).decode('utf-8')
2019-08-16 13:47:01 +00:00
def signPostHeaders(dateStr: str,privateKeyPem: str, \
nickname: str, \
domain: str,port: int, \
toDomain: str,toPort: int, \
path: str, \
httpPrefix: str, \
messageBodyJsonStr: str) -> str:
2019-06-28 18:55:29 +00:00
"""Returns a raw signature string that can be plugged into a header and
used to verify the authenticity of an HTTP transmission.
"""
2019-08-16 13:47:01 +00:00
if port:
if port!=80 and port!=443:
if ':' not in domain:
domain=domain+':'+str(port)
2019-07-01 09:31:02 +00:00
2019-08-16 13:47:01 +00:00
if toPort:
if toPort!=80 and toPort!=443:
if ':' not in toDomain:
toDomain=toDomain+':'+str(port)
if not dateStr:
dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
keyID=httpPrefix+'://'+domain+'/users/'+nickname+'#main-key'
if not messageBodyJsonStr:
2019-11-09 21:39:04 +00:00
headers={'(request-target)': f'post {path}','host': toDomain,'date': dateStr,'content-type': 'application/json'}
2019-06-28 18:55:29 +00:00
else:
bodyDigest=messageContentDigest(messageBodyJsonStr)
2019-11-12 18:21:52 +00:00
contentLength=len(messageBodyJsonStr)
headers={'(request-target)': f'post {path}','host': toDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/activity+json','content-length': str(contentLength)}
privateKeyPem=RSA.import_key(privateKeyPem)
2019-08-15 21:34:25 +00:00
#headers.update({
# '(request-target)': f'post {path}',
#})
2019-06-28 18:55:29 +00:00
# build a digest for signing
signedHeaderKeys = headers.keys()
signedHeaderText = ''
for headerKey in signedHeaderKeys:
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
2019-08-23 11:20:20 +00:00
#print(f'*********************signing: headerKey: {headerKey}: {headers[headerKey]}')
2019-06-28 18:55:29 +00:00
signedHeaderText = signedHeaderText.strip()
2019-08-23 11:20:20 +00:00
#print('******************************Send: signedHeaderText: '+signedHeaderText)
2019-06-28 18:55:29 +00:00
headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
# Sign the digest
rawSignature = pkcs1_15.new(privateKeyPem).sign(headerDigest)
signature = base64.b64encode(rawSignature).decode('ascii')
# Put it into a valid HTTP signature format
signatureDict = {
'keyId': keyID,
'algorithm': 'rsa-sha256',
'headers': ' '.join(signedHeaderKeys),
'signature': signature
}
signatureHeader = ','.join(
[f'{k}="{v}"' for k, v in signatureDict.items()])
return signatureHeader
2019-08-16 13:47:01 +00:00
def createSignedHeader(privateKeyPem: str,nickname: str, \
domain: str,port: int, \
toDomain: str,toPort: int, \
2019-07-03 19:00:03 +00:00
path: str,httpPrefix: str,withDigest: bool, \
messageBodyJsonStr: str) -> {}:
2019-08-16 13:47:01 +00:00
"""Note that the domain is the destination, not the sender
"""
2019-11-09 21:39:04 +00:00
contentType='application/activity+json'
2019-08-16 13:47:01 +00:00
headerDomain=toDomain
2019-07-01 09:31:02 +00:00
2019-08-16 13:47:01 +00:00
if toPort:
if toPort!=80 and toPort!=443:
if ':' not in headerDomain:
headerDomain=headerDomain+':'+str(toPort)
2019-07-01 09:31:02 +00:00
2019-08-15 18:21:43 +00:00
dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
2019-07-01 09:31:02 +00:00
if not withDigest:
2019-08-15 21:34:25 +00:00
headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr}
2019-08-16 10:36:41 +00:00
signatureHeader = \
2019-08-16 13:47:01 +00:00
signPostHeaders(dateStr,privateKeyPem,nickname, \
domain,port,toDomain,toPort, \
path,httpPrefix,None)
2019-07-01 09:31:02 +00:00
else:
bodyDigest=messageContentDigest(messageBodyJsonStr)
2019-11-12 13:01:56 +00:00
contentLength=len(messageBodyJsonStr)
2019-08-23 11:20:20 +00:00
#print('***************************Send (request-target): post '+path)
#print('***************************Send host: '+headerDomain)
#print('***************************Send date: '+dateStr)
#print('***************************Send digest: '+bodyDigest)
#print('***************************Send Content-type: '+contentType)
2019-11-12 19:23:57 +00:00
#print('***************************Send Content-Length: '+str(len(messageBodyJsonStr)))
2019-08-23 11:20:20 +00:00
#print('***************************Send messageBodyJsonStr: '+messageBodyJsonStr)
2019-11-12 13:15:45 +00:00
headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-length': str(contentLength),'content-type': contentType}
2019-08-16 10:36:41 +00:00
signatureHeader = \
2019-08-16 13:47:01 +00:00
signPostHeaders(dateStr,privateKeyPem,nickname, \
domain,port, \
toDomain,toPort, \
path,httpPrefix,messageBodyJsonStr)
2019-07-01 09:31:02 +00:00
headers['signature'] = signatureHeader
return headers
2019-08-23 11:30:37 +00:00
def verifyRecentSignature(signedDateStr: str) -> bool:
2019-08-23 11:31:46 +00:00
"""Checks whether the given time taken from the header is within
12 hours of the current time
"""
2019-08-23 11:30:37 +00:00
currDate=datetime.datetime.utcnow()
signedDate=datetime.datetime.strptime(signedDateStr,"%a, %d %b %Y %H:%M:%S %Z")
timeDiffSec=(currDate-signedDate).seconds
2019-08-23 11:39:16 +00:00
# 12 hours tollerance
if timeDiffSec > 43200:
2019-08-23 11:30:37 +00:00
print('WARN: Header signed too long ago: '+signedDateStr)
print(str(timeDiffSec/(60*60))+' hours')
return False
if timeDiffSec < 0:
print('WARN: Header signed in the future! '+signedDateStr)
print(str(timeDiffSec/(60*60))+' hours')
2019-08-23 11:30:37 +00:00
return False
return True
2019-08-15 21:34:25 +00:00
def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
path: str,GETmethod: bool, \
messageBodyDigest: str, \
2019-11-12 15:03:17 +00:00
messageBodyJsonStr: str,debug: bool) -> bool:
2019-06-28 18:55:29 +00:00
"""Returns true or false depending on if the key that we plugged in here
validates against the headers, method, and path.
publicKeyPem - the public key from an rsa key pair
headers - should be a dictionary of request headers
path - the relative url that was requested from this site
GETmethod - GET or POST
2019-07-01 09:31:02 +00:00
messageBodyJsonStr - the received request body (used for digest)
2019-06-28 18:55:29 +00:00
"""
2019-08-23 11:20:20 +00:00
2019-06-28 18:55:29 +00:00
if GETmethod:
method='GET'
else:
method='POST'
2019-11-12 15:03:17 +00:00
if debug:
print('DEBUG: verifyPostHeaders '+method)
2019-06-28 18:55:29 +00:00
publicKeyPem = RSA.import_key(publicKeyPem)
# Build a dictionary of the signature values
signatureHeader = headers['signature']
signatureDict = {
k: v[1:-1]
for k, v in [i.split('=', 1) for i in signatureHeader.split(',')]
}
2019-08-23 10:57:27 +00:00
#print('********************signatureHeader: '+str(signatureHeader))
#print('********************signatureDict: '+str(signatureDict))
2019-06-28 18:55:29 +00:00
# Unpack the signed headers and set values based on current headers and
# body (if a digest was included)
signedHeaderList = []
for signedHeader in signatureDict['headers'].split(' '):
2019-11-12 15:03:17 +00:00
if debug:
print('DEBUG: verifyPostHeaders signedHeader='+signedHeader)
2019-06-28 18:55:29 +00:00
if signedHeader == '(request-target)':
signedHeaderList.append(
f'(request-target): {method.lower()} {path}')
2019-08-23 10:57:27 +00:00
#print('***************************Verify (request-target): '+method.lower()+' '+path)
2019-06-28 18:55:29 +00:00
elif signedHeader == 'digest':
if messageBodyDigest:
bodyDigest=messageBodyDigest
else:
2019-11-12 15:25:47 +00:00
bodyDigest=messageContentDigest(messageBodyJsonStr)
2019-06-28 18:55:29 +00:00
signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
2019-08-23 10:57:27 +00:00
#print('***************************Verify digest: SHA-256='+bodyDigest)
#print('***************************Verify messageBodyJsonStr: '+messageBodyJsonStr)
2019-11-12 18:48:29 +00:00
elif signedHeader == 'content-length':
2019-11-12 19:20:55 +00:00
if headers.get(signedHeader):
signedHeaderList.append(f'content-length: {headers[signedHeader]}')
2019-11-12 17:16:34 +00:00
else:
2019-11-12 19:32:23 +00:00
if headers.get('Content-Length'):
contentLength=headers['Content-Length']
signedHeaderList.append(f'content-length: {contentLength}')
else:
if headers.get('Content-length'):
contentLength=headers['Content-length']
signedHeaderList.append(f'content-length: {contentLength}')
else:
if debug:
print('DEBUG: verifyPostHeaders '+signedHeader+' not found in '+str(headers))
2019-06-28 18:55:29 +00:00
else:
2019-08-15 21:34:25 +00:00
if headers.get(signedHeader):
2019-11-12 16:48:05 +00:00
if signedHeader=='date':
2019-08-23 11:30:37 +00:00
if not verifyRecentSignature(headers[signedHeader]):
2019-11-12 15:03:17 +00:00
if debug:
print('DEBUG: verifyPostHeaders date is not recent '+headers[signedHeader])
2019-08-23 11:30:37 +00:00
return False
2019-08-23 10:57:27 +00:00
#print('***************************Verify '+signedHeader+': '+headers[signedHeader])
signedHeaderList.append(
f'{signedHeader}: {headers[signedHeader]}')
2019-08-15 21:34:25 +00:00
else:
signedHeaderCap=signedHeader.capitalize()
2019-11-12 16:48:05 +00:00
if signedHeaderCap=='Date':
2019-08-23 11:30:37 +00:00
if not verifyRecentSignature(headers[signedHeaderCap]):
2019-11-12 15:03:17 +00:00
if debug:
print('DEBUG: verifyPostHeaders date is not recent '+headers[signedHeader])
2019-08-23 11:30:37 +00:00
return False
2019-08-23 10:57:27 +00:00
#print('***************************Verify '+signedHeaderCap+': '+headers[signedHeaderCap])
2019-08-15 21:34:25 +00:00
if headers.get(signedHeaderCap):
signedHeaderList.append(
f'{signedHeader}: {headers[signedHeaderCap]}')
2019-06-28 18:55:29 +00:00
2019-08-23 10:57:27 +00:00
#print('***********************signedHeaderList: ')
#pprint(signedHeaderList)
2019-11-12 15:25:47 +00:00
if debug:
print('DEBUG: signedHeaderList: '+str(signedHeaderList))
2019-06-28 18:55:29 +00:00
# Now we have our header data digest
signedHeaderText = '\n'.join(signedHeaderList)
2019-08-23 10:57:27 +00:00
#print('***********************Verify: signedHeaderText: '+signedHeaderText)
2019-06-28 18:55:29 +00:00
headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
# Get the signature, verify with public key, return result
signature = base64.b64decode(signatureDict['signature'])
try:
pkcs1_15.new(publicKeyPem).verify(headerDigest, signature)
return True
except (ValueError, TypeError):
2019-11-12 15:03:17 +00:00
if debug:
print('DEBUG: verifyPostHeaders pkcs1_15 verify failure')
2019-06-28 18:55:29 +00:00
return False