mirror of https://gitlab.com/bashrc2/epicyon
Shared item federation tokens stored as json
parent
646254c8a9
commit
3c8e4bb5fb
54
auth.py
54
auth.py
|
@ -148,60 +148,6 @@ def authorizeBasic(baseDir: str, path: str, authHeader: str,
|
|||
return False
|
||||
|
||||
|
||||
def authorizeSharedItems(sharedItemsFederatedDomains: [],
|
||||
baseDir: str,
|
||||
callingDomain: str,
|
||||
authHeader: str,
|
||||
debug: bool) -> bool:
|
||||
"""HTTP simple token check for shared item federation
|
||||
"""
|
||||
if not sharedItemsFederatedDomains:
|
||||
# no shared item federation
|
||||
return False
|
||||
if callingDomain not in sharedItemsFederatedDomains:
|
||||
if debug:
|
||||
print(callingDomain +
|
||||
' is not in the shared items federation list')
|
||||
return False
|
||||
if 'Basic ' in authHeader:
|
||||
if debug:
|
||||
print('DEBUG: shared item federation should not use basic auth')
|
||||
return False
|
||||
providedToken = authHeader.replace('\n', '').replace('\r', '').strip()
|
||||
if not providedToken:
|
||||
if debug:
|
||||
print('DEBUG: shared item federation token is empty')
|
||||
return False
|
||||
if len(providedToken) < 60:
|
||||
if debug:
|
||||
print('DEBUG: shared item federation token is too small ' +
|
||||
providedToken)
|
||||
return False
|
||||
tokensFile = baseDir + '/accounts/sharedItemsFederationTokens'
|
||||
if not os.path.isfile(tokensFile):
|
||||
if debug:
|
||||
print('DEBUG: shared item federation tokens file missing ' +
|
||||
tokensFile)
|
||||
return False
|
||||
# check the tokens file
|
||||
with open(tokensFile, 'r') as tokfile:
|
||||
for line in tokfile:
|
||||
if not line.startswith(callingDomain + ':'):
|
||||
continue
|
||||
storedToken = \
|
||||
line.split(':')[1].replace('\n', '').replace('\r', '')
|
||||
if constantTimeStringCheck(storedToken, providedToken):
|
||||
return True
|
||||
else:
|
||||
if debug:
|
||||
print('DEBUG: shared item federation token ' +
|
||||
'check failed for ' + callingDomain)
|
||||
return False
|
||||
print('DEBUG: shared item federation token for ' + callingDomain +
|
||||
' not found in ' + tokensFile)
|
||||
return False
|
||||
|
||||
|
||||
def storeBasicCredentials(baseDir: str, nickname: str, password: str) -> bool:
|
||||
"""Stores login credentials to a file
|
||||
"""
|
||||
|
|
|
@ -105,7 +105,6 @@ from skills import actorSkillValue
|
|||
from skills import setActorSkillLevel
|
||||
from auth import recordLoginFailure
|
||||
from auth import authorize
|
||||
from auth import authorizeSharedItems
|
||||
from auth import createPassword
|
||||
from auth import createBasicAuthHeader
|
||||
from auth import authorizeBasic
|
||||
|
@ -204,6 +203,7 @@ from webapp_welcome import htmlWelcomeScreen
|
|||
from webapp_welcome import isWelcomeScreenComplete
|
||||
from webapp_welcome_profile import htmlWelcomeProfile
|
||||
from webapp_welcome_final import htmlWelcomeFinal
|
||||
from shares import authorizeSharedItems
|
||||
from shares import generateSharedItemFederationTokens
|
||||
from shares import getSharesFeedForPerson
|
||||
from shares import addShare
|
||||
|
|
83
shares.py
83
shares.py
|
@ -14,6 +14,7 @@ import time
|
|||
import datetime
|
||||
from webfinger import webfingerHandle
|
||||
from auth import createBasicAuthHeader
|
||||
from auth import constantTimeStringCheck
|
||||
from posts import getPersonBox
|
||||
from session import postJson
|
||||
from session import postImage
|
||||
|
@ -949,26 +950,72 @@ def generateSharedItemFederationTokens(sharedItemsFederatedDomains: [],
|
|||
"""
|
||||
if not sharedItemsFederatedDomains:
|
||||
return
|
||||
tokensFile = baseDir + '/accounts/sharedItemsFederationTokens'
|
||||
if not os.path.isfile(tokensFile):
|
||||
with open(tokensFile, 'w+') as fp:
|
||||
fp.write('')
|
||||
tokens = []
|
||||
with open(tokensFile, 'r') as fp:
|
||||
tokens = fp.read().split('\n')
|
||||
|
||||
tokensFilename = baseDir + '/accounts/sharedItemsFederationTokens.json'
|
||||
tokensJson = {}
|
||||
if not os.path.isfile(tokensFilename):
|
||||
tokensJson = loadJson(tokensFilename)
|
||||
|
||||
tokensAdded = False
|
||||
for domain in sharedItemsFederatedDomains:
|
||||
domainFound = False
|
||||
for line in tokens:
|
||||
if line.startswith(domain + ':'):
|
||||
domainFound = True
|
||||
break
|
||||
if not domainFound:
|
||||
newLine = domain + ':' + secrets.token_urlsafe(64)
|
||||
tokens.append(newLine)
|
||||
if not tokensJson.get(domain):
|
||||
tokensJson[domain] = secrets.token_urlsafe(64)
|
||||
tokensAdded = True
|
||||
|
||||
if not tokensAdded:
|
||||
return
|
||||
with open(tokensFile, 'w+') as fp:
|
||||
for line in tokens:
|
||||
fp.write(line + '\n')
|
||||
saveJson(tokensJson, tokensFilename)
|
||||
|
||||
|
||||
def authorizeSharedItems(sharedItemsFederatedDomains: [],
|
||||
baseDir: str,
|
||||
callingDomain: str,
|
||||
authHeader: str,
|
||||
debug: bool,
|
||||
tokensJson: {} = None) -> bool:
|
||||
"""HTTP simple token check for shared item federation
|
||||
"""
|
||||
if not sharedItemsFederatedDomains:
|
||||
# no shared item federation
|
||||
return False
|
||||
if callingDomain not in sharedItemsFederatedDomains:
|
||||
if debug:
|
||||
print(callingDomain +
|
||||
' is not in the shared items federation list')
|
||||
return False
|
||||
if 'Basic ' in authHeader:
|
||||
if debug:
|
||||
print('DEBUG: shared item federation should not use basic auth')
|
||||
return False
|
||||
providedToken = authHeader.replace('\n', '').replace('\r', '').strip()
|
||||
if not providedToken:
|
||||
if debug:
|
||||
print('DEBUG: shared item federation token is empty')
|
||||
return False
|
||||
if len(providedToken) < 60:
|
||||
if debug:
|
||||
print('DEBUG: shared item federation token is too small ' +
|
||||
providedToken)
|
||||
return False
|
||||
if not tokensJson:
|
||||
tokensFilename = \
|
||||
baseDir + '/accounts/sharedItemsFederationTokens.json'
|
||||
if not os.path.isfile(tokensFilename):
|
||||
if debug:
|
||||
print('DEBUG: shared item federation tokens file missing ' +
|
||||
tokensFilename)
|
||||
return False
|
||||
tokensJson = loadJson(tokensFilename)
|
||||
if not tokensJson:
|
||||
return False
|
||||
if not tokensJson.get(callingDomain):
|
||||
if debug:
|
||||
print('DEBUG: shared item federation token ' +
|
||||
'check failed for ' + callingDomain)
|
||||
return False
|
||||
if not constantTimeStringCheck(tokensJson[callingDomain], providedToken):
|
||||
if debug:
|
||||
print('DEBUG: shared item federation token ' +
|
||||
'mismatch for ' + callingDomain)
|
||||
return False
|
||||
return True
|
||||
|
|
Loading…
Reference in New Issue