Add list types

merge-requests/30/head
Bob Mottram 2024-12-23 17:45:20 +00:00
parent 4eaa0f56ee
commit a859ff14ac
54 changed files with 280 additions and 280 deletions

View File

@ -1138,9 +1138,9 @@ def run_daemon(accounts_data_dir: str,
httpd.getreq_busy = False
httpd.postreq_busy = False
httpd.received_message = False
httpd.inbox_queue = []
httpd.inbox_queue: list[dict] = []
httpd.send_threads = send_threads
httpd.post_log = []
httpd.post_log: list[str] = []
httpd.max_queue_length = 64
httpd.allow_deletion = allow_deletion
httpd.last_login_time = 0
@ -1157,7 +1157,7 @@ def run_daemon(accounts_data_dir: str,
# create a cache of blocked domains in memory.
# This limits the amount of slow disk reads which need to be done
httpd.blocked_cache = []
httpd.blocked_cache: list[str] = []
httpd.blocked_cache_last_updated = 0
httpd.blocked_cache_update_secs = 120
httpd.blocked_cache_last_updated = \
@ -1307,7 +1307,7 @@ def run_daemon(accounts_data_dir: str,
si_federation_tokens)
# load peertube instances from file into a list
httpd.peertube_instances = []
httpd.peertube_instances: list[str] = []
load_peertube_instances(base_dir, httpd.peertube_instances)
create_initial_last_seen(base_dir, http_prefix)

View File

@ -901,7 +901,7 @@ def daemon_http_get(self) -> None:
print('Authorization token refused for ' +
'offers collection federation')
# show offers collection for federation
offers_json = []
offers_json: list[dict] = []
if has_accept(self, calling_domain) and \
offers_collection_authorized:
if self.server.debug:
@ -1102,7 +1102,7 @@ def daemon_http_get(self) -> None:
print('Authorization token refused for ' +
'wanted collection federation')
# show wanted collection for federation
wanted_json = []
wanted_json: list[dict] = []
if has_accept(self, calling_domain) and \
wanted_collection_authorized:
if self.server.debug:
@ -4114,7 +4114,7 @@ def daemon_http_get(self) -> None:
# reply from the web interface icon
in_reply_to_url = None
reply_to_list = []
reply_to_list: list[str] = []
reply_page_number = 1
reply_category = ''
share_description = None
@ -6380,7 +6380,7 @@ def _show_known_crawlers(self, calling_domain: str, path: str,
return False
if not is_moderator(base_dir, nickname):
return False
crawlers_list = []
crawlers_list: list[str] = []
curr_time = int(time.time())
recent_crawlers = 60 * 60 * 24 * 30
for ua_str, item in known_crawlers.items():

View File

@ -139,7 +139,7 @@ def bookmark_button(self, calling_domain: str, path: str,
return
bookmark_actor = \
local_actor_url(http_prefix, self.post_to_nickname, domain_full)
cc_list = []
cc_list: list[str] = []
bookmark_post(recent_posts_cache,
base_dir, federation_list,
self.post_to_nickname, domain, port,
@ -348,7 +348,7 @@ def bookmark_button_undo(self, calling_domain: str, path: str,
return
undo_actor = \
local_actor_url(http_prefix, self.post_to_nickname, domain_full)
cc_list = []
cc_list: list[str] = []
undo_bookmark_post(recent_posts_cache,
base_dir, federation_list,
self.post_to_nickname,

View File

@ -103,7 +103,7 @@ def show_media_timeline(self, authorized: bool,
0, positive_voting,
voting_time_mins)
if not inbox_media_feed:
inbox_media_feed = []
inbox_media_feed: list[dict] = []
if request_http(self.headers, debug):
nickname = path.replace('/users/', '')
nickname = nickname.replace('/tlmedia', '')
@ -319,7 +319,7 @@ def show_blogs_timeline(self, authorized: bool,
0, positive_voting,
voting_time_mins)
if not inbox_blogs_feed:
inbox_blogs_feed = []
inbox_blogs_feed: list[dict] = []
if request_http(self.headers, debug):
nickname = path.replace('/users/', '')
nickname = nickname.replace('/tlblogs', '')
@ -531,7 +531,7 @@ def show_news_timeline(self, authorized: bool,
positive_voting,
voting_time_mins)
if not inbox_news_feed:
inbox_news_feed = []
inbox_news_feed: list[dict] = []
if request_http(self.headers, debug):
nickname = path.replace('/users/', '')
nickname = nickname.replace('/tlnews', '')
@ -745,7 +745,7 @@ def show_features_timeline(self, authorized: bool,
positive_voting,
voting_time_mins)
if not inbox_features_feed:
inbox_features_feed = []
inbox_features_feed: list[dict] = []
if request_http(self.headers, debug):
nickname = path.replace('/users/', '')
nickname = nickname.replace('/tlfeatures', '')
@ -2069,7 +2069,7 @@ def show_replies(self, authorized: bool,
0, positive_voting,
voting_time_mins)
if not inbox_replies_feed:
inbox_replies_feed = []
inbox_replies_feed: list[dict] = []
if request_http(self.headers, debug):
nickname = path.replace('/users/', '')
nickname = nickname.replace('/tlreplies', '')

View File

@ -298,7 +298,7 @@ def citations_update(self, calling_domain: str, cookie: str,
fields = \
extract_text_fields_in_post(post_bytes, boundary, debug, None)
print('citationstest: ' + str(fields))
citations = []
citations: list[str] = []
for ctr in range(0, 128):
field_name = 'newswire' + str(ctr)
if not fields.get(field_name):

View File

@ -290,7 +290,7 @@ def _profile_post_peertube_instances(base_dir: str, fields: {}, self,
def _profile_post_block_federated(base_dir: str, fields: {}, self) -> None:
""" HTTP POST save blocking API endpoints
"""
block_ep_new = []
block_ep_new: list[str] = []
if fields.get('blockFederated'):
block_federated_str = fields['blockFederated']
block_ep_new = block_federated_str.split('\n')
@ -370,7 +370,7 @@ def _profile_post_buy_domains(base_dir: str, fields: {}, self) -> None:
def _profile_post_crawlers_allowed(base_dir: str, fields: {}, self) -> None:
""" HTTP POST save allowed web crawlers
"""
crawlers_allowed = []
crawlers_allowed: list[str] = []
if fields.get('crawlersAllowedStr'):
crawlers_allowed_str = fields['crawlersAllowedStr']
crawlers_allowed_list = crawlers_allowed_str.split('\n')
@ -392,7 +392,7 @@ def _profile_post_crawlers_allowed(base_dir: str, fields: {}, self) -> None:
def _profile_post_blocked_user_agents(base_dir: str, fields: {}, self) -> None:
""" HTTP POST save blocked user agents
"""
user_agents_blocked = []
user_agents_blocked: list[str] = []
if fields.get('userAgentsBlockedStr'):
user_agents_blocked_str = fields['userAgentsBlockedStr']
user_agents_blocked_list = user_agents_blocked_str.split('\n')
@ -560,7 +560,7 @@ def _profile_post_auto_cw(base_dir: str, nickname: str, domain: str,
print('EX: _profile_edit ' +
'unable to delete ' +
auto_cw_filename)
self.server.auto_cw_cache[nickname] = []
self.server.auto_cw_cache[nickname]: list[str] = []
def _profile_post_autogenerated_tags(base_dir: str,
@ -1578,7 +1578,7 @@ def _profile_post_bio(actor_json: {}, fields: {},
""" HTTP POST change user bio
"""
featured_tags = get_featured_hashtags(actor_json) + ' '
actor_json['tag'] = []
actor_json['tag']: list[dict] = []
if fields.get('bio'):
if fields['bio'] != actor_json['summary']:
bio_str = remove_html(fields['bio'])
@ -1613,7 +1613,7 @@ def _profile_post_alsoknownas(actor_json: {}, fields: {},
actor_changed: bool) -> bool:
""" HTTP POST Other accounts (alsoKnownAs)
"""
also_known_as = []
also_known_as: list[str] = []
if actor_json.get('alsoKnownAs'):
also_known_as = actor_json['alsoKnownAs']
if fields.get('alsoKnownAs'):
@ -1632,7 +1632,7 @@ def _profile_post_alsoknownas(actor_json: {}, fields: {},
fields['alsoKnownAs'] = \
fields['alsoKnownAs'].replace(';', ',')
new_also_known_as = fields['alsoKnownAs'].split(',')
also_known_as = []
also_known_as: list[str] = []
for alt_actor in new_also_known_as:
alt_actor = alt_actor.strip()
if resembles_url(alt_actor):

View File

@ -399,9 +399,9 @@ def _receive_new_post_process_editblog(self, fields: {},
post_json_object['object']['summary'] = \
fields['subject']
# format message
tags = []
tags: list[dict] = []
hashtags_dict = {}
mentioned_recipients = []
mentioned_recipients: list[str] = []
fields['message'] = \
add_html_tags(base_dir, http_prefix,
nickname, domain,
@ -410,7 +410,7 @@ def _receive_new_post_process_editblog(self, fields: {},
hashtags_dict,
translate, True)
# replace emoji with unicode
tags = []
tags: list[dict] = []
for _, tag in hashtags_dict.items():
tags.append(tag)
# get list of tags
@ -1222,7 +1222,7 @@ def _receive_new_post_process_newquestion(self, fields: {},
return NEW_POST_FAILED
if not fields.get('message'):
return NEW_POST_FAILED
q_options = []
q_options: list[str] = []
for question_ctr in range(8):
if fields.get('questionOption' + str(question_ctr)):
q_options.append(fields['questionOption' +
@ -1830,7 +1830,7 @@ def _receive_new_post_process(self, post_type: str, path: str, headers: {},
self.server.default_post_language[nickname] = \
fields['languagesDropdown']
if 'searchableByDropdown' not in fields:
fields['searchableByDropdown'] = []
fields['searchableByDropdown']: list[str] = []
if not citations_button_press:
# Store a file which contains the time in seconds

View File

@ -192,7 +192,7 @@ def remove_old_hashtags(base_dir: str, max_months: int) -> str:
max_months = min(max_months, 11)
prev_date = date_from_numbers(1970, 1 + max_months, 1, 0, 0)
max_days_since_epoch = (date_utcnow() - prev_date).days
remove_hashtags = []
remove_hashtags: list[str] = []
for _, _, files in os.walk(base_dir + '/tags'):
for fname in files:

View File

@ -578,7 +578,7 @@ def _desktop_reply_to_post(session, post_id: str,
video_transcript = None
auto_cw_cache = {}
# TODO searchable status
searchable_by = []
searchable_by: list[str] = []
_say_command(say_str, say_str, screenreader, system_language, espeak)
if send_post_via_server(signing_priv_key_pem, __version__,
base_dir, session, nickname, password,
@ -665,7 +665,7 @@ def _desktop_new_post(session,
video_transcript = None
auto_cw_cache = {}
# TODO searchable status
searchable_by = []
searchable_by: list[str] = []
_say_command(say_str, say_str, screenreader, system_language, espeak)
if send_post_via_server(signing_priv_key_pem, __version__,
base_dir, session, nickname, password,
@ -850,7 +850,7 @@ def _read_local_box_post(session, nickname: str, domain: str,
yt_replace_domain = None
twitter_replacement_domain = None
show_vote_posts = False
languages_understood = []
languages_understood: list[str] = []
person_url = local_actor_url(http_prefix, nickname, domain_full)
actor_json = \
get_person_from_cache(base_dir, person_url, person_cache)
@ -1494,7 +1494,7 @@ def _desktop_new_dm_base(session, to_handle: str,
say_str = 'Sending'
auto_cw_cache = {}
# TODO searchable status
searchable_by = []
searchable_by: list[str] = []
_say_command(say_str, say_str, screenreader, system_language, espeak)
if send_post_via_server(signing_priv_key_pem, __version__,
base_dir, session, nickname, password,
@ -1599,9 +1599,9 @@ def run_desktop_client(base_dir: str, proxy_type: str, http_prefix: str,
media_creator = ''
blocked_cache = {}
block_federated = []
languages_understood = []
mitm_servers = []
block_federated: list[str] = []
languages_understood: list[str] = []
mitm_servers: list[str] = []
indent = ' '
if show_new_posts:

View File

@ -49,7 +49,7 @@ def set_discord(actor_json: {}, discord: str) -> None:
"""Sets discord for the given actor
"""
if not actor_json.get('attachment'):
actor_json['attachment'] = []
actor_json['attachment']: list[dict] = []
# remove any existing value
property_found = None

View File

@ -83,7 +83,7 @@ def set_donation_url(actor_json: {}, donate_url: str) -> None:
not_url = True
if not actor_json.get('attachment'):
actor_json['attachment'] = []
actor_json['attachment']: list[dict] = []
donation_type = _get_donation_types()
donate_name = None

View File

@ -49,7 +49,7 @@ def set_enigma_pub_key(actor_json: {}, enigma_pub_key: str) -> None:
remove_key = True
if not actor_json.get('attachment'):
actor_json['attachment'] = []
actor_json['attachment']: list[dict] = []
# remove any existing value
property_found = None

View File

@ -996,7 +996,7 @@ def _command_options() -> None:
if not argb.language:
argb.language = 'en'
signing_priv_key_pem = get_instance_actor_key(base_dir, origin_domain)
mitm_servers = []
mitm_servers: list[str] = []
get_public_posts_of_person(base_dir, nickname, domain, False, True,
proxy_type, argb.port, http_prefix, debug,
__version__, argb.language,
@ -1075,7 +1075,7 @@ def _command_options() -> None:
elif argb.gnunet:
proxy_type = 'gnunet'
word_frequency = {}
domain_list = []
domain_list: list[str] = []
if not argb.language:
argb.language = 'en'
signing_priv_key_pem = None
@ -1086,7 +1086,7 @@ def _command_options() -> None:
if argb.secure_mode:
signing_priv_key_pem = \
get_instance_actor_key(base_dir, origin_domain)
mitm_servers = []
mitm_servers: list[str] = []
domain_list = \
get_public_post_domains(None,
base_dir, nickname, domain,
@ -1135,13 +1135,13 @@ def _command_options() -> None:
elif argb.gnunet:
proxy_type = 'gnunet'
word_frequency = {}
domain_list = []
domain_list: list[str] = []
if not argb.language:
argb.language = 'en'
signing_priv_key_pem = None
if argb.secure_mode:
signing_priv_key_pem = get_instance_actor_key(base_dir, domain)
mitm_servers = []
mitm_servers: list[str] = []
domain_list = \
get_public_post_domains_blocked(None,
base_dir, nickname, domain,
@ -1192,7 +1192,7 @@ def _command_options() -> None:
signing_priv_key_pem = None
if argb.secure_mode:
signing_priv_key_pem = get_instance_actor_key(base_dir, domain)
mitm_servers = []
mitm_servers: list[str] = []
check_domains(None,
base_dir, nickname, domain,
proxy_type, argb.port,
@ -1221,7 +1221,7 @@ def _command_options() -> None:
signing_priv_key_pem = None
if argb.secure_mode:
signing_priv_key_pem = get_instance_actor_key(base_dir, domain)
mitm_servers = []
mitm_servers: list[str] = []
dot_graph = instances_graph(base_dir, argb.socnet,
proxy_type, argb.port,
http_prefix, debug,
@ -1280,7 +1280,7 @@ def _command_options() -> None:
if not argb.language:
argb.language = 'en'
signing_priv_key_pem = get_instance_actor_key(base_dir, origin_domain)
mitm_servers = []
mitm_servers: list[str] = []
get_public_posts_of_person(base_dir, nickname, domain, False, False,
proxy_type, argb.port, http_prefix, debug,
__version__, argb.language,
@ -1311,7 +1311,7 @@ def _command_options() -> None:
print('Obtained instance actor signing key')
else:
print('Did not obtain instance actor key for ' + domain)
mitm_servers = []
mitm_servers: list[str] = []
test_json = get_json(signing_priv_key_pem, session, argb.json,
as_header, None, debug, mitm_servers,
__version__, http_prefix, domain)
@ -1345,7 +1345,7 @@ def _command_options() -> None:
if not nickname:
print('Please specify a nickname with the --nickname option')
sys.exit()
mitm_servers = []
mitm_servers: list[str] = []
conv_json = download_conversation_posts(True, session, http_prefix,
base_dir, nickname, domain,
post_id, argb.debug,
@ -1373,7 +1373,7 @@ def _command_options() -> None:
print('Obtained instance actor signing key')
else:
print('Did not obtain instance actor key for ' + domain)
mitm_servers = []
mitm_servers: list[str] = []
test_ssml = download_ssml(signing_priv_key_pem, session, argb.ssml,
as_header, None, debug, __version__,
http_prefix, domain, mitm_servers)
@ -1428,7 +1428,7 @@ def _command_options() -> None:
print('Obtained instance actor signing key')
else:
print('Did not obtain instance actor key for ' + domain)
mitm_servers = []
mitm_servers: list[str] = []
test_html = download_html(signing_priv_key_pem, session, argb.htmlpost,
as_header, None, debug, __version__,
http_prefix, domain, mitm_servers)
@ -1458,7 +1458,7 @@ def _command_options() -> None:
'--domain option')
sys.exit()
session = create_session(None)
mitm_servers = []
mitm_servers: list[str] = []
verified = \
verify_html(session, argb.verifyurl, debug, __version__,
http_prefix, argb.nickname, domain,
@ -1541,7 +1541,7 @@ def _command_options() -> None:
get_config_param(base_dir, 'preferredPodcastFormats')
if podcast_formats_str:
podcast_formats = podcast_formats_str.split(',')
preferred_podcast_formats = []
preferred_podcast_formats: list[str] = []
for pod_format in podcast_formats:
pod_format = pod_format.lower().strip()
if '/' not in pod_format:
@ -1669,7 +1669,7 @@ def _command_options() -> None:
if argb.nickname:
nickname = argb.nickname
federation_list = []
federation_list: list[str] = []
if argb.federation_list:
if len(argb.federation_list) == 1:
if not (argb.federation_list[0].lower() == 'any' or
@ -1726,9 +1726,9 @@ def _command_options() -> None:
if i2p_domain:
session_i2p = create_session('i2p')
followers_sync_cache = {}
sites_unavailable = []
sites_unavailable: list[str] = []
system_language = argb.language
mitm_servers = []
mitm_servers: list[str] = []
manual_approve_follow_request(session, session_onion, session_i2p,
onion_domain, i2p_domain,
base_dir, http_prefix,
@ -1775,8 +1775,8 @@ def _command_options() -> None:
if i2p_domain:
session_i2p = create_session('i2p')
followers_sync_cache = {}
sites_unavailable = []
mitm_servers = []
sites_unavailable: list[str] = []
mitm_servers: list[str] = []
system_language = argb.language
manual_deny_follow_request2(session, session_onion, session_i2p,
onion_domain, i2p_domain,
@ -1901,8 +1901,8 @@ def _command_options() -> None:
video_transcript = None
auto_cw_cache = {}
# TODO searchable status
searchable_by = []
mitm_servers = []
searchable_by: list[str] = []
mitm_servers: list[str] = []
print('Sending post to ' + argb.sendto)
send_post_via_server(signing_priv_key_pem, __version__,
@ -2010,7 +2010,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending announce/repeat of ' + argb.announce)
mitm_servers = []
mitm_servers: list[str] = []
send_announce_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2058,7 +2058,7 @@ def _command_options() -> None:
signing_priv_key_pem = get_instance_actor_key(base_dir, domain)
session = create_session(proxy_type)
mitm_servers = []
mitm_servers: list[str] = []
box_json = c2s_box_json(session, argb.nickname, argb.password,
domain, port, http_prefix,
argb.box, argb.pageNumber,
@ -2123,7 +2123,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending shared item: ' + argb.itemName)
mitm_servers = []
mitm_servers: list[str] = []
send_share_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2170,7 +2170,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending undo of shared item: ' + argb.undoItemName)
mitm_servers = []
mitm_servers: list[str] = []
send_undo_share_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2237,7 +2237,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending wanted item: ' + argb.wantedItemName)
mitm_servers = []
mitm_servers: list[str] = []
send_wanted_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2284,7 +2284,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending undo of wanted item: ' + argb.undoWantedItemName)
mitm_servers = []
mitm_servers: list[str] = []
send_undo_wanted_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2322,7 +2322,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending like of ' + argb.like)
mitm_servers = []
mitm_servers: list[str] = []
send_like_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2365,7 +2365,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending emoji reaction ' + argb.emoji + ' to ' + argb.react)
mitm_servers = []
mitm_servers: list[str] = []
send_reaction_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2402,7 +2402,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending undo like of ' + argb.undolike)
mitm_servers = []
mitm_servers: list[str] = []
send_undo_like_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2447,7 +2447,7 @@ def _command_options() -> None:
print('Sending undo emoji reaction ' +
argb.emoji + ' to ' + argb.react)
mitm_servers = []
mitm_servers: list[str] = []
send_undo_reaction_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2485,7 +2485,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending bookmark of ' + argb.bookmark)
mitm_servers = []
mitm_servers: list[str] = []
send_bookmark_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2523,7 +2523,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending undo bookmark of ' + argb.unbookmark)
mitm_servers = []
mitm_servers: list[str] = []
send_undo_bookmark_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2560,7 +2560,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending delete request of ' + argb.delete)
mitm_servers = []
mitm_servers: list[str] = []
send_delete_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2611,7 +2611,7 @@ def _command_options() -> None:
signing_priv_key_pem = get_instance_actor_key(base_dir, domain)
system_language = argb.language
mitm_servers = []
mitm_servers: list[str] = []
send_follow_request_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2665,7 +2665,7 @@ def _command_options() -> None:
signing_priv_key_pem = get_instance_actor_key(base_dir, domain)
system_language = argb.language
mitm_servers = []
mitm_servers: list[str] = []
send_unfollow_request_via_server(base_dir, session,
argb.nickname, argb.password,
domain, port,
@ -2705,7 +2705,7 @@ def _command_options() -> None:
if argb.secure_mode:
signing_priv_key_pem = get_instance_actor_key(base_dir, domain)
mitm_servers = []
mitm_servers: list[str] = []
following_json = \
get_following_via_server(session,
argb.nickname, argb.password,
@ -2741,7 +2741,7 @@ def _command_options() -> None:
if argb.secure_mode:
signing_priv_key_pem = get_instance_actor_key(base_dir, domain)
mitm_servers = []
mitm_servers: list[str] = []
blocked_json = \
get_blocks_via_server(session,
argb.nickname, argb.password,
@ -2777,7 +2777,7 @@ def _command_options() -> None:
signing_priv_key_pem = None
if argb.secure_mode:
signing_priv_key_pem = get_instance_actor_key(base_dir, domain)
mitm_servers = []
mitm_servers: list[str] = []
followers_json = \
get_followers_via_server(session,
argb.nickname, argb.password,
@ -2813,7 +2813,7 @@ def _command_options() -> None:
if argb.secure_mode:
signing_priv_key_pem = get_instance_actor_key(base_dir, domain)
mitm_servers = []
mitm_servers: list[str] = []
follow_requests_json = \
get_follow_requests_via_server(session,
argb.nickname, argb.password,
@ -2879,8 +2879,8 @@ def _command_options() -> None:
signing_priv_key_pem = None
if argb.secure_mode:
signing_priv_key_pem = get_instance_actor_key(base_dir, domain)
block_federated = []
mitm_servers = []
block_federated: list[str] = []
mitm_servers: list[str] = []
ctr = migrate_accounts(base_dir, session,
http_prefix, cached_webfingers,
True, signing_priv_key_pem,
@ -2904,7 +2904,7 @@ def _command_options() -> None:
print('Did not obtain instance actor key for ' + domain)
if argb.actor.startswith('@'):
argb.actor = argb.actor[1:]
mitm_servers = []
mitm_servers: list[str] = []
get_actor_json(domain, argb.actor, argb.http, argb.gnunet,
argb.ipfs, argb.ipns,
debug, False, signing_priv_key_pem, None,
@ -3011,7 +3011,7 @@ def _command_options() -> None:
signing_priv_key_pem = None
if argb.secure_mode:
signing_priv_key_pem = get_instance_actor_key(base_dir, domain)
mitm_servers = []
mitm_servers: list[str] = []
wf_request = webfinger_handle(session, handle,
http_prefix, cached_webfingers,
host_domain, __version__, debug, False,
@ -3071,7 +3071,7 @@ def _command_options() -> None:
signing_priv_key_pem = None
if argb.secure_mode:
signing_priv_key_pem = get_instance_actor_key(base_dir, domain)
mitm_servers = []
mitm_servers: list[str] = []
followers_list = \
download_follow_collection(signing_priv_key_pem,
'followers', session,
@ -3366,7 +3366,7 @@ def _command_options() -> None:
print('Sending ' + argb.skill + ' skill level ' +
str(argb.skillLevelPercent) + ' for ' + nickname)
mitm_servers = []
mitm_servers: list[str] = []
send_skill_via_server(base_dir, session,
nickname, argb.password,
domain, port,
@ -3405,7 +3405,7 @@ def _command_options() -> None:
print('Sending availability status of ' + nickname +
' as ' + argb.availability)
mitm_servers = []
mitm_servers: list[str] = []
send_availability_via_server(base_dir, session,
nickname, argb.password,
domain, port, http_prefix,
@ -3474,7 +3474,7 @@ def _command_options() -> None:
print('Federating shared items with: ' +
argb.shared_items_federated_domains)
shared_items_federated_domains = []
shared_items_federated_domains: list[str] = []
if argb.shared_items_federated_domains:
fed_domains_str = argb.shared_items_federated_domains
set_config_param(base_dir, 'sharedItemsFederatedDomains',
@ -3523,7 +3523,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending block of ' + argb.block)
mitm_servers = []
mitm_servers: list[str] = []
send_block_via_server(base_dir, session, nickname, argb.password,
domain, port,
http_prefix, argb.block,
@ -3559,7 +3559,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending mute of ' + argb.mute)
mitm_servers = []
mitm_servers: list[str] = []
send_mute_via_server(base_dir, session, nickname, argb.password,
domain, port,
http_prefix, argb.mute,
@ -3595,7 +3595,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending undo mute of ' + argb.unmute)
mitm_servers = []
mitm_servers: list[str] = []
send_undo_mute_via_server(base_dir, session, nickname, argb.password,
domain, port,
http_prefix, argb.unmute,
@ -3643,7 +3643,7 @@ def _command_options() -> None:
system_language = argb.language
print('Sending undo block of ' + argb.unblock)
mitm_servers = []
mitm_servers: list[str] = []
send_undo_block_via_server(base_dir, session, nickname, argb.password,
domain, port,
http_prefix, argb.unblock,
@ -3713,7 +3713,7 @@ def _command_options() -> None:
set_role(base_dir, nickname, domain, 'admin')
set_availability(base_dir, nickname, domain, 'busy')
block_federated = []
block_federated: list[str] = []
add_share(base_dir,
http_prefix, nickname, domain, port,
"spanner",
@ -3765,7 +3765,7 @@ def _command_options() -> None:
chat_url = ''
auto_cw_cache = {}
test_video_transcript = ''
searchable_by = []
searchable_by: list[str] = []
curr_session = None
create_public_post(base_dir, nickname, domain, port, http_prefix,
@ -4113,7 +4113,7 @@ def _command_options() -> None:
if low_bandwidth is not None:
argb.low_bandwidth = bool(low_bandwidth)
user_agents_blocked = []
user_agents_blocked: list[str] = []
if argb.userAgentBlocks:
user_agents_blocked_str = argb.userAgentBlocks
set_config_param(base_dir, 'userAgentsBlocked',
@ -4126,7 +4126,7 @@ def _command_options() -> None:
for user_agents_blocked_str2 in agent_blocks_list:
user_agents_blocked.append(user_agents_blocked_str2.strip())
crawlers_allowed = []
crawlers_allowed: list[str] = []
if argb.crawlersAllowed:
crawlers_allowed_str = argb.crawlersAllowed
set_config_param(base_dir, 'crawlersAllowed', crawlers_allowed_str)

View File

@ -58,7 +58,7 @@ def sorted_watch_points(fitness: {}, fitness_id: str) -> []:
return []
if not fitness['performance'].get(fitness_id):
return []
result = []
result: list[str] = []
for watch_point, item in fitness['performance'][fitness_id].items():
if not item.get('total'):
continue
@ -81,7 +81,7 @@ def html_watch_points_graph(base_dir: str, fitness: {}, fitness_id: str,
instance_title = \
get_config_param(base_dir, 'instanceTitle')
preload_images = []
preload_images: list[str] = []
html_str = \
html_header_with_external_style(css_filename, instance_title, None,
preload_images)

View File

@ -82,7 +82,7 @@ def is_editor(base_dir: str, nickname: str) -> bool:
return True
return False
lines = []
lines: list[str] = []
try:
with open(editors_file, 'r', encoding='utf-8') as fp_editors:
lines = fp_editors.readlines()
@ -113,7 +113,7 @@ def is_artist(base_dir: str, nickname: str) -> bool:
return True
return False
lines = []
lines: list[str] = []
try:
with open(artists_file, 'r', encoding='utf-8') as fp_artists:
lines = fp_artists.readlines()
@ -155,7 +155,7 @@ def is_memorial_account(base_dir: str, nickname: str) -> bool:
memorial_file = data_dir(base_dir) + '/memorial'
if not os.path.isfile(memorial_file):
return False
memorial_list = []
memorial_list: list[str] = []
try:
with open(memorial_file, 'r', encoding='utf-8') as fp_memorial:
memorial_list = fp_memorial.read().split('\n')
@ -177,7 +177,7 @@ def is_suspended(base_dir: str, nickname: str) -> bool:
suspended_filename = data_dir(base_dir) + '/suspended.txt'
if os.path.isfile(suspended_filename):
lines = []
lines: list[str] = []
try:
with open(suspended_filename, 'r', encoding='utf-8') as fp_susp:
lines = fp_susp.readlines()

View File

@ -61,7 +61,7 @@ def create_initial_last_seen(base_dir: str, http_prefix: str) -> None:
last_seen_dir = account_dir + '/lastseen'
if not os.path.isdir(last_seen_dir):
os.mkdir(last_seen_dir)
following_handles = []
following_handles: list[str] = []
try:
with open(following_filename, 'r',
encoding='utf-8') as fp_foll:
@ -232,14 +232,14 @@ def get_follower_domains(base_dir: str, nickname: str, domain: str) -> []:
if not os.path.isfile(followers_file):
return []
lines = []
lines: list[str] = []
try:
with open(followers_file, 'r', encoding='utf-8') as fp_foll:
lines = fp_foll.readlines()
except OSError:
print('EX: get_follower_domains ' + followers_file)
domains_list = []
domains_list: list[str] = []
for handle in lines:
handle = remove_eol(handle)
follower_domain, _ = get_domain_from_actor(handle)
@ -323,7 +323,7 @@ def unfollow_account(base_dir: str, nickname: str, domain: str,
print('DEBUG: handle to unfollow ' + handle_to_unfollow +
' is not in ' + filename)
return False
lines = []
lines: list[str] = []
try:
with open(filename, 'r', encoding='utf-8') as fp_unfoll:
lines = fp_unfoll.readlines()
@ -413,7 +413,7 @@ def _get_no_of_follows(base_dir: str, nickname: str, domain: str,
if not os.path.isfile(filename):
return 0
ctr = 0
lines = []
lines: list[str] = []
try:
with open(filename, 'r', encoding='utf-8') as fp_foll:
lines = fp_foll.readlines()
@ -538,7 +538,7 @@ def get_following_feed(base_dir: str, domain: str, port: int, path: str,
curr_page = 1
page_ctr = 0
total_ctr = 0
lines = []
lines: list[str] = []
try:
with open(filename, 'r', encoding='utf-8') as fp_foll:
lines = fp_foll.readlines()
@ -626,7 +626,7 @@ def no_of_follow_requests(base_dir: str,
if not os.path.isfile(approve_follows_filename):
return 0
ctr = 0
lines = []
lines: list[str] = []
try:
with open(approve_follows_filename, 'r',
encoding='utf-8') as fp_approve:

View File

@ -36,7 +36,7 @@ def _get_followers_for_domain(base_dir: str,
acct_dir(base_dir, nickname, domain) + '/followers.txt'
if not os.path.isfile(followers_filename):
return []
lines = []
lines: list[str] = []
foll_text = ''
try:
with open(followers_filename, 'r', encoding='utf-8') as fp_foll:
@ -47,7 +47,7 @@ def _get_followers_for_domain(base_dir: str,
if search_domain not in foll_text:
return []
lines = foll_text.splitlines()
result = []
result: list[str] = []
for line_str in lines:
if search_domain not in line_str:
continue

View File

@ -269,7 +269,7 @@ def _sort_todays_events(post_events_list: []) -> []:
break
# sort the dict
new_post_events_list = []
new_post_events_list: list[list] = []
sorted_events_dict = dict(sorted(post_events_dict.items()))
for _, post_event in sorted_events_dict.items():
new_post_events_list.append(post_event)
@ -304,7 +304,7 @@ def get_todays_events(base_dir: str, nickname: str, domain: str,
if not os.path.isfile(calendar_filename):
return events
calendar_post_ids = []
calendar_post_ids: list[str] = []
recreate_events_file = False
try:
with open(calendar_filename, 'r', encoding='utf-8') as fp_events:
@ -338,7 +338,7 @@ def get_todays_events(base_dir: str, nickname: str, domain: str,
public_event = is_public_post(post_json_object)
post_event = []
post_event: list[dict] = []
day_of_month = None
for tag in post_json_object['object']['tag']:
if not _is_happening_event(tag):
@ -382,7 +382,7 @@ def get_todays_events(base_dir: str, nickname: str, domain: str,
continue
calendar_post_ids.append(post_id)
if not events.get(day_of_month):
events[day_of_month] = []
events[day_of_month]: list[dict] = []
events[day_of_month].append(post_event)
events[day_of_month] = \
_sort_todays_events(events[day_of_month])
@ -686,7 +686,7 @@ def get_this_weeks_events(base_dir: str, nickname: str, domain: str) -> {}:
if not os.path.isfile(calendar_filename):
return events
calendar_post_ids = []
calendar_post_ids: list[str] = []
recreate_events_file = False
try:
with open(calendar_filename, 'r', encoding='utf-8') as fp_events:
@ -702,7 +702,7 @@ def get_this_weeks_events(base_dir: str, nickname: str, domain: str) -> {}:
if not _is_happening_post(post_json_object):
continue
post_event = []
post_event: list[dict] = []
week_day_index = None
for tag in post_json_object['object']['tag']:
if not _is_happening_event(tag):
@ -728,7 +728,7 @@ def get_this_weeks_events(base_dir: str, nickname: str, domain: str) -> {}:
continue
calendar_post_ids.append(post_id)
if not events.get(week_day_index):
events[week_day_index] = []
events[week_day_index]: list[dict] = []
events[week_day_index].append(post_event)
except OSError:
print('EX: get_this_weeks_events failed to read ' + calendar_filename)
@ -763,7 +763,7 @@ def get_calendar_events(base_dir: str, nickname: str, domain: str,
if not os.path.isfile(calendar_filename):
return events
calendar_post_ids = []
calendar_post_ids: list[str] = []
recreate_events_file = False
try:
with open(calendar_filename, 'r', encoding='utf-8') as fp_events:
@ -790,7 +790,7 @@ def get_calendar_events(base_dir: str, nickname: str, domain: str,
if not _event_text_match(content, text_match):
continue
post_event = []
post_event: list[dict] = []
day_of_month = None
for tag in post_json_object['object']['tag']:
if not _is_happening_event(tag):
@ -829,7 +829,7 @@ def get_calendar_events(base_dir: str, nickname: str, domain: str,
continue
calendar_post_ids.append(post_id)
if not events.get(day_of_month):
events[day_of_month] = []
events[day_of_month]: list[dict] = []
events[day_of_month].append(post_event)
except OSError:
print('EX: get_calendar_events failed to read ' + calendar_filename)
@ -1122,13 +1122,13 @@ def dav_put_response(base_dir: str, nickname: str, domain: str,
stored_count = 0
reading_event = False
lines_list = xml_str.split('\n')
event_list = []
event_list: list[dict] = []
for line in lines_list:
line = line.strip()
if not reading_event:
if line == 'BEGIN:VEVENT':
reading_event = True
event_list = []
event_list: list[dict] = []
else:
if line == 'END:VEVENT':
if event_list:

View File

@ -389,7 +389,7 @@ def verify_post_headers(http_prefix: str,
# Unpack the signed headers and set values based on current headers and
# body (if a digest was included)
signed_header_list = []
signed_header_list: list[str] = []
algorithm = 'rsa-sha256'
digest_algorithm = 'rsa-sha256'
for signed_header in signature_dict[request_target_key].split(field_sep2):

View File

@ -191,7 +191,7 @@ def run_import_following(base_dir: str, httpd):
time.sleep(20)
# get a list of accounts on the instance, in random sequence
accounts_list = []
accounts_list: list[str] = []
for _, dirs, _ in os.walk(dir_str):
for account in dirs:
if '@' not in account:

View File

@ -1112,7 +1112,7 @@ def _send_to_group_members(server, session, session_onion, session_i2p,
print(handle + ' sending to group members')
shared_item_federation_tokens = {}
shared_items_federated_domains = []
shared_items_federated_domains: list[str] = []
shared_items_federated_domains_str = \
get_config_param(base_dir, 'shared_items_federated_domains')
if shared_items_federated_domains_str:
@ -3228,7 +3228,7 @@ def run_inbox_queue(server,
heart_beat_ctr = 0
queue_restore_ctr = 0
curr_mitm_servers = []
curr_mitm_servers: list[str] = []
# time when the last DM bounce message was sent
# This is in a list so that it can be changed by reference
@ -3775,7 +3775,7 @@ def run_inbox_queue(server,
for handle, _ in recipients_dict.items():
destination = \
queue_json['destination'].replace(inbox_handle, handle)
languages_understood = []
languages_understood: list[str] = []
mitm = False
if queue_json.get('mitm'):
mitm = True

View File

@ -2054,7 +2054,7 @@ def receive_question_vote(server, base_dir: str, nickname: str, domain: str,
# if the votes on a question have changed then
# send out an update
question_json['type'] = 'Update'
shared_items_federated_domains = []
shared_items_federated_domains: list[str] = []
shared_item_federation_tokens = {}
send_to_followers_thread(server, session, session_onion, session_i2p,
base_dir, nickname, domain,

View File

@ -182,7 +182,7 @@ def libretranslate_languages(url: str, api_key: str) -> []:
if not isinstance(result, list):
return []
lang_list = []
lang_list: list[str] = []
for lang in result:
if not isinstance(lang, dict):
continue

View File

@ -47,7 +47,7 @@ def no_of_likes(post_json_object: {}) -> int:
if not isinstance(obj['likes'], dict):
return 0
if not obj['likes'].get('items'):
obj['likes']['items'] = []
obj['likes']['items']: list[dict] = []
obj['likes']['totalItems'] = 0
return len(obj['likes']['items'])
@ -500,7 +500,7 @@ def update_likes_collection(recent_posts_cache: {},
obj['likes'] = likes_json
else:
if not obj['likes'].get('items'):
obj['likes']['items'] = []
obj['likes']['items']: list[dict] = []
for like_item in obj['likes']['items']:
if like_item.get('actor'):
if like_item['actor'] == actor:

View File

@ -577,7 +577,7 @@ def get_map_links_from_post_content(content: str, session) -> []:
"""
osm_domain = 'openstreetmap.org'
sections = content.split('://')
map_links = []
map_links: list[str] = []
ctr = 0
for link_str in sections:
if ctr == 0:
@ -647,7 +647,7 @@ def add_tag_map_links(tag_maps_dir: str, tag_name: str,
post_url = post_url.replace('#', '/')
# read the existing map links
existing_map_links = []
existing_map_links: list[str] = []
if os.path.isfile(tag_map_filename):
try:
with open(tag_map_filename, 'r', encoding='utf-8') as fp_tag:
@ -739,7 +739,7 @@ def _hashtag_map_to_format(base_dir: str, tag_name: str,
map_str += '<Document>\n'
if os.path.isfile(tag_map_filename):
map_links = []
map_links: list[str] = []
try:
with open(tag_map_filename, 'r', encoding='utf-8') as fp_tag:
map_links = fp_tag.read().split('\n')

View File

@ -14,7 +14,7 @@ def _markdown_get_sections(markdown: str) -> []:
if '<code>' not in markdown:
return [markdown]
lines = markdown.split('\n')
sections = []
sections: list[str] = []
section_text = ''
section_active = False
ctr = 0

View File

@ -42,7 +42,7 @@ def _meta_data_instance_v1(show_accounts: bool,
print('WARN: json load exception _meta_data_instance_v1')
return {}
rules_list = []
rules_list: list[str] = []
rules_filename = data_dir(base_dir) + '/tos.md'
if os.path.isfile(rules_filename):
try:
@ -235,7 +235,7 @@ def _get_masto_api_v1account(base_dir: str, nickname: str, domain: str,
no_of_statuses = 0
no_of_followers = 0
no_of_following = 0
fields = []
fields: list[dict] = []
published = None
if show_accounts and not broch_mode:
no_of_followers = lines_in_file(account_dir + '/followers.txt')
@ -361,27 +361,27 @@ def masto_api_v1_response(path: str, calling_domain: str,
}
send_json_str = 'masto API streaming response'
if path.endswith('/followers'):
send_json = []
send_json: list[dict] = []
send_json_str = \
'masto API followers sent for ' + nickname + \
calling_info
elif path.endswith('/following'):
send_json = []
send_json: list[dict] = []
send_json_str = \
'masto API following sent for ' + nickname + \
calling_info
elif path.endswith('/statuses'):
send_json = []
send_json: list[dict] = []
send_json_str = \
'masto API statuses sent for ' + nickname + \
calling_info
elif path.endswith('/search'):
send_json = []
send_json: list[dict] = []
send_json_str = \
'masto API search sent ' + original_path + \
calling_info
elif path.endswith('/relationships'):
send_json = []
send_json: list[dict] = []
send_json_str = \
'masto API relationships sent ' + original_path + \
calling_info
@ -398,29 +398,29 @@ def masto_api_v1_response(path: str, calling_domain: str,
# federation problems, so avoid implementing that
if path.startswith('/api/v1/blocks'):
send_json = []
send_json: list[dict] = []
send_json_str = \
'masto API instance blocks sent ' + path + calling_info
elif path.startswith('/api/v1/favorites'):
send_json = []
send_json: list[dict] = []
send_json_str = 'masto API favorites sent ' + path + calling_info
elif path.startswith('/api/v1/follow_requests'):
send_json = []
send_json: list[dict] = []
send_json_str = \
'masto API follow requests sent ' + path + calling_info
elif path.startswith('/api/v1/mutes'):
send_json = []
send_json: list[dict] = []
send_json_str = \
'masto API mutes sent ' + path + calling_info
elif path.startswith('/api/v1/notifications'):
send_json = []
send_json: list[dict] = []
send_json_str = \
'masto API notifications sent ' + path + calling_info
elif path.startswith('/api/v1/reports'):
send_json = []
send_json: list[dict] = []
send_json_str = 'masto API reports sent ' + path + calling_info
elif path.startswith('/api/v1/statuses'):
send_json = []
send_json: list[dict] = []
send_json_str = 'masto API statuses sent ' + path + calling_info
elif path.startswith('/api/v1/timelines'):
send_json = {
@ -476,6 +476,6 @@ def masto_api_v1_response(path: str, calling_domain: str,
send_json = ['mastodon.social', domain_full]
send_json_str = 'masto API peers metadata sent ' + ua_str
elif path.startswith('/api/v1/instance/activity'):
send_json = []
send_json: list[dict] = []
send_json_str = 'masto API activity metadata sent ' + ua_str
return send_json, send_json_str

View File

@ -49,10 +49,10 @@ def _meta_data_instance_v2(show_accounts: bool,
print('WARN: json load exception _meta_data_instance_v1')
return {}
rules_list = []
rules_list: list[str] = []
rules_filename = data_dir(base_dir) + '/tos.md'
if os.path.isfile(rules_filename):
rules_lines = []
rules_lines: list[str] = []
try:
with open(rules_filename, 'r', encoding='utf-8') as fp_rules:
rules_lines = fp_rules.readlines()
@ -126,7 +126,7 @@ def _meta_data_instance_v2(show_accounts: bool,
published_filename)
# get all supported mime types
supported_mime_types = []
supported_mime_types: list[str] = []
image_ext = get_image_extensions()
for ext in image_ext:
mime_str = get_image_mime_type('x.' + ext)
@ -139,7 +139,7 @@ def _meta_data_instance_v2(show_accounts: bool,
for ext in audio_ext:
supported_mime_types.append('audio/' + ext)
fields = []
fields: list[dict] = []
# get account fields from attachments
if admin_actor.get('attachment'):
if isinstance(admin_actor['attachment'], list):

View File

@ -71,7 +71,7 @@ def set_matrix_address(actor_json: {}, matrix_address: str) -> None:
"""Sets an matrix address for the given actor
"""
if not actor_json.get('attachment'):
actor_json['attachment'] = []
actor_json['attachment']: list[dict] = []
# remove any existing value
property_found = None

View File

@ -109,7 +109,7 @@ def metadata_custom_emoji(base_dir: str,
Endpoint /api/v1/custom_emojis
See https://docs.joinmastodon.org/methods/instance/custom_emojis
"""
result = []
result: list[dict] = []
emojis_url = http_prefix + '://' + domain_full + '/emoji'
for _, _, files in os.walk(base_dir + '/emoji'):
for fname in files:

View File

@ -104,7 +104,7 @@ def _update_moved_handle(base_dir: str, nickname: str, domain: str,
ipns = False
if http_prefix == 'ipns':
ipns = True
mitm_servers = []
mitm_servers: list[str] = []
person_json = \
get_actor_json(domain, person_url, http_prefix, gnunet, ipfs, ipns,
debug, False,
@ -142,7 +142,7 @@ def _update_moved_handle(base_dir: str, nickname: str, domain: str,
following_filename = \
acct_dir(base_dir, nickname, domain) + '/following.txt'
if os.path.isfile(following_filename):
following_handles = []
following_handles: list[str] = []
try:
with open(following_filename, 'r', encoding='utf-8') as fp_foll1:
following_handles = fp_foll1.readlines()
@ -195,7 +195,7 @@ def _update_moved_handle(base_dir: str, nickname: str, domain: str,
followers_filename = \
acct_dir(base_dir, nickname, domain) + '/followers.txt'
if os.path.isfile(followers_filename):
follower_handles = []
follower_handles: list[str] = []
try:
with open(followers_filename, 'r', encoding='utf-8') as fp_foll3:
follower_handles = fp_foll3.readlines()

View File

@ -57,7 +57,7 @@ def set_music_site_url(actor_json: {}, music_site_url: str) -> None:
return
if not actor_json.get('attachment'):
actor_json['attachment'] = []
actor_json['attachment']: list[dict] = []
# remove any existing value
property_found = None

View File

@ -394,7 +394,7 @@ def _newswire_hashtag_processing(base_dir: str, post_json_object: {},
rules_filename = data_dir(base_dir) + '/hashtagrules.txt'
if not os.path.isfile(rules_filename):
return True
rules = []
rules: list[str] = []
try:
with open(rules_filename, 'r', encoding='utf-8') as fp_rules:
rules = fp_rules.readlines()
@ -424,7 +424,7 @@ def _newswire_hashtag_processing(base_dir: str, post_json_object: {},
continue
conditions_str = rule_str.split('if ', 1)[1]
conditions_str = conditions_str.split(' then ')[0]
tags_in_conditions = []
tags_in_conditions: list[str] = []
tree = hashtag_rule_tree(operators, conditions_str,
tags_in_conditions, moderated)
if not hashtag_rule_resolve(tree, hashtags, moderated, content, url):
@ -471,7 +471,7 @@ def _create_news_mirror(base_dir: str, domain: str,
if not os.path.isfile(mirror_index_filename):
# no index for mirrors found
return True
removals = []
removals: list[str] = []
try:
with open(mirror_index_filename, 'r',
encoding='utf-8') as fp_index:
@ -726,7 +726,7 @@ def _convert_rss_to_activitypub(base_dir: str, http_prefix: str,
if save_post:
# ensure that all hashtags are stored in the json
# and appended to the content
blog['object']['tag'] = []
blog['object']['tag']: list[dict] = []
for tag_name in hashtags:
ht_id = tag_name.replace('#', '')
hashtag_url = \

View File

@ -115,7 +115,7 @@ def get_newswire_tags(text: str, max_tags: int) -> []:
if text_simplified.endswith('.'):
text_simplified = text_simplified[:len(text_simplified)-1]
words = text_simplified.split(' ')
tags = []
tags: list[str] = []
for wrd in words:
if not wrd.startswith('#'):
continue
@ -232,7 +232,7 @@ def _add_newswire_dict_entry(base_dir: str,
title = limit_word_lengths(title, 13)
if tags is None:
tags = []
tags: list[str] = []
# extract hashtags from the text of the feed post
post_tags = get_newswire_tags(all_text, max_tags)
@ -442,7 +442,7 @@ def _get_podcast_categories(xml_item: str, xml_str: str) -> str:
""" get podcast categories if they exist. These can be turned into hashtags
See https://podcast-standard.org/itunes_tags
"""
podcast_categories = []
podcast_categories: list[str] = []
# convert keywords to hashtags
if '<itunes:keywords' in xml_item:
@ -876,13 +876,13 @@ def _xml2str_to_dict(base_dir: str, domain: str, xml_str: str,
if not _valid_feed_date(pub_date_str):
continue
post_filename = ''
votes_status = []
votes_status: list[str] = []
podcast_properties = \
xml_podcast_to_dict(base_dir, rss_item, xml_str)
if podcast_properties:
podcast_properties['linkMimeType'] = link_mime_type
fediverse_handle = ''
extra_links = []
extra_links: list[str] = []
_add_newswire_dict_entry(base_dir,
result, pub_date_str,
title, link,
@ -995,13 +995,13 @@ def _xml1str_to_dict(base_dir: str, domain: str, xml_str: str,
if not _valid_feed_date(pub_date_str):
continue
post_filename = ''
votes_status = []
votes_status: list[str] = []
podcast_properties = \
xml_podcast_to_dict(base_dir, rss_item, xml_str)
if podcast_properties:
podcast_properties['linkMimeType'] = link_mime_type
fediverse_handle = ''
extra_links = []
extra_links: list[str] = []
_add_newswire_dict_entry(base_dir,
result, pub_date_str,
title, link,
@ -1099,7 +1099,7 @@ def _atom_feed_to_dict(base_dir: str, domain: str, xml_str: str,
fediverse_handle = actor_uri
# are there any extra links?
extra_links = []
extra_links: list[str] = []
if '<activity:object>' in atom_item and \
'</activity:object>' in atom_item:
obj_str = atom_item.split('<activity:object>')[1]
@ -1153,7 +1153,7 @@ def _atom_feed_to_dict(base_dir: str, domain: str, xml_str: str,
if not _valid_feed_date(pub_date_str):
continue
post_filename = ''
votes_status = []
votes_status: list[str] = []
podcast_properties = \
xml_podcast_to_dict(base_dir, atom_item, xml_str)
if podcast_properties:
@ -1275,9 +1275,9 @@ def _json_feed_v1to_dict(base_dir: str, xml_str: str,
if not _valid_feed_date(pub_date_str):
continue
post_filename = ''
votes_status = []
votes_status: list[str] = []
fediverse_handle = ''
extra_links = []
extra_links: list[str] = []
_add_newswire_dict_entry(base_dir,
result, pub_date_str,
title, link,
@ -1379,13 +1379,13 @@ def _atom_feed_yt_to_dict(base_dir: str, xml_str: str,
if not _valid_feed_date(pub_date_str):
continue
post_filename = ''
votes_status = []
votes_status: list[str] = []
podcast_properties = \
xml_podcast_to_dict(base_dir, atom_item, xml_str)
if podcast_properties:
podcast_properties['linkMimeType'] = 'video/youtube'
fediverse_handle = ''
extra_links = []
extra_links: list[str] = []
_add_newswire_dict_entry(base_dir,
result, pub_date_str,
title, link,
@ -1600,7 +1600,7 @@ def _get_hashtags_from_post(post_json_object: {}) -> []:
return []
if not isinstance(post_json_object['object']['tag'], list):
return []
tags = []
tags: list[str] = []
for tgname in post_json_object['object']['tag']:
if not isinstance(tgname, dict):
continue
@ -1674,7 +1674,7 @@ def _add_account_blogs_to_newswire(base_dir: str, nickname: str, domain: str,
published = post_json_object['object']['published']
published = published.replace('T', ' ')
published = published.replace('Z', '+00:00')
votes = []
votes: list[str] = []
if os.path.isfile(full_post_filename + '.votes'):
votes = load_json(full_post_filename + '.votes')
content = \
@ -1689,7 +1689,7 @@ def _add_account_blogs_to_newswire(base_dir: str, nickname: str, domain: str,
url_str = get_url_from_post(url2)
url3 = remove_html(url_str)
fediverse_handle = ''
extra_links = []
extra_links: list[str] = []
_add_newswire_dict_entry(base_dir,
newswire, published,
summary, url3,
@ -1780,7 +1780,7 @@ def get_dict_from_newswire(session, base_dir: str, domain: str,
max_posts_per_source = 5
# add rss feeds
rss_feed = []
rss_feed: list[str] = []
try:
with open(subscriptions_filename, 'r', encoding='utf-8') as fp_sub:
rss_feed = fp_sub.readlines()
@ -1835,7 +1835,7 @@ def get_dict_from_newswire(session, base_dir: str, domain: str,
no_of_posts = len(sorted_result.items())
if no_of_posts > max_newswire_posts:
ctr = 0
removals = []
removals: list[str] = []
for date_str, item in sorted_result.items():
ctr += 1
if ctr > max_newswire_posts:

View File

@ -606,7 +606,7 @@ def post_message_to_outbox(session, translate: {},
acct_dir(base_dir, post_to_nickname, domain) + '/.noVotes'
if os.path.isfile(show_vote_file):
show_vote_posts = False
languages_understood = []
languages_understood: list[str] = []
if is_image_media(session, base_dir, http_prefix,
post_to_nickname, domain,
message_json,

View File

@ -63,7 +63,7 @@ def set_peertube(actor_json: {}, peertube: str) -> None:
"""Sets peertube for the given actor
"""
if not actor_json.get('attachment'):
actor_json['attachment'] = []
actor_json['attachment']: list[dict] = []
# remove any existing value
property_found = None

View File

@ -220,7 +220,7 @@ def get_actor_update_json(actor_json: {}) -> {}:
if actor_json.get('memorial'):
memorial = True
indexable = account_is_indexable(actor_json)
searchable_by = []
searchable_by: list[str] = []
if actor_json.get('searchableBy'):
if isinstance(actor_json['searchableBy'], list):
searchable_by = actor_json['searchableBy']
@ -901,7 +901,7 @@ def person_upgrade_actor(base_dir: str, person_json: {},
update_actor = True
if 'searchableBy' not in person_json:
person_json['searchableBy'] = []
person_json['searchableBy']: list[str] = []
update_actor = True
# add a speaker endpoint
@ -1058,7 +1058,7 @@ def add_alternate_domains(actor_json: {}, domain: str,
if not nickname:
return
if 'alsoKnownAs' not in actor_json:
actor_json['alsoKnownAs'] = []
actor_json['alsoKnownAs']: list[str] = []
if onion_domain:
onion_actor = 'http://' + onion_domain + '/users/' + nickname
if onion_actor not in actor_json['alsoKnownAs']:
@ -1274,7 +1274,7 @@ def reenable_account(base_dir: str, nickname: str) -> None:
"""
suspended_filename = data_dir(base_dir) + '/suspended.txt'
if os.path.isfile(suspended_filename):
lines = []
lines: list[str] = []
try:
with open(suspended_filename, 'r', encoding='utf-8') as fp_sus:
lines = fp_sus.readlines()
@ -1367,7 +1367,7 @@ def can_remove_post(base_dir: str,
# is the post by a moderator?
moderators_file = data_dir(base_dir) + '/moderators.txt'
if os.path.isfile(moderators_file):
lines = []
lines: list[str] = []
try:
with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
lines = fp_mod.readlines()
@ -1403,7 +1403,7 @@ def _remove_tags_for_nickname(base_dir: str, nickname: str,
continue
if not text_in_file(match_str, tag_filename):
continue
lines = []
lines: list[str] = []
try:
with open(tag_filename, 'r', encoding='utf-8') as fp_tag:
lines = fp_tag.readlines()
@ -1434,7 +1434,7 @@ def remove_account(base_dir: str, nickname: str,
# Don't remove moderators
moderators_file = data_dir(base_dir) + '/moderators.txt'
if os.path.isfile(moderators_file):
lines = []
lines: list[str] = []
try:
with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
lines = fp_mod.readlines()
@ -2258,8 +2258,8 @@ def set_featured_hashtags(actor_json: {}, hashtags: str,
if separator_str in hashtags:
break
tag_list = hashtags.split(separator_str)
result = []
tags_used = []
result: list[str] = []
tags_used: list[str] = []
actor_id = actor_json['id']
actor_domain = actor_id.split('://')[1]
if '/' in actor_domain:

6
pgp.py
View File

@ -155,7 +155,7 @@ def set_email_address(actor_json: {}, email_address: str) -> None:
not_email_address = True
if not actor_json.get('attachment'):
actor_json['attachment'] = []
actor_json['attachment']: list[dict] = []
# remove any existing value
property_found = None
@ -220,7 +220,7 @@ def set_pgp_pub_key(actor_json: {}, pgp_pub_key: str) -> None:
remove_key = True
if not actor_json.get('attachment'):
actor_json['attachment'] = []
actor_json['attachment']: list[dict] = []
# remove any existing value
property_found = None
@ -283,7 +283,7 @@ def set_pgp_fingerprint(actor_json: {}, fingerprint: str) -> None:
remove_fingerprint = True
if not actor_json.get('attachment'):
actor_json['attachment'] = []
actor_json['attachment']: list[dict] = []
# remove any existing value
property_found = None

View File

@ -86,7 +86,7 @@ def set_pixelfed(actor_json: {}, pixelfed: str) -> None:
"""Sets pixelfed for the given actor
"""
if not actor_json.get('attachment'):
actor_json['attachment'] = []
actor_json['attachment']: list[dict] = []
# remove any existing value
property_found = None

View File

@ -1978,7 +1978,7 @@ def load_dictionary(base_dir: str) -> []:
if not os.path.isfile(filename):
return []
words = []
words: list[str] = []
try:
with open(filename, 'r', encoding='utf-8') as fp_dict:
words = fp_dict.read().split('\n')
@ -1997,7 +1997,7 @@ def load_2grams(base_dir: str) -> {}:
return {}
twograms = {}
lines = []
lines: list[str] = []
try:
with open(filename, 'r', encoding='utf-8') as fp_dict:
lines = fp_dict.read().split('\n')

View File

@ -177,7 +177,7 @@ def is_moderator(base_dir: str, nickname: str) -> bool:
return True
return False
lines = []
lines: list[str] = []
try:
with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
lines = fp_mod.readlines()
@ -630,7 +630,7 @@ def _get_posts(session, outbox_url: str, max_posts: int,
if raw:
if debug:
print('Returning the raw feed')
result = []
result: list[dict] = []
i = 0
user_feed = parse_user_feed(signing_priv_key_pem,
session, outbox_url, as_header,
@ -688,11 +688,11 @@ def _get_posts(session, outbox_url: str, max_posts: int,
continue
content = content.replace('&apos;', "'")
mentions = []
mentions: list[str] = []
emoji = {}
summary = ''
in_reply_to = ''
attachment = []
attachment: list[list] = []
sensitive = False
if isinstance(this_item, dict):
if this_item.get('tag'):
@ -1241,7 +1241,7 @@ def _attach_buy_link(post_json_object: {},
if translate.get(buy_str):
buy_str = translate[buy_str]
if 'attachment' not in post_json_object:
post_json_object['attachment'] = []
post_json_object['attachment']: list[dict] = []
post_json_object['attachment'].append({
"type": "Link",
"name": buy_str,
@ -1262,7 +1262,7 @@ def _attach_chat_link(post_json_object: {},
if ' ' in chat_url or '<' in chat_url:
return
if 'attachment' not in post_json_object:
post_json_object['attachment'] = []
post_json_object['attachment']: list[dict] = []
post_json_object['attachment'].append({
"type": "Link",
"name": "Chat",
@ -1622,7 +1622,7 @@ def _consolidate_actors_list(actors_list: []) -> None:
""" consolidate duplicated actors
https://domain/@nick gets merged with https://domain/users/nick
"""
possible_duplicate_actors = []
possible_duplicate_actors: list[str] = []
for cc_actor in actors_list:
if '/@' in cc_actor:
if '/@/' not in cc_actor:
@ -1631,7 +1631,7 @@ def _consolidate_actors_list(actors_list: []) -> None:
if not possible_duplicate_actors:
return
u_paths = get_user_paths()
remove_actors = []
remove_actors: list[str] = []
for cc_actor in possible_duplicate_actors:
for usr_path in u_paths:
if '/@/' not in cc_actor:
@ -1796,7 +1796,7 @@ def _create_post_base(base_dir: str,
if not is_blocked_hashtag(base_dir, audio_value):
content += ' #' + audio_value
tags = []
tags: list[dict] = []
hashtags_dict = {}
domain = get_full_domain(domain, port)
@ -1810,7 +1810,7 @@ def _create_post_base(base_dir: str,
hashtags_dict, translate, True)
# replace emoji with unicode
tags = []
tags: list[dict] = []
for tag_name, tag in hashtags_dict.items():
tags.append(tag)
@ -1838,8 +1838,8 @@ def _create_post_base(base_dir: str,
summary = remove_invalid_chars(valid_content_warning(subject))
sensitive = True
to_recipients = []
to_cc = []
to_recipients: list[str] = []
to_cc: list[str] = []
if to_url:
if not isinstance(to_url, str):
print('ERROR: to_url is not a string')
@ -1888,7 +1888,7 @@ def _create_post_base(base_dir: str,
# make sure that CC doesn't also contain a To address
# eg. To: [ "https://mydomain/users/foo/followers" ]
# CC: [ "X", "Y", "https://mydomain/users/foo", "Z" ]
remove_from_cc = []
remove_from_cc: list[str] = []
for cc_recipient in to_cc:
for send_to_actor in to_recipients:
if cc_recipient in send_to_actor and \
@ -1912,7 +1912,7 @@ def _create_post_base(base_dir: str,
post_object_type = 'Article'
# convert the searchable_by state into a url
searchable_by_list = []
searchable_by_list: list[str] = []
if searchable_by == 'public':
searchable_by_list = ["https://www.w3.org/ns/activitystreams#Public"]
elif searchable_by == 'yourself':
@ -2002,7 +2002,7 @@ def outbox_message_create_wrap(http_prefix: str,
new_post_id = \
local_actor_url(http_prefix, nickname, domain) + \
'/statuses/' + status_number
cc_list = []
cc_list: list[str] = []
if message_json.get('cc'):
cc_list = message_json['cc']
new_post = {
@ -2036,8 +2036,8 @@ def _post_is_addressed_to_followers(nickname: str, domain: str, port: int,
if not post_json_object.get('object'):
return False
to_list = []
cc_list = []
to_list: list[str] = []
cc_list: list[str] = []
if post_json_object['type'] != 'Update' and \
has_object_dict(post_json_object):
if post_json_object['object'].get('to'):
@ -2155,7 +2155,7 @@ def json_pin_post(base_dir: str, http_prefix: str,
domain_full, system_language)
items_list = []
if pinned_post_json:
items_list = [pinned_post_json]
items_list: list[dict] = [pinned_post_json]
actor = local_actor_url(http_prefix, nickname, domain_full)
post_context = get_individual_post_context()
@ -2182,7 +2182,7 @@ def regenerate_index_for_box(base_dir: str,
if os.path.isfile(box_index_filename):
return
index_lines = []
index_lines: list[str] = []
for _, _, files in os.walk(box_dir):
for fname in files:
if ':##' not in fname:
@ -2343,7 +2343,7 @@ def _append_citations_to_blog_post(base_dir: str,
if not os.path.isfile(citations_filename):
return
citations_separator = '#####'
citations = []
citations: list[str] = []
try:
with open(citations_filename, 'r', encoding='utf-8') as fp_cit:
citations = fp_cit.readlines()
@ -2480,7 +2480,7 @@ def create_question_post(base_dir: str,
anonymous_participation_enabled = event_status = ticket_url = None
conversation_id = None
convthread_id = None
searchable_by = []
searchable_by: list[str] = []
message_json = \
_create_post_base(base_dir, nickname, domain, port,
'https://www.w3.org/ns/activitystreams#Public',
@ -2505,7 +2505,7 @@ def create_question_post(base_dir: str,
chat_url, auto_cw_cache, searchable_by,
session)
message_json['object']['type'] = 'Question'
message_json['object']['oneOf'] = []
message_json['object']['oneOf']: list[dict] = []
message_json['object']['votersCount'] = 0
curr_time = date_utcnow()
days_since_epoch = \
@ -2555,7 +2555,7 @@ def create_unlisted_post(base_dir: str,
anonymous_participation_enabled = None
event_status = None
ticket_url = None
searchable_by = []
searchable_by: list[str] = []
return _create_post_base(base_dir, nickname, domain, port,
local_actor + '/followers',
'https://www.w3.org/ns/activitystreams#Public',
@ -2644,7 +2644,7 @@ def get_mentioned_people(base_dir: str, http_prefix: str,
"""
if '@' not in content:
return None
mentions = []
mentions: list[str] = []
words = content.split(' ')
for wrd in words:
if not wrd.startswith('@'):
@ -2715,7 +2715,7 @@ def create_direct_message_post(base_dir: str,
anonymous_participation_enabled = None
event_status = None
ticket_url = None
searchable_by = []
searchable_by: list[str] = []
message_json = \
_create_post_base(base_dir, nickname, domain, port,
post_to, post_cc,
@ -2742,8 +2742,8 @@ def create_direct_message_post(base_dir: str,
if not isinstance(message_json['to'], list):
message_json['to'] = [message_json['to']]
message_json['object']['to'] = message_json['to']
message_json['cc'] = []
message_json['object']['cc'] = []
message_json['cc']: list[str] = []
message_json['object']['cc']: list[str] = []
if dm_is_chat:
message_json['object']['type'] = 'ChatMessage'
if schedule_post:
@ -2778,7 +2778,7 @@ def create_report_post(base_dir: str,
subject = report_title + ': ' + subject
# create the list of moderators from the moderators file
moderators_list = []
moderators_list: list[str] = []
moderators_file = data_dir(base_dir) + '/moderators.txt'
if os.path.isfile(moderators_file):
try:
@ -2850,7 +2850,7 @@ def create_report_post(base_dir: str,
ticket_url = None
conversation_id = None
convthread_id = None
searchable_by = []
searchable_by: list[str] = []
for to_url in post_to:
# who is this report going to?
to_nickname = to_url.split('/users/')[1]
@ -3906,7 +3906,7 @@ def _send_to_named_addresses(server, session, session_onion, session_i2p,
return
recipients_object = post_json_object
recipients = []
recipients: list[str] = []
recipient_type = ('to', 'cc')
for rtype in recipient_type:
if not recipients_object.get(rtype):
@ -4112,7 +4112,7 @@ def _has_shared_inbox(session, http_prefix: str, domain: str,
"""Returns true if the given domain has a shared inbox
This tries the new and the old way of webfingering the shared inbox
"""
try_handles = []
try_handles: list[str] = []
if ':' not in domain:
try_handles.append(domain + '@' + domain)
try_handles.append('inbox@' + domain)
@ -4189,7 +4189,7 @@ def send_to_followers(server, session, session_onion, session_i2p,
sending_ctr = 0
# randomize the order of sending to instances
randomized_instances = []
randomized_instances: list[str] = []
for follower_domain, follower_handles in grouped.items():
randomized_instances.append([follower_domain, follower_handles])
random.shuffle(randomized_instances)
@ -4551,7 +4551,7 @@ def create_moderation(base_dir: str, nickname: str, domain: str, port: int,
if is_moderator(base_dir, nickname):
moderation_index_file = data_dir(base_dir) + '/moderation.txt'
if os.path.isfile(moderation_index_file):
lines = []
lines: list[str] = []
try:
with open(moderation_index_file, 'r',
encoding='utf-8') as fp_index:
@ -4563,7 +4563,7 @@ def create_moderation(base_dir: str, nickname: str, domain: str, port: int,
if header_only:
return box_header
page_lines = []
page_lines: list[str] = []
if len(lines) > 0:
end_line_number = \
len(lines) - 1 - int(items_per_page * page_number)
@ -4608,7 +4608,7 @@ def is_image_media(session, base_dir: str, http_prefix: str,
"""
if post_json_object['type'] == 'Announce':
blocked_cache = {}
block_federated = []
block_federated: list[str] = []
post_json_announce = \
download_announce(session, base_dir, http_prefix,
nickname, domain, post_json_object,
@ -5034,7 +5034,7 @@ def _create_box_indexed(recent_posts_cache: {},
}
posts_in_box = []
post_urls_in_box = []
post_urls_in_box: list[str] = []
if not unauthorized_premium:
total_posts_count, posts_added_to_timeline = \
@ -5262,7 +5262,7 @@ def _novel_fields_for_person(nickname: str, domain: str,
posts_in_box = os.scandir(box_dir)
posts_ctr = 0
fields = []
fields: list[str] = []
expected_fields = (
'alsoKnownAs',
'attachment',
@ -5767,7 +5767,7 @@ def get_public_posts_of_person(base_dir: str, nickname: str, domain: str,
return
person_cache = {}
cached_webfingers = {}
federation_list = []
federation_list: list[str] = []
group_account = False
if nickname.startswith('!'):
nickname = nickname[1:]
@ -5885,7 +5885,7 @@ def download_follow_collection(signing_priv_key_pem: str,
session_headers = {
'Accept': accept_str
}
result = []
result: list[str] = []
for page_ctr in range(no_of_pages):
url = \
actor + '/' + follow_type + '?page=' + str(page_number + page_ctr)
@ -5956,7 +5956,7 @@ def get_public_post_info(session, base_dir: str, nickname: str, domain: str,
domains_info = {}
for pdomain in post_domains:
if not domains_info.get(pdomain):
domains_info[pdomain] = []
domains_info[pdomain]: list[str] = []
blocked_posts = \
_get_posts_for_blocked_domains(base_dir, session,
@ -6007,7 +6007,7 @@ def get_public_post_domains_blocked(session, base_dir: str,
print('EX: get_public_post_domains_blocked unable to read ' +
blocking_filename)
blocked_domains = []
blocked_domains: list[str] = []
for domain_name in post_domains:
if '@' not in domain_name:
continue
@ -6031,7 +6031,7 @@ def _get_non_mutuals_of_person(base_dir: str,
get_followers_list(base_dir, nickname, domain, 'followers.txt')
following = \
get_followers_list(base_dir, nickname, domain, 'following.txt')
non_mutuals = []
non_mutuals: list[str] = []
for handle in followers:
if handle not in following:
non_mutuals.append(handle)
@ -7465,7 +7465,7 @@ def valid_post_content(base_dir: str, nickname: str, domain: str,
# check number of tags
if message_json['object'].get('tag'):
if not isinstance(message_json['object']['tag'], list):
message_json['object']['tag'] = []
message_json['object']['tag']: list[dict] = []
else:
if len(message_json['object']['tag']) > int(max_mentions * 2):
if message_json['object'].get('id'):

View File

@ -49,7 +49,7 @@ def set_pronouns(actor_json: {}, pronouns: str) -> None:
"""Sets pronouns for the given actor
"""
if not actor_json.get('attachment'):
actor_json['attachment'] = []
actor_json['attachment']: list[dict] = []
# remove any existing value
property_found = None

View File

@ -145,7 +145,7 @@ def question_update_votes(base_dir: str, nickname: str, domain: str,
print('EX: unable to append to voters file ' + voters_filename)
else:
# change an entry in the voters file
lines = []
lines: list[str] = []
try:
with open(voters_filename, 'r',
encoding='utf-8') as fp_voters:
@ -154,7 +154,7 @@ def question_update_votes(base_dir: str, nickname: str, domain: str,
print('EX: question_update_votes unable to read ' +
voters_filename)
newlines = []
newlines: list[str] = []
save_voters_file = False
for vote_line in lines:
if vote_line.startswith(actor_url +
@ -185,7 +185,7 @@ def question_update_votes(base_dir: str, nickname: str, domain: str,
if not possible_answer.get('name'):
continue
total_items = 0
lines = []
lines: list[str] = []
try:
with open(voters_filename, 'r', encoding='utf-8') as fp_voters:
lines = fp_voters.readlines()

View File

@ -490,7 +490,7 @@ def _update_common_reactions(base_dir: str, emoji_content: str) -> None:
print('EX: unable to load common reactions file' +
common_reactions_filename)
if common_reactions:
new_common_reactions = []
new_common_reactions: list[str] = []
reaction_found = False
for line in common_reactions:
if ' ' + emoji_content in line:
@ -581,7 +581,7 @@ def update_reaction_collection(recent_posts_cache: {},
obj['reactions'] = reactions_json
else:
if not obj['reactions'].get('items'):
obj['reactions']['items'] = []
obj['reactions']['items']: list[dict] = []
# upper limit for the number of reactions on a post
if len(obj['reactions']['items']) >= MAX_ACTOR_REACTIONS_PER_POST:
return
@ -623,7 +623,7 @@ def html_emoji_reactions(post_json_object: {}, interactive: bool,
if not post_json_object['object']['reactions'].get('items'):
return ''
reactions = {}
reacted_to_by_this_actor = []
reacted_to_by_this_actor: list[str] = []
for item in post_json_object['object']['reactions']['items']:
emoji_content = item['content']
emoji_actor = item['actor']

View File

@ -425,7 +425,7 @@ def _deduplicate_recent_books_list(base_dir: str,
return
# load recent books as a list
recent_lines = []
recent_lines: list[str] = []
try:
with open(recent_books_filename, 'r',
encoding='utf-8') as fp_recent:
@ -435,7 +435,7 @@ def _deduplicate_recent_books_list(base_dir: str,
recent_books_filename + ' ' + str(ex))
# deduplicate the list
new_recent_lines = []
new_recent_lines: list[str] = []
for line in recent_lines:
if line not in new_recent_lines:
new_recent_lines.append(line)
@ -525,7 +525,7 @@ def store_book_events(base_dir: str,
# update the cache for this reader
books_cache['readers'][actor] = reader_books_json
if 'reader_list' not in books_cache:
books_cache['reader_list'] = []
books_cache['reader_list']: list[str] = []
if actor in books_cache['reader_list']:
books_cache['reader_list'].remove(actor)
books_cache['reader_list'].append(actor)

View File

@ -236,7 +236,7 @@ def update_moved_actors(base_dir: str, debug: bool) -> None:
print('No cached actors found')
# get the handles to be checked for movedTo attribute
handles_to_check = []
handles_to_check: list[str] = []
dir_str = data_dir(base_dir)
for _, dirs, _ in os.walk(dir_str):
for account in dirs:
@ -332,7 +332,7 @@ def _get_inactive_accounts(base_dir: str, nickname: str, domain: str,
followers_filename)
followers_list = followers_str.split('\n')
result = []
result: list[str] = []
users_list = get_user_paths()
for handle in followers_list:
if handle in result:

View File

@ -54,7 +54,7 @@ def _add_role(base_dir: str, nickname: str, domain: str,
if os.path.isfile(role_file):
# is this nickname already in the file?
lines = []
lines: list[str] = []
try:
with open(role_file, 'r', encoding='utf-8') as fp_role:
lines = fp_role.readlines()
@ -179,7 +179,7 @@ def actor_roles_from_list(actor_json: {}, roles_list: []) -> None:
"""Sets roles from a list
"""
# clear Roles from the occupation list
empty_roles_list = []
empty_roles_list: list[dict] = []
for occupation_item in actor_json['hasOccupation']:
if not isinstance(occupation_item, dict):
continue
@ -202,7 +202,7 @@ def get_actor_roles_list(actor_json: {}) -> []:
return []
if not isinstance(actor_json['hasOccupation'], list):
return []
roles_list = []
roles_list: list[str] = []
for occupation_item in actor_json['hasOccupation']:
if not isinstance(occupation_item, dict):
continue

View File

@ -41,7 +41,7 @@ def _update_post_schedule(base_dir: str, handle: str, httpd,
days_since_epoch = (curr_time - date_epoch()).days
schedule_dir = acct_handle_dir(base_dir, handle) + '/scheduled/'
index_lines = []
index_lines: list[str] = []
delete_schedule_post = False
nickname = handle.split('@')[0]
shared_items_federated_domains = httpd.shared_items_federated_domains

View File

@ -487,7 +487,7 @@ def _expire_shares_for_account(base_dir: str, nickname: str, domain: str,
if not shares_json:
return 0
curr_time = int(time.time())
delete_item_id = []
delete_item_id: list[str] = []
for item_id, item in shares_json.items():
if curr_time > item['expire']:
delete_item_id.append(item_id)
@ -1611,7 +1611,7 @@ def merge_shared_item_tokens(base_dir: str, domain_full: str,
"""When the shared item federation domains list has changed, update
the tokens dict accordingly
"""
removals = []
removals: list[str] = []
changed = False
for token_domain_full, _ in tokens_json.items():
if domain_full:
@ -1905,7 +1905,7 @@ def run_federated_shares_daemon(base_dir: str, httpd, http_prefix: str,
min_days = 7
max_days = 14
_generate_next_shares_token_update(base_dir, min_days, max_days)
sites_unavailable = []
sites_unavailable: list[str] = []
while True:
shared_items_federated_domains_str = \
get_config_param(base_dir, 'sharedItemsFederatedDomains')
@ -1919,7 +1919,7 @@ def run_federated_shares_daemon(base_dir: str, httpd, http_prefix: str,
min_days, max_days, httpd)
# get a list of the domains within the shared items federation
shared_items_federated_domains = []
shared_items_federated_domains: list[str] = []
fed_domains_list = \
shared_items_federated_domains_str.split(',')
for shared_fed_domain in fed_domains_list:
@ -2327,7 +2327,7 @@ def actor_attached_shares(actor_json: {}) -> []:
if not isinstance(actor_json['attachment'], list):
return []
attached_shares = []
attached_shares: list[str] = []
for attach_item in actor_json['attachment']:
if _is_valueflows_attachment(attach_item):
attached_shares.append(attach_item['href'])
@ -2369,11 +2369,11 @@ def add_shares_to_actor(base_dir: str,
https://codeberg.org/fediverse/fep/src/branch/main/fep/0837/fep-0837.md
"""
if 'attachment' not in actor_json:
actor_json['attachment'] = []
actor_json['attachment']: list[dict] = []
changed = False
# remove any existing ValueFlows items from attachment list
new_attachment = []
new_attachment: list[dict] = []
for attach_item in actor_json['attachment']:
is_proposal = False
if _is_valueflows_attachment(attach_item):

View File

@ -174,7 +174,7 @@ def load_unavailable_sites(base_dir: str) -> []:
"""load a list of unavailable sites
"""
unavailable_sites_filename = data_dir(base_dir) + '/unavailable_sites.txt'
sites_unavailable = []
sites_unavailable: list[str] = []
try:
with open(unavailable_sites_filename, 'r',
encoding='utf-8') as fp_sites:

View File

@ -29,7 +29,7 @@ def set_skills_from_dict(actor_json: {}, skills_dict: {}) -> []:
"""Converts a dict containing skills to a list
Returns the string version of the dictionary
"""
skills_list = []
skills_list: list[str] = []
for name, value in skills_dict.items():
skills_list.append(name + ':' + str(value))
set_occupation_skills_list(actor_json, skills_list)

View File

@ -150,7 +150,7 @@ def _speaker_pronounce(base_dir: str, say_text: str, translate: {}) -> str:
")": ","
}
if os.path.isfile(pronounce_filename):
pronounce_list = []
pronounce_list: list[str] = []
try:
with open(pronounce_filename, 'r', encoding='utf-8') as fp_pro:
pronounce_list = fp_pro.readlines()
@ -419,7 +419,7 @@ def speakable_text(http_prefix: str,
if ' <3' in content:
content = content.replace(' <3', ' ' + translate['heart'])
content = remove_html(html_replace_quote_marks(content))
detected_links = []
detected_links: list[str] = []
content = speaker_replace_links(http_prefix,
nickname, domain, domain_full,
content, translate, detected_links)
@ -451,7 +451,7 @@ def _post_to_speaker_json(base_dir: str, http_prefix: str,
return {}
if not isinstance(post_json_object['object']['content'], str):
return {}
detected_links = []
detected_links: list[str] = []
content = urllib.parse.unquote_plus(post_json_object['object']['content'])
content = html.unescape(content)
content = content.replace('<p>', '').replace('</p>', ' ')
@ -532,11 +532,11 @@ def _post_to_speaker_json(base_dir: str, http_prefix: str,
post_id = remove_id_ending(post_json_object['object']['id'])
follow_requests_exist = False
follow_requests_list = []
follow_requests_list: list[str] = []
accounts_dir = acct_dir(base_dir, nickname, domain_full)
approve_follows_filename = accounts_dir + '/followrequests.txt'
if os.path.isfile(approve_follows_filename):
follows = []
follows: list[str] = []
try:
with open(approve_follows_filename, 'r',
encoding='utf-8') as fp_foll:

2
ssb.py
View File

@ -71,7 +71,7 @@ def set_ssb_address(actor_json: {}, ssb_address: str) -> None:
not_ssb_address = True
if not actor_json.get('attachment'):
actor_json['attachment'] = []
actor_json['attachment']: list[dict] = []
# remove any existing value
property_found = None

View File

@ -753,7 +753,7 @@ def create_server_alice(path: str, domain: str, port: int,
shutil.rmtree(path, ignore_errors=False)
os.mkdir(path)
os.chdir(path)
shared_items_federated_domains = []
shared_items_federated_domains: list[str] = []
system_language = 'en'
languages_understood = [system_language]
nickname = 'alice'
@ -808,7 +808,7 @@ def create_server_alice(path: str, domain: str, port: int,
chat_url = ''
auto_cw_cache = {}
test_video_transcript = ''
searchable_by = []
searchable_by: list[str] = []
session = None
create_public_post(path, nickname, domain, port, http_prefix,
"No wise fish would go anywhere without a porpoise",
@ -886,13 +886,13 @@ def create_server_alice(path: str, domain: str, port: int,
show_node_info_version = True
city = 'London, England'
log_login_failures = False
user_agents_blocked = []
user_agents_blocked: list[str] = []
max_like_count = 10
default_reply_interval_hrs = 9999999999
lists_enabled = ''
content_license_url = 'https://creativecommons.org/licenses/by-nc/4.0'
dyslexic_font = False
crawlers_allowed = []
crawlers_allowed: list[str] = []
check_actor_timeout = 2
preferred_podcast_formats = None
clacks = None
@ -950,7 +950,7 @@ def create_server_bob(path: str, domain: str, port: int,
shutil.rmtree(path, ignore_errors=False)
os.mkdir(path)
os.chdir(path)
shared_items_federated_domains = []
shared_items_federated_domains: list[str] = []
system_language = 'en'
languages_understood = [system_language]
nickname = 'bob'
@ -1005,7 +1005,7 @@ def create_server_bob(path: str, domain: str, port: int,
chat_url = ''
auto_cw_cache = {}
test_video_transcript = ''
searchable_by = []
searchable_by: list[str] = []
session = None
create_public_post(path, nickname, domain, port, http_prefix,
"It's your life, live it your way.",
@ -1083,7 +1083,7 @@ def create_server_bob(path: str, domain: str, port: int,
show_node_info_version = True
city = 'London, England'
log_login_failures = False
user_agents_blocked = []
user_agents_blocked: list[str] = []
max_like_count = 10
default_reply_interval_hrs = 9999999999
lists_enabled = ''