2019-06-28 18:55:29 +00:00
|
|
|
__filename__ = "posts.py"
|
|
|
|
__author__ = "Bob Mottram"
|
|
|
|
__credits__ = ['lamia']
|
|
|
|
__license__ = "AGPL3+"
|
|
|
|
__version__ = "0.0.1"
|
|
|
|
__maintainer__ = "Bob Mottram"
|
|
|
|
__email__ = "bob@freedombone.net"
|
|
|
|
__status__ = "Production"
|
|
|
|
|
|
|
|
from Crypto.PublicKey import RSA
|
|
|
|
from Crypto.Hash import SHA256
|
|
|
|
#from Crypto.Signature import PKCS1_v1_5
|
|
|
|
from Crypto.Signature import pkcs1_15
|
|
|
|
from requests.auth import AuthBase
|
|
|
|
import base64
|
|
|
|
import json
|
|
|
|
|
2019-07-03 09:40:27 +00:00
|
|
|
def signPostHeaders(privateKeyPem: str, nickname: str, domain: str, \
|
2019-07-02 20:54:22 +00:00
|
|
|
port: int,path: str, \
|
|
|
|
https: bool, messageBodyJson: {}) -> str:
|
2019-06-28 18:55:29 +00:00
|
|
|
"""Returns a raw signature string that can be plugged into a header and
|
|
|
|
used to verify the authenticity of an HTTP transmission.
|
|
|
|
"""
|
|
|
|
prefix='https'
|
|
|
|
if not https:
|
2019-07-01 09:31:02 +00:00
|
|
|
prefix='http'
|
|
|
|
|
|
|
|
if port!=80 and port!=443:
|
|
|
|
domain=domain+':'+str(port)
|
|
|
|
|
2019-07-03 09:40:27 +00:00
|
|
|
keyID = prefix+'://'+domain+'/users/'+nickname+'/main-key'
|
2019-06-28 18:55:29 +00:00
|
|
|
if not messageBodyJson:
|
|
|
|
headers = {'host': domain}
|
|
|
|
else:
|
2019-07-02 20:54:22 +00:00
|
|
|
bodyDigest = \
|
|
|
|
base64.b64encode(SHA256.new(messageBodyJson.encode()).digest())
|
2019-06-28 18:55:29 +00:00
|
|
|
headers = {'host': domain, 'digest': f'SHA-256={bodyDigest}'}
|
|
|
|
privateKeyPem = RSA.import_key(privateKeyPem)
|
|
|
|
headers.update({
|
|
|
|
'(request-target)': f'post {path}',
|
|
|
|
})
|
|
|
|
# build a digest for signing
|
|
|
|
signedHeaderKeys = headers.keys()
|
|
|
|
signedHeaderText = ''
|
|
|
|
for headerKey in signedHeaderKeys:
|
|
|
|
signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
|
|
|
|
signedHeaderText = signedHeaderText.strip()
|
|
|
|
headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
|
|
|
|
|
|
|
|
# Sign the digest
|
|
|
|
rawSignature = pkcs1_15.new(privateKeyPem).sign(headerDigest)
|
|
|
|
signature = base64.b64encode(rawSignature).decode('ascii')
|
|
|
|
|
|
|
|
# Put it into a valid HTTP signature format
|
|
|
|
signatureDict = {
|
|
|
|
'keyId': keyID,
|
|
|
|
'algorithm': 'rsa-sha256',
|
|
|
|
'headers': ' '.join(signedHeaderKeys),
|
|
|
|
'signature': signature
|
|
|
|
}
|
|
|
|
signatureHeader = ','.join(
|
|
|
|
[f'{k}="{v}"' for k, v in signatureDict.items()])
|
|
|
|
return signatureHeader
|
|
|
|
|
2019-07-03 09:40:27 +00:00
|
|
|
def createSignedHeader(privateKeyPem: str,nickname: str,domain: str,port: int, \
|
2019-07-02 20:54:22 +00:00
|
|
|
path: str,https: bool,withDigest: bool, \
|
|
|
|
messageBodyJson: {}) -> {}:
|
2019-07-01 09:31:02 +00:00
|
|
|
headerDomain=domain
|
|
|
|
|
|
|
|
if port!=80 and port!=443:
|
|
|
|
headerDomain=headerDomain+':'+str(port)
|
|
|
|
|
|
|
|
if not withDigest:
|
|
|
|
headers = {'host': headerDomain}
|
|
|
|
else:
|
|
|
|
messageBodyJsonStr=json.dumps(messageBodyJson)
|
2019-07-02 20:54:22 +00:00
|
|
|
bodyDigest = \
|
|
|
|
base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest())
|
2019-07-01 09:31:02 +00:00
|
|
|
headers = {'host': headerDomain, 'digest': f'SHA-256={bodyDigest}'}
|
|
|
|
path='/inbox'
|
2019-07-03 09:40:27 +00:00
|
|
|
signatureHeader = signPostHeaders(privateKeyPem, nickname, domain, port, \
|
2019-07-02 20:54:22 +00:00
|
|
|
path, https, None)
|
2019-07-01 09:31:02 +00:00
|
|
|
headers['signature'] = signatureHeader
|
2019-07-01 11:09:09 +00:00
|
|
|
headers['Content-type'] = 'application/json'
|
2019-07-01 09:31:02 +00:00
|
|
|
return headers
|
|
|
|
|
2019-07-02 20:54:22 +00:00
|
|
|
def verifyPostHeaders(https: bool, publicKeyPem: str, headers: dict, \
|
|
|
|
path: str, GETmethod: bool, \
|
|
|
|
messageBodyJsonStr: str) -> bool:
|
2019-06-28 18:55:29 +00:00
|
|
|
"""Returns true or false depending on if the key that we plugged in here
|
|
|
|
validates against the headers, method, and path.
|
|
|
|
publicKeyPem - the public key from an rsa key pair
|
|
|
|
headers - should be a dictionary of request headers
|
|
|
|
path - the relative url that was requested from this site
|
|
|
|
GETmethod - GET or POST
|
2019-07-01 09:31:02 +00:00
|
|
|
messageBodyJsonStr - the received request body (used for digest)
|
2019-06-28 18:55:29 +00:00
|
|
|
"""
|
|
|
|
if GETmethod:
|
|
|
|
method='GET'
|
|
|
|
else:
|
|
|
|
method='POST'
|
|
|
|
|
|
|
|
prefix='https'
|
|
|
|
if not https:
|
|
|
|
prefix='http'
|
|
|
|
|
|
|
|
publicKeyPem = RSA.import_key(publicKeyPem)
|
|
|
|
# Build a dictionary of the signature values
|
|
|
|
signatureHeader = headers['signature']
|
|
|
|
signatureDict = {
|
|
|
|
k: v[1:-1]
|
|
|
|
for k, v in [i.split('=', 1) for i in signatureHeader.split(',')]
|
|
|
|
}
|
|
|
|
|
|
|
|
# Unpack the signed headers and set values based on current headers and
|
|
|
|
# body (if a digest was included)
|
|
|
|
signedHeaderList = []
|
|
|
|
for signedHeader in signatureDict['headers'].split(' '):
|
|
|
|
if signedHeader == '(request-target)':
|
|
|
|
signedHeaderList.append(
|
|
|
|
f'(request-target): {method.lower()} {path}')
|
|
|
|
elif signedHeader == 'digest':
|
2019-07-02 20:54:22 +00:00
|
|
|
bodyDigest = \
|
|
|
|
base64.b64encode(SHA256.new(messageBodyJsonStr.encode()).digest())
|
2019-06-28 18:55:29 +00:00
|
|
|
signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
|
|
|
|
else:
|
|
|
|
signedHeaderList.append(
|
|
|
|
f'{signedHeader}: {headers[signedHeader]}')
|
|
|
|
|
|
|
|
# Now we have our header data digest
|
|
|
|
signedHeaderText = '\n'.join(signedHeaderList)
|
|
|
|
headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
|
|
|
|
|
|
|
|
# Get the signature, verify with public key, return result
|
|
|
|
signature = base64.b64decode(signatureDict['signature'])
|
|
|
|
|
|
|
|
try:
|
|
|
|
pkcs1_15.new(publicKeyPem).verify(headerDigest, signature)
|
|
|
|
return True
|
|
|
|
except (ValueError, TypeError):
|
|
|
|
return False
|