__filename__ = "webapp_search.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.3.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Web Interface"
import os
from shutil import copyfile
import urllib.parse
from datetime import datetime
from utils import get_base_content_from_post
from utils import is_account_dir
from utils import get_config_param
from utils import get_full_domain
from utils import is_editor
from utils import load_json
from utils import get_nickname_from_actor
from utils import locate_post
from utils import is_public_post
from utils import first_paragraph_from_string
from utils import search_box_posts
from utils import get_alt_path
from utils import acct_dir
from utils import local_actor_url
from skills import no_of_actor_skills
from skills import get_skills_from_list
from categories import get_hashtag_category
from feeds import rss2tag_header
from feeds import rss2tag_footer
from webapp_utils import set_custom_background
from webapp_utils import html_keyboard_navigation
from webapp_utils import html_header_with_external_style
from webapp_utils import html_footer
from webapp_utils import get_search_banner_file
from webapp_utils import html_post_separator
from webapp_utils import html_search_result_share
from webapp_post import individual_post_as_html
from webapp_hashtagswarm import html_hash_tag_swarm
def html_search_emoji(css_cache: {}, translate: {},
base_dir: str, http_prefix: str,
search_str: str) -> str:
"""Search results for emoji
"""
# emoji.json is generated so that it can be customized and the changes
# will be retained even if default_emoji.json is subsequently updated
if not os.path.isfile(base_dir + '/emoji/emoji.json'):
copyfile(base_dir + '/emoji/default_emoji.json',
base_dir + '/emoji/emoji.json')
search_str = search_str.lower().replace(':', '').strip('\n').strip('\r')
css_filename = base_dir + '/epicyon-profile.css'
if os.path.isfile(base_dir + '/epicyon.css'):
css_filename = base_dir + '/epicyon.css'
emoji_lookup_filename = base_dir + '/emoji/emoji.json'
custom_emoji_lookup_filename = base_dir + '/emojicustom/emoji.json'
# create header
instance_title = \
get_config_param(base_dir, 'instanceTitle')
emoji_form = \
html_header_with_external_style(css_filename, instance_title, None)
emoji_form += '
' + \
translate['Emoji Search'] + \
'
'
# does the lookup file exist?
if not os.path.isfile(emoji_lookup_filename):
emoji_form += '
' + \
translate['No results'] + '
'
emoji_form += html_footer()
return emoji_form
emoji_json = load_json(emoji_lookup_filename)
if emoji_json:
if os.path.isfile(custom_emoji_lookup_filename):
custom_emoji_json = load_json(custom_emoji_lookup_filename)
if custom_emoji_json:
emoji_json = dict(emoji_json, **custom_emoji_json)
results = {}
for emoji_name, filename in emoji_json.items():
if search_str in emoji_name:
results[emoji_name] = filename + '.png'
for emoji_name, filename in emoji_json.items():
if emoji_name in search_str:
results[emoji_name] = filename + '.png'
if not results:
emoji_form += '
' + \
translate['No results'] + '
'
heading_shown = False
emoji_form += '
'
msg_str1 = translate['Copy the text then paste it into your post']
msg_str2 = ':'
emoji_form += '
'
emoji_form += html_footer()
return emoji_form
def _match_shared_item(search_str_lower_list: [],
shared_item: {}) -> bool:
"""Returns true if the shared item matches search criteria
"""
for search_substr in search_str_lower_list:
search_substr = search_substr.strip()
if shared_item.get('location'):
if search_substr in shared_item['location'].lower():
return True
if search_substr in shared_item['summary'].lower():
return True
if search_substr in shared_item['displayName'].lower():
return True
if search_substr in shared_item['category'].lower():
return True
return False
def _html_search_result_share_page(actor: str, domain_full: str,
calling_domain: str, page_number: int,
search_str_lower: str, translate: {},
previous: bool) -> str:
"""Returns the html for the previous button on shared items search results
"""
post_actor = get_alt_path(actor, domain_full, calling_domain)
# previous page link, needs to be a POST
if previous:
page_number -= 1
title_str = translate['Page up']
image_url = 'pageup.png'
else:
page_number += 1
title_str = translate['Page down']
image_url = 'pagedown.png'
shared_items_form = \
'\n'
return shared_items_form
def _html_shares_result(base_dir: str, shares_json: {}, page_number: int,
results_per_page: int,
search_str_lower_list: [], curr_page: int, ctr: int,
calling_domain: str, http_prefix: str,
domain_full: str, contact_nickname: str, actor: str,
results_exist: bool, search_str_lower: str,
translate: {},
shares_file_type: str) -> (bool, int, int, str):
"""Result for shared items search
"""
shared_items_form = ''
if curr_page > page_number:
return results_exist, curr_page, ctr, shared_items_form
for name, shared_item in shares_json.items():
if _match_shared_item(search_str_lower_list, shared_item):
if curr_page == page_number:
# show individual search result
shared_items_form += \
html_search_result_share(base_dir, shared_item, translate,
http_prefix, domain_full,
contact_nickname,
name, actor, shares_file_type,
shared_item['category'])
if not results_exist and curr_page > 1:
# show the previous page button
shared_items_form += \
_html_search_result_share_page(actor, domain_full,
calling_domain,
page_number,
search_str_lower,
translate, True)
results_exist = True
ctr += 1
if ctr >= results_per_page:
curr_page += 1
if curr_page > page_number:
# show the next page button
shared_items_form += \
_html_search_result_share_page(actor, domain_full,
calling_domain,
page_number,
search_str_lower,
translate, False)
return results_exist, curr_page, ctr, shared_items_form
ctr = 0
return results_exist, curr_page, ctr, shared_items_form
def html_search_shared_items(css_cache: {}, translate: {},
base_dir: str, search_str: str,
page_number: int,
results_per_page: int,
http_prefix: str,
domain_full: str, actor: str,
calling_domain: str,
shared_items_federated_domains: [],
shares_file_type: str) -> str:
"""Search results for shared items
"""
curr_page = 1
ctr = 0
shared_items_form = ''
search_str_lower = urllib.parse.unquote(search_str)
search_str_lower = search_str_lower.lower().strip('\n').strip('\r')
search_str_lower_list = search_str_lower.split('+')
css_filename = base_dir + '/epicyon-profile.css'
if os.path.isfile(base_dir + '/epicyon.css'):
css_filename = base_dir + '/epicyon.css'
instance_title = \
get_config_param(base_dir, 'instanceTitle')
shared_items_form = \
html_header_with_external_style(css_filename, instance_title, None)
if shares_file_type == 'shares':
title_str = translate['Shared Items Search']
else:
title_str = translate['Wanted Items Search']
shared_items_form += \
'
'
results_exist = False
for _, dirs, files in os.walk(base_dir + '/accounts'):
for handle in dirs:
if not is_account_dir(handle):
continue
contact_nickname = handle.split('@')[0]
shares_filename = base_dir + '/accounts/' + handle + \
'/' + shares_file_type + '.json'
if not os.path.isfile(shares_filename):
continue
shares_json = load_json(shares_filename)
if not shares_json:
continue
(results_exist, curr_page, ctr,
result_str) = _html_shares_result(base_dir, shares_json,
page_number,
results_per_page,
search_str_lower_list,
curr_page, ctr,
calling_domain, http_prefix,
domain_full,
contact_nickname,
actor, results_exist,
search_str_lower, translate,
shares_file_type)
shared_items_form += result_str
if curr_page > page_number:
break
break
# search federated shared items
if shares_file_type == 'shares':
catalogs_dir = base_dir + '/cache/catalogs'
else:
catalogs_dir = base_dir + '/cache/wantedItems'
if curr_page <= page_number and os.path.isdir(catalogs_dir):
for _, dirs, files in os.walk(catalogs_dir):
for fname in files:
if '#' in fname:
continue
if not fname.endswith('.' + shares_file_type + '.json'):
continue
federated_domain = fname.split('.')[0]
if federated_domain not in shared_items_federated_domains:
continue
shares_filename = catalogs_dir + '/' + fname
shares_json = load_json(shares_filename)
if not shares_json:
continue
(results_exist, curr_page, ctr,
result_str) = _html_shares_result(base_dir, shares_json,
page_number,
results_per_page,
search_str_lower_list,
curr_page, ctr,
calling_domain, http_prefix,
domain_full,
contact_nickname,
actor, results_exist,
search_str_lower, translate,
shares_file_type)
shared_items_form += result_str
if curr_page > page_number:
break
break
if not results_exist:
shared_items_form += \
'
' + translate['No results'] + '
\n'
shared_items_form += html_footer()
return shared_items_form
def _html_common_emoji(base_dir: str, no_of_emoji: int) -> str:
"""Shows common emoji
"""
common_emoji_filename = base_dir + '/accounts/common_emoji.txt'
if not os.path.isfile(common_emoji_filename):
return ''
common_emoji = None
try:
with open(common_emoji_filename, 'r') as fp_emoji:
common_emoji = fp_emoji.readlines()
except OSError:
print('EX: html_common_emoji unable to load file')
return ''
if not common_emoji:
return ''
line_ctr = 0
ctr = 0
html_str = ''
while ctr < no_of_emoji and line_ctr < len(common_emoji):
emoji_name = common_emoji[ctr].split(' ')[1].replace('\n', '')
emoji_filename = base_dir + '/emoji/' + emoji_name + '.png'
if os.path.isfile(emoji_filename):
# NOTE: deliberately no alt text, so that without graphics only
# the emoji name shows
html_str += \
'\n'
ctr += 1
line_ctr += 1
if html_str:
html_str = \
'
' + html_str + '
\n'
return html_str
def html_search_emoji_text_entry(css_cache: {}, translate: {},
base_dir: str, path: str) -> str:
"""Search for an emoji by name
"""
# emoji.json is generated so that it can be customized and the changes
# will be retained even if default_emoji.json is subsequently updated
if not os.path.isfile(base_dir + '/emoji/emoji.json'):
copyfile(base_dir + '/emoji/default_emoji.json',
base_dir + '/emoji/emoji.json')
actor = path.replace('/search', '')
set_custom_background(base_dir, 'search-background', 'follow-background')
css_filename = base_dir + '/epicyon-follow.css'
if os.path.isfile(base_dir + '/follow.css'):
css_filename = base_dir + '/follow.css'
instance_title = \
get_config_param(base_dir, 'instanceTitle')
emoji_str = \
html_header_with_external_style(css_filename, instance_title, None)
emoji_str += '
\n'
emoji_str += '
\n'
emoji_str += '
\n'
emoji_str += \
'
' + \
translate['Enter an emoji name to search for'] + '
\n'
emoji_str += ' \n'
emoji_str += '
\n'
emoji_str += '
\n'
emoji_str += '
\n'
emoji_str += _html_common_emoji(base_dir, 8)
emoji_str += html_footer()
return emoji_str
def html_search(css_cache: {}, translate: {},
base_dir: str, path: str, domain: str,
default_timeline: str, theme: str,
text_mode_banner: str, access_keys: {}) -> str:
"""Search called from the timeline icon
"""
actor = path.replace('/search', '')
search_nickname = get_nickname_from_actor(actor)
if not search_nickname:
return ''
set_custom_background(base_dir, 'search-background', 'follow-background')
css_filename = base_dir + '/epicyon-search.css'
if os.path.isfile(base_dir + '/search.css'):
css_filename = base_dir + '/search.css'
instance_title = get_config_param(base_dir, 'instanceTitle')
follow_str = \
html_header_with_external_style(css_filename, instance_title, None)
# show a banner above the search box
search_banner_file, search_banner_filename = \
get_search_banner_file(base_dir, search_nickname, domain, theme)
text_mode_banner_str = html_keyboard_navigation(text_mode_banner, {}, {})
if text_mode_banner_str is None:
text_mode_banner_str = ''
if os.path.isfile(search_banner_filename):
timeline_key = access_keys['menuTimeline']
users_path = '/users/' + search_nickname
follow_str += \
'\n' + text_mode_banner_str + \
'\n'
follow_str += '\n' + \
'\n'
# show the search box
follow_str += '
\n'
follow_str += '
\n'
follow_str += '
\n'
follow_str += \
'
' + translate['Search screen text'] + '
\n'
follow_str += ' \n'
cached_hashtag_swarm_filename = \
acct_dir(base_dir, search_nickname, domain) + '/.hashtagSwarm'
swarm_str = ''
if os.path.isfile(cached_hashtag_swarm_filename):
try:
with open(cached_hashtag_swarm_filename, 'r') as fp_swarm:
swarm_str = fp_swarm.read()
except OSError:
print('EX: html_search unable to read cached hashtag swarm ' +
cached_hashtag_swarm_filename)
if not swarm_str:
swarm_str = html_hash_tag_swarm(base_dir, actor, translate)
if swarm_str:
try:
with open(cached_hashtag_swarm_filename, 'w+') as fp_hash:
fp_hash.write(swarm_str)
except OSError:
print('EX: html_search unable to save cached hashtag swarm ' +
cached_hashtag_swarm_filename)
follow_str += '
' + swarm_str + '
\n'
follow_str += '
\n'
follow_str += '
\n'
follow_str += '
\n'
follow_str += html_footer()
return follow_str
def html_skills_search(actor: str,
css_cache: {}, translate: {}, base_dir: str,
http_prefix: str,
skillsearch: str, instance_only: bool,
posts_per_page: int) -> str:
"""Show a page containing search results for a skill
"""
if skillsearch.startswith('*'):
skillsearch = skillsearch[1:].strip()
skillsearch = skillsearch.lower().strip('\n').strip('\r')
results = []
# search instance accounts
for subdir, _, files in os.walk(base_dir + '/accounts/'):
for fname in files:
if not fname.endswith('.json'):
continue
if not is_account_dir(fname):
continue
actor_filename = os.path.join(subdir, fname)
actor_json = load_json(actor_filename)
if actor_json:
if actor_json.get('id') and \
no_of_actor_skills(actor_json) > 0 and \
actor_json.get('name') and \
actor_json.get('icon'):
actor = actor_json['id']
actor_skills_list = actor_json['hasOccupation']['skills']
skills = get_skills_from_list(actor_skills_list)
for skill_name, skill_level in skills.items():
skill_name = skill_name.lower()
if not (skill_name in skillsearch or
skillsearch in skill_name):
continue
skill_level_str = str(skill_level)
if skill_level < 100:
skill_level_str = '0' + skill_level_str
if skill_level < 10:
skill_level_str = '0' + skill_level_str
index_str = \
skill_level_str + ';' + actor + ';' + \
actor_json['name'] + \
';' + actor_json['icon']['url']
if index_str not in results:
results.append(index_str)
break
if not instance_only:
# search actor cache
for subdir, _, files in os.walk(base_dir + '/cache/actors/'):
for fname in files:
if not fname.endswith('.json'):
continue
if not is_account_dir(fname):
continue
actor_filename = os.path.join(subdir, fname)
cached_actor_json = load_json(actor_filename)
if cached_actor_json:
if cached_actor_json.get('actor'):
actor_json = cached_actor_json['actor']
if actor_json.get('id') and \
no_of_actor_skills(actor_json) > 0 and \
actor_json.get('name') and \
actor_json.get('icon'):
actor = actor_json['id']
actor_skills_list = \
actor_json['hasOccupation']['skills']
skills = get_skills_from_list(actor_skills_list)
for skill_name, skill_level in skills.items():
skill_name = skill_name.lower()
if not (skill_name in skillsearch or
skillsearch in skill_name):
continue
skill_level_str = str(skill_level)
if skill_level < 100:
skill_level_str = '0' + skill_level_str
if skill_level < 10:
skill_level_str = '0' + skill_level_str
index_str = \
skill_level_str + ';' + actor + ';' + \
actor_json['name'] + \
';' + actor_json['icon']['url']
if index_str not in results:
results.append(index_str)
break
results.sort(reverse=True)
css_filename = base_dir + '/epicyon-profile.css'
if os.path.isfile(base_dir + '/epicyon.css'):
css_filename = base_dir + '/epicyon.css'
instance_title = \
get_config_param(base_dir, 'instanceTitle')
skill_search_form = \
html_header_with_external_style(css_filename, instance_title, None)
skill_search_form += \
'