Check that muted is boolean

main
bashrc 2026-05-04 20:46:41 +01:00
parent 0020ae63c9
commit 464746fe9c
1 changed files with 169 additions and 161 deletions

330
posts.py
View File

@ -4750,7 +4750,7 @@ def _create_box_items(base_dir: str,
box_actor: str) -> (int, int):
"""Creates the list of posts within a timeline
"""
index_filename = \
index_filename: str = \
acct_dir(base_dir, timeline_nickname, original_domain) + \
'/' + index_box_name + '.index'
total_posts_count: int = 0
@ -4768,12 +4768,12 @@ def _create_box_items(base_dir: str,
}
first_post_id = replace_strings(first_post_id, replacements)
prev_post_filename = None
prev_post_filename: str = None
try:
with open(index_filename, 'r', encoding='utf-8') as fp_index:
posts_added_to_timeline: int = 0
while posts_added_to_timeline < items_per_page:
post_filename = fp_index.readline()
post_filename: str = fp_index.readline()
if post_filename == prev_post_filename:
break
@ -4811,7 +4811,7 @@ def _create_box_items(base_dir: str,
# filename of the post without any extension or path
# This should also correspond to any index entry in
# the posts cache
post_url = remove_eol(post_filename)
post_url: str = remove_eol(post_filename)
post_url = post_url.replace('.json', '').strip()
# is this a duplicate?
@ -4837,13 +4837,13 @@ def _create_box_items(base_dir: str,
post_url)
# read the post from file
full_post_filename = \
full_post_filename: str = \
locate_post(base_dir, nickname,
original_domain, post_url, False)
if full_post_filename:
# has the post been rejected?
if is_a_file(full_post_filename + '.reject'):
post_url2 = post_url.replace('/', '#') + '.json'
post_url2: str = post_url.replace('/', '#') + '.json'
remove_post_from_index(post_url2, False,
index_filename)
print('REJECT: rejected post in timeline ' +
@ -4929,12 +4929,12 @@ def _create_box_indexed(recent_posts_cache: {},
index_box_name = boxname
timeline_nickname = 'news'
original_domain = domain
domain = get_full_domain(domain, port)
original_domain: str = domain
domain: str = get_full_domain(domain, port)
box_actor = local_actor_url(http_prefix, nickname, domain)
box_actor: str = local_actor_url(http_prefix, nickname, domain)
page_str = '?page=true'
page_str: str = '?page=true'
if page_number:
page_number = max(page_number, 1)
try:
@ -4942,7 +4942,8 @@ def _create_box_indexed(recent_posts_cache: {},
except BaseException:
print('EX: _create_box_indexed ' +
'unable to convert page number to string')
box_url = local_actor_url(http_prefix, nickname, domain) + '/' + boxname
box_url: str = \
local_actor_url(http_prefix, nickname, domain) + '/' + boxname
box_header = {
"@context": [
'https://www.w3.org/ns/activitystreams',
@ -4966,7 +4967,7 @@ def _create_box_indexed(recent_posts_cache: {},
'type': 'OrderedCollectionPage'
}
posts_in_box = []
posts_in_box: list[str] = []
post_urls_in_box: list[str] = []
if not unauthorized_premium:
@ -5127,7 +5128,7 @@ def _expire_conversations_for_person(base_dir: str,
max_age_days: int) -> int:
"""Expires entries within the conversation directory
"""
conv_dir = acct_dir(base_dir, nickname, domain) + '/conversation'
conv_dir: str = acct_dir(base_dir, nickname, domain) + '/conversation'
if not is_a_dir(conv_dir):
print('No conversations for ' + nickname + '@' + domain)
return 0
@ -5139,10 +5140,10 @@ def _expire_conversations_for_person(base_dir: str,
# don't expire muted conversations, so they stay muted
continue
# Time of file creation
full_filename = os.path.join(conv_dir, conv_filename)
full_filename: str = os.path.join(conv_dir, conv_filename)
if not is_a_file(full_filename):
continue
last_modified = file_last_modified(full_filename)
last_modified: str = file_last_modified(full_filename)
# get time difference
if not valid_post_date(last_modified, max_age_days, False):
erase_file(full_filename,
@ -5157,7 +5158,7 @@ def _expire_posts_cache_for_person(base_dir: str,
max_age_days: int) -> int:
"""Expires entries within the posts cache
"""
cache_dir = acct_dir(base_dir, nickname, domain) + '/postcache'
cache_dir: str = acct_dir(base_dir, nickname, domain) + '/postcache'
if not is_a_dir(cache_dir):
print('No cached posts for ' + nickname + '@' + domain)
return 0
@ -5166,10 +5167,10 @@ def _expire_posts_cache_for_person(base_dir: str,
for cache_filename in posts_in_cache:
cache_filename = cache_filename.name
# Time of file creation
full_filename = os.path.join(cache_dir, cache_filename)
full_filename: str = os.path.join(cache_dir, cache_filename)
if not is_a_file(full_filename):
continue
last_modified = file_last_modified(full_filename)
last_modified: str = file_last_modified(full_filename)
# get time difference
if not valid_post_date(last_modified, max_age_days, False):
erase_file(full_filename,
@ -5185,12 +5186,12 @@ def _novel_fields_for_person(nickname: str, domain: str,
"""
if boxname not in ('inbox'):
return
box_dir = create_person_dir(nickname, domain, base_dir, boxname)
box_dir: str = create_person_dir(nickname, domain, base_dir, boxname)
posts_in_box = os.scandir(box_dir)
posts_ctr: int = 0
fields: list[str] = []
expected_fields = (
expected_fields: list[str] = (
'alsoKnownAs',
'attachment',
'capabilities',
@ -5278,7 +5279,7 @@ def _novel_fields_for_person(nickname: str, domain: str,
post_filename = post_filename.name
if not post_filename.endswith('.json'):
continue
full_filename = os.path.join(box_dir, post_filename)
full_filename: str = os.path.join(box_dir, post_filename)
if not is_a_file(full_filename):
continue
post_json_object = load_json(full_filename)
@ -5302,12 +5303,12 @@ def _novel_fields_for_person(nickname: str, domain: str,
def novel_fields(base_dir: str) -> None:
"""Reports any unexpected fields within posts
"""
dir_str = data_dir(base_dir)
dir_str: str = data_dir(base_dir)
for _, dirs, _ in os.walk(dir_str):
for handle in dirs:
if '@' in handle:
nickname = handle.split('@')[0]
domain = handle.split('@')[1]
nickname: str = handle.split('@')[0]
domain: str = handle.split('@')[1]
_novel_fields_for_person(nickname, domain, base_dir,
'inbox')
break
@ -5330,15 +5331,16 @@ def archive_posts(base_dir: str, http_prefix: str, archive_dir: str,
if not is_a_dir(archive_dir + '/accounts'):
makedir(archive_dir + '/accounts')
dir_str = data_dir(base_dir)
dir_str: str = data_dir(base_dir)
for _, dirs, _ in os.walk(dir_str):
for handle in dirs:
if '@' in handle:
nickname = handle.split('@')[0]
domain = handle.split('@')[1]
archive_subdir = None
nickname: str = handle.split('@')[0]
domain: str = handle.split('@')[1]
archive_subdir: str = None
if archive_dir:
archive_handle_dir = acct_handle_dir(archive_dir, handle)
archive_handle_dir: str = \
acct_handle_dir(archive_dir, handle)
if not is_a_dir(archive_handle_dir):
makedir(archive_handle_dir)
if not is_a_dir(archive_handle_dir + '/inbox'):
@ -5350,21 +5352,21 @@ def archive_posts(base_dir: str, http_prefix: str, archive_dir: str,
nickname, domain, base_dir,
'inbox', archive_subdir,
recent_posts_cache, max_posts_in_box)
expired_announces = \
expired_announces: int = \
_expire_announce_cache_for_person(base_dir,
nickname, domain,
max_cache_age_days)
print('Expired ' + str(expired_announces) +
' cached announces for ' + nickname + '@' + domain)
expired_conversations = \
expired_conversations: int = \
_expire_conversations_for_person(base_dir,
nickname, domain,
max_cache_age_days)
print('Expired ' + str(expired_conversations) +
' conversations for ' + nickname + '@' + domain)
expired_posts = \
expired_posts: int = \
_expire_posts_cache_for_person(base_dir,
nickname, domain,
max_cache_age_days)
@ -5390,8 +5392,8 @@ def _expire_posts_for_person(http_prefix: str, nickname: str, domain: str,
if max_age_days <= 0:
return expired_post_count
boxname = 'outbox'
box_dir = create_person_dir(nickname, domain, base_dir, boxname)
boxname: str = 'outbox'
box_dir: str = create_person_dir(nickname, domain, base_dir, boxname)
posts_in_box = os.scandir(box_dir)
for post_filename in posts_in_box:
@ -5399,10 +5401,10 @@ def _expire_posts_for_person(http_prefix: str, nickname: str, domain: str,
if not post_filename.endswith('.json'):
continue
# get the post json as text
full_filename = os.path.join(box_dir, post_filename)
full_filename: str = os.path.join(box_dir, post_filename)
if not is_a_file(full_filename):
continue
content = \
content: str = \
load_string(full_filename,
'EX: expire_posts_for_person unable to open content ' +
full_filename)
@ -5410,7 +5412,7 @@ def _expire_posts_for_person(http_prefix: str, nickname: str, domain: str,
content = ''
# Get time of publication
published_str = ''
published_str: str = ''
if '"published":' not in content:
# If a publication date is not available then check the last
# modified date
@ -5457,7 +5459,7 @@ def get_post_expiry_keep_dms(base_dir: str, nickname: str, domain: str) -> int:
"""
keep_dms: bool = True
handle: str = nickname + '@' + domain
expire_dms_filename = \
expire_dms_filename: str = \
acct_handle_dir(base_dir, handle) + '/.expire_posts_dms'
if is_a_file(expire_dms_filename):
keep_dms = False
@ -5468,8 +5470,8 @@ def set_post_expiry_keep_dms(base_dir: str, nickname: str, domain: str,
keep_dms: bool) -> None:
"""Sets whether to keep DMs during post expiry for an account
"""
handle = nickname + '@' + domain
expire_dms_filename = \
handle: str = nickname + '@' + domain
expire_dms_filename: str = \
acct_handle_dir(base_dir, handle) + '/.expire_posts_dms'
if keep_dms:
if is_a_file(expire_dms_filename):
@ -5487,19 +5489,20 @@ def expire_posts(base_dir: str, http_prefix: str,
"""Expires posts for instance accounts
"""
expired_post_count: int = 0
dir_str = data_dir(base_dir)
dir_str: str = data_dir(base_dir)
for _, dirs, _ in os.walk(dir_str):
for handle in dirs:
if '@' not in handle:
continue
nickname = handle.split('@')[0]
domain = handle.split('@')[1]
expire_posts_filename = \
nickname: str = handle.split('@')[0]
domain: str = handle.split('@')[1]
expire_posts_filename: str = \
acct_handle_dir(base_dir, handle) + '/.expire_posts_days'
if not is_a_file(expire_posts_filename):
continue
keep_dms = get_post_expiry_keep_dms(base_dir, nickname, domain)
expire_days_str = \
keep_dms: int = \
get_post_expiry_keep_dms(base_dir, nickname, domain)
expire_days_str: str = \
load_string(expire_posts_filename,
'EX: expire_posts failed to read days file ' +
expire_posts_filename)
@ -5509,7 +5512,7 @@ def expire_posts(base_dir: str, http_prefix: str,
continue
if not expire_days_str.isdigit():
continue
max_age_days = int(expire_days_str)
max_age_days: int = int(expire_days_str)
if max_age_days <= 0:
continue
expired_post_count += \
@ -5525,14 +5528,14 @@ def expire_posts(base_dir: str, http_prefix: str,
def get_post_expiry_days(base_dir: str, nickname: str, domain: str) -> int:
"""Returns the post expiry period for the given account
"""
handle = nickname + '@' + domain
expire_posts_filename = \
handle: str = nickname + '@' + domain
expire_posts_filename: str = \
acct_handle_dir(base_dir, handle) + '/.expire_posts_days'
if not is_a_file(expire_posts_filename):
return 0
days_str = load_string(expire_posts_filename,
'EX: unable to write post expire days ' +
expire_posts_filename)
days_str: str = load_string(expire_posts_filename,
'EX: unable to write post expire days ' +
expire_posts_filename)
if not days_str:
return 0
if not days_str.isdigit():
@ -5544,10 +5547,10 @@ def set_post_expiry_days(base_dir: str, nickname: str, domain: str,
max_age_days: int) -> None:
"""Sets the number of days after which posts from an account will expire
"""
handle = nickname + '@' + domain
expire_posts_filename = \
handle: str = nickname + '@' + domain
expire_posts_filename: str = \
acct_handle_dir(base_dir, handle) + '/.expire_posts_days'
text = str(max_age_days)
text: str = str(max_age_days)
save_string(text, expire_posts_filename,
'EX: unable to write post expire days ' +
expire_posts_filename)
@ -5566,7 +5569,7 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
if archive_dir:
if not is_a_dir(archive_dir):
makedir(archive_dir)
box_dir = create_person_dir(nickname, domain, base_dir, boxname)
box_dir: str = create_person_dir(nickname, domain, base_dir, boxname)
posts_in_box = os.scandir(box_dir)
no_of_posts: int = 0
for _ in posts_in_box:
@ -5577,13 +5580,13 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
return
# remove entries from the index
handle = nickname + '@' + domain
index_filename = \
handle: str = nickname + '@' + domain
index_filename: str = \
acct_handle_dir(base_dir, handle) + '/' + boxname + '.index'
if is_a_file(index_filename):
index_ctr: int = 0
# get the existing index entries as a string
new_index = ''
new_index: str = ''
index_list: list[str] = \
load_list(index_filename,
'EX: archive_posts_for_person unable to read ' +
@ -5602,9 +5605,9 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
# remove any edits or replies
for ext_name in ('edits', 'replies'):
ext = '.' + ext_name
ext: str = '.' + ext_name
posts_in_box = os.scandir(box_dir)
edits_in_box_dict = {}
edits_in_box_dict: dict = {}
edits_ctr: int = 0
edits_removed_ctr: int = 0
edit_files_ctr: int = 0
@ -5612,8 +5615,8 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
post_filename = post_filename.name
if not post_filename.endswith(ext):
continue
original_post_filename = post_filename.replace(ext, '.json')
full_original_filename = \
original_post_filename: str = post_filename.replace(ext, '.json')
full_original_filename: str = \
os.path.join(box_dir, original_post_filename)
full_filename = os.path.join(box_dir, post_filename)
if not is_a_file(full_original_filename):
@ -5627,13 +5630,13 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
continue
edit_files_ctr += 1
if is_a_file(full_filename):
content = load_string(full_filename,
'EX: unable to open content 2 ' +
full_filename)
content: str = load_string(full_filename,
'EX: unable to open content 2 ' +
full_filename)
if content is None:
content = ''
if '"published":' in content:
published_str = content.split('"published":')[1]
published_str: str = content.split('"published":')[1]
if '"' in published_str:
published_str = published_str.split('"')[1]
# does this look like a date?
@ -5647,7 +5650,7 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
print(str(edits_removed_ctr) + ' ' + ext_name + ' removed from ' +
boxname + ' for ' + nickname + '@' + domain)
no_of_edits = edits_ctr
no_of_edits: int = edits_ctr
if no_of_edits <= max_posts_in_box:
print('Checked ' + str(no_of_edits) + ' ' + boxname +
' ' + ext_name + ' in ' + str(edit_files_ctr) +
@ -5659,18 +5662,18 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
remove_edits_ctr: int = 0
for published_str, edit_filename in edits_in_box_sorted.items():
file_path = os.path.join(box_dir, edit_filename)
file_path: str = os.path.join(box_dir, edit_filename)
if not is_a_file(file_path):
continue
if archive_dir:
archive_path = os.path.join(archive_dir, edit_filename)
ex_text = \
ex_text: str = \
'EX: archive_posts_for_person unable to archive ' + \
ext_name + ' ' + file_path + ' -> ' + archive_path
if move_file(file_path, archive_path, ex_text):
remove_edits_ctr += 1
else:
ex_text = \
ex_text: str = \
'EX: archive_posts_for_person unable to delete ' + \
ext_name + ' ' + edit_filename
if erase_file(edit_filename, ex_text):
@ -5690,22 +5693,22 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
if not post_filename.endswith('.json'):
continue
# Get the published time
full_filename = os.path.join(box_dir, post_filename)
full_filename: str = os.path.join(box_dir, post_filename)
if is_a_file(full_filename):
content = load_string(full_filename,
'EX: unable to open content 1 ' +
full_filename)
content: str = load_string(full_filename,
'EX: unable to open content 1 ' +
full_filename)
if content is None:
content = ''
if '"published":' in content:
published_str = content.split('"published":')[1]
published_str: str = content.split('"published":')[1]
if '"' in published_str:
published_str = published_str.split('"')[1]
if published_str.endswith('Z'):
posts_in_box_dict[published_str] = post_filename
posts_ctr += 1
no_of_posts = posts_ctr
no_of_posts: int = posts_ctr
if no_of_posts <= max_posts_in_box:
print('Checked ' + str(no_of_posts) + ' ' + boxname +
' posts for ' + nickname + '@' + domain)
@ -5716,34 +5719,35 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
OrderedDict(sorted(posts_in_box_dict.items(), reverse=False))
# directory containing cached html posts
post_cache_dir = box_dir.replace('/' + boxname, '/postcache')
post_cache_dir: str = box_dir.replace('/' + boxname, '/postcache')
remove_ctr: int = 0
for published_str, post_filename in posts_in_box_sorted.items():
file_path = os.path.join(box_dir, post_filename)
file_path: str = os.path.join(box_dir, post_filename)
if not is_a_file(file_path):
continue
if archive_dir:
archive_path = os.path.join(archive_dir, post_filename)
archive_path: str = os.path.join(archive_dir, post_filename)
move_file(file_path, archive_path,
'EX: archive_posts_for_person unable to archive ' +
file_path + ' -> ' + archive_path)
extensions = (
extensions: list[str] = (
'votes', 'arrived', 'muted', 'tts', 'reject', 'mitm',
'edits', 'seen', 'json'
)
for ext in extensions:
ext_path = file_path.replace('.json', '.' + ext)
ext_path: str = file_path.replace('.json', '.' + ext)
if is_a_file(ext_path):
new_ext_path = archive_path.replace('.json', '.' + ext)
new_ext_path: str = \
archive_path.replace('.json', '.' + ext)
move_file(ext_path, new_ext_path,
'EX: unable to archive file ' + ext_path +
' -> ' + new_ext_path)
continue
ext_path = file_path.replace('.json', '.json.' + ext)
if is_a_file(ext_path):
new_ext_path = \
new_ext_path: str = \
archive_path.replace('.json', '.json.' + ext)
move_file(ext_path, new_ext_path,
'EX: unable to archive file ' + ext_path +
@ -5754,7 +5758,7 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
file_path, False, recent_posts_cache, False)
# remove cached html posts
post_cache_filename = \
post_cache_filename: str = \
os.path.join(post_cache_dir, post_filename)
post_cache_filename = post_cache_filename.replace('.json', '.html')
if is_a_file(post_cache_filename):
@ -5806,10 +5810,10 @@ def get_public_posts_of_person(base_dir: str, nickname: str, domain: str,
if nickname.startswith('!'):
nickname = nickname[1:]
group_account = True
domain_full = get_full_domain(domain, port)
handle = http_prefix + "://" + domain_full + "/@" + nickname
domain_full: str = get_full_domain(domain, port)
handle: str = http_prefix + "://" + domain_full + "/@" + nickname
wf_request = \
wf_request: dict = \
webfinger_handle(session, handle, http_prefix, cached_webfingers,
origin_domain, project_version, debug, group_account,
signing_priv_key_pem, mitm_servers)
@ -5861,12 +5865,12 @@ def get_public_post_domains(session, base_dir: str, nickname: str, domain: str,
session = create_session(proxy_type)
if not session:
return domain_list
person_cache = {}
cached_webfingers = {}
person_cache: dict = {}
cached_webfingers: dict = {}
domain_full = get_full_domain(domain, port)
handle = http_prefix + "://" + domain_full + "/@" + nickname
wf_request = \
domain_full: str = get_full_domain(domain, port)
handle: str = http_prefix + "://" + domain_full + "/@" + nickname
wf_request: dict = \
webfinger_handle(session, handle, http_prefix, cached_webfingers,
domain, project_version, debug, False,
signing_priv_key_pem, mitm_servers)
@ -5886,7 +5890,7 @@ def get_public_post_domains(session, base_dir: str, nickname: str, domain: str,
nickname, domain, 'outbox',
92522, system_language,
mitm_servers)
post_domains = \
post_domains: list[str] = \
get_post_domains(session, person_url, 64, debug,
project_version, http_prefix, domain,
word_frequency, domain_list, system_language,
@ -5904,26 +5908,26 @@ def download_follow_collection(signing_priv_key_pem: str,
"""Returns a list of following/followers for the given actor
by downloading the json for their following/followers collection
"""
prof = 'https://www.w3.org/ns/activitystreams'
prof: str = 'https://www.w3.org/ns/activitystreams'
if '/channel/' not in actor or '/accounts/' not in actor:
accept_str = \
accept_str: str = \
'application/activity+json; ' + \
'profile="' + prof + '"'
session_headers = {
session_headers: dict = {
'Accept': accept_str
}
else:
accept_str = \
accept_str: str = \
'application/ld+json; ' + \
'profile="' + prof + '"'
session_headers = {
session_headers: dict = {
'Accept': accept_str
}
result: list[str] = []
for page_ctr in range(no_of_pages):
url = \
url: str = \
actor + '/' + follow_type + '?page=' + str(page_number + page_ctr)
followers_json = \
followers_json: dict = \
get_json(signing_priv_key_pem, session, url, session_headers, None,
debug, mitm_servers, __version__, http_prefix, None)
if get_json_valid(followers_json):
@ -5955,12 +5959,12 @@ def get_public_post_info(session, base_dir: str, nickname: str, domain: str,
session = create_session(proxy_type)
if not session:
return {}
person_cache = {}
cached_webfingers = {}
person_cache: dict = {}
cached_webfingers: dict = {}
domain_full = get_full_domain(domain, port)
handle = http_prefix + "://" + domain_full + "/@" + nickname
wf_request = \
domain_full: str = get_full_domain(domain, port)
handle: str = http_prefix + "://" + domain_full + "/@" + nickname
wf_request: dict = \
webfinger_handle(session, handle, http_prefix, cached_webfingers,
domain, project_version, debug, False,
signing_priv_key_pem, mitm_servers)
@ -5980,19 +5984,19 @@ def get_public_post_info(session, base_dir: str, nickname: str, domain: str,
nickname, domain, 'outbox',
13863, system_language,
mitm_servers)
max_posts = 64
post_domains = \
max_posts: int = 64
post_domains: list[str] = \
get_post_domains(session, person_url, max_posts, debug,
project_version, http_prefix, domain,
word_frequency, [], system_language,
signing_priv_key_pem, mitm_servers)
post_domains.sort()
domains_info = {}
domains_info: dict = {}
for pdomain in post_domains:
if not domains_info.get(pdomain):
domains_info[pdomain]: list[str] = []
blocked_posts = \
blocked_posts: dict = \
_get_posts_for_blocked_domains(base_dir, session,
person_url, max_posts,
debug,
@ -6017,8 +6021,8 @@ def get_public_post_domains_blocked(session, base_dir: str,
""" Returns a list of domains referenced within public posts which
are globally blocked on this instance
"""
origin_domain = domain
post_domains = \
origin_domain: str = domain
post_domains: list[str] = \
get_public_post_domains(session, base_dir, nickname, domain,
origin_domain,
proxy_type, port, http_prefix,
@ -6028,12 +6032,12 @@ def get_public_post_domains_blocked(session, base_dir: str,
if not post_domains:
return []
blocking_filename = data_dir(base_dir) + '/blocking.txt'
blocking_filename: str = data_dir(base_dir) + '/blocking.txt'
if not is_a_file(blocking_filename):
return []
# read the blocked domains as a single string
blocked_str = \
blocked_str: str = \
load_string(blocking_filename,
'EX: get_public_post_domains_blocked unable to read ' +
blocking_filename)
@ -6060,9 +6064,9 @@ def _get_non_mutuals_of_person(base_dir: str,
"""Returns the followers who are not mutuals of a person
i.e. accounts which follow you but you don't follow them
"""
followers = \
followers: list[str] = \
get_followers_list(base_dir, nickname, domain, 'followers.txt')
following = \
following: list[str] = \
get_followers_list(base_dir, nickname, domain, 'following.txt')
non_mutuals: list[str] = []
for handle in followers:
@ -6082,13 +6086,15 @@ def check_domains(session, base_dir: str,
"""Checks follower accounts for references to globally blocked domains
"""
word_frequency: dict = {}
non_mutuals = _get_non_mutuals_of_person(base_dir, nickname, domain)
non_mutuals: list[str] = \
_get_non_mutuals_of_person(base_dir, nickname, domain)
if not non_mutuals:
print('No non-mutual followers were found')
return
follower_warning_filename = data_dir(base_dir) + '/followerWarnings.txt'
follower_warning_filename: str = \
data_dir(base_dir) + '/followerWarnings.txt'
update_follower_warnings: bool = False
follower_warning_str = ''
follower_warning_str: str = ''
if is_a_file(follower_warning_filename):
follower_warning_str = \
load_string(follower_warning_filename,
@ -6099,12 +6105,12 @@ def check_domains(session, base_dir: str,
if single_check:
# checks a single random non-mutual
index = random.randrange(0, len(non_mutuals))
handle = non_mutuals[index]
index: int = random.randrange(0, len(non_mutuals))
handle: str = non_mutuals[index]
if '@' in handle:
non_mutual_nickname = handle.split('@')[0]
non_mutual_domain = handle.split('@')[1].strip()
blocked_domains = \
non_mutual_nickname: str = handle.split('@')[0]
non_mutual_domain: str = handle.split('@')[1].strip()
blocked_domains: list[str] = \
get_public_post_domains_blocked(session, base_dir,
non_mutual_nickname,
non_mutual_domain,
@ -6125,9 +6131,9 @@ def check_domains(session, base_dir: str,
continue
if handle in follower_warning_str:
continue
non_mutual_nickname = handle.split('@')[0]
non_mutual_domain = handle.split('@')[1].strip()
blocked_domains = \
non_mutual_nickname: str = handle.split('@')[0]
non_mutual_domain: str = handle.split('@')[1].strip()
blocked_domains: list[str] = \
get_public_post_domains_blocked(session, base_dir,
non_mutual_nickname,
non_mutual_domain,
@ -6156,9 +6162,9 @@ def check_domains(session, base_dir: str,
def populate_replies_json(base_dir: str, nickname: str, domain: str,
post_replies_filename: str, authorized: bool,
replies_json: {}) -> None:
pub_str = 'https://www.w3.org/ns/activitystreams#Public'
pub_str: str = 'https://www.w3.org/ns/activitystreams#Public'
# populate the items list with replies
replies_boxes = ('outbox', 'inbox')
replies_boxes: list[str] = ('outbox', 'inbox')
replies_list: list[str] = \
load_list(post_replies_filename,
'EX: populate_replies_json unable to read ' +
@ -6168,8 +6174,8 @@ def populate_replies_json(base_dir: str, nickname: str, domain: str,
reply_found: bool = False
# examine inbox and outbox
for boxname in replies_boxes:
message_id2 = remove_eol(message_id)
search_filename = \
message_id2: str = remove_eol(message_id)
search_filename: str = \
acct_dir(base_dir, nickname, domain) + '/' + \
boxname + '/' + \
message_id2.replace('/', '#') + '.json'
@ -6195,8 +6201,8 @@ def populate_replies_json(base_dir: str, nickname: str, domain: str,
# if not in either inbox or outbox then examine the
# shared inbox
if not reply_found:
message_id2 = remove_eol(message_id)
search_filename = \
message_id2: str = remove_eol(message_id)
search_filename: str = \
data_dir(base_dir) + '/inbox@' + \
domain + '/inbox/' + \
message_id2.replace('/', '#') + '.json'
@ -6264,19 +6270,19 @@ def download_announce(session, base_dir: str, http_prefix: str,
if not isinstance(post_json_object['object'], str):
return None
# ignore self-boosts
actor_url = get_actor_from_post(post_json_object)
actor_url: str = get_actor_from_post(post_json_object)
if actor_url in post_json_object['object']:
return None
# get the announced post
announce_cache_dir = base_dir + '/cache/announce/' + nickname
announce_cache_dir: str = base_dir + '/cache/announce/' + nickname
if not is_a_dir(announce_cache_dir):
makedir(announce_cache_dir)
post_id = None
post_id: str = None
if post_json_object.get('id'):
post_id = remove_id_ending(post_json_object['id'])
announce_filename = \
announce_filename: str = \
announce_cache_dir + '/' + \
post_json_object['object'].replace('/', '#') + '.json'
@ -6291,22 +6297,22 @@ def download_announce(session, base_dir: str, http_prefix: str,
if post_json_object:
return post_json_object
else:
profile_str = 'https://www.w3.org/ns/activitystreams'
accept_str = \
profile_str: str = 'https://www.w3.org/ns/activitystreams'
accept_str: str = \
'application/activity+json; ' + \
'profile="' + profile_str + '"'
as_header = {
as_header: dict = {
'Accept': accept_str
}
if '/channel/' in actor_url or \
'/accounts/' in actor_url:
accept_str = \
accept_str: str = \
'application/ld+json; ' + \
'profile="' + profile_str + '"'
as_header = {
as_header: dict = {
'Accept': accept_str
}
actor_nickname = get_nickname_from_actor(actor_url)
actor_nickname: str = get_nickname_from_actor(actor_url)
if not actor_nickname:
print('WARN: download_announce no actor_nickname')
return None
@ -6328,7 +6334,8 @@ def download_announce(session, base_dir: str, http_prefix: str,
print('Announce download blocked actor: ' +
actor_nickname + '@' + actor_domain)
return None
object_nickname = get_nickname_from_actor(post_json_object['object'])
object_nickname: str = \
get_nickname_from_actor(post_json_object['object'])
object_domain, _ = \
get_domain_from_actor(post_json_object['object'])
if not object_domain:
@ -6355,7 +6362,7 @@ def download_announce(session, base_dir: str, http_prefix: str,
if debug:
print('Downloading Announce content for ' +
post_json_object['object'])
announced_json = \
announced_json: dict = \
get_json(signing_priv_key_pem, session,
post_json_object['object'],
as_header, None, debug, mitm_servers,
@ -6391,11 +6398,11 @@ def download_announce(session, base_dir: str, http_prefix: str,
recent_posts_cache, debug)
return None
announced_actor = announced_id
announced_actor: str = announced_id
if announced_json.get('attributedTo'):
announced_actor = get_attributed_to(announced_json['attributedTo'])
announced_json_str = str(announced_json)
announced_json_str: str = str(announced_json)
if not announced_json.get('type'):
print('WARN: announced post does not have a type ' +
announced_json_str)
@ -6435,7 +6442,7 @@ def download_announce(session, base_dir: str, http_prefix: str,
return None
if announced_json['type'] == 'Video':
converted_json = \
converted_json: dict = \
convert_video_to_note(base_dir, nickname, domain,
system_language,
announced_json, blocked_cache,
@ -6444,7 +6451,7 @@ def download_announce(session, base_dir: str, http_prefix: str,
if converted_json:
announced_json = converted_json
if announced_json['type'] == 'Torrent':
converted_json = \
converted_json: dict = \
convert_torrent_to_note(base_dir, nickname, domain,
system_language,
announced_json, blocked_cache,
@ -6534,7 +6541,7 @@ def download_announce(session, base_dir: str, http_prefix: str,
return None
# Check the content of the announce
convert_post_content_to_html(announced_json)
content_str = announced_json['content']
content_str: str = announced_json['content']
using_content_map: bool = False
if 'contentMap' in announced_json:
if announced_json['contentMap'].get(system_language):
@ -6548,11 +6555,11 @@ def download_announce(session, base_dir: str, http_prefix: str,
recent_posts_cache, debug)
return None
summary_str = \
summary_str: str = \
get_summary_from_post(announced_json, system_language, [])
media_descriptions = \
media_descriptions: str = \
get_media_descriptions_from_post(announced_json)
content_all = content_str
content_all: str = content_str
if summary_str:
content_all = \
summary_str + ' ' + content_str + ' ' + media_descriptions
@ -6680,13 +6687,13 @@ def is_image_media(session, base_dir: str, http_prefix: str,
"""Returns true if the given post has attached image media
"""
if post_json_object['type'] == 'Announce':
blocked_cache = {}
blocked_cache: dict = {}
block_federated: list[str] = []
block_military = {}
block_government = {}
block_bluesky = {}
block_nostr = {}
post_json_announce = \
block_military: dict = {}
block_government: dict = {}
block_bluesky: dict = {}
block_nostr: dict = {}
post_json_announce: dict = \
download_announce(session, base_dir, http_prefix,
nickname, domain, post_json_object,
__version__,
@ -6752,9 +6759,10 @@ def post_is_muted(base_dir: str, nickname: str, domain: str,
post_json_object: {}, message_id: str) -> bool:
""" Returns true if the given post is muted
"""
is_muted = None
is_muted: bool = None
if 'muted' in post_json_object:
is_muted = post_json_object['muted']
if isinstance(post_json_object['muted'], bool):
is_muted = post_json_object['muted']
if is_muted is True or is_muted is False:
return is_muted