__filename__ = "webapp_utils.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.4.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Web Interface"
import os
from shutil import copyfile
from collections import OrderedDict
from session import get_json
from session import get_json_valid
from utils import local_network_host
from utils import dangerous_markup
from utils import acct_handle_dir
from utils import remove_id_ending
from utils import get_attachment_property_value
from utils import is_account_dir
from utils import remove_html
from utils import get_protocol_prefixes
from utils import load_json
from utils import get_cached_post_filename
from utils import get_config_param
from utils import acct_dir
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import is_float
from utils import get_audio_extensions
from utils import get_video_extensions
from utils import get_image_extensions
from utils import local_actor_url
from utils import text_in_file
from utils import remove_eol
from filters import is_filtered
from cache import get_actor_public_key_from_id
from cache import store_person_in_cache
from content import add_html_tags
from content import replace_emoji_from_tags
from person import get_person_avatar_url
from posts import is_moderator
from blocking import is_blocked
from blocking import allowed_announce
from shares import vf_proposal_from_share
def minimizing_attached_images(base_dir: str, nickname: str, domain: str,
following_nickname: str,
following_domain: str) -> bool:
"""Returns true if images from the account being followed should be
minimized by default
"""
if following_nickname == nickname and following_domain == domain:
# reminder post
return False
minimize_filename = \
acct_dir(base_dir, nickname, domain) + '/followingMinimizeImages.txt'
handle = following_nickname + '@' + following_domain
if not os.path.isfile(minimize_filename):
following_filename = \
acct_dir(base_dir, nickname, domain) + '/following.txt'
if not os.path.isfile(following_filename):
return False
# create a new minimize file from the following file
try:
with open(minimize_filename, 'w+',
encoding='utf-8') as fp_min:
fp_min.write('')
except OSError:
print('EX: minimizing_attached_images 2 ' + minimize_filename)
return text_in_file(handle + '\n', minimize_filename, False)
def get_broken_link_substitute() -> str:
"""Returns html used to show a default image if the link to
an image is broken
"""
return " onerror=\"this.onerror=null; this.src='" + \
"/icons/avatar_default.png'\""
def html_following_list(base_dir: str, following_filename: str) -> str:
"""Returns a list of handles being followed
"""
with open(following_filename, 'r', encoding='utf-8') as following_file:
msg = following_file.read()
following_list = msg.split('\n')
following_list.sort()
if following_list:
css_filename = base_dir + '/epicyon-profile.css'
if os.path.isfile(base_dir + '/epicyon.css'):
css_filename = base_dir + '/epicyon.css'
instance_title = \
get_config_param(base_dir, 'instanceTitle')
following_list_html = \
html_header_with_external_style(css_filename,
instance_title, None)
for following_address in following_list:
if following_address:
following_list_html += \
'
@' + following_address + '
'
following_list_html += html_footer()
msg = following_list_html
return msg
return ''
def csv_following_list(following_filename: str,
base_dir: str, nickname: str, domain: str) -> str:
"""Returns a csv of handles being followed
"""
with open(following_filename, 'r', encoding='utf-8') as following_file:
msg = following_file.read()
following_list = msg.split('\n')
following_list.sort()
if following_list:
following_list_csv = ''
for following_address in following_list:
if not following_address:
continue
following_nickname = \
get_nickname_from_actor(following_address)
following_domain, _ = \
get_domain_from_actor(following_address)
announce_is_allowed = \
allowed_announce(base_dir, nickname, domain,
following_nickname,
following_domain)
notify_on_new = 'false'
languages = ''
person_notes = ''
person_notes_filename = \
acct_dir(base_dir, nickname, domain) + \
'/notes/' + following_address + '.txt'
if os.path.isfile(person_notes_filename):
with open(person_notes_filename, 'r',
encoding='utf-8') as fp_notes:
person_notes = fp_notes.read()
person_notes = person_notes.replace(',', ' ')
person_notes = person_notes.replace('"', "'")
person_notes = person_notes.replace('\n', ' ')
person_notes = person_notes.replace(' ', ' ')
if not following_list_csv:
following_list_csv = \
'Account address,Show boosts,' + \
'Notify on new posts,Languages,Notes\n'
following_list_csv += \
following_address + ',' + \
str(announce_is_allowed).lower() + ',' + \
notify_on_new + ',' + \
languages + ',' + \
person_notes + '\n'
msg = following_list_csv
return msg
return ''
def html_hashtag_blocked(base_dir: str, translate: {}) -> str:
"""Show the screen for a blocked hashtag
"""
blocked_hashtag_form = ''
css_filename = base_dir + '/epicyon-suspended.css'
if os.path.isfile(base_dir + '/suspended.css'):
css_filename = base_dir + '/suspended.css'
instance_title = \
get_config_param(base_dir, 'instanceTitle')
blocked_hashtag_form = \
html_header_with_external_style(css_filename, instance_title, None)
blocked_hashtag_form += '
'
# obtain transcripts
transcripts = {}
for attach in post_json_object['object']['attachment']:
if not attach.get('mediaType'):
continue
if attach['mediaType'] != 'text/vtt':
continue
name = None
if attach.get('name'):
name = attach['name']
if attach.get('nameMap'):
for name_lang, name_value in attach['nameMap'].items():
if not isinstance(name_value, str):
continue
if name_lang.startswith(system_language):
name = name_value
if not name and attach.get('hreflang'):
name = attach['hreflang']
url = None
if attach.get('url'):
url = attach['url']
elif attach.get('href'):
url = attach['href']
if name and url:
transcripts[name] = remove_html(url)
for attach in post_json_object['object']['attachment']:
if not (attach.get('mediaType') and attach.get('url')):
continue
media_license = ''
if attach.get('schema:license'):
if not dangerous_markup(attach['schema:license'], False, []):
if not is_filtered(base_dir, nickname, domain,
attach['schema:license'],
system_language):
if '://' not in attach['schema:license']:
if len(attach['schema:license']) < 60:
media_license = attach['schema:license']
else:
media_license = attach['schema:license']
elif attach.get('license'):
if not dangerous_markup(attach['license'], False, []):
if not is_filtered(base_dir, nickname, domain,
attach['license'],
system_language):
if '://' not in attach['license']:
if len(attach['license']) < 60:
media_license = attach['license']
else:
media_license = attach['license']
media_creator = ''
if attach.get('schema:creator'):
if len(attach['schema:creator']) < 120:
if not dangerous_markup(attach['schema:creator'], False, []):
if not is_filtered(base_dir, nickname, domain,
attach['schema:creator'],
system_language):
media_creator = attach['schema:creator']
elif attach.get('attribution'):
if isinstance(attach['attribution'], list):
if len(attach['attribution']) > 0:
attrib_str = attach['attribution'][0]
if not dangerous_markup(attrib_str, False, []):
if not is_filtered(base_dir, nickname, domain,
attrib_str, system_language):
media_creator = attrib_str
media_type = attach['mediaType']
image_description = ''
if attach.get('name'):
image_description = attach['name'].replace('"', "'")
image_description = remove_html(image_description)
if _is_image_mime_type(media_type):
image_url = remove_html(attach['url'])
# display svg images if they have first been rendered harmless
svg_harmless = True
if 'svg' in media_type:
svg_harmless = False
if '://' + domain_full + '/' in image_url:
svg_harmless = True
else:
if post_id:
if '/' in image_url:
im_filename = image_url.split('/')[-1]
else:
im_filename = image_url
cached_svg_filename = \
base_dir + '/media/' + post_id + '_' + im_filename
if os.path.isfile(cached_svg_filename):
svg_harmless = True
if _is_attached_image(image_url) and svg_harmless:
if not attachment_str:
attachment_str += '
\n'
media_style_added = True
if attachment_ctr > 0:
attachment_str += ' '
if box_name == 'tlmedia':
gallery_str += '
\n'
attachment_ctr += 1
if media_style_added:
attachment_str += '
'
return attachment_str, gallery_str
def html_post_separator(base_dir: str, column: str) -> str:
"""Returns the html for a timeline post separator image
"""
theme = get_config_param(base_dir, 'theme')
filename = 'separator.png'
separator_class = "postSeparatorImage"
if column:
separator_class = "postSeparatorImage" + column.title()
filename = 'separator_' + column + '.png'
separator_image_filename = \
base_dir + '/theme/' + theme + '/icons/' + filename
separator_str = ''
if os.path.isfile(separator_image_filename):
separator_str = \
'
' + \
'
\n'
return separator_str
def html_highlight_label(label: str, highlight: bool) -> str:
"""If the given text should be highlighted then return
the appropriate markup.
This is so that in shell browsers, like lynx, it's possible
to see if the replies or DM button are highlighted.
"""
if not highlight:
return label
return '*' + str(label) + '*'
def get_avatar_image_url(session, base_dir: str, http_prefix: str,
post_actor: str, person_cache: {},
avatar_url: str, allow_downloads: bool,
signing_priv_key_pem: str) -> str:
"""Returns the avatar image url
"""
# get the avatar image url for the post actor
if not avatar_url:
avatar_url = \
get_person_avatar_url(base_dir, post_actor, person_cache)
avatar_url = \
update_avatar_image_cache(signing_priv_key_pem,
session, base_dir, http_prefix,
post_actor, avatar_url, person_cache,
allow_downloads)
else:
update_avatar_image_cache(signing_priv_key_pem,
session, base_dir, http_prefix,
post_actor, avatar_url, person_cache,
allow_downloads)
if not avatar_url:
avatar_url = post_actor + '/avatar.png'
return avatar_url
def html_hide_from_screen_reader(html_str: str) -> str:
"""Returns html which is hidden from screen readers
"""
return '' + html_str + ''
def html_keyboard_navigation(banner: str, links: {}, access_keys: {},
sub_heading: str = None,
users_path: str = None, translate: {} = None,
follow_approvals: bool = False) -> str:
"""Given a set of links return the html for keyboard navigation
"""
html_str = '
\n'
if banner:
html_str += '
\n' + banner + '\n
\n'
if sub_heading:
html_str += ' \n'
# show new follower approvals
if users_path and translate and follow_approvals:
html_str += '
\n'
# show the list of links
for title, url in links.items():
access_key_str = ''
if access_keys.get(title):
access_key_str = 'accesskey="' + access_keys[title] + '"'
html_str += '\n'
html_str += '
\n'
return html_str
def begin_edit_section(label: str) -> str:
"""returns the html for begining a dropdown section on edit profile screen
"""
return \
' ' + label + '\n' + \
'
'
def end_edit_section() -> str:
"""returns the html for ending a dropdown section on edit profile screen
"""
return '
\n'
def edit_text_field(label: str, name: str, value: str = "",
placeholder: str = "", required: bool = False) -> str:
"""Returns html for editing a text field
"""
if value is None:
value = ''
placeholder_str = ''
if placeholder:
placeholder_str = ' placeholder="' + placeholder + '"'
required_str = ''
if required:
required_str = ' required'
text_field_str = ''
if label:
text_field_str = \
' \n'
text_field_str += \
' \n'
return text_field_str
def edit_number_field(label: str, name: str, value: int,
min_value: int, max_value: int,
placeholder: int) -> str:
"""Returns html for editing an integer number field
"""
if value is None:
value = ''
placeholder_str = ''
if placeholder:
placeholder_str = ' placeholder="' + str(placeholder) + '"'
return \
' \n' + \
' \n'
def edit_currency_field(label: str, name: str, value: str,
placeholder: str, required: bool) -> str:
"""Returns html for editing a currency field
"""
if value is None:
value = '0.00'
placeholder_str = ''
if placeholder:
if placeholder.isdigit():
placeholder_str = ' placeholder="' + str(placeholder) + '"'
required_str = ''
if required:
required_str = ' required'
return \
' \n' + \
' \n'
def edit_check_box(label: str, name: str, checked: bool) -> str:
"""Returns html for editing a checkbox field
"""
checked_str = ''
if checked:
checked_str = ' checked'
return \
' ' + label + ' \n'
def edit_text_area(label: str, subtitle: str, name: str, value: str,
height: int, placeholder: str, spellcheck: bool) -> str:
"""Returns html for editing a textarea field
"""
if value is None:
value = ''
text = ''
if label:
text = ' \n'
if subtitle:
text += subtitle + ' \n'
text += \
' \n'
return text
def html_search_result_share(base_dir: str, shared_item: {}, translate: {},
http_prefix: str, domain_full: str,
contact_nickname: str, item_id: str,
actor: str, shares_file_type: str,
category: str) -> str:
"""Returns the html for an individual shared item
"""
shared_items_form = '
' + \
'\n' + \
'\n'
# should the remove button be shown?
show_remove_button = False
nickname = get_nickname_from_actor(actor)
if not nickname:
return ''
if actor.endswith('/users/' + contact_nickname):
show_remove_button = True
elif is_moderator(base_dir, nickname):
show_remove_button = True
else:
admin_nickname = get_config_param(base_dir, 'admin')
if admin_nickname:
if actor.endswith('/users/' + admin_nickname):
show_remove_button = True
if show_remove_button:
if shares_file_type == 'shares':
shared_items_form += \
' \n'
else:
shared_items_form += \
' \n'
shared_items_form += '
\n'
return shared_items_form
def html_show_share(base_dir: str, domain: str, nickname: str,
http_prefix: str, domain_full: str,
item_id: str, translate: {},
shared_items_federated_domains: [],
default_timeline: str, theme: str,
shares_file_type: str, category: str) -> str:
"""Shows an individual shared item after selecting it from the left column
"""
shares_json = None
share_url = item_id.replace('___', '://').replace('--', '/')
contact_nickname = get_nickname_from_actor(share_url)
if not contact_nickname:
return None
if '://' + domain_full + '/' in share_url:
# shared item on this instance
shares_filename = \
acct_dir(base_dir, contact_nickname, domain) + '/' + \
shares_file_type + '.json'
if not os.path.isfile(shares_filename):
return None
shares_json = load_json(shares_filename)
else:
# federated shared item
if shares_file_type == 'shares':
catalogs_dir = base_dir + '/cache/catalogs'
else:
catalogs_dir = base_dir + '/cache/wantedItems'
if not os.path.isdir(catalogs_dir):
return None
for _, _, files in os.walk(catalogs_dir):
for fname in files:
if '#' in fname:
continue
if not fname.endswith('.' + shares_file_type + '.json'):
continue
federated_domain = fname.split('.')[0]
if federated_domain not in shared_items_federated_domains:
continue
shares_filename = catalogs_dir + '/' + fname
shares_json = load_json(shares_filename)
if not shares_json:
continue
if shares_json.get(item_id):
break
break
if not shares_json:
return None
if not shares_json.get(item_id):
return None
shared_item = shares_json[item_id]
actor = local_actor_url(http_prefix, nickname, domain_full)
# filename of the banner shown at the top
banner_file, _ = \
get_banner_file(base_dir, nickname, domain, theme)
share_str = \
'\n' + \
'\n'
share_str += '\n' + \
' \n'
share_str += \
html_search_result_share(base_dir, shared_item, translate, http_prefix,
domain_full, contact_nickname, item_id,
actor, shares_file_type, category)
css_filename = base_dir + '/epicyon-profile.css'
if os.path.isfile(base_dir + '/epicyon.css'):
css_filename = base_dir + '/epicyon.css'
instance_title = \
get_config_param(base_dir, 'instanceTitle')
return html_header_with_external_style(css_filename,
instance_title, None) + \
share_str + html_footer()
def set_custom_background(base_dir: str, background: str,
new_background: str) -> str:
"""Sets a custom background
Returns the extension, if found
"""
ext = 'jpg'
if os.path.isfile(base_dir + '/img/' + background + '.' + ext):
if not new_background:
new_background = background
if not os.path.isfile(base_dir + '/accounts/' +
new_background + '.' + ext):
copyfile(base_dir + '/img/' + background + '.' + ext,
base_dir + '/accounts/' + new_background + '.' + ext)
return ext
return None
def html_common_emoji(base_dir: str, no_of_emoji: int) -> str:
"""Shows common emoji
"""
emojis_filename = base_dir + '/emoji/emoji.json'
if not os.path.isfile(emojis_filename):
emojis_filename = base_dir + '/emoji/default_emoji.json'
emojis_json = load_json(emojis_filename)
common_emoji_filename = base_dir + '/accounts/common_emoji.txt'
if not os.path.isfile(common_emoji_filename):
return ''
common_emoji = None
try:
with open(common_emoji_filename, 'r', encoding='utf-8') as fp_emoji:
common_emoji = fp_emoji.readlines()
except OSError:
print('EX: html_common_emoji unable to load file')
return ''
if not common_emoji:
return ''
line_ctr = 0
ctr = 0
html_str = ''
while ctr < no_of_emoji and line_ctr < len(common_emoji):
emoji_name1 = common_emoji[line_ctr].split(' ')[1]
emoji_name = remove_eol(emoji_name1)
emoji_icon_name = emoji_name
emoji_filename = base_dir + '/emoji/' + emoji_name + '.png'
if not os.path.isfile(emoji_filename):
emoji_filename = base_dir + '/customemoji/' + emoji_name + '.png'
if not os.path.isfile(emoji_filename):
# load the emojis index
if not emojis_json:
emojis_json = load_json(emojis_filename)
# lookup the name within the index to get the hex code
if emojis_json:
for emoji_tag, emoji_code in emojis_json.items():
if emoji_tag == emoji_name:
# get the filename based on the hex code
emoji_filename = \
base_dir + '/emoji/' + emoji_code + '.png'
emoji_icon_name = emoji_code
break
if os.path.isfile(emoji_filename):
# NOTE: deliberately no alt text, so that without graphics only
# the emoji name shows
html_str += \
'\n'
ctr += 1
line_ctr += 1
return html_str
def text_mode_browser(ua_str: str) -> bool:
"""Does the user agent indicate a text mode browser?
"""
if ua_str:
text_mode_agents = ('Lynx/', 'w3m/', 'Links (', 'Emacs/', 'ELinks')
for agent in text_mode_agents:
if agent in ua_str:
return True
return False
def language_right_to_left(language: str) -> bool:
"""is the given language written from right to left?
"""
rtl_languages = ('ar', 'fa')
if language in rtl_languages:
return True
return False
def get_default_path(media_instance: bool, blogs_instance: bool,
nickname: str) -> str:
"""Returns the default timeline
"""
if blogs_instance:
path = '/users/' + nickname + '/tlblogs'
elif media_instance:
path = '/users/' + nickname + '/tlmedia'
else:
path = '/users/' + nickname + '/inbox'
return path
def html_following_data_list(base_dir: str, nickname: str,
domain: str, domain_full: str,
following_type: str,
use_petnames: bool) -> str:
"""Returns a datalist of handles being followed
followingHandles, followersHandles
"""
list_str = '\n'
return list_str
def html_following_dropdown(base_dir: str, nickname: str,
domain: str, domain_full: str,
following_type: str,
use_petnames: bool) -> str:
"""Returns a select list of handles being followed or of followers
"""
list_str = '\n'
return list_str
def get_buy_links(post_json_object: str, translate: {}, buy_sites: {}) -> {}:
"""Returns any links to buy something from an external site
"""
if not post_json_object['object'].get('attachment'):
return {}
if not isinstance(post_json_object['object']['attachment'], list):
return {}
links = {}
buy_strings = []
for buy_str in ('Buy', 'Purchase', 'Subscribe'):
if translate.get(buy_str):
buy_str = translate[buy_str]
buy_strings += buy_str.lower()
buy_strings += ('Paypal', 'Stripe', 'Cashapp', 'Venmo')
for item in post_json_object['object']['attachment']:
if not isinstance(item, dict):
continue
if not item.get('name'):
continue
if not isinstance(item['name'], str):
continue
if not item.get('type'):
continue
if not item.get('href'):
continue
if not isinstance(item['type'], str):
continue
if not isinstance(item['href'], str):
continue
if item['type'] != 'Link':
continue
if not item.get('mediaType'):
continue
if not isinstance(item['mediaType'], str):
continue
if 'html' not in item['mediaType']:
continue
item_name = item['name']
# The name should not be excessively long
if len(item_name) > 32:
continue
# there should be no html in the name
if remove_html(item_name) != item_name:
continue
# there should be no html in the link
if '<' in item['href'] or \
'://' not in item['href'] or \
' ' in item['href']:
continue
if item.get('rel'):
if isinstance(item['rel'], str):
if item['rel'] in ('payment', 'pay', 'donate', 'donation',
'buy', 'purchase'):
links[item_name] = remove_html(item['href'])
continue
if buy_sites:
# limited to an allowlist of buying sites
for site, buy_domain in buy_sites.items():
if buy_domain in item['href']:
links[site.title()] = remove_html(item['href'])
continue
else:
# The name only needs to indicate that this is a buy link
for buy_str in buy_strings:
if buy_str in item_name.lower():
links[item_name] = remove_html(item['href'])
continue
return links
def load_buy_sites(base_dir: str) -> {}:
"""Loads domains from which buying is permitted
"""
buy_sites_filename = base_dir + '/accounts/buy_sites.json'
if os.path.isfile(buy_sites_filename):
buy_sites_json = load_json(buy_sites_filename)
if buy_sites_json:
return buy_sites_json
return {}