diff --git a/delete.py b/delete.py index fc3e6cc6a..1a769e0bc 100644 --- a/delete.py +++ b/delete.py @@ -47,13 +47,13 @@ def send_delete_via_server(base_dir: str, session, print('WARN: No session for send_delete_via_server') return 6 - from_domain_full = get_full_domain(from_domain, from_port) + from_domain_full: str = get_full_domain(from_domain, from_port) - actor = local_actor_url(http_prefix, from_nickname, from_domain_full) - to_url = 'https://www.w3.org/ns/activitystreams#Public' - cc_url = actor + '/followers' + actor: str = local_actor_url(http_prefix, from_nickname, from_domain_full) + to_url: str = 'https://www.w3.org/ns/activitystreams#Public' + cc_url: str = actor + '/followers' - new_delete_json = { + new_delete_json: dict = { "@context": [ 'https://www.w3.org/ns/activitystreams', 'https://w3id.org/security/v1' @@ -65,10 +65,10 @@ def send_delete_via_server(base_dir: str, session, 'type': 'Delete' } - handle = http_prefix + '://' + from_domain_full + '/@' + from_nickname + handle: str = http_prefix + '://' + from_domain_full + '/@' + from_nickname # lookup the inbox for the To handle - wf_request = \ + wf_request: dict = \ webfinger_handle(session, handle, http_prefix, cached_webfingers, from_domain, project_version, debug, False, signing_priv_key_pem, mitm_servers) @@ -81,10 +81,10 @@ def send_delete_via_server(base_dir: str, session, ' did not return a dict. ' + str(wf_request)) return 1 - post_to_box = 'outbox' + post_to_box: str = 'outbox' # get the actor inbox for the To handle - origin_domain = from_domain + origin_domain: str = from_domain (inbox_url, _, _, from_person_id, _, _, _, _) = get_person_box(signing_priv_key_pem, origin_domain, base_dir, session, @@ -104,14 +104,14 @@ def send_delete_via_server(base_dir: str, session, print('DEBUG: delete no actor was found for ' + handle) return 4 - auth_header = create_basic_auth_header(from_nickname, password) + auth_header: str = create_basic_auth_header(from_nickname, password) - headers = { + headers: dict = { 'host': from_domain, 'Content-type': 'application/json', 'Authorization': auth_header } - post_result = \ + post_result: str = \ post_json(http_prefix, from_domain_full, session, new_delete_json, [], inbox_url, headers, 3, True) if not post_result: @@ -144,15 +144,15 @@ def outbox_delete(base_dir: str, http_prefix: str, return if debug: print('DEBUG: c2s delete request arrived in outbox') - delete_prefix = http_prefix + '://' + domain - actor_url = get_actor_from_post(message_json) + delete_prefix: str = http_prefix + '://' + domain + actor_url: str = get_actor_from_post(message_json) if (not allow_deletion and (not message_json['object'].startswith(delete_prefix) or not actor_url.startswith(delete_prefix))): if debug: print('DEBUG: delete not permitted from other instances') return - message_id = remove_id_ending(message_json['object']) + message_id: str = remove_id_ending(message_json['object']) if '/statuses/' not in message_id: if debug: print('DEBUG: c2s delete object is not a status') @@ -161,22 +161,22 @@ def outbox_delete(base_dir: str, http_prefix: str, if debug: print('DEBUG: c2s delete object has no nickname') return - delete_nickname = get_nickname_from_actor(message_id) + delete_nickname: str = get_nickname_from_actor(message_id) if delete_nickname != nickname: if debug: print("DEBUG: you can't delete a post which " + "wasn't created by you (nickname does not match)") return delete_domain, _ = get_domain_from_actor(message_id) - domain = remove_domain_port(domain) + domain: str = remove_domain_port(domain) if delete_domain != domain: if debug: print("DEBUG: you can't delete a post which " + "wasn't created by you (domain does not match)") return remove_moderation_post_from_index(base_dir, message_id, debug) - post_filename = locate_post(base_dir, delete_nickname, delete_domain, - message_id) + post_filename: str = locate_post(base_dir, delete_nickname, delete_domain, + message_id) if not post_filename: if debug: print('DEBUG: c2s delete post not found in inbox or outbox') @@ -191,14 +191,14 @@ def outbox_delete(base_dir: str, http_prefix: str, def remove_old_hashtags(base_dir: str, max_months: int) -> str: """Remove old hashtags """ - max_months = min(max_months, 11) + max_months: int = min(max_months, 11) prev_date = date_from_numbers(1970, 1 + max_months, 1, 0, 0) - max_days_since_epoch = (date_utcnow() - prev_date).days + max_days_since_epoch: int = (date_utcnow() - prev_date).days remove_hashtags: list[str] = [] for _, _, files in os.walk(base_dir + '/tags'): for fname in files: - tags_filename = os.path.join(base_dir + '/tags', fname) + tags_filename: str = os.path.join(base_dir + '/tags', fname) if not is_a_file(tags_filename): continue # get last modified datetime diff --git a/donate.py b/donate.py index f46df5707..53ee2176b 100644 --- a/donate.py +++ b/donate.py @@ -31,7 +31,7 @@ def get_donation_url(actor_json: {}) -> str: return '' if not isinstance(actor_json['attachment'], list): return '' - donation_type_list = _get_donation_types() + donation_type_list: list[str] = _get_donation_types() for property_value in actor_json['attachment']: if not isinstance(property_value, dict): print("WARN: actor attachment is not dict: " + str(property_value)) @@ -99,8 +99,8 @@ def set_donation_url(actor_json: {}, donate_url: str) -> None: if not actor_json.get('attachment'): actor_json['attachment']: list[dict] = [] - donation_type = _get_donation_types() - donate_name = None + donation_type: list[str] = _get_donation_types() + donate_name: str = None for payment_service in donation_type: if payment_service in donate_url: donate_name = payment_service @@ -108,7 +108,7 @@ def set_donation_url(actor_json: {}, donate_url: str) -> None: return # remove any existing value - property_found = None + property_found: str = None for property_value in actor_json['attachment']: if not isinstance(property_value, dict): print("WARN: actor attachment is not dict: " + str(property_value)) @@ -133,7 +133,7 @@ def set_donation_url(actor_json: {}, donate_url: str) -> None: if not_url: return - donate_value = \ + donate_value: str = \ '' + \ donate_url + '' diff --git a/enigma.py b/enigma.py index 0c13dfde9..4bfac7709 100644 --- a/enigma.py +++ b/enigma.py @@ -59,7 +59,7 @@ def set_enigma_pub_key(actor_json: {}, enigma_pub_key: str) -> None: actor_json['attachment']: list[dict] = [] # remove any existing value - property_found = None + property_found: str = None for property_value in actor_json['attachment']: if not isinstance(property_value, dict): print("WARN: actor attachment is not dict: " + str(property_value)) diff --git a/filters.py b/filters.py index 073a926ba..769f6d2f9 100644 --- a/filters.py +++ b/filters.py @@ -26,7 +26,8 @@ def add_filter(base_dir: str, nickname: str, domain: str, words: str) -> bool: """Adds a filter for particular words within the content of a incoming posts """ - filters_filename = acct_dir(base_dir, nickname, domain) + '/filters.txt' + filters_filename: str = \ + acct_dir(base_dir, nickname, domain) + '/filters.txt' if is_a_file(filters_filename): if text_in_file(words, filters_filename): return False @@ -44,7 +45,7 @@ def add_global_filter(base_dir: str, words: str) -> bool: return False if len(words) < 2: return False - filters_filename = data_dir(base_dir) + '/filters.txt' + filters_filename: str = data_dir(base_dir) + '/filters.txt' if is_a_file(filters_filename): if text_in_file(words, filters_filename): return False @@ -58,12 +59,13 @@ def remove_filter(base_dir: str, nickname: str, domain: str, words: str) -> bool: """Removes a word filter """ - filters_filename = acct_dir(base_dir, nickname, domain) + '/filters.txt' + filters_filename: str = \ + acct_dir(base_dir, nickname, domain) + '/filters.txt' if not is_a_file(filters_filename): return False if not text_in_file(words, filters_filename): return False - new_filters_filename = filters_filename + '.new' + new_filters_filename: str = filters_filename + '.new' filters_list: list[str] = \ load_list(filters_filename, @@ -92,12 +94,12 @@ def remove_filter(base_dir: str, nickname: str, domain: str, def remove_global_filter(base_dir: str, words: str) -> bool: """Removes a global word filter """ - filters_filename = data_dir(base_dir) + '/filters.txt' + filters_filename: str = data_dir(base_dir) + '/filters.txt' if not is_a_file(filters_filename): return False if not text_in_file(words, filters_filename): return False - new_filters_filename = filters_filename + '.new' + new_filters_filename: str = filters_filename + '.new' global_list: list[str] = \ load_list(filters_filename, @@ -126,7 +128,7 @@ def remove_global_filter(base_dir: str, words: str) -> bool: def _is_twitter_post(content: str) -> bool: """Returns true if the given post content is a retweet or twitter crosspost """ - features = ( + features: list[str] = ( '/x.com', '/twitter.', '/nitter.', '@twitter.', '@nitter.', '@x.com', '>RT <', '_tw<', '_tw@', 'tweet', 'Tweet', '🐦🔗' @@ -157,7 +159,7 @@ def _is_filtered_base(filename: str, content: str, if not is_a_file(filename): return False - content = remove_inverted_text(content, system_language) + content: str = remove_inverted_text(content, system_language) content = remove_square_capitals(content, system_language) # convert any fancy characters to ordinary ones @@ -168,7 +170,7 @@ def _is_filtered_base(filename: str, content: str, 'EX: _is_filtered_base ' + filename + ' [ex]') if filtered_list is not None: for line in filtered_list: - filter_str = remove_eol(line) + filter_str: str = remove_eol(line) if not filter_str: continue if len(filter_str) < 2: @@ -177,7 +179,8 @@ def _is_filtered_base(filename: str, content: str, if filtered_match(filter_str, content): return True else: - filter_words = filter_str.replace('"', '').split('+') + filter_words: list[str] = \ + filter_str.replace('"', '').split('+') for filter_wrd in filter_words: if not filtered_match(filter_wrd, content): return False @@ -189,7 +192,7 @@ def is_filtered_globally(base_dir: str, content: str, system_language: str) -> bool: """Is the given content globally filtered? """ - global_filters_filename = data_dir(base_dir) + '/filters.txt' + global_filters_filename: str = data_dir(base_dir) + '/filters.txt' if _is_filtered_base(global_filters_filename, content, system_language): return True @@ -207,7 +210,7 @@ def is_filtered_bio(base_dir: str, if not nickname or not domain: return False - account_filters_filename = \ + account_filters_filename: str = \ acct_dir(base_dir, nickname, domain) + '/filters_bio.txt' return _is_filtered_base(account_filters_filename, bio, system_language) @@ -226,12 +229,13 @@ def is_filtered(base_dir: str, nickname: str, domain: str, return False # optionally remove retweets - remove_twitter = acct_dir(base_dir, nickname, domain) + '/.removeTwitter' + remove_twitter: str = \ + acct_dir(base_dir, nickname, domain) + '/.removeTwitter' if is_a_file(remove_twitter): if _is_twitter_post(content): return True - account_filters_filename = \ + account_filters_filename: str = \ acct_dir(base_dir, nickname, domain) + '/filters.txt' return _is_filtered_base(account_filters_filename, content, system_language) @@ -242,12 +246,15 @@ def is_question_filtered(base_dir: str, nickname: str, domain: str, """is the given question filtered based on its options? """ if question_json.get('oneOf'): - question_options = question_json['oneOf'] + question_options: list[str] = question_json['oneOf'] else: - question_options = question_json['object']['oneOf'] + question_options: list[str] = question_json['object']['oneOf'] for option in question_options: - if option.get('name'): - if is_filtered(base_dir, nickname, domain, option['name'], - system_language): - return True + if not option.get('name'): + continue + if not isinstance(option['name'], str): + continue + if is_filtered(base_dir, nickname, domain, option['name'], + system_language): + return True return False diff --git a/fitnessFunctions.py b/fitnessFunctions.py index b740833e3..43356ba45 100644 --- a/fitnessFunctions.py +++ b/fitnessFunctions.py @@ -33,7 +33,7 @@ def fitness_performance(start_time, fitness_state: {}, "ctr": int(0) } - time_diff = float(time.time() - start_time) + time_diff: float = float(time.time() - start_time) fitness_state['performance'][fitness_id][watch_point]['total'] += time_diff fitness_state['performance'][fitness_id][watch_point]['ctr'] += 1 @@ -44,8 +44,9 @@ def fitness_performance(start_time, fitness_state: {}, 2) if debug: - ctr = fitness_state['performance'][fitness_id][watch_point]['ctr'] - total = fitness_state['performance'][fitness_id][watch_point]['total'] + ctr: int = fitness_state['performance'][fitness_id][watch_point]['ctr'] + total: float = \ + fitness_state['performance'][fitness_id][watch_point]['total'] print('FITNESS: performance/' + fitness_id + '/' + watch_point + '/' + str(total * 1000 / ctr)) @@ -73,16 +74,15 @@ def html_watch_points_graph(base_dir: str, fitness: {}, fitness_id: str, max_entries: int) -> str: """Returns the html for a graph of watchpoints """ - watch_points_list = sorted_watch_points(fitness, fitness_id) + watch_points_list: list[str] = sorted_watch_points(fitness, fitness_id) - css_filename = base_dir + '/epicyon-graph.css' + css_filename: str = base_dir + '/epicyon-graph.css' if is_a_file(base_dir + '/graph.css'): css_filename = base_dir + '/graph.css' - instance_title = \ - get_config_param(base_dir, 'instanceTitle') + instance_title: str = get_config_param(base_dir, 'instanceTitle') preload_images: list[str] = [] - html_str = \ + html_str: str = \ html_header_with_external_style(css_filename, instance_title, None, preload_images) html_str += \ @@ -96,7 +96,7 @@ def html_watch_points_graph(base_dir: str, fitness: {}, fitness_id: str, '\n' # get the maximum time - max_average_time = float(1) + max_average_time: float = float(1) if watch_points_list: max_average_time = float(watch_points_list[0].split(' ')[0]) for watch_point in watch_points_list: @@ -106,10 +106,10 @@ def html_watch_points_graph(base_dir: str, fitness: {}, fitness_id: str, ctr: int = 0 for watch_point in watch_points_list: - name = watch_point.split(' ', 1)[1] - average_time = float(watch_point.split(' ')[0]) - height_percent = int(average_time * 100 / max_average_time) - time_ms = int(average_time) + name: str = watch_point.split(' ', 1)[1] + average_time: float = float(watch_point.split(' ')[0]) + height_percent: int = int(average_time * 100 / max_average_time) + time_ms: int = int(average_time) if height_percent == 0: continue html_str += \ @@ -128,7 +128,7 @@ def html_watch_points_graph(base_dir: str, fitness: {}, fitness_id: str, def fitness_thread(base_dir: str, fitness: {}) -> None: """Thread used to save fitness function scores """ - fitness_filename = data_dir(base_dir) + '/fitness.json' + fitness_filename: str = data_dir(base_dir) + '/fitness.json' while True: # every 10 mins time.sleep(60 * 10) diff --git a/flags.py b/flags.py index ecab78421..676759965 100644 --- a/flags.py +++ b/flags.py @@ -38,7 +38,7 @@ def is_featured_writer(base_dir: str, nickname: str, domain: str) -> bool: """Is the given account a featured writer, appearing in the features timeline on news instances? """ - features_blocked_filename = \ + features_blocked_filename: str = \ acct_dir(base_dir, nickname, domain) + '/.nofeatures' return not is_a_file(features_blocked_filename) @@ -48,23 +48,23 @@ def is_dormant(base_dir: str, nickname: str, domain: str, actor: str, """Is the given followed actor dormant, from the standpoint of the given account """ - last_seen_filename = acct_dir(base_dir, nickname, domain) + \ + last_seen_filename: str = acct_dir(base_dir, nickname, domain) + \ '/lastseen/' + actor.replace('/', '#') + '.txt' if not is_a_file(last_seen_filename): return False - days_since_epoch_str = \ + days_since_epoch_str: str = \ load_string(last_seen_filename, 'EX: failed to read last seen ' + last_seen_filename) if days_since_epoch_str is None: return False if days_since_epoch_str: - days_since_epoch = int(days_since_epoch_str) + days_since_epoch: int = int(days_since_epoch_str) curr_time = date_utcnow() - curr_days_since_epoch = (curr_time - date_epoch()).days - time_diff_months = \ + curr_days_since_epoch: int = (curr_time - date_epoch()).days + time_diff_months: int = \ int((curr_days_since_epoch - days_since_epoch) / 30) if time_diff_months >= dormant_months: return True @@ -77,7 +77,7 @@ def is_editor(base_dir: str, nickname: str) -> bool: editors_file = data_dir(base_dir) + '/editors.txt' if not is_a_file(editors_file): - admin_name = get_config_param(base_dir, 'admin') + admin_name: str = get_config_param(base_dir, 'admin') if admin_name: if admin_name == nickname: return True @@ -90,12 +90,12 @@ def is_editor(base_dir: str, nickname: str) -> bool: lines = lines.split('\n') if not lines: - admin_name = get_config_param(base_dir, 'admin') + admin_name: str = get_config_param(base_dir, 'admin') if admin_name: if admin_name == nickname: return True for editor in lines: - editor = editor.strip('\n').strip('\r') + editor: str = editor.strip('\n').strip('\r') if editor == nickname: return True return False @@ -104,10 +104,10 @@ def is_editor(base_dir: str, nickname: str) -> bool: def is_artist(base_dir: str, nickname: str) -> bool: """Returns true if the given nickname is an artist """ - artists_file = data_dir(base_dir) + '/artists.txt' + artists_file: str = data_dir(base_dir) + '/artists.txt' if not is_a_file(artists_file): - admin_name = get_config_param(base_dir, 'admin') + admin_name: str = get_config_param(base_dir, 'admin') if admin_name: if admin_name == nickname: return True @@ -120,12 +120,12 @@ def is_artist(base_dir: str, nickname: str) -> bool: lines = lines.split('\n') if not lines: - admin_name = get_config_param(base_dir, 'admin') + admin_name: str = get_config_param(base_dir, 'admin') if admin_name: if admin_name == nickname: return True for artist in lines: - artist = artist.strip('\n').strip('\r') + artist: str = artist.strip('\n').strip('\r') if artist == nickname: return True return False @@ -151,7 +151,7 @@ def is_system_account(nickname: str) -> bool: def is_memorial_account(base_dir: str, nickname: str) -> bool: """Returns true if the given nickname is a memorial account """ - memorial_file = data_dir(base_dir) + '/memorial' + memorial_file: str = data_dir(base_dir) + '/memorial' if not is_a_file(memorial_file): return False memorial_list: list[str] = \ @@ -167,13 +167,13 @@ def is_memorial_account(base_dir: str, nickname: str) -> bool: def is_suspended(base_dir: str, nickname: str) -> bool: """Returns true if the given nickname is suspended """ - admin_nickname = get_config_param(base_dir, 'admin') + admin_nickname: str = get_config_param(base_dir, 'admin') if not admin_nickname: return False if nickname == admin_nickname: return False - suspended_filename = data_dir(base_dir) + '/suspended.txt' + suspended_filename: str = data_dir(base_dir) + '/suspended.txt' if is_a_file(suspended_filename): lines: list[str] = \ load_string(suspended_filename, @@ -196,7 +196,7 @@ def is_evil(domain: str) -> bool: return True # if a domain contains any of these strings then it is # declaring itself to be hostile - evil_emporium = ( + evil_emporium: list[str] = ( 'nazi', 'extremis', 'extreemis', 'gendercritic', 'kiwifarm', 'illegal', 'raplst', 'rapist', 'loli.', 'rapl.st', 'rapi.st', 'antivax', 'plandemic', 'terror' @@ -204,7 +204,7 @@ def is_evil(domain: str) -> bool: for hostile_str in evil_emporium: if hostile_str in domain: return True - evil_domains = evil_incarnate() + evil_domains: list[str] = evil_incarnate() for concentrated_evil in evil_domains: if domain.endswith(concentrated_evil): return True @@ -214,7 +214,7 @@ def is_evil(domain: str) -> bool: def is_local_network_address(ip_address: str) -> bool: """Is the given ip address local? """ - local_ips = get_local_network_addresses() + local_ips: list[str] = get_local_network_addresses() for ip_addr in local_ips: if ip_address.startswith(ip_addr): return True @@ -249,10 +249,10 @@ def is_public_post_from_url(base_dir: str, nickname: str, domain: str, post_url: str) -> bool: """Returns whether the given url is a public post """ - post_filename = locate_post(base_dir, nickname, domain, post_url) + post_filename: str = locate_post(base_dir, nickname, domain, post_url) if not post_filename: return False - post_json_object = load_json(post_filename) + post_json_object: dict = load_json(post_filename) if not post_json_object: return False return is_public_post(post_json_object) @@ -370,10 +370,10 @@ def is_recent_post(post_json_object: {}, max_days: int) -> bool: if not isinstance(post_json_object['object']['published'], str): return False curr_time = date_utcnow() - days_since_epoch = (curr_time - date_epoch()).days - recently = days_since_epoch - max_days + days_since_epoch: int = (curr_time - date_epoch()).days + recently: int = days_since_epoch - max_days - published_date_str = post_json_object['object']['published'] + published_date_str: str = post_json_object['object']['published'] if '.' in published_date_str: published_date_str = published_date_str.split('.')[0] + 'Z' @@ -385,8 +385,7 @@ def is_recent_post(post_json_object: {}, max_days: int) -> bool: str(published_date_str)) return False - published_days_since_epoch = \ - (published_date - date_epoch()).days + published_days_since_epoch: int = (published_date - date_epoch()).days if published_days_since_epoch < recently: return False return True @@ -418,7 +417,7 @@ def is_reply(post_json_object: {}, actor: str) -> bool: 'EncryptedMessage', 'ChatMessage', 'Article'): return False - reply_id = get_reply_to(post_json_object['object']) + reply_id: str = get_reply_to(post_json_object['object']) if reply_id: if isinstance(reply_id, str): if reply_id.startswith(actor): @@ -492,7 +491,7 @@ def is_group_actor(base_dir: str, actor: str, person_cache: {}, debug: bool = False) -> bool: """Is the given actor a group? """ - person_cache_actor = None + person_cache_actor: str = None if person_cache: if person_cache.get(actor): if person_cache[actor].get('actor'): @@ -508,7 +507,7 @@ def is_group_actor(base_dir: str, actor: str, person_cache: {}, if debug: print('Actor ' + actor + ' not in cache') - cached_actor_filename = \ + cached_actor_filename: str = \ base_dir + '/cache/actors/' + (actor.replace('/', '#')) + '.json' if not is_a_file(cached_actor_filename): if debug: @@ -524,7 +523,7 @@ def is_group_actor(base_dir: str, actor: str, person_cache: {}, def is_group_account(base_dir: str, nickname: str, domain: str) -> bool: """Returns true if the given account is a group """ - account_filename = acct_dir(base_dir, nickname, domain) + '.json' + account_filename: str = acct_dir(base_dir, nickname, domain) + '.json' if not is_a_file(account_filename): return False if text_in_file('"type": "Group"', account_filename): @@ -538,7 +537,7 @@ def has_group_type(base_dir: str, actor: str, person_cache: {}, """ # does the actor path clearly indicate that this is a group? # eg. https://lemmy/c/groupname - group_paths = get_group_paths() + group_paths: list[str] = get_group_paths() for grp_path in group_paths: if grp_path in actor: if debug: @@ -566,7 +565,7 @@ def is_right_to_left_text(text: str) -> bool: Arabic \u0627-\u064a Hebrew/Yiddish \u0590-\u05FF\uFB2A-\uFB4E """ - unicode_str = '[\u0627-\u064a]|[\u0600-\u06FF]|' + \ + unicode_str: str = '[\u0627-\u064a]|[\u0600-\u06FF]|' + \ '[\u0590-\u05FF\uFB2A-\uFB4E]' pattern = re.compile(unicode_str) @@ -588,15 +587,15 @@ def is_valid_date(date_str: str) -> bool: if not section_str.isdigit(): return False if date_sect_ctr == 0: - date_year = int(section_str) + date_year: int = int(section_str) if date_year < 1920 or date_year > 3000: return False elif date_sect_ctr == 1: - date_month = int(section_str) + date_month: int = int(section_str) if date_month < 1 or date_month > 12: return False elif date_sect_ctr == 2: - date_day = int(section_str) + date_day: int = int(section_str) if date_day < 1 or date_day > 31: return False date_sect_ctr += 1 @@ -606,7 +605,7 @@ def is_valid_date(date_str: str) -> bool: def is_premium_account(base_dir: str, nickname: str, domain: str) -> bool: """ Is the given account a premium one? """ - premium_filename = acct_dir(base_dir, nickname, domain) + '/.premium' + premium_filename: str = acct_dir(base_dir, nickname, domain) + '/.premium' return is_a_file(premium_filename) @@ -626,7 +625,7 @@ def url_permitted(url: str, federation_list: []) -> bool: def is_corporate(server_name: str) -> bool: """Is the given server name a corporate leech? """ - server_lower = server_name.lower() + server_lower: str = server_name.lower() if 'google' in server_lower or \ 'cloudflare' in server_lower or \ 'facebook' in server_lower or \ @@ -647,11 +646,11 @@ def can_reply_to(base_dir: str, nickname: str, domain: str, if '/statuses/' not in post_url: return True if not post_json_object: - post_filename = locate_post(base_dir, nickname, domain, post_url) + post_filename: str = locate_post(base_dir, nickname, domain, post_url) if not post_filename: # the post is not stored locally return True - post_json_object = load_json(post_filename) + post_json_object: dict = load_json(post_filename) if not post_json_object: return False published = get_published_date(post_json_object) @@ -671,7 +670,7 @@ def can_reply_to(base_dir: str, nickname: str, domain: str, print('EX: can_reply_to unrecognized current date ' + str(curr_date_str)) return False - hours_since_publication = \ + hours_since_publication: int = \ int((curr_date - pub_date).total_seconds() / 3600) if hours_since_publication < 0 or \ hours_since_publication >= reply_interval_hours: @@ -701,8 +700,8 @@ def local_only_is_local(message_json: {}, domain_full: str) -> bool: return False # check that the sender is local - attrib_field = message_json['object']['attributedTo'] - local_actor = get_attributed_to(attrib_field) + attrib_field: str | dict = message_json['object']['attributedTo'] + local_actor: str = get_attributed_to(attrib_field) local_domain, local_port = \ get_domain_from_actor(local_actor) if local_domain: @@ -719,10 +718,10 @@ def local_only_is_local(message_json: {}, domain_full: str) -> bool: def is_moderator(base_dir: str, nickname: str) -> bool: """Returns true if the given nickname is a moderator """ - moderators_file = data_dir(base_dir) + '/moderators.txt' + moderators_file: str = data_dir(base_dir) + '/moderators.txt' if not is_a_file(moderators_file): - admin_name = get_config_param(base_dir, 'admin') + admin_name: str = get_config_param(base_dir, 'admin') if not admin_name: return False if admin_name == nickname: @@ -736,13 +735,13 @@ def is_moderator(base_dir: str, nickname: str) -> bool: lines = lines.split('\n') if not lines: - admin_name = get_config_param(base_dir, 'admin') + admin_name: str = get_config_param(base_dir, 'admin') if not admin_name: return False if admin_name == nickname: return True for moderator in lines: - moderator = moderator.strip('\n').strip('\r') + moderator: str = moderator.strip('\n').strip('\r') if moderator == nickname: return True return False diff --git a/follow.py b/follow.py index edf934a86..00a157779 100644 --- a/follow.py +++ b/follow.py @@ -61,16 +61,16 @@ def create_initial_last_seen(base_dir: str, http_prefix: str) -> None: The lastseen files are used to generate the Zzz icons on follows/following lists on the profile screen. """ - dir_str = data_dir(base_dir) + dir_str: str = data_dir(base_dir) for _, dirs, _ in os.walk(dir_str): for acct in dirs: if not is_account_dir(acct): continue - account_dir = os.path.join(dir_str, acct) - following_filename = account_dir + '/following.txt' + account_dir: str = os.path.join(dir_str, acct) + following_filename: str = account_dir + '/following.txt' if not is_a_file(following_filename): continue - last_seen_dir = account_dir + '/lastseen' + last_seen_dir: str = account_dir + '/lastseen' if not is_a_dir(last_seen_dir): makedir(last_seen_dir) following_handles: list[str] = \ @@ -84,17 +84,17 @@ def create_initial_last_seen(base_dir: str, http_prefix: str) -> None: continue if '@' not in handle: continue - handle = remove_eol(handle) - nickname = handle.split('@')[0] - domain = handle.split('@')[1] + handle: str = remove_eol(handle) + nickname: str = handle.split('@')[0] + domain: str = handle.split('@')[1] if nickname.startswith('!'): nickname = nickname[1:] - actor = local_actor_url(http_prefix, nickname, domain) - last_seen_filename = \ + actor: str = local_actor_url(http_prefix, nickname, domain) + last_seen_filename: str = \ last_seen_dir + '/' + actor.replace('/', '#') + '.txt' if is_a_file(last_seen_filename): continue - text = str(100) + text: str = str(100) save_string(text, last_seen_filename, 'EX: create_initial_last_seen 2 ' + last_seen_filename) @@ -106,8 +106,8 @@ def _pre_approved_follower(base_dir: str, approve_handle: str) -> bool: """Is the given handle an already manually approved follower? """ - account_dir = acct_dir(base_dir, nickname, domain) - approved_filename = account_dir + '/approved.txt' + account_dir: str = acct_dir(base_dir, nickname, domain) + approved_filename: str = account_dir + '/approved.txt' if is_a_file(approved_filename): if text_in_file(approve_handle, approved_filename): return True @@ -120,21 +120,21 @@ def _remove_from_follow_base(base_dir: str, debug: bool) -> None: """Removes a handle/actor from follow requests or rejects file """ - accounts_dir = acct_dir(base_dir, nickname, domain) - approve_follows_filename = accounts_dir + '/' + follow_file + '.txt' + accounts_dir: str = acct_dir(base_dir, nickname, domain) + approve_follows_filename: str = accounts_dir + '/' + follow_file + '.txt' if not is_a_file(approve_follows_filename): if debug: print('There is no ' + follow_file + ' to remove ' + nickname + '@' + domain + ' from') return - accept_deny_actor = None + accept_deny_actor: str = None if not text_in_file(accept_or_deny_handle, approve_follows_filename): # is this stored in the file as an actor rather than a handle? - accept_deny_nickname = accept_or_deny_handle.split('@')[0] - accept_deny_domain = accept_or_deny_handle.split('@')[1] + accept_deny_nickname: str = accept_or_deny_handle.split('@')[0] + accept_deny_domain: str = accept_or_deny_handle.split('@')[1] # for each possible users path construct an actor and # check if it exists in the file - users_paths = get_user_paths() + users_paths: list[str] = get_user_paths() actor_found: bool = False for users_name in users_paths: accept_deny_actor = \ @@ -200,17 +200,17 @@ def is_following_actor(base_dir: str, The actor can also be a handle: nickname@domain """ domain = remove_domain_port(domain) - accounts_dir = acct_dir(base_dir, nickname, domain) + accounts_dir: str = acct_dir(base_dir, nickname, domain) if not is_a_dir(accounts_dir): return False - following_file = accounts_dir + '/following.txt' + following_file: str = accounts_dir + '/following.txt' if not is_a_file(following_file): return False if actor.startswith('@'): actor = actor[1:] if text_in_file(actor, following_file, False): return True - following_nickname = get_nickname_from_actor(actor) + following_nickname: str = get_nickname_from_actor(actor) if not following_nickname: print('WARN: unable to find nickname in ' + actor) return False @@ -218,7 +218,7 @@ def is_following_actor(base_dir: str, if not following_domain: print('WARN: unable to find domain in ' + actor) return False - following_handle = \ + following_handle: str = \ get_full_domain(following_nickname + '@' + following_domain, following_port) if text_in_file(following_handle, following_file, False): @@ -242,7 +242,8 @@ def get_follower_domains(base_dir: str, nickname: str, domain: str) -> []: """Returns a list of domains for followers """ domain = remove_domain_port(domain) - followers_file = acct_dir(base_dir, nickname, domain) + '/followers.txt' + followers_file: str = \ + acct_dir(base_dir, nickname, domain) + '/followers.txt' if not is_a_file(followers_file): return [] @@ -254,7 +255,7 @@ def get_follower_domains(base_dir: str, nickname: str, domain: str) -> []: domains_list: list[str] = [] for handle in lines: - handle = remove_eol(handle) + handle: str = remove_eol(handle) follower_domain, _ = get_domain_from_actor(handle) if not follower_domain: continue @@ -275,30 +276,31 @@ def is_follower_of_person(base_dir: str, nickname: str, domain: str, print('No follower_nickname for ' + follower_domain) return False domain = remove_domain_port(domain) - followers_file = acct_dir(base_dir, nickname, domain) + '/followers.txt' + followers_file: str = \ + acct_dir(base_dir, nickname, domain) + '/followers.txt' if not is_a_file(followers_file): return False - handle = follower_nickname + '@' + follower_domain + handle: str = follower_nickname + '@' + follower_domain already_following: bool = False - followers_str = load_string(followers_file, - 'EX: is_follower_of_person ' + - followers_file) + followers_str: str = load_string(followers_file, + 'EX: is_follower_of_person ' + + followers_file) if followers_str is None: - followers_str: str = '' + followers_str = '' if handle in followers_str: already_following = True else: - paths = get_user_paths() + paths: list[str] = get_user_paths() for user_path in paths: - url = '://' + follower_domain + user_path + follower_nickname + url: str = '://' + follower_domain + user_path + follower_nickname if url in followers_str: already_following = True break if not already_following: - url = '://' + follower_domain + '/' + follower_nickname + url: str = '://' + follower_domain + '/' + follower_nickname if url in followers_str: already_following = True @@ -312,24 +314,24 @@ def unfollow_account(base_dir: str, nickname: str, domain: str, """Removes a person to the follow list """ domain = remove_domain_port(domain) - handle = nickname + '@' + domain - handle_to_unfollow = follow_nickname + '@' + follow_domain + handle: str = nickname + '@' + domain + handle_to_unfollow: str = follow_nickname + '@' + follow_domain if group_account: handle_to_unfollow = '!' + handle_to_unfollow - dir_str = data_dir(base_dir) + dir_str: str = data_dir(base_dir) if not is_a_dir(dir_str): makedir(dir_str) - handle_dir = acct_handle_dir(base_dir, handle) + handle_dir: str = acct_handle_dir(base_dir, handle) if not is_a_dir(handle_dir): makedir(handle_dir) - accounts_dir = acct_dir(base_dir, nickname, domain) - filename = accounts_dir + '/' + follow_file + accounts_dir: str = acct_dir(base_dir, nickname, domain) + filename: str = accounts_dir + '/' + follow_file if not is_a_file(filename): if debug: print('DEBUG: follow file ' + filename + ' was not found') return False - handle_to_unfollow_lower = handle_to_unfollow.lower() + handle_to_unfollow_lower: str = handle_to_unfollow.lower() if not text_in_file(handle_to_unfollow_lower, filename, False): if debug: print('DEBUG: handle to unfollow ' + handle_to_unfollow + @@ -354,7 +356,7 @@ def unfollow_account(base_dir: str, nickname: str, domain: str, # write to an unfollowed file so that if a follow accept # later arrives then it can be ignored - unfollowed_filename = accounts_dir + '/unfollowed.txt' + unfollowed_filename: str = accounts_dir + '/unfollowed.txt' if is_a_file(unfollowed_filename): if not text_in_file(handle_to_unfollow_lower, unfollowed_filename, False): @@ -383,13 +385,13 @@ def clear_follows(base_dir: str, nickname: str, domain: str, follow_file: str) -> None: """Removes all follows """ - dir_str = data_dir(base_dir) + dir_str: str = data_dir(base_dir) if not is_a_dir(dir_str): makedir(dir_str) - accounts_dir = acct_dir(base_dir, nickname, domain) + accounts_dir: str = acct_dir(base_dir, nickname, domain) if not is_a_dir(accounts_dir): makedir(accounts_dir) - filename = accounts_dir + '/' + follow_file + filename: str = accounts_dir + '/' + follow_file if is_a_file(filename): erase_file(filename, 'EX: clear_follows unable to delete ' + filename) @@ -409,8 +411,8 @@ def _get_no_of_follows(base_dir: str, nickname: str, domain: str, # account holders # if not authenticated: # return 9999 - accounts_dir = acct_dir(base_dir, nickname, domain) - filename = accounts_dir + '/' + follow_file + accounts_dir: str = acct_dir(base_dir, nickname, domain) + filename: str = accounts_dir + '/' + follow_file if not is_a_file(filename): return 0 ctr: int = 0 @@ -455,10 +457,10 @@ def get_following_feed(base_dir: str, domain: str, port: int, path: str, if '/' + follow_file not in path: return None # handle page numbers - header_only = True - page_number = None + header_only: bool = True + page_number: int = None if '?page=' in path: - page_number: int = path.split('?page=')[1] + page_number = path.split('?page=')[1] if len(page_number) > 5: page_number = "1" if page_number == 'true' or not authorized: @@ -510,7 +512,7 @@ def get_following_feed(base_dir: str, domain: str, port: int, path: str, if not page_number: page_number = 1 - next_page_number = int(page_number + 1) + next_page_number: int = int(page_number + 1) following_of_actor = local_actor_url(http_prefix, nickname, domain) part_of_str = following_of_actor + '/' + follow_file collection_id = part_of_str + '?page=' + str(page_number) diff --git a/followerSync.py b/followerSync.py index 0c02a30a4..7df0e9d9b 100644 --- a/followerSync.py +++ b/followerSync.py @@ -21,7 +21,7 @@ def remove_followers_sync(followers_sync_cache: {}, """Remove an entry within the followers synchronization cache, so that it will subsequently be regenerated """ - foll_sync_key = nickname + ':' + follower_domain + foll_sync_key: str = nickname + ':' + follower_domain if not followers_sync_cache.get(foll_sync_key): return del followers_sync_cache[foll_sync_key] @@ -33,7 +33,7 @@ def _get_followers_for_domain(base_dir: str, """Returns the followers for a given domain this is used for followers synchronization """ - followers_filename = \ + followers_filename: str = \ acct_dir(base_dir, nickname, domain) + '/followers.txt' if not is_a_file(followers_filename): return [] @@ -46,20 +46,20 @@ def _get_followers_for_domain(base_dir: str, foll_text: str = '' if search_domain not in foll_text: return [] - lines = foll_text.splitlines() + lines: list[str] = foll_text.splitlines() result: list[str] = [] for line_str in lines: if search_domain not in line_str: continue if line_str.endswith('@' + search_domain): - nick = line_str.split('@')[0] - paths_list = get_user_paths() + nick: str = line_str.split('@')[0] + paths_list: list[str] = get_user_paths() found: bool = False for prefix in ('https', 'http'): if found: break for possible_path in paths_list: - url = prefix + '://' + search_domain + \ + url: str = prefix + '://' + search_domain + \ possible_path + nick filename = base_dir + '/cache/actors/' + \ url.replace('/', '#') + '.json' @@ -70,7 +70,7 @@ def _get_followers_for_domain(base_dir: str, found = True break if not found: - url = prefix + '://' + search_domain + '/' + nick + url: str = prefix + '://' + search_domain + '/' + nick filename = base_dir + '/cache/actors/' + \ url.replace('/', '#') + '.json' if is_a_file(filename): @@ -91,13 +91,13 @@ def _get_followers_sync_json(base_dir: str, See https://codeberg.org/fediverse/fep/src/branch/main/fep/8fcf/fep-8fcf.md """ - sync_list = \ + sync_list: list[str] = \ _get_followers_for_domain(base_dir, nickname, domain, search_domain) - actor = http_prefix + '://' + domain_full + '/users/' + nickname - id_str = actor + '/followers?domain=' + search_domain - sync_json = { + actor: str = http_prefix + '://' + domain_full + '/users/' + nickname + id_str: str = actor + '/followers?domain=' + search_domain + sync_json: dict = { "@context": [ 'https://www.w3.org/ns/activitystreams', 'https://w3id.org/security/v1' @@ -142,18 +142,18 @@ def update_followers_sync_cache(base_dir: str, """Updates the followers synchronization cache https://codeberg.org/fediverse/fep/src/branch/main/fep/8fcf/fep-8fcf.md """ - foll_sync_key = nickname + ':' + calling_domain + foll_sync_key: str = nickname + ':' + calling_domain if sync_cache.get(foll_sync_key): - sync_hash = sync_cache[foll_sync_key]['hash'] - sync_json = sync_cache[foll_sync_key]['response'] + sync_hash: str = sync_cache[foll_sync_key]['hash'] + sync_json: dict = sync_cache[foll_sync_key]['response'] else: - sync_json = \ + sync_json: dict = \ _get_followers_sync_json(base_dir, nickname, domain, http_prefix, domain_full, calling_domain) - sync_hash = get_followers_sync_hash(sync_json) + sync_hash: str = get_followers_sync_hash(sync_json) if sync_hash: sync_cache[foll_sync_key] = { "hash": sync_hash, diff --git a/followingCalendar.py b/followingCalendar.py index e8919f027..9023c432d 100644 --- a/followingCalendar.py +++ b/followingCalendar.py @@ -66,16 +66,16 @@ def receiving_calendar_events(base_dir: str, nickname: str, domain: str, if following_nickname == nickname and following_domain == domain: # reminder post return True - calendar_filename = \ + calendar_filename: str = \ _dir_acct(base_dir, nickname, domain) + '/followingCalendar.txt' - handle = following_nickname + '@' + following_domain + handle: str = following_nickname + '@' + following_domain if not is_a_file(calendar_filename): - following_filename = \ + following_filename: str = \ _dir_acct(base_dir, nickname, domain) + '/following.txt' if not is_a_file(following_filename): return False # create a new calendar file from the following file - following_handles = \ + following_handles: str = \ load_string(following_filename, 'EX: receiving_calendar_events ' + following_filename) if following_handles: @@ -92,21 +92,21 @@ def _receive_calendar_events(base_dir: str, nickname: str, domain: str, indicating whether to receive calendar events from that account """ # check that a following file exists - domain = _port_domain_remove(domain) - following_filename = \ + domain: str = _port_domain_remove(domain) + following_filename: str = \ _dir_acct(base_dir, nickname, domain) + '/following.txt' if not is_a_file(following_filename): print("WARN: following.txt doesn't exist for " + nickname + '@' + domain) return - handle = following_nickname + '@' + following_domain + handle: str = following_nickname + '@' + following_domain # check that you are following this handle if not _text_in_file2(handle + '\n', following_filename, False): print('WARN: ' + handle + ' is not in ' + following_filename) return - calendar_filename = \ + calendar_filename: str = \ _dir_acct(base_dir, nickname, domain) + '/followingCalendar.txt' # get the contents of the calendar file, which is @@ -118,7 +118,7 @@ def _receive_calendar_events(base_dir: str, nickname: str, domain: str, load_string(calendar_filename, 'EX: _receive_calendar_events ' + calendar_filename) if following_handles is None: - following_handles: str = '' + following_handles = '' else: # create a new calendar file from the following file print('Creating calendar file ' + calendar_filename) @@ -140,8 +140,8 @@ def _receive_calendar_events(base_dir: str, nickname: str, domain: str, return # remove from calendar file new_following_handles: str = '' - following_handles_list = following_handles.split('\n') - handle_lower = handle.lower() + following_handles_list: list[str] = following_handles.split('\n') + handle_lower: str = handle.lower() for followed in following_handles_list: if followed.lower() != handle_lower: new_following_handles += followed + '\n'