Daemon for federating shared items

main
Bob Mottram 2021-07-26 18:54:13 +01:00
parent c194c3996b
commit a5b97f1d92
3 changed files with 125 additions and 0 deletions

View File

@ -203,6 +203,8 @@ from webapp_welcome import htmlWelcomeScreen
from webapp_welcome import isWelcomeScreenComplete from webapp_welcome import isWelcomeScreenComplete
from webapp_welcome_profile import htmlWelcomeProfile from webapp_welcome_profile import htmlWelcomeProfile
from webapp_welcome_final import htmlWelcomeFinal from webapp_welcome_final import htmlWelcomeFinal
from shares import runFederatedSharesDaemon
from shares import runFederatedSharesWatchdog
from shares import updateSharedItemFederationToken from shares import updateSharedItemFederationToken
from shares import createSharedItemFederationToken from shares import createSharedItemFederationToken
from shares import authorizeSharedItems from shares import authorizeSharedItems
@ -15285,6 +15287,13 @@ def runDaemon(sharedItemsFederatedDomains: [],
httpPrefix, domain, port, httpPrefix, domain, port,
httpd.translate), daemon=True) httpd.translate), daemon=True)
print('Creating federated shares thread')
httpd.thrFederatedSharesDaemon = \
threadWithTrace(target=runFederatedSharesDaemon,
args=(baseDir, httpd,
httpPrefix, domain,
proxyType, debug), daemon=True)
# flags used when restarting the inbox queue # flags used when restarting the inbox queue
httpd.restartInboxQueueInProgress = False httpd.restartInboxQueueInProgress = False
httpd.restartInboxQueue = False httpd.restartInboxQueue = False
@ -15310,9 +15319,16 @@ def runDaemon(sharedItemsFederatedDomains: [],
threadWithTrace(target=runNewswireWatchdog, threadWithTrace(target=runNewswireWatchdog,
args=(projectVersion, httpd), daemon=True) args=(projectVersion, httpd), daemon=True)
httpd.thrNewswireWatchdog.start() httpd.thrNewswireWatchdog.start()
print('Creating federated shares watchdog')
httpd.thrFederatedSharesWatchdog = \
threadWithTrace(target=runFederatedSharesWatchdog,
args=(projectVersion, httpd), daemon=True)
httpd.thrFederatedSharesWatchdog.start()
else: else:
httpd.thrInboxQueue.start() httpd.thrInboxQueue.start()
httpd.thrPostSchedule.start() httpd.thrPostSchedule.start()
httpd.thrFederatedSharesDaemon.start()
if clientToServer: if clientToServer:
print('Running ActivityPub client on ' + print('Running ActivityPub client on ' +

107
shares.py
View File

@ -12,12 +12,15 @@ import re
import secrets import secrets
import time import time
import datetime import datetime
from session import getJson
from webfinger import webfingerHandle from webfinger import webfingerHandle
from auth import createBasicAuthHeader from auth import createBasicAuthHeader
from auth import constantTimeStringCheck from auth import constantTimeStringCheck
from posts import getPersonBox from posts import getPersonBox
from session import postJson from session import postJson
from session import postImage from session import postImage
from session import createSession
from utils import getConfigParam
from utils import getFullDomain from utils import getFullDomain
from utils import validNickname from utils import validNickname
from utils import loadJson from utils import loadJson
@ -1073,3 +1076,107 @@ def authorizeSharedItems(sharedItemsFederatedDomains: [],
'mismatch for ' + callingDomain) 'mismatch for ' + callingDomain)
return False return False
return True return True
def _updateFederatedSharesCache(session, sharedItemsFederatedDomains: [],
baseDir: str, domain: str,
httpPrefix: str,
tokensJson: {}, debug: bool) -> None:
"""Updates the cache of federated shares for the instance.
This enables shared items to be available even when other instances
might not be online
"""
# create directories where catalogs will be stored
cacheDir = baseDir + '/cache'
if not os.path.isdir(cacheDir):
os.mkdir(cacheDir)
catalogsDir = cacheDir + '/catalogs'
if not os.path.isdir(catalogsDir):
os.mkdir(catalogsDir)
asHeader = {
'Accept': 'application/ld+json'
}
for otherDomain in sharedItemsFederatedDomains:
# NOTE: otherDomain does not have a port extension,
# so may not work in some situations
if otherDomain.startswith(domain):
# only download from instances other than this one
continue
if not tokensJson.get(otherDomain):
# token has been obtained for the other domain
continue
url = httpPrefix + '://' + otherDomain + '/catalog'
asHeader['Authorization'] = tokensJson[otherDomain]
catalogJson = getJson(session, url, asHeader, None,
debug, __version__, httpPrefix, None)
if not catalogJson:
print('WARN: failed to download shared items catalog for ' +
otherDomain)
continue
catalogFilename = catalogsDir + '/' + otherDomain + '.json'
if saveJson(catalogJson, catalogFilename):
print('Downloaded shared items catalog for ' + otherDomain)
else:
time.sleep(2)
def runFederatedSharesWatchdog(projectVersion: str, httpd) -> None:
"""This tries to keep the federated shares update thread
running even if it dies
"""
print('Starting federated shares watchdog')
federatedSharesOriginal = \
httpd.thrPostSchedule.clone(runFederatedSharesDaemon)
httpd.thrFederatedSharesDaemon.start()
while True:
time.sleep(55)
if httpd.thrFederatedSharesDaemon.is_alive():
continue
httpd.thrFederatedSharesDaemon.kill()
httpd.thrFederatedSharesDaemon = \
federatedSharesOriginal.clone(runFederatedSharesDaemon)
httpd.thrFederatedSharesDaemon.start()
print('Restarting federated shares daemon...')
def runFederatedSharesDaemon(baseDir: str, httpd, httpPrefix: str,
domain: str, proxyType: str, debug: bool) -> None:
"""Runs the daemon used to update federated shared items
"""
secondsPerHour = 60 * 60
fileCheckIntervalSec = 120
time.sleep(60)
while True:
sharedItemsFederatedDomainsStr = \
getConfigParam(baseDir, 'sharedItemsFederatedDomains')
if not sharedItemsFederatedDomainsStr:
time.sleep(fileCheckIntervalSec)
continue
# get a list of the domains within the shared items federation
sharedItemsFederatedDomains = []
sharedItemsFederatedDomainsList = \
sharedItemsFederatedDomainsStr.split(',')
for sharedFederatedDomain in sharedItemsFederatedDomainsList:
sharedItemsFederatedDomains.append(sharedFederatedDomain.strip())
if not sharedItemsFederatedDomains:
time.sleep(fileCheckIntervalSec)
continue
# load the tokens
tokensFilename = \
baseDir + '/accounts/sharedItemsFederationTokens.json'
if not os.path.isfile(tokensFilename):
time.sleep(fileCheckIntervalSec)
continue
tokensJson = loadJson(tokensFilename, 1, 2)
if not tokensJson:
time.sleep(fileCheckIntervalSec)
continue
session = createSession(proxyType)
_updateFederatedSharesCache(session, sharedItemsFederatedDomains,
baseDir, domain, httpPrefix, tokensJson,
debug)
time.sleep(secondsPerHour * 6)

View File

@ -3265,6 +3265,8 @@ def _testFunctions():
'str2bool', 'str2bool',
'runNewswireDaemon', 'runNewswireDaemon',
'runNewswireWatchdog', 'runNewswireWatchdog',
'runFederatedSharesWatchdog',
'runFederatedSharesDaemon',
'threadSendPost', 'threadSendPost',
'sendToFollowers', 'sendToFollowers',
'expireCache', 'expireCache',