From 92a33c48104cc64320fa345570d14aa32b20b854 Mon Sep 17 00:00:00 2001 From: bashrc Date: Tue, 28 Apr 2026 21:11:47 +0100 Subject: [PATCH] Variable types --- utils.py | 386 ++++++++++++++++++++++++++++--------------------------- 1 file changed, 200 insertions(+), 186 deletions(-) diff --git a/utils.py b/utils.py index 808d9d34f..8af1671b7 100644 --- a/utils.py +++ b/utils.py @@ -1936,10 +1936,10 @@ def _is_bookmarked(base_dir: str, nickname: str, domain: str, post_filename: str) -> bool: """Returns True if the given post is bookmarked """ - bookmarks_index_filename = \ + bookmarks_index_filename: str = \ acct_dir(base_dir, nickname, domain) + '/bookmarks.index' if os.path.isfile(bookmarks_index_filename): - bookmark_index = post_filename.split('/')[-1] + '\n' + bookmark_index: str = post_filename.split('/')[-1] + '\n' if text_in_file(bookmark_index, bookmarks_index_filename): return True return False @@ -1958,7 +1958,7 @@ def remove_post_from_cache(post_json_object: {}, if not recent_posts_cache.get('index'): return - post_id = post_json_object['id'] + post_id: str = post_json_object['id'] if '#' in post_id: post_id = post_id.split('#', 1)[0] post_id = remove_id_ending(post_id).replace('/', '#') @@ -1982,7 +1982,7 @@ def delete_cached_html(base_dir: str, nickname: str, domain: str, post_json_object: {}) -> None: """Removes cached html file for the given post """ - cached_post_filename = \ + cached_post_filename: str = \ get_cached_post_filename(base_dir, nickname, domain, post_json_object) if not cached_post_filename: return @@ -2061,7 +2061,7 @@ def _delete_hashtags_on_post(base_dir: str, post_json_object: {}) -> None: return # get the id of the post - post_id = remove_id_ending(post_json_object['object']['id']) + post_id: str = remove_id_ending(post_json_object['object']['id']) for tag in post_json_object['object']['tag']: if not tag.get('type'): continue @@ -2070,11 +2070,13 @@ def _delete_hashtags_on_post(base_dir: str, post_json_object: {}) -> None: if not tag.get('name'): continue # find the index file for this tag - tag_map_filename = base_dir + '/tagmaps/' + tag['name'][1:] + '.txt' + tag_map_filename: str = \ + base_dir + '/tagmaps/' + tag['name'][1:] + '.txt' if os.path.isfile(tag_map_filename): _remove_post_id_from_tag_index(tag_map_filename, post_id) # find the index file for this tag - tag_index_filename = base_dir + '/tags/' + tag['name'][1:] + '.txt' + tag_index_filename: str = \ + base_dir + '/tags/' + tag['name'][1:] + '.txt' if os.path.isfile(tag_index_filename): _remove_post_id_from_tag_index(tag_index_filename, post_id) @@ -2093,22 +2095,22 @@ def _delete_conversation_post(base_dir: str, nickname: str, domain: str, return False if not post_json_object['object'].get('id'): return False - conversation_dir = \ + conversation_dir: str = \ acct_dir(base_dir, nickname, domain) + '/conversation' if post_json_object['object'].get('conversation'): - conversation_id = post_json_object['object']['conversation'] + conversation_id: str = post_json_object['object']['conversation'] elif post_json_object['object'].get('context'): - conversation_id = post_json_object['object']['context'] + conversation_id: str = post_json_object['object']['context'] else: - conversation_id = post_json_object['object']['thread'] + conversation_id: str = post_json_object['object']['thread'] if not isinstance(conversation_id, str): return False conversation_id = conversation_id.replace('/', '#') - post_id = post_json_object['object']['id'] - conversation_filename = conversation_dir + '/' + conversation_id + post_id: str = post_json_object['object']['id'] + conversation_filename: str = conversation_dir + '/' + conversation_id if not os.path.isfile(conversation_filename): return False - conversation_str = \ + conversation_str: str = \ load_string(conversation_filename, 'EX: _delete_conversation_post unable to read ' + conversation_filename) @@ -2152,7 +2154,7 @@ def is_dm(post_json_object: {}) -> bool: return False if post_json_object['object'].get('moderationStatus'): return False - fields = ('to', 'cc') + fields: list[str] = ('to', 'cc') for field_name in fields: if not post_json_object['object'].get(field_name): continue @@ -2175,11 +2177,11 @@ def _is_remote_dm(domain_full: str, post_json_object: {}) -> bool: """ if not is_dm(post_json_object): return False - this_post_json = post_json_object + this_post_json: dict = post_json_object if has_object_dict(post_json_object): this_post_json = post_json_object['object'] if this_post_json.get('attributedTo'): - attrib = get_attributed_to(this_post_json['attributedTo']) + attrib: str = get_attributed_to(this_post_json['attributedTo']) if attrib: if '://' + domain_full not in attrib: return True @@ -2190,7 +2192,7 @@ def get_gemini_blog_title(message_json: dict, system_language: str) -> str: """Returns the title for a gemini blog post """ title_text: str = '' - title_str = get_summary_from_post(message_json, system_language, []) + title_str: str = get_summary_from_post(message_json, system_language, []) if title_str: title_text = remove_html(title_str) return title_text @@ -2200,7 +2202,7 @@ def get_gemini_blog_published(message_json: dict, debug: bool) -> str: """Returns the published date for a gemini blog post """ # get the publication date - obj = message_json + obj: dict = message_json if has_object_dict(message_json): obj = message_json['object'] if not obj.get('published'): @@ -2227,22 +2229,22 @@ def get_gemini_blog_filename(base_dir: str, nickname: str, domain: str, debug: bool, testing: bool) -> str: """Returns the filename for a gemini blog post """ - title_text = get_gemini_blog_title(message_json, system_language) - published = get_gemini_blog_published(message_json, debug) + title_text: str = get_gemini_blog_title(message_json, system_language) + published: str = get_gemini_blog_published(message_json, debug) if not published: return '' - title_text2 = title_text.replace('.', ' ') + title_text2: str = title_text.replace('.', ' ') title_text2 = title_text2.replace(' ', '_') title_text2 = title_text2.replace('"', '') if not testing: - account_dir = acct_dir(base_dir, nickname, domain) - gemini_blog_dir = account_dir + '/gemini' + account_dir: str = acct_dir(base_dir, nickname, domain) + gemini_blog_dir: str = account_dir + '/gemini' else: - account_dir = base_dir - gemini_blog_dir = account_dir + '/geminitest' + account_dir: str = base_dir + gemini_blog_dir: str = account_dir + '/geminitest' - gemini_blog_filename = \ + gemini_blog_filename: str = \ gemini_blog_dir + '/' + published + '_' + title_text2.lower() + '.gmi' return gemini_blog_filename @@ -2252,22 +2254,22 @@ def get_markdown_blog_filename(base_dir: str, nickname: str, domain: str, debug: bool, testing: bool) -> str: """Returns the filename for a markdown blog post """ - title_text = get_gemini_blog_title(message_json, system_language) - published = get_gemini_blog_published(message_json, debug) + title_text: str = get_gemini_blog_title(message_json, system_language) + published: str = get_gemini_blog_published(message_json, debug) if not published: return '' - title_text2 = title_text.replace('.', ' ') + title_text2: str = title_text.replace('.', ' ') title_text2 = title_text2.replace(' ', '_') title_text2 = title_text2.replace('"', '') if not testing: - account_dir = acct_dir(base_dir, nickname, domain) - markdown_blog_dir = account_dir + '/markdown' + account_dir: str = acct_dir(base_dir, nickname, domain) + markdown_blog_dir: str = account_dir + '/markdown' else: - account_dir = base_dir - markdown_blog_dir = account_dir + '/markdowntest' + account_dir: str = base_dir + markdown_blog_dir: str = account_dir + '/markdowntest' - markdown_blog_filename = \ + markdown_blog_filename: str = \ markdown_blog_dir + '/' + published + '_' + title_text2.lower() + '.md' return markdown_blog_filename @@ -2277,22 +2279,22 @@ def get_micron_blog_filename(base_dir: str, nickname: str, domain: str, debug: bool, testing: bool) -> str: """Returns the filename for a micron blog post """ - title_text = get_gemini_blog_title(message_json, system_language) - published = get_gemini_blog_published(message_json, debug) + title_text: str = get_gemini_blog_title(message_json, system_language) + published: str = get_gemini_blog_published(message_json, debug) if not published: return '' - title_text2 = title_text.replace('.', ' ') + title_text2: str = title_text.replace('.', ' ') title_text2 = title_text2.replace(' ', '_') title_text2 = title_text2.replace('"', '') if not testing: - account_dir = acct_dir(base_dir, nickname, domain) - micron_blog_dir = account_dir + '/micron' + account_dir: str = acct_dir(base_dir, nickname, domain) + micron_blog_dir: str = account_dir + '/micron' else: - account_dir = base_dir - micron_blog_dir = account_dir + '/microntest' + account_dir: str = base_dir + micron_blog_dir: str = account_dir + '/microntest' - micron_blog_filename = \ + micron_blog_filename: str = \ micron_blog_dir + '/' + published + '_' + title_text2.lower() + '.mu' return micron_blog_filename @@ -2304,7 +2306,7 @@ def delete_post(base_dir: str, http_prefix: str, """Recursively deletes a post and its replies and attachments Returns True if deleted """ - post_json_object = load_json(post_filename) + post_json_object: dict = load_json(post_filename) if not post_json_object: # remove any replies _delete_post_remove_replies(base_dir, nickname, domain, @@ -2343,7 +2345,7 @@ def delete_post(base_dir: str, http_prefix: str, return False # delete gemini blog post - gemini_blog_filename = \ + gemini_blog_filename: str = \ get_gemini_blog_filename(base_dir, nickname, domain, post_json_object, '', debug, False) @@ -2358,7 +2360,7 @@ def delete_post(base_dir: str, http_prefix: str, str(gemini_blog_filename)) # delete markdown blog post - markdown_blog_filename = \ + markdown_blog_filename: str = \ get_markdown_blog_filename(base_dir, nickname, domain, post_json_object, '', debug, False) @@ -2373,7 +2375,7 @@ def delete_post(base_dir: str, http_prefix: str, str(markdown_blog_filename)) # delete micron blog post - micron_blog_filename = \ + micron_blog_filename: str = \ get_micron_blog_filename(base_dir, nickname, domain, post_json_object, '', debug, False) @@ -2397,11 +2399,11 @@ def delete_post(base_dir: str, http_prefix: str, _remove_attachment(base_dir, http_prefix, nickname, domain, post_json_object) - extensions = ( + extensions: list[str] = ( 'votes', 'arrived', 'muted', 'tts', 'reject', 'mitm', 'edits', 'seen' ) for ext in extensions: - ext_filename = post_filename + '.' + ext + ext_filename: str = post_filename + '.' + ext if os.path.isfile(ext_filename): try: os.remove(ext_filename) @@ -2429,7 +2431,7 @@ def delete_post(base_dir: str, http_prefix: str, if has_object_dict(post_json_object): if post_json_object['object'].get('moderationStatus'): if post_json_object.get('id'): - post_id = remove_id_ending(post_json_object['id']) + post_id: str = remove_id_ending(post_json_object['id']) remove_moderation_post_from_index(base_dir, post_id, debug) # remove any hashtags index entries @@ -2455,7 +2457,7 @@ def _is_valid_language(text: str) -> bool: """Returns true if the given text contains a valid natural language string """ - natural_languages = { + natural_languages: dict = { "Latin": [65, 866], "Greek": [880, 1280], "isArmenian": [1328, 1424], @@ -2546,7 +2548,7 @@ def _get_reserved_words() -> str: def get_nickname_validation_pattern() -> str: """Returns a html text input validation pattern for nickname """ - reserved_names = _get_reserved_words() + reserved_names: list[str] = _get_reserved_words() pattern: str = '' for word in reserved_names: if pattern: @@ -2559,9 +2561,11 @@ def get_nickname_validation_pattern() -> str: def _is_reserved_name(nickname: str) -> bool: """Is the given nickname reserved for some special function? """ - reserved_names = ('users', 'accounts', 'profile', 'statuses', - 'search', 'channel', - 'http', 'https', 'ipfs', 'ipns') + reserved_names: list[str] = ( + 'users', 'accounts', 'profile', 'statuses', + 'search', 'channel', + 'http', 'https', 'ipfs', 'ipns' + ) if nickname in reserved_names: return True return False @@ -2578,7 +2582,7 @@ def valid_nickname(domain: str, nickname: str) -> bool: return False if not _is_valid_language(nickname): return False - forbidden_chars = (' ', '/', ':', '@') + forbidden_chars: list[str] = (' ', '/', ':', '@') for char in forbidden_chars: if char in nickname: return False @@ -2598,7 +2602,7 @@ def no_of_accounts(base_dir: str) -> bool: """Returns the number of accounts on the system """ account_ctr: int = 0 - dir_str = data_dir(base_dir) + dir_str: str = data_dir(base_dir) for _, dirs, _ in os.walk(dir_str): for account in dirs: if is_account_dir(account): @@ -2612,17 +2616,17 @@ def no_of_active_accounts_monthly(base_dir: str, months: int) -> bool: """ account_ctr: int = 0 curr_time = int(time.time()) - month_seconds = int(60*60*24*30*months) - dir_str = data_dir(base_dir) + month_seconds: int = int(60*60*24*30*months) + dir_str: str = data_dir(base_dir) for _, dirs, _ in os.walk(dir_str): for account in dirs: if not is_account_dir(account): continue - last_used_filename = \ + last_used_filename: str = \ dir_str + '/' + account + '/.lastUsed' if not os.path.isfile(last_used_filename): continue - last_used = \ + last_used: str = \ load_string(last_used_filename, 'EX: no_of_active_accounts_monthly ' + 'unable to read ' + last_used_filename) @@ -2651,7 +2655,8 @@ def get_cached_post_directory(base_dir: str, nickname: str, domain: str) -> str: """Returns the directory where the html post cache exists """ - html_post_cache_dir = acct_dir(base_dir, nickname, domain) + '/postcache' + html_post_cache_dir: str = \ + acct_dir(base_dir, nickname, domain) + '/postcache' return html_post_cache_dir @@ -2659,15 +2664,16 @@ def get_cached_post_filename(base_dir: str, nickname: str, domain: str, post_json_object: {}) -> str: """Returns the html cache filename for the given post """ - cached_post_dir = get_cached_post_directory(base_dir, nickname, domain) + cached_post_dir: str = \ + get_cached_post_directory(base_dir, nickname, domain) if not os.path.isdir(cached_post_dir): # print('ERROR: invalid html cache directory ' + cached_post_dir) return None if '@' not in cached_post_dir: # print('ERROR: invalid html cache directory ' + cached_post_dir) return None - cached_post_id = remove_id_ending(post_json_object['id']) - cached_post_filename = \ + cached_post_id: str = remove_id_ending(post_json_object['id']) + cached_post_filename: str = \ cached_post_dir + '/' + cached_post_id.replace('/', '#') return cached_post_filename + '.html' @@ -2688,8 +2694,8 @@ def get_css(css_filename: str) -> str: if not os.path.isfile(css_filename): return None - css = load_string(css_filename, - 'EX: get_css unable to read ' + css_filename) + css: str = load_string(css_filename, + 'EX: get_css unable to read ' + css_filename) if css: return css return None @@ -2748,21 +2754,21 @@ def reject_post_id(base_dir: str, nickname: str, domain: str, """ Marks the given post as rejected, for example an announce which is too old """ - post_filename = locate_post(base_dir, nickname, domain, post_id) + post_filename: str = locate_post(base_dir, nickname, domain, post_id) if not post_filename: return post_url = None if recent_posts_cache.get('index'): # if this is a full path then remove the directories - index_filename = post_filename + index_filename: str = post_filename if '/' in post_filename: index_filename = post_filename.split('/')[-1] # filename of the post without any extension or path # This should also correspond to any index entry in # the posts cache - post_url = remove_eol(index_filename) + post_url: str = remove_eol(index_filename) post_url = post_url.replace('.json', '').strip() if post_url in recent_posts_cache['index']: @@ -2776,15 +2782,15 @@ def reject_post_id(base_dir: str, nickname: str, domain: str, post_filename + '.reject') # if the post is in the inbox index then remove it - index_file = \ + index_file: str = \ acct_dir(base_dir, nickname, domain) + '/inbox.index' if not post_url: - index_filename = post_filename + index_filename: str = post_filename if '/' in post_filename: index_filename = post_filename.split('/')[-1] - post_url = remove_eol(index_filename) + post_url: str = remove_eol(index_filename) post_url = post_url.replace('.json', '').strip() - post_url2 = post_url.replace('/', '#') + '.json' + post_url2: str = post_url.replace('/', '#') + '.json' remove_post_from_index(post_url2, debug, index_file) @@ -2795,22 +2801,22 @@ def load_translations_from_file(base_dir: str, language: str) -> ({}, str): print('ERROR: translations directory not found') return None, None if not language: - system_language = locale.getlocale()[0] + system_language: str = locale.getlocale()[0] else: - system_language = language + system_language: str = language if not system_language: - system_language = 'en' + system_language: str = 'en' if '_' in system_language: - system_language = system_language.split('_')[0] + system_language: str = system_language.split('_')[0] while '/' in system_language: - system_language = system_language.split('/')[1] + system_language: str = system_language.split('/')[1] if '.' in system_language: - system_language = system_language.split('.')[0] - translations_file = base_dir + '/translations/' + \ + system_language: str = system_language.split('.')[0] + translations_file: str = base_dir + '/translations/' + \ system_language + '.json' if not os.path.isfile(translations_file): - system_language = 'en' - translations_file = base_dir + '/translations/' + \ + system_language: str = 'en' + translations_file: str = base_dir + '/translations/' + \ system_language + '.json' return load_json(translations_file), system_language @@ -2824,7 +2830,7 @@ def dm_allowed_from_domain(base_dir: str, i.e. Mostly you only want DMs from followers, but there are a few particular instances that you trust """ - dm_allowed_instances_file = \ + dm_allowed_instances_file: str = \ acct_dir(base_dir, nickname, domain) + '/dmAllowedInstances.txt' if not os.path.isfile(dm_allowed_instances_file): return False @@ -2873,7 +2879,7 @@ def get_alt_path(actor: str, domain_full: str, calling_domain: str) -> str: """Returns alternate path from the actor eg. https://clearnetdomain/path becomes http://oniondomain/path """ - post_actor = actor + post_actor: str = actor if calling_domain not in actor and domain_full in actor: if calling_domain.endswith('.onion') or \ calling_domain.endswith('.i2p'): @@ -2888,7 +2894,7 @@ def get_actor_property_url(actor_json: {}, property_name: str) -> str: """ if not actor_json.get('attachment'): return '' - property_name = property_name.lower() + property_name: str = property_name.lower() for property_value in actor_json['attachment']: if not isinstance(property_value, dict): print("WARN: actor attachment is not dict: " + str(property_value)) @@ -2976,7 +2982,7 @@ def valid_url_prefix(url: str) -> bool: """ if '/' not in url: return False - prefixes = ('https:', 'http:', 'hyper:', 'i2p:', 'gnunet:') + prefixes: list[str] = ('https:', 'http:', 'hyper:', 'i2p:', 'gnunet:') for pre in prefixes: if url.startswith(pre): return True @@ -3064,13 +3070,13 @@ def get_currencies() -> {}: def get_supported_languages(base_dir: str) -> []: """Returns a list of supported languages """ - translations_dir = base_dir + '/translations' + translations_dir: str = base_dir + '/translations' languages_str: list[str] = [] for _, _, files in os.walk(translations_dir): for fname in files: if not fname.endswith('.json'): continue - lang = fname.split('.')[0] + lang: str = fname.split('.')[0] if len(lang) == 2: languages_str.append(lang) break @@ -3080,7 +3086,7 @@ def get_supported_languages(base_dir: str) -> []: def get_category_types(base_dir: str) -> []: """Returns the list of ontologies """ - ontology_dir = base_dir + '/ontology' + ontology_dir: str = base_dir + '/ontology' categories: list[str] = [] for _, _, files in os.walk(ontology_dir): for fname in files: @@ -3090,7 +3096,7 @@ def get_category_types(base_dir: str) -> []: continue if fname.startswith('custom'): continue - ontology_filename = fname.split('.')[0] + ontology_filename: str = fname.split('.')[0] if 'Types' in ontology_filename: categories.append(ontology_filename.replace('Types', '')) break @@ -3143,13 +3149,13 @@ def has_actor(post_json_object: {}, debug: bool) -> bool: """Does the given post have an actor? """ if post_json_object.get('actor'): - actor_url = get_actor_from_post(post_json_object) + actor_url: str = get_actor_from_post(post_json_object) if '#' in actor_url or not actor_url: return False return True if debug: if post_json_object.get('type'): - msg = post_json_object['type'] + ' has missing actor' + msg: str = post_json_object['type'] + ' has missing actor' if post_json_object.get('id'): msg += ' ' + post_json_object['id'] print(msg) @@ -3248,8 +3254,8 @@ def valid_hash_tag(hashtag: str) -> bool: def load_bold_reading(base_dir: str) -> {}: """Returns a dictionary containing the bold reading status for each account """ - bold_reading = {} - dir_str = data_dir(base_dir) + bold_reading: dict = {} + dir_str: str = data_dir(base_dir) for _, dirs, _ in os.walk(dir_str): for acct in dirs: if '@' not in acct: @@ -3267,8 +3273,8 @@ def load_bold_reading(base_dir: str) -> {}: def load_hide_follows(base_dir: str) -> {}: """Returns a dictionary containing the hide follows status for each account """ - hide_follows = {} - dir_str = data_dir(base_dir) + hide_follows: dict = {} + dir_str: str = data_dir(base_dir) for _, dirs, _ in os.walk(dir_str): for acct in dirs: if '@' not in acct: @@ -3287,18 +3293,18 @@ def load_hide_recent_posts(base_dir: str) -> {}: """Returns a dictionary containing the hide recent posts status for each account """ - hide_recent_posts = {} - dir_str = data_dir(base_dir) + hide_recent_posts: dict = {} + dir_str: str = data_dir(base_dir) for _, dirs, _ in os.walk(dir_str): for acct in dirs: if '@' not in acct: continue if acct.startswith('inbox@') or acct.startswith('Actor@'): continue - hide_recent_posts_filename = \ + hide_recent_posts_filename: str = \ dir_str + '/' + acct + '/.hideRecentPosts' if os.path.isfile(hide_recent_posts_filename): - nickname = acct.split('@')[0] + nickname: str = acct.split('@')[0] hide_recent_posts[nickname] = True break return hide_recent_posts @@ -3397,7 +3403,7 @@ def _is_yggdrasil_request(calling_domain: str, referer_domain: str, def disallow_reply(content: str) -> bool: """Are replies not allowed for the given post? """ - disallow_strings = ( + disallow_strings: list[str] = ( ':reply_no:', ':noreply:', ':noreplies:', @@ -3413,7 +3419,7 @@ def disallow_reply(content: str) -> bool: 'dontatme', 'noresponses' ) - content_lower = content.lower() + content_lower: str = content.lower() for diss in disallow_strings: if diss in content_lower: return True @@ -3451,7 +3457,7 @@ def safe_system_string(text: str) -> str: def get_json_content_from_accept(accept: str) -> str: """returns the json content type for the given accept """ - protocol_str = 'application/json' + protocol_str: str = 'application/json' if accept: if 'application/ld+json' in accept: protocol_str = 'application/ld+json' @@ -3472,7 +3478,7 @@ def load_min_images_for_accounts(base_dir: str) -> []: be minimized by default """ min_images_for_accounts: list[str] = [] - dir_str = data_dir(base_dir) + dir_str: str = data_dir(base_dir) for subdir, dirs, _ in os.walk(dir_str): for account in dirs: if not is_account_dir(account): @@ -3491,7 +3497,8 @@ def set_minimize_all_images(base_dir: str, """Add of remove a file indicating that all images for an account should be minimized by default """ - filename = acct_dir(base_dir, nickname, domain) + '/.minimize_all_images' + filename: str = \ + acct_dir(base_dir, nickname, domain) + '/.minimize_all_images' if minimize: if nickname not in min_images_for_accounts: min_images_for_accounts.append(nickname) @@ -3515,14 +3522,14 @@ def load_reverse_timeline(base_dir: str) -> []: see reversed timelines """ reverse_sequence: list[str] = [] - dir_str = data_dir(base_dir) + dir_str: str = data_dir(base_dir) for _, dirs, _ in os.walk(dir_str): for acct in dirs: if not is_account_dir(acct): continue - nickname = acct.split('@')[0] - domain = acct.split('@')[1] - reverse_filename = \ + nickname: str = acct.split('@')[0] + domain: str = acct.split('@')[1] + reverse_filename: str = \ acct_dir(base_dir, nickname, domain) + '/.reverse_timeline' if os.path.isfile(reverse_filename): if nickname not in reverse_sequence: @@ -3535,14 +3542,14 @@ def save_reverse_timeline(base_dir: str, reverse_sequence: []) -> None: """Saves flags for each user indicating whether they prefer to see reversed timelines """ - dir_str = data_dir(base_dir) + dir_str: str = data_dir(base_dir) for _, dirs, _ in os.walk(dir_str): for acct in dirs: if not is_account_dir(acct): continue - nickname = acct.split('@')[0] - domain = acct.split('@')[1] - reverse_filename = \ + nickname: str = acct.split('@')[0] + domain: str = acct.split('@')[1] + reverse_filename: str = \ acct_dir(base_dir, nickname, domain) + '/.reverse_timeline' if nickname in reverse_sequence: if not os.path.isfile(reverse_filename): @@ -3564,9 +3571,13 @@ def license_link_from_name(license_name: str) -> str: """ if '://' in license_name: return license_name - value_upper = license_name.upper() - cc_strings1 = ('CC-BY-SA-NC', 'CC-BY-NC-SA', 'CC BY SA NC', 'CC BY NC SA') - cc_strings2 = ('CC-BY-SA', 'CC-SA-BY', 'CC BY SA', 'CC SA BY') + value_upper: str = license_name.upper() + cc_strings1: list[str] = ( + 'CC-BY-SA-NC', 'CC-BY-NC-SA', 'CC BY SA NC', 'CC BY NC SA' + ) + cc_strings2: list[str] = ( + 'CC-BY-SA', 'CC-SA-BY', 'CC BY SA', 'CC SA BY' + ) if string_contains(value_upper, cc_strings1): value = 'https://creativecommons.org/licenses/by-nc-sa/4.0' elif string_contains(value_upper, cc_strings2): @@ -3635,7 +3646,7 @@ def unescaped_text(txt: str) -> str: def valid_content_warning(summary: str) -> str: """Returns a validated content warning """ - cw_str = remove_html(summary) + cw_str: str = remove_html(summary) # hashtags within content warnings apparently cause a lot of trouble # so remove them if '#' in cw_str: @@ -3649,17 +3660,17 @@ def harmless_markup(post_json_object: {}) -> None: if not isinstance(post_json_object['object'], dict): return - remove_trash = [' id="wordads-inline-marker"'] + remove_trash: list[str] = [' id="wordads-inline-marker"'] for field_name in ('content', 'summary'): if post_json_object['object'].get(field_name): # tidy up content warnings if field_name == 'summary': - summary = post_json_object['object'][field_name] + summary: str = post_json_object['object'][field_name] post_json_object['object'][field_name] = \ valid_content_warning(summary) - text = post_json_object['object'][field_name] + text: str = post_json_object['object'][field_name] # take out the trash for trash in remove_trash: @@ -3673,10 +3684,10 @@ def harmless_markup(post_json_object: {}) -> None: post_json_object['object'][field_name] = \ remove_markup_tag(text, 'pre') - map_name = field_name + 'Map' + map_name: str = field_name + 'Map' if post_json_object['object'].get(map_name): if isinstance(post_json_object['object'][map_name], dict): - map_dict = post_json_object['object'][map_name].items() + map_dict: dict = post_json_object['object'][map_name].items() for lang, content in map_dict: if not isinstance(content, str): continue @@ -3685,7 +3696,8 @@ def harmless_markup(post_json_object: {}) -> None: if field_name == 'summary': post_json_object['object'][map_name][lang] = \ valid_content_warning(content) - content = post_json_object['object'][map_name][lang] + content: str = \ + post_json_object['object'][map_name][lang] # take out the trash for trash in remove_trash: @@ -3695,10 +3707,10 @@ def harmless_markup(post_json_object: {}) -> None: # remove things which would cause display issues if dangerous_markup(content, False, ['pre']): - content = remove_html(content) + content: str = remove_html(content) post_json_object['object'][map_name][lang] = \ content - content = post_json_object['object'][map_name][lang] + content: str = post_json_object['object'][map_name][lang] post_json_object['object'][map_name][lang] = \ remove_markup_tag(content, 'pre') else: @@ -3725,7 +3737,7 @@ def ap_proxy_type(json_object: {}) -> str: def language_right_to_left(language: str) -> bool: """is the given language written from right to left? """ - rtl_languages = ('ar', 'fa', 'he', 'yi') + rtl_languages: list[str] = ('ar', 'fa', 'he', 'yi') if language in rtl_languages: return True return False @@ -3736,7 +3748,7 @@ def binary_is_image(filename: str, media_binary) -> bool: """ if len(media_binary) < 13: return False - filename_lower = filename.lower() + filename_lower: str = filename.lower() bin_is_image: bool = False if filename_lower.endswith('.jpeg') or filename_lower.endswith('jpg'): if media_binary[6:10] in (b'JFIF', b'Exif'): @@ -3772,12 +3784,12 @@ def get_status_count(base_dir: str) -> int: """Get the total number of posts """ status_ctr: int = 0 - accounts_dir = data_dir(base_dir) + accounts_dir: str = data_dir(base_dir) for _, dirs, _ in os.walk(accounts_dir): for acct in dirs: if not is_account_dir(acct): continue - account_dir = os.path.join(accounts_dir, acct + '/outbox') + account_dir: str = os.path.join(accounts_dir, acct + '/outbox') for _, _, files2 in os.walk(account_dir): status_ctr += len(files2) break @@ -3789,8 +3801,8 @@ def lines_in_file(filename: str) -> int: """Returns the number of lines in a file """ if os.path.isfile(filename): - text = load_string(filename, - 'EX: lines_in_file error reading ' + filename) + text: str = load_string(filename, + 'EX: lines_in_file error reading ' + filename) if text: return len(text.split('\n')) return 0 @@ -3819,7 +3831,7 @@ def get_media_url_from_video(post_json_object: {}) -> (str, str, str, if not media_link.get('href'): continue if media_link.get('tag'): - media_tags = media_link['tag'] + media_tags: list[dict] = media_link['tag'] if isinstance(media_tags, list): for tag_link in media_tags: if not isinstance(tag_link, dict): @@ -3830,23 +3842,23 @@ def get_media_url_from_video(post_json_object: {}) -> (str, str, str, continue if tag_link['mediaType'] == 'video/mp4' or \ tag_link['mediaType'] == 'video/ogv': - media_type = tag_link['mediaType'] - media_url = remove_html(tag_link['href']) + media_type: str = tag_link['mediaType'] + media_url: str = remove_html(tag_link['href']) break if media_type and media_url: continue if media_link['mediaType'] == 'application/x-bittorrent': - media_torrent = remove_html(media_link['href']) + media_torrent: str = remove_html(media_link['href']) if media_link['href'].startswith('magnet:'): - media_magnet = remove_html(media_link['href']) + media_magnet: str = remove_html(media_link['href']) elif media_link['href'].startswith('bencoded:'): - media_bencoded = remove_html(media_link['href']) + media_bencoded: str = remove_html(media_link['href']) if media_link['mediaType'] != 'video/mp4' and \ media_link['mediaType'] != 'video/ogv': continue if not media_url: - media_type = media_link['mediaType'] - media_url = remove_html(media_link['href']) + media_type: str = media_link['mediaType'] + media_url: str = remove_html(media_link['href']) return media_type, media_url, media_torrent, media_magnet, media_bencoded @@ -3873,7 +3885,7 @@ def get_media_url_from_torrent(post_json_object: {}) -> (str, str, str, if not media_link.get('href'): continue if media_link.get('tag'): - media_tags = media_link['tag'] + media_tags: list[dict] = media_link['tag'] if isinstance(media_tags, list): for tag_link in media_tags: if not isinstance(tag_link, dict): @@ -3886,28 +3898,30 @@ def get_media_url_from_torrent(post_json_object: {}) -> (str, str, str, string_starts_with(tag_link['mediaType'], ('magnet:', 'bencoded:')): if tag_link['mediaType'].startswith('magnet:'): - media_magnet = remove_html(media_link['href']) + media_magnet: str = remove_html(media_link['href']) elif tag_link['mediaType'].startswith('bencoded:'): - media_bencoded = remove_html(media_link['href']) + media_bencoded: str = \ + remove_html(media_link['href']) else: - media_torrent = remove_html(media_link['href']) - media_type = tag_link['mediaType'] - media_url = remove_html(tag_link['href']) + media_torrent: str = \ + remove_html(media_link['href']) + media_type: str = tag_link['mediaType'] + media_url: str = remove_html(tag_link['href']) break if media_type and media_url: continue if media_link['mediaType'] == 'application/x-bittorrent': - media_torrent = remove_html(media_link['href']) + media_torrent: str = remove_html(media_link['href']) if media_link['href'].startswith('magnet:'): - media_magnet = remove_html(media_link['href']) + media_magnet: str = remove_html(media_link['href']) elif media_link['href'].startswith('bencoded:'): - media_bencoded = remove_html(media_link['href']) + media_bencoded: str = remove_html(media_link['href']) if media_link['mediaType'] != 'video/mp4' and \ media_link['mediaType'] != 'video/ogv': continue if not media_url: - media_type = media_link['mediaType'] - media_url = remove_html(media_link['href']) + media_type: str = media_link['mediaType'] + media_url: str = remove_html(media_link['href']) return media_type, media_url, media_torrent, media_magnet, media_bencoded @@ -3918,7 +3932,7 @@ def get_reply_to(post_json_object: {}) -> str: if not isinstance(post_json_object['inReplyTo'], str): if isinstance(post_json_object['inReplyTo'], dict): if post_json_object['inReplyTo'].get('id'): - reply_id = post_json_object['inReplyTo']['id'] + reply_id: str = post_json_object['inReplyTo']['id'] if isinstance(reply_id, str): return reply_id print('WARN: inReplyTo is not a string ' + @@ -3929,7 +3943,7 @@ def get_reply_to(post_json_object: {}) -> str: if not isinstance(post_json_object['inReplyToBook'], str): if isinstance(post_json_object['inReplyToBook'], dict): if post_json_object['inReplyToBook'].get('id'): - reply_id = post_json_object['inReplyToBook']['id'] + reply_id: str = post_json_object['inReplyToBook']['id'] if isinstance(reply_id, str): return reply_id print('WARN: inReplyToBook is not a string ' + @@ -3954,7 +3968,7 @@ def resembles_domain(text: str) -> bool: Why not use validators? It's so that exotic, potentially p2p domains may be used. """ - not_domain_chars = ( + not_domain_chars: list[str] = ( ' ', '/', '-', '<', ';', '"', '(', ')', '_', ',', '?', "'" ) if string_contains(text, not_domain_chars): @@ -3981,20 +3995,20 @@ def post_summary_contains_links(message_json: {}) -> bool: if message_json['object']['type'] not in ('Person', 'Application', 'Group'): if len(message_json['object']['summary']) > 1024: - actor_url = get_actor_from_post(message_json) + actor_url: str = get_actor_from_post(message_json) print('INBOX: summary is too long ' + actor_url + ' ' + message_json['object']['summary']) return True if '://' in message_json['object']['summary']: - actor_url = get_actor_from_post(message_json) + actor_url: str = get_actor_from_post(message_json) print('INBOX: summary should not contain links ' + actor_url + ' ' + message_json['object']['summary']) return True else: if len(message_json['object']['summary']) > 4096: - actor_url = get_actor_from_post(message_json) + actor_url: str = get_actor_from_post(message_json) print('INBOX: person summary is too long ' + actor_url + ' ' + message_json['object']['summary']) @@ -4010,7 +4024,7 @@ def convert_domains(calling_domain: str, referer_domain: str, yggdrasil_domain: str) -> str: """Convert domains to onion or i2p, depending upon who is asking """ - curr_http_prefix = http_prefix + '://' + curr_http_prefix: str = http_prefix + '://' if _is_onion_request(calling_domain, referer_domain, domain, onion_domain): @@ -4045,25 +4059,24 @@ def get_instance_url(calling_domain: str, """ if calling_domain.endswith('.onion') and \ onion_domain: - instance_url = 'http://' + onion_domain + instance_url: str = 'http://' + onion_domain elif (calling_domain.endswith('.i2p') and i2p_domain): - instance_url = 'http://' + i2p_domain + instance_url: str = 'http://' + i2p_domain elif (is_yggdrasil_address(calling_domain) and yggdrasil_domain): - instance_url = 'http://' + yggdrasil_domain + instance_url: str = 'http://' + yggdrasil_domain else: - instance_url = \ - http_prefix + '://' + domain_full + instance_url: str = http_prefix + '://' + domain_full return instance_url def check_bad_path(path: str): """for http GET or POST check that the path looks valid """ - path_lower = path.lower() + path_lower: str = path.lower() - bad_strings = [ + bad_strings: list[str] = [ '..', '/.', '%2e%2e', '%252e%252e', '/sftp.', '/sftp-', '/statistics', '/config/', 'settings.', 'credentials', '/packs/', '/backend/', '/apis/', '/laravel/', '/js/', '/root/' @@ -4071,14 +4084,14 @@ def check_bad_path(path: str): # allow /.well-known/... if '/.' in path_lower: - good_starts = ('/.well-known/', '/users/.well-known/') + good_starts: list[str] = ('/.well-known/', '/users/.well-known/') if string_starts_with(path_lower, good_starts): - bad_strings = ['..', '%2e%2e', '%252e%252e'] + bad_strings: list[str] = ['..', '%2e%2e', '%252e%252e'] if path_lower.startswith('/wp-'): return True - bad_endings = ( + bad_endings: list[str] = ( '.js', '.ts', '.py', '.php', '.bak', '.env', '.local', '.yml', '.rs', '.ru', '.old', '.backup', '~', '.ini' ) @@ -4097,7 +4110,7 @@ def set_premium_account(base_dir: str, nickname: str, domain: str, flag_state: bool) -> bool: """ Set or clear the premium account flag """ - premium_filename = acct_dir(base_dir, nickname, domain) + '/.premium' + premium_filename: str = acct_dir(base_dir, nickname, domain) + '/.premium' if os.path.isfile(premium_filename): if not flag_state: try: @@ -4157,23 +4170,23 @@ def get_image_file(base_dir: str, name: str, directory: str, banner_extensions = get_image_extensions() banner_file: str = '' banner_filename: str = '' - im_name = name + im_name: str = name for ext in banner_extensions: - banner_file_test = im_name + '.' + ext - banner_filename_test = directory + '/' + banner_file_test + banner_file_test: str = im_name + '.' + ext + banner_filename_test: str = directory + '/' + banner_file_test if not os.path.isfile(banner_filename_test): continue banner_file = banner_file_test banner_filename = banner_filename_test return banner_file, banner_filename # if not found then use the default image - curr_theme = 'default' + curr_theme: str = 'default' if theme: curr_theme = theme directory = base_dir + '/theme/' + curr_theme for ext in banner_extensions: - banner_file_test = name + '.' + ext - banner_filename_test = directory + '/' + banner_file_test + banner_file_test: str = name + '.' + ext + banner_filename_test: str = directory + '/' + banner_file_test if not os.path.isfile(banner_filename_test): continue banner_file = name + '_' + curr_theme + '.' + ext @@ -4186,7 +4199,7 @@ def get_watermark_file(base_dir: str, nickname: str, domain: str) -> (str, str): """Gets the filename for watermarking when an image is attached to a post """ - account_dir = acct_dir(base_dir, nickname, domain) + account_dir: str = acct_dir(base_dir, nickname, domain) watermark_file, watermark_filename = \ get_image_file(base_dir, 'watermark_image', account_dir, '') return watermark_file, watermark_filename @@ -4231,9 +4244,10 @@ def load_instance_software(base_dir: str) -> []: """For each domain encountered this stores the instance type such as mastodon, epicyon, pixelfed, etc """ - instance_software_filename = data_dir(base_dir) + '/instance_software.json' + instance_software_filename: str = \ + data_dir(base_dir) + '/instance_software.json' if os.path.isfile(instance_software_filename): - instance_software_json = load_json(instance_software_filename) + instance_software_json: dict = load_json(instance_software_filename) if instance_software_json: return instance_software_json return {} @@ -4294,27 +4308,27 @@ def replace_embedded_map_with_link(text: str, translate: {}) -> str: """ if '
' not in text: return text - map_str_start = 'https://www.openstreetmap.org' - map_str = 'src="' + map_str_start + map_str_start: str = 'https://www.openstreetmap.org' + map_str: str = 'src="' + map_str_start if map_str not in text: return text - sections = text.split('
') + sections: list[str] = text.split('
') for section_str in sections: if '
' not in section_str: continue - section_str = \ + section_str: str = \ '
' + section_str.split('
')[0] + '
' if map_str not in section_str: continue # get the map url - map_url = map_str_start + section_str.split(map_str)[1] + map_url: str = map_str_start + section_str.split(map_str)[1] if '"' not in map_url: continue map_url = map_url.split('"')[0] - show_map_str = 'Show Map' + show_map_str: str = 'Show Map' if translate.get('Show Map'): show_map_str = translate['Show Map'] - map_link = \ + map_link: str = \ '' + \ show_map_str + '' @@ -4327,7 +4341,7 @@ def is_private_browser(ua_str: str) -> bool: """Does the given user agent indicate that the browser is specialised for privacy? """ - ua_str_lower = ua_str.lower() + ua_str_lower: str = ua_str.lower() if string_contains(ua_str_lower, ('librewolf', 'privacy', 'private')): return True