From a5b97f1d92c49e429f539d2d4a6c55ddd68d5b68 Mon Sep 17 00:00:00 2001 From: Bob Mottram Date: Mon, 26 Jul 2021 18:54:13 +0100 Subject: [PATCH] Daemon for federating shared items --- daemon.py | 16 ++++++++ shares.py | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ tests.py | 2 + 3 files changed, 125 insertions(+) diff --git a/daemon.py b/daemon.py index 709471e1b..10ea43b84 100644 --- a/daemon.py +++ b/daemon.py @@ -203,6 +203,8 @@ from webapp_welcome import htmlWelcomeScreen from webapp_welcome import isWelcomeScreenComplete from webapp_welcome_profile import htmlWelcomeProfile from webapp_welcome_final import htmlWelcomeFinal +from shares import runFederatedSharesDaemon +from shares import runFederatedSharesWatchdog from shares import updateSharedItemFederationToken from shares import createSharedItemFederationToken from shares import authorizeSharedItems @@ -15285,6 +15287,13 @@ def runDaemon(sharedItemsFederatedDomains: [], httpPrefix, domain, port, httpd.translate), daemon=True) + print('Creating federated shares thread') + httpd.thrFederatedSharesDaemon = \ + threadWithTrace(target=runFederatedSharesDaemon, + args=(baseDir, httpd, + httpPrefix, domain, + proxyType, debug), daemon=True) + # flags used when restarting the inbox queue httpd.restartInboxQueueInProgress = False httpd.restartInboxQueue = False @@ -15310,9 +15319,16 @@ def runDaemon(sharedItemsFederatedDomains: [], threadWithTrace(target=runNewswireWatchdog, args=(projectVersion, httpd), daemon=True) httpd.thrNewswireWatchdog.start() + + print('Creating federated shares watchdog') + httpd.thrFederatedSharesWatchdog = \ + threadWithTrace(target=runFederatedSharesWatchdog, + args=(projectVersion, httpd), daemon=True) + httpd.thrFederatedSharesWatchdog.start() else: httpd.thrInboxQueue.start() httpd.thrPostSchedule.start() + httpd.thrFederatedSharesDaemon.start() if clientToServer: print('Running ActivityPub client on ' + diff --git a/shares.py b/shares.py index e17a0f8db..5574f5129 100644 --- a/shares.py +++ b/shares.py @@ -12,12 +12,15 @@ import re import secrets import time import datetime +from session import getJson from webfinger import webfingerHandle from auth import createBasicAuthHeader from auth import constantTimeStringCheck from posts import getPersonBox from session import postJson from session import postImage +from session import createSession +from utils import getConfigParam from utils import getFullDomain from utils import validNickname from utils import loadJson @@ -1073,3 +1076,107 @@ def authorizeSharedItems(sharedItemsFederatedDomains: [], 'mismatch for ' + callingDomain) return False return True + + +def _updateFederatedSharesCache(session, sharedItemsFederatedDomains: [], + baseDir: str, domain: str, + httpPrefix: str, + tokensJson: {}, debug: bool) -> None: + """Updates the cache of federated shares for the instance. + This enables shared items to be available even when other instances + might not be online + """ + # create directories where catalogs will be stored + cacheDir = baseDir + '/cache' + if not os.path.isdir(cacheDir): + os.mkdir(cacheDir) + catalogsDir = cacheDir + '/catalogs' + if not os.path.isdir(catalogsDir): + os.mkdir(catalogsDir) + + asHeader = { + 'Accept': 'application/ld+json' + } + for otherDomain in sharedItemsFederatedDomains: + # NOTE: otherDomain does not have a port extension, + # so may not work in some situations + if otherDomain.startswith(domain): + # only download from instances other than this one + continue + if not tokensJson.get(otherDomain): + # token has been obtained for the other domain + continue + url = httpPrefix + '://' + otherDomain + '/catalog' + asHeader['Authorization'] = tokensJson[otherDomain] + catalogJson = getJson(session, url, asHeader, None, + debug, __version__, httpPrefix, None) + if not catalogJson: + print('WARN: failed to download shared items catalog for ' + + otherDomain) + continue + catalogFilename = catalogsDir + '/' + otherDomain + '.json' + if saveJson(catalogJson, catalogFilename): + print('Downloaded shared items catalog for ' + otherDomain) + else: + time.sleep(2) + + +def runFederatedSharesWatchdog(projectVersion: str, httpd) -> None: + """This tries to keep the federated shares update thread + running even if it dies + """ + print('Starting federated shares watchdog') + federatedSharesOriginal = \ + httpd.thrPostSchedule.clone(runFederatedSharesDaemon) + httpd.thrFederatedSharesDaemon.start() + while True: + time.sleep(55) + if httpd.thrFederatedSharesDaemon.is_alive(): + continue + httpd.thrFederatedSharesDaemon.kill() + httpd.thrFederatedSharesDaemon = \ + federatedSharesOriginal.clone(runFederatedSharesDaemon) + httpd.thrFederatedSharesDaemon.start() + print('Restarting federated shares daemon...') + + +def runFederatedSharesDaemon(baseDir: str, httpd, httpPrefix: str, + domain: str, proxyType: str, debug: bool) -> None: + """Runs the daemon used to update federated shared items + """ + secondsPerHour = 60 * 60 + fileCheckIntervalSec = 120 + time.sleep(60) + while True: + sharedItemsFederatedDomainsStr = \ + getConfigParam(baseDir, 'sharedItemsFederatedDomains') + if not sharedItemsFederatedDomainsStr: + time.sleep(fileCheckIntervalSec) + continue + + # get a list of the domains within the shared items federation + sharedItemsFederatedDomains = [] + sharedItemsFederatedDomainsList = \ + sharedItemsFederatedDomainsStr.split(',') + for sharedFederatedDomain in sharedItemsFederatedDomainsList: + sharedItemsFederatedDomains.append(sharedFederatedDomain.strip()) + if not sharedItemsFederatedDomains: + time.sleep(fileCheckIntervalSec) + continue + + # load the tokens + tokensFilename = \ + baseDir + '/accounts/sharedItemsFederationTokens.json' + if not os.path.isfile(tokensFilename): + time.sleep(fileCheckIntervalSec) + continue + tokensJson = loadJson(tokensFilename, 1, 2) + if not tokensJson: + time.sleep(fileCheckIntervalSec) + continue + + session = createSession(proxyType) + _updateFederatedSharesCache(session, sharedItemsFederatedDomains, + baseDir, domain, httpPrefix, tokensJson, + debug) + time.sleep(secondsPerHour * 6) diff --git a/tests.py b/tests.py index 229be046d..c73ed639e 100644 --- a/tests.py +++ b/tests.py @@ -3265,6 +3265,8 @@ def _testFunctions(): 'str2bool', 'runNewswireDaemon', 'runNewswireWatchdog', + 'runFederatedSharesWatchdog', + 'runFederatedSharesDaemon', 'threadSendPost', 'sendToFollowers', 'expireCache',