# Module level metadata describing this file
__filename__ = "webapp_utils.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
# __version__ is passed to get_json by update_avatar_image_cache
__version__ = "1.3.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Web Interface"
import os
from shutil import copyfile
from collections import OrderedDict
from session import get_json
from utils import remove_id_ending
from utils import get_attachment_property_value
from utils import is_account_dir
from utils import remove_html
from utils import get_protocol_prefixes
from utils import load_json
from utils import get_cached_post_filename
from utils import get_config_param
from utils import acct_dir
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import is_float
from utils import get_audio_extensions
from utils import get_video_extensions
from utils import get_image_extensions
from utils import local_actor_url
from utils import text_in_file
from utils import remove_eol
from cache import store_person_in_cache
from content import add_html_tags
from content import replace_emoji_from_tags
from person import get_person_avatar_url
from posts import is_moderator
from blocking import is_blocked
def minimizing_attached_images(base_dir: str, nickname: str, domain: str,
                               following_nickname: str,
                               following_domain: str) -> bool:
    """Returns true if images from the account being followed should be
    minimized by default

    base_dir, nickname and domain identify the local account whose
    followingMinimizeImages.txt file is consulted.
    following_nickname and following_domain identify the account which
    authored the post. If either is missing then False is returned.

    If the minimize file does not exist, but the account has a
    following.txt file, then an empty minimize file is created.
    """
    # Callers derive these from an actor url and the lookup can come back
    # empty (other code in this file checks the nickname for that reason).
    # Without both parts there is no handle to search for
    if not following_nickname or not following_domain:
        return False
    if following_nickname == nickname and following_domain == domain:
        # reminder post
        return False
    account_dir = acct_dir(base_dir, nickname, domain)
    minimize_filename = account_dir + '/followingMinimizeImages.txt'
    if not os.path.isfile(minimize_filename):
        if not os.path.isfile(account_dir + '/following.txt'):
            return False
        # create a new, initially empty, minimize file
        try:
            with open(minimize_filename, 'w+',
                      encoding='utf-8') as fp_min:
                fp_min.write('')
        except OSError:
            print('EX: minimizing_attached_images 2 ' + minimize_filename)
    handle = following_nickname + '@' + following_domain
    return text_in_file(handle + '\n', minimize_filename)
def get_broken_link_substitute() -> str:
    """Returns the onerror attribute which swaps in the default avatar
    icon when the link to an image is broken
    """
    fallback_image = '/icons/avatar_default.png'
    # clearing onerror first stops the handler from firing again
    return " onerror=\"this.onerror=null; this.src='" + \
        fallback_image + "'\""
def html_following_list(base_dir: str, following_filename: str) -> str:
"""Returns a page listing the handles being followed
base_dir is the directory searched for the css files and passed to
get_config_param to obtain the instance title.
following_filename is the text file which is read and split into lines.
NOTE(review): the string literal appended for each address is split
across two lines and looks like it has lost its markup - confirm
against the upstream file before relying on the exact output.
"""
# read the whole following file, then sort its lines
with open(following_filename, 'r', encoding='utf-8') as following_file:
msg = following_file.read()
following_list = msg.split('\n')
following_list.sort()
# str.split always returns at least one item, so this test always passes
# and the final return of an empty string is never reached
if following_list:
# epicyon-profile.css is the default, replaced by epicyon.css if that
# file exists within base_dir
css_filename = base_dir + '/epicyon-profile.css'
if os.path.isfile(base_dir + '/epicyon.css'):
css_filename = base_dir + '/epicyon.css'
instance_title = \
get_config_param(base_dir, 'instanceTitle')
following_list_html = \
html_header_with_external_style(css_filename,
instance_title, None)
# blank lines within the file are skipped
for following_address in following_list:
if following_address:
following_list_html += \
'
@' + following_address + ' '
following_list_html += html_footer()
msg = following_list_html
return msg
return ''
def csv_following_list(following_filename: str) -> str:
    """Returns the followed handles as csv text

    The first line is the column header. It is followed by one row per
    non-empty line of the following file, sorted, with each row ending
    in ',true'. Rows are separated by newlines with no trailing newline.
    """
    with open(following_filename, 'r', encoding='utf-8') as fp_following:
        file_text = fp_following.read()
    handles = sorted(file_text.split('\n'))
    # blank lines, such as the one after a trailing newline, are dropped
    rows = [handle + ',true' for handle in handles if handle]
    return 'Account address,Show boosts\n' + '\n'.join(rows)
def html_hashtag_blocked(base_dir: str, translate: {}) -> str:
    """Returns the screen shown for a blocked hashtag

    The translate argument is accepted but is not used here.
    """
    # suspended.css within base_dir takes priority over the default
    if os.path.isfile(base_dir + '/suspended.css'):
        css_filename = base_dir + '/suspended.css'
    else:
        css_filename = base_dir + '/epicyon-suspended.css'
    instance_title = get_config_param(base_dir, 'instanceTitle')
    screen_html = \
        html_header_with_external_style(css_filename, instance_title, None)
    screen_html += '\n'
    return screen_html + html_footer()
def header_buttons_front_screen(translate: {},
nickname: str, box_name: str,
authorized: bool,
icons_as_buttons: bool) -> str:
"""Returns the header buttons for the front page of a news instance
translate supplies the 'Features', 'Login', 'Newswire' and 'Links'
button labels.
For the 'news' nickname the features, newswire and links entries are
added, otherwise only a login entry is added when not authorized.
An empty string is returned if nothing was added.
NOTE(review): the string literals here look like they have lost their
markup (the final one is split across two lines), and the button class
variables are not referenced by any visible literal - confirm against
the upstream file.
"""
header_str = ''
if nickname == 'news':
# every button starts with the same class, and the one matching
# box_name is then marked as selected
button_features = 'buttonMobile'
button_newswire = 'buttonMobile'
button_links = 'buttonMobile'
if box_name == 'features':
button_features = 'buttonselected'
elif box_name == 'newswire':
button_newswire = 'buttonselected'
elif box_name == 'links':
button_links = 'buttonselected'
header_str += \
' ' + \
'' + \
'' + translate['Features'] + \
' '
# login is only offered when not already authorized
if not authorized:
header_str += \
' ' + \
'' + \
'' + translate['Login'] + \
' '
# newswire and links are shown as text buttons or as icons
if icons_as_buttons:
header_str += \
' ' + \
'' + \
'' + translate['Newswire'] + \
' '
header_str += \
' ' + \
'' + \
'' + translate['Links'] + \
' '
else:
header_str += \
' ' + \
' \n'
header_str += \
' ' + \
' \n'
else:
if not authorized:
header_str += \
' ' + \
'' + \
'' + translate['Login'] + \
' '
# wrap whatever was produced, leaving an empty result untouched
if header_str:
header_str = \
'\n \n' + \
header_str + \
'
\n'
return header_str
def get_content_warning_button(post_id: str, translate: {},
content: str) -> str:
"""Returns the markup for a content warning button
translate supplies the 'SHOW MORE' label and content is the text
placed after it.
NOTE(review): the literals look like they have lost their markup (the
last one is split across two lines) and post_id is not referenced by
any visible literal - confirm against the upstream file.
"""
return ' ' + \
translate['SHOW MORE'] + ' ' + \
'' + content + \
'
\n'
def _set_actor_property_url(actor_json: {},
                            property_name: str, url: str) -> None:
    """Sets a url for the given actor property

    The first attachment whose name begins with property_name (ignoring
    case) is always removed. If the url has a known protocol prefix,
    contains a dot and has no spaces or commas then it is stored, either
    within a remaining matching PropertyValue attachment or as a newly
    appended one.
    """
    if not actor_json.get('attachment'):
        actor_json['attachment'] = []
    wanted_prefix = property_name.lower()

    def _is_candidate(entry: {}) -> bool:
        """Is this a typed attachment named like the property?"""
        label = entry.get('name') or entry.get('schema:name')
        if not label or not entry.get('type'):
            return False
        return label.lower().startswith(wanted_prefix)

    attachments = actor_json['attachment']

    # remove any existing value
    stale_entry = \
        next((entry for entry in attachments if _is_candidate(entry)), None)
    if stale_entry:
        attachments.remove(stale_entry)

    # only store urls which look valid
    prefixes = get_protocol_prefixes()
    if not any(url.startswith(prefix) for prefix in prefixes):
        return
    if '.' not in url or ' ' in url or ',' in url:
        return

    # update another matching property value if there is one
    for entry in attachments:
        if not _is_candidate(entry):
            continue
        if not entry['type'].endswith('PropertyValue'):
            continue
        value_key, _ = get_attachment_property_value(entry)
        if not value_key:
            continue
        entry[value_key] = url
        return

    attachments.append({
        "name": property_name,
        "type": "PropertyValue",
        "value": url
    })
