Replace readlines with functions

main
bashrc 2026-04-26 17:49:49 +01:00
parent 3beb8c97f8
commit 4a1ee28e8d
23 changed files with 392 additions and 488 deletions

View File

@ -54,6 +54,7 @@ from conversation import unmute_conversation
from auth import create_basic_auth_header
from session import get_json
from data import load_string
from data import load_list
from data import save_string
from data import append_string
@ -605,19 +606,15 @@ def update_blocked_cache(base_dir: str,
global_blocking_filename = data_dir(base_dir) + '/blocking.txt'
if not os.path.isfile(global_blocking_filename):
return blocked_cache_last_updated
try:
with open(global_blocking_filename, 'r',
encoding='utf-8') as fp_blocked:
blocked_lines = fp_blocked.readlines()
# remove newlines
for index, _ in enumerate(blocked_lines):
blocked_lines[index] = remove_eol(blocked_lines[index])
# update the cache
blocked_cache.clear()
blocked_cache += evil_incarnate() + blocked_lines
except OSError as ex:
print('EX: update_blocked_cache unable to read ' +
global_blocking_filename + ' ' + str(ex))
blocked_lines = load_list(global_blocking_filename,
'EX: update_blocked_cache unable to read ' +
global_blocking_filename + ' [ex]')
# remove newlines
for index, _ in enumerate(blocked_lines):
blocked_lines[index] = remove_eol(blocked_lines[index])
# update the cache
blocked_cache.clear()
blocked_cache += evil_incarnate() + blocked_lines
return curr_time
@ -1599,20 +1596,17 @@ def set_broch_mode(base_dir: str, domain_full: str, enabled: bool) -> None:
following_filename = account_dir + '/' + follow_file_type
if not os.path.isfile(following_filename):
continue
try:
with open(following_filename, 'r',
encoding='utf-8') as fp_foll:
follow_list = fp_foll.readlines()
for handle in follow_list:
if '@' not in handle:
continue
handle = remove_eol(handle)
handle_domain = handle.split('@')[1]
if handle_domain not in allowed_domains:
allowed_domains.append(handle_domain)
except OSError as ex:
print('EX: set_broch_mode failed to read ' +
following_filename + ' ' + str(ex))
follow_list = \
load_list(following_filename,
'EX: set_broch_mode failed to read ' +
following_filename + ' [ex]')
for handle in follow_list:
if '@' not in handle:
continue
handle = remove_eol(handle)
handle_domain = handle.split('@')[1]
if handle_domain not in allowed_domains:
allowed_domains.append(handle_domain)
break
# write the allow file

19
blog.py
View File

@ -48,6 +48,7 @@ from newswire import rss2footer
from cache import get_person_from_cache
from flags import is_image_file
from data import load_string
from data import load_list
def _no_of_blog_replies(base_dir: str, http_prefix: str, translate: {},
@ -82,12 +83,9 @@ def _no_of_blog_replies(base_dir: str, http_prefix: str, translate: {},
removals: list[str] = []
replies = 0
lines: list[str] = []
try:
with open(post_filename, 'r', encoding='utf-8') as fp_post:
lines = fp_post.readlines()
except OSError:
print('EX: failed to read blog ' + post_filename)
lines: list[str] = \
load_list(post_filename,
'EX: failed to read blog ' + post_filename)
for reply_post_id in lines:
reply_post_id = remove_eol(reply_post_id)
@ -156,12 +154,9 @@ def _get_blog_replies(base_dir: str, http_prefix: str, translate: {},
return blog_text + '\n'
return ''
lines: list[str] = []
try:
with open(post_filename, 'r', encoding='utf-8') as fp_post:
lines = fp_post.readlines()
except OSError:
print('EX: unable to read blog 4 ' + post_filename)
lines: list[str] = \
load_list(post_filename,
'EX: unable to read blog 4 ' + post_filename)
if lines:
replies_str: str = ''

View File

@ -21,6 +21,7 @@ from utils import lines_in_file
from utils import data_dir
from utils import account_is_indexable
from utils import is_yggdrasil_address
from data import load_list
def _meta_data_instance_v1(show_accounts: bool,
@ -46,24 +47,21 @@ def _meta_data_instance_v1(show_accounts: bool,
rules_list: list[str] = []
rules_filename = data_dir(base_dir) + '/tos.md'
if os.path.isfile(rules_filename):
try:
with open(rules_filename, 'r', encoding='utf-8') as fp_rules:
rules_lines = fp_rules.readlines()
rule_ctr = 1
for line in rules_lines:
line = line.strip()
if not line:
continue
if line.startswith('#'):
continue
rules_list.append({
'id': str(rule_ctr),
'text': line
})
rule_ctr += 1
except OSError:
print('EX: _meta_data_instance_v1 unable to read ' +
rules_filename)
rules_lines = load_list(rules_filename,
'EX: _meta_data_instance_v1 unable to read ' +
rules_filename)
rule_ctr = 1
for line in rules_lines:
line = line.strip()
if not line:
continue
if line.startswith('#'):
continue
rules_list.append({
'id': str(rule_ctr),
'text': line
})
rule_ctr += 1
is_bot = False
is_group = False

View File

@ -23,6 +23,7 @@ from formats import get_image_mime_type
from formats import get_image_extensions
from formats import get_audio_extensions
from formats import get_video_extensions
from data import load_list
def _get_masto_api_v2id_from_nickname(nickname: str) -> int:
@ -53,12 +54,9 @@ def _meta_data_instance_v2(show_accounts: bool,
rules_list: list[str] = []
rules_filename = data_dir(base_dir) + '/tos.md'
if os.path.isfile(rules_filename):
rules_lines: list[str] = []
try:
with open(rules_filename, 'r', encoding='utf-8') as fp_rules:
rules_lines = fp_rules.readlines()
except OSError:
print('EX: _meta_data_instance_v2 unable to read rules')
rules_lines: list[str] = \
load_list(rules_filename,
'EX: _meta_data_instance_v2 unable to read rules')
rule_ctr = 1
for line in rules_lines:
line = line.strip()

View File

@ -20,6 +20,7 @@ from blocking import is_blocked
from posts import get_user_url
from follow import unfollow_account
from person import get_actor_json
from data import load_list
def _move_following_handles_for_account(base_dir: str,
@ -38,20 +39,18 @@ def _move_following_handles_for_account(base_dir: str,
acct_dir(base_dir, nickname, domain) + '/following.txt'
if not os.path.isfile(following_filename):
return ctr
try:
with open(following_filename, 'r', encoding='utf-8') as fp_foll:
following_handles = fp_foll.readlines()
for follow_handle in following_handles:
follow_handle = follow_handle.strip("\n").strip("\r")
ctr += \
_update_moved_handle(base_dir, nickname, domain,
follow_handle, session,
http_prefix, cached_webfingers,
debug, signing_priv_key_pem,
block_federated, mitm_servers)
except OSError:
print('EX: _move_following_handles_for_account unable to read ' +
following_filename)
following_handles = \
load_list(following_filename,
'EX: _move_following_handles_for_account unable to read ' +
following_filename)
for follow_handle in following_handles:
follow_handle = follow_handle.strip("\n").strip("\r")
ctr += \
_update_moved_handle(base_dir, nickname, domain,
follow_handle, session,
http_prefix, cached_webfingers,
debug, signing_priv_key_pem,
block_federated, mitm_servers)
return ctr
@ -152,13 +151,10 @@ def _update_moved_handle(base_dir: str, nickname: str, domain: str,
following_filename = \
acct_dir(base_dir, nickname, domain) + '/following.txt'
if os.path.isfile(following_filename):
following_handles: list[str] = []
try:
with open(following_filename, 'r', encoding='utf-8') as fp_foll1:
following_handles = fp_foll1.readlines()
except OSError:
print('EX: _update_moved_handle unable to read ' +
following_filename)
following_handles: list[str] = \
load_list(following_filename,
'EX: _update_moved_handle unable to read ' +
following_filename)
moved_to_handle = moved_to_nickname + '@' + moved_to_domain_full
handle_lower = handle.lower()
@ -205,13 +201,10 @@ def _update_moved_handle(base_dir: str, nickname: str, domain: str,
followers_filename = \
acct_dir(base_dir, nickname, domain) + '/followers.txt'
if os.path.isfile(followers_filename):
follower_handles: list[str] = []
try:
with open(followers_filename, 'r', encoding='utf-8') as fp_foll3:
follower_handles = fp_foll3.readlines()
except OSError:
print('EX: _update_moved_handle unable to read ' +
followers_filename)
follower_handles: list[str] = \
load_list(followers_filename,
'EX: _update_moved_handle unable to read ' +
followers_filename)
handle_lower = handle.lower()

View File

@ -42,6 +42,7 @@ from threads import begin_thread
from threads import thread_with_trace
from webapp_hashtagswarm import store_hash_tags
from cache import clear_from_post_caches
from data import load_list
def _update_feeds_outbox_index(base_dir: str, domain: str,
@ -396,13 +397,10 @@ def _newswire_hashtag_processing(base_dir: str, post_json_object: {},
rules_filename = data_dir(base_dir) + '/hashtagrules.txt'
if not os.path.isfile(rules_filename):
return True
rules: list[str] = []
try:
with open(rules_filename, 'r', encoding='utf-8') as fp_rules:
rules = fp_rules.readlines()
except OSError:
print('EX: _newswire_hashtag_processing unable to read ' +
rules_filename)
rules: list[str] = \
load_list(rules_filename,
'EX: _newswire_hashtag_processing unable to read ' +
rules_filename)
domain_full = get_full_domain(domain, port)

View File

@ -54,6 +54,7 @@ from blocking import is_blocked_hashtag
from filters import is_filtered
from session import download_image_any_mime_type
from content import remove_script
from data import load_list
def _remove_cdata(text: str) -> str:
@ -1826,13 +1827,10 @@ def get_dict_from_newswire(session, base_dir: str, domain: str,
max_posts_per_source = 5
# add rss feeds
rss_feed: list[str] = []
try:
with open(subscriptions_filename, 'r', encoding='utf-8') as fp_sub:
rss_feed = fp_sub.readlines()
except OSError:
print('EX: get_dict_from_newswire unable to read ' +
subscriptions_filename)
rss_feed: list[str] = \
load_list(subscriptions_filename,
'EX: get_dict_from_newswire unable to read ' +
subscriptions_filename)
result = {}
for url in rss_feed:
url = url.strip()

View File

@ -91,6 +91,7 @@ from cache import store_person_in_cache
from cache import remove_person_from_cache
from filters import is_filtered_bio
from follow import is_following_actor
from data import load_list
def generate_rsa_key() -> (str, str):
@ -1332,12 +1333,10 @@ def reenable_account(base_dir: str, nickname: str, domain: str) -> None:
"""
suspended_filename = data_dir(base_dir) + '/suspended.txt'
if os.path.isfile(suspended_filename):
lines: list[str] = []
try:
with open(suspended_filename, 'r', encoding='utf-8') as fp_sus:
lines = fp_sus.readlines()
except OSError:
print('EX: reenable_account unable to read ' + suspended_filename)
lines: list[str] = \
load_list(suspended_filename,
'EX: reenable_account unable to read ' +
suspended_filename)
try:
with open(suspended_filename, 'w+', encoding='utf-8') as fp_sus:
for suspended in lines:
@ -1388,11 +1387,10 @@ def suspend_account(base_dir: str, nickname: str, domain: str) -> None:
# Don't suspend moderators
moderators_file = data_dir(base_dir) + '/moderators.txt'
if os.path.isfile(moderators_file):
try:
with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
lines = fp_mod.readlines()
except OSError:
print('EX: suspend_account unable too read ' + moderators_file)
lines: list[str] = \
load_list(moderators_file,
'EX: suspend_account unable too read ' +
moderators_file)
for moderator in lines:
if moderator.strip('\n').strip('\r') == nickname:
return
@ -1413,11 +1411,10 @@ def suspend_account(base_dir: str, nickname: str, domain: str) -> None:
suspended_filename = data_dir(base_dir) + '/suspended.txt'
if os.path.isfile(suspended_filename):
try:
with open(suspended_filename, 'r', encoding='utf-8') as fp_sus:
lines = fp_sus.readlines()
except OSError:
print('EX: suspend_account unable to read 2 ' + suspended_filename)
lines: list[str] = \
load_list(suspended_filename,
'EX: suspend_account unable to read 2 ' +
suspended_filename)
for suspended in lines:
if suspended.strip('\n').strip('\r') == nickname:
return
@ -1454,12 +1451,10 @@ def can_remove_post(base_dir: str,
# is the post by a moderator?
moderators_file = data_dir(base_dir) + '/moderators.txt'
if os.path.isfile(moderators_file):
lines: list[str] = []
try:
with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
lines = fp_mod.readlines()
except OSError:
print('EX: can_remove_post unable to read ' + moderators_file)
lines: list[str] = \
load_list(moderators_file,
'EX: can_remove_post unable to read ' +
moderators_file)
for moderator in lines:
if domain_full + '/users/' + \
moderator.strip('\n') + '/' in post_id:
@ -1490,13 +1485,10 @@ def _remove_tags_for_nickname(base_dir: str, nickname: str,
continue
if not text_in_file(match_str, tag_filename):
continue
lines: list[str] = []
try:
with open(tag_filename, 'r', encoding='utf-8') as fp_tag:
lines = fp_tag.readlines()
except OSError:
print('EX: _remove_tags_for_nickname unable to read ' +
tag_filename)
lines: list[str] = \
load_list(tag_filename,
'EX: _remove_tags_for_nickname unable to read ' +
tag_filename)
try:
with open(tag_filename, 'w+', encoding='utf-8') as fp_tag:
for tagline in lines:
@ -1546,12 +1538,9 @@ def remove_account(base_dir: str, nickname: str,
# Don't remove moderators
moderators_file = data_dir(base_dir) + '/moderators.txt'
if os.path.isfile(moderators_file):
lines: list[str] = []
try:
with open(moderators_file, 'r', encoding='utf-8') as fp_mod:
lines = fp_mod.readlines()
except OSError:
print('EX: remove_account unable to read ' + moderators_file)
lines: list[str] = \
load_list(moderators_file,
'EX: remove_account unable to read ' + moderators_file)
for moderator in lines:
if moderator.strip('\n') == nickname:
return False

View File

@ -144,6 +144,7 @@ from pyjsonld import JsonLdError
from conversation import conversation_tag_to_convthread_id
from conversation import post_id_to_convthread_id
from quote import quote_toots_allowed
from data import load_list
def convert_post_content_to_html(message_json: {}) -> None:
@ -2443,13 +2444,10 @@ def _append_citations_to_blog_post(base_dir: str,
if not os.path.isfile(citations_filename):
return
citations_separator = '#####'
citations: list[str] = []
try:
with open(citations_filename, 'r', encoding='utf-8') as fp_cit:
citations = fp_cit.readlines()
except OSError:
print('EX: _append_citations_to_blog_post unable to read ' +
citations_filename)
citations: list[str] = \
load_list(citations_filename,
'EX: _append_citations_to_blog_post unable to read ' +
citations_filename)
for line in citations:
if citations_separator not in line:
continue
@ -4565,14 +4563,10 @@ def create_moderation(base_dir: str, nickname: str, domain: str, port: int,
if is_moderator(base_dir, nickname):
moderation_index_file = data_dir(base_dir) + '/moderation.txt'
if os.path.isfile(moderation_index_file):
lines: list[str] = []
try:
with open(moderation_index_file, 'r',
encoding='utf-8') as fp_index:
lines = fp_index.readlines()
except OSError:
print('EX: create_moderation unable to read ' +
moderation_index_file)
lines: list[str] = \
load_list(moderation_index_file,
'EX: create_moderation unable to read ' +
moderation_index_file)
box_header['totalItems'] = len(lines)
if header_only:
return box_header

View File

@ -16,6 +16,7 @@ from utils import text_in_file
from utils import dangerous_markup
from utils import get_reply_to
from utils import get_actor_from_post
from data import load_list
def is_vote(base_dir: str, nickname: str, domain: str,
@ -145,14 +146,10 @@ def question_update_votes(base_dir: str, nickname: str, domain: str,
print('EX: unable to append to voters file ' + voters_filename)
else:
# change an entry in the voters file
lines: list[str] = []
try:
with open(voters_filename, 'r',
encoding='utf-8') as fp_voters:
lines = fp_voters.readlines()
except OSError:
print('EX: question_update_votes unable to read ' +
voters_filename)
lines: list[str] = \
load_list(voters_filename,
'EX: question_update_votes unable to read ' +
voters_filename)
newlines: list[str] = []
save_voters_file = False
@ -185,13 +182,10 @@ def question_update_votes(base_dir: str, nickname: str, domain: str,
if not possible_answer.get('name'):
continue
total_items = 0
lines: list[str] = []
try:
with open(voters_filename, 'r', encoding='utf-8') as fp_voters:
lines = fp_voters.readlines()
except OSError:
print('EX: question_update_votes unable to read ' +
voters_filename)
lines: list[str] = \
load_list(voters_filename,
'EX: question_update_votes unable to read ' +
voters_filename)
for vote_line in lines:
if vote_line.endswith(voters_file_separator +
possible_answer['name'] + '\n'):

View File

@ -38,6 +38,7 @@ from session import post_json
from webfinger import webfinger_handle
from auth import create_basic_auth_header
from posts import get_person_box
from data import load_list
# the maximum number of reactions from individual actors which can be
# added to a post. Hence an adversary can't bombard you with sockpuppet
@ -484,13 +485,10 @@ def _update_common_reactions(base_dir: str, emoji_content: str) -> None:
common_reactions_filename = data_dir(base_dir) + '/common_reactions.txt'
common_reactions = None
if os.path.isfile(common_reactions_filename):
try:
with open(common_reactions_filename, 'r',
encoding='utf-8') as fp_react:
common_reactions = fp_react.readlines()
except OSError:
print('EX: unable to load common reactions file' +
common_reactions_filename)
common_reactions: list[str] = \
load_list(common_reactions_filename,
'EX: unable to load common reactions file' +
common_reactions_filename)
if common_reactions:
new_common_reactions: list[str] = []
reaction_found = False

View File

@ -16,6 +16,7 @@ from utils import acct_dir
from utils import text_in_file
from utils import get_config_param
from status import get_status_number
from data import load_list
def _clear_role_status(base_dir: str, role: str) -> None:
@ -54,12 +55,9 @@ def _add_role(base_dir: str, nickname: str, domain: str,
if os.path.isfile(role_file):
# is this nickname already in the file?
lines: list[str] = []
try:
with open(role_file, 'r', encoding='utf-8') as fp_role:
lines = fp_role.readlines()
except OSError:
print('EX: _add_role, failed to read roles file ' + role_file)
lines: list[str] = \
load_list(role_file,
'EX: _add_role, failed to read roles file ' + role_file)
for role_nickname in lines:
role_nickname = role_nickname.strip('\n').strip('\r')
@ -97,11 +95,9 @@ def _remove_role(base_dir: str, nickname: str, role_filename: str) -> None:
if not os.path.isfile(role_file):
return
try:
with open(role_file, 'r', encoding='utf-8') as fp_role:
lines = fp_role.readlines()
except OSError:
print('EX: _remove_role, failed to read roles file ' + role_file)
lines: list[str] = \
load_list(role_file,
'EX: _remove_role, failed to read roles file ' + role_file)
try:
with open(role_file, 'w+', encoding='utf-8') as fp_role:
@ -283,12 +279,9 @@ def is_devops(base_dir: str, nickname: str) -> bool:
return True
return False
lines: list[str] = []
try:
with open(devops_file, 'r', encoding='utf-8') as fp_mod:
lines = fp_mod.readlines()
except OSError:
print('EX: is_devops unable to read ' + devops_file)
lines: list[str] = \
load_list(devops_file,
'EX: is_devops unable to read ' + devops_file)
if not lines:
# if there is nothing in the file
admin_name = get_config_param(base_dir, 'admin')

View File

@ -32,6 +32,7 @@ from utils import local_actor_url
from utils import get_actor_from_post
from content import html_replace_quote_marks
from content import html_replace_inline_quotes
from data import load_list
SPEAKER_REMOVE_CHARS = ('.\n', '. ', ',', ';', '?', '!')
@ -151,13 +152,10 @@ def _speaker_pronounce(base_dir: str, say_text: str, translate: {}) -> str:
")": ","
}
if os.path.isfile(pronounce_filename):
pronounce_list: list[str] = []
try:
with open(pronounce_filename, 'r', encoding='utf-8') as fp_pro:
pronounce_list = fp_pro.readlines()
except OSError:
print('EX: _speaker_pronounce unable to read ' +
pronounce_filename)
pronounce_list: list[str] = \
load_list(pronounce_filename,
'EX: _speaker_pronounce unable to read ' +
pronounce_filename)
if pronounce_list:
for conversion in pronounce_list:
separator = None
@ -542,14 +540,10 @@ def _post_to_speaker_json(base_dir: str, http_prefix: str,
accounts_dir = acct_dir(base_dir, nickname, domain_full)
approve_follows_filename = accounts_dir + '/followrequests.txt'
if os.path.isfile(approve_follows_filename):
follows: list[str] = []
try:
with open(approve_follows_filename, 'r',
encoding='utf-8') as fp_foll:
follows = fp_foll.readlines()
except OSError:
print('EX: _post_to_speaker_json unable to read ' +
approve_follows_filename)
follows: list[str] = \
load_list(approve_follows_filename,
'EX: _post_to_speaker_json unable to read ' +
approve_follows_filename)
if follows:
follow_requests_exist = True
for i, _ in enumerate(follows):

294
tests.py
View File

@ -244,6 +244,8 @@ from blocking import is_blocked_domain
from filters import filtered_match
from gemini import blog_to_gemini
from blog import html_blog_post_gemini_links
from data import load_list
from data import load_string
TEST_SERVER_GROUP_RUNNING = False
@ -2123,9 +2125,11 @@ def test_shared_items_federation(base_dir: str) -> None:
assert valid_inbox(bob_dir, 'bob', bob_domain)
assert valid_inbox_filenames(bob_dir, 'bob', bob_domain,
alice_domain, alice_port)
assert text_in_file('alice@' + alice_domain,
bob_dir_str + '/bob@' +
bob_domain + '/followers.txt')
filename = bob_dir_str + '/bob@' + bob_domain + '/followers.txt'
if not text_in_file('alice@' + alice_domain, filename):
text = load_string(filename, '')
print('alice@' + alice_domain + ' not in\n' + str(text))
assert text_in_file('alice@' + alice_domain, filename)
assert text_in_file('bob@' + bob_domain,
alice_dir_str + '/alice@' +
alice_domain + '/following.txt')
@ -5762,9 +5766,8 @@ def _test_thread_functions():
modules[mod_name]['source'] = source_str
if 'thread_with_trace(' in source_str:
threads_called_in_modules.append(mod_name)
with open(source_file, 'r', encoding='utf-8') as fp_src:
lines = fp_src.readlines()
modules[mod_name]['lines'] = lines
lines: list[str] = load_list(source_file, '')
modules[mod_name]['lines'] = lines
for mod_name in threads_called_in_modules:
thread_sections = \
@ -5919,152 +5922,149 @@ def _test_functions():
'functions': []
}
# load the module source
source_str = ''
with open(source_file, 'r', encoding='utf-8') as fp_src:
source_str = fp_src.read()
modules[mod_name]['source'] = source_str
source_str: str = load_string(source_file, '')
modules[mod_name]['source'] = source_str
# go through the source line by line
with open(source_file, 'r', encoding='utf-8') as fp_src:
lines = fp_src.readlines()
modules[mod_name]['lines'] = lines
line_count = 0
prev_line = 'start'
method_name = ''
method_args: list[str] = []
module_line = 0
curr_return_types = ''
is_comment = False
for line in lines:
if '"""' in line:
is_comment = not is_comment
module_line += 1
# what group is this module in?
if '__module_group__' in line:
if '=' in line:
group_name = line.split('=')[1].strip()
group_name = group_name.replace('"', '')
group_name = group_name.replace("'", '')
modules[mod_name]['group'] = group_name
if not mod_groups.get(group_name):
mod_groups[group_name] = [mod_name]
else:
if mod_name not in mod_groups[group_name]:
mod_groups[group_name].append(mod_name)
# reading function lines
if not line.strip().startswith('def '):
if 'self.server.' in line:
assert _check_self_variables(mod_name,
method_name,
method_args, line,
module_line)
if line_count > 0:
line_count += 1
# add LOC count for this function
if len(prev_line.strip()) == 0 and \
len(line.strip()) == 0 and \
line_count > 2:
line_count -= 2
if line_count > 80:
loc_str = str(line_count) + ';' + method_name
if line_count < 1000:
loc_str = '0' + loc_str
if line_count < 100:
loc_str = '0' + loc_str
if line_count < 10:
loc_str = '0' + loc_str
if loc_str not in method_loc:
method_loc.append(loc_str)
line_count = 0
lines: list[str] = load_list(source_file, '')
modules[mod_name]['lines'] = lines
line_count = 0
prev_line = 'start'
method_name = ''
method_args: list[str] = []
module_line = 0
curr_return_types = ''
is_comment = False
for line in lines:
if '"""' in line:
is_comment = not is_comment
module_line += 1
# what group is this module in?
if '__module_group__' in line:
if '=' in line:
group_name = line.split('=')[1].strip()
group_name = group_name.replace('"', '')
group_name = group_name.replace("'", '')
modules[mod_name]['group'] = group_name
if not mod_groups.get(group_name):
mod_groups[group_name] = [mod_name]
else:
if mod_name not in mod_groups[group_name]:
mod_groups[group_name].append(mod_name)
# reading function lines
if not line.strip().startswith('def '):
if 'self.server.' in line:
assert _check_self_variables(mod_name,
method_name,
method_args, line,
module_line)
if line_count > 0:
line_count += 1
# add LOC count for this function
if len(prev_line.strip()) == 0 and \
len(line.strip()) == 0 and \
line_count > 2:
line_count -= 2
if line_count > 80:
loc_str = str(line_count) + ';' + method_name
if line_count < 1000:
loc_str = '0' + loc_str
if line_count < 100:
loc_str = '0' + loc_str
if line_count < 10:
loc_str = '0' + loc_str
if loc_str not in method_loc:
method_loc.append(loc_str)
line_count = 0
is_return_statement = False
if ' return' in line:
before_return = line.split(' return')[0].strip()
if not before_return:
is_return_statement = True
is_return_statement = False
if ' return' in line:
before_return = line.split(' return')[0].strip()
if not before_return:
is_return_statement = True
if curr_return_types and is_return_statement and \
not is_comment and '#' not in line and \
'"""' not in line:
# check return statements are of the expected type
if line.endswith(' return\n'):
if curr_return_types != 'None':
print(method_name + ' in module ' +
mod_name + ' has unexpected return')
print('Expected: return ' +
str(curr_return_types))
print('Actual: ' + line.strip())
assert False
elif (' return' in line and
not line.endswith(',\n') and
not line.endswith('\\\n') and
',' in curr_return_types):
# check the number of return values
ret_types = line.split(' return', 1)[1]
no_of_args1 = \
len(curr_return_types.split(','))
no_of_args2 = \
len(ret_types.split(','))
if no_of_args1 != no_of_args2:
print(method_name + ' in module ' +
mod_name +
' has unexpected ' +
'number of arguments')
print('Expected: return ' +
str(curr_return_types))
print('Actual: ' + line.strip())
assert False
if curr_return_types and is_return_statement and \
not is_comment and '#' not in line and \
'"""' not in line:
# check return statements are of the expected type
if line.endswith(' return\n'):
if curr_return_types != 'None':
print(method_name + ' in module ' +
mod_name + ' has unexpected return')
print('Expected: return ' +
str(curr_return_types))
print('Actual: ' + line.strip())
assert False
elif (' return' in line and
not line.endswith(',\n') and
not line.endswith('\\\n') and
',' in curr_return_types):
# check the number of return values
ret_types = line.split(' return', 1)[1]
no_of_args1 = \
len(curr_return_types.split(','))
no_of_args2 = \
len(ret_types.split(','))
if no_of_args1 != no_of_args2:
print(method_name + ' in module ' +
mod_name +
' has unexpected ' +
'number of arguments')
print('Expected: return ' +
str(curr_return_types))
print('Actual: ' + line.strip())
assert False
prev_line = line
continue
# reading function def
prev_line = line
line_count = 1
method_name = line.split('def ', 1)[1].split('(')[0]
# get list of arguments with spaces removed
method_args = \
source_str.split('def ' + method_name + '(')[1]
return_types = method_args.split(')', 1)[1]
if ':' in return_types:
return_types = return_types.split(':')[0]
if '->' in return_types:
return_types = return_types.split('->')[1].strip()
if return_types.startswith('(') and \
not return_types.endswith(')'):
return_types += ')'
else:
return_types: list[str] = []
curr_return_types = return_types
method_args = method_args.split(')', 1)[0]
method_args = method_args.replace(' ', '').split(',')
if function.get(mod_name):
function[mod_name].append(method_name)
else:
function[mod_name] = [method_name]
if method_name not in modules[mod_name]['functions']:
modules[mod_name]['functions'].append(method_name)
if not _check_method_args(mod_name, method_name,
method_args):
assert False
# create an entry for this function
function_properties[method_name] = {
"args": method_args,
"module": mod_name,
"calledInModule": [],
"returns": return_types
}
# LOC count for the last function
if line_count > 2:
line_count -= 2
if line_count > 80:
loc_str = str(line_count) + ';' + method_name
if line_count < 1000:
loc_str = '0' + loc_str
if line_count < 100:
loc_str = '0' + loc_str
if line_count < 10:
loc_str = '0' + loc_str
if loc_str not in method_loc:
method_loc.append(loc_str)
continue
# reading function def
prev_line = line
line_count = 1
method_name = line.split('def ', 1)[1].split('(')[0]
# get list of arguments with spaces removed
method_args = \
source_str.split('def ' + method_name + '(')[1]
return_types = method_args.split(')', 1)[1]
if ':' in return_types:
return_types = return_types.split(':')[0]
if '->' in return_types:
return_types = return_types.split('->')[1].strip()
if return_types.startswith('(') and \
not return_types.endswith(')'):
return_types += ')'
else:
return_types: list[str] = []
curr_return_types = return_types
method_args = method_args.split(')', 1)[0]
method_args = method_args.replace(' ', '').split(',')
if function.get(mod_name):
function[mod_name].append(method_name)
else:
function[mod_name] = [method_name]
if method_name not in modules[mod_name]['functions']:
modules[mod_name]['functions'].append(method_name)
if not _check_method_args(mod_name, method_name,
method_args):
assert False
# create an entry for this function
function_properties[method_name] = {
"args": method_args,
"module": mod_name,
"calledInModule": [],
"returns": return_types
}
# LOC count for the last function
if line_count > 2:
line_count -= 2
if line_count > 80:
loc_str = str(line_count) + ';' + method_name
if line_count < 1000:
loc_str = '0' + loc_str
if line_count < 100:
loc_str = '0' + loc_str
if line_count < 10:
loc_str = '0' + loc_str
if loc_str not in method_loc:
method_loc.append(loc_str)
break
print('LOC counts:')

View File

@ -22,6 +22,7 @@ from cryptography.hazmat.primitives import hashes
from followingCalendar import add_person_to_calendar
from unicodetext import standardize_text
from formats import get_image_extensions
from data import load_list
VALID_HASHTAG_CHARS = \
set('_0123456789' +
@ -797,12 +798,9 @@ def get_followers_list(base_dir: str,
if not os.path.isfile(filename):
return []
lines: list[str] = []
try:
with open(filename, 'r', encoding='utf-8') as fp_foll:
lines = fp_foll.readlines()
except OSError:
print('EX: get_followers_list unable to read ' + filename)
lines: list[str] = \
load_list(filename,
'EX: get_followers_list unable to read ' + filename)
if lines:
for i, _ in enumerate(lines):
@ -1640,16 +1638,13 @@ def follow_person(base_dir: str, nickname: str, domain: str,
if text_in_file(handle_to_follow, unfollowed_filename):
# remove them from the unfollowed file
new_lines = ''
try:
with open(unfollowed_filename, 'r',
encoding='utf-8') as fp_unfoll:
lines = fp_unfoll.readlines()
for line in lines:
if handle_to_follow not in line:
new_lines += line
except OSError:
print('EX: follow_person unable to read ' +
unfollowed_filename)
lines: list[str] = \
load_list(unfollowed_filename,
'EX: follow_person unable to read ' +
unfollowed_filename)
for line in lines:
if handle_to_follow not in line:
new_lines += line
try:
with open(unfollowed_filename, 'w+',
encoding='utf-8') as fp_unfoll:
@ -1897,13 +1892,10 @@ def remove_post_from_index(post_url: str, debug: bool,
post_id = remove_id_ending(post_url)
if not text_in_file(post_id, index_file):
return
lines: list[str] = []
try:
with open(index_file, 'r', encoding='utf-8') as fp_mod1:
lines = fp_mod1.readlines()
except OSError as exc:
print('EX: remove_post_from_index unable to read ' +
index_file + ' ' + str(exc))
lines: list[str] = \
load_list(index_file,
'EX: remove_post_from_index unable to read ' +
index_file + ' [ex]')
if not lines:
return
@ -2072,13 +2064,10 @@ def _remove_post_id_from_tag_index(tag_index_filename: str,
post_id: str) -> None:
"""Remove post_id from the tag index file
"""
lines = None
try:
with open(tag_index_filename, 'r', encoding='utf-8') as fp_index:
lines = fp_index.readlines()
except OSError:
print('EX: _remove_post_id_from_tag_index unable to read ' +
tag_index_filename)
lines: list[str] = \
load_list(tag_index_filename,
'EX: _remove_post_id_from_tag_index unable to read ' +
tag_index_filename)
if not lines:
return
newlines = ''

View File

@ -25,6 +25,7 @@ from webapp_utils import html_footer
from webapp_utils import get_banner_file
from webapp_utils import edit_text_field
from shares import share_category_icon
from data import load_list
def _links_exist(base_dir: str) -> bool:
@ -222,14 +223,12 @@ def get_left_column_content(base_dir: str, nickname: str, domain_full: str,
links_filename = data_dir(base_dir) + '/links.txt'
links_file_contains_entries = False
links_list = None
links_list: list[str] = None
if os.path.isfile(links_filename):
try:
with open(links_filename, 'r', encoding='utf-8') as fp_links:
links_list = fp_links.readlines()
except OSError:
print('EX: get_left_column_content unable to read ' +
links_filename)
links_list = \
load_list(links_filename,
'EX: get_left_column_content unable to read ' +
links_filename)
if not front_page:
# show a number of shares

View File

@ -35,6 +35,7 @@ from webapp_utils import html_post_separator
from webapp_utils import header_buttons_front_screen
from webapp_utils import edit_text_field
from textmode import text_mode_browser
from data import load_list
def _votes_indicator(total_votes: int, positive_voting: bool) -> str:
@ -383,13 +384,10 @@ def html_citations(base_dir: str, nickname: str, domain: str,
citations_selected: list[str] = []
if os.path.isfile(citations_filename):
citations_separator = '#####'
citations: list[str] = []
try:
with open(citations_filename, 'r', encoding='utf-8') as fp_cit:
citations = fp_cit.readlines()
except OSError as exc:
print('EX: html_citations unable to read ' +
citations_filename + ' ' + str(exc))
citations: list[str] = \
load_list(citations_filename,
'EX: html_citations unable to read ' +
citations_filename + ' [ex]')
for line in citations:
if citations_separator not in line:
continue

View File

@ -53,6 +53,7 @@ from maps import get_location_from_post
from cache import get_person_from_cache
from person import get_person_notes
from textmode import text_mode_browser
from data import load_list
def _html_new_post_drop_down(scope_icon: str, scope_description: str,
@ -937,14 +938,10 @@ def html_new_post(edit_post_params: {},
translate['Citations'] + ':</label></p>\n'
citations_str += ' <ul>\n'
citations_separator = '#####'
citations: list[str] = []
try:
with open(citations_filename, 'r',
encoding='utf-8') as fp_cit:
citations = fp_cit.readlines()
except OSError as exc:
print('EX: html_new_post unable to read ' +
citations_filename + ' ' + str(exc))
citations: list[str] = \
load_list(citations_filename,
'EX: html_new_post unable to read ' +
citations_filename + ' [ex]')
for line in citations:
if citations_separator not in line:
continue

View File

@ -35,6 +35,7 @@ from blocking import get_global_block_reason
from blocking import is_blocked_domain
from blocking import is_blocked
from session import create_session
from data import load_list
def html_moderation(default_timeline: str,
@ -473,42 +474,40 @@ def html_moderation_info(translate: {}, base_dir: str,
blocking_reasons_exist = False
if os.path.isfile(blocking_reasons_filename):
blocking_reasons_exist = True
try:
with open(blocking_filename, 'r', encoding='utf-8') as fp_block:
blocked_lines = fp_block.readlines()
blocked_str = ''
if blocked_lines:
blocked_lines.sort()
for line in blocked_lines:
if not line:
continue
line = remove_eol(line).strip()
if blocking_reasons_exist:
blocking_reasons_file = blocking_reasons_filename
reason = \
get_global_block_reason(line,
blocking_reasons_file)
if reason:
blocked_str += \
line + ' - ' + reason + '\n'
continue
blocked_str += line + '\n'
info_form += '<div class="container">\n'
info_form += \
' <br><b>' + \
translate['Blocked accounts and hashtags'] + '</b>'
info_form += \
' <br>' + \
translate[msg_str1]
info_form += \
' <textarea id="message" ' + \
'name="blocked" style="height:2000px" ' + \
'spellcheck="false">' + blocked_str + '</textarea>\n'
info_form += '</div>\n'
info_shown = True
except OSError as exc:
print('EX: html_moderation_info unable to read 2 ' +
blocking_filename + ' ' + str(exc))
blocked_lines: list[str] = \
load_list(blocking_filename,
'EX: html_moderation_info unable to read 2 ' +
blocking_filename + ' [ex]')
blocked_str = ''
if blocked_lines:
blocked_lines.sort()
for line in blocked_lines:
if not line:
continue
line = remove_eol(line).strip()
if blocking_reasons_exist:
blocking_reasons_file = blocking_reasons_filename
reason = \
get_global_block_reason(line,
blocking_reasons_file)
if reason:
blocked_str += \
line + ' - ' + reason + '\n'
continue
blocked_str += line + '\n'
info_form += '<div class="container">\n'
info_form += \
' <br><b>' + \
translate['Blocked accounts and hashtags'] + '</b>'
info_form += \
' <br>' + \
translate[msg_str1]
info_form += \
' <textarea id="message" ' + \
'name="blocked" style="height:2000px" ' + \
'spellcheck="false">' + blocked_str + '</textarea>\n'
info_form += '</div>\n'
info_shown = True
filters_filename = dir_str + '/filters.txt'
if os.path.isfile(filters_filename):

View File

@ -145,6 +145,7 @@ from session import get_json_valid
from session import get_json
from blog import html_blog_post_markdown
from blog import html_blog_post_gemini_links
from data import load_list
# maximum length for display name within html posts
MAX_DISPLAY_NAME_LENGTH = 42
@ -2111,14 +2112,10 @@ def _substitute_onion_domains(base_dir: str, content: str) -> str:
onion_domains_filename = data_dir(base_dir) + '/onion_domains.txt'
if os.path.isfile(onion_domains_filename):
onion_domains_list: list[str] = []
try:
with open(onion_domains_filename, 'r',
encoding='utf-8') as fp_onions:
onion_domains_list = fp_onions.readlines()
except OSError:
print('EX: unable to load onion domains file ' +
onion_domains_filename)
onion_domains_list: list[str] = \
load_list(onion_domains_filename,
'EX: unable to load onion domains file ' +
onion_domains_filename)
if onion_domains_list:
onion_domains = {}
separators = (' ', ',', '->')

View File

@ -136,6 +136,7 @@ from shares import actor_attached_shares_as_html
from git import get_repo_url
from reading import html_profile_book_list
from availability import get_availability
from data import load_list
THEME_FORMATS = '.zip, .gz'
BLOCKFILE_FORMATS = '.csv'
@ -2762,14 +2763,11 @@ def _html_edit_profile_filtering(base_dir: str, nickname: str, domain: str,
locations_filename = base_dir + '/custom_locations.txt'
if not os.path.isfile(locations_filename):
locations_filename = base_dir + '/locations.txt'
cities: list[str] = []
try:
with open(locations_filename, 'r', encoding='utf-8') as fp_loc:
cities = fp_loc.readlines()
cities.sort()
except OSError:
print('EX: _html_edit_profile_filtering unable to read ' +
locations_filename)
cities: list[str] = \
load_list(locations_filename,
'EX: _html_edit_profile_filtering unable to read ' +
locations_filename)
cities.sort()
edit_profile_form += ' <select id="cityDropdown" ' + \
'name="cityDropdown" class="theme">\n'
city = city.lower()

View File

@ -57,6 +57,7 @@ from webapp_hashtagswarm import html_hash_tag_swarm
from maps import html_hashtag_maps
from session import get_json_valid
from session import get_json
from data import load_list
def html_search_emoji(translate: {}, base_dir: str, search_str: str,
@ -928,12 +929,10 @@ def html_hashtag_search(nickname: str, domain: str, port: int,
nickname = None
# read the index
lines: list[str] = []
try:
with open(hashtag_index_file, 'r', encoding='utf-8') as fp_hash:
lines = fp_hash.readlines()
except OSError:
print('EX: html_hashtag_search unable to read ' + hashtag_index_file)
lines: list[str] = \
load_list(hashtag_index_file,
'EX: html_hashtag_search unable to read ' +
hashtag_index_file)
# read the css
css_filename = base_dir + '/epicyon-profile.css'
@ -1383,12 +1382,10 @@ def hashtag_search_rss(nickname: str, domain: str, port: int,
nickname = None
# read the index
lines: list[str] = []
try:
with open(hashtag_index_file, 'r', encoding='utf-8') as fp_hash:
lines = fp_hash.readlines()
except OSError:
print('EX: hashtag_search_rss unable to read ' + hashtag_index_file)
lines: list[str] = \
load_list(hashtag_index_file,
'EX: hashtag_search_rss unable to read ' +
hashtag_index_file)
if not lines:
return None
@ -1396,8 +1393,8 @@ def hashtag_search_rss(nickname: str, domain: str, port: int,
max_feed_length = 10
hashtag_feed = rss2tag_header(hashtag, http_prefix, domain_full)
for index, _ in enumerate(lines):
post_id = lines[index].strip('\n').strip('\r')
for index, item in enumerate(lines):
post_id = item.strip('\n').strip('\r')
if ' ' not in post_id:
nickname = get_nickname_from_actor(post_id)
if not nickname:
@ -1497,13 +1494,10 @@ def hashtag_search_json(nickname: str, domain: str, port: int,
nickname = None
# read the index
lines: list[str] = []
try:
with open(hashtag_index_file, 'r', encoding='utf-8') as fp_hash:
lines = fp_hash.readlines()
except OSError:
print('EX: hashtag_search_json unable to read ' +
hashtag_index_file)
lines: list[str] = \
load_list(hashtag_index_file,
'EX: hashtag_search_json unable to read ' +
hashtag_index_file)
if not lines:
return None

View File

@ -59,6 +59,7 @@ from blocking import is_blocked
from blocking import allowed_announce
from shares import vf_proposal_from_share
from webapp_pwa import get_pwa_theme_colors
from data import load_list
def minimizing_attached_images(base_dir: str, nickname: str, domain: str,
@ -2215,13 +2216,9 @@ def html_common_emoji(base_dir: str, no_of_emoji: int) -> str:
common_emoji_filename = data_dir(base_dir) + '/common_emoji.txt'
if not os.path.isfile(common_emoji_filename):
return ''
common_emoji = None
try:
with open(common_emoji_filename, 'r', encoding='utf-8') as fp_emoji:
common_emoji = fp_emoji.readlines()
except OSError:
print('EX: html_common_emoji unable to load file')
return ''
common_emoji: list[str] = \
load_list(common_emoji_filename,
'EX: html_common_emoji unable to load file')
if not common_emoji:
return ''
line_ctr = 0