From 99efdb79c18bcb684ecdb9f86cbf0ae9716cac52 Mon Sep 17 00:00:00 2001 From: Bob Mottram Date: Tue, 27 May 2025 13:49:01 +0100 Subject: [PATCH] Move searchableBy functions --- searchable.py | 189 +++++++++++++++++++++++++++++++++++++++++++++++ utils.py | 185 ---------------------------------------------- webapp_search.py | 2 +- 3 files changed, 190 insertions(+), 186 deletions(-) diff --git a/searchable.py b/searchable.py index 72a4f6e78..cbb68000e 100644 --- a/searchable.py +++ b/searchable.py @@ -15,6 +15,10 @@ from utils import acct_dir from utils import data_dir from utils import text_in_file from utils import is_account_dir +from utils import get_nickname_from_actor +from utils import get_domain_from_actor +from utils import get_full_domain +from utils import get_followers_list def load_searchable_by_default(base_dir: str) -> {}: @@ -58,3 +62,188 @@ def set_searchable_by(base_dir: str, nickname: str, domain: str, fp_search.write(searchable_by) except OSError: print('EX: unable to write searchableByDropdown ' + filename) + + +def _actor_in_searchable_by(searchable_by: str, following_list: []) -> bool: + """Does the given actor within searchable_by exist within the given list? 
+ """ + data_actor = searchable_by.split('/followers')[0] + + if '"' in data_actor: + data_actor = data_actor.split('"')[-1] + + if data_actor not in following_list: + data_nickname = get_nickname_from_actor(data_actor) + data_domain, data_port = get_domain_from_actor(data_actor) + if not data_nickname or not data_domain: + return False + data_domain_full = get_full_domain(data_domain, data_port) + data_handle = data_nickname + '@' + data_domain_full + if data_handle not in following_list: + return False + return True + + +def _search_virtual_box_posts(base_dir: str, nickname: str, domain: str, + search_str: str, max_results: int, + box_name: str) -> []: + """Searches through a virtual box, which is typically an index on the inbox + """ + index_filename = \ + acct_dir(base_dir, nickname, domain) + '/' + box_name + '.index' + if box_name == 'bookmarks': + box_name = 'inbox' + path = acct_dir(base_dir, nickname, domain) + '/' + box_name + if not os.path.isdir(path): + return [] + + search_str = search_str.lower().strip() + + if '+' in search_str: + search_words = search_str.split('+') + for index, _ in enumerate(search_words): + search_words[index] = search_words[index].strip() + print('SEARCH: ' + str(search_words)) + else: + search_words = [search_str] + + res: list[str] = [] + try: + with open(index_filename, 'r', encoding='utf-8') as fp_index: + post_filename = 'start' + while post_filename: + post_filename = fp_index.readline() + if not post_filename: + break + if '.json' not in post_filename: + break + post_filename = path + '/' + post_filename.strip() + if not os.path.isfile(post_filename): + continue + with open(post_filename, 'r', encoding='utf-8') as fp_post: + data = fp_post.read().lower() + + not_found = False + for keyword in search_words: + if keyword not in data: + not_found = True + break + if not_found: + continue + + res.append(post_filename) + if len(res) >= max_results: + return res + except OSError as exc: + print('EX: _search_virtual_box_posts 
unable to read ' + + index_filename + ' ' + str(exc)) + return res + + +def _get_mutuals_of_person(base_dir: str, + nickname: str, domain: str) -> []: + """Returns the mutuals of a person + i.e. accounts which they follow and which also follow back + """ + followers = \ + get_followers_list(base_dir, nickname, domain, 'followers.txt') + following = \ + get_followers_list(base_dir, nickname, domain, 'following.txt') + mutuals: list[str] = [] + for handle in following: + if handle in followers: + mutuals.append(handle) + return mutuals + + +def search_box_posts(base_dir: str, nickname: str, domain: str, + search_str: str, max_results: int, + box_name: str = 'outbox') -> []: + """Search your posts and return a list of the filenames + containing matching strings + """ + path = acct_dir(base_dir, nickname, domain) + '/' + box_name + # is this a virtual box, such as direct messages? + if not os.path.isdir(path): + if os.path.isfile(path + '.index'): + return _search_virtual_box_posts(base_dir, nickname, domain, + search_str, max_results, box_name) + return [] + search_str = search_str.lower().strip() + + if '+' in search_str: + search_words = search_str.split('+') + for index, _ in enumerate(search_words): + search_words[index] = search_words[index].strip() + print('SEARCH: ' + str(search_words)) + else: + search_words = [search_str] + + following_list: list[str] = [] + mutuals_list: list[str] = [] + check_searchable_by = False + if box_name == 'inbox': + check_searchable_by = True + # https://codeberg.org/fediverse/fep/ + # src/branch/main/fep/268d/fep-268d.md + # create a list containing all of the handles followed + following_list = get_followers_list(base_dir, nickname, domain, + 'following.txt') + # create a list containing all of the mutuals + mutuals_list = _get_mutuals_of_person(base_dir, nickname, domain) + + res: list[str] = [] + for root, _, fnames in os.walk(path): + for fname in fnames: + file_path = os.path.join(root, fname) + try: + with open(file_path, 
'r', encoding='utf-8') as fp_post: + data = fp_post.read() + data_lower = data.lower() + + not_found = False + for keyword in search_words: + if keyword not in data_lower: + not_found = True + break + if not_found: + continue + + # if this is not an outbox/bookmarks search then is the + # post marked as being searchable? + # https://codeberg.org/fediverse/fep/ + # src/branch/main/fep/268d/fep-268d.md + if check_searchable_by: + if '"searchableBy":' not in data: + continue + searchable_by = \ + data.split('"searchableBy":')[1].strip() + if searchable_by.startswith('['): + searchable_by = searchable_by.split(']')[0] + if '"' in searchable_by: + searchable_by = searchable_by.split('"')[1] + elif "'" in searchable_by: + searchable_by = searchable_by.split("'")[1] + else: + continue + if '#Public' not in searchable_by: + if '/followers' in searchable_by and \ + following_list: + if not _actor_in_searchable_by(searchable_by, + following_list): + continue + elif '/mutuals' in searchable_by and mutuals_list: + if not _actor_in_searchable_by(searchable_by, + mutuals_list): + continue + else: + continue + + res.append(file_path) + if len(res) >= max_results: + return res + except OSError as exc: + print('EX: search_box_posts unable to read ' + + file_path + ' ' + str(exc)) + break + return res diff --git a/utils.py b/utils.py index bea8bf324..3b7c28656 100644 --- a/utils.py +++ b/utils.py @@ -2889,191 +2889,6 @@ def get_css(base_dir: str, css_filename: str) -> str: return None -def _search_virtual_box_posts(base_dir: str, nickname: str, domain: str, - search_str: str, max_results: int, - box_name: str) -> []: - """Searches through a virtual box, which is typically an index on the inbox - """ - index_filename = \ - acct_dir(base_dir, nickname, domain) + '/' + box_name + '.index' - if box_name == 'bookmarks': - box_name = 'inbox' - path = acct_dir(base_dir, nickname, domain) + '/' + box_name - if not os.path.isdir(path): - return [] - - search_str = search_str.lower().strip() 
- - if '+' in search_str: - search_words = search_str.split('+') - for index, _ in enumerate(search_words): - search_words[index] = search_words[index].strip() - print('SEARCH: ' + str(search_words)) - else: - search_words = [search_str] - - res: list[str] = [] - try: - with open(index_filename, 'r', encoding='utf-8') as fp_index: - post_filename = 'start' - while post_filename: - post_filename = fp_index.readline() - if not post_filename: - break - if '.json' not in post_filename: - break - post_filename = path + '/' + post_filename.strip() - if not os.path.isfile(post_filename): - continue - with open(post_filename, 'r', encoding='utf-8') as fp_post: - data = fp_post.read().lower() - - not_found = False - for keyword in search_words: - if keyword not in data: - not_found = True - break - if not_found: - continue - - res.append(post_filename) - if len(res) >= max_results: - return res - except OSError as exc: - print('EX: _search_virtual_box_posts unable to read ' + - index_filename + ' ' + str(exc)) - return res - - -def _get_mutuals_of_person(base_dir: str, - nickname: str, domain: str) -> []: - """Returns the mutuals of a person - i.e. accounts which they follow and which also follow back - """ - followers = \ - get_followers_list(base_dir, nickname, domain, 'followers.txt') - following = \ - get_followers_list(base_dir, nickname, domain, 'following.txt') - mutuals: list[str] = [] - for handle in following: - if handle in followers: - mutuals.append(handle) - return mutuals - - -def _actor_in_searchable_by(searchable_by: str, following_list: []) -> bool: - """Does the given actor within searchable_by exist within the given list? 
- """ - data_actor = searchable_by.split('/followers')[0] - - if '"' in data_actor: - data_actor = data_actor.split('"')[-1] - - if data_actor not in following_list: - data_nickname = get_nickname_from_actor(data_actor) - data_domain, data_port = get_domain_from_actor(data_actor) - if not data_nickname or not data_domain: - return False - data_domain_full = get_full_domain(data_domain, data_port) - data_handle = data_nickname + '@' + data_domain_full - if data_handle not in following_list: - return False - return True - - -def search_box_posts(base_dir: str, nickname: str, domain: str, - search_str: str, max_results: int, - box_name: str = 'outbox') -> []: - """Search your posts and return a list of the filenames - containing matching strings - """ - path = acct_dir(base_dir, nickname, domain) + '/' + box_name - # is this a virtual box, such as direct messages? - if not os.path.isdir(path): - if os.path.isfile(path + '.index'): - return _search_virtual_box_posts(base_dir, nickname, domain, - search_str, max_results, box_name) - return [] - search_str = search_str.lower().strip() - - if '+' in search_str: - search_words = search_str.split('+') - for index, _ in enumerate(search_words): - search_words[index] = search_words[index].strip() - print('SEARCH: ' + str(search_words)) - else: - search_words = [search_str] - - following_list: list[str] = [] - mutuals_list: list[str] = [] - check_searchable_by = False - if box_name == 'inbox': - check_searchable_by = True - # https://codeberg.org/fediverse/fep/ - # src/branch/main/fep/268d/fep-268d.md - # create a list containing all of the handles followed - following_list = get_followers_list(base_dir, nickname, domain, - 'following.txt') - # create a list containing all of the mutuals - mutuals_list = _get_mutuals_of_person(base_dir, nickname, domain) - - res: list[str] = [] - for root, _, fnames in os.walk(path): - for fname in fnames: - file_path = os.path.join(root, fname) - try: - with open(file_path, 'r', 
encoding='utf-8') as fp_post: - data = fp_post.read() - data_lower = data.lower() - - not_found = False - for keyword in search_words: - if keyword not in data_lower: - not_found = True - break - if not_found: - continue - - # if this is not an outbox/bookmarks search then is the - # post marked as being searchable? - # https://codeberg.org/fediverse/fep/ - # src/branch/main/fep/268d/fep-268d.md - if check_searchable_by: - if '"searchableBy":' not in data: - continue - searchable_by = \ - data.split('"searchableBy":')[1].strip() - if searchable_by.startswith('['): - searchable_by = searchable_by.split(']')[0] - if '"' in searchable_by: - searchable_by = searchable_by.split('"')[1] - elif "'" in searchable_by: - searchable_by = searchable_by.split("'")[1] - else: - continue - if '#Public' not in searchable_by: - if '/followers' in searchable_by and \ - following_list: - if not _actor_in_searchable_by(searchable_by, - following_list): - continue - elif '/mutuals' in searchable_by and mutuals_list: - if not _actor_in_searchable_by(searchable_by, - mutuals_list): - continue - else: - continue - - res.append(file_path) - if len(res) >= max_results: - return res - except OSError as exc: - print('EX: search_box_posts unable to read ' + - file_path + ' ' + str(exc)) - break - return res - - def get_file_case_insensitive(path: str) -> str: """Returns a case specific filename given a case insensitive version of it """ diff --git a/webapp_search.py b/webapp_search.py index 392bf23aa..d422a852f 100644 --- a/webapp_search.py +++ b/webapp_search.py @@ -12,6 +12,7 @@ from shutil import copyfile import urllib.parse from flags import is_editor from flags import is_public_post +from searchable import search_box_posts from utils import get_person_icon from utils import data_dir from utils import get_post_attachments @@ -32,7 +33,6 @@ from utils import load_json from utils import get_nickname_from_actor from utils import locate_post from utils import first_paragraph_from_string -from 
utils import search_box_posts from utils import get_alt_path from utils import acct_dir from utils import local_actor_url