def set_blog_address(actor_json: {}, blog_address: str) -> None:
    """Sets a blog address for the given actor, after removing any html
    from the supplied address
    """
    clean_address = remove_html(blog_address)
    _set_actor_property_url(actor_json, 'Blog', clean_address)
def update_avatar_image_cache(signing_priv_key_pem: str,
session, base_dir: str, http_prefix: str,
actor: str, avatar_url: str,
person_cache: {}, allow_downloads: bool,
force: bool = False, debug: bool = False) -> str:
"""Updates the cached avatar for the given actor
The cached file lives under base_dir/cache/avatars and is named after
the actor url with '/' replaced by '-', plus the image extension found
in avatar_url.
Returns the cached filename with the base_dir/cache prefix removed, the
result of get_person_avatar_url after the actor has been fetched again,
or None if there is no avatar url, the image format is not recognised
or the fetched actor fails validation.
NOTE(review): indentation is not visible in this copy of the file, so
the nesting of the statements after the download attempt should be
confirmed against the upstream file.
"""
if not avatar_url:
return None
actor_str = actor.replace('/', '-')
avatar_image_path = base_dir + '/cache/avatars/' + actor_str
# try different image types
# maps the file extension to the subtype used in the Accept header
image_formats = {
'png': 'png',
'jpg': 'jpeg',
'jxl': 'jxl',
'jpeg': 'jpeg',
'gif': 'gif',
'svg': 'svg+xml',
'webp': 'webp',
'avif': 'avif'
}
avatar_image_filename = None
# the url matches if it ends with the extension or has it before a '?'
# there is no break, so the last matching format is the one used
for im_format, mime_type in image_formats.items():
if avatar_url.endswith('.' + im_format) or \
'.' + im_format + '?' in avatar_url:
session_headers = {
'Accept': 'image/' + mime_type
}
avatar_image_filename = avatar_image_path + '.' + im_format
if not avatar_image_filename:
return None
# download when the file is not cached or force is set, and only if
# downloads are allowed
if (not os.path.isfile(avatar_image_filename) or force) and \
allow_downloads:
try:
if debug:
print('avatar image url: ' + avatar_url)
result = session.get(avatar_url,
headers=session_headers,
params=None,
allow_redirects=False)
# only status codes 200 to 202 are treated as success
if result.status_code < 200 or \
result.status_code > 202:
if debug:
print('Avatar image download failed with status ' +
str(result.status_code))
# remove partial download
if os.path.isfile(avatar_image_filename):
try:
os.remove(avatar_image_filename)
except OSError:
print('EX: ' +
'update_avatar_image_cache unable to delete ' +
avatar_image_filename)
else:
with open(avatar_image_filename, 'wb') as fp_av:
fp_av.write(result.content)
if debug:
print('avatar image downloaded for ' + actor)
return avatar_image_filename.replace(base_dir + '/cache',
'')
except Exception as ex:
print('EX: Failed to download avatar image: ' +
str(avatar_url) + ' ' + str(ex))
# the actor is fetched again and checked against the cached copy
prof = 'https://www.w3.org/ns/activitystreams'
# NOTE(review): this test only fails when the actor contains both
# '/channel/' and '/accounts/', so the ld+json header is rarely used.
# Confirm whether 'or' is what was intended
if '/channel/' not in actor or '/accounts/' not in actor:
session_headers = {
'Accept': 'application/activity+json; profile="' + prof + '"'
}
else:
session_headers = {
'Accept': 'application/ld+json; profile="' + prof + '"'
}
person_json = \
get_json(signing_priv_key_pem, session, actor,
session_headers, None,
debug, __version__, http_prefix, None)
if person_json:
# the fetched actor must have an id matching the requested actor
# and a public key
if not person_json.get('id'):
return None
if not person_json.get('publicKey'):
return None
if not person_json['publicKey'].get('publicKeyPem'):
return None
if person_json['id'] != actor:
return None
# the actor must already be cached, with the same public key
if not person_cache.get(actor):
return None
if person_cache[actor]['actor']['publicKey']['publicKeyPem'] != \
person_json['publicKey']['publicKeyPem']:
print("ERROR: " +
"public keys don't match when downloading actor for " +
actor)
return None
store_person_in_cache(base_dir, actor, person_json, person_cache,
allow_downloads)
return get_person_avatar_url(base_dir, actor, person_cache)
return None
return avatar_image_filename.replace(base_dir + '/cache', '')
def scheduled_posts_exist(base_dir: str, nickname: str, domain: str) -> bool:
    """Returns true if the account has a schedule index file which
    contains at least one scheduled post
    """
    index_filename = \
        acct_dir(base_dir, nickname, domain) + '/schedule.index'
    # entries within the index are recognised by the '#users#' text
    if os.path.isfile(index_filename) and \
       text_in_file('#users#', index_filename):
        return True
    return False
