From 464746fe9c970fbb64e2cc4d4d0b12827ac4897c Mon Sep 17 00:00:00 2001 From: bashrc Date: Mon, 4 May 2026 20:46:41 +0100 Subject: [PATCH] Check that muted is boolean --- posts.py | 330 ++++++++++++++++++++++++++++--------------------------- 1 file changed, 169 insertions(+), 161 deletions(-) diff --git a/posts.py b/posts.py index 6439ee0b5..bc33392ae 100644 --- a/posts.py +++ b/posts.py @@ -4750,7 +4750,7 @@ def _create_box_items(base_dir: str, box_actor: str) -> (int, int): """Creates the list of posts within a timeline """ - index_filename = \ + index_filename: str = \ acct_dir(base_dir, timeline_nickname, original_domain) + \ '/' + index_box_name + '.index' total_posts_count: int = 0 @@ -4768,12 +4768,12 @@ def _create_box_items(base_dir: str, } first_post_id = replace_strings(first_post_id, replacements) - prev_post_filename = None + prev_post_filename: str = None try: with open(index_filename, 'r', encoding='utf-8') as fp_index: posts_added_to_timeline: int = 0 while posts_added_to_timeline < items_per_page: - post_filename = fp_index.readline() + post_filename: str = fp_index.readline() if post_filename == prev_post_filename: break @@ -4811,7 +4811,7 @@ def _create_box_items(base_dir: str, # filename of the post without any extension or path # This should also correspond to any index entry in # the posts cache - post_url = remove_eol(post_filename) + post_url: str = remove_eol(post_filename) post_url = post_url.replace('.json', '').strip() # is this a duplicate? @@ -4837,13 +4837,13 @@ def _create_box_items(base_dir: str, post_url) # read the post from file - full_post_filename = \ + full_post_filename: str = \ locate_post(base_dir, nickname, original_domain, post_url, False) if full_post_filename: # has the post been rejected? if is_a_file(full_post_filename + '.reject'): - post_url2 = post_url.replace('/', '#') + '.json' + post_url2: str = post_url.replace('/', '#') + '.json' remove_post_from_index(post_url2, False, index_filename) print('REJECT: rejected post in timeline ' + @@ -4929,12 +4929,12 @@ def _create_box_indexed(recent_posts_cache: {}, index_box_name = boxname timeline_nickname = 'news' - original_domain = domain - domain = get_full_domain(domain, port) + original_domain: str = domain + domain: str = get_full_domain(domain, port) - box_actor = local_actor_url(http_prefix, nickname, domain) + box_actor: str = local_actor_url(http_prefix, nickname, domain) - page_str = '?page=true' + page_str: str = '?page=true' if page_number: page_number = max(page_number, 1) try: @@ -4942,7 +4942,8 @@ def _create_box_indexed(recent_posts_cache: {}, except BaseException: print('EX: _create_box_indexed ' + 'unable to convert page number to string') - box_url = local_actor_url(http_prefix, nickname, domain) + '/' + boxname + box_url: str = \ + local_actor_url(http_prefix, nickname, domain) + '/' + boxname box_header = { "@context": [ 'https://www.w3.org/ns/activitystreams', @@ -4966,7 +4967,7 @@ def _create_box_indexed(recent_posts_cache: {}, 'type': 'OrderedCollectionPage' } - posts_in_box = [] + posts_in_box: list[str] = [] post_urls_in_box: list[str] = [] if not unauthorized_premium: @@ -5127,7 +5128,7 @@ def _expire_conversations_for_person(base_dir: str, max_age_days: int) -> int: """Expires entries within the conversation directory """ - conv_dir = acct_dir(base_dir, nickname, domain) + '/conversation' + conv_dir: str = acct_dir(base_dir, nickname, domain) + '/conversation' if not is_a_dir(conv_dir): print('No conversations for ' + nickname + '@' + domain) return 0 @@ -5139,10 +5140,10 @@ def _expire_conversations_for_person(base_dir: str, # don't expire muted conversations, so they stay muted continue # Time of file creation - full_filename = os.path.join(conv_dir, conv_filename) + full_filename: str = os.path.join(conv_dir, conv_filename) if not is_a_file(full_filename): continue - last_modified = file_last_modified(full_filename) + last_modified: str = file_last_modified(full_filename) # get time difference if not valid_post_date(last_modified, max_age_days, False): erase_file(full_filename, @@ -5157,7 +5158,7 @@ def _expire_posts_cache_for_person(base_dir: str, max_age_days: int) -> int: """Expires entries within the posts cache """ - cache_dir = acct_dir(base_dir, nickname, domain) + '/postcache' + cache_dir: str = acct_dir(base_dir, nickname, domain) + '/postcache' if not is_a_dir(cache_dir): print('No cached posts for ' + nickname + '@' + domain) return 0 @@ -5166,10 +5167,10 @@ def _expire_posts_cache_for_person(base_dir: str, for cache_filename in posts_in_cache: cache_filename = cache_filename.name # Time of file creation - full_filename = os.path.join(cache_dir, cache_filename) + full_filename: str = os.path.join(cache_dir, cache_filename) if not is_a_file(full_filename): continue - last_modified = file_last_modified(full_filename) + last_modified: str = file_last_modified(full_filename) # get time difference if not valid_post_date(last_modified, max_age_days, False): erase_file(full_filename, @@ -5185,12 +5186,12 @@ def _novel_fields_for_person(nickname: str, domain: str, """ if boxname not in ('inbox'): return - box_dir = create_person_dir(nickname, domain, base_dir, boxname) + box_dir: str = create_person_dir(nickname, domain, base_dir, boxname) posts_in_box = os.scandir(box_dir) posts_ctr: int = 0 fields: list[str] = [] - expected_fields = ( + expected_fields: list[str] = ( 'alsoKnownAs', 'attachment', 'capabilities', @@ -5278,7 +5279,7 @@ def _novel_fields_for_person(nickname: str, domain: str, post_filename = post_filename.name if not post_filename.endswith('.json'): continue - full_filename = os.path.join(box_dir, post_filename) + full_filename: str = os.path.join(box_dir, post_filename) if not is_a_file(full_filename): continue post_json_object = load_json(full_filename) @@ -5302,12 +5303,12 @@ def _novel_fields_for_person(nickname: str, domain: str, def novel_fields(base_dir: str) -> None: """Reports any unexpected fields within posts """ - dir_str = data_dir(base_dir) + dir_str: str = data_dir(base_dir) for _, dirs, _ in os.walk(dir_str): for handle in dirs: if '@' in handle: - nickname = handle.split('@')[0] - domain = handle.split('@')[1] + nickname: str = handle.split('@')[0] + domain: str = handle.split('@')[1] _novel_fields_for_person(nickname, domain, base_dir, 'inbox') break @@ -5330,15 +5331,16 @@ def archive_posts(base_dir: str, http_prefix: str, archive_dir: str, if not is_a_dir(archive_dir + '/accounts'): makedir(archive_dir + '/accounts') - dir_str = data_dir(base_dir) + dir_str: str = data_dir(base_dir) for _, dirs, _ in os.walk(dir_str): for handle in dirs: if '@' in handle: - nickname = handle.split('@')[0] - domain = handle.split('@')[1] - archive_subdir = None + nickname: str = handle.split('@')[0] + domain: str = handle.split('@')[1] + archive_subdir: str = None if archive_dir: - archive_handle_dir = acct_handle_dir(archive_dir, handle) + archive_handle_dir: str = \ + acct_handle_dir(archive_dir, handle) if not is_a_dir(archive_handle_dir): makedir(archive_handle_dir) if not is_a_dir(archive_handle_dir + '/inbox'): @@ -5350,21 +5352,21 @@ def archive_posts(base_dir: str, http_prefix: str, archive_dir: str, nickname, domain, base_dir, 'inbox', archive_subdir, recent_posts_cache, max_posts_in_box) - expired_announces = \ + expired_announces: int = \ _expire_announce_cache_for_person(base_dir, nickname, domain, max_cache_age_days) print('Expired ' + str(expired_announces) + ' cached announces for ' + nickname + '@' + domain) - expired_conversations = \ + expired_conversations: int = \ _expire_conversations_for_person(base_dir, nickname, domain, max_cache_age_days) print('Expired ' + str(expired_conversations) + ' conversations for ' + nickname + '@' + domain) - expired_posts = \ + expired_posts: int = \ _expire_posts_cache_for_person(base_dir, nickname, domain, max_cache_age_days) @@ -5390,8 +5392,8 @@ def _expire_posts_for_person(http_prefix: str, nickname: str, domain: str, if max_age_days <= 0: return expired_post_count - boxname = 'outbox' - box_dir = create_person_dir(nickname, domain, base_dir, boxname) + boxname: str = 'outbox' + box_dir: str = create_person_dir(nickname, domain, base_dir, boxname) posts_in_box = os.scandir(box_dir) for post_filename in posts_in_box: @@ -5399,10 +5401,10 @@ def _expire_posts_for_person(http_prefix: str, nickname: str, domain: str, if not post_filename.endswith('.json'): continue # get the post json as text - full_filename = os.path.join(box_dir, post_filename) + full_filename: str = os.path.join(box_dir, post_filename) if not is_a_file(full_filename): continue - content = \ + content: str = \ load_string(full_filename, 'EX: expire_posts_for_person unable to open content ' + full_filename) @@ -5410,7 +5412,7 @@ def _expire_posts_for_person(http_prefix: str, nickname: str, domain: str, content = '' # Get time of publication - published_str = '' + published_str: str = '' if '"published":' not in content: # If a publication date is not available then check the last # modified date @@ -5457,7 +5459,7 @@ def get_post_expiry_keep_dms(base_dir: str, nickname: str, domain: str) -> int: """ keep_dms: bool = True handle: str = nickname + '@' + domain - expire_dms_filename = \ + expire_dms_filename: str = \ acct_handle_dir(base_dir, handle) + '/.expire_posts_dms' if is_a_file(expire_dms_filename): keep_dms = False @@ -5468,8 +5470,8 @@ def set_post_expiry_keep_dms(base_dir: str, nickname: str, domain: str, keep_dms: bool) -> None: """Sets whether to keep DMs during post expiry for an account """ - handle = nickname + '@' + domain - expire_dms_filename = \ + handle: str = nickname + '@' + domain + expire_dms_filename: str = \ acct_handle_dir(base_dir, handle) + '/.expire_posts_dms' if keep_dms: if is_a_file(expire_dms_filename): @@ -5487,19 +5489,20 @@ def expire_posts(base_dir: str, http_prefix: str, """Expires posts for instance accounts """ expired_post_count: int = 0 - dir_str = data_dir(base_dir) + dir_str: str = data_dir(base_dir) for _, dirs, _ in os.walk(dir_str): for handle in dirs: if '@' not in handle: continue - nickname = handle.split('@')[0] - domain = handle.split('@')[1] - expire_posts_filename = \ + nickname: str = handle.split('@')[0] + domain: str = handle.split('@')[1] + expire_posts_filename: str = \ acct_handle_dir(base_dir, handle) + '/.expire_posts_days' if not is_a_file(expire_posts_filename): continue - keep_dms = get_post_expiry_keep_dms(base_dir, nickname, domain) - expire_days_str = \ + keep_dms: int = \ + get_post_expiry_keep_dms(base_dir, nickname, domain) + expire_days_str: str = \ load_string(expire_posts_filename, 'EX: expire_posts failed to read days file ' + expire_posts_filename) @@ -5509,7 +5512,7 @@ def expire_posts(base_dir: str, http_prefix: str, continue if not expire_days_str.isdigit(): continue - max_age_days = int(expire_days_str) + max_age_days: int = int(expire_days_str) if max_age_days <= 0: continue expired_post_count += \ @@ -5525,14 +5528,14 @@ def expire_posts(base_dir: str, http_prefix: str, def get_post_expiry_days(base_dir: str, nickname: str, domain: str) -> int: """Returns the post expiry period for the given account """ - handle = nickname + '@' + domain - expire_posts_filename = \ + handle: str = nickname + '@' + domain + expire_posts_filename: str = \ acct_handle_dir(base_dir, handle) + '/.expire_posts_days' if not is_a_file(expire_posts_filename): return 0 - days_str = load_string(expire_posts_filename, - 'EX: unable to write post expire days ' + - expire_posts_filename) + days_str: str = load_string(expire_posts_filename, + 'EX: unable to write post expire days ' + + expire_posts_filename) if not days_str: return 0 if not days_str.isdigit(): @@ -5544,10 +5547,10 @@ def set_post_expiry_days(base_dir: str, nickname: str, domain: str, max_age_days: int) -> None: """Sets the number of days after which posts from an account will expire """ - handle = nickname + '@' + domain - expire_posts_filename = \ + handle: str = nickname + '@' + domain + expire_posts_filename: str = \ acct_handle_dir(base_dir, handle) + '/.expire_posts_days' - text = str(max_age_days) + text: str = str(max_age_days) save_string(text, expire_posts_filename, 'EX: unable to write post expire days ' + expire_posts_filename) @@ -5566,7 +5569,7 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str, if archive_dir: if not is_a_dir(archive_dir): makedir(archive_dir) - box_dir = create_person_dir(nickname, domain, base_dir, boxname) + box_dir: str = create_person_dir(nickname, domain, base_dir, boxname) posts_in_box = os.scandir(box_dir) no_of_posts: int = 0 for _ in posts_in_box: @@ -5577,13 +5580,13 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str, return # remove entries from the index - handle = nickname + '@' + domain - index_filename = \ + handle: str = nickname + '@' + domain + index_filename: str = \ acct_handle_dir(base_dir, handle) + '/' + boxname + '.index' if is_a_file(index_filename): index_ctr: int = 0 # get the existing index entries as a string - new_index = '' + new_index: str = '' index_list: list[str] = \ load_list(index_filename, 'EX: archive_posts_for_person unable to read ' + @@ -5602,9 +5605,9 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str, # remove any edits or replies for ext_name in ('edits', 'replies'): - ext = '.' + ext_name + ext: str = '.' + ext_name posts_in_box = os.scandir(box_dir) - edits_in_box_dict = {} + edits_in_box_dict: dict = {} edits_ctr: int = 0 edits_removed_ctr: int = 0 edit_files_ctr: int = 0 @@ -5612,8 +5615,8 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str, post_filename = post_filename.name if not post_filename.endswith(ext): continue - original_post_filename = post_filename.replace(ext, '.json') - full_original_filename = \ + original_post_filename: str = post_filename.replace(ext, '.json') + full_original_filename: str = \ os.path.join(box_dir, original_post_filename) full_filename = os.path.join(box_dir, post_filename) if not is_a_file(full_original_filename): @@ -5627,13 +5630,13 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str, continue edit_files_ctr += 1 if is_a_file(full_filename): - content = load_string(full_filename, - 'EX: unable to open content 2 ' + - full_filename) + content: str = load_string(full_filename, + 'EX: unable to open content 2 ' + + full_filename) if content is None: content = '' if '"published":' in content: - published_str = content.split('"published":')[1] + published_str: str = content.split('"published":')[1] if '"' in published_str: published_str = published_str.split('"')[1] # does this look like a date? @@ -5647,7 +5650,7 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str, print(str(edits_removed_ctr) + ' ' + ext_name + ' removed from ' + boxname + ' for ' + nickname + '@' + domain) - no_of_edits = edits_ctr + no_of_edits: int = edits_ctr if no_of_edits <= max_posts_in_box: print('Checked ' + str(no_of_edits) + ' ' + boxname + ' ' + ext_name + ' in ' + str(edit_files_ctr) + @@ -5659,18 +5662,18 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str, remove_edits_ctr: int = 0 for published_str, edit_filename in edits_in_box_sorted.items(): - file_path = os.path.join(box_dir, edit_filename) + file_path: str = os.path.join(box_dir, edit_filename) if not is_a_file(file_path): continue if archive_dir: archive_path = os.path.join(archive_dir, edit_filename) - ex_text = \ + ex_text: str = \ 'EX: archive_posts_for_person unable to archive ' + \ ext_name + ' ' + file_path + ' -> ' + archive_path if move_file(file_path, archive_path, ex_text): remove_edits_ctr += 1 else: - ex_text = \ + ex_text: str = \ 'EX: archive_posts_for_person unable to delete ' + \ ext_name + ' ' + edit_filename if erase_file(edit_filename, ex_text): @@ -5690,22 +5693,22 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str, if not post_filename.endswith('.json'): continue # Get the published time - full_filename = os.path.join(box_dir, post_filename) + full_filename: str = os.path.join(box_dir, post_filename) if is_a_file(full_filename): - content = load_string(full_filename, - 'EX: unable to open content 1 ' + - full_filename) + content: str = load_string(full_filename, + 'EX: unable to open content 1 ' + + full_filename) if content is None: content = '' if '"published":' in content: - published_str = content.split('"published":')[1] + published_str: str = content.split('"published":')[1] if '"' in published_str: published_str = published_str.split('"')[1] if published_str.endswith('Z'): posts_in_box_dict[published_str] = post_filename posts_ctr += 1 - no_of_posts = posts_ctr + no_of_posts: int = posts_ctr if no_of_posts <= max_posts_in_box: print('Checked ' + str(no_of_posts) + ' ' + boxname + ' posts for ' + nickname + '@' + domain) @@ -5716,34 +5719,35 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str, OrderedDict(sorted(posts_in_box_dict.items(), reverse=False)) # directory containing cached html posts - post_cache_dir = box_dir.replace('/' + boxname, '/postcache') + post_cache_dir: str = box_dir.replace('/' + boxname, '/postcache') remove_ctr: int = 0 for published_str, post_filename in posts_in_box_sorted.items(): - file_path = os.path.join(box_dir, post_filename) + file_path: str = os.path.join(box_dir, post_filename) if not is_a_file(file_path): continue if archive_dir: - archive_path = os.path.join(archive_dir, post_filename) + archive_path: str = os.path.join(archive_dir, post_filename) move_file(file_path, archive_path, 'EX: archive_posts_for_person unable to archive ' + file_path + ' -> ' + archive_path) - extensions = ( + extensions: list[str] = ( 'votes', 'arrived', 'muted', 'tts', 'reject', 'mitm', 'edits', 'seen', 'json' ) for ext in extensions: - ext_path = file_path.replace('.json', '.' + ext) + ext_path: str = file_path.replace('.json', '.' + ext) if is_a_file(ext_path): - new_ext_path = archive_path.replace('.json', '.' + ext) + new_ext_path: str = \ + archive_path.replace('.json', '.' + ext) move_file(ext_path, new_ext_path, 'EX: unable to archive file ' + ext_path + ' -> ' + new_ext_path) continue ext_path = file_path.replace('.json', '.json.' + ext) if is_a_file(ext_path): - new_ext_path = \ + new_ext_path: str = \ archive_path.replace('.json', '.json.' + ext) move_file(ext_path, new_ext_path, 'EX: unable to archive file ' + ext_path + @@ -5754,7 +5758,7 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str, file_path, False, recent_posts_cache, False) # remove cached html posts - post_cache_filename = \ + post_cache_filename: str = \ os.path.join(post_cache_dir, post_filename) post_cache_filename = post_cache_filename.replace('.json', '.html') if is_a_file(post_cache_filename): @@ -5806,10 +5810,10 @@ def get_public_posts_of_person(base_dir: str, nickname: str, domain: str, if nickname.startswith('!'): nickname = nickname[1:] group_account = True - domain_full = get_full_domain(domain, port) - handle = http_prefix + "://" + domain_full + "/@" + nickname + domain_full: str = get_full_domain(domain, port) + handle: str = http_prefix + "://" + domain_full + "/@" + nickname - wf_request = \ + wf_request: dict = \ webfinger_handle(session, handle, http_prefix, cached_webfingers, origin_domain, project_version, debug, group_account, signing_priv_key_pem, mitm_servers) @@ -5861,12 +5865,12 @@ def get_public_post_domains(session, base_dir: str, nickname: str, domain: str, session = create_session(proxy_type) if not session: return domain_list - person_cache = {} - cached_webfingers = {} + person_cache: dict = {} + cached_webfingers: dict = {} - domain_full = get_full_domain(domain, port) - handle = http_prefix + "://" + domain_full + "/@" + nickname - wf_request = \ + domain_full: str = get_full_domain(domain, port) + handle: str = http_prefix + "://" + domain_full + "/@" + nickname + wf_request: dict = \ webfinger_handle(session, handle, http_prefix, cached_webfingers, domain, project_version, debug, False, signing_priv_key_pem, mitm_servers) @@ -5886,7 +5890,7 @@ def get_public_post_domains(session, base_dir: str, nickname: str, domain: str, nickname, domain, 'outbox', 92522, system_language, mitm_servers) - post_domains = \ + post_domains: list[str] = \ get_post_domains(session, person_url, 64, debug, project_version, http_prefix, domain, word_frequency, domain_list, system_language, @@ -5904,26 +5908,26 @@ def download_follow_collection(signing_priv_key_pem: str, """Returns a list of following/followers for the given actor by downloading the json for their following/followers collection """ - prof = 'https://www.w3.org/ns/activitystreams' + prof: str = 'https://www.w3.org/ns/activitystreams' if '/channel/' not in actor or '/accounts/' not in actor: - accept_str = \ + accept_str: str = \ 'application/activity+json; ' + \ 'profile="' + prof + '"' - session_headers = { + session_headers: dict = { 'Accept': accept_str } else: - accept_str = \ + accept_str: str = \ 'application/ld+json; ' + \ 'profile="' + prof + '"' - session_headers = { + session_headers: dict = { 'Accept': accept_str } result: list[str] = [] for page_ctr in range(no_of_pages): - url = \ + url: str = \ actor + '/' + follow_type + '?page=' + str(page_number + page_ctr) - followers_json = \ + followers_json: dict = \ get_json(signing_priv_key_pem, session, url, session_headers, None, debug, mitm_servers, __version__, http_prefix, None) if get_json_valid(followers_json): @@ -5955,12 +5959,12 @@ def get_public_post_info(session, base_dir: str, nickname: str, domain: str, session = create_session(proxy_type) if not session: return {} - person_cache = {} - cached_webfingers = {} + person_cache: dict = {} + cached_webfingers: dict = {} - domain_full = get_full_domain(domain, port) - handle = http_prefix + "://" + domain_full + "/@" + nickname - wf_request = \ + domain_full: str = get_full_domain(domain, port) + handle: str = http_prefix + "://" + domain_full + "/@" + nickname + wf_request: dict = \ webfinger_handle(session, handle, http_prefix, cached_webfingers, domain, project_version, debug, False, signing_priv_key_pem, mitm_servers) @@ -5980,19 +5984,19 @@ def get_public_post_info(session, base_dir: str, nickname: str, domain: str, nickname, domain, 'outbox', 13863, system_language, mitm_servers) - max_posts = 64 - post_domains = \ + max_posts: int = 64 + post_domains: list[str] = \ get_post_domains(session, person_url, max_posts, debug, project_version, http_prefix, domain, word_frequency, [], system_language, signing_priv_key_pem, mitm_servers) post_domains.sort() - domains_info = {} + domains_info: dict = {} for pdomain in post_domains: if not domains_info.get(pdomain): domains_info[pdomain]: list[str] = [] - blocked_posts = \ + blocked_posts: dict = \ _get_posts_for_blocked_domains(base_dir, session, person_url, max_posts, debug, @@ -6017,8 +6021,8 @@ def get_public_post_domains_blocked(session, base_dir: str, """ Returns a list of domains referenced within public posts which are globally blocked on this instance """ - origin_domain = domain - post_domains = \ + origin_domain: str = domain + post_domains: list[str] = \ get_public_post_domains(session, base_dir, nickname, domain, origin_domain, proxy_type, port, http_prefix, @@ -6028,12 +6032,12 @@ def get_public_post_domains_blocked(session, base_dir: str, if not post_domains: return [] - blocking_filename = data_dir(base_dir) + '/blocking.txt' + blocking_filename: str = data_dir(base_dir) + '/blocking.txt' if not is_a_file(blocking_filename): return [] # read the blocked domains as a single string - blocked_str = \ + blocked_str: str = \ load_string(blocking_filename, 'EX: get_public_post_domains_blocked unable to read ' + blocking_filename) @@ -6060,9 +6064,9 @@ def _get_non_mutuals_of_person(base_dir: str, """Returns the followers who are not mutuals of a person i.e. accounts which follow you but you don't follow them """ - followers = \ + followers: list[str] = \ get_followers_list(base_dir, nickname, domain, 'followers.txt') - following = \ + following: list[str] = \ get_followers_list(base_dir, nickname, domain, 'following.txt') non_mutuals: list[str] = [] for handle in followers: @@ -6082,13 +6086,15 @@ def check_domains(session, base_dir: str, """Checks follower accounts for references to globally blocked domains """ word_frequency: dict = {} - non_mutuals = _get_non_mutuals_of_person(base_dir, nickname, domain) + non_mutuals: list[str] = \ + _get_non_mutuals_of_person(base_dir, nickname, domain) if not non_mutuals: print('No non-mutual followers were found') return - follower_warning_filename = data_dir(base_dir) + '/followerWarnings.txt' + follower_warning_filename: str = \ + data_dir(base_dir) + '/followerWarnings.txt' update_follower_warnings: bool = False - follower_warning_str = '' + follower_warning_str: str = '' if is_a_file(follower_warning_filename): follower_warning_str = \ load_string(follower_warning_filename, @@ -6099,12 +6105,12 @@ def check_domains(session, base_dir: str, if single_check: # checks a single random non-mutual - index = random.randrange(0, len(non_mutuals)) - handle = non_mutuals[index] + index: int = random.randrange(0, len(non_mutuals)) + handle: str = non_mutuals[index] if '@' in handle: - non_mutual_nickname = handle.split('@')[0] - non_mutual_domain = handle.split('@')[1].strip() - blocked_domains = \ + non_mutual_nickname: str = handle.split('@')[0] + non_mutual_domain: str = handle.split('@')[1].strip() + blocked_domains: list[str] = \ get_public_post_domains_blocked(session, base_dir, non_mutual_nickname, non_mutual_domain, @@ -6125,9 +6131,9 @@ def check_domains(session, base_dir: str, continue if handle in follower_warning_str: continue - non_mutual_nickname = handle.split('@')[0] - non_mutual_domain = handle.split('@')[1].strip() - blocked_domains = \ + non_mutual_nickname: str = handle.split('@')[0] + non_mutual_domain: str = handle.split('@')[1].strip() + blocked_domains: list[str] = \ get_public_post_domains_blocked(session, base_dir, non_mutual_nickname, non_mutual_domain, @@ -6156,9 +6162,9 @@ def check_domains(session, base_dir: str, def populate_replies_json(base_dir: str, nickname: str, domain: str, post_replies_filename: str, authorized: bool, replies_json: {}) -> None: - pub_str = 'https://www.w3.org/ns/activitystreams#Public' + pub_str: str = 'https://www.w3.org/ns/activitystreams#Public' # populate the items list with replies - replies_boxes = ('outbox', 'inbox') + replies_boxes: list[str] = ('outbox', 'inbox') replies_list: list[str] = \ load_list(post_replies_filename, 'EX: populate_replies_json unable to read ' + @@ -6168,8 +6174,8 @@ def populate_replies_json(base_dir: str, nickname: str, domain: str, reply_found: bool = False # examine inbox and outbox for boxname in replies_boxes: - message_id2 = remove_eol(message_id) - search_filename = \ + message_id2: str = remove_eol(message_id) + search_filename: str = \ acct_dir(base_dir, nickname, domain) + '/' + \ boxname + '/' + \ message_id2.replace('/', '#') + '.json' @@ -6195,8 +6201,8 @@ def populate_replies_json(base_dir: str, nickname: str, domain: str, # if not in either inbox or outbox then examine the # shared inbox if not reply_found: - message_id2 = remove_eol(message_id) - search_filename = \ + message_id2: str = remove_eol(message_id) + search_filename: str = \ data_dir(base_dir) + '/inbox@' + \ domain + '/inbox/' + \ message_id2.replace('/', '#') + '.json' @@ -6264,19 +6270,19 @@ def download_announce(session, base_dir: str, http_prefix: str, if not isinstance(post_json_object['object'], str): return None # ignore self-boosts - actor_url = get_actor_from_post(post_json_object) + actor_url: str = get_actor_from_post(post_json_object) if actor_url in post_json_object['object']: return None # get the announced post - announce_cache_dir = base_dir + '/cache/announce/' + nickname + announce_cache_dir: str = base_dir + '/cache/announce/' + nickname if not is_a_dir(announce_cache_dir): makedir(announce_cache_dir) - post_id = None + post_id: str = None if post_json_object.get('id'): post_id = remove_id_ending(post_json_object['id']) - announce_filename = \ + announce_filename: str = \ announce_cache_dir + '/' + \ post_json_object['object'].replace('/', '#') + '.json' @@ -6291,22 +6297,22 @@ def download_announce(session, base_dir: str, http_prefix: str, if post_json_object: return post_json_object else: - profile_str = 'https://www.w3.org/ns/activitystreams' - accept_str = \ + profile_str: str = 'https://www.w3.org/ns/activitystreams' + accept_str: str = \ 'application/activity+json; ' + \ 'profile="' + profile_str + '"' - as_header = { + as_header: dict = { 'Accept': accept_str } if '/channel/' in actor_url or \ '/accounts/' in actor_url: - accept_str = \ + accept_str: str = \ 'application/ld+json; ' + \ 'profile="' + profile_str + '"' - as_header = { + as_header: dict = { 'Accept': accept_str } - actor_nickname = get_nickname_from_actor(actor_url) + actor_nickname: str = get_nickname_from_actor(actor_url) if not actor_nickname: print('WARN: download_announce no actor_nickname') return None @@ -6328,7 +6334,8 @@ def download_announce(session, base_dir: str, http_prefix: str, print('Announce download blocked actor: ' + actor_nickname + '@' + actor_domain) return None - object_nickname = get_nickname_from_actor(post_json_object['object']) + object_nickname: str = \ + get_nickname_from_actor(post_json_object['object']) object_domain, _ = \ get_domain_from_actor(post_json_object['object']) if not object_domain: @@ -6355,7 +6362,7 @@ def download_announce(session, base_dir: str, http_prefix: str, if debug: print('Downloading Announce content for ' + post_json_object['object']) - announced_json = \ + announced_json: dict = \ get_json(signing_priv_key_pem, session, post_json_object['object'], as_header, None, debug, mitm_servers, @@ -6391,11 +6398,11 @@ def download_announce(session, base_dir: str, http_prefix: str, recent_posts_cache, debug) return None - announced_actor = announced_id + announced_actor: str = announced_id if announced_json.get('attributedTo'): announced_actor = get_attributed_to(announced_json['attributedTo']) - announced_json_str = str(announced_json) + announced_json_str: str = str(announced_json) if not announced_json.get('type'): print('WARN: announced post does not have a type ' + announced_json_str) @@ -6435,7 +6442,7 @@ def download_announce(session, base_dir: str, http_prefix: str, return None if announced_json['type'] == 'Video': - converted_json = \ + converted_json: dict = \ convert_video_to_note(base_dir, nickname, domain, system_language, announced_json, blocked_cache, @@ -6444,7 +6451,7 @@ def download_announce(session, base_dir: str, http_prefix: str, if converted_json: announced_json = converted_json if announced_json['type'] == 'Torrent': - converted_json = \ + converted_json: dict = \ convert_torrent_to_note(base_dir, nickname, domain, system_language, announced_json, blocked_cache, @@ -6534,7 +6541,7 @@ def download_announce(session, base_dir: str, http_prefix: str, return None # Check the content of the announce convert_post_content_to_html(announced_json) - content_str = announced_json['content'] + content_str: str = announced_json['content'] using_content_map: bool = False if 'contentMap' in announced_json: if announced_json['contentMap'].get(system_language): @@ -6548,11 +6555,11 @@ def download_announce(session, base_dir: str, http_prefix: str, recent_posts_cache, debug) return None - summary_str = \ + summary_str: str = \ get_summary_from_post(announced_json, system_language, []) - media_descriptions = \ + media_descriptions: str = \ get_media_descriptions_from_post(announced_json) - content_all = content_str + content_all: str = content_str if summary_str: content_all = \ summary_str + ' ' + content_str + ' ' + media_descriptions @@ -6680,13 +6687,13 @@ def is_image_media(session, base_dir: str, http_prefix: str, """Returns true if the given post has attached image media """ if post_json_object['type'] == 'Announce': - blocked_cache = {} + blocked_cache: dict = {} block_federated: list[str] = [] - block_military = {} - block_government = {} - block_bluesky = {} - block_nostr = {} - post_json_announce = \ + block_military: dict = {} + block_government: dict = {} + block_bluesky: dict = {} + block_nostr: dict = {} + post_json_announce: dict = \ download_announce(session, base_dir, http_prefix, nickname, domain, post_json_object, __version__, @@ -6752,9 +6759,10 @@ def post_is_muted(base_dir: str, nickname: str, domain: str, post_json_object: {}, message_id: str) -> bool: """ Returns true if the given post is muted """ - is_muted = None + is_muted: bool = None if 'muted' in post_json_object: - is_muted = post_json_object['muted'] + if isinstance(post_json_object['muted'], bool): + is_muted = post_json_object['muted'] if is_muted is True or is_muted is False: return is_muted