Separate function for searching fediverse handles

main
Bob Mottram 2024-05-10 13:46:02 +01:00
parent 29ac760381
commit 8ad679e383
1 changed files with 239 additions and 169 deletions

View File

@ -403,6 +403,212 @@ def _receive_search_bookmarks(self, search_str: str,
return False
def _receive_search_handle(self, search_str: str,
calling_domain: str, http_prefix: str,
domain_full: str, onion_domain: str,
i2p_domain: str, users_path: str,
cookie: str, path: str, base_dir: str,
domain: str, proxy_type: str,
person_cache: {}, signing_priv_key_pem: str,
getreq_start_time, debug: bool,
authorized: bool, key_shortcuts: {},
account_timezone: {},
bold_reading_nicknames: {},
recent_posts_cache: {},
max_recent_posts: int,
translate: {}, port: int,
cached_webfingers: {},
project_version: str,
yt_replace_domain: str,
twitter_replacement_domain: str,
show_published_date_only: bool,
default_timeline: str,
peertube_instances: [],
allow_local_network_access: bool,
theme_name: str,
system_language: str,
max_like_count: int,
cw_lists: {},
lists_enabled: {},
dogwhistles: {},
min_images_for_accounts: {},
buy_sites: [],
max_shares_on_profile: int,
no_of_books: int,
auto_cw_cache: {}) -> bool:
"""Receive a search for a fediverse handle or url from the search screen
"""
remote_only = False
if search_str.endswith(';remote'):
search_str = search_str.replace(';remote', '')
remote_only = True
if string_ends_with(search_str, (':', ';', '.')):
actor_str = \
get_instance_url(calling_domain, http_prefix,
domain_full, onion_domain,
i2p_domain) + users_path
redirect_headers(self, actor_str + '/search',
cookie, calling_domain, 303)
self.server.postreq_busy = False
return
# profile search
nickname = get_nickname_from_actor(actor_str)
if not nickname:
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
profile_path_str = path.replace('/searchhandle', '')
# are we already following or followed by the searched
# for handle?
search_nickname = get_nickname_from_actor(search_str)
search_domain, search_port = \
get_domain_from_actor(search_str)
search_follower = \
is_follower_of_person(base_dir, nickname, domain,
search_nickname, search_domain)
search_following = \
is_following_actor(base_dir, nickname, domain, search_str)
if not remote_only and (search_follower or search_following):
# get the actor
if not has_users_path(search_str):
if not search_nickname or not search_domain:
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
search_domain_full = \
get_full_domain(search_domain, search_port)
actor = \
local_actor_url(http_prefix, search_nickname,
search_domain_full)
else:
actor = search_str
# establish the session
curr_proxy_type = proxy_type
if '.onion/' in actor:
curr_proxy_type = 'tor'
curr_session = self.server.session_onion
elif '.i2p/' in actor:
curr_proxy_type = 'i2p'
curr_session = self.server.session_i2p
curr_session = \
establish_session("handle search", curr_session,
curr_proxy_type, self.server)
if not curr_session:
self.server.postreq_busy = False
return
# get the avatar url for the actor
avatar_url = \
get_avatar_image_url(curr_session,
base_dir, http_prefix,
actor, person_cache,
None, True,
signing_priv_key_pem)
profile_path_str += \
'?options=' + actor + ';1;' + avatar_url
show_person_options(self, calling_domain, profile_path_str,
base_dir, domain, domain_full,
getreq_start_time,
cookie, debug, authorized,
curr_session)
return
else:
if key_shortcuts.get(nickname):
access_keys = key_shortcuts[nickname]
timezone = None
if account_timezone.get(nickname):
timezone = account_timezone.get(nickname)
profile_handle = remove_eol(search_str).strip()
# establish the session
curr_proxy_type = proxy_type
if '.onion/' in profile_handle or \
profile_handle.endswith('.onion'):
curr_proxy_type = 'tor'
curr_session = self.server.session_onion
elif ('.i2p/' in profile_handle or
profile_handle.endswith('.i2p')):
curr_proxy_type = 'i2p'
curr_session = self.server.session_i2p
curr_session = \
establish_session("handle search", curr_session,
curr_proxy_type, self.server)
if not curr_session:
self.server.postreq_busy = False
return
bold_reading = False
if bold_reading_nicknames.get(nickname):
bold_reading = True
profile_str = \
html_profile_after_search(authorized,
recent_posts_cache,
max_recent_posts,
translate,
base_dir,
profile_path_str,
http_prefix,
nickname,
domain,
port,
profile_handle,
curr_session,
cached_webfingers,
person_cache,
debug,
project_version,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
default_timeline,
peertube_instances,
allow_local_network_access,
theme_name,
access_keys,
system_language,
max_like_count,
signing_priv_key_pem,
cw_lists,
lists_enabled,
timezone,
onion_domain,
i2p_domain,
bold_reading,
dogwhistles,
min_images_for_accounts,
buy_sites,
max_shares_on_profile,
no_of_books,
auto_cw_cache)
if profile_str:
msg = profile_str.encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
actor_str = \
get_instance_url(calling_domain,
http_prefix, domain_full,
onion_domain, i2p_domain) + \
users_path
redirect_headers(self, actor_str + '/search',
cookie, calling_domain, 303)
self.server.postreq_busy = False
return
def receive_search_query(self, calling_domain: str, cookie: str,
authorized: bool, path: str,
base_dir: str, http_prefix: str,
@ -638,176 +844,40 @@ def receive_search_query(self, calling_domain: str, cookie: str,
elif ('@' in search_str or
('://' in search_str and
has_users_path(search_str))):
remote_only = False
if search_str.endswith(';remote'):
search_str = search_str.replace(';remote', '')
remote_only = True
if string_ends_with(search_str, (':', ';', '.')):
actor_str = \
get_instance_url(calling_domain, http_prefix,
domain_full, onion_domain,
i2p_domain) + \
users_path
redirect_headers(self, actor_str + '/search',
cookie, calling_domain, 303)
self.server.postreq_busy = False
if _receive_search_handle(self, search_str,
calling_domain, http_prefix,
domain_full, onion_domain,
i2p_domain, users_path,
cookie, path, base_dir,
domain, proxy_type,
person_cache, signing_priv_key_pem,
getreq_start_time, debug,
authorized, key_shortcuts,
account_timezone,
bold_reading_nicknames,
recent_posts_cache,
max_recent_posts,
translate, port,
cached_webfingers,
project_version,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
default_timeline,
peertube_instances,
allow_local_network_access,
theme_name,
system_language,
max_like_count,
cw_lists,
lists_enabled,
dogwhistles,
min_images_for_accounts,
buy_sites,
max_shares_on_profile,
no_of_books,
auto_cw_cache):
return
# profile search
nickname = get_nickname_from_actor(actor_str)
if not nickname:
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
profile_path_str = path.replace('/searchhandle', '')
# are we already following or followed by the searched
# for handle?
search_nickname = get_nickname_from_actor(search_str)
search_domain, search_port = \
get_domain_from_actor(search_str)
search_follower = \
is_follower_of_person(base_dir, nickname, domain,
search_nickname, search_domain)
search_following = \
is_following_actor(base_dir, nickname, domain, search_str)
if not remote_only and (search_follower or search_following):
# get the actor
if not has_users_path(search_str):
if not search_nickname or not search_domain:
self.send_response(400)
self.end_headers()
self.server.postreq_busy = False
return
search_domain_full = \
get_full_domain(search_domain, search_port)
actor = \
local_actor_url(http_prefix, search_nickname,
search_domain_full)
else:
actor = search_str
# establish the session
curr_proxy_type = proxy_type
if '.onion/' in actor:
curr_proxy_type = 'tor'
curr_session = self.server.session_onion
elif '.i2p/' in actor:
curr_proxy_type = 'i2p'
curr_session = self.server.session_i2p
curr_session = \
establish_session("handle search", curr_session,
curr_proxy_type, self.server)
if not curr_session:
self.server.postreq_busy = False
return
# get the avatar url for the actor
avatar_url = \
get_avatar_image_url(curr_session,
base_dir, http_prefix,
actor, person_cache,
None, True,
signing_priv_key_pem)
profile_path_str += \
'?options=' + actor + ';1;' + avatar_url
show_person_options(self, calling_domain, profile_path_str,
base_dir, domain, domain_full,
getreq_start_time,
cookie, debug, authorized,
curr_session)
return
else:
if key_shortcuts.get(nickname):
access_keys = key_shortcuts[nickname]
timezone = None
if account_timezone.get(nickname):
timezone = account_timezone.get(nickname)
profile_handle = remove_eol(search_str).strip()
# establish the session
curr_proxy_type = proxy_type
if '.onion/' in profile_handle or \
profile_handle.endswith('.onion'):
curr_proxy_type = 'tor'
curr_session = self.server.session_onion
elif ('.i2p/' in profile_handle or
profile_handle.endswith('.i2p')):
curr_proxy_type = 'i2p'
curr_session = self.server.session_i2p
curr_session = \
establish_session("handle search", curr_session,
curr_proxy_type, self.server)
if not curr_session:
self.server.postreq_busy = False
return
bold_reading = False
if bold_reading_nicknames.get(nickname):
bold_reading = True
profile_str = \
html_profile_after_search(authorized,
recent_posts_cache,
max_recent_posts,
translate,
base_dir,
profile_path_str,
http_prefix,
nickname,
domain,
port,
profile_handle,
curr_session,
cached_webfingers,
person_cache,
debug,
project_version,
yt_replace_domain,
twitter_replacement_domain,
show_published_date_only,
default_timeline,
peertube_instances,
allow_local_network_access,
theme_name,
access_keys,
system_language,
max_like_count,
signing_priv_key_pem,
cw_lists,
lists_enabled,
timezone,
onion_domain,
i2p_domain,
bold_reading,
dogwhistles,
min_images_for_accounts,
buy_sites,
max_shares_on_profile,
no_of_books,
auto_cw_cache)
if profile_str:
msg = profile_str.encode('utf-8')
msglen = len(msg)
login_headers(self, 'text/html',
msglen, calling_domain)
write2(self, msg)
self.server.postreq_busy = False
return
actor_str = \
get_instance_url(calling_domain,
http_prefix, domain_full,
onion_domain, i2p_domain) + \
users_path
redirect_headers(self, actor_str + '/search',
cookie, calling_domain, 303)
self.server.postreq_busy = False
return
elif (search_str.startswith(':') or
search_str.endswith(' emoji')):
# eg. "cat emoji"