def shares_timeline_json(actor: str, pageNumber: int, items_per_page: int,
base_dir: str, domain: str, nickname: str,
max_shares_per_account: int,
shared_items_federated_domains: [],
shares_file_type: str) -> ({}, bool):
"""Get a page on the shared items timeline as json
max_shares_per_account helps to avoid one person dominating the timeline
by sharing a large number of things
Items are read from the shares_file_type json file of each local
account and, when shared_items_federated_domains is not empty, from
the cached catalogs of those domains.
Returns a dict of items for the page keyed by published value, and a
flag indicating whether this is the last page.
NOTE(review): indentation is not visible in this copy of the file, so
the nesting of the break statements should be confirmed against the
upstream file.
"""
# items are keyed by their published value as a string, so two items
# with the same published value overwrite each other
all_shares_json = {}
for _, dirs, files in os.walk(base_dir + '/accounts'):
for handle in dirs:
if not is_account_dir(handle):
continue
account_dir = base_dir + '/accounts/' + handle
shares_filename = account_dir + '/' + shares_file_type + '.json'
if not os.path.isfile(shares_filename):
continue
shares_json = load_json(shares_filename)
if not shares_json:
continue
account_nickname = handle.split('@')[0]
# Don't include shared items from blocked accounts
if account_nickname != nickname:
if is_blocked(base_dir, nickname, domain,
account_nickname, domain, None):
continue
# actor who owns this share
owner = actor.split('/users/')[0] + '/users/' + account_nickname
ctr = 0
for item_id, item in shares_json.items():
# assign owner to the item
item['actor'] = owner
item['shareId'] = item_id
all_shares_json[str(item['published'])] = item
ctr += 1
if ctr >= max_shares_per_account:
break
break
# add items from federated domains
if shared_items_federated_domains:
# the cache directory depends upon the type of shares file
if shares_file_type == 'shares':
catalogs_dir = base_dir + '/cache/catalogs'
else:
catalogs_dir = base_dir + '/cache/wantedItems'
if os.path.isdir(catalogs_dir):
for _, dirs, files in os.walk(catalogs_dir):
for fname in files:
if '#' in fname:
continue
if not fname.endswith('.' + shares_file_type + '.json'):
continue
# the text before the first dot is taken as the domain
federated_domain = fname.split('.')[0]
if federated_domain not in shared_items_federated_domains:
continue
shares_filename = catalogs_dir + '/' + fname
shares_json = load_json(shares_filename)
if not shares_json:
continue
ctr = 0
for item_id, item in shares_json.items():
# assign owner to the item
if '--shareditems--' not in item_id:
continue
# the actor url is encoded at the start of the item id, with
# '___' in place of '://' and '--' in place of '/'
share_actor = item_id.split('--shareditems--')[0]
share_actor = share_actor.replace('___', '://')
share_actor = share_actor.replace('--', '/')
share_nickname = get_nickname_from_actor(share_actor)
if not share_nickname:
continue
if is_blocked(base_dir, nickname, domain,
share_nickname, federated_domain, None):
continue
item['actor'] = share_actor
item['shareId'] = item_id
all_shares_json[str(item['published'])] = item
ctr += 1
if ctr >= max_shares_per_account:
break
break
# sort the shared items in descending order of publication date
shares_json = OrderedDict(sorted(all_shares_json.items(), reverse=True))
last_page = False
start_index = items_per_page * pageNumber
max_index = len(shares_json.items())
if max_index < items_per_page:
last_page = True
# a page beyond the end is pulled back to show the final items
if start_index >= max_index - items_per_page:
last_page = True
start_index = max_index - items_per_page
start_index = max(start_index, 0)
# copy the items from start_index up to one page in length
ctr = 0
result_json = {}
for published, item in shares_json.items():
if ctr >= start_index + items_per_page:
break
if ctr < start_index:
ctr += 1
continue
result_json[published] = item
ctr += 1
return result_json, last_page
def post_contains_public(post_json_object: {}) -> bool:
    """Is the given post addressed to #Public?

    The 'to' addresses of the post object are checked first and then the
    'cc' addresses. A post without any 'to' addresses is never public,
    whatever its 'cc' addresses are.
    """
    post_obj = post_json_object['object']
    to_addresses = post_obj.get('to')
    if not to_addresses:
        return False
    if any(address.endswith('#Public') for address in to_addresses):
        return True
    cc_addresses = post_obj.get('cc')
    if not cc_addresses:
        return False
    return any(address.endswith('#Public') for address in cc_addresses)
def _get_image_file(base_dir: str, name: str, directory: str,
                    theme: str) -> (str, str):
    """Returns the file name and full path of the image with the given
    name, trying each image extension in turn

    The given directory is searched first. If nothing is found there
    then the default theme directory is searched, and the returned file
    name includes the theme name. Two empty strings are returned if no
    image exists. The theme argument does not alter the result.
    """
    extensions = get_image_extensions()
    for ext in extensions:
        image_file = name + '.' + ext
        image_filename = directory + '/' + image_file
        if os.path.isfile(image_filename):
            return image_file, image_filename

    # if not found then use the default image
    default_theme = 'default'
    default_dir = base_dir + '/theme/' + default_theme
    for ext in extensions:
        image_filename = default_dir + '/' + name + '.' + ext
        if os.path.isfile(image_filename):
            return name + '_' + default_theme + '.' + ext, image_filename
    return '', ''
def get_banner_file(base_dir: str,
                    nickname: str, domain: str, theme: str) -> (str, str):
    """Returns the file name and path of the timeline banner image
    for the given account
    """
    return _get_image_file(base_dir, 'banner',
                           acct_dir(base_dir, nickname, domain), theme)
def get_profile_background_file(base_dir: str,
                                nickname: str, domain: str,
                                theme: str) -> (str, str):
    """Returns the file name and path of the profile background image
    for the given account
    """
    return _get_image_file(base_dir, 'image',
                           acct_dir(base_dir, nickname, domain), theme)
def get_search_banner_file(base_dir: str,
                           nickname: str, domain: str,
                           theme: str) -> (str, str):
    """Returns the file name and path of the search banner image
    for the given account
    """
    return _get_image_file(base_dir, 'search_banner',
                           acct_dir(base_dir, nickname, domain), theme)
def get_left_image_file(base_dir: str,
                        nickname: str, domain: str, theme: str) -> (str, str):
    """Returns the file name and path of the left column image
    for the given account
    """
    return _get_image_file(base_dir, 'left_col_image',
                           acct_dir(base_dir, nickname, domain), theme)
def get_right_image_file(base_dir: str,
                         nickname: str, domain: str, theme: str) -> (str, str):
    """Returns the file name and path of the right column image
    for the given account
    """
    return _get_image_file(base_dir, 'right_col_image',
                           acct_dir(base_dir, nickname, domain), theme)
def _get_variable_from_css(css_str: str, variable: str) -> str:
"""Gets a variable value from the css file text
"""
if '--' + variable + ':' not in css_str:
return None
value = css_str.split('--' + variable + ':')[1]
if ';' in value:
value = value.split(';')[0].strip()
value = remove_html(value)
if ' ' in value:
value = None
return value
def get_pwa_theme_colors(css_filename: str) -> (str, str):
    """Returns the theme/statusbar color and the background color for
    progressive web apps

    The values are read from the pwa-theme-color and
    pwa-theme-background-color variables of the css file. Defaults are
    used if the file does not exist or a variable gives no value.
    """
    fallback_color = 'apple-mobile-web-app-status-bar-style'
    fallback_background = 'black-translucent'
    if not os.path.isfile(css_filename):
        return fallback_color, fallback_background

    with open(css_filename, 'r', encoding='utf-8') as fp_css:
        css_text = fp_css.read()

    theme_color = \
        _get_variable_from_css(css_text, 'pwa-theme-color') or \
        fallback_color
    background_color = \
        _get_variable_from_css(css_text, 'pwa-theme-background-color') or \
        fallback_background
    return theme_color, background_color
