From de8b613d0befb3b7d9a42b30c07aab3270161c35 Mon Sep 17 00:00:00 2001 From: Bob Mottram Date: Sun, 3 Mar 2024 22:01:22 +0000 Subject: [PATCH] Move daemon confirm functions into their own module --- daemon_post.py | 569 +++-------------------------------------- daemon_post_confirm.py | 531 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 562 insertions(+), 538 deletions(-) create mode 100644 daemon_post_confirm.py diff --git a/daemon_post.py b/daemon_post.py index 8db987302..517de584d 100644 --- a/daemon_post.py +++ b/daemon_post.py @@ -22,8 +22,6 @@ from utils import get_image_extension_from_mime_type from utils import remove_post_from_cache from utils import get_cached_post_filename from utils import text_in_file -from utils import has_group_type -from utils import get_status_number from utils import load_json from utils import save_json from utils import locate_post @@ -42,10 +40,6 @@ from utils import acct_dir from utils import get_nickname_from_actor from blocking import is_blocked_hashtag from blocking import contains_military_domain -from blocking import update_blocked_cache -from blocking import remove_global_block -from blocking import remove_block -from blocking import add_block from crawlers import blocked_user_agent from session import get_session_for_domain from session import establish_session @@ -62,19 +56,16 @@ from content import load_dogwhistles from content import extract_text_fields_in_post from filters import is_filtered from categories import set_hashtag_category -from httpcodes import write2 from httpcodes import http_200 from httpcodes import http_404 from httpcodes import http_400 from httpcodes import http_503 -from httpheaders import login_headers from httpheaders import redirect_headers from daemon_utils import get_user_agent from daemon_utils import post_to_outbox from daemon_utils import update_inbox_queue from daemon_utils import is_authorized from posts import is_moderator -from webapp_moderation import html_account_info from cache import store_person_in_cache from cache import remove_person_from_cache from cache import get_person_from_cache @@ -82,9 +73,6 @@ from theme import reset_theme_designer_settings from theme import set_theme from theme import set_theme_from_designer from languages import get_understood_languages -from follow import send_follow_request -from follow import unfollow_account -from follow import remove_follower from daemon_utils import post_to_outbox_thread from reading import remove_reading_event from city import get_spoofed_city @@ -96,6 +84,10 @@ from daemon_post_profile import profile_edit from daemon_post_person_options import person_options2 from daemon_post_search import receive_search_query from daemon_post_moderator import moderator_actions +from daemon_post_confirm import follow_confirm2 +from daemon_post_confirm import unfollow_confirm +from daemon_post_confirm import block_confirm2 +from daemon_post_confirm import unblock_confirm # maximum number of posts in a hashtag feed MAX_POSTS_IN_HASHTAG_FEED = 6 @@ -465,7 +457,7 @@ def daemon_http_post(self) -> None: # decision to follow in the web interface is confirmed if self.path.endswith('/followconfirm'): - _follow_confirm(self, calling_domain, cookie, + follow_confirm2(self, calling_domain, cookie, self.path, self.server.base_dir, self.server.http_prefix, @@ -481,7 +473,7 @@ def daemon_http_post(self) -> None: return fitness_performance(postreq_start_time, self.server.fitness, - '_POST', '_follow_confirm', + '_POST', 'follow_confirm2', self.server.debug) # remove a reading status from the profile screen @@ -504,27 +496,7 @@ def daemon_http_post(self) -> None: # decision to unfollow in the web interface is confirmed if self.path.endswith('/unfollowconfirm'): - _unfollow_confirm(self, calling_domain, cookie, - self.path, - self.server.base_dir, - self.server.http_prefix, - self.server.domain, - self.server.domain_full, - self.server.port, - self.server.onion_domain, - self.server.i2p_domain, - self.server.debug, - curr_session, proxy_type) - self.server.postreq_busy = False - return - - fitness_performance(postreq_start_time, self.server.fitness, - '_POST', '_unfollow_confirm', - self.server.debug) - - # decision to unblock in the web interface is confirmed - if self.path.endswith('/unblockconfirm'): - _unblock_confirm(self, calling_domain, cookie, + unfollow_confirm(self, calling_domain, cookie, self.path, self.server.base_dir, self.server.http_prefix, @@ -533,17 +505,37 @@ def daemon_http_post(self) -> None: self.server.port, self.server.onion_domain, self.server.i2p_domain, - self.server.debug) + self.server.debug, + curr_session, proxy_type) self.server.postreq_busy = False return fitness_performance(postreq_start_time, self.server.fitness, - '_POST', '_unblock_confirm', + '_POST', 'unfollow_confirm', + self.server.debug) + + # decision to unblock in the web interface is confirmed + if self.path.endswith('/unblockconfirm'): + unblock_confirm(self, calling_domain, cookie, + self.path, + self.server.base_dir, + self.server.http_prefix, + self.server.domain, + self.server.domain_full, + self.server.port, + self.server.onion_domain, + self.server.i2p_domain, + self.server.debug) + self.server.postreq_busy = False + return + + fitness_performance(postreq_start_time, self.server.fitness, + '_POST', 'unblock_confirm', self.server.debug) # decision to block in the web interface is confirmed if self.path.endswith('/blockconfirm'): - _block_confirm(self, calling_domain, cookie, + block_confirm2(self, calling_domain, cookie, self.path, self.server.base_dir, self.server.http_prefix, @@ -557,7 +549,7 @@ def daemon_http_post(self) -> None: return fitness_performance(postreq_start_time, self.server.fitness, - '_POST', '_block_confirm', + '_POST', 'block_confirm2', self.server.debug) # an option was chosen from person options screen @@ -1295,319 +1287,6 @@ def _theme_designer_edit(self, calling_domain: str, cookie: str, return -def _unfollow_confirm(self, calling_domain: str, cookie: str, - path: str, base_dir: str, http_prefix: str, - domain: str, domain_full: str, port: int, - onion_domain: str, i2p_domain: str, - debug: bool, - curr_session, proxy_type: str) -> None: - """Confirm to unfollow - """ - users_path = path.split('/unfollowconfirm')[0] - origin_path_str = http_prefix + '://' + domain_full + users_path - follower_nickname = get_nickname_from_actor(origin_path_str) - if not follower_nickname: - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - - length = int(self.headers['Content-length']) - - try: - follow_confirm_params = self.rfile.read(length).decode('utf-8') - except SocketError as ex: - if ex.errno == errno.ECONNRESET: - print('EX: POST follow_confirm_params ' + - 'connection was reset') - else: - print('EX: POST follow_confirm_params socket error') - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - except ValueError as ex: - print('EX: POST follow_confirm_params rfile.read failed, ' + - str(ex)) - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - - if '&submitYes=' in follow_confirm_params: - following_actor = \ - urllib.parse.unquote_plus(follow_confirm_params) - following_actor = following_actor.split('actor=')[1] - if '&' in following_actor: - following_actor = following_actor.split('&')[0] - following_nickname = get_nickname_from_actor(following_actor) - following_domain, following_port = \ - get_domain_from_actor(following_actor) - if not following_nickname or not following_domain: - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - following_domain_full = \ - get_full_domain(following_domain, following_port) - if follower_nickname == following_nickname and \ - following_domain == domain and \ - following_port == port: - if debug: - print('You cannot unfollow yourself!') - else: - if debug: - print(follower_nickname + ' stops following ' + - following_actor) - follow_actor = \ - local_actor_url(http_prefix, - follower_nickname, domain_full) - status_number, _ = get_status_number() - follow_id = follow_actor + '/statuses/' + str(status_number) - unfollow_json = { - '@context': 'https://www.w3.org/ns/activitystreams', - 'id': follow_id + '/undo', - 'type': 'Undo', - 'actor': follow_actor, - 'object': { - 'id': follow_id, - 'type': 'Follow', - 'actor': follow_actor, - 'object': following_actor - } - } - path_users_section = path.split('/users/')[1] - self.post_to_nickname = path_users_section.split('/')[0] - group_account = has_group_type(base_dir, following_actor, - self.server.person_cache) - unfollow_account(self.server.base_dir, self.post_to_nickname, - self.server.domain, - following_nickname, following_domain_full, - self.server.debug, group_account, - 'following.txt') - post_to_outbox_thread(self, unfollow_json, - curr_session, proxy_type) - - if calling_domain.endswith('.onion') and onion_domain: - origin_path_str = 'http://' + onion_domain + users_path - elif (calling_domain.endswith('.i2p') and i2p_domain): - origin_path_str = 'http://' + i2p_domain + users_path - redirect_headers(self, origin_path_str, cookie, calling_domain) - self.server.postreq_busy = False - - -def _follow_confirm(self, calling_domain: str, cookie: str, - path: str, base_dir: str, http_prefix: str, - domain: str, domain_full: str, port: int, - onion_domain: str, i2p_domain: str, - debug: bool, - curr_session, proxy_type: str) -> None: - """Confirm to follow - """ - users_path = path.split('/followconfirm')[0] - origin_path_str = http_prefix + '://' + domain_full + users_path - follower_nickname = get_nickname_from_actor(origin_path_str) - if not follower_nickname: - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - - length = int(self.headers['Content-length']) - - try: - follow_confirm_params = self.rfile.read(length).decode('utf-8') - except SocketError as ex: - if ex.errno == errno.ECONNRESET: - print('EX: POST follow_confirm_params ' + - 'connection was reset') - else: - print('EX: POST follow_confirm_params socket error') - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - except ValueError as ex: - print('EX: POST follow_confirm_params rfile.read failed, ' + - str(ex)) - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - - if '&submitView=' in follow_confirm_params: - following_actor = \ - urllib.parse.unquote_plus(follow_confirm_params) - following_actor = following_actor.split('actor=')[1] - if '&' in following_actor: - following_actor = following_actor.split('&')[0] - redirect_headers(self, following_actor, cookie, calling_domain) - self.server.postreq_busy = False - return - - if '&submitInfo=' in follow_confirm_params: - following_actor = \ - urllib.parse.unquote_plus(follow_confirm_params) - following_actor = following_actor.split('actor=')[1] - if '&' in following_actor: - following_actor = following_actor.split('&')[0] - if is_moderator(base_dir, follower_nickname): - msg = \ - html_account_info(self.server.translate, - base_dir, http_prefix, - follower_nickname, - self.server.domain, - following_actor, - self.server.debug, - self.server.system_language, - self.server.signing_priv_key_pem, - users_path, - self.server.block_federated) - if msg: - msg = msg.encode('utf-8') - msglen = len(msg) - login_headers(self, 'text/html', - msglen, calling_domain) - write2(self, msg) - self.server.postreq_busy = False - return - redirect_headers(self, following_actor, cookie, calling_domain) - self.server.postreq_busy = False - return - - if '&submitYes=' in follow_confirm_params: - following_actor = \ - urllib.parse.unquote_plus(follow_confirm_params) - following_actor = following_actor.split('actor=')[1] - if '&' in following_actor: - following_actor = following_actor.split('&')[0] - following_nickname = get_nickname_from_actor(following_actor) - following_domain, following_port = \ - get_domain_from_actor(following_actor) - if not following_nickname or not following_domain: - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - if follower_nickname == following_nickname and \ - following_domain == domain and \ - following_port == port: - if debug: - print('You cannot follow yourself!') - elif (following_nickname == 'news' and - following_domain == domain and - following_port == port): - if debug: - print('You cannot follow the news actor') - else: - print('Sending follow request from ' + - follower_nickname + ' to ' + following_actor) - if not self.server.signing_priv_key_pem: - print('Sending follow request with no signing key') - - curr_domain = domain - curr_port = port - curr_http_prefix = http_prefix - curr_proxy_type = proxy_type - if onion_domain: - if not curr_domain.endswith('.onion') and \ - following_domain.endswith('.onion'): - curr_session = self.server.session_onion - curr_domain = onion_domain - curr_port = 80 - following_port = 80 - curr_http_prefix = 'http' - curr_proxy_type = 'tor' - if i2p_domain: - if not curr_domain.endswith('.i2p') and \ - following_domain.endswith('.i2p'): - curr_session = self.server.session_i2p - curr_domain = i2p_domain - curr_port = 80 - following_port = 80 - curr_http_prefix = 'http' - curr_proxy_type = 'i2p' - - curr_session = \ - establish_session("follow request", - curr_session, - curr_proxy_type, - self.server) - - send_follow_request(curr_session, - base_dir, follower_nickname, - domain, curr_domain, curr_port, - curr_http_prefix, - following_nickname, - following_domain, - following_actor, - following_port, curr_http_prefix, - False, self.server.federation_list, - self.server.send_threads, - self.server.postLog, - self.server.cached_webfingers, - self.server.person_cache, debug, - self.server.project_version, - self.server.signing_priv_key_pem, - self.server.domain, - self.server.onion_domain, - self.server.i2p_domain, - self.server.sites_unavailable, - self.server.system_language) - - if '&submitUnblock=' in follow_confirm_params: - blocking_actor = \ - urllib.parse.unquote_plus(follow_confirm_params) - blocking_actor = blocking_actor.split('actor=')[1] - if '&' in blocking_actor: - blocking_actor = blocking_actor.split('&')[0] - blocking_nickname = get_nickname_from_actor(blocking_actor) - blocking_domain, blocking_port = \ - get_domain_from_actor(blocking_actor) - if not blocking_nickname or not blocking_domain: - if calling_domain.endswith('.onion') and onion_domain: - origin_path_str = 'http://' + onion_domain + users_path - elif (calling_domain.endswith('.i2p') and i2p_domain): - origin_path_str = 'http://' + i2p_domain + users_path - print('WARN: unable to find blocked nickname or domain in ' + - blocking_actor) - redirect_headers(self, origin_path_str, - cookie, calling_domain) - self.server.postreq_busy = False - return - blocking_domain_full = \ - get_full_domain(blocking_domain, blocking_port) - if follower_nickname == blocking_nickname and \ - blocking_domain == domain and \ - blocking_port == port: - if debug: - print('You cannot unblock yourself!') - else: - if debug: - print(follower_nickname + ' stops blocking ' + - blocking_actor) - remove_block(base_dir, - follower_nickname, domain, - blocking_nickname, blocking_domain_full) - if is_moderator(base_dir, follower_nickname): - remove_global_block(base_dir, - blocking_nickname, - blocking_domain_full) - blocked_cache_last_updated = \ - self.server.blocked_cache_last_updated - self.server.blocked_cache_last_updated = \ - update_blocked_cache(self.server.base_dir, - self.server.blocked_cache, - blocked_cache_last_updated, 0) - - if calling_domain.endswith('.onion') and onion_domain: - origin_path_str = 'http://' + onion_domain + users_path - elif (calling_domain.endswith('.i2p') and i2p_domain): - origin_path_str = 'http://' + i2p_domain + users_path - redirect_headers(self, origin_path_str, cookie, calling_domain) - self.server.postreq_busy = False - - def _remove_reading_status(self, calling_domain: str, cookie: str, path: str, base_dir: str, http_prefix: str, domain_full: str, @@ -1682,192 +1361,6 @@ def _remove_reading_status(self, calling_domain: str, cookie: str, self.server.postreq_busy = False -def _block_confirm(self, calling_domain: str, cookie: str, - path: str, base_dir: str, http_prefix: str, - domain: str, domain_full: str, port: int, - onion_domain: str, i2p_domain: str, - debug: bool) -> None: - """Confirms a block from the person options screen - """ - users_path = path.split('/blockconfirm')[0] - origin_path_str = http_prefix + '://' + domain_full + users_path - blocker_nickname = get_nickname_from_actor(origin_path_str) - if not blocker_nickname: - if calling_domain.endswith('.onion') and onion_domain: - origin_path_str = 'http://' + onion_domain + users_path - elif (calling_domain.endswith('.i2p') and i2p_domain): - origin_path_str = 'http://' + i2p_domain + users_path - print('WARN: unable to find nickname in ' + origin_path_str) - redirect_headers(self, origin_path_str, - cookie, calling_domain) - self.server.postreq_busy = False - return - - length = int(self.headers['Content-length']) - - try: - block_confirm_params = self.rfile.read(length).decode('utf-8') - except SocketError as ex: - if ex.errno == errno.ECONNRESET: - print('EX: POST block_confirm_params ' + - 'connection was reset') - else: - print('EX: POST block_confirm_params socket error') - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - except ValueError as ex: - print('EX: POST block_confirm_params rfile.read failed, ' + - str(ex)) - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - - if '&submitYes=' in block_confirm_params: - blocking_confirm_str = \ - urllib.parse.unquote_plus(block_confirm_params) - block_reason = blocking_confirm_str.split('blockReason=')[1] - if '&' in block_reason: - block_reason = block_reason.split('&')[0] - blocking_actor = blocking_confirm_str.split('actor=')[1] - if '&' in blocking_actor: - blocking_actor = blocking_actor.split('&')[0] - blocking_nickname = get_nickname_from_actor(blocking_actor) - blocking_domain, blocking_port = \ - get_domain_from_actor(blocking_actor) - if not blocking_nickname or not blocking_domain: - if calling_domain.endswith('.onion') and onion_domain: - origin_path_str = 'http://' + onion_domain + users_path - elif (calling_domain.endswith('.i2p') and i2p_domain): - origin_path_str = 'http://' + i2p_domain + users_path - print('WARN: unable to find nickname or domain in ' + - blocking_actor) - redirect_headers(self, origin_path_str, - cookie, calling_domain) - self.server.postreq_busy = False - return - blocking_domain_full = \ - get_full_domain(blocking_domain, blocking_port) - if blocker_nickname == blocking_nickname and \ - blocking_domain == domain and \ - blocking_port == port: - if debug: - print('You cannot block yourself!') - else: - print('Adding block by ' + blocker_nickname + - ' of ' + blocking_actor) - add_block(base_dir, blocker_nickname, - domain, blocking_nickname, - blocking_domain_full, block_reason) - remove_follower(base_dir, blocker_nickname, - domain, - blocking_nickname, - blocking_domain_full) - if calling_domain.endswith('.onion') and onion_domain: - origin_path_str = 'http://' + onion_domain + users_path - elif (calling_domain.endswith('.i2p') and i2p_domain): - origin_path_str = 'http://' + i2p_domain + users_path - redirect_headers(self, origin_path_str, cookie, calling_domain) - self.server.postreq_busy = False - - -def _unblock_confirm(self, calling_domain: str, cookie: str, - path: str, base_dir: str, http_prefix: str, - domain: str, domain_full: str, port: int, - onion_domain: str, i2p_domain: str, - debug: bool) -> None: - """Confirms a unblock - """ - users_path = path.split('/unblockconfirm')[0] - origin_path_str = http_prefix + '://' + domain_full + users_path - blocker_nickname = get_nickname_from_actor(origin_path_str) - if not blocker_nickname: - if calling_domain.endswith('.onion') and onion_domain: - origin_path_str = 'http://' + onion_domain + users_path - elif (calling_domain.endswith('.i2p') and i2p_domain): - origin_path_str = 'http://' + i2p_domain + users_path - print('WARN: unable to find nickname in ' + origin_path_str) - redirect_headers(self, origin_path_str, - cookie, calling_domain) - self.server.postreq_busy = False - return - - length = int(self.headers['Content-length']) - - try: - block_confirm_params = self.rfile.read(length).decode('utf-8') - except SocketError as ex: - if ex.errno == errno.ECONNRESET: - print('EX: POST block_confirm_params ' + - 'connection was reset') - else: - print('EX: POST block_confirm_params socket error') - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - except ValueError as ex: - print('EX: POST block_confirm_params rfile.read failed, ' + - str(ex)) - self.send_response(400) - self.end_headers() - self.server.postreq_busy = False - return - - if '&submitYes=' in block_confirm_params: - blocking_actor = \ - urllib.parse.unquote_plus(block_confirm_params) - blocking_actor = blocking_actor.split('actor=')[1] - if '&' in blocking_actor: - blocking_actor = blocking_actor.split('&')[0] - blocking_nickname = get_nickname_from_actor(blocking_actor) - blocking_domain, blocking_port = \ - get_domain_from_actor(blocking_actor) - if not blocking_nickname or not blocking_domain: - if calling_domain.endswith('.onion') and onion_domain: - origin_path_str = 'http://' + onion_domain + users_path - elif (calling_domain.endswith('.i2p') and i2p_domain): - origin_path_str = 'http://' + i2p_domain + users_path - print('WARN: unable to find nickname in ' + blocking_actor) - redirect_headers(self, origin_path_str, - cookie, calling_domain) - self.server.postreq_busy = False - return - blocking_domain_full = \ - get_full_domain(blocking_domain, blocking_port) - if blocker_nickname == blocking_nickname and \ - blocking_domain == domain and \ - blocking_port == port: - if debug: - print('You cannot unblock yourself!') - else: - if debug: - print(blocker_nickname + ' stops blocking ' + - blocking_actor) - remove_block(base_dir, - blocker_nickname, domain, - blocking_nickname, blocking_domain_full) - if is_moderator(base_dir, blocker_nickname): - remove_global_block(base_dir, - blocking_nickname, - blocking_domain_full) - blocked_cache_last_updated = \ - self.server.blocked_cache_last_updated - self.server.blocked_cache_last_updated = \ - update_blocked_cache(self.server.base_dir, - self.server.blocked_cache, - blocked_cache_last_updated, 0) - if calling_domain.endswith('.onion') and onion_domain: - origin_path_str = 'http://' + onion_domain + users_path - elif (calling_domain.endswith('.i2p') and i2p_domain): - origin_path_str = 'http://' + i2p_domain + users_path - redirect_headers(self, origin_path_str, - cookie, calling_domain) - self.server.postreq_busy = False - - def _receive_vote(self, calling_domain: str, cookie: str, path: str, http_prefix: str, domain: str, domain_full: str, port: int, diff --git a/daemon_post_confirm.py b/daemon_post_confirm.py new file mode 100644 index 000000000..8f2e997a7 --- /dev/null +++ b/daemon_post_confirm.py @@ -0,0 +1,531 @@ +__filename__ = "daemon_post_confirm.py" +__author__ = "Bob Mottram" +__license__ = "AGPL3+" +__version__ = "1.5.0" +__maintainer__ = "Bob Mottram" +__email__ = "bob@libreserver.org" +__status__ = "Production" +__module_group__ = "Core" + +import errno +import urllib.parse +from socket import error as SocketError +from utils import get_nickname_from_actor +from utils import get_domain_from_actor +from utils import get_full_domain +from utils import local_actor_url +from utils import get_status_number +from utils import has_group_type +from follow import unfollow_account +from follow import send_follow_request +from follow import remove_follower +from daemon_utils import post_to_outbox_thread +from httpcodes import write2 +from httpheaders import redirect_headers +from httpheaders import login_headers +from posts import is_moderator +from webapp_moderation import html_account_info +from session import establish_session +from blocking import remove_block +from blocking import remove_global_block +from blocking import update_blocked_cache +from blocking import add_block + + +def unfollow_confirm(self, calling_domain: str, cookie: str, + path: str, base_dir: str, http_prefix: str, + domain: str, domain_full: str, port: int, + onion_domain: str, i2p_domain: str, + debug: bool, + curr_session, proxy_type: str) -> None: + """Confirm to unfollow + """ + users_path = path.split('/unfollowconfirm')[0] + origin_path_str = http_prefix + '://' + domain_full + users_path + follower_nickname = get_nickname_from_actor(origin_path_str) + if not follower_nickname: + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + + length = int(self.headers['Content-length']) + + try: + follow_confirm_params = self.rfile.read(length).decode('utf-8') + except SocketError as ex: + if ex.errno == errno.ECONNRESET: + print('EX: POST follow_confirm_params ' + + 'connection was reset') + else: + print('EX: POST follow_confirm_params socket error') + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + except ValueError as ex: + print('EX: POST follow_confirm_params rfile.read failed, ' + + str(ex)) + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + + if '&submitYes=' in follow_confirm_params: + following_actor = \ + urllib.parse.unquote_plus(follow_confirm_params) + following_actor = following_actor.split('actor=')[1] + if '&' in following_actor: + following_actor = following_actor.split('&')[0] + following_nickname = get_nickname_from_actor(following_actor) + following_domain, following_port = \ + get_domain_from_actor(following_actor) + if not following_nickname or not following_domain: + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + following_domain_full = \ + get_full_domain(following_domain, following_port) + if follower_nickname == following_nickname and \ + following_domain == domain and \ + following_port == port: + if debug: + print('You cannot unfollow yourself!') + else: + if debug: + print(follower_nickname + ' stops following ' + + following_actor) + follow_actor = \ + local_actor_url(http_prefix, + follower_nickname, domain_full) + status_number, _ = get_status_number() + follow_id = follow_actor + '/statuses/' + str(status_number) + unfollow_json = { + '@context': 'https://www.w3.org/ns/activitystreams', + 'id': follow_id + '/undo', + 'type': 'Undo', + 'actor': follow_actor, + 'object': { + 'id': follow_id, + 'type': 'Follow', + 'actor': follow_actor, + 'object': following_actor + } + } + path_users_section = path.split('/users/')[1] + self.post_to_nickname = path_users_section.split('/')[0] + group_account = has_group_type(base_dir, following_actor, + self.server.person_cache) + unfollow_account(self.server.base_dir, self.post_to_nickname, + self.server.domain, + following_nickname, following_domain_full, + self.server.debug, group_account, + 'following.txt') + post_to_outbox_thread(self, unfollow_json, + curr_session, proxy_type) + + if calling_domain.endswith('.onion') and onion_domain: + origin_path_str = 'http://' + onion_domain + users_path + elif (calling_domain.endswith('.i2p') and i2p_domain): + origin_path_str = 'http://' + i2p_domain + users_path + redirect_headers(self, origin_path_str, cookie, calling_domain) + self.server.postreq_busy = False + + +def follow_confirm2(self, calling_domain: str, cookie: str, + path: str, base_dir: str, http_prefix: str, + domain: str, domain_full: str, port: int, + onion_domain: str, i2p_domain: str, + debug: bool, + curr_session, proxy_type: str) -> None: + """Confirm to follow + """ + users_path = path.split('/followconfirm')[0] + origin_path_str = http_prefix + '://' + domain_full + users_path + follower_nickname = get_nickname_from_actor(origin_path_str) + if not follower_nickname: + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + + length = int(self.headers['Content-length']) + + try: + follow_confirm_params = self.rfile.read(length).decode('utf-8') + except SocketError as ex: + if ex.errno == errno.ECONNRESET: + print('EX: POST follow_confirm_params ' + + 'connection was reset') + else: + print('EX: POST follow_confirm_params socket error') + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + except ValueError as ex: + print('EX: POST follow_confirm_params rfile.read failed, ' + + str(ex)) + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + + if '&submitView=' in follow_confirm_params: + following_actor = \ + urllib.parse.unquote_plus(follow_confirm_params) + following_actor = following_actor.split('actor=')[1] + if '&' in following_actor: + following_actor = following_actor.split('&')[0] + redirect_headers(self, following_actor, cookie, calling_domain) + self.server.postreq_busy = False + return + + if '&submitInfo=' in follow_confirm_params: + following_actor = \ + urllib.parse.unquote_plus(follow_confirm_params) + following_actor = following_actor.split('actor=')[1] + if '&' in following_actor: + following_actor = following_actor.split('&')[0] + if is_moderator(base_dir, follower_nickname): + msg = \ + html_account_info(self.server.translate, + base_dir, http_prefix, + follower_nickname, + self.server.domain, + following_actor, + self.server.debug, + self.server.system_language, + self.server.signing_priv_key_pem, + users_path, + self.server.block_federated) + if msg: + msg = msg.encode('utf-8') + msglen = len(msg) + login_headers(self, 'text/html', + msglen, calling_domain) + write2(self, msg) + self.server.postreq_busy = False + return + redirect_headers(self, following_actor, cookie, calling_domain) + self.server.postreq_busy = False + return + + if '&submitYes=' in follow_confirm_params: + following_actor = \ + urllib.parse.unquote_plus(follow_confirm_params) + following_actor = following_actor.split('actor=')[1] + if '&' in following_actor: + following_actor = following_actor.split('&')[0] + following_nickname = get_nickname_from_actor(following_actor) + following_domain, following_port = \ + get_domain_from_actor(following_actor) + if not following_nickname or not following_domain: + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + if follower_nickname == following_nickname and \ + following_domain == domain and \ + following_port == port: + if debug: + print('You cannot follow yourself!') + elif (following_nickname == 'news' and + following_domain == domain and + following_port == port): + if debug: + print('You cannot follow the news actor') + else: + print('Sending follow request from ' + + follower_nickname + ' to ' + following_actor) + if not self.server.signing_priv_key_pem: + print('Sending follow request with no signing key') + + curr_domain = domain + curr_port = port + curr_http_prefix = http_prefix + curr_proxy_type = proxy_type + if onion_domain: + if not curr_domain.endswith('.onion') and \ + following_domain.endswith('.onion'): + curr_session = self.server.session_onion + curr_domain = onion_domain + curr_port = 80 + following_port = 80 + curr_http_prefix = 'http' + curr_proxy_type = 'tor' + if i2p_domain: + if not curr_domain.endswith('.i2p') and \ + following_domain.endswith('.i2p'): + curr_session = self.server.session_i2p + curr_domain = i2p_domain + curr_port = 80 + following_port = 80 + curr_http_prefix = 'http' + curr_proxy_type = 'i2p' + + curr_session = \ + establish_session("follow request", + curr_session, + curr_proxy_type, + self.server) + + send_follow_request(curr_session, + base_dir, follower_nickname, + domain, curr_domain, curr_port, + curr_http_prefix, + following_nickname, + following_domain, + following_actor, + following_port, curr_http_prefix, + False, self.server.federation_list, + self.server.send_threads, + self.server.postLog, + self.server.cached_webfingers, + self.server.person_cache, debug, + self.server.project_version, + self.server.signing_priv_key_pem, + self.server.domain, + self.server.onion_domain, + self.server.i2p_domain, + self.server.sites_unavailable, + self.server.system_language) + + if '&submitUnblock=' in follow_confirm_params: + blocking_actor = \ + urllib.parse.unquote_plus(follow_confirm_params) + blocking_actor = blocking_actor.split('actor=')[1] + if '&' in blocking_actor: + blocking_actor = blocking_actor.split('&')[0] + blocking_nickname = get_nickname_from_actor(blocking_actor) + blocking_domain, blocking_port = \ + get_domain_from_actor(blocking_actor) + if not blocking_nickname or not blocking_domain: + if calling_domain.endswith('.onion') and onion_domain: + origin_path_str = 'http://' + onion_domain + users_path + elif (calling_domain.endswith('.i2p') and i2p_domain): + origin_path_str = 'http://' + i2p_domain + users_path + print('WARN: unable to find blocked nickname or domain in ' + + blocking_actor) + redirect_headers(self, origin_path_str, + cookie, calling_domain) + self.server.postreq_busy = False + return + blocking_domain_full = \ + get_full_domain(blocking_domain, blocking_port) + if follower_nickname == blocking_nickname and \ + blocking_domain == domain and \ + blocking_port == port: + if debug: + print('You cannot unblock yourself!') + else: + if debug: + print(follower_nickname + ' stops blocking ' + + blocking_actor) + remove_block(base_dir, + follower_nickname, domain, + blocking_nickname, blocking_domain_full) + if is_moderator(base_dir, follower_nickname): + remove_global_block(base_dir, + blocking_nickname, + blocking_domain_full) + blocked_cache_last_updated = \ + self.server.blocked_cache_last_updated + self.server.blocked_cache_last_updated = \ + update_blocked_cache(self.server.base_dir, + self.server.blocked_cache, + blocked_cache_last_updated, 0) + + if calling_domain.endswith('.onion') and onion_domain: + origin_path_str = 'http://' + onion_domain + users_path + elif (calling_domain.endswith('.i2p') and i2p_domain): + origin_path_str = 'http://' + i2p_domain + users_path + redirect_headers(self, origin_path_str, cookie, calling_domain) + self.server.postreq_busy = False + + +def block_confirm2(self, calling_domain: str, cookie: str, + path: str, base_dir: str, http_prefix: str, + domain: str, domain_full: str, port: int, + onion_domain: str, i2p_domain: str, + debug: bool) -> None: + """Confirms a block from the person options screen + """ + users_path = path.split('/blockconfirm')[0] + origin_path_str = http_prefix + '://' + domain_full + users_path + blocker_nickname = get_nickname_from_actor(origin_path_str) + if not blocker_nickname: + if calling_domain.endswith('.onion') and onion_domain: + origin_path_str = 'http://' + onion_domain + users_path + elif (calling_domain.endswith('.i2p') and i2p_domain): + origin_path_str = 'http://' + i2p_domain + users_path + print('WARN: unable to find nickname in ' + origin_path_str) + redirect_headers(self, origin_path_str, + cookie, calling_domain) + self.server.postreq_busy = False + return + + length = int(self.headers['Content-length']) + + try: + block_confirm_params = self.rfile.read(length).decode('utf-8') + except SocketError as ex: + if ex.errno == errno.ECONNRESET: + print('EX: POST block_confirm_params ' + + 'connection was reset') + else: + print('EX: POST block_confirm_params socket error') + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + except ValueError as ex: + print('EX: POST block_confirm_params rfile.read failed, ' + + str(ex)) + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + + if '&submitYes=' in block_confirm_params: + blocking_confirm_str = \ + urllib.parse.unquote_plus(block_confirm_params) + block_reason = blocking_confirm_str.split('blockReason=')[1] + if '&' in block_reason: + block_reason = block_reason.split('&')[0] + blocking_actor = blocking_confirm_str.split('actor=')[1] + if '&' in blocking_actor: + blocking_actor = blocking_actor.split('&')[0] + blocking_nickname = get_nickname_from_actor(blocking_actor) + blocking_domain, blocking_port = \ + get_domain_from_actor(blocking_actor) + if not blocking_nickname or not blocking_domain: + if calling_domain.endswith('.onion') and onion_domain: + origin_path_str = 'http://' + onion_domain + users_path + elif (calling_domain.endswith('.i2p') and i2p_domain): + origin_path_str = 'http://' + i2p_domain + users_path + print('WARN: unable to find nickname or domain in ' + + blocking_actor) + redirect_headers(self, origin_path_str, + cookie, calling_domain) + self.server.postreq_busy = False + return + blocking_domain_full = \ + get_full_domain(blocking_domain, blocking_port) + if blocker_nickname == blocking_nickname and \ + blocking_domain == domain and \ + blocking_port == port: + if debug: + print('You cannot block yourself!') + else: + print('Adding block by ' + blocker_nickname + + ' of ' + blocking_actor) + add_block(base_dir, blocker_nickname, + domain, blocking_nickname, + blocking_domain_full, block_reason) + remove_follower(base_dir, blocker_nickname, + domain, + blocking_nickname, + blocking_domain_full) + if calling_domain.endswith('.onion') and onion_domain: + origin_path_str = 'http://' + onion_domain + users_path + elif (calling_domain.endswith('.i2p') and i2p_domain): + origin_path_str = 'http://' + i2p_domain + users_path + redirect_headers(self, origin_path_str, cookie, calling_domain) + self.server.postreq_busy = False + + +def unblock_confirm(self, calling_domain: str, cookie: str, + path: str, base_dir: str, http_prefix: str, + domain: str, domain_full: str, port: int, + onion_domain: str, i2p_domain: str, + debug: bool) -> None: + """Confirms a unblock + """ + users_path = path.split('/unblockconfirm')[0] + origin_path_str = http_prefix + '://' + domain_full + users_path + blocker_nickname = get_nickname_from_actor(origin_path_str) + if not blocker_nickname: + if calling_domain.endswith('.onion') and onion_domain: + origin_path_str = 'http://' + onion_domain + users_path + elif (calling_domain.endswith('.i2p') and i2p_domain): + origin_path_str = 'http://' + i2p_domain + users_path + print('WARN: unable to find nickname in ' + origin_path_str) + redirect_headers(self, origin_path_str, + cookie, calling_domain) + self.server.postreq_busy = False + return + + length = int(self.headers['Content-length']) + + try: + block_confirm_params = self.rfile.read(length).decode('utf-8') + except SocketError as ex: + if ex.errno == errno.ECONNRESET: + print('EX: POST block_confirm_params ' + + 'connection was reset') + else: + print('EX: POST block_confirm_params socket error') + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + except ValueError as ex: + print('EX: POST block_confirm_params rfile.read failed, ' + + str(ex)) + self.send_response(400) + self.end_headers() + self.server.postreq_busy = False + return + + if '&submitYes=' in block_confirm_params: + blocking_actor = \ + urllib.parse.unquote_plus(block_confirm_params) + blocking_actor = blocking_actor.split('actor=')[1] + if '&' in blocking_actor: + blocking_actor = blocking_actor.split('&')[0] + blocking_nickname = get_nickname_from_actor(blocking_actor) + blocking_domain, blocking_port = \ + get_domain_from_actor(blocking_actor) + if not blocking_nickname or not blocking_domain: + if calling_domain.endswith('.onion') and onion_domain: + origin_path_str = 'http://' + onion_domain + users_path + elif (calling_domain.endswith('.i2p') and i2p_domain): + origin_path_str = 'http://' + i2p_domain + users_path + print('WARN: unable to find nickname in ' + blocking_actor) + redirect_headers(self, origin_path_str, + cookie, calling_domain) + self.server.postreq_busy = False + return + blocking_domain_full = \ + get_full_domain(blocking_domain, blocking_port) + if blocker_nickname == blocking_nickname and \ + blocking_domain == domain and \ + blocking_port == port: + if debug: + print('You cannot unblock yourself!') + else: + if debug: + print(blocker_nickname + ' stops blocking ' + + blocking_actor) + remove_block(base_dir, + blocker_nickname, domain, + blocking_nickname, blocking_domain_full) + if is_moderator(base_dir, blocker_nickname): + remove_global_block(base_dir, + blocking_nickname, + blocking_domain_full) + blocked_cache_last_updated = \ + self.server.blocked_cache_last_updated + self.server.blocked_cache_last_updated = \ + update_blocked_cache(self.server.base_dir, + self.server.blocked_cache, + blocked_cache_last_updated, 0) + if calling_domain.endswith('.onion') and onion_domain: + origin_path_str = 'http://' + onion_domain + users_path + elif (calling_domain.endswith('.i2p') and i2p_domain): + origin_path_str = 'http://' + i2p_domain + users_path + redirect_headers(self, origin_path_str, + cookie, calling_domain) + self.server.postreq_busy = False