""" HTTP GET for buttons within the user interface """ __filename__ = "daemon_get_buttons.py" __author__ = "Bob Mottram" __license__ = "AGPL3+" __version__ = "1.5.0" __maintainer__ = "Bob Mottram" __email__ = "bob@libreserver.org" __status__ = "Production" __module_group__ = "Core GET" import os import urllib.parse from bookmarks import bookmark_post from bookmarks import undo_bookmark_post from reaction import update_reaction_collection from manualapprove import manual_deny_follow_request_thread from manualapprove import manual_approve_follow_request_thread from follow import follower_approval_active from fitnessFunctions import fitness_performance from daemon_utils import post_to_outbox from posts import get_original_post_from_announce_url from session import establish_session from httpheaders import set_headers from httpheaders import redirect_headers from httpcodes import write2 from httpcodes import http_400 from httpcodes import http_404 from utils import undo_reaction_collection_entry from utils import load_json from utils import get_full_domain from utils import get_domain_from_actor from utils import locate_post from utils import is_dm from utils import get_cached_post_filename from utils import local_actor_url from utils import get_nickname_from_actor from utils import get_instance_url from webapp_post import individual_post_as_html from webapp_confirm import html_confirm_delete def follow_approve_button(self, calling_domain: str, path: str, cookie: str, base_dir: str, http_prefix: str, domain: str, domain_full: str, port: int, onion_domain: str, i2p_domain: str, getreq_start_time, proxy_type: str, debug: bool, curr_session, federation_list: [], send_threads: {}, post_log: {}, cached_webfingers: {}, person_cache: {}, project_version: str, sites_unavailable: [], system_language: str, fitness: {}, signing_priv_key_pem: str, followers_sync_cache: {}, session_onion, session_i2p, session) -> None: """Follow approve button was pressed """ origin_path_str = path.split('/followapprove=')[0] follower_nickname = origin_path_str.replace('/users/', '') following_handle = path.split('/followapprove=')[1] if '://' in following_handle: handle_nickname = get_nickname_from_actor(following_handle) handle_domain, handle_port = \ get_domain_from_actor(following_handle) if not handle_nickname or not handle_domain: http_404(self, 49) return following_handle = \ handle_nickname + '@' + \ get_full_domain(handle_domain, handle_port) if '@' in following_handle: if onion_domain: if following_handle.endswith('.onion'): curr_session = session_onion proxy_type = 'tor' port = 80 if i2p_domain: if following_handle.endswith('.i2p'): curr_session = session_i2p proxy_type = 'i2p' port = 80 curr_session = \ establish_session("follow_approve_button", curr_session, proxy_type, self.server) if not curr_session: print('WARN: unable to establish session ' + 'when approving follow request') http_404(self, 50) return manual_approve_follow_request_thread(session, session_onion, session_i2p, onion_domain, i2p_domain, base_dir, http_prefix, follower_nickname, domain, port, following_handle, federation_list, send_threads, post_log, cached_webfingers, person_cache, debug, project_version, signing_priv_key_pem, proxy_type, followers_sync_cache, sites_unavailable, system_language) origin_path_str_absolute = \ http_prefix + '://' + domain_full + origin_path_str if calling_domain.endswith('.onion') and onion_domain: origin_path_str_absolute = \ 'http://' + onion_domain + origin_path_str elif (calling_domain.endswith('.i2p') and i2p_domain): origin_path_str_absolute = \ 'http://' + i2p_domain + origin_path_str fitness_performance(getreq_start_time, fitness, '_GET', '_follow_approve_button', debug) redirect_headers(self, origin_path_str_absolute, cookie, calling_domain, 303) def follow_deny_button(self, calling_domain: str, path: str, cookie: str, base_dir: str, http_prefix: str, domain: str, domain_full: str, port: int, onion_domain: str, i2p_domain: str, getreq_start_time, debug: bool, federation_list: [], send_threads: {}, post_log: {}, cached_webfingers: {}, person_cache: {}, project_version: str, signing_priv_key_pem: str, followers_sync_cache: {}, sites_unavailable: [], system_language: str, fitness: {}, session, session_onion, session_i2p) -> None: """Follow deny button was pressed """ origin_path_str = path.split('/followdeny=')[0] follower_nickname = origin_path_str.replace('/users/', '') following_handle = path.split('/followdeny=')[1] if '://' in following_handle: handle_nickname = get_nickname_from_actor(following_handle) handle_domain, handle_port = \ get_domain_from_actor(following_handle) if not handle_nickname or not handle_domain: http_404(self, 51) return following_handle = \ handle_nickname + '@' + \ get_full_domain(handle_domain, handle_port) if '@' in following_handle: manual_deny_follow_request_thread(session, session_onion, session_i2p, onion_domain, i2p_domain, base_dir, http_prefix, follower_nickname, domain, port, following_handle, federation_list, send_threads, post_log, cached_webfingers, person_cache, debug, project_version, signing_priv_key_pem, followers_sync_cache, sites_unavailable, system_language) origin_path_str_absolute = \ http_prefix + '://' + domain_full + origin_path_str if calling_domain.endswith('.onion') and onion_domain: origin_path_str_absolute = \ 'http://' + onion_domain + origin_path_str elif calling_domain.endswith('.i2p') and i2p_domain: origin_path_str_absolute = \ 'http://' + i2p_domain + origin_path_str redirect_headers(self, origin_path_str_absolute, cookie, calling_domain, 303) fitness_performance(getreq_start_time, fitness, '_GET', '_follow_deny_button', debug) def reaction_button(self, calling_domain: str, path: str, base_dir: str, http_prefix: str, domain: str, domain_full: str, onion_domain: str, i2p_domain: str, getreq_start_time, proxy_type: str, cookie: str, debug: str, curr_session, signing_priv_key_pem: str, recent_posts_cache: {}, max_recent_posts: int, translate: {}, cached_webfingers: {}, person_cache: {}, port: int, allow_deletion: bool, project_version: str, yt_replace_domain: str, twitter_replacement_domain: str, show_published_date_only: bool, peertube_instances: [], allow_local_network_access: bool, theme_name: str, system_language: str, max_like_count: int, cw_lists: {}, lists_enabled: {}, dogwhistles: {}, buy_sites: [], auto_cw_cache: {}, fitness: {}, account_timezone: {}, bold_reading_nicknames: {}, min_images_for_accounts: [], session_onion, session_i2p) -> None: """Press an emoji reaction button Note that this is not the emoji reaction selection icon at the bottom of the post """ page_number = 1 reaction_url = path.split('?react=')[1] if '?' in reaction_url: reaction_url = reaction_url.split('?')[0] first_post_id = '' if '?firstpost=' in path: first_post_id = path.split('?firstpost=')[1] if '?' in first_post_id: first_post_id = first_post_id.split('?')[0] first_post_id = first_post_id.replace('/', '--') first_post_id = ';firstpost=' + first_post_id.replace('#', '--') timeline_bookmark = '' if '?bm=' in path: timeline_bookmark = path.split('?bm=')[1] if '?' in timeline_bookmark: timeline_bookmark = timeline_bookmark.split('?')[0] timeline_bookmark = '#' + timeline_bookmark actor = path.split('?react=')[0] if '?page=' in path: page_number_str = path.split('?page=')[1] if ';' in page_number_str: page_number_str = page_number_str.split(';')[0] if '?' in page_number_str: page_number_str = page_number_str.split('?')[0] if '#' in page_number_str: page_number_str = page_number_str.split('#')[0] if len(page_number_str) > 5: page_number_str = "1" if page_number_str.isdigit(): page_number = int(page_number_str) timeline_str = 'inbox' if '?tl=' in path: timeline_str = path.split('?tl=')[1] if '?' in timeline_str: timeline_str = timeline_str.split('?')[0] emoji_content_encoded = None if '?emojreact=' in path: emoji_content_encoded = path.split('?emojreact=')[1] if '?' in emoji_content_encoded: emoji_content_encoded = emoji_content_encoded.split('?')[0] if not emoji_content_encoded: print('WARN: no emoji reaction ' + actor) actor_absolute = \ get_instance_url(calling_domain, http_prefix, domain_full, onion_domain, i2p_domain) + \ actor actor_path_str = \ actor_absolute + '/' + timeline_str + \ '?page=' + str(page_number) + timeline_bookmark redirect_headers(self, actor_path_str, cookie, calling_domain, 303) return emoji_content = urllib.parse.unquote_plus(emoji_content_encoded) self.post_to_nickname = get_nickname_from_actor(actor) if not self.post_to_nickname: print('WARN: unable to find nickname in ' + actor) actor_absolute = \ get_instance_url(calling_domain, http_prefix, domain_full, onion_domain, i2p_domain) + \ actor actor_path_str = \ actor_absolute + '/' + timeline_str + \ '?page=' + str(page_number) + timeline_bookmark redirect_headers(self, actor_path_str, cookie, calling_domain, 303) return if onion_domain: if '.onion/' in actor: curr_session = session_onion proxy_type = 'tor' if i2p_domain: if '.onion/' in actor: curr_session = session_i2p proxy_type = 'i2p' curr_session = \ establish_session("reactionButton", curr_session, proxy_type, self.server) if not curr_session: http_404(self, 54) return reaction_actor = \ local_actor_url(http_prefix, self.post_to_nickname, domain_full) actor_reaction = path.split('?actor=')[1] if '?' in actor_reaction: actor_reaction = actor_reaction.split('?')[0] # if this is an announce then send the emoji reaction # to the original post orig_actor, orig_post_url, orig_filename = \ get_original_post_from_announce_url(reaction_url, base_dir, self.post_to_nickname, domain) reaction_url2 = reaction_url reaction_post_filename = orig_filename if orig_actor and orig_post_url: actor_reaction = orig_actor reaction_url2 = orig_post_url reaction_post_filename = None reaction_json = { "@context": "https://www.w3.org/ns/activitystreams", 'type': 'EmojiReact', 'actor': reaction_actor, 'to': [actor_reaction], 'object': reaction_url2, 'content': emoji_content } # send out the emoji reaction to followers post_to_outbox(self, reaction_json, project_version, None, curr_session, proxy_type) fitness_performance(getreq_start_time, fitness, '_GET', '_reaction_button postToOutbox', debug) print('Locating emoji reaction post ' + reaction_url) # directly emoji reaction the post file if not reaction_post_filename: reaction_post_filename = \ locate_post(base_dir, self.post_to_nickname, domain, reaction_url) if reaction_post_filename: reaction_post_json = load_json(reaction_post_filename) if orig_filename and orig_post_url: update_reaction_collection(recent_posts_cache, base_dir, reaction_post_filename, reaction_url, reaction_actor, self.post_to_nickname, domain, debug, reaction_post_json, emoji_content) reaction_url = orig_post_url reaction_post_filename = orig_filename if debug: print('Updating emoji reaction for ' + reaction_post_filename) update_reaction_collection(recent_posts_cache, base_dir, reaction_post_filename, reaction_url, reaction_actor, self.post_to_nickname, domain, debug, None, emoji_content) if debug: print('Regenerating html post for changed ' + 'emoji reaction collection') # clear the icon from the cache so that it gets updated if reaction_post_json: cached_post_filename = \ get_cached_post_filename(base_dir, self.post_to_nickname, domain, reaction_post_json) if debug: print('Reaction post json: ' + str(reaction_post_json)) print('Reaction post nickname: ' + self.post_to_nickname + ' ' + domain) print('Reaction post cache: ' + str(cached_post_filename)) show_individual_post_icons = True manually_approve_followers = \ follower_approval_active(base_dir, self.post_to_nickname, domain) show_repeats = not is_dm(reaction_post_json) timezone = None if account_timezone.get(self.post_to_nickname): timezone = account_timezone.get(self.post_to_nickname) mitm = False if os.path.isfile(reaction_post_filename.replace('.json', '') + '.mitm'): mitm = True bold_reading = False if bold_reading_nicknames.get(self.post_to_nickname): bold_reading = True minimize_all_images = False if self.post_to_nickname in min_images_for_accounts: minimize_all_images = True individual_post_as_html(signing_priv_key_pem, False, recent_posts_cache, max_recent_posts, translate, page_number, base_dir, curr_session, cached_webfingers, person_cache, self.post_to_nickname, domain, port, reaction_post_json, None, True, allow_deletion, http_prefix, project_version, timeline_str, yt_replace_domain, twitter_replacement_domain, show_published_date_only, peertube_instances, allow_local_network_access, theme_name, system_language, max_like_count, show_repeats, show_individual_post_icons, manually_approve_followers, False, True, False, cw_lists, lists_enabled, timezone, mitm, bold_reading, dogwhistles, minimize_all_images, None, buy_sites, auto_cw_cache) else: print('WARN: Emoji reaction post not found: ' + reaction_post_filename) else: print('WARN: unable to locate file for emoji reaction post ' + reaction_url) actor_absolute = \ get_instance_url(calling_domain, http_prefix, domain_full, onion_domain, i2p_domain) + \ '/users/' + self.post_to_nickname actor_path_str = \ actor_absolute + '/' + timeline_str + \ '?page=' + str(page_number) + first_post_id + \ timeline_bookmark fitness_performance(getreq_start_time, fitness, '_GET', '_reaction_button', debug) redirect_headers(self, actor_path_str, cookie, calling_domain, 303) def reaction_button_undo(self, calling_domain: str, path: str, base_dir: str, http_prefix: str, domain: str, domain_full: str, onion_domain: str, i2p_domain: str, getreq_start_time, proxy_type: str, cookie: str, debug: str, curr_session, signing_priv_key_pem: str, recent_posts_cache: {}, max_recent_posts: int, translate: {}, cached_webfingers: {}, person_cache: {}, port: int, allow_deletion: bool, project_version: str, yt_replace_domain: str, twitter_replacement_domain: str, show_published_date_only: bool, peertube_instances: [], allow_local_network_access: bool, theme_name: str, system_language: str, max_like_count: int, cw_lists: {}, lists_enabled: {}, dogwhistles: {}, buy_sites: [], auto_cw_cache: {}, fitness: {}, account_timezone: {}, bold_reading_nicknames: {}, min_images_for_accounts: [], session_onion, session_i2p) -> None: """A button is pressed to undo emoji reaction """ page_number = 1 reaction_url = path.split('?unreact=')[1] if '?' in reaction_url: reaction_url = reaction_url.split('?')[0] first_post_id = '' if '?firstpost=' in path: first_post_id = path.split('?firstpost=')[1] if '?' in first_post_id: first_post_id = first_post_id.split('?')[0] first_post_id = first_post_id.replace('/', '--') first_post_id = ';firstpost=' + first_post_id.replace('#', '--') timeline_bookmark = '' if '?bm=' in path: timeline_bookmark = path.split('?bm=')[1] if '?' in timeline_bookmark: timeline_bookmark = timeline_bookmark.split('?')[0] timeline_bookmark = '#' + timeline_bookmark if '?page=' in path: page_number_str = path.split('?page=')[1] if ';' in page_number_str: page_number_str = page_number_str.split(';')[0] if '?' in page_number_str: page_number_str = page_number_str.split('?')[0] if '#' in page_number_str: page_number_str = page_number_str.split('#')[0] if len(page_number_str) > 5: page_number_str = "1" if page_number_str.isdigit(): page_number = int(page_number_str) timeline_str = 'inbox' if '?tl=' in path: timeline_str = path.split('?tl=')[1] if '?' in timeline_str: timeline_str = timeline_str.split('?')[0] actor = path.split('?unreact=')[0] self.post_to_nickname = get_nickname_from_actor(actor) if not self.post_to_nickname: print('WARN: unable to find nickname in ' + actor) actor_absolute = \ get_instance_url(calling_domain, http_prefix, domain_full, onion_domain, i2p_domain) + \ actor actor_path_str = \ actor_absolute + '/' + timeline_str + \ '?page=' + str(page_number) redirect_headers(self, actor_path_str, cookie, calling_domain, 303) return emoji_content_encoded = None if '?emojreact=' in path: emoji_content_encoded = path.split('?emojreact=')[1] if '?' in emoji_content_encoded: emoji_content_encoded = emoji_content_encoded.split('?')[0] if not emoji_content_encoded: print('WARN: no emoji reaction ' + actor) actor_absolute = \ get_instance_url(calling_domain, http_prefix, domain_full, onion_domain, i2p_domain) + \ actor actor_path_str = \ actor_absolute + '/' + timeline_str + \ '?page=' + str(page_number) + timeline_bookmark redirect_headers(self, actor_path_str, cookie, calling_domain, 303) return emoji_content = urllib.parse.unquote_plus(emoji_content_encoded) if onion_domain: if '.onion/' in actor: curr_session = session_onion proxy_type = 'tor' if i2p_domain: if '.onion/' in actor: curr_session = session_i2p proxy_type = 'i2p' curr_session = \ establish_session("undoReactionButton", curr_session, proxy_type, self.server) if not curr_session: http_404(self, 55) return undo_actor = \ local_actor_url(http_prefix, self.post_to_nickname, domain_full) actor_reaction = path.split('?actor=')[1] if '?' in actor_reaction: actor_reaction = actor_reaction.split('?')[0] # if this is an announce then send the emoji reaction # to the original post orig_actor, orig_post_url, orig_filename = \ get_original_post_from_announce_url(reaction_url, base_dir, self.post_to_nickname, domain) reaction_url2 = reaction_url reaction_post_filename = orig_filename if orig_actor and orig_post_url: actor_reaction = orig_actor reaction_url2 = orig_post_url reaction_post_filename = None undo_reaction_json = { "@context": "https://www.w3.org/ns/activitystreams", 'type': 'Undo', 'actor': undo_actor, 'to': [actor_reaction], 'object': { 'type': 'EmojiReact', 'actor': undo_actor, 'to': [actor_reaction], 'object': reaction_url2 } } # send out the undo emoji reaction to followers post_to_outbox(self, undo_reaction_json, project_version, None, curr_session, proxy_type) # directly undo the emoji reaction within the post file if not reaction_post_filename: reaction_post_filename = \ locate_post(base_dir, self.post_to_nickname, domain, reaction_url) if reaction_post_filename: reaction_post_json = load_json(reaction_post_filename) if orig_filename and orig_post_url: undo_reaction_collection_entry(recent_posts_cache, base_dir, reaction_post_filename, undo_actor, domain, debug, reaction_post_json, emoji_content) reaction_url = orig_post_url reaction_post_filename = orig_filename if debug: print('Removing emoji reaction for ' + reaction_post_filename) undo_reaction_collection_entry(recent_posts_cache, base_dir, reaction_post_filename, undo_actor, domain, debug, reaction_post_json, emoji_content) if debug: print('Regenerating html post for changed ' + 'emoji reaction collection') if reaction_post_json: show_individual_post_icons = True manually_approve_followers = \ follower_approval_active(base_dir, self.post_to_nickname, domain) show_repeats = not is_dm(reaction_post_json) timezone = None if account_timezone.get(self.post_to_nickname): timezone = account_timezone.get(self.post_to_nickname) mitm = False if os.path.isfile(reaction_post_filename.replace('.json', '') + '.mitm'): mitm = True bold_reading = False if bold_reading_nicknames.get(self.post_to_nickname): bold_reading = True minimize_all_images = False if self.post_to_nickname in min_images_for_accounts: minimize_all_images = True individual_post_as_html(signing_priv_key_pem, False, recent_posts_cache, max_recent_posts, translate, page_number, base_dir, curr_session, cached_webfingers, person_cache, self.post_to_nickname, domain, port, reaction_post_json, None, True, allow_deletion, http_prefix, project_version, timeline_str, yt_replace_domain, twitter_replacement_domain, show_published_date_only, peertube_instances, allow_local_network_access, theme_name, system_language, max_like_count, show_repeats, show_individual_post_icons, manually_approve_followers, False, True, False, cw_lists, lists_enabled, timezone, mitm, bold_reading, dogwhistles, minimize_all_images, None, buy_sites, auto_cw_cache) else: print('WARN: Unreaction post not found: ' + reaction_post_filename) actor_absolute = \ get_instance_url(calling_domain, http_prefix, domain_full, onion_domain, i2p_domain) + \ '/users/' + self.post_to_nickname actor_path_str = \ actor_absolute + '/' + timeline_str + \ '?page=' + str(page_number) + first_post_id + \ timeline_bookmark fitness_performance(getreq_start_time, fitness, '_GET', '_undo_reaction_button', debug) redirect_headers(self, actor_path_str, cookie, calling_domain, 303) def bookmark_button(self, calling_domain: str, path: str, base_dir: str, http_prefix: str, domain: str, domain_full: str, port: int, onion_domain: str, i2p_domain: str, getreq_start_time, proxy_type: str, cookie: str, debug: str, curr_session, signing_priv_key_pem: str, recent_posts_cache: {}, max_recent_posts: int, translate: {}, cached_webfingers: {}, person_cache: {}, allow_deletion: bool, project_version: str, yt_replace_domain: str, twitter_replacement_domain: str, show_published_date_only: bool, peertube_instances: [], allow_local_network_access: bool, theme_name: str, system_language: str, max_like_count: int, cw_lists: {}, lists_enabled: {}, dogwhistles: {}, buy_sites: [], auto_cw_cache: {}, fitness: {}, federation_list: [], icons_cache: {}, account_timezone: {}, bold_reading_nicknames: {}, min_images_for_accounts: [], session_onion, session_i2p) -> None: """Bookmark button was pressed """ page_number = 1 bookmark_url = path.split('?bookmark=')[1] if '?' in bookmark_url: bookmark_url = bookmark_url.split('?')[0] first_post_id = '' if '?firstpost=' in path: first_post_id = path.split('?firstpost=')[1] if '?' in first_post_id: first_post_id = first_post_id.split('?')[0] first_post_id = first_post_id.replace('/', '--') first_post_id = ';firstpost=' + first_post_id.replace('#', '--') timeline_bookmark = '' if '?bm=' in path: timeline_bookmark = path.split('?bm=')[1] if '?' in timeline_bookmark: timeline_bookmark = timeline_bookmark.split('?')[0] timeline_bookmark = '#' + timeline_bookmark actor = path.split('?bookmark=')[0] if '?page=' in path: page_number_str = path.split('?page=')[1] if ';' in page_number_str: page_number_str = page_number_str.split(';')[0] if '?' in page_number_str: page_number_str = page_number_str.split('?')[0] if '#' in page_number_str: page_number_str = page_number_str.split('#')[0] if len(page_number_str) > 5: page_number_str = "1" if page_number_str.isdigit(): page_number = int(page_number_str) timeline_str = 'inbox' if '?tl=' in path: timeline_str = path.split('?tl=')[1] if '?' in timeline_str: timeline_str = timeline_str.split('?')[0] self.post_to_nickname = get_nickname_from_actor(actor) if not self.post_to_nickname: print('WARN: unable to find nickname in ' + actor) actor_absolute = \ get_instance_url(calling_domain, http_prefix, domain_full, onion_domain, i2p_domain) + \ actor actor_path_str = \ actor_absolute + '/' + timeline_str + \ '?page=' + str(page_number) redirect_headers(self, actor_path_str, cookie, calling_domain, 303) return if onion_domain: if '.onion/' in actor: curr_session = session_onion proxy_type = 'tor' if i2p_domain: if '.onion/' in actor: curr_session = session_i2p proxy_type = 'i2p' curr_session = \ establish_session("bookmarkButton", curr_session, proxy_type, self.server) if not curr_session: http_404(self, 56) return bookmark_actor = \ local_actor_url(http_prefix, self.post_to_nickname, domain_full) cc_list = [] bookmark_post(recent_posts_cache, base_dir, federation_list, self.post_to_nickname, domain, port, cc_list, http_prefix, bookmark_url, bookmark_actor, debug) # clear the icon from the cache so that it gets updated if icons_cache.get('bookmark.png'): del icons_cache['bookmark.png'] bookmark_filename = \ locate_post(base_dir, self.post_to_nickname, domain, bookmark_url) if bookmark_filename: print('Regenerating html post for changed bookmark') bookmark_post_json = load_json(bookmark_filename) if bookmark_post_json: cached_post_filename = \ get_cached_post_filename(base_dir, self.post_to_nickname, domain, bookmark_post_json) print('Bookmarked post json: ' + str(bookmark_post_json)) print('Bookmarked post nickname: ' + self.post_to_nickname + ' ' + domain) print('Bookmarked post cache: ' + str(cached_post_filename)) show_individual_post_icons = True manually_approve_followers = \ follower_approval_active(base_dir, self.post_to_nickname, domain) show_repeats = not is_dm(bookmark_post_json) timezone = None if account_timezone.get(self.post_to_nickname): timezone = account_timezone.get(self.post_to_nickname) mitm = False if os.path.isfile(bookmark_filename.replace('.json', '') + '.mitm'): mitm = True bold_reading = False if bold_reading_nicknames.get(self.post_to_nickname): bold_reading = True minimize_all_images = False if self.post_to_nickname in min_images_for_accounts: minimize_all_images = True individual_post_as_html(signing_priv_key_pem, False, recent_posts_cache, max_recent_posts, translate, page_number, base_dir, curr_session, cached_webfingers, person_cache, self.post_to_nickname, domain, port, bookmark_post_json, None, True, allow_deletion, http_prefix, project_version, timeline_str, yt_replace_domain, twitter_replacement_domain, show_published_date_only, peertube_instances, allow_local_network_access, theme_name, system_language, max_like_count, show_repeats, show_individual_post_icons, manually_approve_followers, False, True, False, cw_lists, lists_enabled, timezone, mitm, bold_reading, dogwhistles, minimize_all_images, None, buy_sites, auto_cw_cache) else: print('WARN: Bookmarked post not found: ' + bookmark_filename) # _post_to_outbox(self, bookmark_json, # project_version, None, # curr_session, proxy_type) actor_absolute = \ get_instance_url(calling_domain, http_prefix, domain_full, onion_domain, i2p_domain) + \ '/users/' + self.post_to_nickname actor_path_str = \ actor_absolute + '/' + timeline_str + \ '?page=' + str(page_number) + first_post_id + \ timeline_bookmark fitness_performance(getreq_start_time, fitness, '_GET', '_bookmark_button', debug) redirect_headers(self, actor_path_str, cookie, calling_domain, 303) def bookmark_button_undo(self, calling_domain: str, path: str, base_dir: str, http_prefix: str, domain: str, domain_full: str, port: int, onion_domain: str, i2p_domain: str, getreq_start_time, proxy_type: str, cookie: str, debug: str, curr_session, signing_priv_key_pem: str, recent_posts_cache: {}, max_recent_posts: int, translate: {}, cached_webfingers: {}, person_cache: {}, allow_deletion: bool, project_version: str, yt_replace_domain: str, twitter_replacement_domain: str, show_published_date_only: bool, peertube_instances: [], allow_local_network_access: bool, theme_name: str, system_language: str, max_like_count: int, cw_lists: {}, lists_enabled: {}, dogwhistles: {}, buy_sites: [], auto_cw_cache: {}, fitness: {}, federation_list: [], icons_cache: {}, account_timezone: {}, bold_reading_nicknames: {}, min_images_for_accounts: [], session_onion, session_i2p) -> None: """Button pressed to undo a bookmark """ page_number = 1 bookmark_url = path.split('?unbookmark=')[1] if '?' in bookmark_url: bookmark_url = bookmark_url.split('?')[0] first_post_id = '' if '?firstpost=' in path: first_post_id = path.split('?firstpost=')[1] if '?' in first_post_id: first_post_id = first_post_id.split('?')[0] first_post_id = first_post_id.replace('/', '--') first_post_id = ';firstpost=' + first_post_id.replace('#', '--') timeline_bookmark = '' if '?bm=' in path: timeline_bookmark = path.split('?bm=')[1] if '?' in timeline_bookmark: timeline_bookmark = timeline_bookmark.split('?')[0] timeline_bookmark = '#' + timeline_bookmark if '?page=' in path: page_number_str = path.split('?page=')[1] if ';' in page_number_str: page_number_str = page_number_str.split(';')[0] if '?' in page_number_str: page_number_str = page_number_str.split('?')[0] if '#' in page_number_str: page_number_str = page_number_str.split('#')[0] if len(page_number_str) > 5: page_number_str = "1" if page_number_str.isdigit(): page_number = int(page_number_str) timeline_str = 'inbox' if '?tl=' in path: timeline_str = path.split('?tl=')[1] if '?' in timeline_str: timeline_str = timeline_str.split('?')[0] actor = path.split('?unbookmark=')[0] self.post_to_nickname = get_nickname_from_actor(actor) if not self.post_to_nickname: print('WARN: unable to find nickname in ' + actor) actor_absolute = \ get_instance_url(calling_domain, http_prefix, domain_full, onion_domain, i2p_domain) + \ actor actor_path_str = \ actor_absolute + '/' + timeline_str + \ '?page=' + str(page_number) redirect_headers(self, actor_path_str, cookie, calling_domain, 303) return if onion_domain: if '.onion/' in actor: curr_session = session_onion proxy_type = 'tor' if i2p_domain: if '.onion/' in actor: curr_session = session_i2p proxy_type = 'i2p' curr_session = \ establish_session("undo_bookmarkButton", curr_session, proxy_type, self.server) if not curr_session: http_404(self, 57) return undo_actor = \ local_actor_url(http_prefix, self.post_to_nickname, domain_full) cc_list = [] undo_bookmark_post(recent_posts_cache, base_dir, federation_list, self.post_to_nickname, domain, port, cc_list, http_prefix, bookmark_url, undo_actor, debug) # clear the icon from the cache so that it gets updated if icons_cache.get('bookmark_inactive.png'): del icons_cache['bookmark_inactive.png'] # post_to_outbox(self, undo_bookmark_json, # project_version, None, # curr_session, proxy_type) bookmark_filename = \ locate_post(base_dir, self.post_to_nickname, domain, bookmark_url) if bookmark_filename: print('Regenerating html post for changed unbookmark') bookmark_post_json = load_json(bookmark_filename) if bookmark_post_json: cached_post_filename = \ get_cached_post_filename(base_dir, self.post_to_nickname, domain, bookmark_post_json) print('Unbookmarked post json: ' + str(bookmark_post_json)) print('Unbookmarked post nickname: ' + self.post_to_nickname + ' ' + domain) print('Unbookmarked post cache: ' + str(cached_post_filename)) show_individual_post_icons = True manually_approve_followers = \ follower_approval_active(base_dir, self.post_to_nickname, domain) show_repeats = not is_dm(bookmark_post_json) timezone = None if account_timezone.get(self.post_to_nickname): timezone = account_timezone.get(self.post_to_nickname) mitm = False if os.path.isfile(bookmark_filename.replace('.json', '') + '.mitm'): mitm = True bold_reading = False if bold_reading_nicknames.get(self.post_to_nickname): bold_reading = True minimize_all_images = False if self.post_to_nickname in min_images_for_accounts: minimize_all_images = True individual_post_as_html(signing_priv_key_pem, False, recent_posts_cache, max_recent_posts, translate, page_number, base_dir, curr_session, cached_webfingers, person_cache, self.post_to_nickname, domain, port, bookmark_post_json, None, True, allow_deletion, http_prefix, project_version, timeline_str, yt_replace_domain, twitter_replacement_domain, show_published_date_only, peertube_instances, allow_local_network_access, theme_name, system_language, max_like_count, show_repeats, show_individual_post_icons, manually_approve_followers, False, True, False, cw_lists, lists_enabled, timezone, mitm, bold_reading, dogwhistles, minimize_all_images, None, buy_sites, auto_cw_cache) else: print('WARN: Unbookmarked post not found: ' + bookmark_filename) actor_absolute = \ get_instance_url(calling_domain, http_prefix, domain_full, onion_domain, i2p_domain) + \ '/users/' + self.post_to_nickname actor_path_str = \ actor_absolute + '/' + timeline_str + \ '?page=' + str(page_number) + first_post_id + \ timeline_bookmark fitness_performance(getreq_start_time, fitness, '_GET', '_undo_bookmark_button', debug) redirect_headers(self, actor_path_str, cookie, calling_domain, 303) def delete_button(self, calling_domain: str, path: str, base_dir: str, http_prefix: str, domain_full: str, onion_domain: str, i2p_domain: str, getreq_start_time, proxy_type: str, cookie: str, debug: str, curr_session, recent_posts_cache: {}, max_recent_posts: int, translate: {}, project_version: str, cached_webfingers: {}, person_cache: {}, yt_replace_domain: str, twitter_replacement_domain: str, show_published_date_only: bool, peertube_instances: [], allow_local_network_access: bool, theme_name: str, system_language: str, max_like_count: int, signing_priv_key_pem: str, cw_lists: {}, lists_enabled: {}, dogwhistles: {}, min_images_for_accounts: [], buy_sites: [], auto_cw_cache: {}, fitness: {}, allow_deletion: bool, session_onion, session_i2p, default_timeline: str) -> None: """Delete button is pressed on a post """ if not cookie: print('ERROR: no cookie given when deleting ' + path) http_400(self) return page_number = 1 if '?page=' in path: page_number_str = path.split('?page=')[1] if '?' in page_number_str: page_number_str = page_number_str.split('?')[0] if '#' in page_number_str: page_number_str = page_number_str.split('#')[0] if len(page_number_str) > 5: page_number_str = "1" if page_number_str.isdigit(): page_number = int(page_number_str) delete_url = path.split('?delete=')[1] if '?' in delete_url: delete_url = delete_url.split('?')[0] timeline_str = default_timeline if '?tl=' in path: timeline_str = path.split('?tl=')[1] if '?' in timeline_str: timeline_str = timeline_str.split('?')[0] users_path = path.split('?delete=')[0] actor = \ http_prefix + '://' + domain_full + users_path if allow_deletion or delete_url.startswith(actor): if debug: print('DEBUG: delete_url=' + delete_url) print('DEBUG: actor=' + actor) if actor not in delete_url: # You can only delete your own posts if calling_domain.endswith('.onion') and onion_domain: actor = 'http://' + onion_domain + users_path elif calling_domain.endswith('.i2p') and i2p_domain: actor = 'http://' + i2p_domain + users_path redirect_headers(self, actor + '/' + timeline_str, cookie, calling_domain, 303) return self.post_to_nickname = get_nickname_from_actor(actor) if not self.post_to_nickname: print('WARN: unable to find nickname in ' + actor) if calling_domain.endswith('.onion') and onion_domain: actor = 'http://' + onion_domain + users_path elif calling_domain.endswith('.i2p') and i2p_domain: actor = 'http://' + i2p_domain + users_path redirect_headers(self, actor + '/' + timeline_str, cookie, calling_domain, 303) return if onion_domain: if '.onion/' in actor: curr_session = session_onion proxy_type = 'tor' if i2p_domain: if '.onion/' in actor: curr_session = session_i2p proxy_type = 'i2p' curr_session = \ establish_session("deleteButton", curr_session, proxy_type, self.server) if not curr_session: http_404(self, 58) return delete_str = \ html_confirm_delete(self.server, recent_posts_cache, max_recent_posts, translate, page_number, curr_session, base_dir, delete_url, http_prefix, project_version, cached_webfingers, person_cache, calling_domain, yt_replace_domain, twitter_replacement_domain, show_published_date_only, peertube_instances, allow_local_network_access, theme_name, system_language, max_like_count, signing_priv_key_pem, cw_lists, lists_enabled, dogwhistles, min_images_for_accounts, buy_sites, auto_cw_cache) if delete_str: delete_str_len = len(delete_str) set_headers(self, 'text/html', delete_str_len, cookie, calling_domain, False) write2(self, delete_str.encode('utf-8')) self.server.getreq_busy = False return if calling_domain.endswith('.onion') and onion_domain: actor = 'http://' + onion_domain + users_path elif (calling_domain.endswith('.i2p') and i2p_domain): actor = 'http://' + i2p_domain + users_path fitness_performance(getreq_start_time, fitness, '_GET', '_delete_button', debug) redirect_headers(self, actor + '/' + timeline_str, cookie, calling_domain, 303)