def html_header_with_external_style(css_filename: str, instance_title: str,
metadata: str, lang='en') -> str:
"""Returns the start of a page, which includes the given metadata
followed by the instance title
A metadata value of None is treated as an empty string.
NOTE(review): the string literals here look like they have lost their
markup. css_file, the pwa colors and lang are not referenced by any
visible literal - confirm against the upstream file.
"""
if metadata is None:
metadata = ''
# the css file name without its directory, with a leading slash
css_file = '/' + css_filename.split('/')[-1]
pwa_theme_color, pwa_theme_background_color = \
get_pwa_theme_colors(css_filename)
html_str = \
'\n' + \
'\n' + \
'\n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
metadata + \
' \n' + \
' ' + instance_title + ' \n' + \
' \n' + \
' \n'
return html_str
def html_header_with_person_markup(css_filename: str, instance_title: str,
actor_json: {}, city: str,
content_license_url: str,
lang='en') -> str:
"""html header which includes person markup
https://schema.org/Person
If there is no actor_json then the plain header is returned.
Otherwise address, occupation and person markup is built from the
actor and the city, and is passed to html_header_with_external_style
as metadata.
NOTE(review): several string literals here look like they have lost
their markup. person_markup, handle and content_license_url are not
referenced by any visible literal - confirm against the upstream file.
NOTE(review): values from the actor are placed into the markup without
any escaping of quotes - confirm whether this is handled elsewhere.
"""
if not actor_json:
html_str = \
html_header_with_external_style(css_filename,
instance_title, None, lang)
return html_str
# address markup, where the city may be given as 'city, country'
city_markup = ''
if city:
city = city.lower().title()
add_comma = ''
country_markup = ''
if ',' in city:
country = city.split(',', 1)[1].strip().title()
city = city.split(',', 1)[0]
country_markup = \
' "addressCountry": "' + country + '"\n'
add_comma = ','
city_markup = \
' "address": {\n' + \
' "@type": "PostalAddress",\n' + \
' "addressLocality": "' + city + '"' + \
add_comma + '\n' + country_markup + ' },\n'
# occupation markup, with one entry per Role or Occupation
# NOTE(review): city is concatenated into each entry below, which
# raises a TypeError if city is None - confirm that callers always
# supply a string
skills_markup = ''
if actor_json.get('hasOccupation'):
if isinstance(actor_json['hasOccupation'], list):
skills_markup = ' "hasOccupation": [\n'
first_entry = True
for skill_dict in actor_json['hasOccupation']:
if skill_dict['@type'] == 'Role':
if not first_entry:
skills_markup += ',\n'
skl = skill_dict['hasOccupation']
role_name = skl['name']
if not role_name:
role_name = 'member'
category = \
skl['occupationalCategory']['codeValue']
category_url = \
'https://www.onetonline.org/link/summary/' + category
skills_markup += \
' {\n' + \
' "@type": "Role",\n' + \
' "hasOccupation": {\n' + \
' "@type": "Occupation",\n' + \
' "name": "' + role_name + '",\n' + \
' "description": ' + \
'"Fediverse instance role",\n' + \
' "occupationLocation": {\n' + \
' "@type": "City",\n' + \
' "name": "' + city + '"\n' + \
' },\n' + \
' "occupationalCategory": {\n' + \
' "@type": "CategoryCode",\n' + \
' "inCodeSet": {\n' + \
' "@type": "CategoryCodeSet",\n' + \
' "name": "O*Net-SOC",\n' + \
' "dateModified": "2019",\n' + \
' ' + \
'"url": "https://www.onetonline.org/"\n' + \
' },\n' + \
' "codeValue": "' + category + '",\n' + \
' "url": "' + category_url + '"\n' + \
' }\n' + \
' }\n' + \
' }'
elif skill_dict['@type'] == 'Occupation':
if not first_entry:
skills_markup += ',\n'
oc_name = skill_dict['name']
if not oc_name:
oc_name = 'member'
# the skills become a quoted, comma separated list
skills_list = skill_dict['skills']
skills_list_str = '['
for skill_str in skills_list:
if skills_list_str != '[':
skills_list_str += ', '
skills_list_str += '"' + skill_str + '"'
skills_list_str += ']'
skills_markup += \
' {\n' + \
' "@type": "Occupation",\n' + \
' "name": "' + oc_name + '",\n' + \
' "description": ' + \
'"Fediverse instance occupation",\n' + \
' "occupationLocation": {\n' + \
' "@type": "City",\n' + \
' "name": "' + city + '"\n' + \
' },\n' + \
' "skills": ' + skills_list_str + '\n' + \
' }'
first_entry = False
skills_markup += '\n ],\n'
# person markup, using the summary and name with any html removed
description = remove_html(actor_json['summary'])
name_str = remove_html(actor_json['name'])
# the domain is the part of the actor id between '://' and the next '/'
domain_full = actor_json['id'].split('://')[1].split('/')[0]
handle = actor_json['preferredUsername'] + '@' + domain_full
person_markup = \
' "about": {\n' + \
' "@type" : "Person",\n' + \
' "name": "' + name_str + '",\n' + \
' "image": "' + actor_json['icon']['url'] + '",\n' + \
' "description": "' + description + '",\n' + \
city_markup + skills_markup + \
' "url": "' + actor_json['id'] + '"\n' + \
' },\n'
profile_markup = \
' \n'
description = remove_html(description)
og_metadata = \
" \n" + \
" \n" + \
" \n" + \
" \n" + \
" \n" + \
" \n" + \
" \n" + \
" \n" + \
" \n" + \
" \n" + \
" \n"
# add an entry for each attachment whose name is one of these tags
if actor_json.get('attachment'):
og_tags = (
'email', 'openpgp', 'blog', 'xmpp', 'matrix', 'briar',
'cwtch', 'languages'
)
for attach_json in actor_json['attachment']:
# attachments need either a name or a schema:name
if not attach_json.get('name'):
if not attach_json.get('schema:name'):
continue
prop_value_name, _ = get_attachment_property_value(attach_json)
if not prop_value_name:
continue
if attach_json.get('name'):
name = attach_json['name'].lower()
else:
name = attach_json['schema:name'].lower()
value = attach_json[prop_value_name]
for og_tag in og_tags:
if name != og_tag:
continue
og_metadata += \
" \n"
html_str = \
html_header_with_external_style(css_filename, instance_title,
og_metadata + profile_markup, lang)
return html_str
def html_header_with_website_markup(css_filename: str, instance_title: str,
http_prefix: str, domain: str,
system_language: str) -> str:
"""html header which includes website markup
https://schema.org/WebSite
The markup is passed to html_header_with_external_style as metadata,
with system_language as the language.
NOTE(review): the string literals here look like they have lost their
markup. license_url, genre_url, http_prefix and domain are not
referenced by any visible literal - confirm against the upstream file.
"""
license_url = 'https://www.gnu.org/licenses/agpl-3.0.rdf'
# social networking category
genre_url = 'http://vocab.getty.edu/aat/300312270'
website_markup = \
' \n'
og_metadata = \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n'
html_str = \
html_header_with_external_style(css_filename, instance_title,
og_metadata + website_markup,
system_language)
return html_str
def html_header_with_blog_markup(css_filename: str, instance_title: str,
http_prefix: str, domain: str, nickname: str,
system_language: str,
published: str, modified: str,
title: str, snippet: str,
translate: {}, url: str,
content_license_url: str) -> str:
"""html header which includes blog post markup
https://schema.org/BlogPosting
The markup is passed to html_header_with_external_style as metadata,
with system_language as the language.
NOTE(review): the string literals here look like they have lost their
markup. author_url, about_url and most of the arguments are not
referenced by any visible literal - confirm against the upstream file.
"""
# url of the local actor who is the author of the blog post
author_url = local_actor_url(http_prefix, nickname, domain)
about_url = http_prefix + '://' + domain + '/about.html'
# license for content on the site may be different from
# the software license
blog_markup = \
' \n'
og_metadata = \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n'
html_str = \
html_header_with_external_style(css_filename, instance_title,
og_metadata + blog_markup,
system_language)
return html_str
def html_footer() -> str:
    """Returns the text which ends a page
    """
    return ' \n' + '\n'
