Fixing get request signing

merge-requests/30/head
Bob Mottram 2021-09-15 11:44:44 +01:00
parent 7652da1d91
commit a8fa0e92d5
3 changed files with 207 additions and 263 deletions

View File

@ -640,12 +640,13 @@ if args.tests:
sys.exit() sys.exit()
if args.testsnetwork: if args.testsnetwork:
print('Network Tests') print('Network Tests')
testSharedItemsFederation() baseDir = os.getcwd()
testGroupFollow() testSharedItemsFederation(baseDir)
testPostMessageBetweenServers() testGroupFollow(baseDir)
testFollowBetweenServers() testPostMessageBetweenServers(baseDir)
testClientToServer() testFollowBetweenServers(baseDir)
testUpdateActor() testClientToServer(baseDir)
testUpdateActor(baseDir)
print('All tests succeeded') print('All tests succeeded')
sys.exit() sys.exit()

View File

@ -40,7 +40,8 @@ def signPostHeaders(dateStr: str, privateKeyPem: str,
toDomain: str, toPort: int, toDomain: str, toPort: int,
path: str, path: str,
httpPrefix: str, httpPrefix: str,
messageBodyJsonStr: str) -> str: messageBodyJsonStr: str,
contentType: str) -> str:
"""Returns a raw signature string that can be plugged into a header and """Returns a raw signature string that can be plugged into a header and
used to verify the authenticity of an HTTP transmission. used to verify the authenticity of an HTTP transmission.
""" """
@ -61,7 +62,7 @@ def signPostHeaders(dateStr: str, privateKeyPem: str,
'(request-target)': f'get {path}', '(request-target)': f'get {path}',
'host': toDomain, 'host': toDomain,
'date': dateStr, 'date': dateStr,
'accept': 'application/json' 'accept': contentType
} }
else: else:
bodyDigest = messageContentDigest(messageBodyJsonStr) bodyDigest = messageContentDigest(messageBodyJsonStr)
@ -208,10 +209,14 @@ def createSignedHeader(dateStr: str, privateKeyPem: str, nickname: str,
""" """
headerDomain = getFullDomain(toDomain, toPort) headerDomain = getFullDomain(toDomain, toPort)
# if no date is given then create one
if not dateStr: if not dateStr:
dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
# Content-Type or Accept header
if not contentType: if not contentType:
contentType = 'application/activity+json' contentType = 'application/activity+json'
if not withDigest: if not withDigest:
headers = { headers = {
'(request-target)': f'get {path}', '(request-target)': f'get {path}',
@ -222,7 +227,7 @@ def createSignedHeader(dateStr: str, privateKeyPem: str, nickname: str,
signatureHeader = \ signatureHeader = \
signPostHeaders(dateStr, privateKeyPem, nickname, signPostHeaders(dateStr, privateKeyPem, nickname,
domain, port, toDomain, toPort, domain, port, toDomain, toPort,
path, httpPrefix, None) path, httpPrefix, None, contentType)
else: else:
bodyDigest = messageContentDigest(messageBodyJsonStr) bodyDigest = messageContentDigest(messageBodyJsonStr)
contentLength = len(messageBodyJsonStr) contentLength = len(messageBodyJsonStr)
@ -238,7 +243,8 @@ def createSignedHeader(dateStr: str, privateKeyPem: str, nickname: str,
signPostHeaders(dateStr, privateKeyPem, nickname, signPostHeaders(dateStr, privateKeyPem, nickname,
domain, port, domain, port,
toDomain, toPort, toDomain, toPort,
path, httpPrefix, messageBodyJsonStr) path, httpPrefix, messageBodyJsonStr,
contentType)
headers['signature'] = signatureHeader headers['signature'] = signatureHeader
return headers return headers
@ -437,6 +443,9 @@ def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
else: else:
# Original Mastodon signature # Original Mastodon signature
signature = base64.b64decode(signatureDict['signature']) signature = base64.b64decode(signatureDict['signature'])
if debug:
print('signature: ' + algorithm + ' ' +
signatureDict['signature'])
# If extra signing algorithms need to be added then do it here # If extra signing algorithms need to be added then do it here
if algorithm == 'rsa-sha256': if algorithm == 'rsa-sha256':

440
tests.py
View File

@ -173,6 +173,122 @@ thrBob = None
thrEve = None thrEve = None
def _testHttpSignedGET(baseDir: str):
print('testHttpSignedGET')
httpPrefix = 'https'
debug = True
boxpath = "/users/Actor"
host = "epicyon.libreserver.org"
content_length = "0"
user_agent = "http.rb/4.4.1 (Mastodon/3.4.1; +https://octodon.social/)"
dateStr = 'Wed, 01 Sep 2021 16:11:10 GMT'
accept_encoding = 'gzip'
accept = \
'application/activity+json, application/ld+json'
signature = \
'keyId="https://octodon.social/actor#main-key",' + \
'algorithm="rsa-sha256",' + \
'headers="(request-target) host date accept",' + \
'signature="Fe53PS9A2OSP4x+W/svhA' + \
'jUKHBvnAR73Ez+H32au7DQklLk08Lvm8al' + \
'LS7pCor28yfyx+DfZADgq6G1mLLRZo0OOn' + \
'PFSog7DhdcygLhBUMS0KlT5KVGwUS0tw' + \
'jdiHv4OC83RiCr/ZySBgOv65YLHYmGCi5B' + \
'IqSZJRkqi8+SLmLGESlNOEzKu+jIxOBY' + \
'mEEdIpNrDeE5YrFKpfTC3vS2GnxGOo5J/4' + \
'lB2h+dlUpso+sv5rDz1d1FsqRWK8waV7' + \
'4HUfLV+qbgYRceOTyZIi50vVqLvt9CTQes' + \
'KZHG3GrrPfaBuvoUbR4MCM3BUvpB7EzL' + \
'9F17Y+Ea9mo8zjqzZm8HaZQ=="'
publicKeyPem = \
'-----BEGIN PUBLIC KEY-----\n' + \
'MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMII' + \
'BCgKCAQEA1XT+ov/i4LDYuaXCwh4r\n' + \
'2rVfWtnz68wnFx3knwymwtRoAc/SFGzp9ye' + \
'5ogG1uPcbe7MeirZHhaBICynPlL32\n' + \
's9OYootI7MsQWn+vu7azxiXO7qcTPByvGcl' + \
'0vpLhtT/ApmlMintkRTVXdzBdJVM0\n' + \
'UsmYKg6U+IHNL+a1gURHGXep2Ih0BJMh4Aa' + \
'DbaID6jtpJZvbIkYgJ4IJucOe+A3T\n' + \
'YPMwkBA84ew+hso+vKQfTunyDInuPQbEzrA' + \
'zMJXEHS7IpBhdS4/cEox86BoDJ/q0\n' + \
'KOEOUpUDniFYWb9k1+9B387OviRDLIcLxNZ' + \
'nf+bNq8d+CwEXY2xGsToBle/q74d8\n' + \
'BwIDAQAB\n' + \
'-----END PUBLIC KEY-----\n'
headers = {
"user-agent": user_agent,
"content-length": content_length,
"host": host,
"date": dateStr,
"accept": accept,
"accept-encoding": accept_encoding,
"signature": signature
}
GETmethod = True
messageBodyDigest = None
messageBodyJsonStr = ''
noRecencyCheck = True
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, GETmethod, messageBodyDigest,
messageBodyJsonStr, debug, noRecencyCheck)
# Change a single character and the signature should fail
headers['date'] = headers['date'].replace(':10', ':11')
assert not verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, GETmethod, messageBodyDigest,
messageBodyJsonStr, debug, noRecencyCheck)
path = baseDir + '/.testHttpsigGET'
if os.path.isdir(path):
shutil.rmtree(path)
os.mkdir(path)
os.chdir(path)
nickname = 'testactor'
hostDomain = 'someother.instance'
domain = 'argumentative.social'
httpPrefix = 'https'
port = 443
withDigest = False
password = 'SuperSecretPassword'
noRecencyCheck = True
privateKeyPem, publicKeyPem, person, wfEndpoint = \
createPerson(path, nickname, domain, port, httpPrefix,
False, False, password)
assert privateKeyPem
assert publicKeyPem
messageBodyJsonStr = ''
headersDomain = getFullDomain(hostDomain, port)
dateStr = 'Tue, 14 Sep 2021 16:19:00 GMT'
boxpath = '/inbox'
accept = 'application/json'
# accept = 'application/activity+json'
headers = {
'user-agent': 'Epicyon/1.2.0; +https://' + domain + '/',
'host': headersDomain,
'date': dateStr,
'accept': accept,
'content-length': 0
}
signatureHeader = createSignedHeader(dateStr,
privateKeyPem, nickname,
domain, port,
hostDomain, port,
boxpath, httpPrefix, False,
None, accept)
headers['signature'] = signatureHeader['signature']
GETmethod = not withDigest
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, GETmethod, None,
messageBodyJsonStr, debug, noRecencyCheck)
if os.path.isdir(path):
shutil.rmtree(path)
def _testSignAndVerify() -> None: def _testSignAndVerify() -> None:
print('testSignAndVerify') print('testSignAndVerify')
publicKeyPem = \ publicKeyPem = \
@ -267,174 +383,6 @@ def _testSignAndVerify() -> None:
pubkey.verify(signature2, headerDigest, paddingStr, alg) pubkey.verify(signature2, headerDigest, paddingStr, alg)
def _testHttpSignedGET():
print('testHttpSignedGET')
httpPrefix = 'https'
debug = True
boxpath = "/actor"
host = "octodon.social"
user_agent = "Epicyon/1.2.0; +https://epicyon.libreserver.org/"
dateStr = 'Tue, 14 Sep 2021 16:19:00 GMT'
content_length = "0"
accept_encoding = 'gzip'
accept = 'application/activity+json'
publicKeyPem = \
'-----BEGIN RSA PUBLIC KEY-----\n' + \
'MIIBCgKCAQEAhAKYdtoeoy8zcAcR874L8' + \
'cnZxKzAGwd7v36APp7Pv6Q2jdsPBRrw\n' + \
'WEBnez6d0UDKDwGbc6nxfEXAy5mbhgajz' + \
'rw3MOEt8uA5txSKobBpKDeBLOsdJKFq\n' + \
'MGmXCQvEG7YemcxDTRPxAleIAgYYRjTSd' + \
'/QBwVW9OwNFhekro3RtlinV0a75jfZg\n' + \
'kne/YiktSvLG34lw2zqXBDTC5NHROUqGT' + \
'lML4PlNZS5Ri2U4aCNx2rUPRcKIlE0P\n' + \
'uKxI4T+HIaFpv8+rdV6eUgOrB2xeI1dSF' + \
'Fn/nnv5OoZJEIB+VmuKn3DCUcCZSFlQ\n' + \
'PSXSfBDiUGhwOw76WuSSsf1D4b/vLoJ10wIDAQAB\n' + \
'-----END RSA PUBLIC KEY-----\n'
privateKeyPem = \
'-----BEGIN RSA PRIVATE KEY-----\n' + \
'MIIEqAIBAAKCAQEAhAKYdtoeoy8zcAcR8' + \
'74L8cnZxKzAGwd7v36APp7Pv6Q2jdsP\n' + \
'BRrwWEBnez6d0UDKDwGbc6nxfEXAy5mbh' + \
'gajzrw3MOEt8uA5txSKobBpKDeBLOsd\n' + \
'JKFqMGmXCQvEG7YemcxDTRPxAleIAgYYR' + \
'jTSd/QBwVW9OwNFhekro3RtlinV0a75\n' + \
'jfZgkne/YiktSvLG34lw2zqXBDTC5NHRO' + \
'UqGTlML4PlNZS5Ri2U4aCNx2rUPRcKI\n' + \
'lE0PuKxI4T+HIaFpv8+rdV6eUgOrB2xeI' + \
'1dSFFn/nnv5OoZJEIB+VmuKn3DCUcCZ\n' + \
'SFlQPSXSfBDiUGhwOw76WuSSsf1D4b/vL' + \
'oJ10wIDAQABAoIBAG/JZuSWdoVHbi56\n' + \
'vjgCgkjg3lkO1KrO3nrdm6nrgA9P9qaPj' + \
'xuKoWaKO1cBQlE1pSWp/cKncYgD5WxE\n' + \
'CpAnRUXG2pG4zdkzCYzAh1i+c34L6oZoH' + \
'sirK6oNcEnHveydfzJL5934egm6p8DW\n' + \
'+m1RQ70yUt4uRc0YSor+q1LGJvGQHReF0' + \
'WmJBZHrhz5e63Pq7lE0gIwuBqL8SMaA\n' + \
'yRXtK+JGxZpImTq+NHvEWWCu09SCq0r83' + \
'8ceQI55SvzmTkwqtC+8AT2zFviMZkKR\n' + \
'Qo6SPsrqItxZWRty2izawTF0Bf5S2VAx7' + \
'O+6t3wBsQ1sLptoSgX3QblELY5asI0J\n' + \
'YFz7LJECgYkAsqeUJmqXE3LP8tYoIjMIA' + \
'KiTm9o6psPlc8CrLI9CH0UbuaA2JCOM\n' + \
'cCNq8SyYbTqgnWlB9ZfcAm/cFpA8tYci9' + \
'm5vYK8HNxQr+8FS3Qo8N9RJ8d0U5Csw\n' + \
'DzMYfRghAfUGwmlWj5hp1pQzAuhwbOXFt' + \
'xKHVsMPhz1IBtF9Y8jvgqgYHLbmyiu1\n' + \
'mwJ5AL0pYF0G7x81prlARURwHo0Yf52kE' + \
'w1dxpx+JXER7hQRWQki5/NsUEtv+8RT\n' + \
'qn2m6qte5DXLyn83b1qRscSdnCCwKtKWU' + \
'ug5q2ZbwVOCJCtmRwmnP131lWRYfj67\n' + \
'B/xJ1ZA6X3GEf4sNReNAtaucPEelgR2ns' + \
'N0gKQKBiGoqHWbK1qYvBxX2X3kbPDkv\n' + \
'9C+celgZd2PW7aGYLCHq7nPbmfDV0yHcW' + \
'jOhXZ8jRMjmANVR/eLQ2EfsRLdW69bn\n' + \
'f3ZD7JS1fwGnO3exGmHO3HZG+6AvberKY' + \
'VYNHahNFEw5TsAcQWDLRpkGybBcxqZo\n' + \
'81YCqlqidwfeO5YtlO7etx1xLyqa2NsCe' + \
'G9A86UjG+aeNnXEIDk1PDK+EuiThIUa\n' + \
'/2IxKzJKWl1BKr2d4xAfR0ZnEYuRrbeDQ' + \
'YgTImOlfW6/GuYIxKYgEKCFHFqJATAG\n' + \
'IxHrq1PDOiSwXd2GmVVYyEmhZnbcp8Cxa' + \
'EMQoevxAta0ssMK3w6UsDtvUvYvF22m\n' + \
'qQKBiD5GwESzsFPy3Ga0MvZpn3D6EJQLg' + \
'snrtUPZx+z2Ep2x0xc5orneB5fGyF1P\n' + \
'WtP+fG5Q6Dpdz3LRfm+KwBCWFKQjg7uTx' + \
'cjerhBWEYPmEMKYwTJF5PBG9/ddvHLQ\n' + \
'EQeNC8fHGg4UXU8mhHnSBt3EA10qQJfRD' + \
's15M38eG2cYwB1PZpDHScDnDA0=\n' + \
'-----END RSA PRIVATE KEY-----'
signatureHeader = createSignedHeader(dateStr,
privateKeyPem, 'actor',
'epicyon.libreserver.org', 443,
host, 443,
boxpath, httpPrefix, False,
None, accept)
signature = signatureHeader['signature']
print('')
print(signature)
print('')
headers = {
"user-agent": user_agent,
"content-length": content_length,
"host": host,
"date": dateStr,
"accept": accept,
"accept-encoding": accept_encoding,
"signature": signature
}
# headers = signatureHeader
# print('Test1--------------------------------------------')
# print('headers: ' + str(headers))
# assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
# boxpath, True, None,
# '', debug, True)
# print('Test2--------------------------------------------')
boxpath = "/users/Actor"
host = "epicyon.libreserver.org"
content_length = "0"
user_agent = "http.rb/4.4.1 (Mastodon/3.4.1; +https://octodon.social/)"
dateStr = 'Wed, 01 Sep 2021 16:11:10 GMT'
accept_encoding = 'gzip'
accept = \
'application/activity+json, application/ld+json'
signature = \
'keyId="https://octodon.social/actor#main-key",' + \
'algorithm="rsa-sha256",' + \
'headers="(request-target) host date accept",' + \
'signature="Fe53PS9A2OSP4x+W/svhA' + \
'jUKHBvnAR73Ez+H32au7DQklLk08Lvm8al' + \
'LS7pCor28yfyx+DfZADgq6G1mLLRZo0OOn' + \
'PFSog7DhdcygLhBUMS0KlT5KVGwUS0tw' + \
'jdiHv4OC83RiCr/ZySBgOv65YLHYmGCi5B' + \
'IqSZJRkqi8+SLmLGESlNOEzKu+jIxOBY' + \
'mEEdIpNrDeE5YrFKpfTC3vS2GnxGOo5J/4' + \
'lB2h+dlUpso+sv5rDz1d1FsqRWK8waV7' + \
'4HUfLV+qbgYRceOTyZIi50vVqLvt9CTQes' + \
'KZHG3GrrPfaBuvoUbR4MCM3BUvpB7EzL' + \
'9F17Y+Ea9mo8zjqzZm8HaZQ=="'
publicKeyPem = \
'-----BEGIN PUBLIC KEY-----\n' + \
'MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMII' + \
'BCgKCAQEA1XT+ov/i4LDYuaXCwh4r\n' + \
'2rVfWtnz68wnFx3knwymwtRoAc/SFGzp9ye' + \
'5ogG1uPcbe7MeirZHhaBICynPlL32\n' + \
's9OYootI7MsQWn+vu7azxiXO7qcTPByvGcl' + \
'0vpLhtT/ApmlMintkRTVXdzBdJVM0\n' + \
'UsmYKg6U+IHNL+a1gURHGXep2Ih0BJMh4Aa' + \
'DbaID6jtpJZvbIkYgJ4IJucOe+A3T\n' + \
'YPMwkBA84ew+hso+vKQfTunyDInuPQbEzrA' + \
'zMJXEHS7IpBhdS4/cEox86BoDJ/q0\n' + \
'KOEOUpUDniFYWb9k1+9B387OviRDLIcLxNZ' + \
'nf+bNq8d+CwEXY2xGsToBle/q74d8\n' + \
'BwIDAQAB\n' + \
'-----END PUBLIC KEY-----\n'
headers = {
"user-agent": user_agent,
"content-length": content_length,
"host": host,
"date": dateStr,
"accept": accept,
"accept-encoding": accept_encoding,
"signature": signature
}
assert verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, True, None,
'', debug, True)
# Change a single character and the signature should fail
headers['date'] = headers['date'].replace(':10', ':11')
assert not verifyPostHeaders(httpPrefix, publicKeyPem, headers,
boxpath, True, None,
'', debug, True)
def _testHttpSigNew(): def _testHttpSigNew():
print('testHttpSigNew') print('testHttpSigNew')
messageBodyJson = {"hello": "world"} messageBodyJson = {"hello": "world"}
@ -604,10 +552,9 @@ def _testHttpSigNew():
'/aYa/GhW2pSrctDnAKIi4imj9joppr3CB8gqgXZOPQ==:' '/aYa/GhW2pSrctDnAKIi4imj9joppr3CB8gqgXZOPQ==:'
def _testHttpsigBase(withDigest): def _testHttpsigBase(withDigest: bool, baseDir: str):
print('testHttpsig(' + str(withDigest) + ')') print('testHttpsig(' + str(withDigest) + ')')
baseDir = os.getcwd()
path = baseDir + '/.testHttpsigBase' path = baseDir + '/.testHttpsigBase'
if os.path.isdir(path): if os.path.isdir(path):
shutil.rmtree(path) shutil.rmtree(path)
@ -616,6 +563,7 @@ def _testHttpsigBase(withDigest):
contentType = 'application/activity+json' contentType = 'application/activity+json'
nickname = 'socrates' nickname = 'socrates'
hostDomain = 'someother.instance'
domain = 'argumentative.social' domain = 'argumentative.social'
httpPrefix = 'https' httpPrefix = 'https'
port = 5576 port = 5576
@ -624,14 +572,17 @@ def _testHttpsigBase(withDigest):
createPerson(path, nickname, domain, port, httpPrefix, createPerson(path, nickname, domain, port, httpPrefix,
False, False, password) False, False, password)
assert privateKeyPem assert privateKeyPem
messageBodyJson = { if withDigest:
"a key": "a value", messageBodyJson = {
"another key": "A string", "a key": "a value",
"yet another key": "Another string" "another key": "A string",
} "yet another key": "Another string"
messageBodyJsonStr = json.dumps(messageBodyJson) }
messageBodyJsonStr = json.dumps(messageBodyJson)
else:
messageBodyJsonStr = ''
headersDomain = getFullDomain(domain, port) headersDomain = getFullDomain(hostDomain, port)
dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
boxpath = '/inbox' boxpath = '/inbox'
@ -639,13 +590,13 @@ def _testHttpsigBase(withDigest):
headers = { headers = {
'host': headersDomain, 'host': headersDomain,
'date': dateStr, 'date': dateStr,
'accept': 'application/json' 'accept': contentType
} }
signatureHeader = \ signatureHeader = \
signPostHeaders(dateStr, privateKeyPem, nickname, signPostHeaders(dateStr, privateKeyPem, nickname,
domain, port, domain, port,
domain, port, hostDomain, port,
boxpath, httpPrefix, None) boxpath, httpPrefix, None, contentType)
else: else:
bodyDigest = messageContentDigest(messageBodyJsonStr) bodyDigest = messageContentDigest(messageBodyJsonStr)
contentLength = len(messageBodyJsonStr) contentLength = len(messageBodyJsonStr)
@ -659,8 +610,9 @@ def _testHttpsigBase(withDigest):
signatureHeader = \ signatureHeader = \
signPostHeaders(dateStr, privateKeyPem, nickname, signPostHeaders(dateStr, privateKeyPem, nickname,
domain, port, domain, port,
domain, port, hostDomain, port,
boxpath, httpPrefix, messageBodyJsonStr) boxpath, httpPrefix, messageBodyJsonStr,
contentType)
headers['signature'] = signatureHeader headers['signature'] = signatureHeader
GETmethod = not withDigest GETmethod = not withDigest
@ -684,7 +636,7 @@ def _testHttpsigBase(withDigest):
headers = { headers = {
'host': 'bogon.domain', 'host': 'bogon.domain',
'date': dateStr, 'date': dateStr,
'content-type': 'application/json' 'content-type': contentType
} }
else: else:
# correct domain but fake message # correct domain but fake message
@ -709,9 +661,9 @@ def _testHttpsigBase(withDigest):
shutil.rmtree(path) shutil.rmtree(path)
def _testHttpsig(): def _testHttpsig(baseDir: str):
_testHttpsigBase(True) _testHttpsigBase(True, baseDir)
_testHttpsigBase(False) _testHttpsigBase(False, baseDir)
def _testCache(): def _testCache():
@ -1161,7 +1113,7 @@ def createServerGroup(path: str, domain: str, port: int,
False) False)
def testPostMessageBetweenServers(): def testPostMessageBetweenServers(baseDir: str) -> None:
print('Testing sending message from one server to the inbox of another') print('Testing sending message from one server to the inbox of another')
global testServerAliceRunning global testServerAliceRunning
@ -1173,7 +1125,6 @@ def testPostMessageBetweenServers():
httpPrefix = 'http' httpPrefix = 'http'
proxyType = None proxyType = None
baseDir = os.getcwd()
if os.path.isdir(baseDir + '/.tests'): if os.path.isdir(baseDir + '/.tests'):
shutil.rmtree(baseDir + '/.tests') shutil.rmtree(baseDir + '/.tests')
os.mkdir(baseDir + '/.tests') os.mkdir(baseDir + '/.tests')
@ -1460,7 +1411,7 @@ def testPostMessageBetweenServers():
shutil.rmtree(bobDir) shutil.rmtree(bobDir)
def testFollowBetweenServers(): def testFollowBetweenServers(baseDir: str) -> None:
print('Testing sending a follow request from one server to another') print('Testing sending a follow request from one server to another')
global testServerAliceRunning global testServerAliceRunning
@ -1473,7 +1424,6 @@ def testFollowBetweenServers():
proxyType = None proxyType = None
federationList = [] federationList = []
baseDir = os.getcwd()
if os.path.isdir(baseDir + '/.tests'): if os.path.isdir(baseDir + '/.tests'):
shutil.rmtree(baseDir + '/.tests') shutil.rmtree(baseDir + '/.tests')
os.mkdir(baseDir + '/.tests') os.mkdir(baseDir + '/.tests')
@ -1648,7 +1598,7 @@ def testFollowBetweenServers():
shutil.rmtree(baseDir + '/.tests') shutil.rmtree(baseDir + '/.tests')
def testSharedItemsFederation(): def testSharedItemsFederation(baseDir: str) -> None:
print('Testing federation of shared items between Alice and Bob') print('Testing federation of shared items between Alice and Bob')
global testServerAliceRunning global testServerAliceRunning
@ -1661,7 +1611,6 @@ def testSharedItemsFederation():
proxyType = None proxyType = None
federationList = [] federationList = []
baseDir = os.getcwd()
if os.path.isdir(baseDir + '/.tests'): if os.path.isdir(baseDir + '/.tests'):
shutil.rmtree(baseDir + '/.tests') shutil.rmtree(baseDir + '/.tests')
os.mkdir(baseDir + '/.tests') os.mkdir(baseDir + '/.tests')
@ -2043,7 +1992,7 @@ def testSharedItemsFederation():
'Alice and Bob is complete') 'Alice and Bob is complete')
def testGroupFollow(): def testGroupFollow(baseDir: str) -> None:
print('Testing following of a group') print('Testing following of a group')
global testServerAliceRunning global testServerAliceRunning
@ -2059,7 +2008,6 @@ def testGroupFollow():
proxyType = None proxyType = None
federationList = [] federationList = []
baseDir = os.getcwd()
if os.path.isdir(baseDir + '/.tests'): if os.path.isdir(baseDir + '/.tests'):
shutil.rmtree(baseDir + '/.tests') shutil.rmtree(baseDir + '/.tests')
os.mkdir(baseDir + '/.tests') os.mkdir(baseDir + '/.tests')
@ -2425,9 +2373,9 @@ def testGroupFollow():
print('Testing following of a group is complete') print('Testing following of a group is complete')
def _testFollowersOfPerson(): def _testFollowersOfPerson(baseDir: str) -> None:
print('testFollowersOfPerson') print('testFollowersOfPerson')
currDir = os.getcwd() currDir = baseDir
nickname = 'mxpop' nickname = 'mxpop'
domain = 'diva.domain' domain = 'diva.domain'
password = 'birb' password = 'birb'
@ -2474,9 +2422,9 @@ def _testFollowersOfPerson():
shutil.rmtree(baseDir) shutil.rmtree(baseDir)
def _testNoOfFollowersOnDomain(): def _testNoOfFollowersOnDomain(baseDir: str) -> None:
print('testNoOfFollowersOnDomain') print('testNoOfFollowersOnDomain')
currDir = os.getcwd() currDir = baseDir
nickname = 'mxpop' nickname = 'mxpop'
domain = 'diva.domain' domain = 'diva.domain'
otherdomain = 'soup.dragon' otherdomain = 'soup.dragon'
@ -2536,10 +2484,10 @@ def _testNoOfFollowersOnDomain():
shutil.rmtree(baseDir) shutil.rmtree(baseDir)
def _testGroupFollowers(): def _testGroupFollowers(baseDir: str) -> None:
print('testGroupFollowers') print('testGroupFollowers')
currDir = os.getcwd() currDir = baseDir
nickname = 'test735' nickname = 'test735'
domain = 'mydomain.com' domain = 'mydomain.com'
password = 'somepass' password = 'somepass'
@ -2581,9 +2529,9 @@ def _testGroupFollowers():
shutil.rmtree(baseDir) shutil.rmtree(baseDir)
def _testFollows(): def _testFollows(baseDir: str) -> None:
print('testFollows') print('testFollows')
currDir = os.getcwd() currDir = baseDir
nickname = 'test529' nickname = 'test529'
domain = 'testdomain.com' domain = 'testdomain.com'
password = 'mypass' password = 'mypass'
@ -2659,10 +2607,10 @@ def _testFollows():
shutil.rmtree(baseDir) shutil.rmtree(baseDir)
def _testCreatePerson(): def _testCreatePerson(baseDir: str):
print('testCreatePerson') print('testCreatePerson')
systemLanguage = 'en' systemLanguage = 'en'
currDir = os.getcwd() currDir = baseDir
nickname = 'test382' nickname = 'test382'
domain = 'badgerdomain.com' domain = 'badgerdomain.com'
password = 'mypass' password = 'mypass'
@ -2727,9 +2675,9 @@ def showTestBoxes(name: str, inboxPath: str, outboxPath: str) -> None:
str(outboxPosts) + ' outbox posts') str(outboxPosts) + ' outbox posts')
def _testAuthentication(): def _testAuthentication(baseDir: str) -> None:
print('testAuthentication') print('testAuthentication')
currDir = os.getcwd() currDir = baseDir
nickname = 'test8743' nickname = 'test8743'
password = 'SuperSecretPassword12345' password = 'SuperSecretPassword12345'
@ -2767,7 +2715,7 @@ def _testAuthentication():
shutil.rmtree(baseDir) shutil.rmtree(baseDir)
def testClientToServer(): def testClientToServer(baseDir: str):
print('EVENT: Testing sending a post via c2s') print('EVENT: Testing sending a post via c2s')
global testServerAliceRunning global testServerAliceRunning
@ -2781,7 +2729,6 @@ def testClientToServer():
federationList = [] federationList = []
lowBandwidth = False lowBandwidth = False
baseDir = os.getcwd()
if os.path.isdir(baseDir + '/.tests'): if os.path.isdir(baseDir + '/.tests'):
shutil.rmtree(baseDir + '/.tests') shutil.rmtree(baseDir + '/.tests')
os.mkdir(baseDir + '/.tests') os.mkdir(baseDir + '/.tests')
@ -3351,7 +3298,7 @@ def _testWebLinks():
assert resultText == exampleText assert resultText == exampleText
def _testAddEmoji(): def _testAddEmoji(baseDir: str):
print('testAddEmoji') print('testAddEmoji')
content = "Emoji :lemon: :strawberry: :banana:" content = "Emoji :lemon: :strawberry: :banana:"
httpPrefix = 'http' httpPrefix = 'http'
@ -3360,8 +3307,7 @@ def _testAddEmoji():
port = 3682 port = 3682
recipients = [] recipients = []
hashtags = {} hashtags = {}
baseDir = os.getcwd() baseDirOriginal = baseDir
baseDirOriginal = os.getcwd()
path = baseDir + '/.tests' path = baseDir + '/.tests'
if not os.path.isdir(path): if not os.path.isdir(path):
os.mkdir(path) os.mkdir(path)
@ -3611,9 +3557,8 @@ def _testRemoveHtml():
'This string contains a url http://somesite.or.other') 'This string contains a url http://somesite.or.other')
def _testDangerousCSS(): def _testDangerousCSS(baseDir: str) -> None:
print('testDangerousCSS') print('testDangerousCSS')
baseDir = os.getcwd()
for subdir, dirs, files in os.walk(baseDir): for subdir, dirs, files in os.walk(baseDir):
for f in files: for f in files:
if not f.endswith('.css'): if not f.endswith('.css'):
@ -3622,7 +3567,7 @@ def _testDangerousCSS():
break break
def _testDangerousSVG() -> None: def _testDangerousSVG(baseDir: str) -> None:
print('testDangerousSVG') print('testDangerousSVG')
svgContent = \ svgContent = \
' <svg viewBox="0 0 10 10" xmlns="http://www.w3.org/2000/svg">' + \ ' <svg viewBox="0 0 10 10" xmlns="http://www.w3.org/2000/svg">' + \
@ -3650,7 +3595,6 @@ def _testDangerousSVG() -> None:
'</svg>' '</svg>'
assert dangerousSVG(svgContent, False) assert dangerousSVG(svgContent, False)
baseDir = os.getcwd()
assert not scanThemesForScripts(baseDir) assert not scanThemesForScripts(baseDir)
@ -3816,9 +3760,8 @@ def _testValidContentWarning():
assert resultStr == 'Invalid content warning' assert resultStr == 'Invalid content warning'
def _testTranslations(): def _testTranslations(baseDir: str) -> None:
print('testTranslations') print('testTranslations')
baseDir = os.getcwd()
languagesStr = getSupportedLanguages(baseDir) languagesStr = getSupportedLanguages(baseDir)
assert languagesStr assert languagesStr
@ -4159,10 +4102,8 @@ def _testGuessHashtagCategory() -> None:
assert guess == "bar" assert guess == "bar"
def _testGetMentionedPeople() -> None: def _testGetMentionedPeople(baseDir: str) -> None:
print('testGetMentionedPeople') print('testGetMentionedPeople')
baseDir = os.getcwd()
content = "@dragon@cave.site @bat@cave.site This is a test." content = "@dragon@cave.site @bat@cave.site This is a test."
actors = getMentionedPeople(baseDir, 'https', actors = getMentionedPeople(baseDir, 'https',
content, content,
@ -4173,8 +4114,7 @@ def _testGetMentionedPeople() -> None:
assert actors[1] == "https://cave.site/users/bat" assert actors[1] == "https://cave.site/users/bat"
def _testReplyToPublicPost() -> None: def _testReplyToPublicPost(baseDir: str) -> None:
baseDir = os.getcwd()
systemLanguage = 'en' systemLanguage = 'en'
nickname = 'test7492362' nickname = 'test7492362'
domain = 'other.site' domain = 'other.site'
@ -4703,8 +4643,7 @@ def _testFunctions():
modules, modGroups, maxModuleCalls) modules, modGroups, maxModuleCalls)
def _testLinksWithinPost() -> None: def _testLinksWithinPost(baseDir: str) -> None:
baseDir = os.getcwd()
systemLanguage = 'en' systemLanguage = 'en'
nickname = 'test27636' nickname = 'test27636'
domain = 'rando.site' domain = 'rando.site'
@ -4992,7 +4931,7 @@ def _testExtractPGPPublicKey():
assert result == pubKey assert result == pubKey
def testUpdateActor(): def testUpdateActor(baseDir: str):
print('Testing update of actor properties') print('Testing update of actor properties')
global testServerAliceRunning global testServerAliceRunning
@ -5002,7 +4941,6 @@ def testUpdateActor():
proxyType = None proxyType = None
federationList = [] federationList = []
baseDir = os.getcwd()
if os.path.isdir(baseDir + '/.tests'): if os.path.isdir(baseDir + '/.tests'):
shutil.rmtree(baseDir + '/.tests') shutil.rmtree(baseDir + '/.tests')
os.mkdir(baseDir + '/.tests') os.mkdir(baseDir + '/.tests')
@ -5445,13 +5383,12 @@ def _testUserAgentDomain() -> None:
assert userAgentDomain(userAgent, False) is None assert userAgentDomain(userAgent, False) is None
def _testSwitchWords() -> None: def _testSwitchWords(baseDir: str) -> None:
print('testSwitchWords') print('testSwitchWords')
rules = [ rules = [
"rock -> hamster", "rock -> hamster",
"orange -> lemon" "orange -> lemon"
] ]
baseDir = os.getcwd()
nickname = 'testuser' nickname = 'testuser'
domain = 'testdomain.com' domain = 'testdomain.com'
@ -5634,13 +5571,12 @@ def _testGetPriceFromString() -> None:
assert curr == "USD" assert curr == "USD"
def _translateOntology() -> None: def _translateOntology(baseDir: str) -> None:
baseDir = os.getcwd()
ontologyTypes = getCategoryTypes(baseDir) ontologyTypes = getCategoryTypes(baseDir)
url = 'https://translate.astian.org' url = 'https://translate.astian.org'
apiKey = None apiKey = None
ltLangList = libretranslateLanguages(url, apiKey) ltLangList = libretranslateLanguages(url, apiKey)
baseDir = os.getcwd()
languagesStr = getSupportedLanguages(baseDir) languagesStr = getSupportedLanguages(baseDir)
assert languagesStr assert languagesStr
@ -5690,9 +5626,8 @@ def _translateOntology() -> None:
saveJson(ontologyJson, filename + '.new') saveJson(ontologyJson, filename + '.new')
def _testCanReplyTo() -> None: def _testCanReplyTo(baseDir: str) -> None:
print('testCanReplyTo') print('testCanReplyTo')
baseDir = os.getcwd()
systemLanguage = 'en' systemLanguage = 'en'
nickname = 'test27637' nickname = 'test27637'
domain = 'rando.site' domain = 'rando.site'
@ -5753,16 +5688,15 @@ def _testCanReplyTo() -> None:
def runAllTests(): def runAllTests():
baseDir = os.getcwd()
print('Running tests...') print('Running tests...')
_testHttpSignedGET()
return
updateDefaultThemesList(os.getcwd()) updateDefaultThemesList(os.getcwd())
_translateOntology() _translateOntology(baseDir)
_testGetPriceFromString() _testGetPriceFromString()
_testFunctions() _testFunctions()
_testSignAndVerify() _testSignAndVerify()
_testDangerousSVG() _testDangerousSVG(baseDir)
_testCanReplyTo() _testCanReplyTo(baseDir)
_testDateConversions() _testDateConversions()
_testAuthorizeSharedItems() _testAuthorizeSharedItems()
_testValidPassword() _testValidPassword()
@ -5770,7 +5704,7 @@ def runAllTests():
_testSetActorLanguages() _testSetActorLanguages()
_testLimitRepetedWords() _testLimitRepetedWords()
_testLimitWordLengths() _testLimitWordLengths()
_testSwitchWords() _testSwitchWords(baseDir)
_testUserAgentDomain() _testUserAgentDomain()
_testRoles() _testRoles()
_testSkills() _testSkills()
@ -5786,9 +5720,9 @@ def runAllTests():
_testPrepareHtmlPostNickname() _testPrepareHtmlPostNickname()
_testDomainHandling() _testDomainHandling()
_testMastoApi() _testMastoApi()
_testLinksWithinPost() _testLinksWithinPost(baseDir)
_testReplyToPublicPost() _testReplyToPublicPost(baseDir)
_testGetMentionedPeople() _testGetMentionedPeople(baseDir)
_testGuessHashtagCategory() _testGuessHashtagCategory()
_testValidNickname() _testValidNickname()
_testParseFeedDate() _testParseFeedDate()
@ -5798,12 +5732,12 @@ def runAllTests():
_testRemoveHtmlTag() _testRemoveHtmlTag()
_testReplaceEmailQuote() _testReplaceEmailQuote()
_testConstantTimeStringCheck() _testConstantTimeStringCheck()
_testTranslations() _testTranslations(baseDir)
_testValidContentWarning() _testValidContentWarning()
_testRemoveIdEnding() _testRemoveIdEnding()
_testJsonPostAllowsComments() _testJsonPostAllowsComments()
_runHtmlReplaceQuoteMarks() _runHtmlReplaceQuoteMarks()
_testDangerousCSS() _testDangerousCSS(baseDir)
_testDangerousMarkup() _testDangerousMarkup()
_testRemoveHtml() _testRemoveHtml()
_testSiteIsActive() _testSiteIsActive()
@ -5815,17 +5749,17 @@ def runAllTests():
_testSaveLoadJson() _testSaveLoadJson()
_testJsonString() _testJsonString()
_testGetStatusNumber() _testGetStatusNumber()
_testAddEmoji() _testAddEmoji(baseDir)
_testActorParsing() _testActorParsing()
_testHttpsig() _testHttpsig(baseDir)
_testHttpSignedGET() _testHttpSignedGET(baseDir)
_testHttpSigNew() _testHttpSigNew()
_testCache() _testCache()
_testThreads() _testThreads()
_testCreatePerson() _testCreatePerson(baseDir)
_testAuthentication() _testAuthentication(baseDir)
_testFollowersOfPerson() _testFollowersOfPerson(baseDir)
_testNoOfFollowersOnDomain() _testNoOfFollowersOnDomain(baseDir)
_testFollows() _testFollows(baseDir)
_testGroupFollowers() _testGroupFollowers(baseDir)
print('Tests succeeded\n') print('Tests succeeded\n')