diff --git a/daemon_get.py b/daemon_get.py index 8736585ae..6c6af7d8f 100644 --- a/daemon_get.py +++ b/daemon_get.py @@ -13,8 +13,6 @@ import json import datetime import urllib.parse from shutil import copyfile -from mastoapiv1 import masto_api_v1_response -from mastoapiv2 import masto_api_v2_response from relationships import get_inactive_feed from relationships import get_moved_feed from skills import get_skills_from_list @@ -224,6 +222,7 @@ from posts import json_pin_post from posts import is_moderator from posts import get_pinned_post_as_json from posts import outbox_message_create_wrap +from daemon_get_masto_api import masto_api # Blogs can be longer, so don't show many per page MAX_POSTS_IN_BLOGS_FEED = 4 @@ -1201,25 +1200,25 @@ def daemon_http_get(self) -> None: return # minimal mastodon api - if _masto_api(self, self.path, calling_domain, ua_str, - authorized, - self.server.http_prefix, - self.server.base_dir, - self.authorized_nickname, - self.server.domain, - self.server.domain_full, - self.server.onion_domain, - self.server.i2p_domain, - self.server.translate, - self.server.registration, - self.server.system_language, - self.server.project_version, - self.server.custom_emoji, - self.server.show_node_info_accounts, - referer_domain, - self.server.debug, - self.server.known_crawlers, - self.server.sites_unavailable): + if masto_api(self, self.path, calling_domain, ua_str, + authorized, + self.server.http_prefix, + self.server.base_dir, + self.authorized_nickname, + self.server.domain, + self.server.domain_full, + self.server.onion_domain, + self.server.i2p_domain, + self.server.translate, + self.server.registration, + self.server.system_language, + self.server.project_version, + self.server.custom_emoji, + self.server.show_node_info_accounts, + referer_domain, + self.server.debug, + self.server.known_crawlers, + self.server.sites_unavailable): return fitness_performance(getreq_start_time, self.server.fitness, @@ -4918,40 +4917,6 @@ def _show_conversation_thread(self, authorized: bool, return True -def _masto_api(self, path: str, calling_domain: str, - ua_str: str, - authorized: bool, http_prefix: str, - base_dir: str, nickname: str, domain: str, - domain_full: str, - onion_domain: str, i2p_domain: str, - translate: {}, - registration: bool, - system_language: str, - project_version: str, - custom_emoji: [], - show_node_info_accounts: bool, - referer_domain: str, debug: bool, - known_crawlers: {}, - sites_unavailable: []) -> bool: - if _masto_api_v2(self, path, calling_domain, ua_str, authorized, - http_prefix, base_dir, nickname, domain, - domain_full, onion_domain, i2p_domain, - translate, registration, system_language, - project_version, - show_node_info_accounts, - referer_domain, debug, 5, - known_crawlers, sites_unavailable): - return True - return _masto_api_v1(self, path, calling_domain, ua_str, authorized, - http_prefix, base_dir, nickname, domain, - domain_full, onion_domain, i2p_domain, - translate, registration, system_language, - project_version, custom_emoji, - show_node_info_accounts, - referer_domain, debug, 5, - known_crawlers, sites_unavailable) - - def _show_cached_favicon(self, referer_domain: str, path: str, base_dir: str, getreq_start_time) -> None: """Shows a favicon image obtained from the cache @@ -13450,259 +13415,6 @@ def _show_person_profile(self, authorized: bool, return True -def _masto_api_v2(self, path: str, calling_domain: str, - ua_str: str, - authorized: bool, - http_prefix: str, - base_dir: str, nickname: str, domain: str, - domain_full: str, - onion_domain: str, i2p_domain: str, - translate: {}, - registration: bool, - system_language: str, - project_version: str, - show_node_info_accounts: bool, - referer_domain: str, - debug: bool, - calling_site_timeout: int, - known_crawlers: {}, - sites_unavailable: []) -> bool: - """This is a vestigil mastodon v2 API for the purpose - of returning an empty result to sites like - https://mastopeek.app-dist.eu - """ - if not path.startswith('/api/v2/'): - return False - - if not referer_domain: - if not (debug and self.server.unit_test): - print('mastodon api v2 request has no referer domain ' + - str(ua_str)) - http_400(self) - return True - if referer_domain == domain_full: - print('mastodon api v2 request from self') - http_400(self) - return True - if self.server.masto_api_is_active: - print('mastodon api v2 is busy during request from ' + - referer_domain) - http_503(self) - return True - self.server.masto_api_is_active = True - # is this a real website making the call ? - if not debug and not self.server.unit_test and referer_domain: - # Does calling_domain look like a domain? - if ' ' in referer_domain or \ - ';' in referer_domain or \ - '.' not in referer_domain: - print('mastodon api v2 ' + - 'referer does not look like a domain ' + - referer_domain) - http_400(self) - self.server.masto_api_is_active = False - return True - if not self.server.allow_local_network_access: - if local_network_host(referer_domain): - print('mastodon api v2 referer domain is from the ' + - 'local network ' + referer_domain) - http_400(self) - self.server.masto_api_is_active = False - return True - if not referer_is_active(http_prefix, - referer_domain, ua_str, - calling_site_timeout, - sites_unavailable): - print('mastodon api v2 referer url is not active ' + - referer_domain) - http_400(self) - self.server.masto_api_is_active = False - return True - - print('mastodon api v2: ' + path) - print('mastodon api v2: authorized ' + str(authorized)) - print('mastodon api v2: nickname ' + str(nickname)) - print('mastodon api v2: referer ' + str(referer_domain)) - crawl_time = \ - update_known_crawlers(ua_str, base_dir, - known_crawlers, - self.server.last_known_crawler) - if crawl_time is not None: - self.server.last_known_crawler = crawl_time - - broch_mode = broch_mode_is_active(base_dir) - send_json, send_json_str = \ - masto_api_v2_response(path, - calling_domain, - ua_str, - http_prefix, - base_dir, - domain, - domain_full, - onion_domain, - i2p_domain, - translate, - registration, - system_language, - project_version, - show_node_info_accounts, - broch_mode) - - if send_json is not None: - msg_str = json.dumps(send_json) - msg_str = convert_domains(calling_domain, referer_domain, - msg_str, http_prefix, domain, - onion_domain, i2p_domain) - msg = msg_str.encode('utf-8') - msglen = len(msg) - if has_accept(self, calling_domain): - protocol_str = \ - get_json_content_from_accept(self.headers.get('Accept')) - set_headers(self, protocol_str, msglen, - None, calling_domain, True) - else: - set_headers(self, 'application/ld+json', msglen, - None, calling_domain, True) - write2(self, msg) - if send_json_str: - print(send_json_str) - self.server.masto_api_is_active = False - return True - - # no api v2 endpoints were matched - http_404(self, 2) - self.server.masto_api_is_active = False - return True - - -def _masto_api_v1(self, path: str, calling_domain: str, - ua_str: str, - authorized: bool, - http_prefix: str, - base_dir: str, nickname: str, domain: str, - domain_full: str, - onion_domain: str, i2p_domain: str, - translate: {}, - registration: bool, - system_language: str, - project_version: str, - custom_emoji: [], - show_node_info_accounts: bool, - referer_domain: str, - debug: bool, - calling_site_timeout: int, - known_crawlers: {}, - sites_unavailable: []) -> bool: - """This is a vestigil mastodon API for the purpose - of returning an empty result to sites like - https://mastopeek.app-dist.eu - """ - if not path.startswith('/api/v1/'): - return False - - if not referer_domain: - if not (debug and self.server.unit_test): - print('mastodon api request has no referer domain ' + - str(ua_str)) - http_400(self) - return True - if referer_domain == domain_full: - print('mastodon api request from self') - http_400(self) - return True - if self.server.masto_api_is_active: - print('mastodon api is busy during request from ' + - referer_domain) - http_503(self) - return True - self.server.masto_api_is_active = True - # is this a real website making the call ? - if not debug and not self.server.unit_test and referer_domain: - # Does calling_domain look like a domain? - if ' ' in referer_domain or \ - ';' in referer_domain or \ - '.' not in referer_domain: - print('mastodon api ' + - 'referer does not look like a domain ' + - referer_domain) - http_400(self) - self.server.masto_api_is_active = False - return True - if not self.server.allow_local_network_access: - if local_network_host(referer_domain): - print('mastodon api referer domain is from the ' + - 'local network ' + referer_domain) - http_400(self) - self.server.masto_api_is_active = False - return True - if not referer_is_active(http_prefix, - referer_domain, ua_str, - calling_site_timeout, - sites_unavailable): - print('mastodon api referer url is not active ' + - referer_domain) - http_400(self) - self.server.masto_api_is_active = False - return True - - print('mastodon api v1: ' + path) - print('mastodon api v1: authorized ' + str(authorized)) - print('mastodon api v1: nickname ' + str(nickname)) - print('mastodon api v1: referer ' + str(referer_domain)) - crawl_time = \ - update_known_crawlers(ua_str, base_dir, - known_crawlers, - self.server.last_known_crawler) - if crawl_time is not None: - self.server.last_known_crawler = crawl_time - - broch_mode = broch_mode_is_active(base_dir) - send_json, send_json_str = \ - masto_api_v1_response(path, - calling_domain, - ua_str, - authorized, - http_prefix, - base_dir, - nickname, domain, - domain_full, - onion_domain, - i2p_domain, - translate, - registration, - system_language, - project_version, - custom_emoji, - show_node_info_accounts, - broch_mode) - - if send_json is not None: - msg_str = json.dumps(send_json) - msg_str = convert_domains(calling_domain, referer_domain, - msg_str, http_prefix, domain, - onion_domain, i2p_domain) - msg = msg_str.encode('utf-8') - msglen = len(msg) - if has_accept(self, calling_domain): - protocol_str = \ - get_json_content_from_accept(self.headers.get('Accept')) - set_headers(self, protocol_str, msglen, - None, calling_domain, True) - else: - set_headers(self, 'application/ld+json', msglen, - None, calling_domain, True) - write2(self, msg) - if send_json_str: - print(send_json_str) - self.server.masto_api_is_active = False - return True - - # no api endpoints were matched - http_404(self, 1) - self.server.masto_api_is_active = False - return True - - def _show_post_from_file(self, post_filename: str, liked_by: str, react_by: str, react_emoji: str, authorized: bool, diff --git a/daemon_get_masto_api.py b/daemon_get_masto_api.py new file mode 100644 index 000000000..fb31cc94a --- /dev/null +++ b/daemon_get_masto_api.py @@ -0,0 +1,311 @@ +__filename__ = "daemon_get_masto_api.py" +__author__ = "Bob Mottram" +__license__ = "AGPL3+" +__version__ = "1.5.0" +__maintainer__ = "Bob Mottram" +__email__ = "bob@libreserver.org" +__status__ = "Production" +__module_group__ = "Core" + +import json +from httpheaders import set_headers +from httpcodes import write2 +from mastoapiv1 import masto_api_v1_response +from mastoapiv2 import masto_api_v2_response +from siteactive import referer_is_active +from httpcodes import http_400 +from httpcodes import http_404 +from httpcodes import http_503 +from utils import get_json_content_from_accept +from utils import convert_domains +from utils import local_network_host +from crawlers import update_known_crawlers +from blocking import broch_mode_is_active +from daemon_utils import has_accept + + +def masto_api(self, path: str, calling_domain: str, + ua_str: str, + authorized: bool, http_prefix: str, + base_dir: str, nickname: str, domain: str, + domain_full: str, + onion_domain: str, i2p_domain: str, + translate: {}, + registration: bool, + system_language: str, + project_version: str, + custom_emoji: [], + show_node_info_accounts: bool, + referer_domain: str, debug: bool, + known_crawlers: {}, + sites_unavailable: []) -> bool: + if _masto_api_v2(self, path, calling_domain, ua_str, authorized, + http_prefix, base_dir, nickname, domain, + domain_full, onion_domain, i2p_domain, + translate, registration, system_language, + project_version, + show_node_info_accounts, + referer_domain, debug, 5, + known_crawlers, sites_unavailable): + return True + return _masto_api_v1(self, path, calling_domain, ua_str, authorized, + http_prefix, base_dir, nickname, domain, + domain_full, onion_domain, i2p_domain, + translate, registration, system_language, + project_version, custom_emoji, + show_node_info_accounts, + referer_domain, debug, 5, + known_crawlers, sites_unavailable) + + +def _masto_api_v1(self, path: str, calling_domain: str, + ua_str: str, + authorized: bool, + http_prefix: str, + base_dir: str, nickname: str, domain: str, + domain_full: str, + onion_domain: str, i2p_domain: str, + translate: {}, + registration: bool, + system_language: str, + project_version: str, + custom_emoji: [], + show_node_info_accounts: bool, + referer_domain: str, + debug: bool, + calling_site_timeout: int, + known_crawlers: {}, + sites_unavailable: []) -> bool: + """This is a vestigil mastodon API for the purpose + of returning an empty result to sites like + https://mastopeek.app-dist.eu + """ + if not path.startswith('/api/v1/'): + return False + + if not referer_domain: + if not (debug and self.server.unit_test): + print('mastodon api request has no referer domain ' + + str(ua_str)) + http_400(self) + return True + if referer_domain == domain_full: + print('mastodon api request from self') + http_400(self) + return True + if self.server.masto_api_is_active: + print('mastodon api is busy during request from ' + + referer_domain) + http_503(self) + return True + self.server.masto_api_is_active = True + # is this a real website making the call ? + if not debug and not self.server.unit_test and referer_domain: + # Does calling_domain look like a domain? + if ' ' in referer_domain or \ + ';' in referer_domain or \ + '.' not in referer_domain: + print('mastodon api ' + + 'referer does not look like a domain ' + + referer_domain) + http_400(self) + self.server.masto_api_is_active = False + return True + if not self.server.allow_local_network_access: + if local_network_host(referer_domain): + print('mastodon api referer domain is from the ' + + 'local network ' + referer_domain) + http_400(self) + self.server.masto_api_is_active = False + return True + if not referer_is_active(http_prefix, + referer_domain, ua_str, + calling_site_timeout, + sites_unavailable): + print('mastodon api referer url is not active ' + + referer_domain) + http_400(self) + self.server.masto_api_is_active = False + return True + + print('mastodon api v1: ' + path) + print('mastodon api v1: authorized ' + str(authorized)) + print('mastodon api v1: nickname ' + str(nickname)) + print('mastodon api v1: referer ' + str(referer_domain)) + crawl_time = \ + update_known_crawlers(ua_str, base_dir, + known_crawlers, + self.server.last_known_crawler) + if crawl_time is not None: + self.server.last_known_crawler = crawl_time + + broch_mode = broch_mode_is_active(base_dir) + send_json, send_json_str = \ + masto_api_v1_response(path, + calling_domain, + ua_str, + authorized, + http_prefix, + base_dir, + nickname, domain, + domain_full, + onion_domain, + i2p_domain, + translate, + registration, + system_language, + project_version, + custom_emoji, + show_node_info_accounts, + broch_mode) + + if send_json is not None: + msg_str = json.dumps(send_json) + msg_str = convert_domains(calling_domain, referer_domain, + msg_str, http_prefix, domain, + onion_domain, i2p_domain) + msg = msg_str.encode('utf-8') + msglen = len(msg) + if has_accept(self, calling_domain): + protocol_str = \ + get_json_content_from_accept(self.headers.get('Accept')) + set_headers(self, protocol_str, msglen, + None, calling_domain, True) + else: + set_headers(self, 'application/ld+json', msglen, + None, calling_domain, True) + write2(self, msg) + if send_json_str: + print(send_json_str) + self.server.masto_api_is_active = False + return True + + # no api endpoints were matched + http_404(self, 1) + self.server.masto_api_is_active = False + return True + + +def _masto_api_v2(self, path: str, calling_domain: str, + ua_str: str, + authorized: bool, + http_prefix: str, + base_dir: str, nickname: str, domain: str, + domain_full: str, + onion_domain: str, i2p_domain: str, + translate: {}, + registration: bool, + system_language: str, + project_version: str, + show_node_info_accounts: bool, + referer_domain: str, + debug: bool, + calling_site_timeout: int, + known_crawlers: {}, + sites_unavailable: []) -> bool: + """This is a vestigil mastodon v2 API for the purpose + of returning an empty result to sites like + https://mastopeek.app-dist.eu + """ + if not path.startswith('/api/v2/'): + return False + + if not referer_domain: + if not (debug and self.server.unit_test): + print('mastodon api v2 request has no referer domain ' + + str(ua_str)) + http_400(self) + return True + if referer_domain == domain_full: + print('mastodon api v2 request from self') + http_400(self) + return True + if self.server.masto_api_is_active: + print('mastodon api v2 is busy during request from ' + + referer_domain) + http_503(self) + return True + self.server.masto_api_is_active = True + # is this a real website making the call ? + if not debug and not self.server.unit_test and referer_domain: + # Does calling_domain look like a domain? + if ' ' in referer_domain or \ + ';' in referer_domain or \ + '.' not in referer_domain: + print('mastodon api v2 ' + + 'referer does not look like a domain ' + + referer_domain) + http_400(self) + self.server.masto_api_is_active = False + return True + if not self.server.allow_local_network_access: + if local_network_host(referer_domain): + print('mastodon api v2 referer domain is from the ' + + 'local network ' + referer_domain) + http_400(self) + self.server.masto_api_is_active = False + return True + if not referer_is_active(http_prefix, + referer_domain, ua_str, + calling_site_timeout, + sites_unavailable): + print('mastodon api v2 referer url is not active ' + + referer_domain) + http_400(self) + self.server.masto_api_is_active = False + return True + + print('mastodon api v2: ' + path) + print('mastodon api v2: authorized ' + str(authorized)) + print('mastodon api v2: nickname ' + str(nickname)) + print('mastodon api v2: referer ' + str(referer_domain)) + crawl_time = \ + update_known_crawlers(ua_str, base_dir, + known_crawlers, + self.server.last_known_crawler) + if crawl_time is not None: + self.server.last_known_crawler = crawl_time + + broch_mode = broch_mode_is_active(base_dir) + send_json, send_json_str = \ + masto_api_v2_response(path, + calling_domain, + ua_str, + http_prefix, + base_dir, + domain, + domain_full, + onion_domain, + i2p_domain, + translate, + registration, + system_language, + project_version, + show_node_info_accounts, + broch_mode) + + if send_json is not None: + msg_str = json.dumps(send_json) + msg_str = convert_domains(calling_domain, referer_domain, + msg_str, http_prefix, domain, + onion_domain, i2p_domain) + msg = msg_str.encode('utf-8') + msglen = len(msg) + if has_accept(self, calling_domain): + protocol_str = \ + get_json_content_from_accept(self.headers.get('Accept')) + set_headers(self, protocol_str, msglen, + None, calling_domain, True) + else: + set_headers(self, 'application/ld+json', msglen, + None, calling_domain, True) + write2(self, msg) + if send_json_str: + print(send_json_str) + self.server.masto_api_is_active = False + return True + + # no api v2 endpoints were matched + http_404(self, 2) + self.server.masto_api_is_active = False + return True diff --git a/daemon_post.py b/daemon_post.py index d83b03bd7..8d8478c2a 100644 --- a/daemon_post.py +++ b/daemon_post.py @@ -269,331 +269,6 @@ MAX_POSTS_IN_HASHTAG_FEED = 6 MAX_POSTS_IN_FEED = 12 -def _set_hashtag_category2(self, calling_domain: str, cookie: str, - path: str, base_dir: str, - domain: str, debug: bool, - system_language: str) -> None: - """On the screen after selecting a hashtag from the swarm, this sets - the category for that tag - """ - users_path = path.replace('/sethashtagcategory', '') - hashtag = '' - if '/tags/' not in users_path: - # no hashtag is specified within the path - http_404(self, 14) - return - hashtag = users_path.split('/tags/')[1].strip() - hashtag = urllib.parse.unquote_plus(hashtag) - if not hashtag: - # no hashtag was given in the path - http_404(self, 15) - return - hashtag_filename = base_dir + '/tags/' + hashtag + '.txt' - if not os.path.isfile(hashtag_filename): - # the hashtag does not exist - http_404(self, 16) - return - users_path = users_path.split('/tags/')[0] - actor_str = \ - get_instance_url(calling_domain, - self.server.http_prefix, - self.server.domain_full, - self.server.onion_domain, - self.server.i2p_domain) + \ - users_path - tag_screen_str = actor_str + '/tags/' + hashtag - - boundary = None - if ' boundary=' in self.headers['Content-type']: - boundary = self.headers['Content-type'].split('boundary=')[1] - if ';' in boundary: - boundary = boundary.split(';')[0] - - # get the nickname - nickname = get_nickname_from_actor(actor_str) - editor = None - if nickname: - editor = is_editor(base_dir, nickname) - if not hashtag or not editor: - if not nickname: - print('WARN: nickname not found in ' + actor_str) - else: - print('WARN: nickname is not a moderator' + actor_str) - redirect_headers(self, tag_screen_str, cookie, calling_domain) - self.server.postreq_busy = False - return - - if self.headers.get('Content-length'): - length = int(self.headers['Content-length']) - - # check that the POST isn't too large - if length > self.server.max_post_length: - print('Maximum links data length exceeded ' + str(length)) - redirect_headers(self, tag_screen_str, cookie, calling_domain) - self.server.postreq_busy = False - return - - try: - # read the bytes of the http form POST - post_bytes = self.rfile.read(length) - except SocketError as ex: - if ex.errno == errno.ECONNRESET: - print('EX: connection was reset while ' + - 'reading bytes from http form POST') - else: - print('EX: error while reading bytes ' + - 'from http form POST') - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - except ValueError as ex: - print('EX: failed to read bytes for POST, ' + str(ex)) - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - - if not boundary: - if b'--LYNX' in post_bytes: - boundary = '--LYNX' - - if boundary: - # extract all of the text fields into a dict - fields = \ - extract_text_fields_in_post(post_bytes, boundary, debug, None) - - if fields.get('hashtagCategory'): - category_str = fields['hashtagCategory'].lower() - if not is_blocked_hashtag(base_dir, category_str) and \ - not is_filtered(base_dir, nickname, domain, category_str, - system_language): - set_hashtag_category(base_dir, hashtag, - category_str, False) - else: - category_filename = base_dir + '/tags/' + hashtag + '.category' - if os.path.isfile(category_filename): - try: - os.remove(category_filename) - except OSError: - print('EX: _set_hashtag_category unable to delete ' + - category_filename) - - # redirect back to the default timeline - redirect_headers(self, tag_screen_str, - cookie, calling_domain) - self.server.postreq_busy = False - - -def _post_login_screen(self, calling_domain: str, cookie: str, - base_dir: str, http_prefix: str, - domain: str, port: int, - ua_str: str, debug: bool, - registrations_open: bool) -> None: - """POST to login screen, containing credentials - """ - # ensure that there is a minimum delay between failed login - # attempts, to mitigate brute force - if int(time.time()) - self.server.last_login_failure < 5: - http_503(self) - self.server.postreq_busy = False - return - - # get the contents of POST containing login credentials - length = int(self.headers['Content-length']) - if length > 512: - print('Login failed - credentials too long') - http_401(self, 'Credentials are too long') - self.server.postreq_busy = False - return - - try: - login_params = self.rfile.read(length).decode('utf-8') - except SocketError as ex: - if ex.errno == errno.ECONNRESET: - print('EX: POST login read ' + - 'connection reset by peer') - else: - print('EX: POST login read socket error') - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - except ValueError as ex: - print('EX: POST login read failed, ' + str(ex)) - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - - login_nickname, login_password, register = \ - html_get_login_credentials(login_params, - self.server.last_login_time, - registrations_open) - if login_nickname and login_password: - if is_system_account(login_nickname): - print('Invalid username login: ' + login_nickname + - ' (system account)') - clear_login_details(self, login_nickname, calling_domain) - self.server.postreq_busy = False - return - self.server.last_login_time = int(time.time()) - if register: - if not valid_password(login_password): - self.server.postreq_busy = False - login_url = \ - get_instance_url(calling_domain, - self.server.http_prefix, - self.server.domain_full, - self.server.onion_domain, - self.server.i2p_domain) + \ - '/login' - redirect_headers(self, login_url, cookie, calling_domain) - return - - if not register_account(base_dir, http_prefix, domain, port, - login_nickname, login_password, - self.server.manual_follower_approval): - self.server.postreq_busy = False - login_url = \ - get_instance_url(calling_domain, - self.server.http_prefix, - self.server.domain_full, - self.server.onion_domain, - self.server.i2p_domain) + \ - '/login' - redirect_headers(self, login_url, cookie, calling_domain) - return - auth_header = \ - create_basic_auth_header(login_nickname, login_password) - if self.headers.get('X-Forward-For'): - ip_address = self.headers['X-Forward-For'] - elif self.headers.get('X-Forwarded-For'): - ip_address = self.headers['X-Forwarded-For'] - else: - ip_address = self.client_address[0] - if not domain.endswith('.onion'): - if not is_local_network_address(ip_address): - print('Login attempt from IP: ' + str(ip_address)) - if not authorize_basic(base_dir, '/users/' + - login_nickname + '/outbox', - auth_header, False): - print('Login failed: ' + login_nickname) - clear_login_details(self, login_nickname, calling_domain) - fail_time = int(time.time()) - self.server.last_login_failure = fail_time - if not domain.endswith('.onion'): - if not is_local_network_address(ip_address): - record_login_failure(base_dir, ip_address, - self.server.login_failure_count, - fail_time, - self.server.log_login_failures) - self.server.postreq_busy = False - return - else: - if self.server.login_failure_count.get(ip_address): - del self.server.login_failure_count[ip_address] - if is_suspended(base_dir, login_nickname): - msg = \ - html_suspended(base_dir).encode('utf-8') - msglen = len(msg) - login_headers(self, 'text/html', - msglen, calling_domain) - write2(self, msg) - self.server.postreq_busy = False - return - # login success - redirect with authorization - print('====== Login success: ' + login_nickname + - ' ' + ua_str) - # re-activate account if needed - activate_account(base_dir, login_nickname, domain) - # This produces a deterministic token based - # on nick+password+salt - salt_filename = \ - acct_dir(base_dir, login_nickname, domain) + '/.salt' - salt = create_password(32) - if os.path.isfile(salt_filename): - try: - with open(salt_filename, 'r', - encoding='utf-8') as fp_salt: - salt = fp_salt.read() - except OSError as ex: - print('EX: Unable to read salt for ' + - login_nickname + ' ' + str(ex)) - else: - try: - with open(salt_filename, 'w+', - encoding='utf-8') as fp_salt: - fp_salt.write(salt) - except OSError as ex: - print('EX: Unable to save salt for ' + - login_nickname + ' ' + str(ex)) - - token_text = login_nickname + login_password + salt - token = sha256(token_text.encode('utf-8')).hexdigest() - self.server.tokens[login_nickname] = token - login_handle = login_nickname + '@' + domain - token_filename = \ - base_dir + '/accounts/' + \ - login_handle + '/.token' - try: - with open(token_filename, 'w+', - encoding='utf-8') as fp_tok: - fp_tok.write(token) - except OSError as ex: - print('EX: Unable to save token for ' + - login_nickname + ' ' + str(ex)) - - person_upgrade_actor(base_dir, None, - base_dir + '/accounts/' + - login_handle + '.json') - - index = self.server.tokens[login_nickname] - self.server.tokens_lookup[index] = login_nickname - cookie_str = 'SET:epicyon=' + \ - self.server.tokens[login_nickname] + '; SameSite=Strict' - tl_url = \ - get_instance_url(calling_domain, - self.server.http_prefix, - self.server.domain_full, - self.server.onion_domain, - self.server.i2p_domain) + \ - '/users/' + login_nickname + '/' + \ - self.server.default_timeline - redirect_headers(self, tl_url, cookie_str, calling_domain) - self.server.postreq_busy = False - return - else: - print('WARN: No login credentials presented to /login') - if debug: - # be careful to avoid logging the password - login_str = login_params - if '=' in login_params: - login_params_list = login_params.split('=') - login_str = '' - skip_param = False - for login_prm in login_params_list: - if not skip_param: - login_str += login_prm + '=' - else: - len_str = login_prm.split('&')[0] - if len(len_str) > 0: - login_str += login_prm + '*' - len_str = '' - if '&' in login_prm: - login_str += \ - '&' + login_prm.split('&')[1] + '=' - skip_param = False - if 'password' in login_prm: - skip_param = True - login_str = login_str[:len(login_str) - 1] - print(login_str) - http_401(self, 'No login credentials were posted') - self.server.postreq_busy = False - http_200(self) - self.server.postreq_busy = False - - def daemon_http_post(self) -> None: """HTTP POST handler """ @@ -9313,3 +8988,328 @@ def _receive_new_post_process(self, post_type: str, path: str, headers: {}, self.post_to_nickname = nickname return 1 return -1 + + +def _set_hashtag_category2(self, calling_domain: str, cookie: str, + path: str, base_dir: str, + domain: str, debug: bool, + system_language: str) -> None: + """On the screen after selecting a hashtag from the swarm, this sets + the category for that tag + """ + users_path = path.replace('/sethashtagcategory', '') + hashtag = '' + if '/tags/' not in users_path: + # no hashtag is specified within the path + http_404(self, 14) + return + hashtag = users_path.split('/tags/')[1].strip() + hashtag = urllib.parse.unquote_plus(hashtag) + if not hashtag: + # no hashtag was given in the path + http_404(self, 15) + return + hashtag_filename = base_dir + '/tags/' + hashtag + '.txt' + if not os.path.isfile(hashtag_filename): + # the hashtag does not exist + http_404(self, 16) + return + users_path = users_path.split('/tags/')[0] + actor_str = \ + get_instance_url(calling_domain, + self.server.http_prefix, + self.server.domain_full, + self.server.onion_domain, + self.server.i2p_domain) + \ + users_path + tag_screen_str = actor_str + '/tags/' + hashtag + + boundary = None + if ' boundary=' in self.headers['Content-type']: + boundary = self.headers['Content-type'].split('boundary=')[1] + if ';' in boundary: + boundary = boundary.split(';')[0] + + # get the nickname + nickname = get_nickname_from_actor(actor_str) + editor = None + if nickname: + editor = is_editor(base_dir, nickname) + if not hashtag or not editor: + if not nickname: + print('WARN: nickname not found in ' + actor_str) + else: + print('WARN: nickname is not a moderator' + actor_str) + redirect_headers(self, tag_screen_str, cookie, calling_domain) + self.server.postreq_busy = False + return + + if self.headers.get('Content-length'): + length = int(self.headers['Content-length']) + + # check that the POST isn't too large + if length > self.server.max_post_length: + print('Maximum links data length exceeded ' + str(length)) + redirect_headers(self, tag_screen_str, cookie, calling_domain) + self.server.postreq_busy = False + return + + try: + # read the bytes of the http form POST + post_bytes = self.rfile.read(length) + except SocketError as ex: + if ex.errno == errno.ECONNRESET: + print('EX: connection was reset while ' + + 'reading bytes from http form POST') + else: + print('EX: error while reading bytes ' + + 'from http form POST') + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + except ValueError as ex: + print('EX: failed to read bytes for POST, ' + str(ex)) + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + + if not boundary: + if b'--LYNX' in post_bytes: + boundary = '--LYNX' + + if boundary: + # extract all of the text fields into a dict + fields = \ + extract_text_fields_in_post(post_bytes, boundary, debug, None) + + if fields.get('hashtagCategory'): + category_str = fields['hashtagCategory'].lower() + if not is_blocked_hashtag(base_dir, category_str) and \ + not is_filtered(base_dir, nickname, domain, category_str, + system_language): + set_hashtag_category(base_dir, hashtag, + category_str, False) + else: + category_filename = base_dir + '/tags/' + hashtag + '.category' + if os.path.isfile(category_filename): + try: + os.remove(category_filename) + except OSError: + print('EX: _set_hashtag_category unable to delete ' + + category_filename) + + # redirect back to the default timeline + redirect_headers(self, tag_screen_str, + cookie, calling_domain) + self.server.postreq_busy = False + + +def _post_login_screen(self, calling_domain: str, cookie: str, + base_dir: str, http_prefix: str, + domain: str, port: int, + ua_str: str, debug: bool, + registrations_open: bool) -> None: + """POST to login screen, containing credentials + """ + # ensure that there is a minimum delay between failed login + # attempts, to mitigate brute force + if int(time.time()) - self.server.last_login_failure < 5: + http_503(self) + self.server.postreq_busy = False + return + + # get the contents of POST containing login credentials + length = int(self.headers['Content-length']) + if length > 512: + print('Login failed - credentials too long') + http_401(self, 'Credentials are too long') + self.server.postreq_busy = False + return + + try: + login_params = self.rfile.read(length).decode('utf-8') + except SocketError as ex: + if ex.errno == errno.ECONNRESET: + print('EX: POST login read ' + + 'connection reset by peer') + else: + print('EX: POST login read socket error') + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + except ValueError as ex: + print('EX: POST login read failed, ' + str(ex)) + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + + login_nickname, login_password, register = \ + html_get_login_credentials(login_params, + self.server.last_login_time, + registrations_open) + if login_nickname and login_password: + if is_system_account(login_nickname): + print('Invalid username login: ' + login_nickname + + ' (system account)') + clear_login_details(self, login_nickname, calling_domain) + self.server.postreq_busy = False + return + self.server.last_login_time = int(time.time()) + if register: + if not valid_password(login_password): + self.server.postreq_busy = False + login_url = \ + get_instance_url(calling_domain, + self.server.http_prefix, + self.server.domain_full, + self.server.onion_domain, + self.server.i2p_domain) + \ + '/login' + redirect_headers(self, login_url, cookie, calling_domain) + return + + if not register_account(base_dir, http_prefix, domain, port, + login_nickname, login_password, + self.server.manual_follower_approval): + self.server.postreq_busy = False + login_url = \ + get_instance_url(calling_domain, + self.server.http_prefix, + self.server.domain_full, + self.server.onion_domain, + self.server.i2p_domain) + \ + '/login' + redirect_headers(self, login_url, cookie, calling_domain) + return + auth_header = \ + create_basic_auth_header(login_nickname, login_password) + if self.headers.get('X-Forward-For'): + ip_address = self.headers['X-Forward-For'] + elif self.headers.get('X-Forwarded-For'): + ip_address = self.headers['X-Forwarded-For'] + else: + ip_address = self.client_address[0] + if not domain.endswith('.onion'): + if not is_local_network_address(ip_address): + print('Login attempt from IP: ' + str(ip_address)) + if not authorize_basic(base_dir, '/users/' + + login_nickname + '/outbox', + auth_header, False): + print('Login failed: ' + login_nickname) + clear_login_details(self, login_nickname, calling_domain) + fail_time = int(time.time()) + self.server.last_login_failure = fail_time + if not domain.endswith('.onion'): + if not is_local_network_address(ip_address): + record_login_failure(base_dir, ip_address, + self.server.login_failure_count, + fail_time, + self.server.log_login_failures) + self.server.postreq_busy = False + return + else: + if self.server.login_failure_count.get(ip_address): + del self.server.login_failure_count[ip_address] + if is_suspended(base_dir, login_nickname): + msg = \ + html_suspended(base_dir).encode('utf-8') + msglen = len(msg) + login_headers(self, 'text/html', + msglen, calling_domain) + write2(self, msg) + self.server.postreq_busy = False + return + # login success - redirect with authorization + print('====== Login success: ' + login_nickname + + ' ' + ua_str) + # re-activate account if needed + activate_account(base_dir, login_nickname, domain) + # This produces a deterministic token based + # on nick+password+salt + salt_filename = \ + acct_dir(base_dir, login_nickname, domain) + '/.salt' + salt = create_password(32) + if os.path.isfile(salt_filename): + try: + with open(salt_filename, 'r', + encoding='utf-8') as fp_salt: + salt = fp_salt.read() + except OSError as ex: + print('EX: Unable to read salt for ' + + login_nickname + ' ' + str(ex)) + else: + try: + with open(salt_filename, 'w+', + encoding='utf-8') as fp_salt: + fp_salt.write(salt) + except OSError as ex: + print('EX: Unable to save salt for ' + + login_nickname + ' ' + str(ex)) + + token_text = login_nickname + login_password + salt + token = sha256(token_text.encode('utf-8')).hexdigest() + self.server.tokens[login_nickname] = token + login_handle = login_nickname + '@' + domain + token_filename = \ + base_dir + '/accounts/' + \ + login_handle + '/.token' + try: + with open(token_filename, 'w+', + encoding='utf-8') as fp_tok: + fp_tok.write(token) + except OSError as ex: + print('EX: Unable to save token for ' + + login_nickname + ' ' + str(ex)) + + person_upgrade_actor(base_dir, None, + base_dir + '/accounts/' + + login_handle + '.json') + + index = self.server.tokens[login_nickname] + self.server.tokens_lookup[index] = login_nickname + cookie_str = 'SET:epicyon=' + \ + self.server.tokens[login_nickname] + '; SameSite=Strict' + tl_url = \ + get_instance_url(calling_domain, + self.server.http_prefix, + self.server.domain_full, + self.server.onion_domain, + self.server.i2p_domain) + \ + '/users/' + login_nickname + '/' + \ + self.server.default_timeline + redirect_headers(self, tl_url, cookie_str, calling_domain) + self.server.postreq_busy = False + return + else: + print('WARN: No login credentials presented to /login') + if debug: + # be careful to avoid logging the password + login_str = login_params + if '=' in login_params: + login_params_list = login_params.split('=') + login_str = '' + skip_param = False + for login_prm in login_params_list: + if not skip_param: + login_str += login_prm + '=' + else: + len_str = login_prm.split('&')[0] + if len(len_str) > 0: + login_str += login_prm + '*' + len_str = '' + if '&' in login_prm: + login_str += \ + '&' + login_prm.split('&')[1] + '=' + skip_param = False + if 'password' in login_prm: + skip_param = True + login_str = login_str[:len(login_str) - 1] + print(login_str) + http_401(self, 'No login credentials were posted') + self.server.postreq_busy = False + http_200(self) + self.server.postreq_busy = False