Shared item federation uses a simple token

merge-requests/20/merge
Bob Mottram 2021-07-26 10:08:36 +01:00
parent cf9bffbed7
commit 4c8b96efc1
2 changed files with 38 additions and 40 deletions

56
auth.py
View File

@ -179,37 +179,34 @@ def generateSharedItemFederationTokens(sharedItemsFederatedDomains: [],
fp.write(line + '\n')
def authorizeDFC(sharedItemsFederatedDomains: [],
baseDir: str,
callingDomain: str,
authHeader: str,
debug: bool) -> bool:
"""HTTP basic auth for shared item federation
def authorizeSharedItems(sharedItemsFederatedDomains: [],
baseDir: str,
callingDomain: str,
authHeader: str,
debug: bool) -> bool:
"""HTTP simple token check for shared item federation
"""
if not sharedItemsFederatedDomains:
# no shared item federation
return False
if callingDomain not in sharedItemsFederatedDomains:
if debug:
print(callingDomain +
' is not in the shared items federation list')
return False
if 'Basic ' not in authHeader:
if 'Basic ' in authHeader:
if debug:
print('DEBUG: DFC basic auth - Authorisation header does not ' +
'contain a space character')
print('DEBUG: shared item federation should not use basic auth')
return False
base64Str = \
authHeader.split(' ')[1].replace('\n', '').replace('\r', '')
plain = base64.b64decode(base64Str).decode('utf-8')
if ':' not in plain:
providedToken = authHeader.replace('\n', '').replace('\r', '').strip()
if not providedToken:
if debug:
print('DEBUG: DFC basic auth header does not contain a ":" ' +
'separator for username:password')
print('DEBUG: shared item federation token is empty')
return False
basicAuthDomain = plain.split(':')[0]
if basicAuthDomain != callingDomain:
if len(providedToken) < 60:
if debug:
print('DEBUG: DFC calling domain does not match ' +
'the one in the Authorization header (' +
basicAuthDomain + ')')
print('DEBUG: shared item federation token is too small ' +
providedToken)
return False
tokensFile = baseDir + '/accounts/sharedItemsFederationTokens'
if not os.path.isfile(tokensFile):
@ -217,21 +214,22 @@ def authorizeDFC(sharedItemsFederatedDomains: [],
print('DEBUG: shared item federation tokens file missing ' +
tokensFile)
return False
providedToken = plain.split(':')[1]
# check the tokens file
with open(tokensFile, 'r') as tokfile:
for line in tokfile:
if not line.startswith(basicAuthDomain + ':'):
if not line.startswith(callingDomain + ':'):
continue
storedToken = \
line.split(':')[1].replace('\n', '').replace('\r', '')
success = _verifyPassword(storedToken, providedToken)
if not success:
if constantTimeStringCheck(storedToken, providedToken):
return True
else:
if debug:
print('DEBUG: DFC token check failed for ' +
basicAuthDomain)
return success
print('DEBUG: DFC did not find token for ' + basicAuthDomain +
' in ' + tokensFile)
print('DEBUG: shared item federation token ' +
'check failed for ' + callingDomain)
return False
print('DEBUG: shared item federation token for ' + callingDomain +
' not found in ' + tokensFile)
return False

View File

@ -106,7 +106,7 @@ from skills import setActorSkillLevel
from auth import generateSharedItemFederationTokens
from auth import recordLoginFailure
from auth import authorize
from auth import authorizeDFC
from auth import authorizeSharedItems
from auth import createPassword
from auth import createBasicAuthHeader
from auth import authorizeBasic
@ -10669,19 +10669,19 @@ class PubServer(BaseHTTPRequestHandler):
'show logout', 'isAuthorized')
if self.path.startswith('/dfc-catalog'):
catalogAuthorized = False
if authorized:
catalogAuthorized = True
else:
catalogAuthorized = authorized
if not catalogAuthorized:
# basic auth access to shared items catalog
if self.headers.get('Authorization'):
if authorizeDFC(self.server.sharedItemsFederatedDomains,
self.server.baseDir,
callingDomain,
self.headers['Authorization'],
self.server.debug):
permittedDomains = \
self.server.sharedItemsFederatedDomains
if authorizeSharedItems(permittedDomains,
self.server.baseDir,
callingDomain,
self.headers['Authorization'],
self.server.debug):
catalogAuthorized = True
# show shared items DFC catalog
# show shared items catalog for federation
if self._hasAccept(callingDomain) and catalogAuthorized:
catalogType = 'html'
if self.path.endswith('.csv') or self._requestCSV():