Variable types

main
bashrc 2026-05-14 20:27:22 +01:00
parent b09eef99a0
commit 5fb7a391ac
9 changed files with 193 additions and 185 deletions

View File

@ -47,13 +47,13 @@ def send_delete_via_server(base_dir: str, session,
print('WARN: No session for send_delete_via_server')
return 6
from_domain_full = get_full_domain(from_domain, from_port)
from_domain_full: str = get_full_domain(from_domain, from_port)
actor = local_actor_url(http_prefix, from_nickname, from_domain_full)
to_url = 'https://www.w3.org/ns/activitystreams#Public'
cc_url = actor + '/followers'
actor: str = local_actor_url(http_prefix, from_nickname, from_domain_full)
to_url: str = 'https://www.w3.org/ns/activitystreams#Public'
cc_url: str = actor + '/followers'
new_delete_json = {
new_delete_json: dict = {
"@context": [
'https://www.w3.org/ns/activitystreams',
'https://w3id.org/security/v1'
@ -65,10 +65,10 @@ def send_delete_via_server(base_dir: str, session,
'type': 'Delete'
}
handle = http_prefix + '://' + from_domain_full + '/@' + from_nickname
handle: str = http_prefix + '://' + from_domain_full + '/@' + from_nickname
# lookup the inbox for the To handle
wf_request = \
wf_request: dict = \
webfinger_handle(session, handle, http_prefix, cached_webfingers,
from_domain, project_version, debug, False,
signing_priv_key_pem, mitm_servers)
@ -81,10 +81,10 @@ def send_delete_via_server(base_dir: str, session,
' did not return a dict. ' + str(wf_request))
return 1
post_to_box = 'outbox'
post_to_box: str = 'outbox'
# get the actor inbox for the To handle
origin_domain = from_domain
origin_domain: str = from_domain
(inbox_url, _, _, from_person_id, _, _,
_, _) = get_person_box(signing_priv_key_pem, origin_domain,
base_dir, session,
@ -104,14 +104,14 @@ def send_delete_via_server(base_dir: str, session,
print('DEBUG: delete no actor was found for ' + handle)
return 4
auth_header = create_basic_auth_header(from_nickname, password)
auth_header: str = create_basic_auth_header(from_nickname, password)
headers = {
headers: dict = {
'host': from_domain,
'Content-type': 'application/json',
'Authorization': auth_header
}
post_result = \
post_result: str = \
post_json(http_prefix, from_domain_full,
session, new_delete_json, [], inbox_url, headers, 3, True)
if not post_result:
@ -144,15 +144,15 @@ def outbox_delete(base_dir: str, http_prefix: str,
return
if debug:
print('DEBUG: c2s delete request arrived in outbox')
delete_prefix = http_prefix + '://' + domain
actor_url = get_actor_from_post(message_json)
delete_prefix: str = http_prefix + '://' + domain
actor_url: str = get_actor_from_post(message_json)
if (not allow_deletion and
(not message_json['object'].startswith(delete_prefix) or
not actor_url.startswith(delete_prefix))):
if debug:
print('DEBUG: delete not permitted from other instances')
return
message_id = remove_id_ending(message_json['object'])
message_id: str = remove_id_ending(message_json['object'])
if '/statuses/' not in message_id:
if debug:
print('DEBUG: c2s delete object is not a status')
@ -161,21 +161,21 @@ def outbox_delete(base_dir: str, http_prefix: str,
if debug:
print('DEBUG: c2s delete object has no nickname')
return
delete_nickname = get_nickname_from_actor(message_id)
delete_nickname: str = get_nickname_from_actor(message_id)
if delete_nickname != nickname:
if debug:
print("DEBUG: you can't delete a post which " +
"wasn't created by you (nickname does not match)")
return
delete_domain, _ = get_domain_from_actor(message_id)
domain = remove_domain_port(domain)
domain: str = remove_domain_port(domain)
if delete_domain != domain:
if debug:
print("DEBUG: you can't delete a post which " +
"wasn't created by you (domain does not match)")
return
remove_moderation_post_from_index(base_dir, message_id, debug)
post_filename = locate_post(base_dir, delete_nickname, delete_domain,
post_filename: str = locate_post(base_dir, delete_nickname, delete_domain,
message_id)
if not post_filename:
if debug:
@ -191,14 +191,14 @@ def outbox_delete(base_dir: str, http_prefix: str,
def remove_old_hashtags(base_dir: str, max_months: int) -> str:
"""Remove old hashtags
"""
max_months = min(max_months, 11)
max_months: int = min(max_months, 11)
prev_date = date_from_numbers(1970, 1 + max_months, 1, 0, 0)
max_days_since_epoch = (date_utcnow() - prev_date).days
max_days_since_epoch: int = (date_utcnow() - prev_date).days
remove_hashtags: list[str] = []
for _, _, files in os.walk(base_dir + '/tags'):
for fname in files:
tags_filename = os.path.join(base_dir + '/tags', fname)
tags_filename: str = os.path.join(base_dir + '/tags', fname)
if not is_a_file(tags_filename):
continue
# get last modified datetime

View File

@ -31,7 +31,7 @@ def get_donation_url(actor_json: {}) -> str:
return ''
if not isinstance(actor_json['attachment'], list):
return ''
donation_type_list = _get_donation_types()
donation_type_list: list[str] = _get_donation_types()
for property_value in actor_json['attachment']:
if not isinstance(property_value, dict):
print("WARN: actor attachment is not dict: " + str(property_value))
@ -99,8 +99,8 @@ def set_donation_url(actor_json: {}, donate_url: str) -> None:
if not actor_json.get('attachment'):
actor_json['attachment']: list[dict] = []
donation_type = _get_donation_types()
donate_name = None
donation_type: list[str] = _get_donation_types()
donate_name: str = None
for payment_service in donation_type:
if payment_service in donate_url:
donate_name = payment_service
@ -108,7 +108,7 @@ def set_donation_url(actor_json: {}, donate_url: str) -> None:
return
# remove any existing value
property_found = None
property_found: str = None
for property_value in actor_json['attachment']:
if not isinstance(property_value, dict):
print("WARN: actor attachment is not dict: " + str(property_value))
@ -133,7 +133,7 @@ def set_donation_url(actor_json: {}, donate_url: str) -> None:
if not_url:
return
donate_value = \
donate_value: str = \
'<a href="' + donate_url + \
'" rel="me nofollow noopener noreferrer" target="_blank">' + \
donate_url + '</a>'

View File

@ -59,7 +59,7 @@ def set_enigma_pub_key(actor_json: {}, enigma_pub_key: str) -> None:
actor_json['attachment']: list[dict] = []
# remove any existing value
property_found = None
property_found: str = None
for property_value in actor_json['attachment']:
if not isinstance(property_value, dict):
print("WARN: actor attachment is not dict: " + str(property_value))

View File

@ -26,7 +26,8 @@ def add_filter(base_dir: str, nickname: str, domain: str, words: str) -> bool:
"""Adds a filter for particular words within the content of a
incoming posts
"""
filters_filename = acct_dir(base_dir, nickname, domain) + '/filters.txt'
filters_filename: str = \
acct_dir(base_dir, nickname, domain) + '/filters.txt'
if is_a_file(filters_filename):
if text_in_file(words, filters_filename):
return False
@ -44,7 +45,7 @@ def add_global_filter(base_dir: str, words: str) -> bool:
return False
if len(words) < 2:
return False
filters_filename = data_dir(base_dir) + '/filters.txt'
filters_filename: str = data_dir(base_dir) + '/filters.txt'
if is_a_file(filters_filename):
if text_in_file(words, filters_filename):
return False
@ -58,12 +59,13 @@ def remove_filter(base_dir: str, nickname: str, domain: str,
words: str) -> bool:
"""Removes a word filter
"""
filters_filename = acct_dir(base_dir, nickname, domain) + '/filters.txt'
filters_filename: str = \
acct_dir(base_dir, nickname, domain) + '/filters.txt'
if not is_a_file(filters_filename):
return False
if not text_in_file(words, filters_filename):
return False
new_filters_filename = filters_filename + '.new'
new_filters_filename: str = filters_filename + '.new'
filters_list: list[str] = \
load_list(filters_filename,
@ -92,12 +94,12 @@ def remove_filter(base_dir: str, nickname: str, domain: str,
def remove_global_filter(base_dir: str, words: str) -> bool:
"""Removes a global word filter
"""
filters_filename = data_dir(base_dir) + '/filters.txt'
filters_filename: str = data_dir(base_dir) + '/filters.txt'
if not is_a_file(filters_filename):
return False
if not text_in_file(words, filters_filename):
return False
new_filters_filename = filters_filename + '.new'
new_filters_filename: str = filters_filename + '.new'
global_list: list[str] = \
load_list(filters_filename,
@ -126,7 +128,7 @@ def remove_global_filter(base_dir: str, words: str) -> bool:
def _is_twitter_post(content: str) -> bool:
"""Returns true if the given post content is a retweet or twitter crosspost
"""
features = (
features: list[str] = (
'/x.com', '/twitter.', '/nitter.',
'@twitter.', '@nitter.', '@x.com',
'>RT <', '_tw<', '_tw@', 'tweet', 'Tweet', '🐦🔗'
@ -157,7 +159,7 @@ def _is_filtered_base(filename: str, content: str,
if not is_a_file(filename):
return False
content = remove_inverted_text(content, system_language)
content: str = remove_inverted_text(content, system_language)
content = remove_square_capitals(content, system_language)
# convert any fancy characters to ordinary ones
@ -168,7 +170,7 @@ def _is_filtered_base(filename: str, content: str,
'EX: _is_filtered_base ' + filename + ' [ex]')
if filtered_list is not None:
for line in filtered_list:
filter_str = remove_eol(line)
filter_str: str = remove_eol(line)
if not filter_str:
continue
if len(filter_str) < 2:
@ -177,7 +179,8 @@ def _is_filtered_base(filename: str, content: str,
if filtered_match(filter_str, content):
return True
else:
filter_words = filter_str.replace('"', '').split('+')
filter_words: list[str] = \
filter_str.replace('"', '').split('+')
for filter_wrd in filter_words:
if not filtered_match(filter_wrd, content):
return False
@ -189,7 +192,7 @@ def is_filtered_globally(base_dir: str, content: str,
system_language: str) -> bool:
"""Is the given content globally filtered?
"""
global_filters_filename = data_dir(base_dir) + '/filters.txt'
global_filters_filename: str = data_dir(base_dir) + '/filters.txt'
if _is_filtered_base(global_filters_filename, content,
system_language):
return True
@ -207,7 +210,7 @@ def is_filtered_bio(base_dir: str,
if not nickname or not domain:
return False
account_filters_filename = \
account_filters_filename: str = \
acct_dir(base_dir, nickname, domain) + '/filters_bio.txt'
return _is_filtered_base(account_filters_filename, bio, system_language)
@ -226,12 +229,13 @@ def is_filtered(base_dir: str, nickname: str, domain: str,
return False
# optionally remove retweets
remove_twitter = acct_dir(base_dir, nickname, domain) + '/.removeTwitter'
remove_twitter: str = \
acct_dir(base_dir, nickname, domain) + '/.removeTwitter'
if is_a_file(remove_twitter):
if _is_twitter_post(content):
return True
account_filters_filename = \
account_filters_filename: str = \
acct_dir(base_dir, nickname, domain) + '/filters.txt'
return _is_filtered_base(account_filters_filename, content,
system_language)
@ -242,11 +246,14 @@ def is_question_filtered(base_dir: str, nickname: str, domain: str,
"""is the given question filtered based on its options?
"""
if question_json.get('oneOf'):
question_options = question_json['oneOf']
question_options: list[str] = question_json['oneOf']
else:
question_options = question_json['object']['oneOf']
question_options: list[str] = question_json['object']['oneOf']
for option in question_options:
if option.get('name'):
if not option.get('name'):
continue
if not isinstance(option['name'], str):
continue
if is_filtered(base_dir, nickname, domain, option['name'],
system_language):
return True

View File

@ -33,7 +33,7 @@ def fitness_performance(start_time, fitness_state: {},
"ctr": int(0)
}
time_diff = float(time.time() - start_time)
time_diff: float = float(time.time() - start_time)
fitness_state['performance'][fitness_id][watch_point]['total'] += time_diff
fitness_state['performance'][fitness_id][watch_point]['ctr'] += 1
@ -44,8 +44,9 @@ def fitness_performance(start_time, fitness_state: {},
2)
if debug:
ctr = fitness_state['performance'][fitness_id][watch_point]['ctr']
total = fitness_state['performance'][fitness_id][watch_point]['total']
ctr: int = fitness_state['performance'][fitness_id][watch_point]['ctr']
total: float = \
fitness_state['performance'][fitness_id][watch_point]['total']
print('FITNESS: performance/' + fitness_id + '/' +
watch_point + '/' + str(total * 1000 / ctr))
@ -73,16 +74,15 @@ def html_watch_points_graph(base_dir: str, fitness: {}, fitness_id: str,
max_entries: int) -> str:
"""Returns the html for a graph of watchpoints
"""
watch_points_list = sorted_watch_points(fitness, fitness_id)
watch_points_list: list[str] = sorted_watch_points(fitness, fitness_id)
css_filename = base_dir + '/epicyon-graph.css'
css_filename: str = base_dir + '/epicyon-graph.css'
if is_a_file(base_dir + '/graph.css'):
css_filename = base_dir + '/graph.css'
instance_title = \
get_config_param(base_dir, 'instanceTitle')
instance_title: str = get_config_param(base_dir, 'instanceTitle')
preload_images: list[str] = []
html_str = \
html_str: str = \
html_header_with_external_style(css_filename, instance_title, None,
preload_images)
html_str += \
@ -96,7 +96,7 @@ def html_watch_points_graph(base_dir: str, fitness: {}, fitness_id: str,
'</thead><tbody>\n'
# get the maximum time
max_average_time = float(1)
max_average_time: float = float(1)
if watch_points_list:
max_average_time = float(watch_points_list[0].split(' ')[0])
for watch_point in watch_points_list:
@ -106,10 +106,10 @@ def html_watch_points_graph(base_dir: str, fitness: {}, fitness_id: str,
ctr: int = 0
for watch_point in watch_points_list:
name = watch_point.split(' ', 1)[1]
average_time = float(watch_point.split(' ')[0])
height_percent = int(average_time * 100 / max_average_time)
time_ms = int(average_time)
name: str = watch_point.split(' ', 1)[1]
average_time: float = float(watch_point.split(' ')[0])
height_percent: int = int(average_time * 100 / max_average_time)
time_ms: int = int(average_time)
if height_percent == 0:
continue
html_str += \
@ -128,7 +128,7 @@ def html_watch_points_graph(base_dir: str, fitness: {}, fitness_id: str,
def fitness_thread(base_dir: str, fitness: {}) -> None:
"""Thread used to save fitness function scores
"""
fitness_filename = data_dir(base_dir) + '/fitness.json'
fitness_filename: str = data_dir(base_dir) + '/fitness.json'
while True:
# every 10 mins
time.sleep(60 * 10)

View File

@ -38,7 +38,7 @@ def is_featured_writer(base_dir: str, nickname: str, domain: str) -> bool:
"""Is the given account a featured writer, appearing in the features
timeline on news instances?
"""
features_blocked_filename = \
features_blocked_filename: str = \
acct_dir(base_dir, nickname, domain) + '/.nofeatures'
return not is_a_file(features_blocked_filename)
@ -48,23 +48,23 @@ def is_dormant(base_dir: str, nickname: str, domain: str, actor: str,
"""Is the given followed actor dormant, from the standpoint
of the given account
"""
last_seen_filename = acct_dir(base_dir, nickname, domain) + \
last_seen_filename: str = acct_dir(base_dir, nickname, domain) + \
'/lastseen/' + actor.replace('/', '#') + '.txt'
if not is_a_file(last_seen_filename):
return False
days_since_epoch_str = \
days_since_epoch_str: str = \
load_string(last_seen_filename, 'EX: failed to read last seen ' +
last_seen_filename)
if days_since_epoch_str is None:
return False
if days_since_epoch_str:
days_since_epoch = int(days_since_epoch_str)
days_since_epoch: int = int(days_since_epoch_str)
curr_time = date_utcnow()
curr_days_since_epoch = (curr_time - date_epoch()).days
time_diff_months = \
curr_days_since_epoch: int = (curr_time - date_epoch()).days
time_diff_months: int = \
int((curr_days_since_epoch - days_since_epoch) / 30)
if time_diff_months >= dormant_months:
return True
@ -77,7 +77,7 @@ def is_editor(base_dir: str, nickname: str) -> bool:
editors_file = data_dir(base_dir) + '/editors.txt'
if not is_a_file(editors_file):
admin_name = get_config_param(base_dir, 'admin')
admin_name: str = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
@ -90,12 +90,12 @@ def is_editor(base_dir: str, nickname: str) -> bool:
lines = lines.split('\n')
if not lines:
admin_name = get_config_param(base_dir, 'admin')
admin_name: str = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
for editor in lines:
editor = editor.strip('\n').strip('\r')
editor: str = editor.strip('\n').strip('\r')
if editor == nickname:
return True
return False
@ -104,10 +104,10 @@ def is_editor(base_dir: str, nickname: str) -> bool:
def is_artist(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is an artist
"""
artists_file = data_dir(base_dir) + '/artists.txt'
artists_file: str = data_dir(base_dir) + '/artists.txt'
if not is_a_file(artists_file):
admin_name = get_config_param(base_dir, 'admin')
admin_name: str = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
@ -120,12 +120,12 @@ def is_artist(base_dir: str, nickname: str) -> bool:
lines = lines.split('\n')
if not lines:
admin_name = get_config_param(base_dir, 'admin')
admin_name: str = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
for artist in lines:
artist = artist.strip('\n').strip('\r')
artist: str = artist.strip('\n').strip('\r')
if artist == nickname:
return True
return False
@ -151,7 +151,7 @@ def is_system_account(nickname: str) -> bool:
def is_memorial_account(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is a memorial account
"""
memorial_file = data_dir(base_dir) + '/memorial'
memorial_file: str = data_dir(base_dir) + '/memorial'
if not is_a_file(memorial_file):
return False
memorial_list: list[str] = \
@ -167,13 +167,13 @@ def is_memorial_account(base_dir: str, nickname: str) -> bool:
def is_suspended(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is suspended
"""
admin_nickname = get_config_param(base_dir, 'admin')
admin_nickname: str = get_config_param(base_dir, 'admin')
if not admin_nickname:
return False
if nickname == admin_nickname:
return False
suspended_filename = data_dir(base_dir) + '/suspended.txt'
suspended_filename: str = data_dir(base_dir) + '/suspended.txt'
if is_a_file(suspended_filename):
lines: list[str] = \
load_string(suspended_filename,
@ -196,7 +196,7 @@ def is_evil(domain: str) -> bool:
return True
# if a domain contains any of these strings then it is
# declaring itself to be hostile
evil_emporium = (
evil_emporium: list[str] = (
'nazi', 'extremis', 'extreemis', 'gendercritic',
'kiwifarm', 'illegal', 'raplst', 'rapist', 'loli.',
'rapl.st', 'rapi.st', 'antivax', 'plandemic', 'terror'
@ -204,7 +204,7 @@ def is_evil(domain: str) -> bool:
for hostile_str in evil_emporium:
if hostile_str in domain:
return True
evil_domains = evil_incarnate()
evil_domains: list[str] = evil_incarnate()
for concentrated_evil in evil_domains:
if domain.endswith(concentrated_evil):
return True
@ -214,7 +214,7 @@ def is_evil(domain: str) -> bool:
def is_local_network_address(ip_address: str) -> bool:
"""Is the given ip address local?
"""
local_ips = get_local_network_addresses()
local_ips: list[str] = get_local_network_addresses()
for ip_addr in local_ips:
if ip_address.startswith(ip_addr):
return True
@ -249,10 +249,10 @@ def is_public_post_from_url(base_dir: str, nickname: str, domain: str,
post_url: str) -> bool:
"""Returns whether the given url is a public post
"""
post_filename = locate_post(base_dir, nickname, domain, post_url)
post_filename: str = locate_post(base_dir, nickname, domain, post_url)
if not post_filename:
return False
post_json_object = load_json(post_filename)
post_json_object: dict = load_json(post_filename)
if not post_json_object:
return False
return is_public_post(post_json_object)
@ -370,10 +370,10 @@ def is_recent_post(post_json_object: {}, max_days: int) -> bool:
if not isinstance(post_json_object['object']['published'], str):
return False
curr_time = date_utcnow()
days_since_epoch = (curr_time - date_epoch()).days
recently = days_since_epoch - max_days
days_since_epoch: int = (curr_time - date_epoch()).days
recently: int = days_since_epoch - max_days
published_date_str = post_json_object['object']['published']
published_date_str: str = post_json_object['object']['published']
if '.' in published_date_str:
published_date_str = published_date_str.split('.')[0] + 'Z'
@ -385,8 +385,7 @@ def is_recent_post(post_json_object: {}, max_days: int) -> bool:
str(published_date_str))
return False
published_days_since_epoch = \
(published_date - date_epoch()).days
published_days_since_epoch: int = (published_date - date_epoch()).days
if published_days_since_epoch < recently:
return False
return True
@ -418,7 +417,7 @@ def is_reply(post_json_object: {}, actor: str) -> bool:
'EncryptedMessage',
'ChatMessage', 'Article'):
return False
reply_id = get_reply_to(post_json_object['object'])
reply_id: str = get_reply_to(post_json_object['object'])
if reply_id:
if isinstance(reply_id, str):
if reply_id.startswith(actor):
@ -492,7 +491,7 @@ def is_group_actor(base_dir: str, actor: str, person_cache: {},
debug: bool = False) -> bool:
"""Is the given actor a group?
"""
person_cache_actor = None
person_cache_actor: str = None
if person_cache:
if person_cache.get(actor):
if person_cache[actor].get('actor'):
@ -508,7 +507,7 @@ def is_group_actor(base_dir: str, actor: str, person_cache: {},
if debug:
print('Actor ' + actor + ' not in cache')
cached_actor_filename = \
cached_actor_filename: str = \
base_dir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
if not is_a_file(cached_actor_filename):
if debug:
@ -524,7 +523,7 @@ def is_group_actor(base_dir: str, actor: str, person_cache: {},
def is_group_account(base_dir: str, nickname: str, domain: str) -> bool:
"""Returns true if the given account is a group
"""
account_filename = acct_dir(base_dir, nickname, domain) + '.json'
account_filename: str = acct_dir(base_dir, nickname, domain) + '.json'
if not is_a_file(account_filename):
return False
if text_in_file('"type": "Group"', account_filename):
@ -538,7 +537,7 @@ def has_group_type(base_dir: str, actor: str, person_cache: {},
"""
# does the actor path clearly indicate that this is a group?
# eg. https://lemmy/c/groupname
group_paths = get_group_paths()
group_paths: list[str] = get_group_paths()
for grp_path in group_paths:
if grp_path in actor:
if debug:
@ -566,7 +565,7 @@ def is_right_to_left_text(text: str) -> bool:
Arabic \u0627-\u064a
Hebrew/Yiddish \u0590-\u05FF\uFB2A-\uFB4E
"""
unicode_str = '[\u0627-\u064a]|[\u0600-\u06FF]|' + \
unicode_str: str = '[\u0627-\u064a]|[\u0600-\u06FF]|' + \
'[\u0590-\u05FF\uFB2A-\uFB4E]'
pattern = re.compile(unicode_str)
@ -588,15 +587,15 @@ def is_valid_date(date_str: str) -> bool:
if not section_str.isdigit():
return False
if date_sect_ctr == 0:
date_year = int(section_str)
date_year: int = int(section_str)
if date_year < 1920 or date_year > 3000:
return False
elif date_sect_ctr == 1:
date_month = int(section_str)
date_month: int = int(section_str)
if date_month < 1 or date_month > 12:
return False
elif date_sect_ctr == 2:
date_day = int(section_str)
date_day: int = int(section_str)
if date_day < 1 or date_day > 31:
return False
date_sect_ctr += 1
@ -606,7 +605,7 @@ def is_valid_date(date_str: str) -> bool:
def is_premium_account(base_dir: str, nickname: str, domain: str) -> bool:
""" Is the given account a premium one?
"""
premium_filename = acct_dir(base_dir, nickname, domain) + '/.premium'
premium_filename: str = acct_dir(base_dir, nickname, domain) + '/.premium'
return is_a_file(premium_filename)
@ -626,7 +625,7 @@ def url_permitted(url: str, federation_list: []) -> bool:
def is_corporate(server_name: str) -> bool:
"""Is the given server name a corporate leech?
"""
server_lower = server_name.lower()
server_lower: str = server_name.lower()
if 'google' in server_lower or \
'cloudflare' in server_lower or \
'facebook' in server_lower or \
@ -647,11 +646,11 @@ def can_reply_to(base_dir: str, nickname: str, domain: str,
if '/statuses/' not in post_url:
return True
if not post_json_object:
post_filename = locate_post(base_dir, nickname, domain, post_url)
post_filename: str = locate_post(base_dir, nickname, domain, post_url)
if not post_filename:
# the post is not stored locally
return True
post_json_object = load_json(post_filename)
post_json_object: dict = load_json(post_filename)
if not post_json_object:
return False
published = get_published_date(post_json_object)
@ -671,7 +670,7 @@ def can_reply_to(base_dir: str, nickname: str, domain: str,
print('EX: can_reply_to unrecognized current date ' +
str(curr_date_str))
return False
hours_since_publication = \
hours_since_publication: int = \
int((curr_date - pub_date).total_seconds() / 3600)
if hours_since_publication < 0 or \
hours_since_publication >= reply_interval_hours:
@ -701,8 +700,8 @@ def local_only_is_local(message_json: {}, domain_full: str) -> bool:
return False
# check that the sender is local
attrib_field = message_json['object']['attributedTo']
local_actor = get_attributed_to(attrib_field)
attrib_field: str | dict = message_json['object']['attributedTo']
local_actor: str = get_attributed_to(attrib_field)
local_domain, local_port = \
get_domain_from_actor(local_actor)
if local_domain:
@ -719,10 +718,10 @@ def local_only_is_local(message_json: {}, domain_full: str) -> bool:
def is_moderator(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is a moderator
"""
moderators_file = data_dir(base_dir) + '/moderators.txt'
moderators_file: str = data_dir(base_dir) + '/moderators.txt'
if not is_a_file(moderators_file):
admin_name = get_config_param(base_dir, 'admin')
admin_name: str = get_config_param(base_dir, 'admin')
if not admin_name:
return False
if admin_name == nickname:
@ -736,13 +735,13 @@ def is_moderator(base_dir: str, nickname: str) -> bool:
lines = lines.split('\n')
if not lines:
admin_name = get_config_param(base_dir, 'admin')
admin_name: str = get_config_param(base_dir, 'admin')
if not admin_name:
return False
if admin_name == nickname:
return True
for moderator in lines:
moderator = moderator.strip('\n').strip('\r')
moderator: str = moderator.strip('\n').strip('\r')
if moderator == nickname:
return True
return False

View File

@ -61,16 +61,16 @@ def create_initial_last_seen(base_dir: str, http_prefix: str) -> None:
The lastseen files are used to generate the Zzz icons on
follows/following lists on the profile screen.
"""
dir_str = data_dir(base_dir)
dir_str: str = data_dir(base_dir)
for _, dirs, _ in os.walk(dir_str):
for acct in dirs:
if not is_account_dir(acct):
continue
account_dir = os.path.join(dir_str, acct)
following_filename = account_dir + '/following.txt'
account_dir: str = os.path.join(dir_str, acct)
following_filename: str = account_dir + '/following.txt'
if not is_a_file(following_filename):
continue
last_seen_dir = account_dir + '/lastseen'
last_seen_dir: str = account_dir + '/lastseen'
if not is_a_dir(last_seen_dir):
makedir(last_seen_dir)
following_handles: list[str] = \
@ -84,17 +84,17 @@ def create_initial_last_seen(base_dir: str, http_prefix: str) -> None:
continue
if '@' not in handle:
continue
handle = remove_eol(handle)
nickname = handle.split('@')[0]
domain = handle.split('@')[1]
handle: str = remove_eol(handle)
nickname: str = handle.split('@')[0]
domain: str = handle.split('@')[1]
if nickname.startswith('!'):
nickname = nickname[1:]
actor = local_actor_url(http_prefix, nickname, domain)
last_seen_filename = \
actor: str = local_actor_url(http_prefix, nickname, domain)
last_seen_filename: str = \
last_seen_dir + '/' + actor.replace('/', '#') + '.txt'
if is_a_file(last_seen_filename):
continue
text = str(100)
text: str = str(100)
save_string(text, last_seen_filename,
'EX: create_initial_last_seen 2 ' +
last_seen_filename)
@ -106,8 +106,8 @@ def _pre_approved_follower(base_dir: str,
approve_handle: str) -> bool:
"""Is the given handle an already manually approved follower?
"""
account_dir = acct_dir(base_dir, nickname, domain)
approved_filename = account_dir + '/approved.txt'
account_dir: str = acct_dir(base_dir, nickname, domain)
approved_filename: str = account_dir + '/approved.txt'
if is_a_file(approved_filename):
if text_in_file(approve_handle, approved_filename):
return True
@ -120,21 +120,21 @@ def _remove_from_follow_base(base_dir: str,
debug: bool) -> None:
"""Removes a handle/actor from follow requests or rejects file
"""
accounts_dir = acct_dir(base_dir, nickname, domain)
approve_follows_filename = accounts_dir + '/' + follow_file + '.txt'
accounts_dir: str = acct_dir(base_dir, nickname, domain)
approve_follows_filename: str = accounts_dir + '/' + follow_file + '.txt'
if not is_a_file(approve_follows_filename):
if debug:
print('There is no ' + follow_file +
' to remove ' + nickname + '@' + domain + ' from')
return
accept_deny_actor = None
accept_deny_actor: str = None
if not text_in_file(accept_or_deny_handle, approve_follows_filename):
# is this stored in the file as an actor rather than a handle?
accept_deny_nickname = accept_or_deny_handle.split('@')[0]
accept_deny_domain = accept_or_deny_handle.split('@')[1]
accept_deny_nickname: str = accept_or_deny_handle.split('@')[0]
accept_deny_domain: str = accept_or_deny_handle.split('@')[1]
# for each possible users path construct an actor and
# check if it exists in the file
users_paths = get_user_paths()
users_paths: list[str] = get_user_paths()
actor_found: bool = False
for users_name in users_paths:
accept_deny_actor = \
@ -200,17 +200,17 @@ def is_following_actor(base_dir: str,
The actor can also be a handle: nickname@domain
"""
domain = remove_domain_port(domain)
accounts_dir = acct_dir(base_dir, nickname, domain)
accounts_dir: str = acct_dir(base_dir, nickname, domain)
if not is_a_dir(accounts_dir):
return False
following_file = accounts_dir + '/following.txt'
following_file: str = accounts_dir + '/following.txt'
if not is_a_file(following_file):
return False
if actor.startswith('@'):
actor = actor[1:]
if text_in_file(actor, following_file, False):
return True
following_nickname = get_nickname_from_actor(actor)
following_nickname: str = get_nickname_from_actor(actor)
if not following_nickname:
print('WARN: unable to find nickname in ' + actor)
return False
@ -218,7 +218,7 @@ def is_following_actor(base_dir: str,
if not following_domain:
print('WARN: unable to find domain in ' + actor)
return False
following_handle = \
following_handle: str = \
get_full_domain(following_nickname + '@' + following_domain,
following_port)
if text_in_file(following_handle, following_file, False):
@ -242,7 +242,8 @@ def get_follower_domains(base_dir: str, nickname: str, domain: str) -> []:
"""Returns a list of domains for followers
"""
domain = remove_domain_port(domain)
followers_file = acct_dir(base_dir, nickname, domain) + '/followers.txt'
followers_file: str = \
acct_dir(base_dir, nickname, domain) + '/followers.txt'
if not is_a_file(followers_file):
return []
@ -254,7 +255,7 @@ def get_follower_domains(base_dir: str, nickname: str, domain: str) -> []:
domains_list: list[str] = []
for handle in lines:
handle = remove_eol(handle)
handle: str = remove_eol(handle)
follower_domain, _ = get_domain_from_actor(handle)
if not follower_domain:
continue
@ -275,30 +276,31 @@ def is_follower_of_person(base_dir: str, nickname: str, domain: str,
print('No follower_nickname for ' + follower_domain)
return False
domain = remove_domain_port(domain)
followers_file = acct_dir(base_dir, nickname, domain) + '/followers.txt'
followers_file: str = \
acct_dir(base_dir, nickname, domain) + '/followers.txt'
if not is_a_file(followers_file):
return False
handle = follower_nickname + '@' + follower_domain
handle: str = follower_nickname + '@' + follower_domain
already_following: bool = False
followers_str = load_string(followers_file,
followers_str: str = load_string(followers_file,
'EX: is_follower_of_person ' +
followers_file)
if followers_str is None:
followers_str: str = ''
followers_str = ''
if handle in followers_str:
already_following = True
else:
paths = get_user_paths()
paths: list[str] = get_user_paths()
for user_path in paths:
url = '://' + follower_domain + user_path + follower_nickname
url: str = '://' + follower_domain + user_path + follower_nickname
if url in followers_str:
already_following = True
break
if not already_following:
url = '://' + follower_domain + '/' + follower_nickname
url: str = '://' + follower_domain + '/' + follower_nickname
if url in followers_str:
already_following = True
@ -312,24 +314,24 @@ def unfollow_account(base_dir: str, nickname: str, domain: str,
"""Removes a person to the follow list
"""
domain = remove_domain_port(domain)
handle = nickname + '@' + domain
handle_to_unfollow = follow_nickname + '@' + follow_domain
handle: str = nickname + '@' + domain
handle_to_unfollow: str = follow_nickname + '@' + follow_domain
if group_account:
handle_to_unfollow = '!' + handle_to_unfollow
dir_str = data_dir(base_dir)
dir_str: str = data_dir(base_dir)
if not is_a_dir(dir_str):
makedir(dir_str)
handle_dir = acct_handle_dir(base_dir, handle)
handle_dir: str = acct_handle_dir(base_dir, handle)
if not is_a_dir(handle_dir):
makedir(handle_dir)
accounts_dir = acct_dir(base_dir, nickname, domain)
filename = accounts_dir + '/' + follow_file
accounts_dir: str = acct_dir(base_dir, nickname, domain)
filename: str = accounts_dir + '/' + follow_file
if not is_a_file(filename):
if debug:
print('DEBUG: follow file ' + filename + ' was not found')
return False
handle_to_unfollow_lower = handle_to_unfollow.lower()
handle_to_unfollow_lower: str = handle_to_unfollow.lower()
if not text_in_file(handle_to_unfollow_lower, filename, False):
if debug:
print('DEBUG: handle to unfollow ' + handle_to_unfollow +
@ -354,7 +356,7 @@ def unfollow_account(base_dir: str, nickname: str, domain: str,
# write to an unfollowed file so that if a follow accept
# later arrives then it can be ignored
unfollowed_filename = accounts_dir + '/unfollowed.txt'
unfollowed_filename: str = accounts_dir + '/unfollowed.txt'
if is_a_file(unfollowed_filename):
if not text_in_file(handle_to_unfollow_lower,
unfollowed_filename, False):
@ -383,13 +385,13 @@ def clear_follows(base_dir: str, nickname: str, domain: str,
follow_file: str) -> None:
"""Removes all follows
"""
dir_str = data_dir(base_dir)
dir_str: str = data_dir(base_dir)
if not is_a_dir(dir_str):
makedir(dir_str)
accounts_dir = acct_dir(base_dir, nickname, domain)
accounts_dir: str = acct_dir(base_dir, nickname, domain)
if not is_a_dir(accounts_dir):
makedir(accounts_dir)
filename = accounts_dir + '/' + follow_file
filename: str = accounts_dir + '/' + follow_file
if is_a_file(filename):
erase_file(filename,
'EX: clear_follows unable to delete ' + filename)
@ -409,8 +411,8 @@ def _get_no_of_follows(base_dir: str, nickname: str, domain: str,
# account holders
# if not authenticated:
# return 9999
accounts_dir = acct_dir(base_dir, nickname, domain)
filename = accounts_dir + '/' + follow_file
accounts_dir: str = acct_dir(base_dir, nickname, domain)
filename: str = accounts_dir + '/' + follow_file
if not is_a_file(filename):
return 0
ctr: int = 0
@ -455,10 +457,10 @@ def get_following_feed(base_dir: str, domain: str, port: int, path: str,
if '/' + follow_file not in path:
return None
# handle page numbers
header_only = True
page_number = None
header_only: bool = True
page_number: int = None
if '?page=' in path:
page_number: int = path.split('?page=')[1]
page_number = path.split('?page=')[1]
if len(page_number) > 5:
page_number = "1"
if page_number == 'true' or not authorized:
@ -510,7 +512,7 @@ def get_following_feed(base_dir: str, domain: str, port: int, path: str,
if not page_number:
page_number = 1
next_page_number = int(page_number + 1)
next_page_number: int = int(page_number + 1)
following_of_actor = local_actor_url(http_prefix, nickname, domain)
part_of_str = following_of_actor + '/' + follow_file
collection_id = part_of_str + '?page=' + str(page_number)

View File

@ -21,7 +21,7 @@ def remove_followers_sync(followers_sync_cache: {},
"""Remove an entry within the followers synchronization cache,
so that it will subsequently be regenerated
"""
foll_sync_key = nickname + ':' + follower_domain
foll_sync_key: str = nickname + ':' + follower_domain
if not followers_sync_cache.get(foll_sync_key):
return
del followers_sync_cache[foll_sync_key]
@ -33,7 +33,7 @@ def _get_followers_for_domain(base_dir: str,
"""Returns the followers for a given domain
this is used for followers synchronization
"""
followers_filename = \
followers_filename: str = \
acct_dir(base_dir, nickname, domain) + '/followers.txt'
if not is_a_file(followers_filename):
return []
@ -46,20 +46,20 @@ def _get_followers_for_domain(base_dir: str,
foll_text: str = ''
if search_domain not in foll_text:
return []
lines = foll_text.splitlines()
lines: list[str] = foll_text.splitlines()
result: list[str] = []
for line_str in lines:
if search_domain not in line_str:
continue
if line_str.endswith('@' + search_domain):
nick = line_str.split('@')[0]
paths_list = get_user_paths()
nick: str = line_str.split('@')[0]
paths_list: list[str] = get_user_paths()
found: bool = False
for prefix in ('https', 'http'):
if found:
break
for possible_path in paths_list:
url = prefix + '://' + search_domain + \
url: str = prefix + '://' + search_domain + \
possible_path + nick
filename = base_dir + '/cache/actors/' + \
url.replace('/', '#') + '.json'
@ -70,7 +70,7 @@ def _get_followers_for_domain(base_dir: str,
found = True
break
if not found:
url = prefix + '://' + search_domain + '/' + nick
url: str = prefix + '://' + search_domain + '/' + nick
filename = base_dir + '/cache/actors/' + \
url.replace('/', '#') + '.json'
if is_a_file(filename):
@ -91,13 +91,13 @@ def _get_followers_sync_json(base_dir: str,
See
https://codeberg.org/fediverse/fep/src/branch/main/fep/8fcf/fep-8fcf.md
"""
sync_list = \
sync_list: list[str] = \
_get_followers_for_domain(base_dir,
nickname, domain,
search_domain)
actor = http_prefix + '://' + domain_full + '/users/' + nickname
id_str = actor + '/followers?domain=' + search_domain
sync_json = {
actor: str = http_prefix + '://' + domain_full + '/users/' + nickname
id_str: str = actor + '/followers?domain=' + search_domain
sync_json: dict = {
"@context": [
'https://www.w3.org/ns/activitystreams',
'https://w3id.org/security/v1'
@ -142,18 +142,18 @@ def update_followers_sync_cache(base_dir: str,
"""Updates the followers synchronization cache
https://codeberg.org/fediverse/fep/src/branch/main/fep/8fcf/fep-8fcf.md
"""
foll_sync_key = nickname + ':' + calling_domain
foll_sync_key: str = nickname + ':' + calling_domain
if sync_cache.get(foll_sync_key):
sync_hash = sync_cache[foll_sync_key]['hash']
sync_json = sync_cache[foll_sync_key]['response']
sync_hash: str = sync_cache[foll_sync_key]['hash']
sync_json: dict = sync_cache[foll_sync_key]['response']
else:
sync_json = \
sync_json: dict = \
_get_followers_sync_json(base_dir,
nickname, domain,
http_prefix,
domain_full,
calling_domain)
sync_hash = get_followers_sync_hash(sync_json)
sync_hash: str = get_followers_sync_hash(sync_json)
if sync_hash:
sync_cache[foll_sync_key] = {
"hash": sync_hash,

View File

@ -66,16 +66,16 @@ def receiving_calendar_events(base_dir: str, nickname: str, domain: str,
if following_nickname == nickname and following_domain == domain:
# reminder post
return True
calendar_filename = \
calendar_filename: str = \
_dir_acct(base_dir, nickname, domain) + '/followingCalendar.txt'
handle = following_nickname + '@' + following_domain
handle: str = following_nickname + '@' + following_domain
if not is_a_file(calendar_filename):
following_filename = \
following_filename: str = \
_dir_acct(base_dir, nickname, domain) + '/following.txt'
if not is_a_file(following_filename):
return False
# create a new calendar file from the following file
following_handles = \
following_handles: str = \
load_string(following_filename,
'EX: receiving_calendar_events ' + following_filename)
if following_handles:
@ -92,21 +92,21 @@ def _receive_calendar_events(base_dir: str, nickname: str, domain: str,
indicating whether to receive calendar events from that account
"""
# check that a following file exists
domain = _port_domain_remove(domain)
following_filename = \
domain: str = _port_domain_remove(domain)
following_filename: str = \
_dir_acct(base_dir, nickname, domain) + '/following.txt'
if not is_a_file(following_filename):
print("WARN: following.txt doesn't exist for " +
nickname + '@' + domain)
return
handle = following_nickname + '@' + following_domain
handle: str = following_nickname + '@' + following_domain
# check that you are following this handle
if not _text_in_file2(handle + '\n', following_filename, False):
print('WARN: ' + handle + ' is not in ' + following_filename)
return
calendar_filename = \
calendar_filename: str = \
_dir_acct(base_dir, nickname, domain) + '/followingCalendar.txt'
# get the contents of the calendar file, which is
@ -118,7 +118,7 @@ def _receive_calendar_events(base_dir: str, nickname: str, domain: str,
load_string(calendar_filename,
'EX: _receive_calendar_events ' + calendar_filename)
if following_handles is None:
following_handles: str = ''
following_handles = ''
else:
# create a new calendar file from the following file
print('Creating calendar file ' + calendar_filename)
@ -140,8 +140,8 @@ def _receive_calendar_events(base_dir: str, nickname: str, domain: str,
return
# remove from calendar file
new_following_handles: str = ''
following_handles_list = following_handles.split('\n')
handle_lower = handle.lower()
following_handles_list: list[str] = following_handles.split('\n')
handle_lower: str = handle.lower()
for followed in following_handles_list:
if followed.lower() != handle_lower:
new_following_handles += followed + '\n'