Unit test for federated shared items

main
Bob Mottram 2021-08-05 12:24:24 +01:00
parent ccc0d09ece
commit 0f71fae658
5 changed files with 209 additions and 86 deletions

View File

@ -4893,9 +4893,10 @@ class PubServer(BaseHTTPRequestHandler):
self.server.sharedItemFederationTokens
self.server.sharedItemsFederatedDomains = \
siDomains
domainFull = self.server.domainFull
self.server.sharedItemFederationTokens = \
mergeSharedItemTokens(self.server.baseDir,
self.server.domain,
domainFull,
siDomains,
siTokens)
@ -10820,13 +10821,15 @@ class PubServer(BaseHTTPRequestHandler):
print('Catalog access is not authorized. Checking' +
'Authorization header')
# basic auth access to shared items catalog
if self.headers.get('Authorization'):
if self.headers.get('Origin') and \
self.headers.get('Authorization'):
permittedDomains = \
self.server.sharedItemsFederatedDomains
sharedItemTokens = self.server.sharedItemFederationTokens
originDomain = self.headers.get('Origin')
if authorizeSharedItems(permittedDomains,
self.server.baseDir,
callingDomain,
originDomain,
self.headers['Authorization'],
self.server.debug,
sharedItemTokens):
@ -10835,8 +10838,12 @@ class PubServer(BaseHTTPRequestHandler):
print('Authorization token refused for ' +
'shared items federation')
elif self.server.debug:
print('No authorization header is available for ' +
'shared items federation')
if not self.headers.get('Origin'):
print('No Origin header is available for ' +
'shared items federation')
else:
print('No Authorization header is available for ' +
'shared items federation')
# show shared items catalog for federation
if self._hasAccept(callingDomain) and catalogAuthorized:
catalogType = 'json'
@ -14625,6 +14632,49 @@ class PubServer(BaseHTTPRequestHandler):
self.server.defaultTimeline)
return
# update the shared item federation token for the calling domain
# if it is within the permitted federation
if self.headers.get('Origin') and \
self.headers.get('SharesCatalog'):
if self.server.debug:
print('SharesCatalog header: ' + self.headers['SharesCatalog'])
if not self.server.sharedItemsFederatedDomains:
siDomainsStr = getConfigParam(self.server.baseDir,
'sharedItemsFederatedDomains')
if siDomainsStr:
if self.server.debug:
print('Loading shared items federated domains list')
siDomainsList = siDomainsStr.split(',')
domainsList = self.server.sharedItemsFederatedDomains
for siDomain in siDomainsList:
domainsList.append(siDomain.strip())
originDomain = self.headers.get('Origin')
if originDomain != self.server.domainFull and \
originDomain != self.server.onionDomain and \
originDomain != self.server.i2pDomain and \
originDomain in self.server.sharedItemsFederatedDomains:
if self.server.debug:
print('DEBUG: ' +
'POST updating shared item federation ' +
'token for ' + originDomain + ' to ' +
self.server.domainFull)
sharedItemTokens = self.server.sharedItemFederationTokens
sharesToken = self.headers['SharesCatalog']
self.server.sharedItemFederationTokens = \
updateSharedItemFederationToken(self.server.baseDir,
originDomain,
sharesToken,
self.server.debug,
sharedItemTokens)
elif self.server.debug:
if originDomain not in self.server.sharedItemsFederatedDomains:
print('originDomain is not in federated domains list ' +
originDomain)
else:
print('originDomain is not a different instance. ' +
originDomain + ' ' + self.server.domainFull + ' ' +
str(self.server.sharedItemsFederatedDomains))
self._benchmarkPOSTtimings(POSTstartTime, POSTtimings, 14)
# receive different types of post created by htmlNewPost
@ -14906,23 +14956,6 @@ class PubServer(BaseHTTPRequestHandler):
self._benchmarkPOSTtimings(POSTstartTime, POSTtimings, 23)
# update the shared item federation token for the calling domain
# if it is within the permitted federation
if self.headers.get('SharesCatalog') and \
callingDomain != self.server.domain and \
callingDomain != self.server.domainFull and \
callingDomain in self.server.sharedItemsFederatedDomains:
if self.server.debug:
print('DEBUG: ' +
'POST updating shared item federation token for ' +
callingDomain)
sharedItemTokens = self.server.sharedItemFederationTokens
self.server.sharedItemFederationTokens = \
updateSharedItemFederationToken(self.server.baseDir,
callingDomain,
self.headers['SharesCatalog'],
sharedItemTokens)
if self.server.debug:
print('DEBUG: POST saving to inbox queue')
if usersInPath:
@ -15469,7 +15502,7 @@ def runDaemon(maxLikeCount: int,
generateSharedItemFederationTokens(httpd.sharedItemsFederatedDomains,
baseDir)
httpd.sharedItemFederationTokens = \
createSharedItemFederationToken(baseDir, domain,
createSharedItemFederationToken(baseDir, httpd.domainFull,
httpd.sharedItemFederationTokens)
# load peertube instances from file into a list
@ -15518,7 +15551,7 @@ def runDaemon(maxLikeCount: int,
httpd.thrFederatedSharesDaemon = \
threadWithTrace(target=runFederatedSharesDaemon,
args=(baseDir, httpd,
httpPrefix, domain,
httpPrefix, httpd.domainFull,
proxyType, debug,
httpd.systemLanguage), daemon=True)

View File

@ -614,7 +614,6 @@ if args.tests:
if args.testsnetwork:
print('Network Tests')
testSharedItemsFederation()
sys.exit()
testGroupFollow()
testPostMessageBetweenServers()
testFollowBetweenServers()

View File

@ -2009,9 +2009,11 @@ def sendPost(projectVersion: str,
federationList: [], sendThreads: [], postLog: [],
cachedWebfingers: {}, personCache: {},
isArticle: bool, systemLanguage: str,
sharedItemsFederatedDomains: [],
sharedItemFederationTokens: {},
debug: bool = False, inReplyTo: str = None,
inReplyToAtomUri: str = None, subject: str = None) -> int:
"""Post to another inbox
"""Post to another inbox. Used by unit tests.
"""
withDigest = True
@ -2099,6 +2101,26 @@ def sendPost(projectVersion: str,
toDomain, toPort,
postPath, httpPrefix, withDigest, postJsonStr)
# if the "to" domain is within the shared items
# federation list then send the token for this domain
# so that it can request a catalog
if toDomain in sharedItemsFederatedDomains:
domainFull = getFullDomain(domain, port)
if sharedItemFederationTokens.get(domainFull):
signatureHeaderJson['Origin'] = domainFull
signatureHeaderJson['SharesCatalog'] = \
sharedItemFederationTokens[domainFull]
if debug:
print('SharesCatalog added to header')
elif debug:
print(domainFull + ' not in sharedItemFederationTokens')
elif debug:
print(toDomain + ' not in sharedItemsFederatedDomains ' +
str(sharedItemsFederatedDomains))
if debug:
print('signatureHeaderJson: ' + str(signatureHeaderJson))
# Keep the number of threads being used small
while len(sendThreads) > 1000:
print('WARN: Maximum threads reached - killing send thread')
@ -2139,14 +2161,14 @@ def sendPostViaServer(projectVersion: str,
print('WARN: No session for sendPostViaServer')
return 6
fromDomain = getFullDomain(fromDomain, fromPort)
fromDomainFull = getFullDomain(fromDomain, fromPort)
handle = httpPrefix + '://' + fromDomain + '/@' + fromNickname
handle = httpPrefix + '://' + fromDomainFull + '/@' + fromNickname
# lookup the inbox for the To handle
wfRequest = \
webfingerHandle(session, handle, httpPrefix, cachedWebfingers,
fromDomain, projectVersion, debug, False)
fromDomainFull, projectVersion, debug, False)
if not wfRequest:
if debug:
print('DEBUG: post webfinger failed for ' + handle)
@ -2167,7 +2189,7 @@ def sendPostViaServer(projectVersion: str,
personCache,
projectVersion, httpPrefix,
fromNickname,
fromDomain, postToBox,
fromDomainFull, postToBox,
82796)
if not inboxUrl:
if debug:
@ -2185,7 +2207,6 @@ def sendPostViaServer(projectVersion: str,
clientToServer = True
if toDomain.lower().endswith('public'):
toPersonId = 'https://www.w3.org/ns/activitystreams#Public'
fromDomainFull = getFullDomain(fromDomain, fromPort)
cc = httpPrefix + '://' + fromDomainFull + '/users/' + \
fromNickname + '/followers'
else:
@ -2217,7 +2238,7 @@ def sendPostViaServer(projectVersion: str,
if attachImageFilename:
headers = {
'host': fromDomain,
'host': fromDomainFull,
'Authorization': authHeader
}
postResult = \
@ -2229,7 +2250,7 @@ def sendPostViaServer(projectVersion: str,
# return 9
headers = {
'host': fromDomain,
'host': fromDomainFull,
'Content-type': 'application/json',
'Authorization': authHeader
}
@ -2434,7 +2455,10 @@ def sendSignedJson(postJsonObject: {}, session, baseDir: str,
# optionally add a token so that the receiving instance may access
# your shared items catalog
if sharedItemsToken:
signatureHeaderJson['Origin'] = getFullDomain(domain, port)
signatureHeaderJson['SharesCatalog'] = sharedItemsToken
elif debug:
print('Not sending shared items federation token')
# Keep the number of threads being used small
while len(sendThreads) > 1000:

View File

@ -1080,9 +1080,9 @@ def generateSharedItemFederationTokens(sharedItemsFederatedDomains: [],
tokensJson = {}
tokensAdded = False
for domain in sharedItemsFederatedDomains:
if not tokensJson.get(domain):
tokensJson[domain] = ''
for domainFull in sharedItemsFederatedDomains:
if not tokensJson.get(domainFull):
tokensJson[domainFull] = ''
tokensAdded = True
if not tokensAdded:
@ -1093,33 +1093,38 @@ def generateSharedItemFederationTokens(sharedItemsFederatedDomains: [],
def updateSharedItemFederationToken(baseDir: str,
tokenDomain: str, newToken: str,
tokenDomainFull: str, newToken: str,
debug: bool,
tokensJson: {} = None) -> {}:
"""Updates an individual token for shared item federation
"""
if debug:
print('Updating shared items token for ' + tokenDomainFull)
if not tokensJson:
tokensJson = {}
if baseDir:
tokensFilename = \
baseDir + '/accounts/sharedItemsFederationTokens.json'
if os.path.isfile(tokensFilename):
if debug:
print('Update loading tokens for ' + tokenDomainFull)
tokensJson = loadJson(tokensFilename, 1, 2)
if tokensJson is None:
tokensJson = {}
updateRequired = False
if tokensJson.get(tokenDomain):
if tokensJson[tokenDomain] != newToken:
if tokensJson.get(tokenDomainFull):
if tokensJson[tokenDomainFull] != newToken:
updateRequired = True
else:
updateRequired = True
if updateRequired:
tokensJson[tokenDomain] = newToken
tokensJson[tokenDomainFull] = newToken
if baseDir:
saveJson(tokensJson, tokensFilename)
return tokensJson
def mergeSharedItemTokens(baseDir: str, domain: str,
def mergeSharedItemTokens(baseDir: str, domainFull: str,
newSharedItemsFederatedDomains: [],
tokensJson: {}) -> {}:
"""When the shared item federation domains list has changed, update
@ -1127,20 +1132,20 @@ def mergeSharedItemTokens(baseDir: str, domain: str,
"""
removals = []
changed = False
for tokenDomain, tok in tokensJson.items():
if domain:
if tokenDomain.startswith(domain):
for tokenDomainFull, tok in tokensJson.items():
if domainFull:
if tokenDomainFull.startswith(domainFull):
continue
if tokenDomain not in newSharedItemsFederatedDomains:
removals.append(tokenDomain)
if tokenDomainFull not in newSharedItemsFederatedDomains:
removals.append(tokenDomainFull)
# remove domains no longer in the federation list
for tokenDomain in removals:
del tokensJson[tokenDomain]
for tokenDomainFull in removals:
del tokensJson[tokenDomainFull]
changed = True
# add new domains from the federation list
for tokenDomain in newSharedItemsFederatedDomains:
if tokenDomain not in tokensJson:
tokensJson[tokenDomain] = ''
for tokenDomainFull in newSharedItemsFederatedDomains:
if tokenDomainFull not in tokensJson:
tokensJson[tokenDomainFull] = ''
changed = True
if baseDir and changed:
tokensFilename = \
@ -1150,7 +1155,7 @@ def mergeSharedItemTokens(baseDir: str, domain: str,
def createSharedItemFederationToken(baseDir: str,
tokenDomain: str,
tokenDomainFull: str,
tokensJson: {} = None) -> {}:
"""Updates an individual token for shared item federation
"""
@ -1163,8 +1168,8 @@ def createSharedItemFederationToken(baseDir: str,
tokensJson = loadJson(tokensFilename, 1, 2)
if tokensJson is None:
tokensJson = {}
if not tokensJson.get(tokenDomain):
tokensJson[tokenDomain] = secrets.token_urlsafe(64)
if not tokensJson.get(tokenDomainFull):
tokensJson[tokenDomainFull] = secrets.token_urlsafe(64)
if baseDir:
saveJson(tokensJson, tokensFilename)
return tokensJson
@ -1172,7 +1177,7 @@ def createSharedItemFederationToken(baseDir: str,
def authorizeSharedItems(sharedItemsFederatedDomains: [],
baseDir: str,
callingDomain: str,
originDomainFull: str,
authHeader: str,
debug: bool,
tokensJson: {} = None) -> bool:
@ -1181,9 +1186,9 @@ def authorizeSharedItems(sharedItemsFederatedDomains: [],
if not sharedItemsFederatedDomains:
# no shared item federation
return False
if callingDomain not in sharedItemsFederatedDomains:
if originDomainFull not in sharedItemsFederatedDomains:
if debug:
print(callingDomain +
print(originDomainFull +
' is not in the shared items federation list')
return False
if 'Basic ' in authHeader:
@ -1211,21 +1216,22 @@ def authorizeSharedItems(sharedItemsFederatedDomains: [],
tokensJson = loadJson(tokensFilename, 1, 2)
if not tokensJson:
return False
if not tokensJson.get(callingDomain):
if not tokensJson.get(originDomainFull):
if debug:
print('DEBUG: shared item federation token ' +
'check failed for ' + callingDomain)
'check failed for ' + originDomainFull)
return False
if not constantTimeStringCheck(tokensJson[callingDomain], providedToken):
if not constantTimeStringCheck(tokensJson[originDomainFull],
providedToken):
if debug:
print('DEBUG: shared item federation token ' +
'mismatch for ' + callingDomain)
'mismatch for ' + originDomainFull)
return False
return True
def _updateFederatedSharesCache(session, sharedItemsFederatedDomains: [],
baseDir: str, domain: str,
baseDir: str, domainFull: str,
httpPrefix: str,
tokensJson: {}, debug: bool,
systemLanguage: str) -> None:
@ -1242,37 +1248,38 @@ def _updateFederatedSharesCache(session, sharedItemsFederatedDomains: [],
os.mkdir(catalogsDir)
asHeader = {
'Accept': 'application/ld+json'
"Accept": "application/ld+json",
"Origin": domainFull
}
for federatedDomain in sharedItemsFederatedDomains:
for federatedDomainFull in sharedItemsFederatedDomains:
# NOTE: federatedDomain does not have a port extension,
# so may not work in some situations
if federatedDomain.startswith(domain):
if federatedDomainFull.startswith(domainFull):
# only download from instances other than this one
continue
if not tokensJson.get(federatedDomain):
if not tokensJson.get(federatedDomainFull):
# token has been obtained for the other domain
continue
if not siteIsActive(httpPrefix + '://' + federatedDomain):
if not siteIsActive(httpPrefix + '://' + federatedDomainFull):
continue
url = httpPrefix + '://' + federatedDomain + '/catalog'
asHeader['Authorization'] = tokensJson[federatedDomain]
url = httpPrefix + '://' + federatedDomainFull + '/catalog'
asHeader['Authorization'] = tokensJson[federatedDomainFull]
catalogJson = getJson(session, url, asHeader, None,
debug, __version__, httpPrefix, None)
if not catalogJson:
print('WARN: failed to download shared items catalog for ' +
federatedDomain)
federatedDomainFull)
continue
catalogFilename = catalogsDir + '/' + federatedDomain + '.json'
catalogFilename = catalogsDir + '/' + federatedDomainFull + '.json'
if saveJson(catalogJson, catalogFilename):
print('Downloaded shared items catalog for ' + federatedDomain)
print('Downloaded shared items catalog for ' + federatedDomainFull)
sharesJson = _dfcToSharesFormat(catalogJson,
baseDir, systemLanguage)
if sharesJson:
sharesFilename = \
catalogsDir + '/' + federatedDomain + '.shares.json'
catalogsDir + '/' + federatedDomainFull + '.shares.json'
saveJson(sharesJson, sharesFilename)
print('Converted shares catalog for ' + federatedDomain)
print('Converted shares catalog for ' + federatedDomainFull)
else:
time.sleep(2)
@ -1297,7 +1304,7 @@ def runFederatedSharesWatchdog(projectVersion: str, httpd) -> None:
def runFederatedSharesDaemon(baseDir: str, httpd, httpPrefix: str,
domain: str, proxyType: str, debug: bool,
domainFull: str, proxyType: str, debug: bool,
systemLanguage: str) -> None:
"""Runs the daemon used to update federated shared items
"""
@ -1334,8 +1341,8 @@ def runFederatedSharesDaemon(baseDir: str, httpd, httpPrefix: str,
session = createSession(proxyType)
_updateFederatedSharesCache(session, sharedItemsFederatedDomains,
baseDir, domain, httpPrefix, tokensJson,
debug, systemLanguage)
baseDir, domainFull, httpPrefix,
tokensJson, debug, systemLanguage)
time.sleep(secondsPerHour * 6)

View File

@ -42,6 +42,7 @@ from follow import clearFollowers
from follow import sendFollowRequestViaServer
from follow import sendUnfollowRequestViaServer
from siteactive import siteIsActive
from utils import setConfigParam
from utils import isGroupActor
from utils import dateStringToSeconds
from utils import dateSecondsToString
@ -942,6 +943,8 @@ def testPostMessageBetweenServers():
ccUrl = None
alicePersonCache = {}
aliceCachedWebfingers = {}
aliceSharedItemsFederatedDomains = []
aliceSharedItemFederationTokens = {}
attachedImageFilename = baseDir + '/img/logo.png'
testImageWidth, testImageHeight = \
getImageDimensions(attachedImageFilename)
@ -967,8 +970,10 @@ def testPostMessageBetweenServers():
attachedImageFilename, mediaType,
attachedImageDescription, city, federationList,
aliceSendThreads, alicePostLog, aliceCachedWebfingers,
alicePersonCache, isArticle, systemLanguage, inReplyTo,
inReplyToAtomUri, subject)
alicePersonCache, isArticle, systemLanguage,
aliceSharedItemsFederatedDomains,
aliceSharedItemFederationTokens,
inReplyTo, inReplyToAtomUri, subject)
print('sendResult: ' + str(sendResult))
queuePath = bobDir + '/accounts/bob@' + bobDomain + '/queue'
@ -1284,6 +1289,8 @@ def testFollowBetweenServers():
alicePostLog = []
alicePersonCache = {}
aliceCachedWebfingers = {}
aliceSharedItemsFederatedDomains = []
aliceSharedItemFederationTokens = {}
alicePostLog = []
isArticle = False
city = 'London, England'
@ -1295,8 +1302,10 @@ def testFollowBetweenServers():
clientToServer, True,
None, None, None, city, federationList,
aliceSendThreads, alicePostLog, aliceCachedWebfingers,
alicePersonCache, isArticle, systemLanguage, inReplyTo,
inReplyToAtomUri, subject)
alicePersonCache, isArticle, systemLanguage,
aliceSharedItemsFederatedDomains,
aliceSharedItemFederationTokens,
inReplyTo, inReplyToAtomUri, subject)
print('sendResult: ' + str(sendResult))
queuePath = bobDir + '/accounts/bob@' + bobDomain + '/queue'
@ -1413,6 +1422,13 @@ def testSharedItemsFederation():
# In the beginning all was calm and there were no follows
print('\n\n*********************************************************')
print("Alice and Bob agree to share items catalogs")
assert os.path.isdir(aliceDir)
assert os.path.isdir(bobDir)
setConfigParam(aliceDir, 'sharedItemsFederatedDomains', bobAddress)
setConfigParam(bobDir, 'sharedItemsFederatedDomains', aliceAddress)
print('*********************************************************')
print('Alice sends a follow request to Bob')
os.chdir(aliceDir)
@ -1572,9 +1588,20 @@ def testSharedItemsFederation():
print('\n\n*********************************************************')
print('Alice sends a message to Bob')
aliceTokensFilename = \
aliceDir + '/accounts/sharedItemsFederationTokens.json'
assert os.path.isfile(aliceTokensFilename)
aliceSharedItemFederationTokens = loadJson(aliceTokensFilename)
assert aliceSharedItemFederationTokens
print('Alice shared item federation tokens:')
pprint(aliceSharedItemFederationTokens)
assert len(aliceSharedItemFederationTokens.items()) > 0
for hostStr, token in aliceSharedItemFederationTokens.items():
assert ':' in hostStr
alicePostLog = []
alicePersonCache = {}
aliceCachedWebfingers = {}
aliceSharedItemsFederatedDomains = [bobAddress]
alicePostLog = []
isArticle = False
city = 'London, England'
@ -1586,8 +1613,10 @@ def testSharedItemsFederation():
clientToServer, True,
None, None, None, city, federationList,
aliceSendThreads, alicePostLog, aliceCachedWebfingers,
alicePersonCache, isArticle, systemLanguage, inReplyTo,
inReplyToAtomUri, subject)
alicePersonCache, isArticle, systemLanguage,
aliceSharedItemsFederatedDomains,
aliceSharedItemFederationTokens, True,
inReplyTo, inReplyToAtomUri, subject)
print('sendResult: ' + str(sendResult))
queuePath = bobDir + '/accounts/bob@' + bobDomain + '/queue'
@ -1605,6 +1634,32 @@ def testSharedItemsFederation():
assert aliceMessageArrived is True
print('Message from Alice to Bob succeeded')
print('\n\n*********************************************************')
print('Check that Alice received the shared items authorization')
print('token from Bob')
aliceTokensFilename = \
aliceDir + '/accounts/sharedItemsFederationTokens.json'
bobTokensFilename = \
bobDir + '/accounts/sharedItemsFederationTokens.json'
assert os.path.isfile(aliceTokensFilename)
assert os.path.isfile(bobTokensFilename)
aliceTokens = loadJson(aliceTokensFilename)
assert aliceTokens
for hostStr, token in aliceTokens.items():
assert ':' in hostStr
assert aliceTokens.get(aliceAddress)
print('Alice tokens')
pprint(aliceTokens)
bobTokens = loadJson(bobTokensFilename)
assert bobTokens
for hostStr, token in bobTokens.items():
assert ':' in hostStr
assert bobTokens.get(bobAddress)
print("Check that Bob now has Alice's token")
assert bobTokens.get(aliceAddress)
print('Bob tokens')
pprint(bobTokens)
# stop the servers
thrAlice.kill()
thrAlice.join()
@ -1919,6 +1974,8 @@ def testGroupFollow():
alicePostLog = []
alicePersonCache = {}
aliceCachedWebfingers = {}
aliceSharedItemsFederatedDomains = []
aliceSharedItemFederationTokens = {}
alicePostLog = []
isArticle = False
city = 'London, England'
@ -1930,8 +1987,10 @@ def testGroupFollow():
saveToFile, clientToServer, True,
None, None, None, city, federationList,
aliceSendThreads, alicePostLog, aliceCachedWebfingers,
alicePersonCache, isArticle, systemLanguage, inReplyTo,
inReplyToAtomUri, subject)
alicePersonCache, isArticle, systemLanguage,
aliceSharedItemsFederatedDomains,
aliceSharedItemFederationTokens,
inReplyTo, inReplyToAtomUri, subject)
print('sendResult: ' + str(sendResult))
queuePath = \
@ -5047,7 +5106,8 @@ def _testAuthorizeSharedItems():
False, tokensJson)
tokensJson = \
updateSharedItemFederationToken(None,
'dog.domain', 'testToken', tokensJson)
'dog.domain', 'testToken',
True, tokensJson)
assert tokensJson['dog.domain'] == 'testToken'
# the shared item federation list changes