Variable types

main
bashrc 2026-04-28 17:07:48 +01:00
parent a9f9982a3d
commit 55b69c0a71
2 changed files with 171 additions and 168 deletions

View File

@ -28,7 +28,7 @@ def load_string(filename: str, exception_text: str) -> str:
"""
try:
with open(filename, 'r', encoding='utf-8') as fp:
text = fp.read()
text: str = fp.read()
return text
except OSError as exc:
if '[ex]' in exception_text:
@ -56,7 +56,7 @@ def load_line(filename: str, exception_text: str) -> str:
"""
try:
with open(filename, 'r', encoding='utf-8') as fp:
text = fp.readline()
text: str = fp.readline()
return text
except OSError as exc:
if '[ex]' in exception_text:
@ -70,7 +70,7 @@ def load_list(filename: str, exception_text: str) -> str:
This is used to replace readlines
"""
lines: list[str] = []
lines_str = load_string(filename, exception_text)
lines_str: str = load_string(filename, exception_text)
if lines_str is None:
return None
if lines_str:

333
utils.py
View File

@ -99,7 +99,7 @@ def get_person_icon(person_json: {}) -> str:
"""
if not person_json.get('icon'):
return ''
person_icon = person_json['icon']
person_icon: list = person_json['icon']
if isinstance(person_icon, list):
# choose the first icon available
person_icon = person_json['icon'][0]
@ -149,7 +149,7 @@ def text_in_file(text: str, filename: str,
if not case_sensitive:
text = text.lower()
content = \
content: str = \
load_string(filename,
'EX: unable to find text in missing file ' + filename)
if content:
@ -193,11 +193,11 @@ def get_actor_languages_list(actor_json: {}) -> []:
if not property_value['type'].endswith('PropertyValue'):
continue
if isinstance(property_value[prop_value_name], list):
lang_list = property_value[prop_value_name]
lang_list: list = property_value[prop_value_name]
lang_list.sort()
return lang_list
if isinstance(property_value[prop_value_name], str):
lang_str = property_value[prop_value_name]
lang_str: str = property_value[prop_value_name]
lang_list_temp: list[str] = []
if ',' in lang_str:
lang_list_temp = lang_str.split(',')
@ -237,7 +237,7 @@ def remove_markup_tag(html: str, tag: str) -> str:
'</' + tag not in html:
return html
section = html.split('<' + tag)
section: list[str] = html.split('<' + tag)
result: str = ''
for text in section:
if not result:
@ -248,8 +248,8 @@ def remove_markup_tag(html: str, tag: str) -> str:
continue
result += text.split('>', 1)[1]
html = result
section = html.split('</' + tag)
html: str = result
section: list[str] = html.split('</' + tag)
result: str = ''
for text in section:
if not result:
@ -266,7 +266,7 @@ def remove_markup_tag(html: str, tag: str) -> str:
def remove_header_tags(html: str) -> str:
"""Removes any header tags from the given html text
"""
header_tags = ('h1', 'h2', 'h3', 'h4', 'h5')
header_tags: list[str] = ('h1', 'h2', 'h3', 'h4', 'h5')
for tag_str in header_tags:
html = remove_markup_tag(html, tag_str)
return html
@ -278,14 +278,14 @@ def get_content_from_post(post_json_object: {}, system_language: str,
"""Returns the content from the post in the given language
including searching for a matching entry within contentMap
"""
this_post_json = post_json_object
this_post_json: dict = post_json_object
if has_object_dict(post_json_object):
# handle quote posts FEP-dd4b, where there is no content within object
if (content_type != 'content' or
('content' in this_post_json['object'] or
'contentMap' in this_post_json['object'])):
this_post_json = post_json_object['object']
map_dict = content_type + 'Map'
map_dict: str = content_type + 'Map'
has_contentmap_dict: bool = False
if this_post_json.get(map_dict):
if isinstance(this_post_json[map_dict], dict):
@ -313,7 +313,7 @@ def get_content_from_post(post_json_object: {}, system_language: str,
for lang in languages_understood:
if not this_post_json[map_dict].get(lang):
continue
map_lang = this_post_json[map_dict][lang]
map_lang: str = this_post_json[map_dict][lang]
if not isinstance(map_lang, str):
continue
content = map_lang
@ -334,18 +334,18 @@ def get_language_from_post(post_json_object: {}, system_language: str,
"""Returns the content language from the post
including searching for a matching entry within contentMap
"""
this_post_json = post_json_object
this_post_json: dict = post_json_object
if has_object_dict(post_json_object):
this_post_json = post_json_object['object']
if not this_post_json.get(content_type):
return system_language
map_dict = content_type + 'Map'
map_dict: str = content_type + 'Map'
if not this_post_json.get(map_dict):
return system_language
if not isinstance(this_post_json[map_dict], dict):
return system_language
if this_post_json[map_dict].get(system_language):
sys_lang = this_post_json[map_dict][system_language]
sys_lang: str = this_post_json[map_dict][system_language]
if isinstance(sys_lang, str):
return system_language
else:
@ -360,7 +360,7 @@ def get_language_from_post(post_json_object: {}, system_language: str,
def get_post_attachments(post_json_object: {}) -> []:
""" Returns the list of attachments for a post
"""
post_obj = post_json_object
post_obj: dict = post_json_object
if has_object_dict(post_json_object):
post_obj = post_json_object['object']
if not post_obj.get('attachment'):
@ -407,14 +407,14 @@ def get_summary_from_post(post_json_object: {}, system_language: str,
"""Returns the summary from the post in the given language
including searching for a matching entry within summaryMap.
"""
summary_str = \
summary_str: str = \
get_content_from_post(post_json_object, system_language,
languages_understood, 'summary')
if not summary_str:
# Also try the "name" field if summary is not available.
# See https://codeberg.org/
# fediverse/fep/src/branch/main/fep/b2b8/fep-b2b8.md
obj = post_json_object
obj: dict = post_json_object
if has_object_dict(post_json_object):
obj = post_json_object['object']
if obj.get('type'):
@ -427,7 +427,7 @@ def get_summary_from_post(post_json_object: {}, system_language: str,
if summary_str:
summary_str = summary_str.strip()
if not _valid_summary(summary_str):
summary_str: str = ''
summary_str = ''
return summary_str
@ -435,7 +435,7 @@ def get_base_content_from_post(post_json_object: {},
system_language: str) -> str:
"""Returns the content from the post in the given language
"""
this_post_json = post_json_object
this_post_json: dict = post_json_object
if has_object_dict(post_json_object):
# handle quote posts FEP-dd4b, where there is no content within object
if 'content' in this_post_json['object'] or \
@ -469,9 +469,9 @@ def set_accounts_data_dir(base_dir: str, accounts_data_path: str) -> None:
accounts_data_path_filename = base_dir + '/data_path.txt'
if os.path.isfile(accounts_data_path_filename):
# read the existing path
path = load_string(accounts_data_path_filename,
'EX: unable to read ' +
accounts_data_path_filename)
path: str = load_string(accounts_data_path_filename,
'EX: unable to read ' +
accounts_data_path_filename)
if path:
if path.strip() == accounts_data_path:
# path is already set, so avoid writing it again
@ -495,11 +495,11 @@ def data_dir(base_dir: str) -> str:
__accounts_data_path__ = base_dir + '/accounts'
# is an alternative path set?
accounts_data_path_filename = base_dir + '/data_path.txt'
accounts_data_path_filename: str = base_dir + '/data_path.txt'
if os.path.isfile(accounts_data_path_filename):
path = load_string(accounts_data_path_filename,
'EX: unable to read ' +
accounts_data_path_filename)
path: str = load_string(accounts_data_path_filename,
'EX: unable to read ' +
accounts_data_path_filename)
if path:
__accounts_data_path__ = path.strip()
print('Accounts data path set to ' + __accounts_data_path__)
@ -522,7 +522,7 @@ def acct_handle_dir(base_dir: str, handle: str) -> str:
def refresh_newswire(base_dir: str) -> None:
"""Causes the newswire to be updates after a change to user accounts
"""
refresh_newswire_filename = data_dir(base_dir) + '/.refresh_newswire'
refresh_newswire_filename: str = data_dir(base_dir) + '/.refresh_newswire'
if os.path.isfile(refresh_newswire_filename):
return
save_flag_file(refresh_newswire_filename,
@ -583,17 +583,17 @@ def has_users_path(path_str: str) -> bool:
if not path_str:
return False
users_list = get_user_paths()
users_list: list[str] = get_user_paths()
for users_str in users_list:
if users_str in path_str:
return True
if '://' in path_str:
domain = path_str.split('://')[1]
domain: str = path_str.split('://')[1]
if '/' in domain:
domain = domain.split('/')[0]
if '://' + domain + '/' not in path_str:
return False
nickname = path_str.split('://' + domain + '/')[1]
nickname: str = path_str.split('://' + domain + '/')[1]
if '/' in nickname or '.' in nickname:
return False
return True
@ -619,14 +619,14 @@ def remove_html(content: str) -> str:
if '<' not in content:
return content
removing: bool = False
replacements = {
replacements: dict = {
'<a href': ' <a href',
'<q>': '"',
'</q>': '"',
'</p>': '\n\n',
'<br>': '\n'
}
content = replace_strings(content, replacements)
content: str = replace_strings(content, replacements)
result: str = ''
for char in content:
if char == '<':
@ -636,10 +636,10 @@ def remove_html(content: str) -> str:
elif not removing:
result += char
plain_text = result.replace(' ', ' ')
plain_text: str = result.replace(' ', ' ')
# insert spaces after full stops
str_len = len(plain_text)
str_len: int = len(plain_text)
result: str = ''
for i in range(str_len):
result += plain_text[i]
@ -668,7 +668,7 @@ def remove_style_within_html(content: str) -> str:
return content
if ' style="' not in content:
return content
sections = content.split(' style="')
sections: list[str] = content.split(' style="')
result: str = ''
ctr: int = 0
for section_text in sections:
@ -686,7 +686,7 @@ def first_paragraph_from_string(content: str) -> str:
"""
if '<p>' not in content or '</p>' not in content:
return remove_html(content)
paragraph = content.split('<p>')[1]
paragraph: str = content.split('<p>')[1]
if '</p>' in paragraph:
paragraph = paragraph.split('</p>')[0]
return remove_html(paragraph)
@ -695,7 +695,7 @@ def first_paragraph_from_string(content: str) -> str:
def get_memorials(base_dir: str) -> str:
"""Returns the nicknames for memorial accounts
"""
memorial_file = data_dir(base_dir) + '/memorial'
memorial_file: str = data_dir(base_dir) + '/memorial'
if not os.path.isfile(memorial_file):
return ''
@ -710,7 +710,7 @@ def set_memorials(base_dir: str, domain: str, memorial_str) -> None:
"""Sets the nicknames for memorial accounts
"""
# check that the accounts exist
memorial_list = memorial_str.split('\n')
memorial_list: list[str] = memorial_str.split('\n')
new_memorial_str: str = ''
for memorial_item in memorial_list:
memorial_nick = memorial_item.strip()
@ -720,7 +720,7 @@ def set_memorials(base_dir: str, domain: str, memorial_str) -> None:
memorial_str = new_memorial_str
# save the accounts
memorial_file = data_dir(base_dir) + '/memorial'
memorial_file: str = data_dir(base_dir) + '/memorial'
save_string(memorial_str, memorial_file,
'EX: unable to write ' + memorial_file)
@ -728,10 +728,10 @@ def set_memorials(base_dir: str, domain: str, memorial_str) -> None:
def _create_config(base_dir: str) -> None:
"""Creates a configuration file
"""
config_filename = base_dir + '/config.json'
config_filename: str = base_dir + '/config.json'
if os.path.isfile(config_filename):
return
config_json = {}
config_json: dict = {}
save_json(config_json, config_filename)
@ -742,13 +742,13 @@ def set_config_param(base_dir: str, variable_name: str,
if not variable_name:
return
_create_config(base_dir)
config_filename = base_dir + '/config.json'
config_json = {}
config_filename: str = base_dir + '/config.json'
config_json: dict = {}
if os.path.isfile(config_filename):
config_json = load_json(config_filename)
if config_json is None:
config_json = {}
variable_name = _convert_to_camel_case(variable_name)
variable_name: str = _convert_to_camel_case(variable_name)
if not variable_name:
return
config_json[variable_name] = variable_value
@ -759,10 +759,10 @@ def get_config_param(base_dir: str, variable_name: str) -> str:
"""Gets a configuration value
"""
_create_config(base_dir)
config_filename = base_dir + '/config.json'
config_json = load_json(config_filename)
config_filename: str = base_dir + '/config.json'
config_json: dict = load_json(config_filename)
if config_json:
variable_name = _convert_to_camel_case(variable_name)
variable_name: str = _convert_to_camel_case(variable_name)
if variable_name in config_json:
return config_json[variable_name]
return None
@ -773,7 +773,7 @@ def get_followers_list(base_dir: str,
follow_file: str = 'following.txt') -> []:
"""Returns a list of followers for the given account
"""
filename = acct_dir(base_dir, nickname, domain) + '/' + follow_file
filename: str = acct_dir(base_dir, nickname, domain) + '/' + follow_file
if not os.path.isfile(filename):
return []
@ -794,9 +794,9 @@ def get_mutuals_of_person(base_dir: str,
"""Returns the mutuals of a person
i.e. accounts which they follow and which also follow back
"""
followers = \
followers: list[str] = \
get_followers_list(base_dir, nickname, domain, 'followers.txt')
following = \
following: list[str] = \
get_followers_list(base_dir, nickname, domain, 'following.txt')
mutuals: list[str] = []
for handle in following:
@ -812,15 +812,15 @@ def get_followers_of_person(base_dir: str,
Used by the shared inbox to know who to send incoming mail to
"""
followers: list[str] = []
domain = remove_domain_port(domain)
handle = nickname + '@' + domain
handle_dir = acct_handle_dir(base_dir, handle)
domain: str = remove_domain_port(domain)
handle: str = nickname + '@' + domain
handle_dir: str = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
return followers
dir_str = data_dir(base_dir)
dir_str: str = data_dir(base_dir)
for subdir, dirs, _ in os.walk(dir_str):
for account in dirs:
filename = os.path.join(subdir, account) + '/' + follow_file
filename: str = os.path.join(subdir, account) + '/' + follow_file
if account == handle or \
string_starts_with(account, ('inbox@', 'Actor@', 'news@')):
continue
@ -846,29 +846,29 @@ def remove_id_ending(id_str: str) -> str:
"""Removes endings such as /activity and /undo
"""
if id_str.endswith('/activity'):
id_str = id_str[:-len('/activity')]
id_str: str = id_str[:-len('/activity')]
elif id_str.endswith('/undo'):
id_str = id_str[:-len('/undo')]
id_str: str = id_str[:-len('/undo')]
elif id_str.endswith('/event'):
id_str = id_str[:-len('/event')]
id_str: str = id_str[:-len('/event')]
elif id_str.endswith('/replies'):
id_str = id_str[:-len('/replies')]
id_str: str = id_str[:-len('/replies')]
elif id_str.endswith('/delete'):
id_str = id_str[:-len('/delete')]
id_str: str = id_str[:-len('/delete')]
elif id_str.endswith('/update'):
id_str = id_str[:-len('/update')]
id_str: str = id_str[:-len('/update')]
if id_str.endswith('#Create'):
id_str = id_str.split('#Create')[0]
id_str: str = id_str.split('#Create')[0]
elif id_str.endswith('#delete'):
id_str = id_str.split('#delete')[0]
id_str: str = id_str.split('#delete')[0]
elif '#update' in id_str:
id_str = id_str.split('#update')[0]
id_str: str = id_str.split('#update')[0]
elif '#moved' in id_str:
id_str = id_str.split('#moved')[0]
id_str: str = id_str.split('#moved')[0]
elif '#primary' in id_str:
id_str = id_str.split('#primary')[0]
id_str: str = id_str.split('#primary')[0]
elif '#reciprocal' in id_str:
id_str = id_str.split('#reciprocal')[0]
id_str: str = id_str.split('#reciprocal')[0]
return id_str
@ -929,8 +929,9 @@ def load_json(filename: str) -> {}:
filename = filename.replace('/Actor@', '/inbox@')
json_object = None
data = load_string(filename,
'EX: load_json exception ' + str(filename) + ' [ex]')
data: str = load_string(filename,
'EX: load_json exception ' +
str(filename) + ' [ex]')
if data is None:
return json_object
@ -957,8 +958,9 @@ def load_json_onionify(filename: str, domain: str, onion_domain: str,
json_object = None
tries: int = 0
while tries < 5:
data = load_string(filename,
'EX: load_json_onionify exception ' + filename)
data: str = load_string(filename,
'EX: load_json_onionify exception ' +
filename)
if data is None:
if delay_sec > 0:
time.sleep(delay_sec)
@ -981,8 +983,8 @@ def evil_incarnate() -> []:
def evil_nickname(sending_actor_nickname: str) -> bool:
"""sender nicknames which are automatically rejected
"""
evil_nicks = ('hitler', '1488')
nickname_lower = sending_actor_nickname.lower()
evil_nicks: list[str] = ('hitler', '1488')
nickname_lower: str = sending_actor_nickname.lower()
for nick in evil_nicks:
if nick in nickname_lower:
return True
@ -1028,11 +1030,11 @@ def create_person_dir(nickname: str, domain: str, base_dir: str,
dir_name: str) -> str:
"""Create a directory for a person
"""
handle = nickname + '@' + domain
handle_dir = acct_handle_dir(base_dir, handle)
handle: str = nickname + '@' + domain
handle_dir: str = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
os.mkdir(handle_dir)
box_dir = acct_handle_dir(base_dir, handle) + '/' + dir_name
box_dir: str = acct_handle_dir(base_dir, handle) + '/' + dir_name
if not os.path.isdir(box_dir):
os.mkdir(box_dir)
return box_dir
@ -1072,20 +1074,20 @@ def _is_dangerous_string_tag(content: str, allow_local_network_access: bool,
"""Returns true if the given string is dangerous
"""
for separator_style in separators:
start_char = separator_style[0]
end_char = separator_style[1]
start_char: str = separator_style[0]
end_char: str = separator_style[1]
if start_char not in content:
continue
if end_char not in content:
continue
content_sections = content.split(start_char)
content_sections: list[str] = content.split(start_char)
invalid_partials = ()
if not allow_local_network_access:
invalid_partials = get_local_network_addresses()
invalid_partials: list[str] = get_local_network_addresses()
for markup in content_sections:
if end_char not in markup:
continue
markup = markup.split(end_char)[0].strip()
markup: str = markup.split(end_char)[0].strip()
for partial_match in invalid_partials:
if partial_match in markup:
return True
@ -1113,20 +1115,20 @@ def _is_dangerous_string_simple(content: str, allow_local_network_access: bool,
"""Returns true if the given string is dangerous
"""
for separator_style in separators:
start_char = separator_style[0]
end_char = separator_style[1]
start_char: str = separator_style[0]
end_char: str = separator_style[1]
if start_char not in content:
continue
if end_char not in content:
continue
content_sections = content.split(start_char)
content_sections: list[str] = content.split(start_char)
invalid_partials = ()
if not allow_local_network_access:
invalid_partials = get_local_network_addresses()
for markup in content_sections:
if end_char not in markup:
continue
markup = markup.split(end_char)[0].strip()
markup: str = markup.split(end_char)[0].strip()
for partial_match in invalid_partials:
if partial_match in markup:
return True
@ -1139,12 +1141,12 @@ def _is_dangerous_string_simple(content: str, allow_local_network_access: bool,
def html_tag_has_closing(tag_name: str, content: str) -> bool:
"""Does the given tag have opening and closing labels?
"""
content_lower = content.lower()
content_lower: str = content.lower()
if '<' + tag_name not in content_lower:
return True
sections = content_lower.split('<' + tag_name)
sections: list[str] = content_lower.split('<' + tag_name)
ctr: int = 0
end_tag = '</' + tag_name + '>'
end_tag: str = '</' + tag_name + '>'
for section in sections:
if ctr == 0:
ctr += 1
@ -1171,8 +1173,8 @@ def dangerous_markup(content: str, allow_local_network_access: bool,
"""
if '.svg' in content.lower():
return True
separators = [['<', '>'], ['&lt;', '&gt;']]
invalid_strings = [
separators: list[list] = [['<', '>'], ['&lt;', '&gt;']]
invalid_strings: list[str] = [
'ampproject', 'googleapis', '_exec(', ' id=', ' name='
]
if _is_dangerous_string_simple(content, allow_local_network_access,
@ -1181,7 +1183,7 @@ def dangerous_markup(content: str, allow_local_network_access: bool,
for closing_tag in ('code', 'pre'):
if not html_tag_has_closing(closing_tag, content):
return True
invalid_strings = [
invalid_strings: list[str] = [
'script', 'noscript', 'canvas', 'style', 'abbr', 'input',
'frame', 'iframe', 'html', 'body', 'hr', 'allow-popups',
'allow-scripts', 'amp-', '?php', 'pre'
@ -1196,8 +1198,8 @@ def dangerous_markup(content: str, allow_local_network_access: bool,
def dangerous_svg(content: str, allow_local_network_access: bool) -> bool:
"""Returns true if the given svg file content contains dangerous scripts
"""
separators = [['<', '>'], ['&lt;', '&gt;']]
invalid_strings = [
separators: list[list] = [['<', '>'], ['&lt;', '&gt;']]
invalid_strings: list[str] = [
'script'
]
return _is_dangerous_string_tag(content, allow_local_network_access,
@ -1217,7 +1219,7 @@ def _get_statuses_list() -> []:
def contains_statuses(url: str) -> bool:
"""Whether the given url contains /statuses/
"""
statuses_list = _get_statuses_list()
statuses_list: list[str] = _get_statuses_list()
for status_str in statuses_list:
if status_str in url:
return True
@ -1235,15 +1237,15 @@ def get_actor_from_post_id(post_id: str) -> str:
eg. https://somedomain/users/nick/statuses/123 becomes
https://somedomain/users/nick
"""
actor = post_id
statuses_list = _get_statuses_list()
pixelfed_style_statuses = ['/p/']
actor: str = post_id
statuses_list: list[str] = _get_statuses_list()
pixelfed_style_statuses: list[str] = ['/p/']
for status_str in statuses_list:
if status_str not in actor:
continue
if status_str in pixelfed_style_statuses:
# pixelfed style post id
nick = actor.split(status_str)[1]
nick: str = actor.split(status_str)[1]
if '/' in nick:
nick = nick.split('/')[0]
actor = actor.split(status_str)[0] + '/users/' + nick
@ -1265,7 +1267,7 @@ def get_display_name(base_dir: str, actor: str, person_cache: {}) -> str:
if isinstance(person_cache[actor]['actor']['name'], str):
name_found = person_cache[actor]['actor']['name']
# Try to obtain from the cached actors
cached_actor_filename = \
cached_actor_filename: str = \
base_dir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
if os.path.isfile(cached_actor_filename):
actor_json = load_json(cached_actor_filename)
@ -1290,7 +1292,7 @@ def get_actor_type(base_dir: str, actor: str, person_cache: {}) -> str:
if isinstance(person_cache[actor]['actor']['type'], str):
return person_cache[actor]['actor']['type']
# Try to obtain from the cached actors
cached_actor_filename = \
cached_actor_filename: str = \
base_dir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
if os.path.isfile(cached_actor_filename):
actor_json = load_json(cached_actor_filename)
@ -1305,7 +1307,7 @@ def display_name_is_emoji(display_name: str) -> bool:
"""Returns true if the given display name is an emoji
"""
if ' ' in display_name:
words = display_name.split(' ')
words: list[str] = display_name.split(' ')
for wrd in words:
if not wrd.startswith(':'):
return False
@ -1327,7 +1329,7 @@ def _gender_from_string(translate: {}, text: str) -> str:
gender = None
if not text:
return None
text_orig = text
text_orig: str = text
text = text.lower()
if translate['He/Him'].lower() in text or \
translate['boy'].lower() in text:
@ -1350,7 +1352,7 @@ def get_gender_from_bio(base_dir: str, actor: str, person_cache: {},
"""Tries to ascertain gender from bio description
This is for use by text-to-speech for pitch setting
"""
default_gender = 'They/Them'
default_gender: str = 'They/Them'
actor = get_actor_from_post_id(actor)
if not person_cache.get(actor):
return default_gender
@ -1364,15 +1366,15 @@ def get_gender_from_bio(base_dir: str, actor: str, person_cache: {},
actor_json = person_cache[actor]['actor']
else:
# Try to obtain from the cached actors
cached_actor_filename = \
cached_actor_filename: str = \
base_dir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
if os.path.isfile(cached_actor_filename):
actor_json = load_json(cached_actor_filename)
actor_json: dict = load_json(cached_actor_filename)
if not actor_json:
return default_gender
# is gender defined as a profile tag?
if actor_json.get('attachment'):
tags_list = actor_json['attachment']
tags_list: list[dict] = actor_json['attachment']
if isinstance(tags_list, list):
# look for a gender field name
for tag in tags_list:
@ -1436,11 +1438,11 @@ def get_nickname_from_actor(actor: str) -> str:
# handle brid.gy urls
actor = actor.replace('at://did:', 'did:')
users_paths = get_user_paths()
users_paths: list[str] = get_user_paths()
for possible_path in users_paths:
if possible_path not in actor:
continue
nick_str = actor.split(possible_path)[1].replace('@', '')
nick_str: str = actor.split(possible_path)[1].replace('@', '')
if '/' not in nick_str:
return _remove_did_prefix(nick_str)
nick_str = nick_str.split('/')[0]
@ -1448,20 +1450,20 @@ def get_nickname_from_actor(actor: str) -> str:
if '/@/' not in actor:
if '/@' in actor:
# https://domain/@nick
nick_str = actor.split('/@')[1]
nick_str: str = actor.split('/@')[1]
if '/' in nick_str:
nick_str = nick_str.split('/')[0]
return _remove_did_prefix(nick_str)
if '@' in actor:
nick_str = actor.split('@')[0]
nick_str: str = actor.split('@')[0]
return _remove_did_prefix(nick_str)
if '://' in actor:
domain = actor.split('://')[1]
domain: str = actor.split('://')[1]
if '/' in domain:
domain = domain.split('/')[0]
if '://' + domain + '/' not in actor:
return None
nick_str = actor.split('://' + domain + '/')[1]
nick_str: str = actor.split('://' + domain + '/')[1]
if '/' in nick_str or '.' in nick_str:
return None
return _remove_did_prefix(nick_str)
@ -1494,23 +1496,23 @@ def get_domain_from_actor(actor: str) -> (str, int):
if actor.startswith('@'):
actor = actor[1:]
port = None
prefixes = get_protocol_prefixes()
users_paths = get_user_paths()
prefixes: list[str] = get_protocol_prefixes()
users_paths: list[str] = get_user_paths()
for possible_path in users_paths:
if possible_path not in actor:
continue
domain = actor.split(possible_path)[0]
domain: str = actor.split(possible_path)[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
break
if '/@' in actor and '/@/' not in actor:
domain = actor.split('/@')[0]
domain: str = actor.split('/@')[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
elif '@' in actor and '/@/' not in actor:
domain = actor.split('@')[1].strip()
domain: str = actor.split('@')[1].strip()
else:
domain = actor
domain: str = actor
for prefix in prefixes:
domain = domain.replace(prefix, '')
if '/' in actor:
@ -1518,10 +1520,10 @@ def get_domain_from_actor(actor: str) -> (str, int):
if '<' in domain:
# handle domain with handle appended
# https://domain<user@domain>
domain = domain.split('<')[0]
domain: str = domain.split('<')[0]
if ':' in domain:
port = get_port_from_domain(domain)
domain = remove_domain_port(domain)
port: int = get_port_from_domain(domain)
domain: str = remove_domain_port(domain)
return domain, port
@ -1530,11 +1532,11 @@ def _set_default_pet_name(base_dir: str, nickname: str, domain: str,
"""Sets a default petname
This helps especially when using onion or i2p address
"""
domain = remove_domain_port(domain)
user_path = acct_dir(base_dir, nickname, domain)
petnames_filename = user_path + '/petnames.txt'
domain: str = remove_domain_port(domain)
user_path: str = acct_dir(base_dir, nickname, domain)
petnames_filename: str = user_path + '/petnames.txt'
petname_lookup_entry = follow_nickname + ' ' + \
petname_lookup_entry: str = follow_nickname + ' ' + \
follow_nickname + '@' + follow_domain + '\n'
if not os.path.isfile(petnames_filename):
# if there is no existing petnames lookup file
@ -1543,9 +1545,10 @@ def _set_default_pet_name(base_dir: str, nickname: str, domain: str,
petnames_filename)
return
petnames_str = load_string(petnames_filename,
'EX: _set_default_pet_name unable to read 1 ' +
petnames_filename)
petnames_str: str = \
load_string(petnames_filename,
'EX: _set_default_pet_name unable to read 1 ' +
petnames_filename)
if petnames_str:
petnames_list = petnames_str.split('\n')
for pet in petnames_list:
@ -1565,8 +1568,8 @@ def follow_person(base_dir: str, nickname: str, domain: str,
follow_file: str) -> bool:
"""Adds a person to the follow list
"""
follow_domain_str_lower1 = follow_domain.lower()
follow_domain_str_lower = remove_eol(follow_domain_str_lower1)
follow_domain_str_lower1: str = follow_domain.lower()
follow_domain_str_lower: str = remove_eol(follow_domain_str_lower1)
if not domain_permitted(follow_domain_str_lower,
federation_list):
if debug:
@ -1577,27 +1580,28 @@ def follow_person(base_dir: str, nickname: str, domain: str,
print('DEBUG: follow of domain ' + follow_domain)
if ':' in domain:
domain_only = remove_domain_port(domain)
handle = nickname + '@' + domain_only
domain_only: str = remove_domain_port(domain)
handle: str = nickname + '@' + domain_only
else:
handle = nickname + '@' + domain
handle: str = nickname + '@' + domain
handle_dir = acct_handle_dir(base_dir, handle)
handle_dir: str = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
print('WARN: account for ' + handle + ' does not exist')
return False
if ':' in follow_domain:
follow_domain_only = remove_domain_port(follow_domain)
handle_to_follow = follow_nickname + '@' + follow_domain_only
follow_domain_only: str = remove_domain_port(follow_domain)
handle_to_follow: str = follow_nickname + '@' + follow_domain_only
else:
handle_to_follow = follow_nickname + '@' + follow_domain
handle_to_follow: str = follow_nickname + '@' + follow_domain
if group_account:
handle_to_follow = '!' + handle_to_follow
# was this person previously unfollowed?
unfollowed_filename = acct_handle_dir(base_dir, handle) + '/unfollowed.txt'
unfollowed_filename: str = \
acct_handle_dir(base_dir, handle) + '/unfollowed.txt'
if os.path.isfile(unfollowed_filename):
if text_in_file(handle_to_follow, unfollowed_filename):
# remove them from the unfollowed file
@ -1614,7 +1618,7 @@ def follow_person(base_dir: str, nickname: str, domain: str,
'EX: follow_person unable to write ' +
unfollowed_filename)
dir_str = data_dir(base_dir)
dir_str: str = data_dir(base_dir)
if not os.path.isdir(dir_str):
os.mkdir(dir_str)
handle_to_follow = follow_nickname + '@' + follow_domain
@ -1629,7 +1633,7 @@ def follow_person(base_dir: str, nickname: str, domain: str,
# prepend to follow file
try:
with open(filename, 'r+', encoding='utf-8') as fp_foll:
content = fp_foll.read()
content: str = fp_foll.read()
if handle_to_follow + '\n' not in content:
fp_foll.seek(0, 0)
fp_foll.write(handle_to_follow + '\n' + content)
@ -1678,7 +1682,7 @@ def locate_news_votes(base_dir: str, domain: str,
"""Returns the votes filename for a news post
within the news user account
"""
post_url1 = post_url.strip()
post_url1: str = post_url.strip()
post_url = remove_eol(post_url1)
# if this post in the shared inbox?
@ -1689,8 +1693,8 @@ def locate_news_votes(base_dir: str, domain: str,
else:
post_url = post_url + '.json.votes'
account_dir = data_dir(base_dir) + '/news@' + domain + '/'
post_filename = account_dir + 'outbox/' + post_url
account_dir: str = data_dir(base_dir) + '/news@' + domain + '/'
post_filename: str = account_dir + 'outbox/' + post_url
if os.path.isfile(post_filename):
return post_filename
@ -1702,27 +1706,27 @@ def locate_post(base_dir: str, nickname: str, domain: str,
"""Returns the filename for the given status post url
"""
if not replies:
extension = 'json'
extension: str = 'json'
else:
extension = 'replies'
extension: str = 'replies'
# if this post in the shared inbox?
post_url = remove_id_ending(post_url.strip()).replace('/', '#')
post_url: str = remove_id_ending(post_url.strip()).replace('/', '#')
# add the extension
post_url = post_url + '.' + extension
# search boxes
boxes = ('inbox', 'outbox', 'tlblogs')
account_dir = acct_dir(base_dir, nickname, domain) + '/'
boxes: list[str] = ('inbox', 'outbox', 'tlblogs')
account_dir: str = acct_dir(base_dir, nickname, domain) + '/'
for box_name in boxes:
post_filename = account_dir + box_name + '/' + post_url
post_filename: str = account_dir + box_name + '/' + post_url
if os.path.isfile(post_filename):
return post_filename
# check news posts
account_dir = data_dir(base_dir) + '/news' + '@' + domain + '/'
post_filename = account_dir + 'outbox/' + post_url
post_filename: str = account_dir + 'outbox/' + post_url
if os.path.isfile(post_filename):
return post_filename
@ -1741,10 +1745,10 @@ def get_reply_interval_hours(base_dir: str, nickname: str, domain: str,
The reply interval is the number of hours after a post being made
during which replies are allowed
"""
reply_interval_filename = \
reply_interval_filename: str = \
acct_dir(base_dir, nickname, domain) + '/.reply_interval_hours'
if os.path.isfile(reply_interval_filename):
hours_str = \
hours_str: str = \
load_string(reply_interval_filename,
'EX: get_reply_interval_hours unable to read ' +
reply_interval_filename)
@ -1760,9 +1764,9 @@ def set_reply_interval_hours(base_dir: str, nickname: str, domain: str,
The reply interval is the number of hours after a post being made
during which replies are allowed
"""
reply_interval_filename = \
reply_interval_filename: str = \
acct_dir(base_dir, nickname, domain) + '/.reply_interval_hours'
text = str(reply_interval_hours)
text: str = str(reply_interval_hours)
if save_string(text, reply_interval_filename,
'EX: set_reply_interval_hours ' +
'unable to save reply interval ' +
@ -1776,18 +1780,18 @@ def _remove_attachment(base_dir: str, http_prefix: str,
nickname: str, domain: str, post_json: {}) -> None:
"""Removes media files for an attachment
"""
post_attachments = get_post_attachments(post_json)
post_attachments: list[dict] = get_post_attachments(post_json)
if not post_attachments:
return
if not post_attachments[0].get('url'):
return
attachment_url = get_url_from_post(post_attachments[0]['url'])
attachment_url: str = get_url_from_post(post_attachments[0]['url'])
if not attachment_url:
return
attachment_url = remove_html(attachment_url)
# remove the media
media_filename = base_dir + '/' + \
media_filename: str = base_dir + '/' + \
attachment_url.replace(http_prefix + '://' + domain + '/', '')
if os.path.isfile(media_filename):
try:
@ -1797,11 +1801,11 @@ def _remove_attachment(base_dir: str, http_prefix: str,
str(media_filename))
# remove from the log file
account_dir = acct_dir(base_dir, nickname, domain)
account_media_log_filename = account_dir + '/media_log.txt'
account_dir: str = acct_dir(base_dir, nickname, domain)
account_media_log_filename: str = account_dir + '/media_log.txt'
if os.path.isfile(account_media_log_filename):
search_filename = media_filename.replace(base_dir, '')
media_log_text = \
search_filename: str = media_filename.replace(base_dir, '')
media_log_text: str = \
load_string(account_media_log_filename,
'EX: _remove unable to read media log for ' + nickname)
if media_log_text is None:
@ -1821,7 +1825,7 @@ def _remove_attachment(base_dir: str, http_prefix: str,
str(media_filename) + '.vtt')
# remove the etag
etag_filename = media_filename + '.etag'
etag_filename: str = media_filename + '.etag'
if os.path.isfile(etag_filename):
try:
os.remove(etag_filename)
@ -1837,7 +1841,7 @@ def remove_post_from_index(post_url: str, debug: bool,
"""
if not os.path.isfile(index_file):
return
post_id = remove_id_ending(post_url)
post_id: str = remove_id_ending(post_url)
if not text_in_file(post_id, index_file):
return
lines: list[str] = \
@ -1848,8 +1852,7 @@ def remove_post_from_index(post_url: str, debug: bool,
if not lines:
return
try:
with open(index_file, 'w+',
encoding='utf-8') as fp_mod2:
with open(index_file, 'w+', encoding='utf-8') as fp_mod2:
for line in lines:
if line.strip("\n").strip("\r") != post_id:
fp_mod2.write(line)