Function position

main
Bob Mottram 2025-09-21 12:38:54 +01:00
parent 338a3587f0
commit b98910cd0f
16 changed files with 223 additions and 224 deletions

View File

@ -217,7 +217,7 @@ def _reject_quote_request(message_json: {}, domain_full: str,
not curr_domain.endswith('.onion') and \ not curr_domain.endswith('.onion') and \
domain_to_follow.endswith('.onion'): domain_to_follow.endswith('.onion'):
curr_session = session_onion curr_session = session_onion
curr_http_prefix = 'http' curr_http_prefix: str = 'http'
curr_domain = onion_domain curr_domain = onion_domain
curr_port = 80 curr_port = 80
port = 80 port = 80
@ -228,7 +228,7 @@ def _reject_quote_request(message_json: {}, domain_full: str,
not curr_domain.endswith('.i2p') and not curr_domain.endswith('.i2p') and
domain_to_follow.endswith('.i2p')): domain_to_follow.endswith('.i2p')):
curr_session = session_i2p curr_session = session_i2p
curr_http_prefix = 'http' curr_http_prefix: str = 'http'
curr_domain = i2p_domain curr_domain = i2p_domain
curr_port = 80 curr_port = 80
port = 80 port = 80
@ -403,7 +403,7 @@ def receive_accept_reject(base_dir: str, domain: str, message_json: {},
nickname = get_nickname_from_actor(actor_url) nickname = get_nickname_from_actor(actor_url)
if not nickname: if not nickname:
# single user instance # single user instance
nickname = 'dev' nickname: str = 'dev'
if debug: if debug:
print('DEBUG: ' + message_json['type'] + print('DEBUG: ' + message_json['type'] +
' does not contain a nickname. ' + ' does not contain a nickname. ' +

View File

@ -264,7 +264,7 @@ def announce_public(session, base_dir: str, federation_list: [],
""" """
from_domain = get_full_domain(domain, port) from_domain = get_full_domain(domain, port)
to_url = 'https://www.w3.org/ns/activitystreams#Public' to_url: str = 'https://www.w3.org/ns/activitystreams#Public'
cc_url = local_actor_url(http_prefix, nickname, from_domain) + '/followers' cc_url = local_actor_url(http_prefix, nickname, from_domain) + '/followers'
return create_announce(session, base_dir, federation_list, return create_announce(session, base_dir, federation_list,
nickname, domain, port, nickname, domain, port,
@ -296,7 +296,7 @@ def send_announce_via_server(base_dir: str, session,
from_domain_full = get_full_domain(from_domain, from_port) from_domain_full = get_full_domain(from_domain, from_port)
to_url = 'https://www.w3.org/ns/activitystreams#Public' to_url: str = 'https://www.w3.org/ns/activitystreams#Public'
actor_str = local_actor_url(http_prefix, from_nickname, from_domain_full) actor_str = local_actor_url(http_prefix, from_nickname, from_domain_full)
cc_url = actor_str + '/followers' cc_url = actor_str + '/followers'
@ -333,7 +333,7 @@ def send_announce_via_server(base_dir: str, session,
' did not return a dict. ' + str(wf_request)) ' did not return a dict. ' + str(wf_request))
return 1 return 1
post_to_box = 'outbox' post_to_box: str = 'outbox'
# get the actor inbox for the To handle # get the actor inbox for the To handle
origin_domain = from_domain origin_domain = from_domain
@ -422,7 +422,7 @@ def send_undo_announce_via_server(base_dir: str, session,
' did not return a dict. ' + str(wf_request)) ' did not return a dict. ' + str(wf_request))
return 1 return 1
post_to_box = 'outbox' post_to_box: str = 'outbox'
# get the actor inbox for the To handle # get the actor inbox for the To handle
origin_domain = domain origin_domain = domain

View File

@ -257,7 +257,7 @@ def authorize(base_dir: str, path: str, auth_header: str, debug: bool) -> bool:
def create_password(length: int): def create_password(length: int):
valid_chars = 'abcdefghijklmnopqrstuvwxyz' + \ valid_chars: str = 'abcdefghijklmnopqrstuvwxyz' + \
'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
return ''.join((secrets.choice(valid_chars) for i in range(length))) return ''.join((secrets.choice(valid_chars) for i in range(length)))
@ -293,10 +293,10 @@ def record_login_failure(base_dir: str, ip_address: str,
if not log_to_file: if not log_to_file:
return return
failure_log = data_dir(base_dir) + '/loginfailures.log' failure_log: str = data_dir(base_dir) + '/loginfailures.log'
write_type = 'a+' write_type: str = 'a+'
if not os.path.isfile(failure_log): if not os.path.isfile(failure_log):
write_type = 'w+' write_type: str = 'w+'
curr_time = date_utcnow() curr_time = date_utcnow()
curr_time_str = curr_time.strftime("%Y-%m-%d %H:%M:%SZ") curr_time_str = curr_time.strftime("%Y-%m-%d %H:%M:%SZ")
try: try:

View File

@ -127,7 +127,7 @@ def send_availability_via_server(base_dir: str, session,
' did not return a dict. ' + str(wf_request)) ' did not return a dict. ' + str(wf_request))
return 1 return 1
post_to_box = 'outbox' post_to_box: str = 'outbox'
# get the actor inbox for the To handle # get the actor inbox for the To handle
origin_domain = domain origin_domain = domain

View File

@ -60,7 +60,7 @@ def get_global_block_reason(search_text: str,
if not text_in_file(search_text, blocking_reasons_filename): if not text_in_file(search_text, blocking_reasons_filename):
return '' return ''
reasons_str = '' reasons_str: str = ''
try: try:
with open(blocking_reasons_filename, 'r', with open(blocking_reasons_filename, 'r',
encoding='utf-8') as fp_reas: encoding='utf-8') as fp_reas:
@ -93,8 +93,8 @@ def get_account_blocks(base_dir: str,
if not os.path.isfile(blocking_filename): if not os.path.isfile(blocking_filename):
return '' return ''
blocked_accounts_textarea = '' blocked_accounts_textarea: str = ''
blocking_file_text = '' blocking_file_text: str = ''
try: try:
with open(blocking_filename, 'r', encoding='utf-8') as fp_block: with open(blocking_filename, 'r', encoding='utf-8') as fp_block:
blocking_file_text = fp_block.read() blocking_file_text = fp_block.read()
@ -155,7 +155,7 @@ def blocked_timeline_json(actor: str, page_number: int, items_per_page: int,
if index >= len(blocked_list): if index >= len(blocked_list):
break break
block_handle = blocked_list[index] block_handle = blocked_list[index]
block_reason = '' block_reason: str = ''
if ' - ' in block_handle: if ' - ' in block_handle:
block_reason = block_handle.split(' - ')[1] block_reason = block_handle.split(' - ')[1]
block_handle = block_handle.split(' - ')[0] block_handle = block_handle.split(' - ')[0]
@ -187,8 +187,8 @@ def add_account_blocks(base_dir: str,
if blocked_accounts_textarea is None: if blocked_accounts_textarea is None:
return False return False
blocklist = blocked_accounts_textarea.split('\n') blocklist = blocked_accounts_textarea.split('\n')
blocking_file_text = '' blocking_file_text: str = ''
blocking_reasons_file_text = '' blocking_reasons_file_text: str = ''
for line in blocklist: for line in blocklist:
line = line.strip() line = line.strip()
reason = None reason = None
@ -271,7 +271,7 @@ def _add_global_block_reason(base_dir: str,
print('EX: unable to add blocking reason ' + print('EX: unable to add blocking reason ' +
block_id) block_id)
else: else:
reasons_str = '' reasons_str: str = ''
try: try:
with open(blocking_reasons_filename, 'r', with open(blocking_reasons_filename, 'r',
encoding='utf-8') as fp_reas: encoding='utf-8') as fp_reas:
@ -279,7 +279,7 @@ def _add_global_block_reason(base_dir: str,
except OSError: except OSError:
print('EX: unable to read blocking reasons') print('EX: unable to read blocking reasons')
reasons_lines = reasons_str.split('\n') reasons_lines = reasons_str.split('\n')
new_reasons_str = '' new_reasons_str: str = ''
for line in reasons_lines: for line in reasons_lines:
if not line.startswith(block_id + ' '): if not line.startswith(block_id + ' '):
new_reasons_str += line + '\n' new_reasons_str += line + '\n'
@ -375,7 +375,7 @@ def _add_block_reason(base_dir: str,
print('EX: unable to add blocking reason 2 ' + print('EX: unable to add blocking reason 2 ' +
block_id) block_id)
else: else:
reasons_str = '' reasons_str: str = ''
try: try:
with open(blocking_reasons_filename, 'r', with open(blocking_reasons_filename, 'r',
encoding='utf-8') as fp_reas: encoding='utf-8') as fp_reas:
@ -383,7 +383,7 @@ def _add_block_reason(base_dir: str,
except OSError: except OSError:
print('EX: unable to read blocking reasons 2') print('EX: unable to read blocking reasons 2')
reasons_lines = reasons_str.split('\n') reasons_lines = reasons_str.split('\n')
new_reasons_str = '' new_reasons_str: str = ''
for line in reasons_lines: for line in reasons_lines:
if not line.startswith(block_id + ' '): if not line.startswith(block_id + ' '):
new_reasons_str += line + '\n' new_reasons_str += line + '\n'
@ -428,7 +428,7 @@ def add_block(base_dir: str, nickname: str, domain: str,
acct_dir(base_dir, nickname, domain) + '/following.txt' acct_dir(base_dir, nickname, domain) + '/following.txt'
if os.path.isfile(following_filename): if os.path.isfile(following_filename):
if text_in_file(block_handle + '\n', following_filename): if text_in_file(block_handle + '\n', following_filename):
following_str = '' following_str: str = ''
try: try:
with open(following_filename, 'r', with open(following_filename, 'r',
encoding='utf-8') as fp_foll: encoding='utf-8') as fp_foll:
@ -453,7 +453,7 @@ def add_block(base_dir: str, nickname: str, domain: str,
acct_dir(base_dir, nickname, domain) + '/followers.txt' acct_dir(base_dir, nickname, domain) + '/followers.txt'
if os.path.isfile(followers_filename): if os.path.isfile(followers_filename):
if text_in_file(block_handle + '\n', followers_filename): if text_in_file(block_handle + '\n', followers_filename):
followers_str = '' followers_str: str = ''
try: try:
with open(followers_filename, 'r', with open(followers_filename, 'r',
encoding='utf-8') as fp_foll: encoding='utf-8') as fp_foll:
@ -504,7 +504,7 @@ def _remove_global_block_reason(base_dir: str,
if not text_in_file(unblock_id + ' ', unblocking_filename): if not text_in_file(unblock_id + ' ', unblocking_filename):
return False return False
reasons_str = '' reasons_str: str = ''
try: try:
with open(unblocking_filename, 'r', with open(unblocking_filename, 'r',
encoding='utf-8') as fp_reas: encoding='utf-8') as fp_reas:
@ -512,7 +512,7 @@ def _remove_global_block_reason(base_dir: str,
except OSError: except OSError:
print('EX: unable to read blocking reasons 3') print('EX: unable to read blocking reasons 3')
reasons_lines = reasons_str.split('\n') reasons_lines = reasons_str.split('\n')
new_reasons_str = '' new_reasons_str: str = ''
for line in reasons_lines: for line in reasons_lines:
if line.startswith(unblock_id + ' '): if line.startswith(unblock_id + ' '):
continue continue
@ -640,7 +640,7 @@ def is_blocked_hashtag(base_dir: str, hashtag: str) -> bool:
if os.path.isfile(global_blocking_filename): if os.path.isfile(global_blocking_filename):
hashtag = hashtag.strip('\n').strip('\r') hashtag = hashtag.strip('\n').strip('\r')
if not hashtag.startswith('#'): if not hashtag.startswith('#'):
hashtag = '#' + hashtag hashtag: str = '#' + hashtag
if text_in_file(hashtag + '\n', global_blocking_filename): if text_in_file(hashtag + '\n', global_blocking_filename):
return True return True
return False return False
@ -650,7 +650,7 @@ def get_domain_blocklist(base_dir: str) -> str:
"""Returns all globally blocked domains as a string """Returns all globally blocked domains as a string
This can be used for fast matching to mitigate flooding This can be used for fast matching to mitigate flooding
""" """
blocked_str = '' blocked_str: str = ''
evil_domains = evil_incarnate() evil_domains = evil_incarnate()
for evil in evil_domains: for evil in evil_domains:
@ -748,7 +748,7 @@ def is_blocked_domain(base_dir: str, domain: str,
else: else:
blocked_cache_list = blocked_cache blocked_cache_list = blocked_cache
search_str = '*@' + domain search_str: str = '*@' + domain
for curr_block_list in (blocked_cache_list, block_federated_list): for curr_block_list in (blocked_cache_list, block_federated_list):
for blocked_str in curr_block_list: for blocked_str in curr_block_list:
blocked_str = blocked_str.strip() blocked_str = blocked_str.strip()
@ -1011,7 +1011,7 @@ def allowed_announce_add(base_dir: str, nickname: str, domain: str,
handle = following_nickname + '@' + following_domain handle = following_nickname + '@' + following_domain
if text_in_file(handle + '\n', blocking_filename, False): if text_in_file(handle + '\n', blocking_filename, False):
file_text = '' file_text: str = ''
try: try:
with open(blocking_filename, 'r', with open(blocking_filename, 'r',
encoding='utf-8') as fp_noannounce: encoding='utf-8') as fp_noannounce:
@ -1020,7 +1020,7 @@ def allowed_announce_add(base_dir: str, nickname: str, domain: str,
print('EX: unable to read noannounce add: ' + print('EX: unable to read noannounce add: ' +
blocking_filename + ' ' + handle) blocking_filename + ' ' + handle)
new_file_text = '' new_file_text: str = ''
file_text_list = file_text.split('\n') file_text_list = file_text.split('\n')
handle_lower = handle.lower() handle_lower = handle.lower()
for allowed in file_text_list: for allowed in file_text_list:
@ -1058,7 +1058,7 @@ def allowed_announce_remove(base_dir: str, nickname: str, domain: str,
blocking_filename + ' ' + handle) blocking_filename + ' ' + handle)
return return
file_text = '' file_text: str = ''
if not text_in_file(handle + '\n', blocking_filename, False): if not text_in_file(handle + '\n', blocking_filename, False):
try: try:
with open(blocking_filename, 'r', with open(blocking_filename, 'r',
@ -1091,7 +1091,7 @@ def blocked_quote_toots_add(base_dir: str, nickname: str, domain: str,
handle = following_nickname + '@' + following_domain handle = following_nickname + '@' + following_domain
if not text_in_file(handle + '\n', blocking_filename, False): if not text_in_file(handle + '\n', blocking_filename, False):
file_text = '' file_text: str = ''
try: try:
with open(blocking_filename, 'r', with open(blocking_filename, 'r',
encoding='utf-8') as fp_quotes: encoding='utf-8') as fp_quotes:
@ -1123,7 +1123,7 @@ def blocked_quote_toots_remove(base_dir: str, nickname: str, domain: str,
if not os.path.isfile(blocking_filename): if not os.path.isfile(blocking_filename):
return return
file_text = '' file_text: str = ''
if text_in_file(handle + '\n', blocking_filename, False): if text_in_file(handle + '\n', blocking_filename, False):
try: try:
with open(blocking_filename, 'r', with open(blocking_filename, 'r',
@ -1825,7 +1825,7 @@ def import_blocking_file(base_dir: str, nickname: str, domain: str,
# already blocked # already blocked
continue continue
append_blocks.append(blocked_domain_name) append_blocks.append(blocked_domain_name)
blocked_comment = '' blocked_comment: str = ''
if '"' in line_str: if '"' in line_str:
quote_section = line_str.split('"') quote_section = line_str.split('"')
if len(quote_section) > 1: if len(quote_section) > 1:
@ -1905,7 +1905,7 @@ def export_blocking_file(base_dir: str, nickname: str, domain: str) -> str:
blocked_domain = blocked_domain.strip() blocked_domain = blocked_domain.strip()
if blocked_domain.startswith('#'): if blocked_domain.startswith('#'):
continue continue
reason_str = '' reason_str: str = ''
for reason_line in blocking_reasons: for reason_line in blocking_reasons:
if reason_line.startswith(blocked_domain + ' '): if reason_line.startswith(blocked_domain + ' '):
reason_str = reason_line.split(' ', 1)[1] reason_str = reason_line.split(' ', 1)[1]
@ -2042,7 +2042,7 @@ def load_blocked_nostr(base_dir: str) -> {}:
def save_blocked_military(base_dir: str, block_military: {}) -> None: def save_blocked_military(base_dir: str, block_military: {}) -> None:
"""Saves a list of nicknames for accounts which block military instances """Saves a list of nicknames for accounts which block military instances
""" """
nicknames_str = '' nicknames_str: str = ''
for nickname, _ in block_military.items(): for nickname, _ in block_military.items():
nicknames_str += nickname + '\n' nicknames_str += nickname + '\n'
@ -2058,7 +2058,7 @@ def save_blocked_military(base_dir: str, block_military: {}) -> None:
def save_blocked_government(base_dir: str, block_government: {}) -> None: def save_blocked_government(base_dir: str, block_government: {}) -> None:
"""Saves a list of nicknames for accounts which block government instances """Saves a list of nicknames for accounts which block government instances
""" """
nicknames_str = '' nicknames_str: str = ''
for nickname, _ in block_government.items(): for nickname, _ in block_government.items():
nicknames_str += nickname + '\n' nicknames_str += nickname + '\n'
@ -2074,7 +2074,7 @@ def save_blocked_government(base_dir: str, block_government: {}) -> None:
def save_blocked_bluesky(base_dir: str, block_bluesky: {}) -> None: def save_blocked_bluesky(base_dir: str, block_bluesky: {}) -> None:
"""Saves a list of nicknames for accounts which block bluesky bridges """Saves a list of nicknames for accounts which block bluesky bridges
""" """
nicknames_str = '' nicknames_str: str = ''
for nickname, _ in block_bluesky.items(): for nickname, _ in block_bluesky.items():
nicknames_str += nickname + '\n' nicknames_str += nickname + '\n'
@ -2090,7 +2090,7 @@ def save_blocked_bluesky(base_dir: str, block_bluesky: {}) -> None:
def save_blocked_nostr(base_dir: str, block_nostr: {}) -> None: def save_blocked_nostr(base_dir: str, block_nostr: {}) -> None:
"""Saves a list of nicknames for accounts which block nostr bridges """Saves a list of nicknames for accounts which block nostr bridges
""" """
nicknames_str = '' nicknames_str: str = ''
for nickname, _ in block_nostr.items(): for nickname, _ in block_nostr.items():
nicknames_str += nickname + '\n' nicknames_str += nickname + '\n'
@ -2247,7 +2247,7 @@ def _update_federated_blocks(session, base_dir: str,
print('DEBUG: federated blocklist endpoints: ' + print('DEBUG: federated blocklist endpoints: ' +
str(block_federated_endpoints)) str(block_federated_endpoints))
new_block_api_str = '' new_block_api_str: str = ''
for endpoint in block_federated_endpoints: for endpoint in block_federated_endpoints:
if not endpoint: if not endpoint:
continue continue
@ -2328,7 +2328,7 @@ def save_block_federated_endpoints(base_dir: str,
block_api_endpoints_filename = \ block_api_endpoints_filename = \
data_dir(base_dir) + '/block_api_endpoints.txt' data_dir(base_dir) + '/block_api_endpoints.txt'
result: list[str] = [] result: list[str] = []
block_federated_endpoints_str = '' block_federated_endpoints_str: str = ''
for endpoint in block_federated_endpoints: for endpoint in block_federated_endpoints:
if not endpoint: if not endpoint:
continue continue

73
blog.py
View File

@ -162,7 +162,7 @@ def _get_blog_replies(base_dir: str, http_prefix: str, translate: {},
print('EX: unable to read blog 4 ' + post_filename) print('EX: unable to read blog 4 ' + post_filename)
if lines: if lines:
replies_str = '' replies_str: str = ''
for reply_post_id in lines: for reply_post_id in lines:
reply_post_id = remove_eol(reply_post_id) reply_post_id = remove_eol(reply_post_id)
replacements = { replacements = {
@ -187,7 +187,7 @@ def _get_blog_replies(base_dir: str, http_prefix: str, translate: {},
replies_str += rply replies_str += rply
# indicate the reply indentation level # indicate the reply indentation level
indent_str = '>' indent_str: str = '>'
indent_level = 0 indent_level = 0
while indent_level < depth: while indent_level < depth:
indent_str += ' >' indent_str += ' >'
@ -210,13 +210,13 @@ def _html_blog_post_content(debug: bool, session, authorized: bool,
"""Returns the content for a single blog post """Returns the content for a single blog post
""" """
linked_author = False linked_author = False
actor = '' actor: str = ''
blog_str = '' blog_str: str = ''
message_link = '' message_link: str = ''
if post_json_object['object'].get('id'): if post_json_object['object'].get('id'):
message_link = \ message_link = \
post_json_object['object']['id'].replace('/statuses/', '/') post_json_object['object']['id'].replace('/statuses/', '/')
title_str = '' title_str: str = ''
article_added = False article_added = False
if post_json_object['object'].get('summary'): if post_json_object['object'].get('summary'):
title_str = post_json_object['object']['summary'] title_str = post_json_object['object']['summary']
@ -263,14 +263,14 @@ def _html_blog_post_content(debug: bool, session, authorized: bool,
blog_str += ' ' + handle blog_str += ' ' + handle
blog_str += '</h3>\n' blog_str += '</h3>\n'
avatar_link = '' avatar_link: str = ''
reply_str = '' reply_str: str = ''
announce_str = '' announce_str: str = ''
like_str = '' like_str: str = ''
bookmark_str = '' bookmark_str: str = ''
delete_str = '' delete_str: str = ''
mute_str = '' mute_str: str = ''
is_muted = False is_muted: bool = False
person_url = local_actor_url(http_prefix, nickname, domain_full) person_url = local_actor_url(http_prefix, nickname, domain_full)
actor_json = \ actor_json = \
@ -308,7 +308,7 @@ def _html_blog_post_content(debug: bool, session, authorized: bool,
else: else:
blog_str += '<br><article>' + content_str + '</article>\n' blog_str += '<br><article>' + content_str + '</article>\n'
citations_str = '' citations_str: str = ''
if post_json_object['object'].get('tag'): if post_json_object['object'].get('tag'):
for tag_json in post_json_object['object']['tag']: for tag_json in post_json_object['object']['tag']:
if not isinstance(tag_json, dict): if not isinstance(tag_json, dict):
@ -328,7 +328,7 @@ def _html_blog_post_content(debug: bool, session, authorized: bool,
'<li><a href="' + citation_url + '">' + \ '<li><a href="' + citation_url + '">' + \
'<cite>' + citation_name + '</cite></a></li>\n' '<cite>' + citation_name + '</cite></a></li>\n'
if citations_str: if citations_str:
citations_str = '<p><b>' + translate['Citations'] + \ citations_str: str = '<p><b>' + translate['Citations'] + \
':</b></p>' + \ ':</b></p>' + \
'<u>\n' + citations_str + '</u>\n' '<u>\n' + citations_str + '</u>\n'
@ -346,7 +346,7 @@ def _html_blog_post_content(debug: bool, session, authorized: bool,
# separator between blogs should be centered # separator between blogs should be centered
if '<center>' not in blog_separator: if '<center>' not in blog_separator:
blog_separator = '<center>' + blog_separator + '</center>' blog_separator: str = '<center>' + blog_separator + '</center>'
if replies == 0: if replies == 0:
blog_str += blog_separator + '\n' blog_str += blog_separator + '\n'
@ -379,8 +379,8 @@ def _html_blog_post_rss2(domain: str, post_json_object: {},
system_language: str) -> str: system_language: str) -> str:
"""Returns the RSS version 2 feed for a single blog post """Returns the RSS version 2 feed for a single blog post
""" """
rss_str = '' rss_str: str = ''
message_link = '' message_link: str = ''
if post_json_object['object'].get('id'): if post_json_object['object'].get('id'):
message_link = \ message_link = \
post_json_object['object']['id'].replace('/statuses/', '/') post_json_object['object']['id'].replace('/statuses/', '/')
@ -398,7 +398,7 @@ def _html_blog_post_rss2(domain: str, post_json_object: {},
system_language) system_language)
description = first_paragraph_from_string(content) description = first_paragraph_from_string(content)
description = escape_text(description) description = escape_text(description)
rss_str = ' <item>' rss_str: str = ' <item>'
rss_str += ' <title>' + title_str + '</title>' rss_str += ' <title>' + title_str + '</title>'
rss_str += ' <link>' + message_link + '</link>' rss_str += ' <link>' + message_link + '</link>'
rss_str += \ rss_str += \
@ -413,8 +413,8 @@ def _html_blog_post_rss3(domain: str, post_json_object: {},
system_language: str) -> str: system_language: str) -> str:
"""Returns the RSS version 3 feed for a single blog post """Returns the RSS version 3 feed for a single blog post
""" """
rss_str = '' rss_str: str = ''
message_link = '' message_link: str = ''
if post_json_object['object'].get('id'): if post_json_object['object'].get('id'):
message_link = \ message_link = \
post_json_object['object']['id'].replace('/statuses/', '/') post_json_object['object']['id'].replace('/statuses/', '/')
@ -431,7 +431,7 @@ def _html_blog_post_rss3(domain: str, post_json_object: {},
get_base_content_from_post(post_json_object, get_base_content_from_post(post_json_object,
system_language) system_language)
description = first_paragraph_from_string(content) description = first_paragraph_from_string(content)
rss_str = 'title: ' + title_str + '\n' rss_str: str = 'title: ' + title_str + '\n'
rss_str += 'link: ' + message_link + '\n' rss_str += 'link: ' + message_link + '\n'
rss_str += 'description: ' + description + '\n' rss_str += 'description: ' + description + '\n'
rss_str += 'created: ' + rss_date_str + '\n\n' rss_str += 'created: ' + rss_date_str + '\n\n'
@ -479,7 +479,7 @@ def html_blog_post(session, authorized: bool,
debug: bool, content_license_url: str) -> str: debug: bool, content_license_url: str) -> str:
"""Returns a html blog post """Returns a html blog post
""" """
blog_str = '' blog_str: str = ''
css_filename = base_dir + '/epicyon-blog.css' css_filename = base_dir + '/epicyon-blog.css'
if os.path.isfile(base_dir + '/blog.css'): if os.path.isfile(base_dir + '/blog.css'):
@ -491,7 +491,7 @@ def html_blog_post(session, authorized: bool,
if post_json_object['object'].get('updated'): if post_json_object['object'].get('updated'):
modified = post_json_object['object']['updated'] modified = post_json_object['object']['updated']
title = post_json_object['object']['summary'] title = post_json_object['object']['summary']
url = '' url: str = ''
if post_json_object['object'].get('url'): if post_json_object['object'].get('url'):
url_str = get_url_from_post(post_json_object['object']['url']) url_str = get_url_from_post(post_json_object['object']['url'])
url = remove_html(url_str) url = remove_html(url_str)
@ -545,7 +545,7 @@ def html_blog_page(authorized: bool, session,
if ' ' in nickname or '@' in nickname or \ if ' ' in nickname or '@' in nickname or \
'\n' in nickname or '\r' in nickname: '\n' in nickname or '\r' in nickname:
return None return None
blog_str = '' blog_str: str = ''
css_filename = base_dir + '/epicyon-profile.css' css_filename = base_dir + '/epicyon-profile.css'
if os.path.isfile(base_dir + '/epicyon.css'): if os.path.isfile(base_dir + '/epicyon.css'):
@ -574,7 +574,7 @@ def html_blog_page(authorized: bool, session,
# show previous and next buttons # show previous and next buttons
if page_number is not None: if page_number is not None:
navigate_str = '<p>' navigate_str: str = '<p>'
if page_number > 1: if page_number > 1:
# show previous button # show previous button
navigate_str += '<a href="' + http_prefix + '://' + \ navigate_str += '<a href="' + http_prefix + '://' + \
@ -640,7 +640,7 @@ def html_blog_page_rss2(base_dir: str, http_prefix: str, translate: {},
domain_full = get_full_domain(domain, port) domain_full = get_full_domain(domain, port)
blog_rss2 = '' blog_rss2: str = ''
if include_header: if include_header:
blog_rss2 = rss2header(http_prefix, nickname, domain_full, blog_rss2 = rss2header(http_prefix, nickname, domain_full,
'Blog', translate) 'Blog', translate)
@ -685,7 +685,7 @@ def html_blog_page_rss3(base_dir: str, http_prefix: str,
'\n' in nickname or '\r' in nickname: '\n' in nickname or '\r' in nickname:
return None return None
blog_rss3 = '' blog_rss3: str = ''
blogs_index = acct_dir(base_dir, nickname, domain) + '/tlblogs.index' blogs_index = acct_dir(base_dir, nickname, domain) + '/tlblogs.index'
if not os.path.isfile(blogs_index): if not os.path.isfile(blogs_index):
@ -751,7 +751,7 @@ def html_blog_view(authorized: bool,
person_cache: {}, debug: bool) -> str: person_cache: {}, debug: bool) -> str:
"""Show the blog main page """Show the blog main page
""" """
blog_str = '' blog_str: str = ''
css_filename = base_dir + '/epicyon-profile.css' css_filename = base_dir + '/epicyon-profile.css'
if os.path.isfile(base_dir + '/epicyon.css'): if os.path.isfile(base_dir + '/epicyon.css'):
@ -816,7 +816,7 @@ def html_edit_blog(media_instance: bool, translate: {},
try: try:
with open(dir_str + '/newpost.txt', 'r', with open(dir_str + '/newpost.txt', 'r',
encoding='utf-8') as fp_blog: encoding='utf-8') as fp_blog:
edit_blog_text = '<p>' + fp_blog.read() + '</p>' edit_blog_text: str = '<p>' + fp_blog.read() + '</p>'
except OSError: except OSError:
print('EX: html_edit_blog unable to read ' + print('EX: html_edit_blog unable to read ' +
dir_str + '/newpost.txt') dir_str + '/newpost.txt')
@ -829,7 +829,7 @@ def html_edit_blog(media_instance: bool, translate: {},
path = path.split('?')[0] path = path.split('?')[0]
path_base = path path_base = path
edit_blog_image_section = ' <div class="container">' edit_blog_image_section: str = ' <div class="container">'
edit_blog_image_section += ' <label class="labels">' + \ edit_blog_image_section += ' <label class="labels">' + \
translate['Image description'] + '</label>' translate['Image description'] + '</label>'
edit_blog_image_section += \ edit_blog_image_section += \
@ -843,11 +843,10 @@ def html_edit_blog(media_instance: bool, translate: {},
placeholder_message = translate['Write something'] + '...' placeholder_message = translate['Write something'] + '...'
endpoint = 'editblogpost' endpoint = 'editblogpost'
placeholder_subject = translate['Title'] placeholder_subject = translate['Title']
scope_icon = 'scope_blog.png' scope_icon: str = 'scope_blog.png'
scope_description = translate['Blog'] scope_description: str = translate['Blog']
date_and_location = '' date_and_location: str = '<div class="container">'
date_and_location = '<div class="container">'
date_and_location += \ date_and_location += \
'<p><input type="checkbox" class="profilecheckbox" ' + \ '<p><input type="checkbox" class="profilecheckbox" ' + \
@ -917,7 +916,7 @@ def html_edit_blog(media_instance: bool, translate: {},
edit_blog_form += edit_blog_image_section edit_blog_form += edit_blog_image_section
edit_blog_form += \ edit_blog_form += \
' <label class="labels">' + placeholder_subject + '</label><br>' ' <label class="labels">' + placeholder_subject + '</label><br>'
title_str = '' title_str: str = ''
if post_json_object['object'].get('summary'): if post_json_object['object'].get('summary'):
title_str = post_json_object['object']['summary'] title_str = post_json_object['object']['summary']
edit_blog_form += \ edit_blog_form += \

View File

@ -78,7 +78,7 @@ def undo_bookmarks_collection_entry(recent_posts_cache: {},
bookmark_index = remove_eol(bookmark_index) bookmark_index = remove_eol(bookmark_index)
if not text_in_file(bookmark_index, bookmarks_index_filename): if not text_in_file(bookmark_index, bookmarks_index_filename):
return return
index_str = '' index_str: str = ''
try: try:
with open(bookmarks_index_filename, 'r', with open(bookmarks_index_filename, 'r',
encoding='utf-8') as fp_index: encoding='utf-8') as fp_index:
@ -199,7 +199,7 @@ def update_bookmarks_collection(recent_posts_cache: {},
str(post_json_object)) str(post_json_object))
return return
bookmarks_ending = '/bookmarks' bookmarks_ending: str = '/bookmarks'
if not object_url.endswith(bookmarks_ending): if not object_url.endswith(bookmarks_ending):
collection_id = object_url + bookmarks_ending collection_id = object_url + bookmarks_ending
else: else:
@ -445,7 +445,7 @@ def send_bookmark_via_server(base_dir: str, session,
' did not return a dict. ' + str(wf_request)) ' did not return a dict. ' + str(wf_request))
return 1 return 1
post_to_box = 'outbox' post_to_box: str = 'outbox'
# get the actor inbox for the To handle # get the actor inbox for the To handle
origin_domain = domain origin_domain = domain
@ -542,7 +542,7 @@ def send_undo_bookmark_via_server(base_dir: str, session,
' did not return a dict. ' + str(wf_request)) ' did not return a dict. ' + str(wf_request))
return 1 return 1
post_to_box = 'outbox' post_to_box: str = 'outbox'
# get the actor inbox for the To handle # get the actor inbox for the To handle
origin_domain = domain origin_domain = domain

View File

@ -243,7 +243,7 @@ def get_person_pub_key(base_dir: str, session, person_url: str,
elif i2p_domain: elif i2p_domain:
if '.i2p/' in person_url: if '.i2p/' in person_url:
person_domain = i2p_domain person_domain = i2p_domain
profile_str = 'https://www.w3.org/ns/activitystreams' profile_str: str = 'https://www.w3.org/ns/activitystreams'
accept_str = \ accept_str = \
'application/activity+json; profile="' + profile_str + '"' 'application/activity+json; profile="' + profile_str + '"'
as_header = { as_header = {
@ -288,7 +288,7 @@ def cache_svg_images(session, base_dir: str, http_prefix: str,
return False return False
cached = False cached = False
post_id = remove_id_ending(obj['id']).replace('/', '--') post_id = remove_id_ending(obj['id']).replace('/', '--')
actor = 'unknown' actor: str = 'unknown'
if post_attachments and obj.get('attributedTo'): if post_attachments and obj.get('attributedTo'):
actor = get_attributed_to(obj['attributedTo']) actor = get_attributed_to(obj['attributedTo'])
log_filename = data_dir(base_dir) + '/svg_scripts_log.txt' log_filename = data_dir(base_dir) + '/svg_scripts_log.txt'
@ -417,7 +417,7 @@ def clear_from_post_caches(base_dir: str, recent_posts_cache: {},
"""Clears cached html for the given post, so that edits """Clears cached html for the given post, so that edits
to news will appear to news will appear
""" """
filename = '/postcache/' + post_id + '.html' filename: str = '/postcache/' + post_id + '.html'
dir_str = data_dir(base_dir) dir_str = data_dir(base_dir)
for _, dirs, _ in os.walk(dir_str): for _, dirs, _ in os.walk(dir_str):
for acct in dirs: for acct in dirs:

View File

@ -48,7 +48,7 @@ def get_hashtag_category(base_dir: str, hashtag: str) -> str:
def load_city_hashtags(base_dir: str, translate: {}) -> None: def load_city_hashtags(base_dir: str, translate: {}) -> None:
"""create hashtag categories for cities """create hashtag categories for cities
""" """
category_str = 'places' category_str: str = 'places'
if translate.get(category_str): if translate.get(category_str):
category_str = translate[category_str] category_str = translate[category_str]
@ -91,7 +91,7 @@ def load_city_hashtags(base_dir: str, translate: {}) -> None:
city_filename) city_filename)
if '-' in hashtag: if '-' in hashtag:
section = hashtag.split('-') section = hashtag.split('-')
new_hashtag = '' new_hashtag: str = ''
for text in section: for text in section:
new_hashtag += text.lower().title() new_hashtag += text.lower().title()
hashtag2 = new_hashtag hashtag2 = new_hashtag
@ -107,7 +107,7 @@ def load_city_hashtags(base_dir: str, translate: {}) -> None:
city_filename) city_filename)
if ' ' in hashtag: if ' ' in hashtag:
section = hashtag.split(' ') section = hashtag.split(' ')
new_hashtag = '' new_hashtag: str = ''
for text in section: for text in section:
new_hashtag += text.lower().title() new_hashtag += text.lower().title()
hashtag2 = new_hashtag hashtag2 = new_hashtag
@ -207,7 +207,7 @@ def update_hashtag_categories(base_dir: str) -> None:
category_list.append(category_str) category_list.append(category_str)
category_list.sort() category_list.sort()
category_list_str = '' category_list_str: str = ''
for category_str in category_list: for category_str in category_list:
category_list_str += category_str + '\n' category_list_str += category_str + '\n'
@ -291,7 +291,7 @@ def guess_hashtag_category(tag_name: str, hashtag_categories: {},
if len(tag_name) < min_tag_length: if len(tag_name) < min_tag_length:
return '' return ''
category_matched = '' category_matched: str = ''
tag_matched_len = 0 tag_matched_len = 0
finished = False finished = False

14
city.py
View File

@ -214,8 +214,8 @@ def spoof_geolocation(base_dir: str,
variance_at_location = 0.0004 variance_at_location = 0.0004
default_latitude = 51.8744 default_latitude = 51.8744
default_longitude = 0.368333 default_longitude = 0.368333
default_latdirection = 'N' default_latdirection: str = 'N'
default_longdirection = 'W' default_longdirection: str = 'W'
if cities_list: if cities_list:
cities = cities_list cities = cities_list
@ -257,13 +257,13 @@ def spoof_geolocation(base_dir: str,
area_km2 = 0 area_km2 = 0
if len(city_fields) > 3: if len(city_fields) > 3:
area_km2 = int(city_fields[3]) area_km2 = int(city_fields[3])
latdirection = 'N' latdirection: str = 'N'
longdirection = 'E' longdirection: str = 'E'
if 'S' in latitude: if 'S' in latitude:
latdirection = 'S' latdirection: str = 'S'
latitude = latitude.replace('S', '') latitude = latitude.replace('S', '')
if 'W' in longitude: if 'W' in longitude:
longdirection = 'W' longdirection: str = 'W'
longitude = longitude.replace('W', '') longitude = longitude.replace('W', '')
latitude = float(latitude) latitude = float(latitude)
longitude = float(longitude) longitude = float(longitude)
@ -330,7 +330,7 @@ def get_spoofed_city(city: str, base_dir: str,
"""Returns the name of the city to use as a GPS spoofing location for """Returns the name of the city to use as a GPS spoofing location for
image metadata image metadata
""" """
city = '' city: str = ''
city_filename = acct_dir(base_dir, nickname, domain) + '/city.txt' city_filename = acct_dir(base_dir, nickname, domain) + '/city.txt'
if os.path.isfile(city_filename): if os.path.isfile(city_filename):
try: try:

View File

@ -98,7 +98,7 @@ def remove_html_tag(html_str: str, tag: str) -> str:
""" """
tag_found = True tag_found = True
while tag_found: while tag_found:
match_str = ' ' + tag + '="' match_str: str = ' ' + tag + '="'
if match_str not in html_str: if match_str not in html_str:
tag_found = False tag_found = False
break break
@ -164,7 +164,7 @@ def html_replace_email_quote(content: str) -> str:
return content return content
content_str = content.replace('<p>', '') content_str = content.replace('<p>', '')
content_lines = content_str.split('</p>') content_lines = content_str.split('</p>')
new_content = '' new_content: str = ''
for line_str in content_lines: for line_str in content_lines:
if not line_str: if not line_str:
continue continue
@ -209,7 +209,7 @@ def html_replace_quote_marks(content: str) -> str:
if '"' in content: if '"' in content:
sections = content.split('"') sections = content.split('"')
if len(sections) > 1: if len(sections) > 1:
new_content = '' new_content: str = ''
open_quote = True open_quote = True
markup = False markup = False
for char in content: for char in content:
@ -220,16 +220,16 @@ def html_replace_quote_marks(content: str) -> str:
markup = False markup = False
elif char == '"' and not markup: elif char == '"' and not markup:
if open_quote: if open_quote:
curr_char = '' curr_char: str = ''
else: else:
curr_char = '' curr_char: str = ''
open_quote = not open_quote open_quote = not open_quote
new_content += curr_char new_content += curr_char
if '&quot;' in new_content: if '&quot;' in new_content:
open_quote = True open_quote = True
content = new_content content = new_content
new_content = '' new_content: str = ''
ctr = 0 ctr = 0
sections = content.split('&quot;') sections = content.split('&quot;')
no_of_sections = len(sections) no_of_sections = len(sections)
@ -512,7 +512,7 @@ def replace_emoji_from_tags(session, base_dir: str,
else: else:
# sequence of codes # sequence of codes
icon_codes = icon_name.split('-') icon_codes = icon_name.split('-')
icon_code_sequence = '' icon_code_sequence: str = ''
for icode in icon_codes: for icode in icon_codes:
replaced = False replaced = False
try: try:
@ -520,7 +520,7 @@ def replace_emoji_from_tags(session, base_dir: str,
icode, 16)) icode, 16))
replaced = True replaced = True
except BaseException: except BaseException:
icon_code_sequence = '' icon_code_sequence: str = ''
if debug: if debug:
print('EX: ' + print('EX: ' +
'replace_emoji_from_tags 2 ' + 'replace_emoji_from_tags 2 ' +
@ -545,18 +545,18 @@ def replace_emoji_from_tags(session, base_dir: str,
content = content.replace(tag_item['name'], content = content.replace(tag_item['name'],
icon_code_sequence) icon_code_sequence)
html_class = 'emoji' html_class: str = 'emoji'
if message_type == 'post header': if message_type == 'post header':
html_class = 'emojiheader' html_class = 'emojiheader'
if message_type == 'profile': if message_type == 'profile':
html_class = 'emojiprofile' html_class = 'emojiprofile'
if screen_readable: if screen_readable:
emoji_tag_name = tag_item['name'].replace(':', '') emoji_tag_name: str = tag_item['name'].replace(':', '')
else: else:
emoji_tag_name = '' emoji_tag_name: str = ''
url_str = get_url_from_post(tag_item['icon']['url']) url_str: str = get_url_from_post(tag_item['icon']['url'])
tag_url = remove_html(url_str) tag_url: str = remove_html(url_str)
emoji_html = "<img src=\"" + tag_url + "\" alt=\"" + \ emoji_html: str = "<img src=\"" + tag_url + "\" alt=\"" + \
emoji_tag_name + \ emoji_tag_name + \
"\" align=\"middle\" class=\"" + html_class + "\"/>" "\" align=\"middle\" class=\"" + html_class + "\"/>"
content = content.replace(tag_item['name'], emoji_html) content = content.replace(tag_item['name'], emoji_html)
@ -621,8 +621,8 @@ def _contains_doi_reference(wrd: str, replace_dict: {}) -> bool:
return False return False
doi_ref_str = wrd.split(':', 1)[1] doi_ref_str = wrd.split(':', 1)[1]
doi_site = 'https://sci-hub.ru' doi_site: str = 'https://sci-hub.ru'
markup = '<a href="' + doi_site + '/' + \ markup: str = '<a href="' + doi_site + '/' + \
doi_ref_str + '" tabindex="10" ' + \ doi_ref_str + '" tabindex="10" ' + \
'rel="nofollow noopener noreferrer" ' + \ 'rel="nofollow noopener noreferrer" ' + \
'target="_blank">' + \ 'target="_blank">' + \
@ -657,7 +657,7 @@ def _contains_arxiv_reference(wrd: str, replace_dict: {}) -> bool:
if not arxiv_day.isdigit(): if not arxiv_day.isdigit():
return False return False
ref_str = arxiv_ref[0] + '.' + arxiv_ref[1] ref_str = arxiv_ref[0] + '.' + arxiv_ref[1]
markup = '<a href="https://arxiv.org/abs/' + \ markup: str = '<a href="https://arxiv.org/abs/' + \
ref_str + '" tabindex="10" ' + \ ref_str + '" tabindex="10" ' + \
'rel="nofollow noopener noreferrer" ' + \ 'rel="nofollow noopener noreferrer" ' + \
'target="_blank">' + \ 'target="_blank">' + \
@ -687,7 +687,7 @@ def remove_link_trackers_from_content(content: str) -> str:
return content return content
sections = content.split('?utm_') sections = content.split('?utm_')
ctr = 0 ctr = 0
new_content = '' new_content: str = ''
for section_str in sections: for section_str in sections:
if ctr == 0: if ctr == 0:
new_content = section_str new_content = section_str
@ -748,7 +748,7 @@ def add_web_links(content: str) -> str:
if url.endswith('.') or wrd.endswith(';'): if url.endswith('.') or wrd.endswith(';'):
url = url[:-1] url = url[:-1]
url = remove_link_tracking(url) url = remove_link_tracking(url)
markup = '<a href="' + url + '" tabindex="10" ' + \ markup: str = '<a href="' + url + '" tabindex="10" ' + \
'rel="nofollow noopener noreferrer" target="_blank">' 'rel="nofollow noopener noreferrer" target="_blank">'
for prefix in prefixes: for prefix in prefixes:
if url.startswith(prefix): if url.startswith(prefix):
@ -846,7 +846,7 @@ def replace_remote_hashtags(content: str,
ctr += 1 ctr += 1
continue continue
if '/' + domain not in link: if '/' + domain not in link:
new_link = '/users/' + nickname + \ new_link: str = '/users/' + nickname + \
'?remotetag=' + link.replace('/', '--') '?remotetag=' + link.replace('/', '--')
replacements[link] = new_link replacements[link] = new_link
ctr += 1 ctr += 1
@ -1170,7 +1170,7 @@ def remove_long_words(content: str, max_word_length: int,
if '/' in word_str: if '/' in word_str:
continue continue
if len(word_str[max_word_length:]) < max_word_length: if len(word_str[max_word_length:]) < max_word_length:
end_of_line_char = '\n' end_of_line_char: str = '\n'
if '<br>' in original_word_str: if '<br>' in original_word_str:
end_of_line_char = '' end_of_line_char = ''
new_word_str = \ new_word_str = \
@ -1272,7 +1272,7 @@ def detect_dogwhistles(content: str, dogwhistles: {}) -> {}:
ending = True ending = True
if ending: if ending:
prev_wrd = '' prev_wrd: str = ''
for wrd in words: for wrd in words:
wrd2 = (prev_wrd + ' ' + wrd).strip() wrd2 = (prev_wrd + ' ' + wrd).strip()
if wrd.endswith(whistle) or wrd2.endswith(whistle): if wrd.endswith(whistle) or wrd2.endswith(whistle):
@ -1294,7 +1294,7 @@ def detect_dogwhistles(content: str, dogwhistles: {}) -> {}:
starting = True starting = True
if starting: if starting:
prev_wrd = '' prev_wrd: str = ''
for wrd in words: for wrd in words:
wrd2 = (prev_wrd + ' ' + wrd).strip() wrd2 = (prev_wrd + ' ' + wrd).strip()
if wrd.startswith(whistle) or wrd2.startswith(whistle): if wrd.startswith(whistle) or wrd2.startswith(whistle):
@ -1311,7 +1311,7 @@ def detect_dogwhistles(content: str, dogwhistles: {}) -> {}:
if '*' in whistle: if '*' in whistle:
whistle_start = whistle.split('*', 1)[0] whistle_start = whistle.split('*', 1)[0]
whistle_end = whistle.split('*', 1)[1] whistle_end = whistle.split('*', 1)[1]
prev_wrd = '' prev_wrd: str = ''
for wrd in words: for wrd in words:
wrd2 = (prev_wrd + ' ' + wrd).strip() wrd2 = (prev_wrd + ' ' + wrd).strip()
if ((wrd.startswith(whistle_start) and if ((wrd.startswith(whistle_start) and
@ -1328,7 +1328,7 @@ def detect_dogwhistles(content: str, dogwhistles: {}) -> {}:
prev_wrd = wrd prev_wrd = wrd
continue continue
prev_wrd = '' prev_wrd: str = ''
for wrd in words: for wrd in words:
wrd2 = (prev_wrd + ' ' + wrd).strip() wrd2 = (prev_wrd + ' ' + wrd).strip()
if whistle in (wrd, wrd2): if whistle in (wrd, wrd2):
@ -1392,10 +1392,10 @@ def add_html_tags(base_dir: str, http_prefix: str,
'\n': ' --linebreak-- ' '\n': ' --linebreak-- '
} }
content = replace_strings(content, replacements) content = replace_strings(content, replacements)
now_playing_str = 'NowPlaying' now_playing_str: str = 'NowPlaying'
if translate.get(now_playing_str): if translate.get(now_playing_str):
now_playing_str = translate[now_playing_str] now_playing_str = translate[now_playing_str]
now_playing_lower_str = 'nowplaying' now_playing_lower_str: str = 'nowplaying'
if translate.get(now_playing_lower_str): if translate.get(now_playing_lower_str):
now_playing_lower_str = translate[now_playing_lower_str] now_playing_lower_str = translate[now_playing_lower_str]
if '#' + now_playing_lower_str in content: if '#' + now_playing_lower_str in content:
@ -1445,7 +1445,7 @@ def add_html_tags(base_dir: str, http_prefix: str,
# extract mentions and tags from words # extract mentions and tags from words
long_words_list: list[str] = [] long_words_list: list[str] = []
prev_word_str = '' prev_word_str: str = ''
auto_tags_list = _load_auto_tags(base_dir, nickname, domain) auto_tags_list = _load_auto_tags(base_dir, nickname, domain)
append_tags = [] append_tags = []
for word_str in words: for word_str in words:
@ -1459,7 +1459,7 @@ def add_html_tags(base_dir: str, http_prefix: str,
if _add_mention(base_dir, word_str, http_prefix, following, if _add_mention(base_dir, word_str, http_prefix, following,
petnames, replace_mentions, recipients, petnames, replace_mentions, recipients,
hashtags): hashtags):
prev_word_str = '' prev_word_str: str = ''
continue continue
elif first_char == '#': elif first_char == '#':
# remove any endings from the hashtag # remove any endings from the hashtag
@ -1471,7 +1471,7 @@ def add_html_tags(base_dir: str, http_prefix: str,
if _add_hash_tags(word_str, http_prefix, original_domain, if _add_hash_tags(word_str, http_prefix, original_domain,
replace_hashtags, hashtags): replace_hashtags, hashtags):
prev_word_str = '' prev_word_str: str = ''
continue continue
elif ':' in word_str: elif ':' in word_str:
word_str2 = word_str.split(':')[1] word_str2 = word_str.split(':')[1]
@ -1500,12 +1500,12 @@ def add_html_tags(base_dir: str, http_prefix: str,
emoji_dict) emoji_dict)
else: else:
if _auto_tag(word_str, auto_tags_list, append_tags): if _auto_tag(word_str, auto_tags_list, append_tags):
prev_word_str = '' prev_word_str: str = ''
continue continue
if prev_word_str: if prev_word_str:
if _auto_tag(prev_word_str + ' ' + word_str, if _auto_tag(prev_word_str + ' ' + word_str,
auto_tags_list, append_tags): auto_tags_list, append_tags):
prev_word_str = '' prev_word_str: str = ''
continue continue
prev_word_str = word_str prev_word_str = word_str
@ -1641,7 +1641,7 @@ def save_media_in_form_post(media_bytes, debug: bool,
return None, None return None, None
media_location = -1 media_location = -1
search_str = '' search_str: str = ''
filename = None filename = None
# directly search the binary array for the beginning # directly search the binary array for the beginning
@ -1821,7 +1821,7 @@ def extract_text_fields_in_post(post_bytes, boundary: str, debug: bool,
if debug: if debug:
if 'password' not in message_fields: if 'password' not in message_fields:
print('DEBUG: POST message_fields: ' + str(message_fields)) print('DEBUG: POST message_fields: ' + str(message_fields))
lynx_content_type = 'Content-Type: text/plain; charset=utf-8\r\n' lynx_content_type: str = 'Content-Type: text/plain; charset=utf-8\r\n'
# examine each section of the POST, separated by the boundary # examine each section of the POST, separated by the boundary
for fld in message_fields: for fld in message_fields:
if fld == '--': if fld == '--':
@ -1856,7 +1856,7 @@ def extract_text_fields_in_post(post_bytes, boundary: str, debug: bool,
post_lines = post_value_str.split('\r\n') post_lines = post_value_str.split('\r\n')
if debug and 'password' not in post_key: if debug and 'password' not in post_key:
print('post_lines: ' + str(post_lines)) print('post_lines: ' + str(post_lines))
post_value = '' post_value: str = ''
if len(post_lines) > 2: if len(post_lines) > 2:
for line in range(2, len(post_lines)-1): for line in range(2, len(post_lines)-1):
if line > 2: if line > 2:
@ -1873,9 +1873,9 @@ def limit_repeated_words(text: str, max_repeats: int) -> str:
""" """
words = text.replace('\n', ' ').split(' ') words = text.replace('\n', ' ').split(' ')
repeat_ctr = 0 repeat_ctr = 0
repeated_text = '' repeated_text: str = ''
replacements = {} replacements = {}
prev_word = '' prev_word: str = ''
for word in words: for word in words:
if word == prev_word: if word == prev_word:
repeat_ctr += 1 repeat_ctr += 1
@ -1975,7 +1975,7 @@ def contains_invalid_local_links(domain_full: str,
"""Returns true if the given content has invalid links """Returns true if the given content has invalid links
""" """
for inv_str in INVALID_CONTENT_STRINGS: for inv_str in INVALID_CONTENT_STRINGS:
match_str = '?' + inv_str + '=' match_str: str = '?' + inv_str + '='
if match_str not in content: if match_str not in content:
continue continue
# extract the urls and check whether they are for the local domain # extract the urls and check whether they are for the local domain
@ -2009,10 +2009,10 @@ def bold_reading_string(text: str) -> str:
add_paragraph_markup = True add_paragraph_markup = True
paragraphs = text.split('\n') paragraphs = text.split('\n')
parag_ctr = 0 parag_ctr = 0
new_text = '' new_text: str = ''
for parag in paragraphs: for parag in paragraphs:
words = parag.split(' ') words = parag.split(' ')
new_parag = '' new_parag: str = ''
reading_markup = False reading_markup = False
for wrd in words: for wrd in words:
if '<' in wrd: if '<' in wrd:
@ -2025,8 +2025,8 @@ def bold_reading_string(text: str) -> str:
'&' not in wrd and '=' not in wrd and \ '&' not in wrd and '=' not in wrd and \
not wrd.startswith(':'): not wrd.startswith(':'):
prefix = '' prefix: str = ''
postfix = '' postfix: str = ''
if wrd.startswith('"'): if wrd.startswith('"'):
prefix = '"' prefix = '"'
wrd = wrd[1:] wrd = wrd[1:]
@ -2109,7 +2109,7 @@ def content_diff(content: str, prev_content: str) -> str:
diff = cdiff.compare(text1_sentences, text2_sentences) diff = cdiff.compare(text1_sentences, text2_sentences)
diff_text = '' diff_text: str = ''
for line in diff: for line in diff:
if line.startswith('- '): if line.startswith('- '):
if not diff_text: if not diff_text:
@ -2146,7 +2146,7 @@ def create_edits_html(edits_json: {}, post_json_object: {},
for modified, _ in edits_json.items(): for modified, _ in edits_json.items():
edit_dates_list.append(modified) edit_dates_list.append(modified)
edit_dates_list.sort(reverse=True) edit_dates_list.sort(reverse=True)
edits_str = '' edits_str: str = ''
content = get_content_from_post(post_json_object, system_language, content = get_content_from_post(post_json_object, system_language,
languages_understood, "content") languages_understood, "content")
if not content: if not content:
@ -2172,7 +2172,7 @@ def create_edits_html(edits_json: {}, post_json_object: {},
datetime_object = \ datetime_object = \
convert_published_to_local_timezone(datetime_object, timezone) convert_published_to_local_timezone(datetime_object, timezone)
modified_str = datetime_object.strftime("%a %b %d, %H:%M") modified_str = datetime_object.strftime("%a %b %d, %H:%M")
diff = '<p><b>' + modified_str + '</b></p>' + diff diff: str = '<p><b>' + modified_str + '</b></p>' + diff
edits_str += diff edits_str += diff
content = prev_content content = prev_content
if not edits_str: if not edits_str:
@ -2189,7 +2189,7 @@ def remove_script(content: str, log_filename: str,
separators = [['<', '>'], ['&lt;', '&gt;']] separators = [['<', '>'], ['&lt;', '&gt;']]
for sep in separators: for sep in separators:
prefix = sep[0] + 'script' prefix = sep[0] + 'script'
ending = '/script' + sep[1] ending: str = '/script' + sep[1]
if prefix not in content: if prefix not in content:
continue continue
sections = content.split(prefix) sections = content.split(prefix)
@ -2208,7 +2208,7 @@ def remove_script(content: str, log_filename: str,
if log_filename and actor: if log_filename and actor:
# write the detected script to a log file # write the detected script to a log file
log_str = actor + ' ' + url + ' ' + text + '\n' log_str = actor + ' ' + url + ' ' + text + '\n'
write_type = 'a+' write_type: str = 'a+'
if os.path.isfile(log_filename): if os.path.isfile(log_filename):
write_type = 'w+' write_type = 'w+'
try: try:
@ -2321,22 +2321,22 @@ def format_mixed_right_to_left(content: str,
# not a RTL language # not a RTL language
if language_right_to_left(language): if language_right_to_left(language):
return content return content
result = '' result: str = ''
changed = False changed = False
paragraphs = content.split('<p>') paragraphs = content.split('<p>')
for text_html in paragraphs: for text_html in paragraphs:
if '</p>' not in text_html: if '</p>' not in text_html:
continue continue
text_html = '<p>' + text_html text_html: str = '<p>' + text_html
text_plain = remove_html(text_html) text_plain: str = remove_html(text_html)
if is_right_to_left_text(text_plain): if is_right_to_left_text(text_plain):
text_html = text_html.replace('<p>', '<p><div dir="rtl">', 1) text_html = text_html.replace('<p>', '<p><div dir="rtl">', 1)
text_html = text_html.replace('</p>', '</div></p>', 1) text_html = text_html.replace('</p>', '</div></p>', 1)
changed = True changed = True
result += text_html result += text_html
if not changed: if not changed:
result = '' result: str = ''
prev_distilled = '' prev_distilled: str = ''
distilled = content distilled = content
while prev_distilled != distilled: while prev_distilled != distilled:
prev_distilled = distilled prev_distilled = distilled

View File

@ -299,7 +299,7 @@ def download_conversation_posts(authorized: bool, session,
""" """
if '://' not in post_id: if '://' not in post_id:
return [] return []
profile_str = 'https://www.w3.org/ns/activitystreams' profile_str: str = 'https://www.w3.org/ns/activitystreams'
as_header = { as_header = {
'Accept': 'application/ld+json; profile="' + profile_str + '"' 'Accept': 'application/ld+json; profile="' + profile_str + '"'
} }
@ -438,7 +438,7 @@ def conversation_tag_to_convthread_id(tag: str) -> str:
""" """
if not isinstance(tag, str): if not isinstance(tag, str):
return '' return ''
convthread_id = '' convthread_id: str = ''
for tag_chr in tag: for tag_chr in tag:
if tag_chr.isdigit(): if tag_chr.isdigit():
convthread_id += tag_chr convthread_id += tag_chr

View File

@ -90,7 +90,7 @@ def _save_known_web_bots(base_dir: str, known_bots: []) -> bool:
"""Saves a list of known web bots """Saves a list of known web bots
""" """
known_bots_filename = data_dir(base_dir) + '/knownBots.txt' known_bots_filename = data_dir(base_dir) + '/knownBots.txt'
known_bots_str = '' known_bots_str: str = ''
for crawler in known_bots: for crawler in known_bots:
known_bots_str += crawler.strip() + '\n' known_bots_str += crawler.strip() + '\n'
try: try:

View File

@ -115,7 +115,7 @@ def add_cw_from_lists(post_json_object: {}, cw_lists: {}, translate: {},
if 'content' not in post_json_object['object']: if 'content' not in post_json_object['object']:
if 'contentMap' not in post_json_object['object']: if 'contentMap' not in post_json_object['object']:
return return
cw_text = '' cw_text: str = ''
if post_json_object['object'].get('summary'): if post_json_object['object'].get('summary'):
cw_text = post_json_object['object']['summary'] cw_text = post_json_object['object']['summary']

110
daemon.py
View File

@ -108,7 +108,7 @@ from poison import load_2grams
class PubServer(BaseHTTPRequestHandler): class PubServer(BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1' protocol_version: str = 'HTTP/1.1'
def handle_error(self, request, client_address): def handle_error(self, request, client_address):
"""HTTP server error handling """HTTP server error handling
@ -295,11 +295,11 @@ class PubServer(BaseHTTPRequestHandler):
class PubServerUnitTest(PubServer): class PubServerUnitTest(PubServer):
protocol_version = 'HTTP/1.0' protocol_version: str = 'HTTP/1.0'
class EpicyonServer(ThreadingHTTPServer): class EpicyonServer(ThreadingHTTPServer):
starting_daemon = True starting_daemon: bool = True
hide_announces = {} hide_announces = {}
no_of_books = 0 no_of_books = 0
max_api_blocks = 32000 max_api_blocks = 32000
@ -315,7 +315,7 @@ class EpicyonServer(ThreadingHTTPServer):
block_government = {} block_government = {}
block_bluesky = {} block_bluesky = {}
block_nostr = {} block_nostr = {}
followers_synchronization = False followers_synchronization: bool = False
followers_sync_cache = {} followers_sync_cache = {}
buy_sites = None buy_sites = None
min_images_for_accounts = 0 min_images_for_accounts = 0
@ -323,7 +323,7 @@ class EpicyonServer(ThreadingHTTPServer):
css_cache = {} css_cache = {}
reverse_sequence = None reverse_sequence = None
clacks = None clacks = None
public_replies_unlisted = False public_replies_unlisted: bool = False
dogwhistles = {} dogwhistles = {}
preferred_podcast_formats: list[str] = [] preferred_podcast_formats: list[str] = []
bold_reading = {} bold_reading = {}
@ -331,18 +331,18 @@ class EpicyonServer(ThreadingHTTPServer):
hide_recent_posts = {} hide_recent_posts = {}
account_timezone = None account_timezone = None
post_to_nickname = None post_to_nickname = None
nodeinfo_is_active = False nodeinfo_is_active: bool = False
security_txt_is_active = False security_txt_is_active: bool = False
vcard_is_active = False vcard_is_active: bool = False
masto_api_is_active = False masto_api_is_active: bool = False
map_format = None map_format = None
dyslexic_font = False dyslexic_font: bool = False
content_license_url = '' content_license_url: str = ''
dm_license_url = '' dm_license_url: str = ''
fitness = {} fitness = {}
signing_priv_key_pem = None signing_priv_key_pem = None
show_node_info_accounts = False show_node_info_accounts: bool = False
show_node_info_version = False show_node_info_version: bool = False
text_mode_banner = '' text_mode_banner = ''
access_keys = {} access_keys = {}
rss_timeout_sec = 20 rss_timeout_sec = 20
@ -350,68 +350,68 @@ class EpicyonServer(ThreadingHTTPServer):
default_reply_interval_hrs = 9999999 default_reply_interval_hrs = 9999999
recent_dav_etags = {} recent_dav_etags = {}
key_shortcuts = {} key_shortcuts = {}
low_bandwidth = False low_bandwidth: bool = False
user_agents_blocked = None user_agents_blocked = None
crawlers_allowed = None crawlers_allowed = None
known_bots = None known_bots = None
unit_test = False unit_test: bool = False
allow_local_network_access = False allow_local_network_access: bool = False
yt_replace_domain = '' yt_replace_domain: str = ''
twitter_replacement_domain = '' twitter_replacement_domain: str = ''
newswire = {} newswire = {}
max_newswire_posts = 0 max_newswire_posts = 0
verify_all_signatures = False verify_all_signatures: bool = False
blocklistUpdateCtr = 0 blocklistUpdateCtr = 0
blocklistUpdateInterval = 100 blocklistUpdateInterval = 100
domainBlocklist = None domainBlocklist = None
manual_follower_approval = True manual_follower_approval = True
onion_domain = None onion_domain = None
i2p_domain = None i2p_domain = None
media_instance = False media_instance: bool = False
blogs_instance = False blogs_instance: bool = False
translate = {} translate = {}
system_language = 'en' system_language: str = 'en'
city = '' city = ''
voting_time_mins = 30 voting_time_mins = 30
positive_voting = False positive_voting: bool = False
newswire_votes_threshold = 1 newswire_votes_threshold = 1
max_newswire_feed_size_kb = 1 max_newswire_feed_size_kb = 1
max_newswire_posts_per_source = 1 max_newswire_posts_per_source = 1
show_published_date_only = False show_published_date_only: bool = False
max_mirrored_articles = 0 max_mirrored_articles = 0
max_news_posts = 0 max_news_posts = 0
maxTags = 32 maxTags = 32
max_followers = 2000 max_followers = 2000
show_publish_as_icon = False show_publish_as_icon: bool = False
full_width_tl_button_header = False full_width_tl_button_header: bool = False
rss_icon_at_top = True rss_icon_at_top: bool = True
publish_button_at_top = False publish_button_at_top: bool = False
max_feed_item_size_kb = 100 max_feed_item_size_kb = 100
maxCategoriesFeedItemSizeKb = 1024 maxCategoriesFeedItemSizeKb = 1024
dormant_months = 6 dormant_months = 6
max_like_count = 10 max_like_count = 10
followingItemsPerPage = 12 followingItemsPerPage = 12
registration = False registration: bool = False
enable_shared_inbox = True enable_shared_inbox: bool = True
outboxThread = {} outboxThread = {}
outbox_thread_index = {} outbox_thread_index = {}
new_post_thread = {} new_post_thread = {}
project_version = __version__ project_version = __version__
secure_mode = True secure_mode: bool = True
max_post_length = 0 max_post_length = 0
maxMediaSize = 0 maxMediaSize = 0
maxMessageLength = 64000 maxMessageLength = 64000
maxPostsInBox = 32000 maxPostsInBox = 32000
maxCacheAgeDays = 30 maxCacheAgeDays = 30
domain = '' domain: str = ''
port = 43 port = 43
domain_full = '' domain_full: str = ''
http_prefix = 'https' http_prefix: str = 'https'
debug = False debug: bool = False
federation_list: list[str] = [] federation_list: list[str] = []
shared_items_federated_domains: list[str] = [] shared_items_federated_domains: list[str] = []
base_dir = '' base_dir: str = ''
instance_id = '' instance_id: str = ''
person_cache = {} person_cache = {}
cached_webfingers = {} cached_webfingers = {}
favicons_cache = {} favicons_cache = {}
@ -421,22 +421,22 @@ class EpicyonServer(ThreadingHTTPServer):
session_i2p = None session_i2p = None
last_getreq = 0 last_getreq = 0
last_postreq = 0 last_postreq = 0
getreq_busy = False getreq_busy: bool = False
postreq_busy = False postreq_busy: bool = False
received_message = False received_message: bool = False
inbox_queue: list[dict] = [] inbox_queue: list[dict] = []
send_threads = None send_threads = None
post_log = [] post_log = []
max_queue_length = 64 max_queue_length = 64
allow_deletion = True allow_deletion: bool = True
last_login_time = 0 last_login_time = 0
last_login_failure = 0 last_login_failure = 0
login_failure_count = {} login_failure_count = {}
log_login_failures = True log_login_failures: bool = True
max_replies = 10 max_replies = 10
tokens = {} tokens = {}
tokens_lookup = {} tokens_lookup = {}
instance_only_skills_search = True instance_only_skills_search: bool = True
followers_threads = [] followers_threads = []
blocked_cache = [] blocked_cache = []
blocked_cache_last_updated = 0 blocked_cache_last_updated = 0
@ -447,9 +447,9 @@ class EpicyonServer(ThreadingHTTPServer):
last_known_crawler = 0 last_known_crawler = 0
lists_enabled = None lists_enabled = None
cw_lists = {} cw_lists = {}
theme_name = '' theme_name: str = ''
news_instance = False news_instance: bool = False
default_timeline = 'inbox' default_timeline: str = 'inbox'
thrFitness = None thrFitness = None
recent_posts_cache = {} recent_posts_cache = {}
thrCache = None thrCache = None
@ -471,9 +471,9 @@ class EpicyonServer(ThreadingHTTPServer):
thrPostSchedule = None thrPostSchedule = None
thrNewswireDaemon = None thrNewswireDaemon = None
thrFederatedSharesDaemon = None thrFederatedSharesDaemon = None
restart_inbox_queue_in_progress = False restart_inbox_queue_in_progress: bool = False
restart_inbox_queue = False restart_inbox_queue: bool = False
signing_priv_key_pem = '' signing_priv_key_pem: str = ''
thrCheckActor = {} thrCheckActor = {}
thrImportFollowing = None thrImportFollowing = None
thrWatchdog = None thrWatchdog = None
@ -482,8 +482,8 @@ class EpicyonServer(ThreadingHTTPServer):
thrFederatedSharesWatchdog = None thrFederatedSharesWatchdog = None
thrFederatedBlocksDaemon = None thrFederatedBlocksDaemon = None
qrcode_scale = 6 qrcode_scale = 6
instance_description = '' instance_description: str = ''
instance_description_short = 'Epicyon' instance_description_short: str = 'Epicyon'
robots_txt = None robots_txt = None
last_llm_time = None last_llm_time = None
mitm_servers = [] mitm_servers = []
@ -877,7 +877,7 @@ def run_daemon(accounts_data_dir: str,
if not content_license_url: if not content_license_url:
content_license_url = 'https://creativecommons.org/licenses/by-nc/4.0' content_license_url = 'https://creativecommons.org/licenses/by-nc/4.0'
httpd.content_license_url = content_license_url httpd.content_license_url = content_license_url
httpd.dm_license_url = '' httpd.dm_license_url: str = ''
# fitness metrics # fitness metrics
fitness_filename = data_dir(base_dir) + '/fitness.json' fitness_filename = data_dir(base_dir) + '/fitness.json'
@ -1013,7 +1013,7 @@ def run_daemon(accounts_data_dir: str,
# load translations dictionary # load translations dictionary
httpd.translate = {} httpd.translate = {}
httpd.system_language = 'en' httpd.system_language: str = 'en'
if not unit_test: if not unit_test:
httpd.translate, httpd.system_language = \ httpd.translate, httpd.system_language = \
load_translations_from_file(base_dir, language) load_translations_from_file(base_dir, language)
@ -1207,7 +1207,7 @@ def run_daemon(accounts_data_dir: str,
httpd.cw_lists = load_cw_lists(base_dir, True) httpd.cw_lists = load_cw_lists(base_dir, True)
# set the avatar for the news account # set the avatar for the news account
httpd.theme_name = get_config_param(base_dir, 'theme') httpd.theme_name: str = get_config_param(base_dir, 'theme')
if not httpd.theme_name: if not httpd.theme_name:
httpd.theme_name = 'default' httpd.theme_name = 'default'
if is_news_theme_name(base_dir, httpd.theme_name): if is_news_theme_name(base_dir, httpd.theme_name):

View File

@ -356,6 +356,21 @@ def get_language_from_post(post_json_object: {}, system_language: str,
return system_language return system_language
def get_post_attachments(post_json_object: {}) -> []:
""" Returns the list of attachments for a post
"""
post_obj = post_json_object
if has_object_dict(post_json_object):
post_obj = post_json_object['object']
if not post_obj.get('attachment'):
return []
if isinstance(post_obj['attachment'], list):
return post_obj['attachment']
if isinstance(post_obj['attachment'], dict):
return [post_obj['attachment']]
return []
def get_media_descriptions_from_post(post_json_object: {}) -> str: def get_media_descriptions_from_post(post_json_object: {}) -> str:
"""Returns all attached media descriptions as a single text. """Returns all attached media descriptions as a single text.
This is used for filtering This is used for filtering
@ -3754,21 +3769,6 @@ def set_premium_account(base_dir: str, nickname: str, domain: str,
return True return True
def get_post_attachments(post_json_object: {}) -> []:
""" Returns the list of attachments for a post
"""
post_obj = post_json_object
if has_object_dict(post_json_object):
post_obj = post_json_object['object']
if not post_obj.get('attachment'):
return []
if isinstance(post_obj['attachment'], list):
return post_obj['attachment']
if isinstance(post_obj['attachment'], dict):
return [post_obj['attachment']]
return []
def string_ends_with(text: str, possible_endings: []) -> bool: def string_ends_with(text: str, possible_endings: []) -> bool:
""" Does the given text end with at least one of the endings """ Does the given text end with at least one of the endings
""" """