More unit tests for signing get requests

merge-requests/30/head
Bob Mottram 2021-09-14 22:22:49 +01:00
parent 0f218abe23
commit 94db0a9520
4 changed files with 239 additions and 18 deletions

View File

@ -198,7 +198,7 @@ def signPostHeadersNew(dateStr: str, privateKeyPem: str,
return signatureIndexHeader, signatureHeader
def createSignedHeader(privateKeyPem: str, nickname: str,
def createSignedHeader(dateStr: str, privateKeyPem: str, nickname: str,
domain: str, port: int,
toDomain: str, toPort: int,
path: str, httpPrefix: str, withDigest: bool,
@ -207,10 +207,10 @@ def createSignedHeader(privateKeyPem: str, nickname: str,
"""
headerDomain = getFullDomain(toDomain, toPort)
dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
if not dateStr:
dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
if not withDigest:
contentType = \
'application/activity+json'
contentType = 'application/activity+json'
headers = {
'(request-target)': f'get {path}',
'host': headerDomain,
@ -319,6 +319,9 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
for k, v in [i.split('=', 1) for i in signatureHeader.split(',')]
}
if debug:
print('signatureDict: ' + str(signatureDict))
# Unpack the signed headers and set values based on current headers and
# body (if a digest was included)
signedHeaderList = []
@ -414,10 +417,10 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
signedHeaderList.append(
f'{signedHeader}: {headers[signedHeaderCap]}')
if debug:
print('DEBUG: signedHeaderList: ' + str(signedHeaderList))
# Now we have our header data digest
signedHeaderText = '\n'.join(signedHeaderList)
if debug:
print('signedHeaderText:\n' + signedHeaderText + 'END')
# Get the signature, verify with public key, return result
signature = None
@ -439,6 +442,10 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
headerDigest = getSHA256(signedHeaderText.encode('ascii'))
paddingStr = padding.PKCS1v15()
alg = hazutils.Prehashed(hashes.SHA256())
elif algorithm == 'rsa-sha512':
headerDigest = getSHA512(signedHeaderText.encode('ascii'))
paddingStr = padding.PKCS1v15()
alg = hazutils.Prehashed(hashes.SHA512())
else:
print('Unknown http signature algorithm: ' + algorithm)
paddingStr = padding.PKCS1v15()
@ -451,4 +458,4 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
except BaseException:
if debug:
print('DEBUG: verifyPostHeaders pkcs1_15 verify failure')
return False
return False

View File

@ -2242,7 +2242,7 @@ def sendPost(signingPrivateKeyPem: str, projectVersion: str,
# construct the http header, including the message body digest
signatureHeaderJson = \
createSignedHeader(privateKeyPem, nickname, domain, port,
createSignedHeader(None, privateKeyPem, nickname, domain, port,
toDomain, toPort,
postPath, httpPrefix, withDigest, postJsonStr)
@ -2601,7 +2601,7 @@ def sendSignedJson(postJsonObject: {}, session, baseDir: str,
# construct the http header, including the message body digest
signatureHeaderJson = \
createSignedHeader(privateKeyPem, nickname, domain, port,
createSignedHeader(None, privateKeyPem, nickname, domain, port,
toDomain, toPort,
postPath, httpPrefix, withDigest, postJsonStr)
# optionally add a token so that the receiving instance may access

View File

@ -180,17 +180,22 @@ def _getJsonSigned(session, url: str, domainFull: str, sessionHeaders: {},
else:
path = '/actor'
signatureHeaderJson = \
createSignedHeader(signingPrivateKeyPem, 'actor', domain, port,
createSignedHeader(None, signingPrivateKeyPem, 'actor', domain, port,
toDomain, toPort, path, httpPrefix, withDigest,
messageStr)
if debug:
print('Signed GET signatureHeaderJson ' + str(signatureHeaderJson))
for key, value in signatureHeaderJson.items():
if key.startswith('(') or key.startswith('*('):
continue
sessionHeaders[key.title()] = value
if sessionHeaders.get(key.lower()):
del sessionHeaders[key.lower()]
sessionHeaders['Host'] = signatureHeaderJson['host']
sessionHeaders['Date'] = signatureHeaderJson['date']
sessionHeaders['Accept'] = signatureHeaderJson['accept']
sessionHeaders['Signature'] = signatureHeaderJson['signature']
# update the session headers from the signature headers
# for key, value in signatureHeaderJson.items():
# if key.startswith('(') or key.startswith('*('):
# continue
# sessionHeaders[key.title()] = value
# if sessionHeaders.get(key.lower()):
# del sessionHeaders[key.lower()]
if debug:
print('Signed GET sessionHeaders ' + str(sessionHeaders))

213
tests.py
View File

@ -7,6 +7,13 @@ __email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Testing"
import base64
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import utils as hazutils
import time
import os
import shutil
@ -16,6 +23,7 @@ from shutil import copyfile
from random import randint
from time import gmtime, strftime
from pprint import pprint
from httpsig import createSignedHeader
from httpsig import signPostHeaders
from httpsig import signPostHeadersNew
from httpsig import verifyPostHeaders
@ -42,6 +50,7 @@ from follow import clearFollowers
from follow import sendFollowRequestViaServer
from follow import sendUnfollowRequestViaServer
from siteactive import siteIsActive
from utils import getSHA256
from utils import dangerousSVG
from utils import canReplyTo
from utils import isGroupAccount
@ -164,8 +173,209 @@ thrBob = None
thrEve = None
def _testSignAndVerify() -> None:
print('testSignAndVerify')
publicKeyPem = \
'-----BEGIN RSA PUBLIC KEY-----\n' + \
'MIIBCgKCAQEAhAKYdtoeoy8zcAcR874L8' + \
'cnZxKzAGwd7v36APp7Pv6Q2jdsPBRrw\n' + \
'WEBnez6d0UDKDwGbc6nxfEXAy5mbhgajz' + \
'rw3MOEt8uA5txSKobBpKDeBLOsdJKFq\n' + \
'MGmXCQvEG7YemcxDTRPxAleIAgYYRjTSd' + \
'/QBwVW9OwNFhekro3RtlinV0a75jfZg\n' + \
'kne/YiktSvLG34lw2zqXBDTC5NHROUqGT' + \
'lML4PlNZS5Ri2U4aCNx2rUPRcKIlE0P\n' + \
'uKxI4T+HIaFpv8+rdV6eUgOrB2xeI1dSF' + \
'Fn/nnv5OoZJEIB+VmuKn3DCUcCZSFlQ\n' + \
'PSXSfBDiUGhwOw76WuSSsf1D4b/vLoJ10wIDAQAB\n' + \
'-----END RSA PUBLIC KEY-----\n'
privateKeyPem = \
'-----BEGIN RSA PRIVATE KEY-----\n' + \
'MIIEqAIBAAKCAQEAhAKYdtoeoy8zcAcR8' + \
'74L8cnZxKzAGwd7v36APp7Pv6Q2jdsP\n' + \
'BRrwWEBnez6d0UDKDwGbc6nxfEXAy5mbh' + \
'gajzrw3MOEt8uA5txSKobBpKDeBLOsd\n' + \
'JKFqMGmXCQvEG7YemcxDTRPxAleIAgYYR' + \
'jTSd/QBwVW9OwNFhekro3RtlinV0a75\n' + \
'jfZgkne/YiktSvLG34lw2zqXBDTC5NHRO' + \
'UqGTlML4PlNZS5Ri2U4aCNx2rUPRcKI\n' + \
'lE0PuKxI4T+HIaFpv8+rdV6eUgOrB2xeI' + \
'1dSFFn/nnv5OoZJEIB+VmuKn3DCUcCZ\n' + \
'SFlQPSXSfBDiUGhwOw76WuSSsf1D4b/vL' + \
'oJ10wIDAQABAoIBAG/JZuSWdoVHbi56\n' + \
'vjgCgkjg3lkO1KrO3nrdm6nrgA9P9qaPj' + \
'xuKoWaKO1cBQlE1pSWp/cKncYgD5WxE\n' + \
'CpAnRUXG2pG4zdkzCYzAh1i+c34L6oZoH' + \
'sirK6oNcEnHveydfzJL5934egm6p8DW\n' + \
'+m1RQ70yUt4uRc0YSor+q1LGJvGQHReF0' + \
'WmJBZHrhz5e63Pq7lE0gIwuBqL8SMaA\n' + \
'yRXtK+JGxZpImTq+NHvEWWCu09SCq0r83' + \
'8ceQI55SvzmTkwqtC+8AT2zFviMZkKR\n' + \
'Qo6SPsrqItxZWRty2izawTF0Bf5S2VAx7' + \
'O+6t3wBsQ1sLptoSgX3QblELY5asI0J\n' + \
'YFz7LJECgYkAsqeUJmqXE3LP8tYoIjMIA' + \
'KiTm9o6psPlc8CrLI9CH0UbuaA2JCOM\n' + \
'cCNq8SyYbTqgnWlB9ZfcAm/cFpA8tYci9' + \
'm5vYK8HNxQr+8FS3Qo8N9RJ8d0U5Csw\n' + \
'DzMYfRghAfUGwmlWj5hp1pQzAuhwbOXFt' + \
'xKHVsMPhz1IBtF9Y8jvgqgYHLbmyiu1\n' + \
'mwJ5AL0pYF0G7x81prlARURwHo0Yf52kE' + \
'w1dxpx+JXER7hQRWQki5/NsUEtv+8RT\n' + \
'qn2m6qte5DXLyn83b1qRscSdnCCwKtKWU' + \
'ug5q2ZbwVOCJCtmRwmnP131lWRYfj67\n' + \
'B/xJ1ZA6X3GEf4sNReNAtaucPEelgR2ns' + \
'N0gKQKBiGoqHWbK1qYvBxX2X3kbPDkv\n' + \
'9C+celgZd2PW7aGYLCHq7nPbmfDV0yHcW' + \
'jOhXZ8jRMjmANVR/eLQ2EfsRLdW69bn\n' + \
'f3ZD7JS1fwGnO3exGmHO3HZG+6AvberKY' + \
'VYNHahNFEw5TsAcQWDLRpkGybBcxqZo\n' + \
'81YCqlqidwfeO5YtlO7etx1xLyqa2NsCe' + \
'G9A86UjG+aeNnXEIDk1PDK+EuiThIUa\n' + \
'/2IxKzJKWl1BKr2d4xAfR0ZnEYuRrbeDQ' + \
'YgTImOlfW6/GuYIxKYgEKCFHFqJATAG\n' + \
'IxHrq1PDOiSwXd2GmVVYyEmhZnbcp8Cxa' + \
'EMQoevxAta0ssMK3w6UsDtvUvYvF22m\n' + \
'qQKBiD5GwESzsFPy3Ga0MvZpn3D6EJQLg' + \
'snrtUPZx+z2Ep2x0xc5orneB5fGyF1P\n' + \
'WtP+fG5Q6Dpdz3LRfm+KwBCWFKQjg7uTx' + \
'cjerhBWEYPmEMKYwTJF5PBG9/ddvHLQ\n' + \
'EQeNC8fHGg4UXU8mhHnSBt3EA10qQJfRD' + \
's15M38eG2cYwB1PZpDHScDnDA0=\n' + \
'-----END RSA PRIVATE KEY-----'
# sign
signedHeaderText = \
'(request-target): get /actor\n' + \
'host: octodon.social\n' + \
'date: Tue, 14 Sep 2021 16:19:00 GMT\n' + \
'accept: application/json'
headerDigest = getSHA256(signedHeaderText.encode('ascii'))
key = load_pem_private_key(privateKeyPem.encode('utf-8'),
None, backend=default_backend())
rawSignature = key.sign(headerDigest,
padding.PKCS1v15(),
hazutils.Prehashed(hashes.SHA256()))
signature1 = base64.b64encode(rawSignature).decode('ascii')
# verify
paddingStr = padding.PKCS1v15()
alg = hazutils.Prehashed(hashes.SHA256())
pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'),
backend=default_backend())
signature2 = base64.b64decode(signature1)
pubkey.verify(signature2, headerDigest, paddingStr, alg)
def _testHttpSignedGET():
print('testHttpSignedGET')
httpPrefix = 'https'
debug = True
boxpath = "/actor"
host = "octodon.social"
user_agent = "Epicyon/1.2.0; +https://epicyon.libreserver.org/"
dateStr = 'Tue, 14 Sep 2021 16:19:00 GMT'
content_length = "0"
accept_encoding = 'gzip'
accept = 'application/activity+json'
publicKeyPem = \
'-----BEGIN RSA PUBLIC KEY-----\n' + \
'MIIBCgKCAQEAhAKYdtoeoy8zcAcR874L8' + \
'cnZxKzAGwd7v36APp7Pv6Q2jdsPBRrw\n' + \
'WEBnez6d0UDKDwGbc6nxfEXAy5mbhgajz' + \
'rw3MOEt8uA5txSKobBpKDeBLOsdJKFq\n' + \
'MGmXCQvEG7YemcxDTRPxAleIAgYYRjTSd' + \
'/QBwVW9OwNFhekro3RtlinV0a75jfZg\n' + \
'kne/YiktSvLG34lw2zqXBDTC5NHROUqGT' + \
'lML4PlNZS5Ri2U4aCNx2rUPRcKIlE0P\n' + \
'uKxI4T+HIaFpv8+rdV6eUgOrB2xeI1dSF' + \
'Fn/nnv5OoZJEIB+VmuKn3DCUcCZSFlQ\n' + \
'PSXSfBDiUGhwOw76WuSSsf1D4b/vLoJ10wIDAQAB\n' + \
'-----END RSA PUBLIC KEY-----\n'
privateKeyPem = \
'-----BEGIN RSA PRIVATE KEY-----\n' + \
'MIIEqAIBAAKCAQEAhAKYdtoeoy8zcAcR8' + \
'74L8cnZxKzAGwd7v36APp7Pv6Q2jdsP\n' + \
'BRrwWEBnez6d0UDKDwGbc6nxfEXAy5mbh' + \
'gajzrw3MOEt8uA5txSKobBpKDeBLOsd\n' + \
'JKFqMGmXCQvEG7YemcxDTRPxAleIAgYYR' + \
'jTSd/QBwVW9OwNFhekro3RtlinV0a75\n' + \
'jfZgkne/YiktSvLG34lw2zqXBDTC5NHRO' + \
'UqGTlML4PlNZS5Ri2U4aCNx2rUPRcKI\n' + \
'lE0PuKxI4T+HIaFpv8+rdV6eUgOrB2xeI' + \
'1dSFFn/nnv5OoZJEIB+VmuKn3DCUcCZ\n' + \
'SFlQPSXSfBDiUGhwOw76WuSSsf1D4b/vL' + \
'oJ10wIDAQABAoIBAG/JZuSWdoVHbi56\n' + \
'vjgCgkjg3lkO1KrO3nrdm6nrgA9P9qaPj' + \
'xuKoWaKO1cBQlE1pSWp/cKncYgD5WxE\n' + \
'CpAnRUXG2pG4zdkzCYzAh1i+c34L6oZoH' + \
'sirK6oNcEnHveydfzJL5934egm6p8DW\n' + \
'+m1RQ70yUt4uRc0YSor+q1LGJvGQHReF0' + \
'WmJBZHrhz5e63Pq7lE0gIwuBqL8SMaA\n' + \
'yRXtK+JGxZpImTq+NHvEWWCu09SCq0r83' + \
'8ceQI55SvzmTkwqtC+8AT2zFviMZkKR\n' + \
'Qo6SPsrqItxZWRty2izawTF0Bf5S2VAx7' + \
'O+6t3wBsQ1sLptoSgX3QblELY5asI0J\n' + \
'YFz7LJECgYkAsqeUJmqXE3LP8tYoIjMIA' + \
'KiTm9o6psPlc8CrLI9CH0UbuaA2JCOM\n' + \
'cCNq8SyYbTqgnWlB9ZfcAm/cFpA8tYci9' + \
'm5vYK8HNxQr+8FS3Qo8N9RJ8d0U5Csw\n' + \
'DzMYfRghAfUGwmlWj5hp1pQzAuhwbOXFt' + \
'xKHVsMPhz1IBtF9Y8jvgqgYHLbmyiu1\n' + \
'mwJ5AL0pYF0G7x81prlARURwHo0Yf52kE' + \
'w1dxpx+JXER7hQRWQki5/NsUEtv+8RT\n' + \
'qn2m6qte5DXLyn83b1qRscSdnCCwKtKWU' + \
'ug5q2ZbwVOCJCtmRwmnP131lWRYfj67\n' + \
'B/xJ1ZA6X3GEf4sNReNAtaucPEelgR2ns' + \
'N0gKQKBiGoqHWbK1qYvBxX2X3kbPDkv\n' + \
'9C+celgZd2PW7aGYLCHq7nPbmfDV0yHcW' + \
'jOhXZ8jRMjmANVR/eLQ2EfsRLdW69bn\n' + \
'f3ZD7JS1fwGnO3exGmHO3HZG+6AvberKY' + \
'VYNHahNFEw5TsAcQWDLRpkGybBcxqZo\n' + \
'81YCqlqidwfeO5YtlO7etx1xLyqa2NsCe' + \
'G9A86UjG+aeNnXEIDk1PDK+EuiThIUa\n' + \
'/2IxKzJKWl1BKr2d4xAfR0ZnEYuRrbeDQ' + \
'YgTImOlfW6/GuYIxKYgEKCFHFqJATAG\n' + \
'IxHrq1PDOiSwXd2GmVVYyEmhZnbcp8Cxa' + \
'EMQoevxAta0ssMK3w6UsDtvUvYvF22m\n' + \
'qQKBiD5GwESzsFPy3Ga0MvZpn3D6EJQLg' + \
'snrtUPZx+z2Ep2x0xc5orneB5fGyF1P\n' + \
'WtP+fG5Q6Dpdz3LRfm+KwBCWFKQjg7uTx' + \
'cjerhBWEYPmEMKYwTJF5PBG9/ddvHLQ\n' + \
'EQeNC8fHGg4UXU8mhHnSBt3EA10qQJfRD' + \
's15M38eG2cYwB1PZpDHScDnDA0=\n' + \
'-----END RSA PRIVATE KEY-----'
signatureHeader = createSignedHeader(dateStr,
privateKeyPem, 'actor',
'epicyon.libreserver.org', 443,
host, 443,
boxpath, httpPrefix, False,
None)
signature = signatureHeader['signature']
print('')
print(signature)
print('')
headers = {
"user-agent": user_agent,
"content-length": content_length,
"host": host,
"date": dateStr,
"accept": accept,
"accept-encoding": accept_encoding,
"signature": signature
}
# headers = signatureHeader
# print('Test1--------------------------------------------')
# print('headers: ' + str(headers))
# assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
# boxpath, True, None,
# '', debug, True)
# print('Test2--------------------------------------------')
boxpath = "/users/Actor"
host = "epicyon.libreserver.org"
@ -215,8 +425,6 @@ def _testHttpSignedGET():
"accept-encoding": accept_encoding,
"signature": signature
}
httpPrefix = 'https'
debug = True
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, True, None,
'', debug, True)
@ -5550,6 +5758,7 @@ def runAllTests():
_translateOntology()
_testGetPriceFromString()
_testFunctions()
_testSignAndVerify()
_testDangerousSVG()
_testCanReplyTo()
_testDateConversions()