forked from indymedia/epicyon
Authorization for inbox access
parent
3aaf7df0bf
commit
f2c596ee2d
11
auth.py
11
auth.py
|
@ -38,6 +38,17 @@ def createBasicAuthHeader(nickname: str,password: str) -> str:
|
||||||
authStr=nickname.replace('\n','')+':'+password.replace('\n','')
|
authStr=nickname.replace('\n','')+':'+password.replace('\n','')
|
||||||
return 'Basic '+base64.b64encode(authStr.encode('utf-8')).decode('utf-8')
|
return 'Basic '+base64.b64encode(authStr.encode('utf-8')).decode('utf-8')
|
||||||
|
|
||||||
|
def nicknameFromBasicAuth(authHeader: str) -> str:
|
||||||
|
"""Returns the nickname from basic auth header
|
||||||
|
"""
|
||||||
|
if ' ' not in authHeader:
|
||||||
|
return None
|
||||||
|
base64Str = authHeader.split(' ')[1].replace('\n','')
|
||||||
|
plain = base64.b64decode(base64Str).decode('utf-8')
|
||||||
|
if ':' not in plain:
|
||||||
|
return None
|
||||||
|
return plain.split(':')[0]
|
||||||
|
|
||||||
def authorizeBasic(baseDir: str,authHeader: str) -> bool:
|
def authorizeBasic(baseDir: str,authHeader: str) -> bool:
|
||||||
"""HTTP basic auth
|
"""HTTP basic auth
|
||||||
"""
|
"""
|
||||||
|
|
15
daemon.py
15
daemon.py
|
@ -23,6 +23,7 @@ from inbox import inboxPermittedMessage
|
||||||
from inbox import inboxMessageHasParams
|
from inbox import inboxMessageHasParams
|
||||||
from follow import getFollowingFeed
|
from follow import getFollowingFeed
|
||||||
from auth import authorize
|
from auth import authorize
|
||||||
|
from auth import nicknameFromBasicAuth
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
@ -122,6 +123,20 @@ class PubServer(BaseHTTPRequestHandler):
|
||||||
if self._webfinger():
|
if self._webfinger():
|
||||||
self.server.GETbusy=False
|
self.server.GETbusy=False
|
||||||
return
|
return
|
||||||
|
# get the inbox for a given person
|
||||||
|
if self.path.endswith('/inbox'):
|
||||||
|
if '/users/' in self.path:
|
||||||
|
if self.headers.get('Authorization'):
|
||||||
|
nickname=self.path.split('/users/')[1].replace('/inbox','')
|
||||||
|
if nickname==nicknameFromBasicAuth(self.headers['Authorization']):
|
||||||
|
if authorize(self.server.baseDir,self.headers['Authorization']):
|
||||||
|
# TODO
|
||||||
|
print('inbox access not supported yet')
|
||||||
|
self.send_response(401)
|
||||||
|
self.end_headers()
|
||||||
|
self.server.POSTbusy=False
|
||||||
|
return
|
||||||
|
|
||||||
# get outbox feed for a person
|
# get outbox feed for a person
|
||||||
outboxFeed=personOutboxJson(self.server.baseDir,self.server.domain, \
|
outboxFeed=personOutboxJson(self.server.baseDir,self.server.domain, \
|
||||||
self.server.port,self.path, \
|
self.server.port,self.path, \
|
||||||
|
|
2
tests.py
2
tests.py
|
@ -35,6 +35,7 @@ from person import setBio
|
||||||
from auth import createBasicAuthHeader
|
from auth import createBasicAuthHeader
|
||||||
from auth import authorizeBasic
|
from auth import authorizeBasic
|
||||||
from auth import storeBasicCredentials
|
from auth import storeBasicCredentials
|
||||||
|
from auth import nicknameFromBasicAuth
|
||||||
|
|
||||||
testServerAliceRunning = False
|
testServerAliceRunning = False
|
||||||
testServerBobRunning = False
|
testServerBobRunning = False
|
||||||
|
@ -316,6 +317,7 @@ def testAuthentication():
|
||||||
assert storeBasicCredentials(baseDir,nickname,password)
|
assert storeBasicCredentials(baseDir,nickname,password)
|
||||||
|
|
||||||
authHeader=createBasicAuthHeader(nickname,password)
|
authHeader=createBasicAuthHeader(nickname,password)
|
||||||
|
assert nickname==nicknameFromBasicAuth(authHeader)
|
||||||
assert authorizeBasic(baseDir,authHeader)
|
assert authorizeBasic(baseDir,authHeader)
|
||||||
|
|
||||||
authHeader=createBasicAuthHeader(nickname,password+'1')
|
authHeader=createBasicAuthHeader(nickname,password+'1')
|
||||||
|
|
Loading…
Reference in New Issue