From b98910cd0f540f3424972f997cb3399431405479 Mon Sep 17 00:00:00 2001
From: Bob Mottram
Date: Sun, 21 Sep 2025 12:38:54 +0100
Subject: [PATCH] Function position

---
 acceptreject.py |   6 +--
 announce.py     |   8 ++--
 auth.py         |   8 ++--
 availability.py |   2 +-
 blocking.py     |  60 +++++++++++++-------------
 blog.py         |  73 ++++++++++++++++----------------
 bookmarks.py    |   8 ++--
 cache.py        |   6 +--
 categories.py   |  10 ++---
 city.py         |  14 +++---
 content.py      | 104 ++++++++++++++++++++++-----------------------
 conversation.py |   4 +-
 crawlers.py     |   2 +-
 cwlists.py      |   2 +-
 daemon.py       | 110 ++++++++++++++++++++++++------------------------
 utils.py        |  30 ++++++-------
 16 files changed, 223 insertions(+), 224 deletions(-)

diff --git a/acceptreject.py b/acceptreject.py
index 70c65b2ca..d6ada1ea7 100644
--- a/acceptreject.py
+++ b/acceptreject.py
@@ -217,7 +217,7 @@ def _reject_quote_request(message_json: {}, domain_full: str,
            not curr_domain.endswith('.onion') and \
            domain_to_follow.endswith('.onion'):
         curr_session = session_onion
-        curr_http_prefix = 'http'
+        curr_http_prefix: str = 'http'
         curr_domain = onion_domain
         curr_port = 80
         port = 80
@@ -228,7 +228,7 @@ def _reject_quote_request(message_json: {}, domain_full: str,
           not curr_domain.endswith('.i2p') and
           domain_to_follow.endswith('.i2p')):
         curr_session = session_i2p
-        curr_http_prefix = 'http'
+        curr_http_prefix: str = 'http'
         curr_domain = i2p_domain
         curr_port = 80
         port = 80
@@ -403,7 +403,7 @@ def receive_accept_reject(base_dir: str, domain: str, message_json: {},
     nickname = get_nickname_from_actor(actor_url)
     if not nickname:
         # single user instance
-        nickname = 'dev'
+        nickname: str = 'dev'
         if debug:
             print('DEBUG: ' + message_json['type'] +
                   ' does not contain a nickname. ' +
diff --git a/announce.py b/announce.py
index cfa16b777..12daac5c6 100644
--- a/announce.py
+++ b/announce.py
@@ -264,7 +264,7 @@ def announce_public(session, base_dir: str, federation_list: [],
     """
     from_domain = get_full_domain(domain, port)

-    to_url = 'https://www.w3.org/ns/activitystreams#Public'
+    to_url: str = 'https://www.w3.org/ns/activitystreams#Public'
     cc_url = local_actor_url(http_prefix, nickname, from_domain) + '/followers'
     return create_announce(session, base_dir, federation_list,
                            nickname, domain, port,
@@ -296,7 +296,7 @@ def send_announce_via_server(base_dir: str, session,

     from_domain_full = get_full_domain(from_domain, from_port)

-    to_url = 'https://www.w3.org/ns/activitystreams#Public'
+    to_url: str = 'https://www.w3.org/ns/activitystreams#Public'
     actor_str = local_actor_url(http_prefix, from_nickname, from_domain_full)
     cc_url = actor_str + '/followers'

@@ -333,7 +333,7 @@ def send_announce_via_server(base_dir: str, session,
               ' did not return a dict. ' + str(wf_request))
         return 1

-    post_to_box = 'outbox'
+    post_to_box: str = 'outbox'

     # get the actor inbox for the To handle
     origin_domain = from_domain
@@ -422,7 +422,7 @@ def send_undo_announce_via_server(base_dir: str, session,
               ' did not return a dict. ' + str(wf_request))
         return 1

-    post_to_box = 'outbox'
+    post_to_box: str = 'outbox'

     # get the actor inbox for the To handle
     origin_domain = domain
diff --git a/auth.py b/auth.py
index 90abf1f2c..efbee4a5b 100644
--- a/auth.py
+++ b/auth.py
@@ -257,7 +257,7 @@ def authorize(base_dir: str, path: str, auth_header: str, debug: bool) -> bool:


 def create_password(length: int):
-    valid_chars = 'abcdefghijklmnopqrstuvwxyz' + \
+    valid_chars: str = 'abcdefghijklmnopqrstuvwxyz' + \
         'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
     return ''.join((secrets.choice(valid_chars) for i in range(length)))

@@ -293,10 +293,10 @@ def record_login_failure(base_dir: str, ip_address: str,
     if not log_to_file:
         return

-    failure_log = data_dir(base_dir) + '/loginfailures.log'
-    write_type = 'a+'
+    failure_log: str = data_dir(base_dir) + '/loginfailures.log'
+    write_type: str = 'a+'
     if not os.path.isfile(failure_log):
-        write_type = 'w+'
+        write_type: str = 'w+'
     curr_time = date_utcnow()
     curr_time_str = curr_time.strftime("%Y-%m-%d %H:%M:%SZ")
     try:
diff --git a/availability.py b/availability.py
index bf6e0fdaa..75a87fa89 100644
--- a/availability.py
+++ b/availability.py
@@ -127,7 +127,7 @@ def send_availability_via_server(base_dir: str, session,
               ' did not return a dict. ' + str(wf_request))
         return 1

-    post_to_box = 'outbox'
+    post_to_box: str = 'outbox'

     # get the actor inbox for the To handle
     origin_domain = domain
diff --git a/blocking.py b/blocking.py
index 7e7ac6d8f..99c746fcb 100644
--- a/blocking.py
+++ b/blocking.py
@@ -60,7 +60,7 @@ def get_global_block_reason(search_text: str,
     if not text_in_file(search_text, blocking_reasons_filename):
         return ''

-    reasons_str = ''
+    reasons_str: str = ''
     try:
         with open(blocking_reasons_filename, 'r',
                   encoding='utf-8') as fp_reas:
@@ -93,8 +93,8 @@ def get_account_blocks(base_dir: str,
     if not os.path.isfile(blocking_filename):
         return ''

-    blocked_accounts_textarea = ''
-    blocking_file_text = ''
+    blocked_accounts_textarea: str = ''
+    blocking_file_text: str = ''
     try:
         with open(blocking_filename, 'r', encoding='utf-8') as fp_block:
             blocking_file_text = fp_block.read()
@@ -155,7 +155,7 @@ def blocked_timeline_json(actor: str, page_number: int, items_per_page: int,
         if index >= len(blocked_list):
             break
         block_handle = blocked_list[index]
-        block_reason = ''
+        block_reason: str = ''
         if ' - ' in block_handle:
             block_reason = block_handle.split(' - ')[1]
             block_handle = block_handle.split(' - ')[0]
@@ -187,8 +187,8 @@ def add_account_blocks(base_dir: str,
     if blocked_accounts_textarea is None:
         return False
     blocklist = blocked_accounts_textarea.split('\n')
-    blocking_file_text = ''
-    blocking_reasons_file_text = ''
+    blocking_file_text: str = ''
+    blocking_reasons_file_text: str = ''
     for line in blocklist:
         line = line.strip()
         reason = None
@@ -271,7 +271,7 @@ def _add_global_block_reason(base_dir: str,
             print('EX: unable to add blocking reason ' +
                   block_id)
     else:
-        reasons_str = ''
+        reasons_str: str = ''
         try:
             with open(blocking_reasons_filename, 'r',
                       encoding='utf-8') as fp_reas:
@@ -279,7 +279,7 @@ def _add_global_block_reason(base_dir: str,
         except OSError:
             print('EX: unable to read blocking reasons')
         reasons_lines = reasons_str.split('\n')
-        new_reasons_str = ''
+        new_reasons_str: str = ''
         for line in reasons_lines:
             if not line.startswith(block_id + ' '):
                 new_reasons_str += line + '\n'
@@ -375,7 +375,7 @@ def _add_block_reason(base_dir: str,
             print('EX: unable to add blocking reason 2 ' +
                   block_id)
     else:
-        reasons_str = ''
+        reasons_str: str = ''
         try:
             with open(blocking_reasons_filename, 'r',
                      encoding='utf-8') as fp_reas:
@@ -383,7 +383,7 @@ def _add_block_reason(base_dir: str,
         except OSError:
             print('EX: unable to read blocking reasons 2')
         reasons_lines = reasons_str.split('\n')
-        new_reasons_str = ''
+        new_reasons_str: str = ''
         for line in reasons_lines:
             if not line.startswith(block_id + ' '):
                 new_reasons_str += line + '\n'
@@ -428,7 +428,7 @@ def add_block(base_dir: str, nickname: str, domain: str,
         acct_dir(base_dir, nickname, domain) + '/following.txt'
     if os.path.isfile(following_filename):
         if text_in_file(block_handle + '\n', following_filename):
-            following_str = ''
+            following_str: str = ''
             try:
                 with open(following_filename, 'r',
                           encoding='utf-8') as fp_foll:
@@ -453,7 +453,7 @@ def add_block(base_dir: str, nickname: str, domain: str,
         acct_dir(base_dir, nickname, domain) + '/followers.txt'
     if os.path.isfile(followers_filename):
         if text_in_file(block_handle + '\n', followers_filename):
-            followers_str = ''
+            followers_str: str = ''
             try:
                 with open(followers_filename, 'r',
                           encoding='utf-8') as fp_foll:
@@ -504,7 +504,7 @@ def _remove_global_block_reason(base_dir: str,
     if not text_in_file(unblock_id + ' ', unblocking_filename):
         return False

-    reasons_str = ''
+    reasons_str: str = ''
     try:
         with open(unblocking_filename, 'r',
                   encoding='utf-8') as fp_reas:
@@ -512,7 +512,7 @@ def _remove_global_block_reason(base_dir: str,
     except OSError:
         print('EX: unable to read blocking reasons 3')
     reasons_lines = reasons_str.split('\n')
-    new_reasons_str = ''
+    new_reasons_str: str = ''
     for line in reasons_lines:
         if line.startswith(unblock_id + ' '):
             continue
@@ -640,7 +640,7 @@ def is_blocked_hashtag(base_dir: str, hashtag: str) -> bool:
     if os.path.isfile(global_blocking_filename):
         hashtag = hashtag.strip('\n').strip('\r')
         if not hashtag.startswith('#'):
-            hashtag = '#' + hashtag
+            hashtag: str = '#' + hashtag
         if text_in_file(hashtag + '\n', global_blocking_filename):
             return True
     return False
@@ -650,7 +650,7 @@ def get_domain_blocklist(base_dir: str) -> str:
     """Returns all globally blocked domains as a string
     This can be used for fast matching to mitigate flooding
     """
-    blocked_str = ''
+    blocked_str: str = ''

     evil_domains = evil_incarnate()
     for evil in evil_domains:
@@ -748,7 +748,7 @@ def is_blocked_domain(base_dir: str, domain: str,
     else:
         blocked_cache_list = blocked_cache

-    search_str = '*@' + domain
+    search_str: str = '*@' + domain
     for curr_block_list in (blocked_cache_list, block_federated_list):
         for blocked_str in curr_block_list:
             blocked_str = blocked_str.strip()
@@ -1011,7 +1011,7 @@ def allowed_announce_add(base_dir: str, nickname: str, domain: str,

     handle = following_nickname + '@' + following_domain
     if text_in_file(handle + '\n', blocking_filename, False):
-        file_text = ''
+        file_text: str = ''
         try:
             with open(blocking_filename, 'r',
                       encoding='utf-8') as fp_noannounce:
@@ -1020,7 +1020,7 @@ def allowed_announce_add(base_dir: str, nickname: str, domain: str,
             print('EX: unable to read noannounce add: ' +
                   blocking_filename + ' ' + handle)

-        new_file_text = ''
+        new_file_text: str = ''
         file_text_list = file_text.split('\n')
         handle_lower = handle.lower()
         for allowed in file_text_list:
@@ -1058,7 +1058,7 @@ def allowed_announce_remove(base_dir: str, nickname: str, domain: str,
               blocking_filename + ' ' + handle)
         return

-    file_text = ''
+    file_text: str = ''
     if not text_in_file(handle + '\n', blocking_filename, False):
         try:
             with open(blocking_filename, 'r',
@@ -1091,7 +1091,7 @@ def blocked_quote_toots_add(base_dir: str, nickname: str, domain: str,
     handle = following_nickname + '@' + \
         following_domain
     if not text_in_file(handle + '\n', blocking_filename, False):
-        file_text = ''
+        file_text: str = ''
         try:
             with open(blocking_filename, 'r',
                       encoding='utf-8') as fp_quotes:
@@ -1123,7 +1123,7 @@ def blocked_quote_toots_remove(base_dir: str, nickname: str, domain: str,
     if not os.path.isfile(blocking_filename):
         return

-    file_text = ''
+    file_text: str = ''
     if text_in_file(handle + '\n', blocking_filename, False):
         try:
             with open(blocking_filename, 'r',
@@ -1825,7 +1825,7 @@ def import_blocking_file(base_dir: str, nickname: str, domain: str,
             # already blocked
             continue
         append_blocks.append(blocked_domain_name)
-        blocked_comment = ''
+        blocked_comment: str = ''
         if '"' in line_str:
             quote_section = line_str.split('"')
             if len(quote_section) > 1:
@@ -1905,7 +1905,7 @@ def export_blocking_file(base_dir: str, nickname: str, domain: str) -> str:
         blocked_domain = blocked_domain.strip()
         if blocked_domain.startswith('#'):
             continue
-        reason_str = ''
+        reason_str: str = ''
         for reason_line in blocking_reasons:
             if reason_line.startswith(blocked_domain + ' '):
                 reason_str = reason_line.split(' ', 1)[1]
@@ -2042,7 +2042,7 @@ def load_blocked_nostr(base_dir: str) -> {}:
 def save_blocked_military(base_dir: str, block_military: {}) -> None:
     """Saves a list of nicknames for accounts which block military instances
     """
-    nicknames_str = ''
+    nicknames_str: str = ''
     for nickname, _ in block_military.items():
         nicknames_str += nickname + '\n'

@@ -2058,7 +2058,7 @@ def save_blocked_government(base_dir: str, block_government: {}) -> None:
     """Saves a list of nicknames for accounts which block government instances
     """
-    nicknames_str = ''
+    nicknames_str: str = ''
     for nickname, _ in block_government.items():
         nicknames_str += nickname + '\n'

@@ -2074,7 +2074,7 @@ def save_blocked_bluesky(base_dir: str, block_bluesky: {}) -> None:
     """Saves a list of nicknames for accounts which block bluesky bridges
     """
-    nicknames_str = ''
+    nicknames_str: str = ''
     for nickname, _ in block_bluesky.items():
         nicknames_str += nickname + '\n'

@@ -2090,7 +2090,7 @@ def save_blocked_nostr(base_dir: str, block_nostr: {}) -> None:
     """Saves a list of nicknames for accounts which block nostr bridges
     """
-    nicknames_str = ''
+    nicknames_str: str = ''
     for nickname, _ in block_nostr.items():
         nicknames_str += nickname + '\n'

@@ -2247,7 +2247,7 @@ def _update_federated_blocks(session, base_dir: str,
         print('DEBUG: federated blocklist endpoints: ' +
               str(block_federated_endpoints))

-    new_block_api_str = ''
+    new_block_api_str: str = ''
     for endpoint in block_federated_endpoints:
         if not endpoint:
             continue
@@ -2328,7 +2328,7 @@ def save_block_federated_endpoints(base_dir: str,
     block_api_endpoints_filename = \
         data_dir(base_dir) + '/block_api_endpoints.txt'
     result: list[str] = []
-    block_federated_endpoints_str = ''
+    block_federated_endpoints_str: str = ''
     for endpoint in block_federated_endpoints:
         if not endpoint:
             continue
diff --git a/blog.py b/blog.py
index 30b36f404..e48746818 100644
--- a/blog.py
+++ b/blog.py
@@ -162,7 +162,7 @@ def _get_blog_replies(base_dir: str, http_prefix: str, translate: {},
                 print('EX: unable to read blog 4 ' + post_filename)

     if lines:
-        replies_str = ''
+        replies_str: str = ''
         for reply_post_id in lines:
             reply_post_id = remove_eol(reply_post_id)
             replacements = {
@@ -187,7 +187,7 @@ def _get_blog_replies(base_dir: str, http_prefix: str, translate: {},
                 replies_str += rply

     # indicate the reply indentation level
-    indent_str = '>'
+    indent_str: str = '>'
     indent_level = 0
     while indent_level < depth:
         indent_str += ' >'
@@ -210,13 +210,13 @@ def _html_blog_post_content(debug: bool, session, authorized: bool,
     """Returns the content for a single blog post
     """
     linked_author = False
-    actor = ''
-    blog_str = ''
-    message_link = ''
+    actor: str = ''
+    blog_str: str = ''
+    message_link: str = ''
     if post_json_object['object'].get('id'):
         message_link = \
             post_json_object['object']['id'].replace('/statuses/', '/')
-    title_str = ''
+    title_str: str = ''
     article_added = False
     if post_json_object['object'].get('summary'):
         title_str = post_json_object['object']['summary']
@@ -263,14 +263,14 @@ def _html_blog_post_content(debug: bool, session, authorized: bool,
             blog_str += ' ' + handle
         blog_str += '\n'

-    avatar_link = ''
-    reply_str = ''
-    announce_str = ''
-    like_str = ''
-    bookmark_str = ''
-    delete_str = ''
-    mute_str = ''
-    is_muted = False
+    avatar_link: str = ''
+    reply_str: str = ''
+    announce_str: str = ''
+    like_str: str = ''
+    bookmark_str: str = ''
+    delete_str: str = ''
+    mute_str: str = ''
+    is_muted: bool = False

     person_url = local_actor_url(http_prefix, nickname, domain_full)
     actor_json = \
@@ -308,7 +308,7 @@ def _html_blog_post_content(debug: bool, session, authorized: bool,
     else:
         blog_str += '' + content_str + '\n'

-    citations_str = ''
+    citations_str: str = ''
     if post_json_object['object'].get('tag'):
         for tag_json in post_json_object['object']['tag']:
             if not isinstance(tag_json, dict):
@@ -328,7 +328,7 @@ def _html_blog_post_content(debug: bool, session, authorized: bool,
                 '' + \
                 '' + citation_name + '\n'
     if citations_str:
-        citations_str = '' + translate['Citations'] + \
+        citations_str: str = '' + translate['Citations'] + \
             ':' + \
             '\n' + citations_str + '\n'
@@ -346,7 +346,7 @@ def _html_blog_post_content(debug: bool, session, authorized: bool,

     # separator between blogs should be centered
     if '<center>' not in blog_separator:
-        blog_separator = '<center>' + blog_separator + '</center>'
+        blog_separator: str = '<center>' + blog_separator + '</center>'

     if replies == 0:
         blog_str += blog_separator + '\n'
@@ -379,8 +379,8 @@ def _html_blog_post_rss2(domain: str, post_json_object: {},
                          system_language: str) -> str:
     """Returns the RSS version 2 feed for a single blog post
     """
-    rss_str = ''
-    message_link = ''
+    rss_str: str = ''
+    message_link: str = ''
     if post_json_object['object'].get('id'):
         message_link = \
             post_json_object['object']['id'].replace('/statuses/', '/')
@@ -398,7 +398,7 @@ def _html_blog_post_rss2(domain: str, post_json_object: {},
                                         system_language)
     description = first_paragraph_from_string(content)
     description = escape_text(description)
-    rss_str = '    <item>'
+    rss_str: str = '    <item>'
     rss_str += '        <title>' + title_str + '</title>'
     rss_str += '        <link>' + message_link + '</link>'
     rss_str += \
@@ -413,8 +413,8 @@ def _html_blog_post_rss3(domain: str, post_json_object: {},
                          system_language: str) -> str:
     """Returns the RSS version 3 feed for a single blog post
     """
-    rss_str = ''
-    message_link = ''
+    rss_str: str = ''
+    message_link: str = ''
     if post_json_object['object'].get('id'):
         message_link = \
             post_json_object['object']['id'].replace('/statuses/', '/')
@@ -431,7 +431,7 @@ def _html_blog_post_rss3(domain: str, post_json_object: {},
         get_base_content_from_post(post_json_object, system_language)
     description = first_paragraph_from_string(content)

-    rss_str = 'title: ' + title_str + '\n'
+    rss_str: str = 'title: ' + title_str + '\n'
     rss_str += 'link: ' + message_link + '\n'
     rss_str += 'description: ' + description + '\n'
     rss_str += 'created: ' + rss_date_str + '\n\n'
@@ -479,7 +479,7 @@ def html_blog_post(session, authorized: bool,
                    content_license_url: str) -> str:
     """Returns a html blog post
     """
-    blog_str = ''
+    blog_str: str = ''

     css_filename = base_dir + '/epicyon-blog.css'
     if os.path.isfile(base_dir + '/blog.css'):
@@ -491,7 +491,7 @@ def html_blog_post(session, authorized: bool,
     if post_json_object['object'].get('updated'):
         modified = post_json_object['object']['updated']
     title = post_json_object['object']['summary']
-    url = ''
+    url: str = ''
     if post_json_object['object'].get('url'):
         url_str = get_url_from_post(post_json_object['object']['url'])
         url = remove_html(url_str)
@@ -545,7 +545,7 @@ def html_blog_page(authorized: bool, session,
     if ' ' in nickname or '@' in nickname or \
        '\n' in nickname or '\r' in nickname:
         return None
-    blog_str = ''
+    blog_str: str = ''

     css_filename = base_dir + '/epicyon-profile.css'
     if os.path.isfile(base_dir + '/epicyon.css'):
@@ -574,7 +574,7 @@ def html_blog_page(authorized: bool, session,

     # show previous and next buttons
     if page_number is not None:
-        navigate_str = ''
+        navigate_str: str = ''
         if page_number > 1:
             # show previous button
             navigate_str += ''
@@ ... @@ def html_edit_blog(media_instance: bool, translate: {},
-            edit_blog_text = '' + fp_blog.read() + ''
+            edit_blog_text: str = '' + fp_blog.read() + ''
         except OSError:
             print('EX: html_edit_blog unable to read ' +
                   dir_str + '/newpost.txt')
@@ -829,7 +829,7 @@ def html_edit_blog(media_instance: bool, translate: {},
         path = path.split('?')[0]
     path_base = path

-    edit_blog_image_section = ''
+    edit_blog_image_section: str = ''
     edit_blog_image_section += ' '
     edit_blog_image_section += \
@@ -843,11 +843,10 @@ def html_edit_blog(media_instance: bool, translate: {},
     placeholder_message = translate['Write something'] + '...'
     endpoint = 'editblogpost'
     placeholder_subject = translate['Title']
-    scope_icon = 'scope_blog.png'
-    scope_description = translate['Blog']
+    scope_icon: str = 'scope_blog.png'
+    scope_description: str = translate['Blog']

-    date_and_location = ''
-    date_and_location = ''
+    date_and_location: str = ''
     date_and_location += \
         ''
@@ ... @@ def html_edit_blog(media_instance: bool, translate: {},
         '' + placeholder_subject + ''
-    title_str = ''
+    title_str: str = ''
     if post_json_object['object'].get('summary'):
         title_str = post_json_object['object']['summary']
     edit_blog_form += \
diff --git a/bookmarks.py b/bookmarks.py
index a573be494..6aeaa85ee 100644
--- a/bookmarks.py
+++ b/bookmarks.py
@@ -78,7 +78,7 @@ def undo_bookmarks_collection_entry(recent_posts_cache: {},
     bookmark_index = remove_eol(bookmark_index)
     if not text_in_file(bookmark_index, bookmarks_index_filename):
         return
-    index_str = ''
+    index_str: str = ''
     try:
         with open(bookmarks_index_filename, 'r',
                   encoding='utf-8') as fp_index:
@@ -199,7 +199,7 @@ def update_bookmarks_collection(recent_posts_cache: {},
               str(post_json_object))
         return

-    bookmarks_ending = '/bookmarks'
+    bookmarks_ending: str = '/bookmarks'
     if not object_url.endswith(bookmarks_ending):
         collection_id = object_url + bookmarks_ending
     else:
@@ -445,7 +445,7 @@ def send_bookmark_via_server(base_dir: str, session,
               ' did not return a dict. ' + str(wf_request))
         return 1

-    post_to_box = 'outbox'
+    post_to_box: str = 'outbox'

     # get the actor inbox for the To handle
     origin_domain = domain
@@ -542,7 +542,7 @@ def send_undo_bookmark_via_server(base_dir: str, session,
               ' did not return a dict. ' + str(wf_request))
         return 1

-    post_to_box = 'outbox'
+    post_to_box: str = 'outbox'

     # get the actor inbox for the To handle
     origin_domain = domain
diff --git a/cache.py b/cache.py
index 1566e2b6c..56bb915da 100644
--- a/cache.py
+++ b/cache.py
@@ -243,7 +243,7 @@ def get_person_pub_key(base_dir: str, session, person_url: str,
     elif i2p_domain:
         if '.i2p/' in person_url:
             person_domain = i2p_domain
-    profile_str = 'https://www.w3.org/ns/activitystreams'
+    profile_str: str = 'https://www.w3.org/ns/activitystreams'
     accept_str = \
         'application/activity+json; profile="' + profile_str + '"'
     as_header = {
@@ -288,7 +288,7 @@ def cache_svg_images(session, base_dir: str, http_prefix: str,
         return False
     cached = False
     post_id = remove_id_ending(obj['id']).replace('/', '--')
-    actor = 'unknown'
+    actor: str = 'unknown'
     if post_attachments and obj.get('attributedTo'):
         actor = get_attributed_to(obj['attributedTo'])
     log_filename = data_dir(base_dir) + '/svg_scripts_log.txt'
@@ -417,7 +417,7 @@ def clear_from_post_caches(base_dir: str, recent_posts_cache: {},
     """Clears cached html for the given post, so that edits
     to news will appear
     """
-    filename = '/postcache/' + post_id + '.html'
+    filename: str = '/postcache/' + post_id + '.html'
     dir_str = data_dir(base_dir)
     for _, dirs, _ in os.walk(dir_str):
         for acct in dirs:
diff --git a/categories.py b/categories.py
index df6bff65c..1fdaf5b00 100644
--- a/categories.py
+++ b/categories.py
@@ -48,7 +48,7 @@ def get_hashtag_category(base_dir: str, hashtag: str) -> str:
 def load_city_hashtags(base_dir: str, translate: {}) -> None:
     """create hashtag categories for cities
     """
-    category_str = 'places'
+    category_str: str = 'places'
     if translate.get(category_str):
         category_str = translate[category_str]

@@ -91,7 +91,7 @@ def load_city_hashtags(base_dir: str, translate: {}) -> None:
                                               city_filename)
                 if '-' in hashtag:
                     section = hashtag.split('-')
-                    new_hashtag = ''
+                    new_hashtag: str = ''
                     for text in section:
                         new_hashtag += text.lower().title()
                     hashtag2 = new_hashtag
@@ -107,7 +107,7 @@ def load_city_hashtags(base_dir: str, translate: {}) -> None:
                                               city_filename)
                 if ' ' in hashtag:
                     section = hashtag.split(' ')
-                    new_hashtag = ''
+                    new_hashtag: str = ''
                     for text in section:
                         new_hashtag += text.lower().title()
                     hashtag2 = new_hashtag
@@ -207,7 +207,7 @@ def update_hashtag_categories(base_dir: str) -> None:
             category_list.append(category_str)
     category_list.sort()

-    category_list_str = ''
+    category_list_str: str = ''
     for category_str in category_list:
         category_list_str += category_str + '\n'

@@ -291,7 +291,7 @@ def guess_hashtag_category(tag_name: str, hashtag_categories: {},
     if len(tag_name) < min_tag_length:
         return ''

-    category_matched = ''
+    category_matched: str = ''
     tag_matched_len = 0
     finished = False

diff --git a/city.py b/city.py
index 4028bdfa6..bd04d708f 100644
--- a/city.py
+++ b/city.py
@@ -214,8 +214,8 @@ def spoof_geolocation(base_dir: str,
     variance_at_location = 0.0004
     default_latitude = 51.8744
     default_longitude = 0.368333
-    default_latdirection = 'N'
-    default_longdirection = 'W'
+    default_latdirection: str = 'N'
+    default_longdirection: str = 'W'

     if cities_list:
         cities = cities_list
@@ -257,13 +257,13 @@ def spoof_geolocation(base_dir: str,
             area_km2 = 0
             if len(city_fields) > 3:
                 area_km2 = int(city_fields[3])
-            latdirection = 'N'
-            longdirection = 'E'
+            latdirection: str = 'N'
+            longdirection: str = 'E'
             if 'S' in latitude:
-                latdirection = 'S'
+                latdirection: str = 'S'
                 latitude = latitude.replace('S', '')
             if 'W' in longitude:
-                longdirection = 'W'
+                longdirection: str = 'W'
                 longitude = longitude.replace('W', '')
             latitude = float(latitude)
             longitude = float(longitude)
@@ -330,7 +330,7 @@ def get_spoofed_city(city: str, base_dir: str,
     """Returns the name of the city to use as a GPS spoofing location for
     image metadata
     """
-    city = ''
+    city: str = ''
     city_filename = acct_dir(base_dir, nickname, domain) + '/city.txt'
     if os.path.isfile(city_filename):
         try:
diff --git a/content.py b/content.py
index 53fe46b6b..081d9e6b1 100644
--- a/content.py
+++ b/content.py
@@ -98,7 +98,7 @@ def remove_html_tag(html_str: str, tag: str) -> str:
     """
     tag_found = True
     while tag_found:
-        match_str = ' ' + tag + '="'
+        match_str: str = ' ' + tag + '="'
         if match_str not in html_str:
             tag_found = False
             break
@@ -164,7 +164,7 @@ def html_replace_email_quote(content: str) -> str:
         return content
     content_str = content.replace('', '')
     content_lines = content_str.split('')
-    new_content = ''
+    new_content: str = ''
     for line_str in content_lines:
         if not line_str:
             continue
@@ -209,7 +209,7 @@ def html_replace_quote_marks(content: str) -> str:
     if '"' in content:
         sections = content.split('"')
         if len(sections) > 1:
-            new_content = ''
+            new_content: str = ''
             open_quote = True
             markup = False
             for char in content:
@@ -220,16 +220,16 @@ def html_replace_quote_marks(content: str) -> str:
                     markup = False
                 elif char == '"' and not markup:
                     if open_quote:
-                        curr_char = '“'
+                        curr_char: str = '“'
                     else:
-                        curr_char = '”'
+                        curr_char: str = '”'
                     open_quote = not open_quote
                 new_content += curr_char

     if '"' in new_content:
         open_quote = True
         content = new_content
-        new_content = ''
+        new_content: str = ''
         ctr = 0
         sections = content.split('"')
         no_of_sections = len(sections)
@@ -512,7 +512,7 @@ def replace_emoji_from_tags(session, base_dir: str,
             else:
                 # sequence of codes
                 icon_codes = icon_name.split('-')
-                icon_code_sequence = ''
+                icon_code_sequence: str = ''
                 for icode in icon_codes:
                     replaced = False
                     try:
@@ -520,7 +520,7 @@ def replace_emoji_from_tags(session, base_dir: str,
                             icode, 16))
                         replaced = True
                     except BaseException:
-                        icon_code_sequence = ''
+                        icon_code_sequence: str = ''
                         if debug:
                             print('EX: ' +
                                   'replace_emoji_from_tags 2 ' +
@@ -545,18 +545,18 @@ def replace_emoji_from_tags(session, base_dir: str,
                 content = content.replace(tag_item['name'],
                                           icon_code_sequence)

-        html_class = 'emoji'
+        html_class: str = 'emoji'
         if message_type == 'post header':
             html_class = 'emojiheader'
         if message_type == 'profile':
             html_class = 'emojiprofile'
         if screen_readable:
-            emoji_tag_name = tag_item['name'].replace(':', '')
+            emoji_tag_name: str = tag_item['name'].replace(':', '')
         else:
-            emoji_tag_name = ''
-        url_str = get_url_from_post(tag_item['icon']['url'])
-        tag_url = remove_html(url_str)
-        emoji_html = "\""
+            emoji_tag_name: str = ''
+        url_str: str = get_url_from_post(tag_item['icon']['url'])
+        tag_url: str = remove_html(url_str)
+        emoji_html: str = "\""
         content = content.replace(tag_item['name'], emoji_html)
@@ -621,8 +621,8 @@ def _contains_doi_reference(wrd: str, replace_dict: {}) -> bool:
         return False
     doi_ref_str = wrd.split(':', 1)[1]

-    doi_site = 'https://sci-hub.ru'
-    markup = '' + \
+    doi_site: str = 'https://sci-hub.ru'
+    markup: str = '' + \
@@ -657,7 +657,7 @@ def _contains_arxiv_reference(wrd: str, replace_dict: {}) -> bool:
     if not arxiv_day.isdigit():
         return False
     ref_str = arxiv_ref[0] + '.' + arxiv_ref[1]
-    markup = '' + \
+    markup: str = '' + \
@@ -687,7 +687,7 @@ def remove_link_trackers_from_content(content: str) -> str:
         return content
     sections = content.split('?utm_')
     ctr = 0
-    new_content = ''
+    new_content: str = ''
     for section_str in sections:
         if ctr == 0:
             new_content = section_str
@@ -748,7 +748,7 @@ def add_web_links(content: str) -> str:
         if url.endswith('.') or wrd.endswith(';'):
             url = url[:-1]
         url = remove_link_tracking(url)
-        markup = ''
+        markup: str = ''
         for prefix in prefixes:
             if url.startswith(prefix):
@@ -846,7 +846,7 @@ def replace_remote_hashtags(content: str,
             ctr += 1
             continue
         if '/' + domain not in link:
-            new_link = '/users/' + nickname + \
+            new_link: str = '/users/' + nickname + \
                 '?remotetag=' + link.replace('/', '--')
             replacements[link] = new_link
         ctr += 1
@@ -1170,7 +1170,7 @@ def remove_long_words(content: str, max_word_length: int,
         if '/' in word_str:
             continue
         if len(word_str[max_word_length:]) < max_word_length:
-            end_of_line_char = '\n'
+            end_of_line_char: str = '\n'
             if '<br>' in original_word_str:
                 end_of_line_char = ''
             new_word_str = \
@@ -1272,7 +1272,7 @@ def detect_dogwhistles(content: str, dogwhistles: {}) -> {}:
                     ending = True

         if ending:
-            prev_wrd = ''
+            prev_wrd: str = ''
             for wrd in words:
                 wrd2 = (prev_wrd + ' ' + wrd).strip()
                 if wrd.endswith(whistle) or wrd2.endswith(whistle):
@@ -1294,7 +1294,7 @@ def detect_dogwhistles(content: str, dogwhistles: {}) -> {}:
                     starting = True

         if starting:
-            prev_wrd = ''
+            prev_wrd: str = ''
             for wrd in words:
                 wrd2 = (prev_wrd + ' ' + wrd).strip()
                 if wrd.startswith(whistle) or wrd2.startswith(whistle):
@@ -1311,7 +1311,7 @@ def detect_dogwhistles(content: str, dogwhistles: {}) -> {}:
         if '*' in whistle:
             whistle_start = whistle.split('*', 1)[0]
             whistle_end = whistle.split('*', 1)[1]
-            prev_wrd = ''
+            prev_wrd: str = ''
             for wrd in words:
                 wrd2 = (prev_wrd + ' ' + wrd).strip()
                 if ((wrd.startswith(whistle_start) and
@@ -1328,7 +1328,7 @@ def detect_dogwhistles(content: str, dogwhistles: {}) -> {}:
                 prev_wrd = wrd
             continue

-        prev_wrd = ''
+        prev_wrd: str = ''
         for wrd in words:
             wrd2 = (prev_wrd + ' ' + wrd).strip()
             if whistle in (wrd, wrd2):
@@ -1392,10 +1392,10 @@ def add_html_tags(base_dir: str, http_prefix: str,
         '\n': ' --linebreak-- '
     }
     content = replace_strings(content, replacements)
-    now_playing_str = 'NowPlaying'
+    now_playing_str: str = 'NowPlaying'
     if translate.get(now_playing_str):
         now_playing_str = translate[now_playing_str]
-    now_playing_lower_str = 'nowplaying'
+    now_playing_lower_str: str = 'nowplaying'
     if translate.get(now_playing_lower_str):
         now_playing_lower_str = translate[now_playing_lower_str]
     if '#' + now_playing_lower_str in content:
@@ -1445,7 +1445,7 @@ def add_html_tags(base_dir: str, http_prefix: str,

     # extract mentions and tags from words
     long_words_list: list[str] = []
-    prev_word_str = ''
+    prev_word_str: str = ''
     auto_tags_list = _load_auto_tags(base_dir, nickname, domain)
     append_tags = []
     for word_str in words:
@@ -1459,7 +1459,7 @@ def add_html_tags(base_dir: str, http_prefix: str,
             if _add_mention(base_dir, word_str, http_prefix, following,
                             petnames, replace_mentions, recipients,
                             hashtags):
-                prev_word_str = ''
+                prev_word_str: str = ''
                 continue
         elif first_char == '#':
             # remove any endings from the hashtag
@@ -1471,7 +1471,7 @@ def add_html_tags(base_dir: str, http_prefix: str,
             if _add_hash_tags(word_str, http_prefix,
                               original_domain,
                               replace_hashtags, hashtags):
-                prev_word_str = ''
+                prev_word_str: str = ''
                 continue
         elif ':' in word_str:
             word_str2 = word_str.split(':')[1]
@@ -1500,12 +1500,12 @@ def add_html_tags(base_dir: str, http_prefix: str,
                                          emoji_dict)
         else:
             if _auto_tag(word_str, auto_tags_list, append_tags):
-                prev_word_str = ''
+                prev_word_str: str = ''
                 continue
             if prev_word_str:
                 if _auto_tag(prev_word_str + ' ' + word_str,
                              auto_tags_list, append_tags):
-                    prev_word_str = ''
+                    prev_word_str: str = ''
                     continue
             prev_word_str = word_str

@@ -1641,7 +1641,7 @@ def save_media_in_form_post(media_bytes, debug: bool,
         return None, None

     media_location = -1
-    search_str = ''
+    search_str: str = ''
     filename = None

     # directly search the binary array for the beginning
@@ -1821,7 +1821,7 @@ def extract_text_fields_in_post(post_bytes, boundary: str, debug: bool,
     if debug:
         if 'password' not in message_fields:
             print('DEBUG: POST message_fields: ' + str(message_fields))
-    lynx_content_type = 'Content-Type: text/plain; charset=utf-8\r\n'
+    lynx_content_type: str = 'Content-Type: text/plain; charset=utf-8\r\n'
     # examine each section of the POST, separated by the boundary
     for fld in message_fields:
         if fld == '--':
@@ -1856,7 +1856,7 @@ def extract_text_fields_in_post(post_bytes, boundary: str, debug: bool,
             post_lines = post_value_str.split('\r\n')
             if debug and 'password' not in post_key:
                 print('post_lines: ' + str(post_lines))
-            post_value = ''
+            post_value: str = ''
             if len(post_lines) > 2:
                 for line in range(2, len(post_lines)-1):
                     if line > 2:
@@ -1873,9 +1873,9 @@ def limit_repeated_words(text: str, max_repeats: int) -> str:
     """
     words = text.replace('\n', ' ').split(' ')
     repeat_ctr = 0
-    repeated_text = ''
+    repeated_text: str = ''
     replacements = {}
-    prev_word = ''
+    prev_word: str = ''
     for word in words:
         if word == prev_word:
             repeat_ctr += 1
@@ -1975,7 +1975,7 @@ def contains_invalid_local_links(domain_full: str,
     """Returns true if the given content has invalid links
     """
     for inv_str in INVALID_CONTENT_STRINGS:
-        match_str = '?' + inv_str + '='
+        match_str: str = '?' + inv_str + '='
         if match_str not in content:
             continue
         # extract the urls and check whether they are for the local domain
@@ -2009,10 +2009,10 @@ def bold_reading_string(text: str) -> str:
     add_paragraph_markup = True
     paragraphs = text.split('\n')
     parag_ctr = 0
-    new_text = ''
+    new_text: str = ''
     for parag in paragraphs:
         words = parag.split(' ')
-        new_parag = ''
+        new_parag: str = ''
         reading_markup = False
         for wrd in words:
             if '<' in wrd:
@@ -2025,8 +2025,8 @@ def bold_reading_string(text: str) -> str:
                '&' not in wrd and \
                '=' not in wrd and \
                not wrd.startswith(':'):
-                prefix = ''
-                postfix = ''
+                prefix: str = ''
+                postfix: str = ''
                 if wrd.startswith('"'):
                     prefix = '"'
                     wrd = wrd[1:]
@@ -2109,7 +2109,7 @@ def content_diff(content: str, prev_content: str) -> str:

     diff = cdiff.compare(text1_sentences, text2_sentences)

-    diff_text = ''
+    diff_text: str = ''
     for line in diff:
         if line.startswith('- '):
             if not diff_text:
@@ -2146,7 +2146,7 @@ def create_edits_html(edits_json: {}, post_json_object: {},
     for modified, _ in edits_json.items():
         edit_dates_list.append(modified)
     edit_dates_list.sort(reverse=True)
-    edits_str = ''
+    edits_str: str = ''
     content = get_content_from_post(post_json_object, system_language,
                                     languages_understood, "content")
     if not content:
@@ -2172,7 +2172,7 @@ def create_edits_html(edits_json: {}, post_json_object: {},
         datetime_object = \
             convert_published_to_local_timezone(datetime_object, timezone)
         modified_str = datetime_object.strftime("%a %b %d, %H:%M")
-        diff = '' + modified_str + '' + diff
+        diff: str = '' + modified_str + '' + diff
         edits_str += diff
         content = prev_content
     if not edits_str:
@@ -2189,7 +2189,7 @@ def remove_script(content: str, log_filename: str,
     separators = [['<', '>'], ['&lt;', '&gt;']]
     for sep in separators:
         prefix = sep[0] + 'script'
-        ending = '/script' + sep[1]
+        ending: str = '/script' + sep[1]
         if prefix not in content:
             continue
         sections = content.split(prefix)
@@ -2208,7 +2208,7 @@ def remove_script(content: str, log_filename: str,
             if log_filename and actor:
                 # write the detected script to a log file
                 log_str = actor + ' ' + url + ' ' + text + '\n'
-                write_type = 'a+'
+                write_type: str = 'a+'
                 if os.path.isfile(log_filename):
                     write_type = 'w+'
                 try:
@@ -2321,22 +2321,22 @@ def format_mixed_right_to_left(content: str,
     # not a RTL language
     if language_right_to_left(language):
         return content
-    result = ''
+    result: str = ''
     changed = False
     paragraphs = content.split('')
     for text_html in paragraphs:
         if '' not in text_html:
             continue
-        text_html = '' + text_html
-        text_plain = remove_html(text_html)
+        text_html: str = '' + text_html
+        text_plain: str = remove_html(text_html)
         if is_right_to_left_text(text_plain):
             text_html = text_html.replace('', '', 1)
             text_html = text_html.replace('', '', 1)
             changed = True
         result += text_html
     if not changed:
-        result = ''
-        prev_distilled = ''
+        result: str = ''
+        prev_distilled: str = ''
         distilled = content
         while prev_distilled != distilled:
             prev_distilled = distilled
diff --git a/conversation.py b/conversation.py
index 893085361..a9d94803c 100644
--- a/conversation.py
+++ b/conversation.py
@@ -299,7 +299,7 @@ def download_conversation_posts(authorized: bool, session,
     """
     if '://' not in post_id:
         return []
-    profile_str = 'https://www.w3.org/ns/activitystreams'
+    profile_str: str = 'https://www.w3.org/ns/activitystreams'
     as_header = {
         'Accept': 'application/ld+json; profile="' + profile_str + '"'
     }
@@ -438,7 +438,7 @@ def conversation_tag_to_convthread_id(tag: str) -> str:
     """
     if not isinstance(tag, str):
         return ''
-    convthread_id = ''
+    convthread_id: str = ''
     for tag_chr in tag:
         if tag_chr.isdigit():
             convthread_id += tag_chr
diff --git a/crawlers.py b/crawlers.py
index b06a23718..f82285496 100644
--- a/crawlers.py
+++ b/crawlers.py
@@ -90,7 +90,7 @@ def _save_known_web_bots(base_dir: str, known_bots: []) -> bool:
     """Saves a list of known web bots
     """
     known_bots_filename = data_dir(base_dir) + '/knownBots.txt'
-    known_bots_str = ''
+    known_bots_str: str = ''
     for crawler in known_bots:
         known_bots_str += crawler.strip() + '\n'
     try:
diff --git a/cwlists.py b/cwlists.py
index d7e3cbbb4..54c24d04a 100644
--- a/cwlists.py
+++ b/cwlists.py
@@ -115,7 +115,7 @@ def add_cw_from_lists(post_json_object: {}, cw_lists: {}, translate: {},
     if 'content' not in post_json_object['object']:
         if 'contentMap' not in post_json_object['object']:
             return
-    cw_text = ''
+    cw_text: str = ''
     if post_json_object['object'].get('summary'):
         cw_text = post_json_object['object']['summary']
diff --git a/daemon.py b/daemon.py
index 485bc1da8..aafda02e8 100644
--- a/daemon.py
+++ b/daemon.py
@@ -108,7 +108,7 @@ from poison import load_2grams


 class PubServer(BaseHTTPRequestHandler):
-    protocol_version = 'HTTP/1.1'
+    protocol_version: str = 'HTTP/1.1'

     def handle_error(self, request, client_address):
         """HTTP server error handling
@@ -295,11 +295,11 @@ class PubServer(BaseHTTPRequestHandler):


 class PubServerUnitTest(PubServer):
-    protocol_version = 'HTTP/1.0'
+    protocol_version: str = 'HTTP/1.0'


 class EpicyonServer(ThreadingHTTPServer):
-    starting_daemon = True
+    starting_daemon: bool = True
     hide_announces = {}
     no_of_books = 0
     max_api_blocks = 32000
@@ -315,7 +315,7 @@ class EpicyonServer(ThreadingHTTPServer):
     block_government = {}
     block_bluesky = {}
     block_nostr = {}
-    followers_synchronization = False
+    followers_synchronization: bool = False
     followers_sync_cache = {}
     buy_sites = None
     min_images_for_accounts = 0
@@ -323,7 +323,7 @@ class EpicyonServer(ThreadingHTTPServer):
     css_cache = {}
     reverse_sequence = None
     clacks = None
-    public_replies_unlisted = False
+    public_replies_unlisted: bool = False
     dogwhistles = {}
     preferred_podcast_formats: list[str] = []
     bold_reading = {}
@@ -331,18 +331,18 @@ class EpicyonServer(ThreadingHTTPServer):
     hide_recent_posts = {}
     account_timezone = None
     post_to_nickname = None
-    nodeinfo_is_active = False
-    security_txt_is_active = False
-    vcard_is_active = False
-    masto_api_is_active = False
+    nodeinfo_is_active: bool = False
+    security_txt_is_active: bool = False
+    vcard_is_active: bool = False
+    masto_api_is_active: bool = False
     map_format = None
-    dyslexic_font = False
-    content_license_url = ''
-    dm_license_url = ''
+    dyslexic_font: bool = False
+    content_license_url: str = ''
+    dm_license_url: str = ''
     fitness = {}
     signing_priv_key_pem = None
-    show_node_info_accounts = False
-    show_node_info_version = False
+    show_node_info_accounts: bool = False
+    show_node_info_version: bool = False
     text_mode_banner = ''
     access_keys = {}
     rss_timeout_sec = 20
@@ -350,68 +350,68 @@ class EpicyonServer(ThreadingHTTPServer):
     default_reply_interval_hrs = 9999999
     recent_dav_etags = {}
     key_shortcuts = {}
-    low_bandwidth = False
+    low_bandwidth: bool = False
     user_agents_blocked = None
     crawlers_allowed = None
     known_bots = None
-    unit_test = False
-    allow_local_network_access = False
-    yt_replace_domain = ''
-    twitter_replacement_domain = ''
+    unit_test: bool = False
+    allow_local_network_access: bool = False
+    yt_replace_domain: str = ''
+    twitter_replacement_domain: str = ''
     newswire = {}
     max_newswire_posts = 0
-    verify_all_signatures = False
+    verify_all_signatures: bool = False
     blocklistUpdateCtr = 0
     blocklistUpdateInterval = 100
     domainBlocklist = None
     manual_follower_approval = True
     onion_domain = None
     i2p_domain = None
-    media_instance = False
-    blogs_instance = False
+    media_instance: bool = False
+    blogs_instance: bool = False
     translate = {}
-    system_language = 'en'
+    system_language: str = 'en'
     city = ''
     voting_time_mins = 30
-    positive_voting = False
+    positive_voting: bool = False
     newswire_votes_threshold = 1
     max_newswire_feed_size_kb = 1
     max_newswire_posts_per_source = 1
-    show_published_date_only = False
+    show_published_date_only: bool = False
     max_mirrored_articles = 0
     max_news_posts = 0
     maxTags = 32
     max_followers = 2000
-    show_publish_as_icon = False
-    full_width_tl_button_header = False
-    rss_icon_at_top = True
-    publish_button_at_top = False
+    show_publish_as_icon: bool = False
+    full_width_tl_button_header: bool = False
+    rss_icon_at_top: bool = True
+    publish_button_at_top: bool = False
     max_feed_item_size_kb = 100
     maxCategoriesFeedItemSizeKb = 1024
     dormant_months = 6
     max_like_count = 10
     followingItemsPerPage = 12
-    registration = False
-    enable_shared_inbox = True
+    registration: bool = False
+    enable_shared_inbox: bool = True
     outboxThread = {}
     outbox_thread_index = {}
     new_post_thread = {}
     project_version = __version__
-    secure_mode = True
+    secure_mode: bool = True
     max_post_length = 0
     maxMediaSize = 0
     maxMessageLength = 64000
     maxPostsInBox = 32000
     maxCacheAgeDays = 30
-    domain = ''
+    domain: str = ''
     port = 43
-    domain_full = ''
-    http_prefix = 'https'
-    debug = False
+    domain_full: str = ''
+    http_prefix: str = 'https'
+    debug: bool = False
     federation_list: list[str] = []
     shared_items_federated_domains: list[str] = []
-    base_dir = ''
-    instance_id = ''
+    base_dir: str = ''
+    instance_id: str = ''
     person_cache = {}
     cached_webfingers = {}
     favicons_cache = {}
@@ -421,22 +421,22 @@ class EpicyonServer(ThreadingHTTPServer):
     session_i2p = None
     last_getreq = 0
     last_postreq = 0
-    getreq_busy = False
-    postreq_busy = False
-    received_message = False
+    getreq_busy: bool = False
+    postreq_busy: bool = False
+    received_message: bool = False
     inbox_queue: list[dict] = []
     send_threads = None
     post_log = []
     max_queue_length = 64
-    allow_deletion = True
+    allow_deletion: bool = True
     last_login_time = 0
     last_login_failure = 0
     login_failure_count = {}
-    log_login_failures = True
+    log_login_failures: bool = True
     max_replies = 10
     tokens = {}
     tokens_lookup = {}
-    instance_only_skills_search = True
+    instance_only_skills_search: bool = True
     followers_threads = []
     blocked_cache = []
     blocked_cache_last_updated = 0
@@ -447,9 +447,9 @@ class EpicyonServer(ThreadingHTTPServer):
     last_known_crawler = 0
     lists_enabled = None
     cw_lists = {}
-    theme_name = ''
-    news_instance = False
-    default_timeline = 'inbox'
+    theme_name: str = ''
+    news_instance: bool = False
+    default_timeline: str = 'inbox'
     thrFitness = None
     recent_posts_cache = {}
     thrCache = None
@@ -471,9 +471,9 @@ class EpicyonServer(ThreadingHTTPServer):
     thrPostSchedule = None
     thrNewswireDaemon = None
     thrFederatedSharesDaemon = None
-    restart_inbox_queue_in_progress = False
-    restart_inbox_queue = False
-    signing_priv_key_pem = ''
+    restart_inbox_queue_in_progress: bool = False
+    restart_inbox_queue: bool = False
+    signing_priv_key_pem: str = ''
     thrCheckActor = {}
     thrImportFollowing = None
     thrWatchdog = None
@@ -482,8 +482,8 @@ class EpicyonServer(ThreadingHTTPServer):
     thrFederatedSharesWatchdog = None
     thrFederatedBlocksDaemon = None
     qrcode_scale = 6
-    instance_description = ''
-    instance_description_short = 'Epicyon'
+    instance_description: str = ''
+    instance_description_short: str = 'Epicyon'
     robots_txt = None
     last_llm_time = None
     mitm_servers = []
@@ -877,7 +877,7 @@ def run_daemon(accounts_data_dir: str,
     if not content_license_url:
         content_license_url = 'https://creativecommons.org/licenses/by-nc/4.0'
     httpd.content_license_url = content_license_url
-    httpd.dm_license_url = ''
+    httpd.dm_license_url: str = ''

     # fitness metrics
     fitness_filename = data_dir(base_dir) + '/fitness.json'
@@ -1013,7 +1013,7 @@ def run_daemon(accounts_data_dir: str,

     # load translations dictionary
     httpd.translate = {}
-    httpd.system_language = 'en'
+    httpd.system_language: str = 'en'
     if not unit_test:
         httpd.translate, httpd.system_language = \
             load_translations_from_file(base_dir, language)
@@ -1207,7 +1207,7 @@ def run_daemon(accounts_data_dir: str,
     httpd.cw_lists = load_cw_lists(base_dir, True)

     # set the avatar for the news account
-    httpd.theme_name = get_config_param(base_dir, 'theme')
+    httpd.theme_name: str = get_config_param(base_dir, 'theme')
     if not httpd.theme_name:
         httpd.theme_name = 'default'
     if is_news_theme_name(base_dir, httpd.theme_name):
diff --git a/utils.py b/utils.py
index 29f28163b..a0f4bde94 100644
--- a/utils.py
+++ b/utils.py
@@ -356,6 +356,21 @@ def get_language_from_post(post_json_object: {}, system_language: str,
     return system_language


+def get_post_attachments(post_json_object: {}) -> []:
+    """ Returns the list of attachments for a post
+    """
+    post_obj = post_json_object
+    if has_object_dict(post_json_object):
+        post_obj = post_json_object['object']
+    if not post_obj.get('attachment'):
+        return []
+    if isinstance(post_obj['attachment'], list):
+        return post_obj['attachment']
+    if isinstance(post_obj['attachment'], dict):
+        return [post_obj['attachment']]
+    return []
+
+
 def get_media_descriptions_from_post(post_json_object: {}) -> str:
     """Returns all attached media descriptions as a single text.
     This is used for filtering
@@ -3754,21 +3769,6 @@ def set_premium_account(base_dir: str, nickname: str, domain: str,
     return True


-def get_post_attachments(post_json_object: {}) -> []:
-    """ Returns the list of attachments for a post
-    """
-    post_obj = post_json_object
-    if has_object_dict(post_json_object):
-        post_obj = post_json_object['object']
-    if not post_obj.get('attachment'):
-        return []
-    if isinstance(post_obj['attachment'], list):
-        return post_obj['attachment']
-    if isinstance(post_obj['attachment'], dict):
-        return [post_obj['attachment']]
-    return []
-
-
 def string_ends_with(text: str, possible_endings: []) -> bool:
     """ Does the given text end with at least one of the endings
     """