Invert logic

merge-requests/30/head
Bob Mottram 2021-07-26 13:20:07 +01:00
parent d9e32a0719
commit ebac11f502
3 changed files with 70 additions and 15 deletions

View File

@ -203,6 +203,7 @@ from webapp_welcome import htmlWelcomeScreen
from webapp_welcome import isWelcomeScreenComplete from webapp_welcome import isWelcomeScreenComplete
from webapp_welcome_profile import htmlWelcomeProfile from webapp_welcome_profile import htmlWelcomeProfile
from webapp_welcome_final import htmlWelcomeFinal from webapp_welcome_final import htmlWelcomeFinal
from shares import createSharedItemFederationToken
from shares import authorizeSharedItems from shares import authorizeSharedItems
from shares import generateSharedItemFederationTokens from shares import generateSharedItemFederationTokens
from shares import getSharesFeedForPerson from shares import getSharesFeedForPerson
@ -15219,6 +15220,9 @@ def runDaemon(sharedItemsFederatedDomains: [],
httpd.sharedItemFederationTokens = \ httpd.sharedItemFederationTokens = \
generateSharedItemFederationTokens(httpd.sharedItemsFederatedDomains, generateSharedItemFederationTokens(httpd.sharedItemsFederatedDomains,
baseDir) baseDir)
httpd.sharedItemFederationTokens = \
createSharedItemFederationToken(baseDir, domain,
httpd.sharedItemFederationTokens)
# load peertube instances from file into a list # load peertube instances from file into a list
httpd.peertubeInstances = [] httpd.peertubeInstances = []

View File

@ -216,7 +216,7 @@ def addShare(baseDir: str,
sharesFilename = acctDir(baseDir, nickname, domain) + '/shares.json' sharesFilename = acctDir(baseDir, nickname, domain) + '/shares.json'
sharesJson = {} sharesJson = {}
if os.path.isfile(sharesFilename): if os.path.isfile(sharesFilename):
sharesJson = loadJson(sharesFilename) sharesJson = loadJson(sharesFilename, 1, 2)
duration = duration.lower() duration = duration.lower()
published = int(time.time()) published = int(time.time())
@ -304,7 +304,7 @@ def _expireSharesForAccount(baseDir: str, nickname: str, domain: str) -> None:
sharesFilename = baseDir + '/accounts/' + handle + '/shares.json' sharesFilename = baseDir + '/accounts/' + handle + '/shares.json'
if not os.path.isfile(sharesFilename): if not os.path.isfile(sharesFilename):
return return
sharesJson = loadJson(sharesFilename) sharesJson = loadJson(sharesFilename, 1, 2)
if not sharesJson: if not sharesJson:
return return
currTime = int(time.time()) currTime = int(time.time())
@ -788,7 +788,7 @@ def sharesCatalogAccountEndpoint(baseDir: str, httpPrefix: str,
sharesFilename = acctDir(baseDir, nickname, domain) + '/shares.json' sharesFilename = acctDir(baseDir, nickname, domain) + '/shares.json'
if not os.path.isfile(sharesFilename): if not os.path.isfile(sharesFilename):
return endpoint return endpoint
sharesJson = loadJson(sharesFilename) sharesJson = loadJson(sharesFilename, 1, 2)
if not sharesJson: if not sharesJson:
return endpoint return endpoint
@ -871,7 +871,7 @@ def sharesCatalogEndpoint(baseDir: str, httpPrefix: str,
acctDir(baseDir, nickname, domain) + '/shares.json' acctDir(baseDir, nickname, domain) + '/shares.json'
if not os.path.isfile(sharesFilename): if not os.path.isfile(sharesFilename):
continue continue
sharesJson = loadJson(sharesFilename) sharesJson = loadJson(sharesFilename, 1, 2)
if not sharesJson: if not sharesJson:
continue continue
@ -949,19 +949,21 @@ def generateSharedItemFederationTokens(sharedItemsFederatedDomains: [],
"""Generates tokens for shared item federated domains """Generates tokens for shared item federated domains
""" """
if not sharedItemsFederatedDomains: if not sharedItemsFederatedDomains:
return return {}
tokensJson = {} tokensJson = {}
if baseDir: if baseDir:
tokensFilename = \ tokensFilename = \
baseDir + '/accounts/sharedItemsFederationTokens.json' baseDir + '/accounts/sharedItemsFederationTokens.json'
if not os.path.isfile(tokensFilename): if os.path.isfile(tokensFilename):
tokensJson = loadJson(tokensFilename) tokensJson = loadJson(tokensFilename, 1, 2)
if tokensJson is None:
tokensJson = {}
tokensAdded = False tokensAdded = False
for domain in sharedItemsFederatedDomains: for domain in sharedItemsFederatedDomains:
if not tokensJson.get(domain): if not tokensJson.get(domain):
tokensJson[domain] = secrets.token_urlsafe(64) tokensJson[domain] = ''
tokensAdded = True tokensAdded = True
if not tokensAdded: if not tokensAdded:
@ -971,6 +973,47 @@ def generateSharedItemFederationTokens(sharedItemsFederatedDomains: [],
return tokensJson return tokensJson
def updateSharedItemFederationToken(baseDir: str,
tokenDomain: str, newToken: str,
tokensJson: {} = None) -> {}:
"""Updates a token for shared item federation
"""
if not tokensJson:
tokensJson = {}
if baseDir:
tokensFilename = \
baseDir + '/accounts/sharedItemsFederationTokens.json'
if os.path.isfile(tokensFilename):
tokensJson = loadJson(tokensFilename, 1, 2)
if tokensJson is None:
tokensJson = {}
tokensJson[tokenDomain] = newToken
if baseDir:
saveJson(tokensJson, tokensFilename)
return tokensJson
def createSharedItemFederationToken(baseDir: str,
tokenDomain: str,
tokensJson: {} = None) -> {}:
"""Updates a token for shared item federation
"""
if not tokensJson:
tokensJson = {}
if baseDir:
tokensFilename = \
baseDir + '/accounts/sharedItemsFederationTokens.json'
if os.path.isfile(tokensFilename):
tokensJson = loadJson(tokensFilename, 1, 2)
if tokensJson is None:
tokensJson = {}
if not tokensJson.get(tokenDomain):
tokensJson[tokenDomain] = secrets.token_urlsafe(64)
if baseDir:
saveJson(tokensJson, tokensFilename)
return tokensJson
def authorizeSharedItems(sharedItemsFederatedDomains: [], def authorizeSharedItems(sharedItemsFederatedDomains: [],
baseDir: str, baseDir: str,
callingDomain: str, callingDomain: str,
@ -1009,7 +1052,7 @@ def authorizeSharedItems(sharedItemsFederatedDomains: [],
print('DEBUG: shared item federation tokens file missing ' + print('DEBUG: shared item federation tokens file missing ' +
tokensFilename) tokensFilename)
return False return False
tokensJson = loadJson(tokensFilename) tokensJson = loadJson(tokensFilename, 1, 2)
if not tokensJson: if not tokensJson:
return False return False
if not tokensJson.get(callingDomain): if not tokensJson.get(callingDomain):

View File

@ -130,6 +130,8 @@ from languages import getLinksFromContent
from languages import addLinksToContent from languages import addLinksToContent
from shares import authorizeSharedItems from shares import authorizeSharedItems
from shares import generateSharedItemFederationTokens from shares import generateSharedItemFederationTokens
from shares import createSharedItemFederationToken
from shares import updateSharedItemFederationToken
testServerAliceRunning = False testServerAliceRunning = False
testServerBobRunning = False testServerBobRunning = False
@ -4276,19 +4278,25 @@ def _testAuthorizeSharedItems():
['dog.domain', 'cat.domain', 'birb.domain'] ['dog.domain', 'cat.domain', 'birb.domain']
tokensJson = \ tokensJson = \
generateSharedItemFederationTokens(sharedItemsFederatedDomains, None) generateSharedItemFederationTokens(sharedItemsFederatedDomains, None)
tokensJson = \
createSharedItemFederationToken(None, 'cat.domain', tokensJson)
assert tokensJson assert tokensJson
assert tokensJson.get('dog.domain') assert not tokensJson.get('dog.domain')
assert tokensJson.get('cat.domain') assert tokensJson.get('cat.domain')
assert tokensJson.get('birb.domain') assert not tokensJson.get('birb.domain')
assert len(tokensJson['dog.domain']) >= 64 assert len(tokensJson['dog.domain']) == 0
assert len(tokensJson['cat.domain']) >= 64 assert len(tokensJson['cat.domain']) >= 64
assert len(tokensJson['birb.domain']) >= 64 assert len(tokensJson['birb.domain']) == 0
assert not authorizeSharedItems(sharedItemsFederatedDomains, None, assert not authorizeSharedItems(sharedItemsFederatedDomains, None,
'dog.domain', 'w' * 86, 'cat.domain', 'M' * 86,
False, tokensJson) False, tokensJson)
assert authorizeSharedItems(sharedItemsFederatedDomains, None, assert authorizeSharedItems(sharedItemsFederatedDomains, None,
'dog.domain', tokensJson['dog.domain'], 'cat.domain', tokensJson['cat.domain'],
False, tokensJson) False, tokensJson)
tokensJson = \
updateSharedItemFederationToken(None,
'dog.domain', 'testToken', tokensJson)
assert tokensJson['dog.domain'] == 'testToken'
def runAllTests(): def runAllTests():