diff --git a/auth.py b/auth.py new file mode 100644 index 00000000..02ca68bd --- /dev/null +++ b/auth.py @@ -0,0 +1,95 @@ +__filename__ = "auth.py" +__author__ = "Bob Mottram" +__license__ = "AGPL3+" +__version__ = "0.0.1" +__maintainer__ = "Bob Mottram" +__email__ = "bob@freedombone.net" +__status__ = "Production" + +import base64 +import hashlib +import binascii +import os +import shutil + +def hashPassword(password: str) -> str: + """Hash a password for storing + """ + salt = hashlib.sha256(os.urandom(60)).hexdigest().encode('ascii') + pwdhash = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), salt, 100000) + pwdhash = binascii.hexlify(pwdhash) + return (salt + pwdhash).decode('ascii') + +def verifyPassword(storedPassword: str,providedPassword: str) -> bool: + """Verify a stored password against one provided by user + """ + salt = storedPassword[:64] + storedPassword = storedPassword[64:] + pwdhash = hashlib.pbkdf2_hmac('sha512', \ + providedPassword.encode('utf-8'), \ + salt.encode('ascii'), \ + 100000) + pwdhash = binascii.hexlify(pwdhash).decode('ascii') + return pwdhash == storedPassword + +def createBasicAuthHeader(nickname: str,password: str) -> str: + """This is only used by tests + """ + authStr=nickname.replace('\n','')+':'+password.replace('\n','') + return 'Basic '+base64.b64encode(authStr.encode('utf8')).decode('utf8') + +def authorizeBasic(baseDir: str,authHeader: str) -> bool: + """HTTP basic auth + """ + if ' ' not in authHeader: + return False + base64Str = authHeader.split(' ')[1] + plain = base64.b64decode(base64Str).decode('utf8') + if ':' not in plain: + return False + nickname = plain.split(':')[0] + passwordFile=baseDir+'/accounts/passwords' + if not os.path.isfile(passwordFile): + return False + providedPassword = plain.split(':')[1] + passfile = open(passwordFile, "r") + for line in passfile: + if line.startswith(nickname+':'): + storedPassword=line.split(':')[1].replace('\n','') + return verifyPassword(storedPassword,providedPassword) + return False + +def storeBasicCredentials(baseDir: str,nickname: str,password: str) -> bool: + if ':' in nickname or ':' in password: + return False + nickname=nickname.replace('\n','').strip() + password=password.replace('\n','').strip() + + if not os.path.isdir(baseDir+'/accounts'): + os.mkdir(baseDir+'/accounts') + + passwordFile=baseDir+'/accounts/passwords' + storeStr=nickname+':'+hashPassword(password) + if os.path.isfile(passwordFile): + if nickname+':' in open(passwordFile).read(): + with open(passwordFile, "r") as fin: + with open(passwordFile+'.new', "w") as fout: + for line in fin: + if not line.startswith(nickname+':'): + fout.write(line) + else: + fout.write(storeStr+'\n') + os.rename(passwordFile+'.new', passwordFile) + else: + # append to password file + with open(passwordFile, "a") as passfile: + passfile.write(storeStr+'\n') + else: + with open(passwordFile, "w") as passfile: + passfile.write(storeStr+'\n') + return True + +def authorize(baseDir: str,authHeader: str) -> bool: + if authHeader.lower().startswith('basic '): + return authorizeBasic(baseDir,authHeader) + return False diff --git a/daemon.py b/daemon.py index 3571666f..e3a2eb06 100644 --- a/daemon.py +++ b/daemon.py @@ -22,6 +22,7 @@ from posts import getPersonPubKey from inbox import inboxPermittedMessage from inbox import inboxMessageHasParams from follow import getFollowingFeed +from auth import authorize import os import sys @@ -216,7 +217,13 @@ class PubServer(BaseHTTPRequestHandler): # TODO if self.path=='/outbox': - print('c2s posts not supported yet') + if self.headers.get('Authorization'): + if authorize(self.server.baseDir,self.headers['Authorization']): + print('c2s posts not supported yet') + self.send_response(400) + self.end_headers() + self.server.POSTbusy=False + return self.send_response(400) self.end_headers() self.server.POSTbusy=False diff --git a/tests.py b/tests.py index abf49e45..69787962 100644 --- a/tests.py +++ b/tests.py @@ -32,6 +32,9 @@ from follow import unfollowerOfPerson from person import createPerson from person import setPreferredNickname from person import setBio +from auth import createBasicAuthHeader +from auth import authorizeBasic +from auth import storeBasicCredentials testServerAliceRunning = False testServerBobRunning = False @@ -295,13 +298,38 @@ def testCreatePerson(): os.chdir(currDir) shutil.rmtree(baseDir) +def testAuthentication(): + print('testAuthentication') + currDir=os.getcwd() + nickname='test8743' + password='SuperSecretPassword12345' + + baseDir=currDir+'/.tests_authentication' + if os.path.isdir(baseDir): + shutil.rmtree(baseDir) + os.mkdir(baseDir) + os.chdir(baseDir) + + assert storeBasicCredentials(baseDir,'othernick','otherpass') + assert storeBasicCredentials(baseDir,'bad:nick','otherpass')==False + assert storeBasicCredentials(baseDir,'badnick','otherpa:ss')==False + assert storeBasicCredentials(baseDir,nickname,password) + + authHeader=createBasicAuthHeader(nickname,password) + assert authorizeBasic(baseDir,authHeader) + + authHeader=createBasicAuthHeader(nickname,password+'1') + assert authorizeBasic(baseDir,authHeader)==False + + os.chdir(currDir) + shutil.rmtree(baseDir) + def runAllTests(): print('Running tests...') testHttpsig() testCache() testThreads() testCreatePerson() + testAuthentication() testFollows() - print('Tests succeeded\n') - - + print('Tests succeeded\n')