Less indentation

main
Bob Mottram 2024-05-10 10:44:26 +01:00
parent 53eeb041fd
commit 803fed70b0
1 changed files with 480 additions and 472 deletions

View File

@ -118,495 +118,503 @@ def receive_search_query(self, calling_domain: str, cookie: str,
default_timeline, cookie, calling_domain, 303) default_timeline, cookie, calling_domain, 303)
self.server.postreq_busy = False self.server.postreq_busy = False
return return
if 'searchtext=' in search_params: if 'searchtext=' not in search_params:
search_str = search_params.split('searchtext=')[1] actor_str = \
if '&' in search_str: get_instance_url(calling_domain, http_prefix, domain_full,
search_str = search_str.split('&')[0] onion_domain, i2p_domain) + users_path
search_str = \ redirect_headers(self, actor_str + '/' + default_timeline,
urllib.parse.unquote_plus(search_str.strip()) cookie, calling_domain, 303)
search_str = search_str.strip() self.server.postreq_busy = False
print('search_str: ' + search_str) return
if search_for_emoji:
search_str = ':' + search_str + ':'
my_posts_endings = ( search_str = search_params.split('searchtext=')[1]
' history', ' in sent', ' in outbox', ' in outgoing', if '&' in search_str:
' in sent items', ' in sent posts', ' in outgoing posts', search_str = search_str.split('&')[0]
' in my history', ' in my outbox', ' in my posts') search_str = \
bookmark_endings = ( urllib.parse.unquote_plus(search_str.strip())
' in my saved items', ' in my saved posts', search_str = search_str.strip()
' in my bookmarks', ' in my saved', ' in my saves', print('search_str: ' + search_str)
' in saved posts', ' in saved items', ' in bookmarks', if search_for_emoji:
' in saved', ' in saves', ' bookmark') search_str = ':' + search_str + ':'
if search_str.startswith('#'): my_posts_endings = (
nickname = get_nickname_from_actor(actor_str) ' history', ' in sent', ' in outbox', ' in outgoing',
if not nickname: ' in sent items', ' in sent posts', ' in outgoing posts',
self.send_response(400) ' in my history', ' in my outbox', ' in my posts')
self.end_headers() bookmark_endings = (
self.server.postreq_busy = False ' in my saved items', ' in my saved posts',
return ' in my bookmarks', ' in my saved', ' in my saves',
' in saved posts', ' in saved items', ' in bookmarks',
' in saved', ' in saves', ' bookmark')
# hashtag search if search_str.startswith('#'):
timezone = None nickname = get_nickname_from_actor(actor_str)
if account_timezone.get(nickname): if not nickname:
timezone = account_timezone.get(nickname) self.send_response(400)
bold_reading = False self.end_headers()
if bold_reading_nicknames.get(nickname): self.server.postreq_busy = False
bold_reading = True return
hashtag_str = \
html_hashtag_search(nickname, domain, port,
recent_posts_cache,
max_recent_posts,
translate,
base_dir,
search_str[1:], 1,
max_posts_in_hashtag_feed,
curr_session,
cached_webfingers,
person_cache,
http_prefix,
project_version,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
peertube_instances,
allow_local_network_access,
theme_name,
system_language,
max_like_count,
signing_priv_key_pem,
cw_lists,
lists_enabled,
timezone, bold_reading,
dogwhistles,
map_format,
access_keys,
'search',
min_images_for_accounts,
buy_sites,
auto_cw_cache)
if hashtag_str:
msg = hashtag_str.encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
elif (search_str.startswith('*') or
search_str.endswith(' skill')):
possible_endings = (
' skill'
)
for poss_ending in possible_endings:
if search_str.endswith(poss_ending):
search_str = search_str.replace(poss_ending, '')
break
# skill search
search_str = search_str.replace('*', '').strip()
nickname = get_nickname_from_actor(actor_str)
skill_str = \
html_skills_search(actor_str, translate,
base_dir, search_str,
instance_only_skills_search,
64, nickname, domain,
theme_name, access_keys)
if skill_str:
msg = skill_str.encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
elif (search_str.startswith("'") or
string_ends_with(search_str, my_posts_endings)):
# your post history search
possible_endings = (
' in my posts',
' in my history',
' in my outbox',
' in sent posts',
' in outgoing posts',
' in sent items',
' in history',
' in outbox',
' in outgoing',
' in sent',
' history'
)
for poss_ending in possible_endings:
if search_str.endswith(poss_ending):
search_str = search_str.replace(poss_ending, '')
break
nickname = get_nickname_from_actor(actor_str)
if not nickname:
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
search_str = search_str.replace("'", '', 1).strip()
timezone = None
if account_timezone.get(nickname):
timezone = account_timezone.get(nickname)
bold_reading = False
if bold_reading_nicknames.get(nickname):
bold_reading = True
history_str = \
html_history_search(translate,
base_dir,
http_prefix,
nickname,
domain,
search_str,
max_posts_in_feed,
page_number,
project_version,
recent_posts_cache,
max_recent_posts,
curr_session,
cached_webfingers,
person_cache,
port,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
peertube_instances,
allow_local_network_access,
theme_name, 'outbox',
system_language,
max_like_count,
signing_priv_key_pem,
cw_lists,
lists_enabled,
timezone, bold_reading,
dogwhistles,
access_keys,
min_images_for_accounts,
buy_sites,
auto_cw_cache)
if history_str:
msg = history_str.encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
elif (search_str.startswith('-') or
string_ends_with(search_str, bookmark_endings)):
# bookmark search
possible_endings = (
' in my bookmarks'
' in my saved posts'
' in my saved items'
' in my saved'
' in my saves'
' in saved posts'
' in saved items'
' in saved'
' in saves'
' in bookmarks'
' bookmark'
)
for poss_ending in possible_endings:
if search_str.endswith(poss_ending):
search_str = search_str.replace(poss_ending, '')
break
nickname = get_nickname_from_actor(actor_str)
if not nickname:
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
search_str = search_str.replace('-', '', 1).strip()
timezone = None
if account_timezone.get(nickname):
timezone = account_timezone.get(nickname)
bold_reading = False
if bold_reading_nicknames.get(nickname):
bold_reading = True
bookmarks_str = \
html_history_search(translate,
base_dir,
http_prefix,
nickname,
domain,
search_str,
max_posts_in_feed,
page_number,
project_version,
recent_posts_cache,
max_recent_posts,
curr_session,
cached_webfingers,
person_cache,
port,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
peertube_instances,
allow_local_network_access,
theme_name, 'bookmarks',
system_language,
max_like_count,
signing_priv_key_pem,
cw_lists,
lists_enabled,
timezone, bold_reading,
dogwhistles,
access_keys,
min_images_for_accounts,
buy_sites,
auto_cw_cache)
if bookmarks_str:
msg = bookmarks_str.encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
elif ('@' in search_str or
('://' in search_str and
has_users_path(search_str))):
remote_only = False
if search_str.endswith(';remote'):
search_str = search_str.replace(';remote', '')
remote_only = True
if string_ends_with(search_str, (':', ';', '.')):
actor_str = \
get_instance_url(calling_domain, http_prefix,
domain_full, onion_domain,
i2p_domain) + \
users_path
redirect_headers(self, actor_str + '/search',
cookie, calling_domain, 303)
self.server.postreq_busy = False
return
# profile search
nickname = get_nickname_from_actor(actor_str)
if not nickname:
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
profile_path_str = path.replace('/searchhandle', '')
# are we already following or followed by the searched # hashtag search
# for handle? timezone = None
search_nickname = get_nickname_from_actor(search_str) if account_timezone.get(nickname):
search_domain, search_port = \ timezone = account_timezone.get(nickname)
get_domain_from_actor(search_str) bold_reading = False
search_follower = \ if bold_reading_nicknames.get(nickname):
is_follower_of_person(base_dir, nickname, domain, bold_reading = True
search_nickname, search_domain) hashtag_str = \
search_following = \ html_hashtag_search(nickname, domain, port,
is_following_actor(base_dir, nickname, domain, search_str) recent_posts_cache,
if not remote_only and (search_follower or search_following): max_recent_posts,
# get the actor translate,
if not has_users_path(search_str): base_dir,
if not search_nickname or not search_domain: search_str[1:], 1,
self.send_response(400) max_posts_in_hashtag_feed,
self.end_headers() curr_session,
self.server.postreq_busy = False cached_webfingers,
return person_cache,
search_domain_full = \ http_prefix,
get_full_domain(search_domain, search_port) project_version,
actor = \ yt_replace_domain,
local_actor_url(http_prefix, search_nickname, twitter_replacement_domain,
search_domain_full) show_published_date_only,
else: peertube_instances,
actor = search_str allow_local_network_access,
theme_name,
# establish the session system_language,
curr_proxy_type = proxy_type max_like_count,
if '.onion/' in actor: signing_priv_key_pem,
curr_proxy_type = 'tor' cw_lists,
curr_session = self.server.session_onion lists_enabled,
elif '.i2p/' in actor: timezone, bold_reading,
curr_proxy_type = 'i2p' dogwhistles,
curr_session = self.server.session_i2p map_format,
access_keys,
curr_session = \ 'search',
establish_session("handle search", curr_session, min_images_for_accounts,
curr_proxy_type, self.server) buy_sites,
if not curr_session: auto_cw_cache)
self.server.postreq_busy = False if hashtag_str:
return msg = hashtag_str.encode('utf-8')
msglen = len(msg)
# get the avatar url for the actor login_headers(self, 'text/html',
avatar_url = \ msglen, calling_domain)
get_avatar_image_url(curr_session, write2(self, msg)
base_dir, http_prefix, self.server.postreq_busy = False
actor, person_cache, return
None, True, elif (search_str.startswith('*') or
signing_priv_key_pem) search_str.endswith(' skill')):
profile_path_str += \ possible_endings = (
'?options=' + actor + ';1;' + avatar_url ' skill'
)
show_person_options(self, calling_domain, profile_path_str, for poss_ending in possible_endings:
base_dir, domain, domain_full, if search_str.endswith(poss_ending):
getreq_start_time, search_str = search_str.replace(poss_ending, '')
cookie, debug, authorized, break
curr_session) # skill search
return search_str = search_str.replace('*', '').strip()
else: nickname = get_nickname_from_actor(actor_str)
if key_shortcuts.get(nickname): skill_str = \
access_keys = key_shortcuts[nickname] html_skills_search(actor_str, translate,
base_dir, search_str,
timezone = None instance_only_skills_search,
if account_timezone.get(nickname): 64, nickname, domain,
timezone = account_timezone.get(nickname) theme_name, access_keys)
if skill_str:
profile_handle = remove_eol(search_str).strip() msg = skill_str.encode('utf-8')
msglen = len(msg)
# establish the session login_headers(self, 'text/html',
curr_proxy_type = proxy_type msglen, calling_domain)
if '.onion/' in profile_handle or \ write2(self, msg)
profile_handle.endswith('.onion'): self.server.postreq_busy = False
curr_proxy_type = 'tor' return
curr_session = self.server.session_onion elif (search_str.startswith("'") or
elif ('.i2p/' in profile_handle or string_ends_with(search_str, my_posts_endings)):
profile_handle.endswith('.i2p')): # your post history search
curr_proxy_type = 'i2p' possible_endings = (
curr_session = self.server.session_i2p ' in my posts',
' in my history',
curr_session = \ ' in my outbox',
establish_session("handle search", curr_session, ' in sent posts',
curr_proxy_type, self.server) ' in outgoing posts',
if not curr_session: ' in sent items',
self.server.postreq_busy = False ' in history',
return ' in outbox',
' in outgoing',
bold_reading = False ' in sent',
if bold_reading_nicknames.get(nickname): ' history'
bold_reading = True )
for poss_ending in possible_endings:
profile_str = \ if search_str.endswith(poss_ending):
html_profile_after_search(authorized, search_str = search_str.replace(poss_ending, '')
recent_posts_cache, break
max_recent_posts, nickname = get_nickname_from_actor(actor_str)
translate, if not nickname:
base_dir, self.send_response(400)
profile_path_str, self.end_headers()
http_prefix, self.server.postreq_busy = False
nickname, return
domain, search_str = search_str.replace("'", '', 1).strip()
port, timezone = None
profile_handle, if account_timezone.get(nickname):
curr_session, timezone = account_timezone.get(nickname)
cached_webfingers, bold_reading = False
person_cache, if bold_reading_nicknames.get(nickname):
debug, bold_reading = True
project_version, history_str = \
yt_replace_domain, html_history_search(translate,
twitter_replacement_domain, base_dir,
show_published_date_only, http_prefix,
default_timeline, nickname,
peertube_instances, domain,
allow_local_network_access, search_str,
theme_name, max_posts_in_feed,
access_keys, page_number,
system_language, project_version,
max_like_count, recent_posts_cache,
signing_priv_key_pem, max_recent_posts,
cw_lists, curr_session,
lists_enabled, cached_webfingers,
timezone, person_cache,
onion_domain, port,
i2p_domain, yt_replace_domain,
bold_reading, twitter_replacement_domain,
dogwhistles, show_published_date_only,
min_images_for_accounts, peertube_instances,
buy_sites, allow_local_network_access,
max_shares_on_profile, theme_name, 'outbox',
no_of_books, system_language,
auto_cw_cache) max_like_count,
if profile_str: signing_priv_key_pem,
msg = profile_str.encode('utf-8') cw_lists,
msglen = len(msg) lists_enabled,
login_headers(self, 'text/html', timezone, bold_reading,
msglen, calling_domain) dogwhistles,
write2(self, msg) access_keys,
self.server.postreq_busy = False min_images_for_accounts,
return buy_sites,
auto_cw_cache)
if history_str:
msg = history_str.encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
elif (search_str.startswith('-') or
string_ends_with(search_str, bookmark_endings)):
# bookmark search
possible_endings = (
' in my bookmarks'
' in my saved posts'
' in my saved items'
' in my saved'
' in my saves'
' in saved posts'
' in saved items'
' in saved'
' in saves'
' in bookmarks'
' bookmark'
)
for poss_ending in possible_endings:
if search_str.endswith(poss_ending):
search_str = search_str.replace(poss_ending, '')
break
nickname = get_nickname_from_actor(actor_str)
if not nickname:
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
search_str = search_str.replace('-', '', 1).strip()
timezone = None
if account_timezone.get(nickname):
timezone = account_timezone.get(nickname)
bold_reading = False
if bold_reading_nicknames.get(nickname):
bold_reading = True
bookmarks_str = \
html_history_search(translate,
base_dir,
http_prefix,
nickname,
domain,
search_str,
max_posts_in_feed,
page_number,
project_version,
recent_posts_cache,
max_recent_posts,
curr_session,
cached_webfingers,
person_cache,
port,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
peertube_instances,
allow_local_network_access,
theme_name, 'bookmarks',
system_language,
max_like_count,
signing_priv_key_pem,
cw_lists,
lists_enabled,
timezone, bold_reading,
dogwhistles,
access_keys,
min_images_for_accounts,
buy_sites,
auto_cw_cache)
if bookmarks_str:
msg = bookmarks_str.encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
elif ('@' in search_str or
('://' in search_str and
has_users_path(search_str))):
remote_only = False
if search_str.endswith(';remote'):
search_str = search_str.replace(';remote', '')
remote_only = True
if string_ends_with(search_str, (':', ';', '.')):
actor_str = \ actor_str = \
get_instance_url(calling_domain, get_instance_url(calling_domain, http_prefix,
http_prefix, domain_full, domain_full, onion_domain,
onion_domain, i2p_domain) + \ i2p_domain) + \
users_path users_path
redirect_headers(self, actor_str + '/search', redirect_headers(self, actor_str + '/search',
cookie, calling_domain, 303) cookie, calling_domain, 303)
self.server.postreq_busy = False self.server.postreq_busy = False
return return
elif (search_str.startswith(':') or # profile search
search_str.endswith(' emoji')): nickname = get_nickname_from_actor(actor_str)
# eg. "cat emoji" if not nickname:
if search_str.endswith(' emoji'): self.send_response(400)
search_str = \ self.end_headers()
search_str.replace(' emoji', '') self.server.postreq_busy = False
# emoji search return
nickname = get_nickname_from_actor(actor_str) profile_path_str = path.replace('/searchhandle', '')
emoji_str = \
html_search_emoji(translate, # are we already following or followed by the searched
base_dir, search_str, # for handle?
nickname, domain, search_nickname = get_nickname_from_actor(search_str)
theme_name, access_keys) search_domain, search_port = \
if emoji_str: get_domain_from_actor(search_str)
msg = emoji_str.encode('utf-8') search_follower = \
msglen = len(msg) is_follower_of_person(base_dir, nickname, domain,
login_headers(self, 'text/html', msglen, calling_domain) search_nickname, search_domain)
write2(self, msg) search_following = \
self.server.postreq_busy = False is_following_actor(base_dir, nickname, domain, search_str)
return if not remote_only and (search_follower or search_following):
elif search_str.startswith('.'): # get the actor
# wanted items search if not has_users_path(search_str):
nickname = get_nickname_from_actor(actor_str) if not search_nickname or not search_domain:
wanted_items_str = \ self.send_response(400)
html_search_shared_items(translate, self.end_headers()
base_dir, self.server.postreq_busy = False
search_str[1:], page_number, return
max_posts_in_feed, search_domain_full = \
http_prefix, get_full_domain(search_domain, search_port)
domain_full, actor = \
actor_str, calling_domain, local_actor_url(http_prefix, search_nickname,
shared_items_federated_domains, search_domain_full)
'wanted', nickname, domain, else:
theme_name, actor = search_str
access_keys)
if wanted_items_str: # establish the session
msg = wanted_items_str.encode('utf-8') curr_proxy_type = proxy_type
msglen = len(msg) if '.onion/' in actor:
login_headers(self, 'text/html', msglen, calling_domain) curr_proxy_type = 'tor'
write2(self, msg) curr_session = self.server.session_onion
elif '.i2p/' in actor:
curr_proxy_type = 'i2p'
curr_session = self.server.session_i2p
curr_session = \
establish_session("handle search", curr_session,
curr_proxy_type, self.server)
if not curr_session:
self.server.postreq_busy = False self.server.postreq_busy = False
return return
# get the avatar url for the actor
avatar_url = \
get_avatar_image_url(curr_session,
base_dir, http_prefix,
actor, person_cache,
None, True,
signing_priv_key_pem)
profile_path_str += \
'?options=' + actor + ';1;' + avatar_url
show_person_options(self, calling_domain, profile_path_str,
base_dir, domain, domain_full,
getreq_start_time,
cookie, debug, authorized,
curr_session)
return
else: else:
# shared items search if key_shortcuts.get(nickname):
nickname = get_nickname_from_actor(actor_str) access_keys = key_shortcuts[nickname]
shared_items_str = \
html_search_shared_items(translate, base_dir, timezone = None
search_str, page_number, if account_timezone.get(nickname):
max_posts_in_feed, timezone = account_timezone.get(nickname)
http_prefix, domain_full,
actor_str, calling_domain, profile_handle = remove_eol(search_str).strip()
shared_items_federated_domains,
'shares', nickname, domain, # establish the session
theme_name, access_keys) curr_proxy_type = proxy_type
if shared_items_str: if '.onion/' in profile_handle or \
msg = shared_items_str.encode('utf-8') profile_handle.endswith('.onion'):
msglen = len(msg) curr_proxy_type = 'tor'
login_headers(self, 'text/html', curr_session = self.server.session_onion
msglen, calling_domain) elif ('.i2p/' in profile_handle or
write2(self, msg) profile_handle.endswith('.i2p')):
curr_proxy_type = 'i2p'
curr_session = self.server.session_i2p
curr_session = \
establish_session("handle search", curr_session,
curr_proxy_type, self.server)
if not curr_session:
self.server.postreq_busy = False self.server.postreq_busy = False
return return
bold_reading = False
if bold_reading_nicknames.get(nickname):
bold_reading = True
profile_str = \
html_profile_after_search(authorized,
recent_posts_cache,
max_recent_posts,
translate,
base_dir,
profile_path_str,
http_prefix,
nickname,
domain,
port,
profile_handle,
curr_session,
cached_webfingers,
person_cache,
debug,
project_version,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
default_timeline,
peertube_instances,
allow_local_network_access,
theme_name,
access_keys,
system_language,
max_like_count,
signing_priv_key_pem,
cw_lists,
lists_enabled,
timezone,
onion_domain,
i2p_domain,
bold_reading,
dogwhistles,
min_images_for_accounts,
buy_sites,
max_shares_on_profile,
no_of_books,
auto_cw_cache)
if profile_str:
msg = profile_str.encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
actor_str = \
get_instance_url(calling_domain,
http_prefix, domain_full,
onion_domain, i2p_domain) + \
users_path
redirect_headers(self, actor_str + '/search',
cookie, calling_domain, 303)
self.server.postreq_busy = False
return
elif (search_str.startswith(':') or
search_str.endswith(' emoji')):
# eg. "cat emoji"
if search_str.endswith(' emoji'):
search_str = \
search_str.replace(' emoji', '')
# emoji search
nickname = get_nickname_from_actor(actor_str)
emoji_str = \
html_search_emoji(translate,
base_dir, search_str,
nickname, domain,
theme_name, access_keys)
if emoji_str:
msg = emoji_str.encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html', msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
elif search_str.startswith('.'):
# wanted items search
nickname = get_nickname_from_actor(actor_str)
wanted_items_str = \
html_search_shared_items(translate,
base_dir,
search_str[1:], page_number,
max_posts_in_feed,
http_prefix,
domain_full,
actor_str, calling_domain,
shared_items_federated_domains,
'wanted', nickname, domain,
theme_name,
access_keys)
if wanted_items_str:
msg = wanted_items_str.encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html', msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
else:
# shared items search
nickname = get_nickname_from_actor(actor_str)
shared_items_str = \
html_search_shared_items(translate, base_dir,
search_str, page_number,
max_posts_in_feed,
http_prefix, domain_full,
actor_str, calling_domain,
shared_items_federated_domains,
'shares', nickname, domain,
theme_name, access_keys)
if shared_items_str:
msg = shared_items_str.encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
actor_str = \ actor_str = \
get_instance_url(calling_domain, http_prefix, get_instance_url(calling_domain, http_prefix,
domain_full, onion_domain, i2p_domain) + \ domain_full, onion_domain, i2p_domain) + users_path
users_path
redirect_headers(self, actor_str + '/' + default_timeline, redirect_headers(self, actor_str + '/' + default_timeline,
cookie, calling_domain, 303) cookie, calling_domain, 303)
self.server.postreq_busy = False self.server.postreq_busy = False