def load_individual_post_as_html_from_cache(base_dir: str,
                                            nickname: str, domain: str,
                                            post_json_object: {}) -> str:
    """If a cached html version of the given post exists then load it and
    return the html text
    This is much quicker than generating the html from the json object

    Returns an empty string if there is no cached file, or if it could
    not be read after three attempts. The result is always a string.
    """
    cached_post_filename = \
        get_cached_post_filename(base_dir, nickname, domain, post_json_object)
    if not cached_post_filename:
        return ''
    if not os.path.isfile(cached_post_filename):
        return ''
    # the read is retried immediately, without any sleep between attempts
    for tries in range(3):
        try:
            with open(cached_post_filename, 'r',
                      encoding='utf-8') as fp_cached:
                return fp_cached.read()
        except Exception as ex:
            # kept broad so that a failed read remains best-effort,
            # as it was previously
            print('ERROR: load_individual_post_as_html_from_cache ' +
                  str(tries) + ' ' + str(ex))
    # previously this fell through and returned None, unlike the
    # empty string which is returned by the earlier exits
    return ''
def add_emoji_to_display_name(session, base_dir: str, http_prefix: str,
nickname: str, domain: str,
display_name: str, in_profile_name: bool,
translate: {}) -> str:
"""Adds emoji icons to display names or CW on individual posts
A display name without any ':' character is returned unchanged.
in_profile_name selects whether 'profile' or 'post header' is passed
to replace_emoji_from_tags.
NOTE(review): the replace calls before and after add_html_tags contain
string literals which are split across two lines and look like they
have lost their markup - confirm against the upstream file.
"""
if ':' not in display_name:
return display_name
display_name = display_name.replace('', '').replace('
', '')
# add_html_tags fills emoji_tags with the emoji which it finds
emoji_tags = {}
# print('TAG: display_name before tags: ' + display_name)
display_name = \
add_html_tags(base_dir, http_prefix,
nickname, domain, display_name, [],
emoji_tags, translate)
display_name = display_name.replace('', '').replace('
', '')
# print('TAG: display_name after tags: ' + display_name)
# convert the emoji dictionary to a list
emoji_tags_list = []
for _, tag in emoji_tags.items():
emoji_tags_list.append(tag)
# print('TAG: emoji tags list: ' + str(emoji_tags_list))
if not in_profile_name:
display_name = \
replace_emoji_from_tags(session, base_dir,
display_name, emoji_tags_list,
'post header', False, False)
else:
display_name = \
replace_emoji_from_tags(session, base_dir,
display_name, emoji_tags_list, 'profile',
False, False)
# print('TAG: display_name after tags 2: ' + display_name)
# remove any stray emoji
# the loop stops at a url, or when a pass removes nothing, so that it
# always terminates
while ':' in display_name:
if '://' in display_name:
break
emoji_str = display_name.split(':')[1]
prev_display_name = display_name
display_name = display_name.replace(':' + emoji_str + ':', '').strip()
if prev_display_name == display_name:
break
# print('TAG: display_name after tags 3: ' + display_name)
# print('TAG: display_name after tag replacements: ' + display_name)
return display_name
def _is_image_mime_type(mime_type: str) -> bool:
"""Is the given mime type an image?
"""
if mime_type == 'image/svg+xml':
return True
if not mime_type.startswith('image/'):
return False
extensions = get_image_extensions()
ext = mime_type.split('/')[1]
if ext in extensions:
return True
return False
def _is_video_mime_type(mime_type: str) -> bool:
"""Is the given mime type a video?
"""
if not mime_type.startswith('video/'):
return False
extensions = get_video_extensions()
ext = mime_type.split('/')[1]
if ext in extensions:
return True
return False
def _is_audio_mime_type(mime_type: str) -> bool:
"""Is the given mime type an audio file?
"""
if mime_type == 'audio/mpeg':
return True
if not mime_type.startswith('audio/'):
return False
extensions = get_audio_extensions()
ext = mime_type.split('/')[1]
if ext in extensions:
return True
return False
def _is_attached_image(attachment_filename: str) -> bool:
"""Is the given attachment filename an image?
"""
if '.' not in attachment_filename:
return False
image_ext = (
'png', 'jpg', 'jpeg', 'webp', 'avif', 'svg', 'gif', 'jxl'
)
ext = attachment_filename.split('.')[-1]
if ext in image_ext:
return True
return False
def _is_attached_video(attachment_filename: str) -> bool:
"""Is the given attachment filename a video?
"""
if '.' not in attachment_filename:
return False
video_ext = (
'mp4', 'webm', 'ogv'
)
ext = attachment_filename.split('.')[-1]
if ext in video_ext:
return True
return False
def _is_nsfw(content: str) -> bool:
"""Does the given content indicate nsfw?
"""
content_lower = content.lower()
nsfw_tags = (
'nsfw', 'porn', 'pr0n', 'explicit', 'lewd',
'nude', 'boob', 'erotic', 'sex'
)
for tag_name in nsfw_tags:
if tag_name in content_lower:
return True
return False
def get_post_attachments_as_html(base_dir: str,
nickname: str, domain: str,
domain_full: str,
post_json_object: {}, box_name: str,
translate: {},
is_muted: bool, avatar_link: str,
reply_str: str, announce_str: str,
like_str: str,
bookmark_str: str, delete_str: str,
mute_str: str,
content: str) -> (str, str):
"""Returns a string representing any attachments
Two strings are returned: the attachments markup, and the gallery
markup which is only added to when box_name is 'tlmedia'. Both are
empty if the post object has no attachment list.
Each attachment with a mediaType and url is handled as an image,
video or audio file, according to its mime type.
NOTE(review): many string literals here are split across lines and look
like they have lost their markup - confirm against the upstream file.
NOTE(review): indentation is not visible in this copy of the file, so
the nesting of the statements should be confirmed as well.
"""
attachment_str = ''
gallery_str = ''
if not post_json_object['object'].get('attachment'):
return attachment_str, gallery_str
if not isinstance(post_json_object['object']['attachment'], list):
return attachment_str, gallery_str
attachment_ctr = 0
attachment_str = ''
media_style_added = False
# the post id, with '/' replaced by '--', is used within the filename
# of a cached svg
post_id = None
if post_json_object['object'].get('id'):
post_id = post_json_object['object']['id']
post_id = remove_id_ending(post_id).replace('/', '--')
for attach in post_json_object['object']['attachment']:
if not (attach.get('mediaType') and attach.get('url')):
continue
media_type = attach['mediaType']
# double quotes within the description are replaced by single quotes
image_description = ''
if attach.get('name'):
image_description = attach['name'].replace('"', "'")
if _is_image_mime_type(media_type):
image_url = attach['url']
# display svg images if they have first been rendered harmless
# an svg counts as harmless if it is hosted on this domain, or if a
# copy of it exists within the media directory
svg_harmless = True
if 'svg' in media_type:
svg_harmless = False
if '://' + domain_full + '/' in image_url:
svg_harmless = True
else:
if post_id:
if '/' in image_url:
im_filename = image_url.split('/')[-1]
else:
im_filename = image_url
cached_svg_filename = \
base_dir + '/media/' + post_id + '_' + im_filename
if os.path.isfile(cached_svg_filename):
svg_harmless = True
if _is_attached_image(attach['url']) and svg_harmless:
if not attachment_str:
attachment_str += '\n'
media_style_added = True
if attachment_ctr > 0:
attachment_str += '
'
if box_name == 'tlmedia':
gallery_str += '
\n'
if not is_muted:
gallery_str += '
\n'
gallery_str += \
' \n'
gallery_str += ' \n'
if post_json_object['object'].get('url'):
image_post_url = post_json_object['object']['url']
else:
image_post_url = post_json_object['object']['id']
if image_description and not is_muted:
gallery_str += \
'
' + \
image_description + '
\n'
else:
gallery_str += \
'
--- '
gallery_str += '
\n'
gallery_str += \
' ' + reply_str + announce_str + like_str + \
bookmark_str + delete_str + mute_str + '\n'
gallery_str += '
\n'
gallery_str += '
\n'
gallery_str += ' ' + avatar_link + '\n'
gallery_str += '
\n'
gallery_str += '
\n'
# optionally hide the image
attributed_actor = None
minimize_images = False
if post_json_object['object'].get('attributedTo'):
if isinstance(post_json_object['object']['attributedTo'],
str):
attributed_actor = \
post_json_object['object']['attributedTo']
if attributed_actor:
following_nickname = \
get_nickname_from_actor(attributed_actor)
following_domain, _ = \
get_domain_from_actor(attributed_actor)
minimize_images = \
minimizing_attached_images(base_dir, nickname, domain,
following_nickname,
following_domain)
# minimize any NSFW images
if not minimize_images and content:
if _is_nsfw(content):
minimize_images = True
if minimize_images:
show_img_str = 'SHOW MEDIA'
if translate:
show_img_str = translate['SHOW MEDIA']
attachment_str += \
'
' + \
show_img_str + ' ' + \
'\n'
attachment_str += '
'
attachment_str += \
' \n'
if minimize_images:
attachment_str += '
\n'
attachment_ctr += 1
elif _is_video_mime_type(media_type):
if _is_attached_video(attach['url']):
extension = attach['url'].split('.')[-1]
if attachment_ctr > 0:
attachment_str += '
'
if box_name == 'tlmedia':
gallery_str += '
\n'
attachment_str += \
'
\n' + \
' \n'
attachment_str += \
''
attachment_str += \
translate['Your browser does not support the video tag.']
attachment_str += ' '
attachment_ctr += 1
elif _is_audio_mime_type(media_type):
extension = '.mp3'
if attach['url'].endswith('.ogg'):
extension = '.ogg'
elif attach['url'].endswith('.opus'):
extension = '.opus'
elif attach['url'].endswith('.flac'):
extension = '.flac'
if attach['url'].endswith(extension):
if attachment_ctr > 0:
attachment_str += '
'
if box_name == 'tlmedia':
gallery_str += '
\n'
attachment_str += '
\n\n'
attachment_str += \
''
attachment_str += \
translate['Your browser does not support the audio tag.']
attachment_str += ' \n \n'
attachment_ctr += 1
if media_style_added:
attachment_str += '
'
return attachment_str, gallery_str
def html_post_separator(base_dir: str, column: str) -> str:
    """Return the html for the separator shown between timeline posts.

    base_dir: base directory of the instance, used to locate theme icons
    column: optional column name. When given, the icon looked up is
        'separator_<column>.png' rather than 'separator.png'

    Returns an empty string when the current theme has no such icon.
    """
    theme_name = get_config_param(base_dir, 'theme')
    if column:
        # NOTE(review): css_class is not referenced by the markup returned
        # below - confirm whether it belongs in the returned html
        css_class = "postSeparatorImage" + column.title()
        icon_name = 'separator_' + column + '.png'
    else:
        css_class = "postSeparatorImage"
        icon_name = 'separator.png'
    icon_path = base_dir + '/theme/' + theme_name + '/icons/' + icon_name
    if not os.path.isfile(icon_path):
        return ''
    return '' + ' \n'
def html_highlight_label(label: str, highlight: bool) -> str:
    """Wrap a label in asterisks when it should be highlighted.

    This is so that in shell browsers, like lynx, it's possible
    to see if the replies or DM button are highlighted.

    label: the text to show
    highlight: when true the label is returned as '*label*',
        otherwise the label is returned unchanged
    """
    if highlight:
        return '*' + str(label) + '*'
    return label
def get_avatar_image_url(session, base_dir: str, http_prefix: str,
                         post_actor: str, person_cache: {},
                         avatar_url: str, allow_downloads: bool,
                         signing_priv_key_pem: str) -> str:
    """Return the avatar image url for the actor of a post.

    When avatar_url is supplied it is kept as-is and the avatar image
    cache is updated for it. When it is not supplied the url is looked
    up with get_person_avatar_url and then replaced by whatever
    update_avatar_image_cache returns.

    If no url results, '<post_actor>/avatar.png' is returned.
    """
    if avatar_url:
        # the caller already knows the url: only update the cache
        update_avatar_image_cache(signing_priv_key_pem,
                                  session, base_dir, http_prefix,
                                  post_actor, avatar_url, person_cache,
                                  allow_downloads)
    else:
        looked_up_url = \
            get_person_avatar_url(base_dir, post_actor, person_cache)
        avatar_url = \
            update_avatar_image_cache(signing_priv_key_pem,
                                      session, base_dir, http_prefix,
                                      post_actor, looked_up_url,
                                      person_cache, allow_downloads)

    if avatar_url:
        return avatar_url
    # fall back to the conventional avatar location under the actor
    return post_actor + '/avatar.png'
def html_hide_from_screen_reader(html_str: str) -> str:
    """Return the given html wrapped so that it is hidden from
    screen readers.

    html_str: the html to be wrapped
    """
    pieces = ['', html_str, ' ']
    return ''.join(pieces)
def html_keyboard_navigation(banner: str, links: {}, access_keys: {},
sub_heading: str = None,
users_path: str = None, translate: {} = None,
follow_approvals: bool = False) -> str:
"""Given a set of links return the html for keyboard navigation

banner: optional html emitted first in the output
links: dict of link title to url, walked with links.items()
access_keys: dict of link title to access key, looked up for each link
sub_heading: optional text emitted after the banner
users_path, translate, follow_approvals: only when all three are truthy
is the 'Approve follow requests' entry added, with its text taken from
translate['Approve follow requests']

Returns the assembled html string.
"""
# NOTE(review): several string literals below span physical lines, which
# is not valid for a single-quoted Python string, and url, users_path and
# access_key_str are not referenced by the text that is emitted. It looks
# like markup has been lost from these literals - confirm against the
# canonical copy of this file before relying on the output.
html_str = '\n'
if banner:
html_str += ' \n' + banner + '\n \n'
if sub_heading:
html_str += '
' + \
sub_heading + ' \n'
# show new follower approvals
if users_path and translate and follow_approvals:
html_str += '
' + \
'' + \
translate['Approve follow requests'] + ' ' + \
' \n'
# show the list of links
for title, url in links.items():
# an accesskey attribute is only prepared for titles which have one
access_key_str = ''
if access_keys.get(title):
access_key_str = 'accesskey="' + access_keys[title] + '"'
html_str += '
' + \
'' + \
str(title) + ' \n'
html_str += '
\n'
return html_str
def begin_edit_section(label: str) -> str:
    """Return the html for beginning a dropdown section on the
    edit profile screen.

    label: the title shown for the section
    """
    heading_line = ' ' + label + ' \n'
    return heading_line + ''
def end_edit_section() -> str:
"""Returns the html for ending a dropdown section on edit profile screen.
Takes no arguments and always returns the same fixed string.
"""
# NOTE(review): the literal below spans two physical lines, which is not
# valid for a single-quoted Python string. It looks like closing markup
# has been lost - confirm against the canonical copy of this file.
return '
\n'
def edit_text_field(label: str, name: str, value: str = "",
                    placeholder: str = "", required: bool = False) -> str:
    """Return the html for editing a single line text field.

    label: optional text shown before the field
    name: name of the field
    value: current value, with None treated as an empty string
    placeholder: optional placeholder text
    required: whether the field must be filled in
    """
    # NOTE(review): value, placeholder_str and required_str are prepared
    # here but are not referenced by the markup returned below - confirm
    # whether they belong in the returned html
    value = '' if value is None else value
    placeholder_str = \
        ' placeholder="' + placeholder + '"' if placeholder else ''
    required_str = ' required' if required else ''

    label_html = '' + label + ' \n' if label else ''
    return label_html + ' \n'
def edit_number_field(label: str, name: str, value: int,
                      min_value: int, max_value: int,
                      placeholder: int) -> str:
    """Return the html for editing an integer number field.

    label: text shown before the field
    name: name of the field
    value: current value, with None treated as an empty string
    min_value, max_value: permitted range
    placeholder: optional placeholder number
    """
    # NOTE(review): value and placeholder_str are prepared here but are
    # not referenced by the markup returned below - confirm whether they
    # belong in the returned html
    value = '' if value is None else value
    placeholder_str = \
        ' placeholder="' + str(placeholder) + '"' if placeholder else ''

    label_line = '' + label + ' \n'
    return label_line + ' \n'
def edit_currency_field(label: str, name: str, value: str,
                        placeholder: str, required: bool) -> str:
    """Return the html for editing a currency amount field.

    label: text shown before the field
    name: name of the field
    value: current amount, with None treated as '0.00'
    placeholder: optional placeholder, only used when it consists
        solely of digits
    required: whether the field must be filled in
    """
    # NOTE(review): value, placeholder_str and required_str are prepared
    # here but are not referenced by the markup returned below - confirm
    # whether they belong in the returned html
    value = '0.00' if value is None else value
    placeholder_str = ''
    if placeholder and placeholder.isdigit():
        placeholder_str = ' placeholder="' + str(placeholder) + '"'
    required_str = ' required' if required else ''

    label_line = '' + label + ' \n'
    return label_line + ' \n'
def edit_check_box(label: str, name: str, checked: bool) -> str:
    """Return the html for editing a checkbox field.

    label: text shown alongside the checkbox
    name: name of the field
    checked: whether the box is initially ticked
    """
    # NOTE(review): checked_str is prepared here but is not referenced by
    # the markup returned below - confirm whether it belongs in the html
    checked_str = ' checked' if checked else ''
    return ' ' + label + ' \n'
def edit_text_area(label: str, name: str, value: str,
                   height: int, placeholder: str, spellcheck: bool) -> str:
    """Return the html for editing a multi-line textarea field.

    label: optional text shown before the textarea
    name: name of the field
    value: current text, with None treated as an empty string
    height: height of the textarea
    placeholder: placeholder text
    spellcheck: whether spell checking is wanted
    """
    # NOTE(review): value is normalised here but is not referenced by the
    # markup returned below - confirm whether it belongs in the html
    value = '' if value is None else value

    label_html = '' + label + ' \n' if label else ''
    return label_html + ' \n'
def html_search_result_share(base_dir: str, shared_item: {}, translate: {},
http_prefix: str, domain_full: str,
contact_nickname: str, item_id: str,
actor: str, shares_file_type: str,
category: str) -> str:
"""Returns the html for an individual shared item

shared_item: dict for the item. 'displayName', 'summary', 'itemType' and
'category' are always read. 'imageUrl', 'itemQty', 'location',
'itemPrice' and 'itemCurrency' are only used when present
translate: dict of translated strings, indexed here by 'Quantity',
'Type', 'Category', 'Location', 'Contact', 'Price', 'Buy',
'Request to stay', 'Profile' and 'Remove'
contact_nickname: nickname of the account offering the item, used with
http_prefix and domain_full to build the contact actor url
actor: actor url of the viewer, used to decide whether a Remove
button is shown
shares_file_type: when equal to 'shares' one variant of the Remove
button is emitted, otherwise the other
category: when equal to 'accommodation' the contact button is titled
with translate['Request to stay']

Returns an empty string if no nickname can be obtained from actor.
"""
# NOTE(review): many string literals in this function span physical
# lines, which is not valid for a single-quoted Python string. It looks
# like markup has been lost from them - confirm against the canonical
# copy of this file before relying on the output.
shared_items_form = '\n'
shared_items_form += \
'
' + shared_item['displayName'] + '
\n'
if shared_item.get('imageUrl'):
shared_items_form += \
'
\n'
shared_items_form += \
' \n'
shared_items_form += '
' + shared_item['summary'] + '
\n
'
# the quantity is only mentioned when there is more than one
if shared_item.get('itemQty'):
if shared_item['itemQty'] > 1:
shared_items_form += \
'' + translate['Quantity'] + \
': ' + str(shared_item['itemQty']) + ' '
shared_items_form += \
'' + translate['Type'] + ': ' + shared_item['itemType'] + ' '
shared_items_form += \
'' + translate['Category'] + ': ' + \
shared_item['category'] + ' '
if shared_item.get('location'):
shared_items_form += \
'' + translate['Location'] + ': ' + \
shared_item['location'] + ' '
# the contact button reads 'Contact' unless a price above zero is set,
# in which case it reads 'Buy'
contact_title_str = translate['Contact']
if shared_item.get('itemPrice') and \
shared_item.get('itemCurrency'):
if is_float(shared_item['itemPrice']):
if float(shared_item['itemPrice']) > 0:
shared_items_form += \
' ' + translate['Price'] + \
': ' + shared_item['itemPrice'] + \
' ' + shared_item['itemCurrency']
contact_title_str = translate['Buy']
shared_items_form += '
\n'
contact_actor = \
local_actor_url(http_prefix, contact_nickname, domain_full)
button_style_str = 'button'
if category == 'accommodation':
contact_title_str = translate['Request to stay']
button_style_str = 'contactbutton'
shared_items_form += \
'
' + \
'' + contact_title_str + \
' \n' + \
'' + \
translate['Profile'] + ' \n'
# should the remove button be shown?
# it is shown to the account offering the item, to moderators
# and to the admin account
show_remove_button = False
nickname = get_nickname_from_actor(actor)
if not nickname:
return ''
if actor.endswith('/users/' + contact_nickname):
show_remove_button = True
elif is_moderator(base_dir, nickname):
show_remove_button = True
else:
admin_nickname = get_config_param(base_dir, 'admin')
if admin_nickname:
if actor.endswith('/users/' + admin_nickname):
show_remove_button = True
if show_remove_button:
if shares_file_type == 'shares':
shared_items_form += \
' ' + \
translate['Remove'] + ' \n'
else:
shared_items_form += \
' ' + \
translate['Remove'] + ' \n'
shared_items_form += '
\n'
return shared_items_form
def html_show_share(base_dir: str, domain: str, nickname: str,
                    http_prefix: str, domain_full: str,
                    item_id: str, translate: {},
                    shared_items_federated_domains: [],
                    default_timeline: str, theme: str,
                    shares_file_type: str, category: str) -> str:
    """Shows an individual shared item after selecting it from the left column

    base_dir: base directory of the instance
    domain, domain_full: domain of this instance, without and with port
    nickname: nickname of the account viewing the item
    item_id: identifier of the item. Replacing '___' with '://' and '--'
        with '/' gives the url of the share
    shared_items_federated_domains: domains whose cached catalogs may be
        searched for items which are not hosted on this instance
    shares_file_type: 'shares' selects the cache/catalogs directory, any
        other value selects cache/wantedItems. It is also the stem of the
        json filename which is loaded
    category: passed through to html_search_result_share

    Returns the complete html page, or None if the item cannot be found.
    """
    shares_json = None

    share_url = item_id.replace('___', '://').replace('--', '/')
    contact_nickname = get_nickname_from_actor(share_url)
    if not contact_nickname:
        return None

    if '://' + domain_full + '/' in share_url:
        # shared item on this instance
        shares_filename = \
            acct_dir(base_dir, contact_nickname, domain) + '/' + \
            shares_file_type + '.json'
        if not os.path.isfile(shares_filename):
            return None
        shares_json = load_json(shares_filename)
    else:
        # federated shared item
        if shares_file_type == 'shares':
            catalogs_dir = base_dir + '/cache/catalogs'
        else:
            catalogs_dir = base_dir + '/cache/wantedItems'
        if not os.path.isdir(catalogs_dir):
            return None
        catalog_suffix = '.' + shares_file_type + '.json'
        for _, _, files in os.walk(catalogs_dir):
            for fname in files:
                if '#' in fname:
                    continue
                if not fname.endswith(catalog_suffix):
                    continue
                # The domain is everything before the known suffix.
                # Taking only the text before the first dot would reduce
                # 'example.com' to 'example', which can never match an
                # entry in the list of federated domains
                federated_domain = fname[:-len(catalog_suffix)]
                if federated_domain not in shared_items_federated_domains:
                    continue
                shares_filename = catalogs_dir + '/' + fname
                shares_json = load_json(shares_filename)
                if not shares_json:
                    continue
                if shares_json.get(item_id):
                    break
            # only the top level of the catalogs directory is scanned
            break

    if not shares_json:
        return None
    if not shares_json.get(item_id):
        return None
    shared_item = shares_json[item_id]
    actor = local_actor_url(http_prefix, nickname, domain_full)

    # filename of the banner shown at the top
    # NOTE(review): banner_file is not referenced by the markup below -
    # confirm whether it belongs in share_str
    banner_file, _ = \
        get_banner_file(base_dir, nickname, domain, theme)

    share_str = \
        ' \n'
    share_str += \
        html_search_result_share(base_dir, shared_item, translate, http_prefix,
                                 domain_full, contact_nickname, item_id,
                                 actor, shares_file_type, category)

    # prefer the instance stylesheet when one exists
    css_filename = base_dir + '/epicyon-profile.css'
    if os.path.isfile(base_dir + '/epicyon.css'):
        css_filename = base_dir + '/epicyon.css'
    instance_title = \
        get_config_param(base_dir, 'instanceTitle')

    return html_header_with_external_style(css_filename,
                                           instance_title, None) + \
        share_str + html_footer()
def set_custom_background(base_dir: str, background: str,
                          new_background: str) -> str:
    """Sets a custom background by copying it from the img directory
    into the accounts directory, unless it is already there.

    background: name of the image within base_dir/img, without extension
    new_background: name to give the copy within base_dir/accounts. If
        empty then the name of the background is reused

    Returns the extension, if the background image was found,
    otherwise None. Only jpg images are looked for.
    """
    ext = 'jpg'
    source_filename = base_dir + '/img/' + background + '.' + ext
    if not os.path.isfile(source_filename):
        return None

    dest_name = new_background if new_background else background
    dest_filename = base_dir + '/accounts/' + dest_name + '.' + ext
    if not os.path.isfile(dest_filename):
        copyfile(source_filename, dest_filename)
    return ext
def html_common_emoji(base_dir: str, no_of_emoji: int) -> str:
    """Shows common emoji

    base_dir: base directory of the instance
    no_of_emoji: maximum number of emoji to include

    The names are read from accounts/common_emoji.txt, taking the second
    space separated field of each line. Lines without a second field are
    skipped. An emoji is only included if an image for it exists within
    the emoji or customemoji directories, either under its own name or
    under the code which the emoji index gives for that name.

    Returns the html for the emoji, or an empty string if there are none.
    """
    emojis_filename = base_dir + '/emoji/emoji.json'
    if not os.path.isfile(emojis_filename):
        emojis_filename = base_dir + '/emoji/default_emoji.json'
    emojis_json = load_json(emojis_filename)

    common_emoji_filename = base_dir + '/accounts/common_emoji.txt'
    if not os.path.isfile(common_emoji_filename):
        return ''
    common_emoji = None
    try:
        with open(common_emoji_filename, 'r', encoding='utf-8') as fp_emoji:
            common_emoji = fp_emoji.readlines()
    except OSError:
        print('EX: html_common_emoji unable to load file')
        return ''
    if not common_emoji:
        return ''

    ctr = 0
    html_str = ''
    for line in common_emoji:
        if ctr >= no_of_emoji:
            break
        fields = line.split(' ')
        if len(fields) < 2:
            # blank or malformed line: there is no name field to read
            continue
        emoji_name = remove_eol(fields[1])
        # NOTE(review): emoji_icon_name is not referenced by the markup
        # below - confirm whether it belongs in the emitted html
        emoji_icon_name = emoji_name
        emoji_filename = base_dir + '/emoji/' + emoji_name + '.png'
        if not os.path.isfile(emoji_filename):
            emoji_filename = base_dir + '/customemoji/' + emoji_name + '.png'
            if not os.path.isfile(emoji_filename):
                # load the emojis index
                if not emojis_json:
                    emojis_json = load_json(emojis_filename)
                # lookup the name within the index to get the hex code
                if emojis_json and emoji_name in emojis_json:
                    emoji_code = emojis_json[emoji_name]
                    # get the filename based on the hex code
                    emoji_filename = \
                        base_dir + '/emoji/' + emoji_code + '.png'
                    emoji_icon_name = emoji_code
        if os.path.isfile(emoji_filename):
            # NOTE: deliberately no alt text, so that without graphics only
            # the emoji name shows
            html_str += \
                '' + \
                ' ' + \
                ':' + emoji_name + ': \n'
            ctr += 1
    return html_str