masto api daemon functions in their own module

merge-requests/30/head
Bob Mottram 2024-03-02 12:58:13 +00:00
parent c4bee4938d
commit e6ef8d65f5
3 changed files with 656 additions and 633 deletions

View File

@ -13,8 +13,6 @@ import json
import datetime
import urllib.parse
from shutil import copyfile
from mastoapiv1 import masto_api_v1_response
from mastoapiv2 import masto_api_v2_response
from relationships import get_inactive_feed
from relationships import get_moved_feed
from skills import get_skills_from_list
@ -224,6 +222,7 @@ from posts import json_pin_post
from posts import is_moderator
from posts import get_pinned_post_as_json
from posts import outbox_message_create_wrap
from daemon_get_masto_api import masto_api
# Blogs can be longer, so don't show many per page
MAX_POSTS_IN_BLOGS_FEED = 4
@ -1201,25 +1200,25 @@ def daemon_http_get(self) -> None:
return
# minimal mastodon api
if _masto_api(self, self.path, calling_domain, ua_str,
authorized,
self.server.http_prefix,
self.server.base_dir,
self.authorized_nickname,
self.server.domain,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain,
self.server.translate,
self.server.registration,
self.server.system_language,
self.server.project_version,
self.server.custom_emoji,
self.server.show_node_info_accounts,
referer_domain,
self.server.debug,
self.server.known_crawlers,
self.server.sites_unavailable):
if masto_api(self, self.path, calling_domain, ua_str,
authorized,
self.server.http_prefix,
self.server.base_dir,
self.authorized_nickname,
self.server.domain,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain,
self.server.translate,
self.server.registration,
self.server.system_language,
self.server.project_version,
self.server.custom_emoji,
self.server.show_node_info_accounts,
referer_domain,
self.server.debug,
self.server.known_crawlers,
self.server.sites_unavailable):
return
fitness_performance(getreq_start_time, self.server.fitness,
@ -4918,40 +4917,6 @@ def _show_conversation_thread(self, authorized: bool,
return True
def _masto_api(self, path: str, calling_domain: str,
ua_str: str,
authorized: bool, http_prefix: str,
base_dir: str, nickname: str, domain: str,
domain_full: str,
onion_domain: str, i2p_domain: str,
translate: {},
registration: bool,
system_language: str,
project_version: str,
custom_emoji: [],
show_node_info_accounts: bool,
referer_domain: str, debug: bool,
known_crawlers: {},
sites_unavailable: []) -> bool:
if _masto_api_v2(self, path, calling_domain, ua_str, authorized,
http_prefix, base_dir, nickname, domain,
domain_full, onion_domain, i2p_domain,
translate, registration, system_language,
project_version,
show_node_info_accounts,
referer_domain, debug, 5,
known_crawlers, sites_unavailable):
return True
return _masto_api_v1(self, path, calling_domain, ua_str, authorized,
http_prefix, base_dir, nickname, domain,
domain_full, onion_domain, i2p_domain,
translate, registration, system_language,
project_version, custom_emoji,
show_node_info_accounts,
referer_domain, debug, 5,
known_crawlers, sites_unavailable)
def _show_cached_favicon(self, referer_domain: str, path: str,
base_dir: str, getreq_start_time) -> None:
"""Shows a favicon image obtained from the cache
@ -13450,259 +13415,6 @@ def _show_person_profile(self, authorized: bool,
return True
def _masto_api_v2(self, path: str, calling_domain: str,
ua_str: str,
authorized: bool,
http_prefix: str,
base_dir: str, nickname: str, domain: str,
domain_full: str,
onion_domain: str, i2p_domain: str,
translate: {},
registration: bool,
system_language: str,
project_version: str,
show_node_info_accounts: bool,
referer_domain: str,
debug: bool,
calling_site_timeout: int,
known_crawlers: {},
sites_unavailable: []) -> bool:
"""This is a vestigil mastodon v2 API for the purpose
of returning an empty result to sites like
https://mastopeek.app-dist.eu
"""
if not path.startswith('/api/v2/'):
return False
if not referer_domain:
if not (debug and self.server.unit_test):
print('mastodon api v2 request has no referer domain ' +
str(ua_str))
http_400(self)
return True
if referer_domain == domain_full:
print('mastodon api v2 request from self')
http_400(self)
return True
if self.server.masto_api_is_active:
print('mastodon api v2 is busy during request from ' +
referer_domain)
http_503(self)
return True
self.server.masto_api_is_active = True
# is this a real website making the call ?
if not debug and not self.server.unit_test and referer_domain:
# Does calling_domain look like a domain?
if ' ' in referer_domain or \
';' in referer_domain or \
'.' not in referer_domain:
print('mastodon api v2 ' +
'referer does not look like a domain ' +
referer_domain)
http_400(self)
self.server.masto_api_is_active = False
return True
if not self.server.allow_local_network_access:
if local_network_host(referer_domain):
print('mastodon api v2 referer domain is from the ' +
'local network ' + referer_domain)
http_400(self)
self.server.masto_api_is_active = False
return True
if not referer_is_active(http_prefix,
referer_domain, ua_str,
calling_site_timeout,
sites_unavailable):
print('mastodon api v2 referer url is not active ' +
referer_domain)
http_400(self)
self.server.masto_api_is_active = False
return True
print('mastodon api v2: ' + path)
print('mastodon api v2: authorized ' + str(authorized))
print('mastodon api v2: nickname ' + str(nickname))
print('mastodon api v2: referer ' + str(referer_domain))
crawl_time = \
update_known_crawlers(ua_str, base_dir,
known_crawlers,
self.server.last_known_crawler)
if crawl_time is not None:
self.server.last_known_crawler = crawl_time
broch_mode = broch_mode_is_active(base_dir)
send_json, send_json_str = \
masto_api_v2_response(path,
calling_domain,
ua_str,
http_prefix,
base_dir,
domain,
domain_full,
onion_domain,
i2p_domain,
translate,
registration,
system_language,
project_version,
show_node_info_accounts,
broch_mode)
if send_json is not None:
msg_str = json.dumps(send_json)
msg_str = convert_domains(calling_domain, referer_domain,
msg_str, http_prefix, domain,
onion_domain, i2p_domain)
msg = msg_str.encode('utf-8')
msglen = len(msg)
if has_accept(self, calling_domain):
protocol_str = \
get_json_content_from_accept(self.headers.get('Accept'))
set_headers(self, protocol_str, msglen,
None, calling_domain, True)
else:
set_headers(self, 'application/ld+json', msglen,
None, calling_domain, True)
write2(self, msg)
if send_json_str:
print(send_json_str)
self.server.masto_api_is_active = False
return True
# no api v2 endpoints were matched
http_404(self, 2)
self.server.masto_api_is_active = False
return True
def _masto_api_v1(self, path: str, calling_domain: str,
ua_str: str,
authorized: bool,
http_prefix: str,
base_dir: str, nickname: str, domain: str,
domain_full: str,
onion_domain: str, i2p_domain: str,
translate: {},
registration: bool,
system_language: str,
project_version: str,
custom_emoji: [],
show_node_info_accounts: bool,
referer_domain: str,
debug: bool,
calling_site_timeout: int,
known_crawlers: {},
sites_unavailable: []) -> bool:
"""This is a vestigil mastodon API for the purpose
of returning an empty result to sites like
https://mastopeek.app-dist.eu
"""
if not path.startswith('/api/v1/'):
return False
if not referer_domain:
if not (debug and self.server.unit_test):
print('mastodon api request has no referer domain ' +
str(ua_str))
http_400(self)
return True
if referer_domain == domain_full:
print('mastodon api request from self')
http_400(self)
return True
if self.server.masto_api_is_active:
print('mastodon api is busy during request from ' +
referer_domain)
http_503(self)
return True
self.server.masto_api_is_active = True
# is this a real website making the call ?
if not debug and not self.server.unit_test and referer_domain:
# Does calling_domain look like a domain?
if ' ' in referer_domain or \
';' in referer_domain or \
'.' not in referer_domain:
print('mastodon api ' +
'referer does not look like a domain ' +
referer_domain)
http_400(self)
self.server.masto_api_is_active = False
return True
if not self.server.allow_local_network_access:
if local_network_host(referer_domain):
print('mastodon api referer domain is from the ' +
'local network ' + referer_domain)
http_400(self)
self.server.masto_api_is_active = False
return True
if not referer_is_active(http_prefix,
referer_domain, ua_str,
calling_site_timeout,
sites_unavailable):
print('mastodon api referer url is not active ' +
referer_domain)
http_400(self)
self.server.masto_api_is_active = False
return True
print('mastodon api v1: ' + path)
print('mastodon api v1: authorized ' + str(authorized))
print('mastodon api v1: nickname ' + str(nickname))
print('mastodon api v1: referer ' + str(referer_domain))
crawl_time = \
update_known_crawlers(ua_str, base_dir,
known_crawlers,
self.server.last_known_crawler)
if crawl_time is not None:
self.server.last_known_crawler = crawl_time
broch_mode = broch_mode_is_active(base_dir)
send_json, send_json_str = \
masto_api_v1_response(path,
calling_domain,
ua_str,
authorized,
http_prefix,
base_dir,
nickname, domain,
domain_full,
onion_domain,
i2p_domain,
translate,
registration,
system_language,
project_version,
custom_emoji,
show_node_info_accounts,
broch_mode)
if send_json is not None:
msg_str = json.dumps(send_json)
msg_str = convert_domains(calling_domain, referer_domain,
msg_str, http_prefix, domain,
onion_domain, i2p_domain)
msg = msg_str.encode('utf-8')
msglen = len(msg)
if has_accept(self, calling_domain):
protocol_str = \
get_json_content_from_accept(self.headers.get('Accept'))
set_headers(self, protocol_str, msglen,
None, calling_domain, True)
else:
set_headers(self, 'application/ld+json', msglen,
None, calling_domain, True)
write2(self, msg)
if send_json_str:
print(send_json_str)
self.server.masto_api_is_active = False
return True
# no api endpoints were matched
http_404(self, 1)
self.server.masto_api_is_active = False
return True
def _show_post_from_file(self, post_filename: str, liked_by: str,
react_by: str, react_emoji: str,
authorized: bool,

View File

@ -0,0 +1,311 @@
__filename__ = "daemon_get_masto_api.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.5.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Core"
import json
from httpheaders import set_headers
from httpcodes import write2
from mastoapiv1 import masto_api_v1_response
from mastoapiv2 import masto_api_v2_response
from siteactive import referer_is_active
from httpcodes import http_400
from httpcodes import http_404
from httpcodes import http_503
from utils import get_json_content_from_accept
from utils import convert_domains
from utils import local_network_host
from crawlers import update_known_crawlers
from blocking import broch_mode_is_active
from daemon_utils import has_accept
def masto_api(self, path: str, calling_domain: str,
ua_str: str,
authorized: bool, http_prefix: str,
base_dir: str, nickname: str, domain: str,
domain_full: str,
onion_domain: str, i2p_domain: str,
translate: {},
registration: bool,
system_language: str,
project_version: str,
custom_emoji: [],
show_node_info_accounts: bool,
referer_domain: str, debug: bool,
known_crawlers: {},
sites_unavailable: []) -> bool:
if _masto_api_v2(self, path, calling_domain, ua_str, authorized,
http_prefix, base_dir, nickname, domain,
domain_full, onion_domain, i2p_domain,
translate, registration, system_language,
project_version,
show_node_info_accounts,
referer_domain, debug, 5,
known_crawlers, sites_unavailable):
return True
return _masto_api_v1(self, path, calling_domain, ua_str, authorized,
http_prefix, base_dir, nickname, domain,
domain_full, onion_domain, i2p_domain,
translate, registration, system_language,
project_version, custom_emoji,
show_node_info_accounts,
referer_domain, debug, 5,
known_crawlers, sites_unavailable)
def _masto_api_v1(self, path: str, calling_domain: str,
ua_str: str,
authorized: bool,
http_prefix: str,
base_dir: str, nickname: str, domain: str,
domain_full: str,
onion_domain: str, i2p_domain: str,
translate: {},
registration: bool,
system_language: str,
project_version: str,
custom_emoji: [],
show_node_info_accounts: bool,
referer_domain: str,
debug: bool,
calling_site_timeout: int,
known_crawlers: {},
sites_unavailable: []) -> bool:
"""This is a vestigil mastodon API for the purpose
of returning an empty result to sites like
https://mastopeek.app-dist.eu
"""
if not path.startswith('/api/v1/'):
return False
if not referer_domain:
if not (debug and self.server.unit_test):
print('mastodon api request has no referer domain ' +
str(ua_str))
http_400(self)
return True
if referer_domain == domain_full:
print('mastodon api request from self')
http_400(self)
return True
if self.server.masto_api_is_active:
print('mastodon api is busy during request from ' +
referer_domain)
http_503(self)
return True
self.server.masto_api_is_active = True
# is this a real website making the call ?
if not debug and not self.server.unit_test and referer_domain:
# Does calling_domain look like a domain?
if ' ' in referer_domain or \
';' in referer_domain or \
'.' not in referer_domain:
print('mastodon api ' +
'referer does not look like a domain ' +
referer_domain)
http_400(self)
self.server.masto_api_is_active = False
return True
if not self.server.allow_local_network_access:
if local_network_host(referer_domain):
print('mastodon api referer domain is from the ' +
'local network ' + referer_domain)
http_400(self)
self.server.masto_api_is_active = False
return True
if not referer_is_active(http_prefix,
referer_domain, ua_str,
calling_site_timeout,
sites_unavailable):
print('mastodon api referer url is not active ' +
referer_domain)
http_400(self)
self.server.masto_api_is_active = False
return True
print('mastodon api v1: ' + path)
print('mastodon api v1: authorized ' + str(authorized))
print('mastodon api v1: nickname ' + str(nickname))
print('mastodon api v1: referer ' + str(referer_domain))
crawl_time = \
update_known_crawlers(ua_str, base_dir,
known_crawlers,
self.server.last_known_crawler)
if crawl_time is not None:
self.server.last_known_crawler = crawl_time
broch_mode = broch_mode_is_active(base_dir)
send_json, send_json_str = \
masto_api_v1_response(path,
calling_domain,
ua_str,
authorized,
http_prefix,
base_dir,
nickname, domain,
domain_full,
onion_domain,
i2p_domain,
translate,
registration,
system_language,
project_version,
custom_emoji,
show_node_info_accounts,
broch_mode)
if send_json is not None:
msg_str = json.dumps(send_json)
msg_str = convert_domains(calling_domain, referer_domain,
msg_str, http_prefix, domain,
onion_domain, i2p_domain)
msg = msg_str.encode('utf-8')
msglen = len(msg)
if has_accept(self, calling_domain):
protocol_str = \
get_json_content_from_accept(self.headers.get('Accept'))
set_headers(self, protocol_str, msglen,
None, calling_domain, True)
else:
set_headers(self, 'application/ld+json', msglen,
None, calling_domain, True)
write2(self, msg)
if send_json_str:
print(send_json_str)
self.server.masto_api_is_active = False
return True
# no api endpoints were matched
http_404(self, 1)
self.server.masto_api_is_active = False
return True
def _masto_api_v2(self, path: str, calling_domain: str,
ua_str: str,
authorized: bool,
http_prefix: str,
base_dir: str, nickname: str, domain: str,
domain_full: str,
onion_domain: str, i2p_domain: str,
translate: {},
registration: bool,
system_language: str,
project_version: str,
show_node_info_accounts: bool,
referer_domain: str,
debug: bool,
calling_site_timeout: int,
known_crawlers: {},
sites_unavailable: []) -> bool:
"""This is a vestigil mastodon v2 API for the purpose
of returning an empty result to sites like
https://mastopeek.app-dist.eu
"""
if not path.startswith('/api/v2/'):
return False
if not referer_domain:
if not (debug and self.server.unit_test):
print('mastodon api v2 request has no referer domain ' +
str(ua_str))
http_400(self)
return True
if referer_domain == domain_full:
print('mastodon api v2 request from self')
http_400(self)
return True
if self.server.masto_api_is_active:
print('mastodon api v2 is busy during request from ' +
referer_domain)
http_503(self)
return True
self.server.masto_api_is_active = True
# is this a real website making the call ?
if not debug and not self.server.unit_test and referer_domain:
# Does calling_domain look like a domain?
if ' ' in referer_domain or \
';' in referer_domain or \
'.' not in referer_domain:
print('mastodon api v2 ' +
'referer does not look like a domain ' +
referer_domain)
http_400(self)
self.server.masto_api_is_active = False
return True
if not self.server.allow_local_network_access:
if local_network_host(referer_domain):
print('mastodon api v2 referer domain is from the ' +
'local network ' + referer_domain)
http_400(self)
self.server.masto_api_is_active = False
return True
if not referer_is_active(http_prefix,
referer_domain, ua_str,
calling_site_timeout,
sites_unavailable):
print('mastodon api v2 referer url is not active ' +
referer_domain)
http_400(self)
self.server.masto_api_is_active = False
return True
print('mastodon api v2: ' + path)
print('mastodon api v2: authorized ' + str(authorized))
print('mastodon api v2: nickname ' + str(nickname))
print('mastodon api v2: referer ' + str(referer_domain))
crawl_time = \
update_known_crawlers(ua_str, base_dir,
known_crawlers,
self.server.last_known_crawler)
if crawl_time is not None:
self.server.last_known_crawler = crawl_time
broch_mode = broch_mode_is_active(base_dir)
send_json, send_json_str = \
masto_api_v2_response(path,
calling_domain,
ua_str,
http_prefix,
base_dir,
domain,
domain_full,
onion_domain,
i2p_domain,
translate,
registration,
system_language,
project_version,
show_node_info_accounts,
broch_mode)
if send_json is not None:
msg_str = json.dumps(send_json)
msg_str = convert_domains(calling_domain, referer_domain,
msg_str, http_prefix, domain,
onion_domain, i2p_domain)
msg = msg_str.encode('utf-8')
msglen = len(msg)
if has_accept(self, calling_domain):
protocol_str = \
get_json_content_from_accept(self.headers.get('Accept'))
set_headers(self, protocol_str, msglen,
None, calling_domain, True)
else:
set_headers(self, 'application/ld+json', msglen,
None, calling_domain, True)
write2(self, msg)
if send_json_str:
print(send_json_str)
self.server.masto_api_is_active = False
return True
# no api v2 endpoints were matched
http_404(self, 2)
self.server.masto_api_is_active = False
return True

View File

@ -269,331 +269,6 @@ MAX_POSTS_IN_HASHTAG_FEED = 6
MAX_POSTS_IN_FEED = 12
def _set_hashtag_category2(self, calling_domain: str, cookie: str,
path: str, base_dir: str,
domain: str, debug: bool,
system_language: str) -> None:
"""On the screen after selecting a hashtag from the swarm, this sets
the category for that tag
"""
users_path = path.replace('/sethashtagcategory', '')
hashtag = ''
if '/tags/' not in users_path:
# no hashtag is specified within the path
http_404(self, 14)
return
hashtag = users_path.split('/tags/')[1].strip()
hashtag = urllib.parse.unquote_plus(hashtag)
if not hashtag:
# no hashtag was given in the path
http_404(self, 15)
return
hashtag_filename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(hashtag_filename):
# the hashtag does not exist
http_404(self, 16)
return
users_path = users_path.split('/tags/')[0]
actor_str = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
users_path
tag_screen_str = actor_str + '/tags/' + hashtag
boundary = None
if ' boundary=' in self.headers['Content-type']:
boundary = self.headers['Content-type'].split('boundary=')[1]
if ';' in boundary:
boundary = boundary.split(';')[0]
# get the nickname
nickname = get_nickname_from_actor(actor_str)
editor = None
if nickname:
editor = is_editor(base_dir, nickname)
if not hashtag or not editor:
if not nickname:
print('WARN: nickname not found in ' + actor_str)
else:
print('WARN: nickname is not a moderator' + actor_str)
redirect_headers(self, tag_screen_str, cookie, calling_domain)
self.server.postreq_busy = False
return
if self.headers.get('Content-length'):
length = int(self.headers['Content-length'])
# check that the POST isn't too large
if length > self.server.max_post_length:
print('Maximum links data length exceeded ' + str(length))
redirect_headers(self, tag_screen_str, cookie, calling_domain)
self.server.postreq_busy = False
return
try:
# read the bytes of the http form POST
post_bytes = self.rfile.read(length)
except SocketError as ex:
if ex.errno == errno.ECONNRESET:
print('EX: connection was reset while ' +
'reading bytes from http form POST')
else:
print('EX: error while reading bytes ' +
'from http form POST')
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
except ValueError as ex:
print('EX: failed to read bytes for POST, ' + str(ex))
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
if not boundary:
if b'--LYNX' in post_bytes:
boundary = '--LYNX'
if boundary:
# extract all of the text fields into a dict
fields = \
extract_text_fields_in_post(post_bytes, boundary, debug, None)
if fields.get('hashtagCategory'):
category_str = fields['hashtagCategory'].lower()
if not is_blocked_hashtag(base_dir, category_str) and \
not is_filtered(base_dir, nickname, domain, category_str,
system_language):
set_hashtag_category(base_dir, hashtag,
category_str, False)
else:
category_filename = base_dir + '/tags/' + hashtag + '.category'
if os.path.isfile(category_filename):
try:
os.remove(category_filename)
except OSError:
print('EX: _set_hashtag_category unable to delete ' +
category_filename)
# redirect back to the default timeline
redirect_headers(self, tag_screen_str,
cookie, calling_domain)
self.server.postreq_busy = False
def _post_login_screen(self, calling_domain: str, cookie: str,
base_dir: str, http_prefix: str,
domain: str, port: int,
ua_str: str, debug: bool,
registrations_open: bool) -> None:
"""POST to login screen, containing credentials
"""
# ensure that there is a minimum delay between failed login
# attempts, to mitigate brute force
if int(time.time()) - self.server.last_login_failure < 5:
http_503(self)
self.server.postreq_busy = False
return
# get the contents of POST containing login credentials
length = int(self.headers['Content-length'])
if length > 512:
print('Login failed - credentials too long')
http_401(self, 'Credentials are too long')
self.server.postreq_busy = False
return
try:
login_params = self.rfile.read(length).decode('utf-8')
except SocketError as ex:
if ex.errno == errno.ECONNRESET:
print('EX: POST login read ' +
'connection reset by peer')
else:
print('EX: POST login read socket error')
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
except ValueError as ex:
print('EX: POST login read failed, ' + str(ex))
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
login_nickname, login_password, register = \
html_get_login_credentials(login_params,
self.server.last_login_time,
registrations_open)
if login_nickname and login_password:
if is_system_account(login_nickname):
print('Invalid username login: ' + login_nickname +
' (system account)')
clear_login_details(self, login_nickname, calling_domain)
self.server.postreq_busy = False
return
self.server.last_login_time = int(time.time())
if register:
if not valid_password(login_password):
self.server.postreq_busy = False
login_url = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
'/login'
redirect_headers(self, login_url, cookie, calling_domain)
return
if not register_account(base_dir, http_prefix, domain, port,
login_nickname, login_password,
self.server.manual_follower_approval):
self.server.postreq_busy = False
login_url = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
'/login'
redirect_headers(self, login_url, cookie, calling_domain)
return
auth_header = \
create_basic_auth_header(login_nickname, login_password)
if self.headers.get('X-Forward-For'):
ip_address = self.headers['X-Forward-For']
elif self.headers.get('X-Forwarded-For'):
ip_address = self.headers['X-Forwarded-For']
else:
ip_address = self.client_address[0]
if not domain.endswith('.onion'):
if not is_local_network_address(ip_address):
print('Login attempt from IP: ' + str(ip_address))
if not authorize_basic(base_dir, '/users/' +
login_nickname + '/outbox',
auth_header, False):
print('Login failed: ' + login_nickname)
clear_login_details(self, login_nickname, calling_domain)
fail_time = int(time.time())
self.server.last_login_failure = fail_time
if not domain.endswith('.onion'):
if not is_local_network_address(ip_address):
record_login_failure(base_dir, ip_address,
self.server.login_failure_count,
fail_time,
self.server.log_login_failures)
self.server.postreq_busy = False
return
else:
if self.server.login_failure_count.get(ip_address):
del self.server.login_failure_count[ip_address]
if is_suspended(base_dir, login_nickname):
msg = \
html_suspended(base_dir).encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
# login success - redirect with authorization
print('====== Login success: ' + login_nickname +
' ' + ua_str)
# re-activate account if needed
activate_account(base_dir, login_nickname, domain)
# This produces a deterministic token based
# on nick+password+salt
salt_filename = \
acct_dir(base_dir, login_nickname, domain) + '/.salt'
salt = create_password(32)
if os.path.isfile(salt_filename):
try:
with open(salt_filename, 'r',
encoding='utf-8') as fp_salt:
salt = fp_salt.read()
except OSError as ex:
print('EX: Unable to read salt for ' +
login_nickname + ' ' + str(ex))
else:
try:
with open(salt_filename, 'w+',
encoding='utf-8') as fp_salt:
fp_salt.write(salt)
except OSError as ex:
print('EX: Unable to save salt for ' +
login_nickname + ' ' + str(ex))
token_text = login_nickname + login_password + salt
token = sha256(token_text.encode('utf-8')).hexdigest()
self.server.tokens[login_nickname] = token
login_handle = login_nickname + '@' + domain
token_filename = \
base_dir + '/accounts/' + \
login_handle + '/.token'
try:
with open(token_filename, 'w+',
encoding='utf-8') as fp_tok:
fp_tok.write(token)
except OSError as ex:
print('EX: Unable to save token for ' +
login_nickname + ' ' + str(ex))
person_upgrade_actor(base_dir, None,
base_dir + '/accounts/' +
login_handle + '.json')
index = self.server.tokens[login_nickname]
self.server.tokens_lookup[index] = login_nickname
cookie_str = 'SET:epicyon=' + \
self.server.tokens[login_nickname] + '; SameSite=Strict'
tl_url = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
'/users/' + login_nickname + '/' + \
self.server.default_timeline
redirect_headers(self, tl_url, cookie_str, calling_domain)
self.server.postreq_busy = False
return
else:
print('WARN: No login credentials presented to /login')
if debug:
# be careful to avoid logging the password
login_str = login_params
if '=' in login_params:
login_params_list = login_params.split('=')
login_str = ''
skip_param = False
for login_prm in login_params_list:
if not skip_param:
login_str += login_prm + '='
else:
len_str = login_prm.split('&')[0]
if len(len_str) > 0:
login_str += login_prm + '*'
len_str = ''
if '&' in login_prm:
login_str += \
'&' + login_prm.split('&')[1] + '='
skip_param = False
if 'password' in login_prm:
skip_param = True
login_str = login_str[:len(login_str) - 1]
print(login_str)
http_401(self, 'No login credentials were posted')
self.server.postreq_busy = False
http_200(self)
self.server.postreq_busy = False
def daemon_http_post(self) -> None:
"""HTTP POST handler
"""
@ -9313,3 +8988,328 @@ def _receive_new_post_process(self, post_type: str, path: str, headers: {},
self.post_to_nickname = nickname
return 1
return -1
def _set_hashtag_category2(self, calling_domain: str, cookie: str,
path: str, base_dir: str,
domain: str, debug: bool,
system_language: str) -> None:
"""On the screen after selecting a hashtag from the swarm, this sets
the category for that tag
"""
users_path = path.replace('/sethashtagcategory', '')
hashtag = ''
if '/tags/' not in users_path:
# no hashtag is specified within the path
http_404(self, 14)
return
hashtag = users_path.split('/tags/')[1].strip()
hashtag = urllib.parse.unquote_plus(hashtag)
if not hashtag:
# no hashtag was given in the path
http_404(self, 15)
return
hashtag_filename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(hashtag_filename):
# the hashtag does not exist
http_404(self, 16)
return
users_path = users_path.split('/tags/')[0]
actor_str = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
users_path
tag_screen_str = actor_str + '/tags/' + hashtag
boundary = None
if ' boundary=' in self.headers['Content-type']:
boundary = self.headers['Content-type'].split('boundary=')[1]
if ';' in boundary:
boundary = boundary.split(';')[0]
# get the nickname
nickname = get_nickname_from_actor(actor_str)
editor = None
if nickname:
editor = is_editor(base_dir, nickname)
if not hashtag or not editor:
if not nickname:
print('WARN: nickname not found in ' + actor_str)
else:
print('WARN: nickname is not a moderator' + actor_str)
redirect_headers(self, tag_screen_str, cookie, calling_domain)
self.server.postreq_busy = False
return
if self.headers.get('Content-length'):
length = int(self.headers['Content-length'])
# check that the POST isn't too large
if length > self.server.max_post_length:
print('Maximum links data length exceeded ' + str(length))
redirect_headers(self, tag_screen_str, cookie, calling_domain)
self.server.postreq_busy = False
return
try:
# read the bytes of the http form POST
post_bytes = self.rfile.read(length)
except SocketError as ex:
if ex.errno == errno.ECONNRESET:
print('EX: connection was reset while ' +
'reading bytes from http form POST')
else:
print('EX: error while reading bytes ' +
'from http form POST')
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
except ValueError as ex:
print('EX: failed to read bytes for POST, ' + str(ex))
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
if not boundary:
if b'--LYNX' in post_bytes:
boundary = '--LYNX'
if boundary:
# extract all of the text fields into a dict
fields = \
extract_text_fields_in_post(post_bytes, boundary, debug, None)
if fields.get('hashtagCategory'):
category_str = fields['hashtagCategory'].lower()
if not is_blocked_hashtag(base_dir, category_str) and \
not is_filtered(base_dir, nickname, domain, category_str,
system_language):
set_hashtag_category(base_dir, hashtag,
category_str, False)
else:
category_filename = base_dir + '/tags/' + hashtag + '.category'
if os.path.isfile(category_filename):
try:
os.remove(category_filename)
except OSError:
print('EX: _set_hashtag_category unable to delete ' +
category_filename)
# redirect back to the default timeline
redirect_headers(self, tag_screen_str,
cookie, calling_domain)
self.server.postreq_busy = False
def _post_login_screen(self, calling_domain: str, cookie: str,
base_dir: str, http_prefix: str,
domain: str, port: int,
ua_str: str, debug: bool,
registrations_open: bool) -> None:
"""POST to login screen, containing credentials
"""
# ensure that there is a minimum delay between failed login
# attempts, to mitigate brute force
if int(time.time()) - self.server.last_login_failure < 5:
http_503(self)
self.server.postreq_busy = False
return
# get the contents of POST containing login credentials
length = int(self.headers['Content-length'])
if length > 512:
print('Login failed - credentials too long')
http_401(self, 'Credentials are too long')
self.server.postreq_busy = False
return
try:
login_params = self.rfile.read(length).decode('utf-8')
except SocketError as ex:
if ex.errno == errno.ECONNRESET:
print('EX: POST login read ' +
'connection reset by peer')
else:
print('EX: POST login read socket error')
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
except ValueError as ex:
print('EX: POST login read failed, ' + str(ex))
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
login_nickname, login_password, register = \
html_get_login_credentials(login_params,
self.server.last_login_time,
registrations_open)
if login_nickname and login_password:
if is_system_account(login_nickname):
print('Invalid username login: ' + login_nickname +
' (system account)')
clear_login_details(self, login_nickname, calling_domain)
self.server.postreq_busy = False
return
self.server.last_login_time = int(time.time())
if register:
if not valid_password(login_password):
self.server.postreq_busy = False
login_url = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
'/login'
redirect_headers(self, login_url, cookie, calling_domain)
return
if not register_account(base_dir, http_prefix, domain, port,
login_nickname, login_password,
self.server.manual_follower_approval):
self.server.postreq_busy = False
login_url = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
'/login'
redirect_headers(self, login_url, cookie, calling_domain)
return
auth_header = \
create_basic_auth_header(login_nickname, login_password)
if self.headers.get('X-Forward-For'):
ip_address = self.headers['X-Forward-For']
elif self.headers.get('X-Forwarded-For'):
ip_address = self.headers['X-Forwarded-For']
else:
ip_address = self.client_address[0]
if not domain.endswith('.onion'):
if not is_local_network_address(ip_address):
print('Login attempt from IP: ' + str(ip_address))
if not authorize_basic(base_dir, '/users/' +
login_nickname + '/outbox',
auth_header, False):
print('Login failed: ' + login_nickname)
clear_login_details(self, login_nickname, calling_domain)
fail_time = int(time.time())
self.server.last_login_failure = fail_time
if not domain.endswith('.onion'):
if not is_local_network_address(ip_address):
record_login_failure(base_dir, ip_address,
self.server.login_failure_count,
fail_time,
self.server.log_login_failures)
self.server.postreq_busy = False
return
else:
if self.server.login_failure_count.get(ip_address):
del self.server.login_failure_count[ip_address]
if is_suspended(base_dir, login_nickname):
msg = \
html_suspended(base_dir).encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
# login success - redirect with authorization
print('====== Login success: ' + login_nickname +
' ' + ua_str)
# re-activate account if needed
activate_account(base_dir, login_nickname, domain)
# This produces a deterministic token based
# on nick+password+salt
salt_filename = \
acct_dir(base_dir, login_nickname, domain) + '/.salt'
salt = create_password(32)
if os.path.isfile(salt_filename):
try:
with open(salt_filename, 'r',
encoding='utf-8') as fp_salt:
salt = fp_salt.read()
except OSError as ex:
print('EX: Unable to read salt for ' +
login_nickname + ' ' + str(ex))
else:
try:
with open(salt_filename, 'w+',
encoding='utf-8') as fp_salt:
fp_salt.write(salt)
except OSError as ex:
print('EX: Unable to save salt for ' +
login_nickname + ' ' + str(ex))
token_text = login_nickname + login_password + salt
token = sha256(token_text.encode('utf-8')).hexdigest()
self.server.tokens[login_nickname] = token
login_handle = login_nickname + '@' + domain
token_filename = \
base_dir + '/accounts/' + \
login_handle + '/.token'
try:
with open(token_filename, 'w+',
encoding='utf-8') as fp_tok:
fp_tok.write(token)
except OSError as ex:
print('EX: Unable to save token for ' +
login_nickname + ' ' + str(ex))
person_upgrade_actor(base_dir, None,
base_dir + '/accounts/' +
login_handle + '.json')
index = self.server.tokens[login_nickname]
self.server.tokens_lookup[index] = login_nickname
cookie_str = 'SET:epicyon=' + \
self.server.tokens[login_nickname] + '; SameSite=Strict'
tl_url = \
get_instance_url(calling_domain,
self.server.http_prefix,
self.server.domain_full,
self.server.onion_domain,
self.server.i2p_domain) + \
'/users/' + login_nickname + '/' + \
self.server.default_timeline
redirect_headers(self, tl_url, cookie_str, calling_domain)
self.server.postreq_busy = False
return
else:
print('WARN: No login credentials presented to /login')
if debug:
# be careful to avoid logging the password
login_str = login_params
if '=' in login_params:
login_params_list = login_params.split('=')
login_str = ''
skip_param = False
for login_prm in login_params_list:
if not skip_param:
login_str += login_prm + '='
else:
len_str = login_prm.split('&')[0]
if len(len_str) > 0:
login_str += login_prm + '*'
len_str = ''
if '&' in login_prm:
login_str += \
'&' + login_prm.split('&')[1] + '='
skip_param = False
if 'password' in login_prm:
skip_param = True
login_str = login_str[:len(login_str) - 1]
print(login_str)
http_401(self, 'No login credentials were posted')
self.server.postreq_busy = False
http_200(self)
self.server.postreq_busy = False