Move daemon login function to its own module

main
Bob Mottram 2024-03-03 14:43:51 +00:00
parent f217596d8a
commit 2bbec6493b
4 changed files with 256 additions and 233 deletions

View File

@ -13,7 +13,6 @@ import errno
import json
import os
import urllib.parse
from hashlib import sha256
from socket import error as SocketError
from utils import is_float
from utils import get_base_content_from_post
@ -63,11 +62,8 @@ from utils import local_actor_url
from utils import contains_invalid_chars
from utils import remove_id_ending
from utils import check_bad_path
from utils import is_system_account
from utils import valid_password
from utils import get_instance_url
from utils import is_local_network_address
from utils import is_suspended
from utils import acct_dir
from utils import get_nickname_from_actor
from blocking import save_block_federated_endpoints
@ -99,8 +95,6 @@ from inbox import populate_replies
from inbox import inbox_message_has_params
from inbox import inbox_permitted_message
from httpsig import getheader_signature_input
from webapp_login import html_get_login_credentials
from webapp_suspended import html_suspended
from person import deactivate_account
from person import get_actor_move_json
from person import add_actor_update_timestamp
@ -110,16 +104,9 @@ from person import get_featured_hashtags
from person import set_featured_hashtags
from person import update_memorial_flags
from person import get_actor_update_json
from person import activate_account
from person import register_account
from person import person_upgrade_actor
from person import person_snooze
from person import person_unsnooze
from auth import store_basic_credentials
from auth import create_basic_auth_header
from auth import authorize_basic
from auth import record_login_failure
from auth import create_password
from content import get_price_from_string
from content import replace_emoji_from_tags
from content import add_name_emojis_to_tags
@ -134,7 +121,6 @@ from filters import remove_global_filter
from categories import set_hashtag_category
from httpcodes import write2
from httpcodes import http_200
from httpcodes import http_401
from httpcodes import http_404
from httpcodes import http_400
from httpcodes import http_503
@ -261,6 +247,7 @@ from schedule import remove_scheduled_posts
from cwlists import get_cw_list_variable
from webfinger import webfinger_update
from webapp_column_right import html_citations
from daemon_post_login import post_login_screen
# maximum number of posts in a hashtag feed
MAX_POSTS_IN_HASHTAG_FEED = 6
@ -396,13 +383,13 @@ def daemon_http_post(self) -> None:
# POST to login screen, containing credentials
if self.path.startswith('/login'):
_post_login_screen(self, calling_domain, cookie,
self.server.base_dir,
self.server.http_prefix,
self.server.domain,
self.server.port,
ua_str, self.server.debug,
self.server.registration)
post_login_screen(self, calling_domain, cookie,
self.server.base_dir,
self.server.http_prefix,
self.server.domain,
self.server.port,
ua_str, self.server.debug,
self.server.registration)
self.server.postreq_busy = False
return
@ -9104,212 +9091,3 @@ def _set_hashtag_category2(self, calling_domain: str, cookie: str,
redirect_headers(self, tag_screen_str,
cookie, calling_domain)
self.server.postreq_busy = False
def _post_login_screen(self, calling_domain: str, cookie: str,
base_dir: str, http_prefix: str,
domain: str, port: int,
ua_str: str, debug: bool,
registrations_open: bool) -> None:
"""POST to login screen, containing credentials
"""
# ensure that there is a minimum delay between failed login
# attempts, to mitigate brute force
if int(time.time()) - self.server.last_login_failure < 5:
http_503(self)
self.server.postreq_busy = False
return
# get the contents of POST containing login credentials
length = int(self.headers['Content-length'])
if length > 512:
print('Login failed - credentials too long')
http_401(self, 'Credentials are too long')
self.server.postreq_busy = False
return
try:
login_params = self.rfile.read(length).decode('utf-8')
except SocketError as ex:
if ex.errno == errno.ECONNRESET:
print('EX: POST login read ' +
'connection reset by peer')
else:
print('EX: POST login read socket error')
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
except ValueError as ex:
print('EX: POST login read failed, ' + str(ex))
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
login_nickname, login_password, register = \
html_get_login_credentials(login_params,
self.server.last_login_time,
registrations_open)
if login_nickname and login_password:
if is_system_account(login_nickname):
print('Invalid username login: ' + login_nickname +
' (system account)')
clear_login_details(self, login_nickname, calling_domain)
self.server.postreq_busy = False
return
self.server.last_login_time = int(time.time())
if register:
if not valid_password(login_password):
self.server.postreq_busy = False
login_url = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
'/login'
redirect_headers(self, login_url, cookie, calling_domain)
return
if not register_account(base_dir, http_prefix, domain, port,
login_nickname, login_password,
self.server.manual_follower_approval):
self.server.postreq_busy = False
login_url = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
'/login'
redirect_headers(self, login_url, cookie, calling_domain)
return
auth_header = \
create_basic_auth_header(login_nickname, login_password)
if self.headers.get('X-Forward-For'):
ip_address = self.headers['X-Forward-For']
elif self.headers.get('X-Forwarded-For'):
ip_address = self.headers['X-Forwarded-For']
else:
ip_address = self.client_address[0]
if not domain.endswith('.onion'):
if not is_local_network_address(ip_address):
print('Login attempt from IP: ' + str(ip_address))
if not authorize_basic(base_dir, '/users/' +
login_nickname + '/outbox',
auth_header, False):
print('Login failed: ' + login_nickname)
clear_login_details(self, login_nickname, calling_domain)
fail_time = int(time.time())
self.server.last_login_failure = fail_time
if not domain.endswith('.onion'):
if not is_local_network_address(ip_address):
record_login_failure(base_dir, ip_address,
self.server.login_failure_count,
fail_time,
self.server.log_login_failures)
self.server.postreq_busy = False
return
else:
if self.server.login_failure_count.get(ip_address):
del self.server.login_failure_count[ip_address]
if is_suspended(base_dir, login_nickname):
msg = \
html_suspended(base_dir).encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
# login success - redirect with authorization
print('====== Login success: ' + login_nickname +
' ' + ua_str)
# re-activate account if needed
activate_account(base_dir, login_nickname, domain)
# This produces a deterministic token based
# on nick+password+salt
salt_filename = \
acct_dir(base_dir, login_nickname, domain) + '/.salt'
salt = create_password(32)
if os.path.isfile(salt_filename):
try:
with open(salt_filename, 'r',
encoding='utf-8') as fp_salt:
salt = fp_salt.read()
except OSError as ex:
print('EX: Unable to read salt for ' +
login_nickname + ' ' + str(ex))
else:
try:
with open(salt_filename, 'w+',
encoding='utf-8') as fp_salt:
fp_salt.write(salt)
except OSError as ex:
print('EX: Unable to save salt for ' +
login_nickname + ' ' + str(ex))
token_text = login_nickname + login_password + salt
token = sha256(token_text.encode('utf-8')).hexdigest()
self.server.tokens[login_nickname] = token
login_handle = login_nickname + '@' + domain
token_filename = \
base_dir + '/accounts/' + \
login_handle + '/.token'
try:
with open(token_filename, 'w+',
encoding='utf-8') as fp_tok:
fp_tok.write(token)
except OSError as ex:
print('EX: Unable to save token for ' +
login_nickname + ' ' + str(ex))
person_upgrade_actor(base_dir, None,
base_dir + '/accounts/' +
login_handle + '.json')
index = self.server.tokens[login_nickname]
self.server.tokens_lookup[index] = login_nickname
cookie_str = 'SET:epicyon=' + \
self.server.tokens[login_nickname] + '; SameSite=Strict'
tl_url = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
'/users/' + login_nickname + '/' + \
self.server.default_timeline
redirect_headers(self, tl_url, cookie_str, calling_domain)
self.server.postreq_busy = False
return
else:
print('WARN: No login credentials presented to /login')
if debug:
# be careful to avoid logging the password
login_str = login_params
if '=' in login_params:
login_params_list = login_params.split('=')
login_str = ''
skip_param = False
for login_prm in login_params_list:
if not skip_param:
login_str += login_prm + '='
else:
len_str = login_prm.split('&')[0]
if len(len_str) > 0:
login_str += login_prm + '*'
len_str = ''
if '&' in login_prm:
login_str += \
'&' + login_prm.split('&')[1] + '='
skip_param = False
if 'password' in login_prm:
skip_param = True
login_str = login_str[:len(login_str) - 1]
print(login_str)
http_401(self, 'No login credentials were posted')
self.server.postreq_busy = False
http_200(self)
self.server.postreq_busy = False

View File

@ -0,0 +1,245 @@
__filename__ = "daemon_post_login.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.5.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Core"
import os
import time
import errno
from hashlib import sha256
from socket import error as SocketError
from auth import create_password
from auth import record_login_failure
from auth import authorize_basic
from auth import create_basic_auth_header
from httpcodes import write2
from httpcodes import http_200
from httpcodes import http_401
from httpcodes import http_503
from httpheaders import login_headers
from httpheaders import redirect_headers
from httpheaders import clear_login_details
from webapp_login import html_get_login_credentials
from webapp_suspended import html_suspended
from utils import acct_dir
from utils import is_suspended
from utils import is_local_network_address
from utils import get_instance_url
from utils import valid_password
from utils import is_system_account
from person import person_upgrade_actor
from person import activate_account2
from person import register_account
def post_login_screen(self, calling_domain: str, cookie: str,
base_dir: str, http_prefix: str,
domain: str, port: int,
ua_str: str, debug: bool,
registrations_open: bool) -> None:
"""POST to login screen, containing credentials
"""
# ensure that there is a minimum delay between failed login
# attempts, to mitigate brute force
if int(time.time()) - self.server.last_login_failure < 5:
http_503(self)
self.server.postreq_busy = False
return
# get the contents of POST containing login credentials
length = int(self.headers['Content-length'])
if length > 512:
print('Login failed - credentials too long')
http_401(self, 'Credentials are too long')
self.server.postreq_busy = False
return
try:
login_params = self.rfile.read(length).decode('utf-8')
except SocketError as ex:
if ex.errno == errno.ECONNRESET:
print('EX: POST login read ' +
'connection reset by peer')
else:
print('EX: POST login read socket error')
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
except ValueError as ex:
print('EX: POST login read failed, ' + str(ex))
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
login_nickname, login_password, register = \
html_get_login_credentials(login_params,
self.server.last_login_time,
registrations_open)
if login_nickname and login_password:
if is_system_account(login_nickname):
print('Invalid username login: ' + login_nickname +
' (system account)')
clear_login_details(self, login_nickname, calling_domain)
self.server.postreq_busy = False
return
self.server.last_login_time = int(time.time())
if register:
if not valid_password(login_password):
self.server.postreq_busy = False
login_url = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
'/login'
redirect_headers(self, login_url, cookie, calling_domain)
return
if not register_account(base_dir, http_prefix, domain, port,
login_nickname, login_password,
self.server.manual_follower_approval):
self.server.postreq_busy = False
login_url = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
'/login'
redirect_headers(self, login_url, cookie, calling_domain)
return
auth_header = \
create_basic_auth_header(login_nickname, login_password)
if self.headers.get('X-Forward-For'):
ip_address = self.headers['X-Forward-For']
elif self.headers.get('X-Forwarded-For'):
ip_address = self.headers['X-Forwarded-For']
else:
ip_address = self.client_address[0]
if not domain.endswith('.onion'):
if not is_local_network_address(ip_address):
print('Login attempt from IP: ' + str(ip_address))
if not authorize_basic(base_dir, '/users/' +
login_nickname + '/outbox',
auth_header, False):
print('Login failed: ' + login_nickname)
clear_login_details(self, login_nickname, calling_domain)
fail_time = int(time.time())
self.server.last_login_failure = fail_time
if not domain.endswith('.onion'):
if not is_local_network_address(ip_address):
record_login_failure(base_dir, ip_address,
self.server.login_failure_count,
fail_time,
self.server.log_login_failures)
self.server.postreq_busy = False
return
else:
if self.server.login_failure_count.get(ip_address):
del self.server.login_failure_count[ip_address]
if is_suspended(base_dir, login_nickname):
msg = \
html_suspended(base_dir).encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
# login success - redirect with authorization
print('====== Login success: ' + login_nickname +
' ' + ua_str)
# re-activate account if needed
activate_account2(base_dir, login_nickname, domain)
# This produces a deterministic token based
# on nick+password+salt
salt_filename = \
acct_dir(base_dir, login_nickname, domain) + '/.salt'
salt = create_password(32)
if os.path.isfile(salt_filename):
try:
with open(salt_filename, 'r',
encoding='utf-8') as fp_salt:
salt = fp_salt.read()
except OSError as ex:
print('EX: Unable to read salt for ' +
login_nickname + ' ' + str(ex))
else:
try:
with open(salt_filename, 'w+',
encoding='utf-8') as fp_salt:
fp_salt.write(salt)
except OSError as ex:
print('EX: Unable to save salt for ' +
login_nickname + ' ' + str(ex))
token_text = login_nickname + login_password + salt
token = sha256(token_text.encode('utf-8')).hexdigest()
self.server.tokens[login_nickname] = token
login_handle = login_nickname + '@' + domain
token_filename = \
base_dir + '/accounts/' + \
login_handle + '/.token'
try:
with open(token_filename, 'w+',
encoding='utf-8') as fp_tok:
fp_tok.write(token)
except OSError as ex:
print('EX: Unable to save token for ' +
login_nickname + ' ' + str(ex))
person_upgrade_actor(base_dir, None,
base_dir + '/accounts/' +
login_handle + '.json')
index = self.server.tokens[login_nickname]
self.server.tokens_lookup[index] = login_nickname
cookie_str = 'SET:epicyon=' + \
self.server.tokens[login_nickname] + '; SameSite=Strict'
tl_url = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
'/users/' + login_nickname + '/' + \
self.server.default_timeline
redirect_headers(self, tl_url, cookie_str, calling_domain)
self.server.postreq_busy = False
return
else:
print('WARN: No login credentials presented to /login')
if debug:
# be careful to avoid logging the password
login_str = login_params
if '=' in login_params:
login_params_list = login_params.split('=')
login_str = ''
skip_param = False
for login_prm in login_params_list:
if not skip_param:
login_str += login_prm + '='
else:
len_str = login_prm.split('&')[0]
if len(len_str) > 0:
login_str += login_prm + '*'
len_str = ''
if '&' in login_prm:
login_str += \
'&' + login_prm.split('&')[1] + '='
skip_param = False
if 'password' in login_prm:
skip_param = True
login_str = login_str[:len(login_str) - 1]
print(login_str)
http_401(self, 'No login credentials were posted')
self.server.postreq_busy = False
http_200(self)
self.server.postreq_busy = False

View File

@ -19,7 +19,7 @@ from person import create_person
from person import create_group
from person import set_profile_image
from person import remove_account
from person import activate_account
from person import activate_account2
from person import deactivate_account
from skills import set_skill_level
from roles import set_role
@ -3062,7 +3062,7 @@ def _command_options() -> None:
if not argb.domain or not get_config_param(base_dir, 'domain'):
print('Use the --domain option to set the domain name')
sys.exit()
if activate_account(base_dir, nickname, domain):
if activate_account2(base_dir, nickname, domain):
print('Account for ' + nickname + '@' + domain + ' was activated')
else:
print('Deactivated account for ' + nickname + '@' + domain +

View File

@ -1479,7 +1479,7 @@ def deactivate_account(base_dir: str, nickname: str, domain: str) -> bool:
return os.path.isdir(deactivated_dir + '/' + nickname + '@' + domain)
def activate_account(base_dir: str, nickname: str, domain: str) -> None:
def activate_account2(base_dir: str, nickname: str, domain: str) -> None:
"""Makes a deactivated account available
"""
handle = nickname + '@' + domain