Snake case

main
Bob Mottram 2021-12-29 23:10:55 +00:00
parent a38af5e492
commit ed0ec2b675
1 changed files with 90 additions and 89 deletions

179
auth.py
View File

@ -28,11 +28,11 @@ def _hash_password(password: str) -> str:
return (salt + pwdhash).decode('ascii') return (salt + pwdhash).decode('ascii')
def _get_password_hash(salt: str, providedPassword: str) -> str: def _get_password_hash(salt: str, provided_password: str) -> str:
"""Returns the hash of a password """Returns the hash of a password
""" """
pwdhash = hashlib.pbkdf2_hmac('sha512', pwdhash = hashlib.pbkdf2_hmac('sha512',
providedPassword.encode('utf-8'), provided_password.encode('utf-8'),
salt.encode('ascii'), salt.encode('ascii'),
100000) 100000)
return binascii.hexlify(pwdhash).decode('ascii') return binascii.hexlify(pwdhash).decode('ascii')
@ -49,8 +49,8 @@ def constant_time_string_check(string1: str, string2: str) -> bool:
return False return False
ctr = 0 ctr = 0
matched = True matched = True
for ch in string1: for char in string1:
if ch != string2[ctr]: if char != string2[ctr]:
matched = False matched = False
else: else:
# this is to make the timing more even # this is to make the timing more even
@ -60,34 +60,35 @@ def constant_time_string_check(string1: str, string2: str) -> bool:
return matched return matched
def _verify_password(storedPassword: str, providedPassword: str) -> bool: def _verify_password(stored_password: str, provided_password: str) -> bool:
"""Verify a stored password against one provided by user """Verify a stored password against one provided by user
""" """
if not storedPassword: if not stored_password:
return False return False
if not providedPassword: if not provided_password:
return False return False
salt = storedPassword[:64] salt = stored_password[:64]
storedPassword = storedPassword[64:] stored_password = stored_password[64:]
pwHash = _get_password_hash(salt, providedPassword) pw_hash = _get_password_hash(salt, provided_password)
return constant_time_string_check(pwHash, storedPassword) return constant_time_string_check(pw_hash, stored_password)
def create_basic_auth_header(nickname: str, password: str) -> str: def create_basic_auth_header(nickname: str, password: str) -> str:
"""This is only used by tests """This is only used by tests
""" """
authStr = \ auth_str = \
nickname.replace('\n', '').replace('\r', '') + \ nickname.replace('\n', '').replace('\r', '') + \
':' + \ ':' + \
password.replace('\n', '').replace('\r', '') password.replace('\n', '').replace('\r', '')
return 'Basic ' + base64.b64encode(authStr.encode('utf-8')).decode('utf-8') return 'Basic ' + \
base64.b64encode(auth_str.encode('utf-8')).decode('utf-8')
def authorize_basic(base_dir: str, path: str, authHeader: str, def authorize_basic(base_dir: str, path: str, auth_header: str,
debug: bool) -> bool: debug: bool) -> bool:
"""HTTP basic auth """HTTP basic auth
""" """
if ' ' not in authHeader: if ' ' not in auth_header:
if debug: if debug:
print('DEBUG: basic auth - Authorisation header does not ' + print('DEBUG: basic auth - Authorisation header does not ' +
'contain a space character') 'contain a space character')
@ -97,19 +98,19 @@ def authorize_basic(base_dir: str, path: str, authHeader: str,
print('DEBUG: basic auth - ' + print('DEBUG: basic auth - ' +
'path for Authorization does not contain a user') 'path for Authorization does not contain a user')
return False return False
pathUsersSection = path.split('/users/')[1] path_users_section = path.split('/users/')[1]
if '/' not in pathUsersSection: if '/' not in path_users_section:
if debug: if debug:
print('DEBUG: basic auth - this is not a users endpoint') print('DEBUG: basic auth - this is not a users endpoint')
return False return False
nicknameFromPath = pathUsersSection.split('/')[0] nickname_from_path = path_users_section.split('/')[0]
if is_system_account(nicknameFromPath): if is_system_account(nickname_from_path):
print('basic auth - attempted login using system account ' + print('basic auth - attempted login using system account ' +
nicknameFromPath + ' in path') nickname_from_path + ' in path')
return False return False
base64Str = \ base64_str = \
authHeader.split(' ')[1].replace('\n', '').replace('\r', '') auth_header.split(' ')[1].replace('\n', '').replace('\r', '')
plain = base64.b64decode(base64Str).decode('utf-8') plain = base64.b64decode(base64_str).decode('utf-8')
if ':' not in plain: if ':' not in plain:
if debug: if debug:
print('DEBUG: basic auth header does not contain a ":" ' + print('DEBUG: basic auth header does not contain a ":" ' +
@ -120,26 +121,26 @@ def authorize_basic(base_dir: str, path: str, authHeader: str,
print('basic auth - attempted login using system account ' + nickname + print('basic auth - attempted login using system account ' + nickname +
' in Auth header') ' in Auth header')
return False return False
if nickname != nicknameFromPath: if nickname != nickname_from_path:
if debug: if debug:
print('DEBUG: Nickname given in the path (' + nicknameFromPath + print('DEBUG: Nickname given in the path (' + nickname_from_path +
') does not match the one in the Authorization header (' + ') does not match the one in the Authorization header (' +
nickname + ')') nickname + ')')
return False return False
passwordFile = base_dir + '/accounts/passwords' password_file = base_dir + '/accounts/passwords'
if not os.path.isfile(passwordFile): if not os.path.isfile(password_file):
if debug: if debug:
print('DEBUG: passwords file missing') print('DEBUG: passwords file missing')
return False return False
providedPassword = plain.split(':')[1] provided_password = plain.split(':')[1]
try: try:
with open(passwordFile, 'r') as passfile: with open(password_file, 'r') as passfile:
for line in passfile: for line in passfile:
if not line.startswith(nickname + ':'): if not line.startswith(nickname + ':'):
continue continue
storedPassword = \ stored_password = \
line.split(':')[1].replace('\n', '').replace('\r', '') line.split(':')[1].replace('\n', '').replace('\r', '')
success = _verify_password(storedPassword, providedPassword) success = _verify_password(stored_password, provided_password)
if not success: if not success:
if debug: if debug:
print('DEBUG: Password check failed for ' + nickname) print('DEBUG: Password check failed for ' + nickname)
@ -148,7 +149,7 @@ def authorize_basic(base_dir: str, path: str, authHeader: str,
print('EX: failed to open password file') print('EX: failed to open password file')
return False return False
print('DEBUG: Did not find credentials for ' + nickname + print('DEBUG: Did not find credentials for ' + nickname +
' in ' + passwordFile) ' in ' + password_file)
return False return False
@ -164,40 +165,40 @@ def store_basic_credentials(base_dir: str,
if not os.path.isdir(base_dir + '/accounts'): if not os.path.isdir(base_dir + '/accounts'):
os.mkdir(base_dir + '/accounts') os.mkdir(base_dir + '/accounts')
passwordFile = base_dir + '/accounts/passwords' password_file = base_dir + '/accounts/passwords'
storeStr = nickname + ':' + _hash_password(password) store_str = nickname + ':' + _hash_password(password)
if os.path.isfile(passwordFile): if os.path.isfile(password_file):
if nickname + ':' in open(passwordFile).read(): if nickname + ':' in open(password_file).read():
try: try:
with open(passwordFile, 'r') as fin: with open(password_file, 'r') as fin:
with open(passwordFile + '.new', 'w+') as fout: with open(password_file + '.new', 'w+') as fout:
for line in fin: for line in fin:
if not line.startswith(nickname + ':'): if not line.startswith(nickname + ':'):
fout.write(line) fout.write(line)
else: else:
fout.write(storeStr + '\n') fout.write(store_str + '\n')
except OSError as ex: except OSError as ex:
print('EX: unable to save password ' + passwordFile + print('EX: unable to save password ' + password_file +
' ' + str(ex)) ' ' + str(ex))
return False return False
try: try:
os.rename(passwordFile + '.new', passwordFile) os.rename(password_file + '.new', password_file)
except OSError: except OSError:
print('EX: unable to save password 2') print('EX: unable to save password 2')
return False return False
else: else:
# append to password file # append to password file
try: try:
with open(passwordFile, 'a+') as passfile: with open(password_file, 'a+') as passfile:
passfile.write(storeStr + '\n') passfile.write(store_str + '\n')
except OSError: except OSError:
print('EX: unable to append password') print('EX: unable to append password')
return False return False
else: else:
try: try:
with open(passwordFile, 'w+') as passfile: with open(password_file, 'w+') as passfile:
passfile.write(storeStr + '\n') passfile.write(store_str + '\n')
except OSError: except OSError:
print('EX: unable to create password file') print('EX: unable to create password file')
return False return False
@ -208,11 +209,11 @@ def remove_password(base_dir: str, nickname: str) -> None:
"""Removes the password entry for the given nickname """Removes the password entry for the given nickname
This is called during account removal This is called during account removal
""" """
passwordFile = base_dir + '/accounts/passwords' password_file = base_dir + '/accounts/passwords'
if os.path.isfile(passwordFile): if os.path.isfile(password_file):
try: try:
with open(passwordFile, 'r') as fin: with open(password_file, 'r') as fin:
with open(passwordFile + '.new', 'w+') as fout: with open(password_file + '.new', 'w+') as fout:
for line in fin: for line in fin:
if not line.startswith(nickname + ':'): if not line.startswith(nickname + ':'):
fout.write(line) fout.write(line)
@ -221,71 +222,71 @@ def remove_password(base_dir: str, nickname: str) -> None:
return return
try: try:
os.rename(passwordFile + '.new', passwordFile) os.rename(password_file + '.new', password_file)
except OSError: except OSError:
print('EX: unable to remove password from file 2') print('EX: unable to remove password from file 2')
return return
def authorize(base_dir: str, path: str, authHeader: str, debug: bool) -> bool: def authorize(base_dir: str, path: str, auth_header: str, debug: bool) -> bool:
"""Authorize using http header """Authorize using http header
""" """
if authHeader.lower().startswith('basic '): if auth_header.lower().startswith('basic '):
return authorize_basic(base_dir, path, authHeader, debug) return authorize_basic(base_dir, path, auth_header, debug)
return False return False
def create_password(length: int): def create_password(length: int):
validChars = 'abcdefghijklmnopqrstuvwxyz' + \ valid_chars = 'abcdefghijklmnopqrstuvwxyz' + \
'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
return ''.join((secrets.choice(validChars) for i in range(length))) return ''.join((secrets.choice(valid_chars) for i in range(length)))
def record_login_failure(base_dir: str, ipAddress: str, def record_login_failure(base_dir: str, ip_address: str,
countDict: {}, failTime: int, count_dict: {}, fail_time: int,
logToFile: bool) -> None: log_to_file: bool) -> None:
"""Keeps ip addresses and the number of times login failures """Keeps ip addresses and the number of times login failures
occured for them in a dict occured for them in a dict
""" """
if not countDict.get(ipAddress): if not count_dict.get(ip_address):
while len(countDict.items()) > 100: while len(count_dict.items()) > 100:
oldestTime = 0 oldest_time = 0
oldestIP = None oldest_ip = None
for ipAddr, ipItem in countDict.items(): for ip_addr, ip_item in count_dict.items():
if oldestTime == 0 or ipItem['time'] < oldestTime: if oldest_time == 0 or ip_item['time'] < oldest_time:
oldestTime = ipItem['time'] oldest_time = ip_item['time']
oldestIP = ipAddr oldest_ip = ip_addr
if oldestIP: if oldest_ip:
del countDict[oldestIP] del count_dict[oldest_ip]
countDict[ipAddress] = { count_dict[ip_address] = {
"count": 1, "count": 1,
"time": failTime "time": fail_time
} }
else: else:
countDict[ipAddress]['count'] += 1 count_dict[ip_address]['count'] += 1
countDict[ipAddress]['time'] = failTime count_dict[ip_address]['time'] = fail_time
failCount = countDict[ipAddress]['count'] fail_count = count_dict[ip_address]['count']
if failCount > 4: if fail_count > 4:
print('WARN: ' + str(ipAddress) + ' failed to log in ' + print('WARN: ' + str(ip_address) + ' failed to log in ' +
str(failCount) + ' times') str(fail_count) + ' times')
if not logToFile: if not log_to_file:
return return
failureLog = base_dir + '/accounts/loginfailures.log' failure_log = base_dir + '/accounts/loginfailures.log'
writeType = 'a+' write_type = 'a+'
if not os.path.isfile(failureLog): if not os.path.isfile(failure_log):
writeType = 'w+' write_type = 'w+'
curr_time = datetime.datetime.utcnow() curr_time = datetime.datetime.utcnow()
curr_timeStr = curr_time.strftime("%Y-%m-%d %H:%M:%SZ") curr_time_str = curr_time.strftime("%Y-%m-%d %H:%M:%SZ")
try: try:
with open(failureLog, writeType) as fp: with open(failure_log, write_type) as fp_fail:
# here we use a similar format to an ssh log, so that # here we use a similar format to an ssh log, so that
# systems such as fail2ban can parse it # systems such as fail2ban can parse it
fp.write(curr_timeStr + ' ' + fp_fail.write(curr_time_str + ' ' +
'ip-127-0-0-1 sshd[20710]: ' + 'ip-127-0-0-1 sshd[20710]: ' +
'Disconnecting invalid user epicyon ' + 'Disconnecting invalid user epicyon ' +
ipAddress + ' port 443: ' + ip_address + ' port 443: ' +
'Too many authentication failures [preauth]\n') 'Too many authentication failures [preauth]\n')
except OSError: except OSError:
print('EX: record_login_failure failed ' + str(failureLog)) print('EX: record_login_failure failed ' + str(failure_log))