__filename__ = "webapp_utils.py" __author__ = "Bob Mottram" __license__ = "AGPL3+" __version__ = "1.5.0" __maintainer__ = "Bob Mottram" __email__ = "bob@libreserver.org" __status__ = "Production" __module_group__ = "Web Interface" import os from shutil import copyfile from collections import OrderedDict from session import get_json from session import get_json_valid from utils import data_dir from utils import string_contains from utils import get_post_attachments from utils import image_mime_types_dict from utils import get_url_from_post from utils import get_media_url_from_video from utils import get_attributed_to from utils import local_network_host from utils import dangerous_markup from utils import acct_handle_dir from utils import remove_id_ending from utils import get_attachment_property_value from utils import is_account_dir from utils import remove_html from utils import get_protocol_prefixes from utils import load_json from utils import get_cached_post_filename from utils import get_config_param from utils import acct_dir from utils import get_nickname_from_actor from utils import get_domain_from_actor from utils import is_float from utils import get_audio_extensions from utils import get_video_extensions from utils import get_image_extensions from utils import local_actor_url from utils import text_in_file from utils import remove_eol from utils import binary_is_image from utils import resembles_url from filters import is_filtered from cache import get_actor_public_key_from_id from cache import store_person_in_cache from content import add_html_tags from content import replace_emoji_from_tags from person import get_person_avatar_url from person import get_person_notes from posts import is_moderator from blocking import is_blocked from blocking import allowed_announce from shares import vf_proposal_from_share from webapp_pwa import get_pwa_theme_colors def minimizing_attached_images(base_dir: str, nickname: str, domain: str, following_nickname: str, following_domain: str) -> bool: """Returns true if images from the account being followed should be minimized by default """ if following_nickname == nickname and following_domain == domain: # reminder post return False minimize_filename = \ acct_dir(base_dir, nickname, domain) + '/followingMinimizeImages.txt' handle = following_nickname + '@' + following_domain if not os.path.isfile(minimize_filename): following_filename = \ acct_dir(base_dir, nickname, domain) + '/following.txt' if not os.path.isfile(following_filename): return False # create a new minimize file from the following file try: with open(minimize_filename, 'w+', encoding='utf-8') as fp_min: fp_min.write('') except OSError: print('EX: minimizing_attached_images 2 ' + minimize_filename) return text_in_file(handle + '\n', minimize_filename, False) def get_broken_link_substitute() -> str: """Returns html used to show a default image if the link to an image is broken """ return " onerror=\"this.onerror=null; this.src='" + \ "/icons/avatar_default.png'\"" def html_following_list(base_dir: str, following_filename: str) -> str: """Returns a list of handles being followed """ msg = '' try: with open(following_filename, 'r', encoding='utf-8') as following_file: msg = following_file.read() except OSError: print('EX: html_following_list unable to read ' + following_filename) if msg: following_list = msg.split('\n') following_list.sort() if following_list: css_filename = base_dir + '/epicyon-profile.css' if os.path.isfile(base_dir + '/epicyon.css'): css_filename = base_dir + '/epicyon.css' instance_title = \ get_config_param(base_dir, 'instanceTitle') following_list_html = \ html_header_with_external_style(css_filename, instance_title, None) for following_address in following_list: if following_address: following_list_html += \ '

@' + following_address + '

' following_list_html += html_footer() msg = following_list_html return msg return '' def csv_following_list(following_filename: str, base_dir: str, nickname: str, domain: str) -> str: """Returns a csv of handles being followed """ msg = '' try: with open(following_filename, 'r', encoding='utf-8') as following_file: msg = following_file.read() except OSError: print('EX: csv_following_list unable to read ' + following_filename) if msg: following_list = msg.split('\n') following_list.sort() if following_list: following_list_csv = '' for following_address in following_list: if not following_address: continue following_nickname = \ get_nickname_from_actor(following_address) following_domain, _ = \ get_domain_from_actor(following_address) announce_is_allowed = \ allowed_announce(base_dir, nickname, domain, following_nickname, following_domain) notify_on_new = 'false' languages = '' person_notes = \ get_person_notes(base_dir, nickname, domain, following_address) if person_notes: # make notes suitable for csv file person_notes = person_notes.replace(',', ' ') person_notes = person_notes.replace('"', "'") person_notes = person_notes.replace('\n', '
') person_notes = person_notes.replace(' ', ' ') if not following_list_csv: following_list_csv = \ 'Account address,Show boosts,' + \ 'Notify on new posts,Languages,Notes\n' following_list_csv += \ following_address + ',' + \ str(announce_is_allowed).lower() + ',' + \ notify_on_new + ',' + \ languages + ',' + \ person_notes + '\n' msg = following_list_csv return msg return '' def html_hashtag_blocked(base_dir: str, translate: {}) -> str: """Show the screen for a blocked hashtag """ blocked_hashtag_form = '' css_filename = base_dir + '/epicyon-suspended.css' if os.path.isfile(base_dir + '/suspended.css'): css_filename = base_dir + '/suspended.css' instance_title = \ get_config_param(base_dir, 'instanceTitle') blocked_hashtag_form = \ html_header_with_external_style(css_filename, instance_title, None) blocked_hashtag_form += '
\n' blocked_hashtag_form += \ '

' + \ translate['Hashtag Blocked'] + '

\n' blocked_hashtag_form += \ '

See ' + \ translate['Terms of Service'] + '

\n' blocked_hashtag_form += '
\n' blocked_hashtag_form += html_footer() return blocked_hashtag_form def header_buttons_front_screen(translate: {}, nickname: str, box_name: str, authorized: bool, icons_as_buttons: bool) -> str: """Returns the header buttons for the front page of a news instance """ header_str = '' if nickname == 'news': button_features = 'buttonMobile' button_newswire = 'buttonMobile' button_links = 'buttonMobile' if box_name == 'features': button_features = 'buttonselected' elif box_name == 'newswire': button_newswire = 'buttonselected' elif box_name == 'links': button_links = 'buttonselected' header_str += \ ' ' + \ '' if not authorized: header_str += \ ' ' + \ '' if icons_as_buttons: header_str += \ ' ' + \ '' header_str += \ ' ' + \ '' else: header_str += \ ' ' + \ '| ' + translate['Newswire'] + '\n' header_str += \ ' ' + \ '| ' + translate['Links'] + '\n' else: if not authorized: header_str += \ ' ' + \ '' if header_str: header_str = \ '\n
\n' + \ header_str + \ '
\n' return header_str def get_content_warning_button(post_id: str, translate: {}, content: str) -> str: """Returns the markup for a content warning button """ return '
' + \ translate['SHOW MORE'] + '' + \ '
' + content + \ '
\n' def _set_actor_property_url(actor_json: {}, property_name: str, url: str) -> None: """Sets a url for the given actor property """ if not actor_json.get('attachment'): actor_json['attachment'] = [] property_name_lower = property_name.lower() # remove any existing value property_found = None for property_value in actor_json['attachment']: name_value = None if property_value.get('name'): name_value = property_value['name'] elif property_value.get('schema:name'): name_value = property_value['schema:name'] if not name_value: continue if not property_value.get('type'): continue if not name_value.lower().startswith(property_name_lower): continue property_found = property_value break if property_found: actor_json['attachment'].remove(property_found) prefixes = get_protocol_prefixes() prefix_found = False for prefix in prefixes: if url.startswith(prefix): prefix_found = True break if not prefix_found: return if '.' not in url: return if ' ' in url: return if ',' in url: return for property_value in actor_json['attachment']: name_value = None if property_value.get('name'): name_value = property_value['name'] elif property_value.get('schema:name'): name_value = property_value['schema:name'] if not name_value: continue if not property_value.get('type'): continue if not name_value.lower().startswith(property_name_lower): continue if not property_value['type'].endswith('PropertyValue'): continue prop_value_name, _ = \ get_attachment_property_value(property_value) if not prop_value_name: continue property_value[prop_value_name] = url return new_address = { "name": property_name, "type": "PropertyValue", "value": url } actor_json['attachment'].append(new_address) def set_blog_address(actor_json: {}, blog_address: str) -> None: """Sets an blog address for the given actor """ _set_actor_property_url(actor_json, 'Blog', remove_html(blog_address)) def update_avatar_image_cache(signing_priv_key_pem: str, session, base_dir: str, http_prefix: str, actor: str, avatar_url: str, person_cache: {}, allow_downloads: bool, force: bool = False, debug: bool = False) -> str: """Updates the cached avatar for the given actor """ if not avatar_url: return None actor_str = actor.replace('/', '-') avatar_image_path = base_dir + '/cache/avatars/' + actor_str # try different image types image_formats = image_mime_types_dict() avatar_image_filename = None for im_format, mime_type in image_formats.items(): if avatar_url.endswith('.' + im_format) or \ '.' + im_format + '?' in avatar_url: session_headers = { 'Accept': 'image/' + mime_type } avatar_image_filename = avatar_image_path + '.' + im_format if not avatar_image_filename: return None if (not os.path.isfile(avatar_image_filename) or force) and \ allow_downloads: try: if debug: print('avatar image url: ' + avatar_url) result = session.get(avatar_url, headers=session_headers, params=None, allow_redirects=True) if result.status_code < 200 or \ result.status_code > 202: if debug: print('Avatar image download failed with status ' + str(result.status_code)) # remove partial download if os.path.isfile(avatar_image_filename): try: os.remove(avatar_image_filename) except OSError: print('EX: ' + 'update_avatar_image_cache unable to delete ' + avatar_image_filename) else: media_binary = result.content if binary_is_image(avatar_image_filename, media_binary): with open(avatar_image_filename, 'wb') as fp_av: fp_av.write(media_binary) if debug: print('avatar image downloaded for ' + actor) return avatar_image_filename.replace(base_dir + '/cache', '') else: print('WARN: update_avatar_image_cache ' + 'avatar image binary not recognized ' + actor + ' ' + str(media_binary[0:20])) except BaseException as ex: print('EX: Failed to download avatar image: ' + str(avatar_url) + ' ' + str(ex)) prof = 'https://www.w3.org/ns/activitystreams' if '/channel/' not in actor or '/accounts/' not in actor: session_headers = { 'Accept': 'application/activity+json; profile="' + prof + '"' } else: session_headers = { 'Accept': 'application/ld+json; profile="' + prof + '"' } person_json = \ get_json(signing_priv_key_pem, session, actor, session_headers, None, debug, __version__, http_prefix, None) if get_json_valid(person_json): if not person_json.get('id'): return None pub_key, _ = get_actor_public_key_from_id(person_json, None) if not pub_key: return None if person_json['id'] != actor: return None if not person_cache.get(actor): return None cache_key, _ = \ get_actor_public_key_from_id(person_cache[actor]['actor'], None) if cache_key != pub_key: print("ERROR: " + "public keys don't match when downloading actor for " + actor) return None store_person_in_cache(base_dir, actor, person_json, person_cache, allow_downloads) return get_person_avatar_url(base_dir, actor, person_cache) return None return avatar_image_filename.replace(base_dir + '/cache', '') def scheduled_posts_exist(base_dir: str, nickname: str, domain: str) -> bool: """Returns true if there are posts scheduled to be delivered """ schedule_index_filename = \ acct_dir(base_dir, nickname, domain) + '/schedule.index' if not os.path.isfile(schedule_index_filename): return False if text_in_file('#users#', schedule_index_filename): return True return False def shares_timeline_json(actor: str, page_number: int, items_per_page: int, base_dir: str, domain: str, nickname: str, max_shares_per_account: int, shared_items_federated_domains: [], shares_file_type: str) -> ({}, bool): """Get a page on the shared items timeline as json max_shares_per_account helps to avoid one person dominating the timeline by sharing a large number of things """ all_shares_json = {} dir_str = data_dir(base_dir) for _, dirs, files in os.walk(dir_str): for handle in dirs: if not is_account_dir(handle): continue account_dir = acct_handle_dir(base_dir, handle) shares_filename = account_dir + '/' + shares_file_type + '.json' if not os.path.isfile(shares_filename): continue shares_json = load_json(shares_filename) if not shares_json: continue account_nickname = handle.split('@')[0] # Don't include shared items from blocked accounts if account_nickname != nickname: if is_blocked(base_dir, nickname, domain, account_nickname, domain, None, None): continue # actor who owns this share owner = actor.split('/users/')[0] + '/users/' + account_nickname ctr = 0 for item_id, item in shares_json.items(): # assign owner to the item item['actor'] = owner item['shareId'] = item_id all_shares_json[str(item['published'])] = item ctr += 1 if ctr >= max_shares_per_account: break break if shared_items_federated_domains: if shares_file_type == 'shares': catalogs_dir = base_dir + '/cache/catalogs' else: catalogs_dir = base_dir + '/cache/wantedItems' if os.path.isdir(catalogs_dir): for _, dirs, files in os.walk(catalogs_dir): for fname in files: if '#' in fname: continue if not fname.endswith('.' + shares_file_type + '.json'): continue federated_domain = fname.split('.')[0] if federated_domain not in shared_items_federated_domains: continue shares_filename = catalogs_dir + '/' + fname shares_json = load_json(shares_filename) if not shares_json: continue ctr = 0 for item_id, item in shares_json.items(): # assign owner to the item if '--shareditems--' not in item_id: continue share_actor = item_id.split('--shareditems--')[0] share_actor = share_actor.replace('___', '://') share_actor = share_actor.replace('--', '/') share_nickname = get_nickname_from_actor(share_actor) if not share_nickname: continue if is_blocked(base_dir, nickname, domain, share_nickname, federated_domain, None, None): continue item['actor'] = share_actor item['shareId'] = item_id all_shares_json[str(item['published'])] = item ctr += 1 if ctr >= max_shares_per_account: break break # sort the shared items in descending order of publication date shares_json = OrderedDict(sorted(all_shares_json.items(), reverse=True)) last_page = False start_index = items_per_page * page_number max_index = len(shares_json.items()) if max_index < items_per_page: last_page = True if start_index >= max_index - items_per_page: last_page = True start_index = max_index - items_per_page start_index = max(start_index, 0) ctr = 0 result_json = {} for published, item in shares_json.items(): if ctr >= start_index + items_per_page: break if ctr < start_index: ctr += 1 continue result_json[published] = item ctr += 1 return result_json, last_page def get_shares_collection(actor: str, page_number: int, items_per_page: int, base_dir: str, domain: str, nickname: str, max_shares_per_account: int, shared_items_federated_domains: [], shares_file_type: str) -> {}: """Returns an ActivityStreams collection of ValueFlows Proposal objects https://codeberg.org/fediverse/fep/src/branch/main/fep/0837/fep-0837.md """ shares_collection = [] shares_json, _ = \ shares_timeline_json(actor, page_number, items_per_page, base_dir, domain, nickname, max_shares_per_account, shared_items_federated_domains, shares_file_type) if shares_file_type == 'shares': share_type = 'offer' collection_name = nickname + "'s Shared Items" else: share_type = 'request' collection_name = nickname + "'s Wanted Items" for share_id, shared_item in shares_json.items(): shared_item['shareId'] = share_id shared_item['actor'] = actor offer_item = vf_proposal_from_share(shared_item, share_type) if offer_item: shares_collection.append(offer_item) result_json = { "@context": [ "https://www.w3.org/ns/activitystreams" ], "id": actor + '?page=' + str(page_number), "type": "OrderedCollection", "name": collection_name, "orderedItems": shares_collection } return result_json def post_contains_public(post_json_object: {}) -> bool: """Does the given post contain #Public """ contains_public = False if not post_json_object['object'].get('to'): return contains_public for to_address in post_json_object['object']['to']: if to_address.endswith('#Public') or \ to_address == 'as:Public' or \ to_address == 'Public': contains_public = True break if not contains_public: if post_json_object['object'].get('cc'): for to_address2 in post_json_object['object']['cc']: if to_address2.endswith('#Public') or \ to_address2 == 'as:Public' or \ to_address2 == 'Public': contains_public = True break return contains_public def _get_image_file(base_dir: str, name: str, directory: str, theme: str) -> (str, str): """ returns the filenames for an image with the given name """ banner_extensions = get_image_extensions() banner_file = '' banner_filename = '' im_name = name for ext in banner_extensions: banner_file_test = im_name + '.' + ext banner_filename_test = directory + '/' + banner_file_test if os.path.isfile(banner_filename_test): banner_file = banner_file_test banner_filename = banner_filename_test return banner_file, banner_filename # if not found then use the default image theme = 'default' directory = base_dir + '/theme/' + theme for ext in banner_extensions: banner_file_test = name + '.' + ext banner_filename_test = directory + '/' + banner_file_test if os.path.isfile(banner_filename_test): banner_file = name + '_' + theme + '.' + ext banner_filename = banner_filename_test break return banner_file, banner_filename def get_banner_file(base_dir: str, nickname: str, domain: str, theme: str) -> (str, str): """Gets the image for the timeline banner """ account_dir = acct_dir(base_dir, nickname, domain) banner_file, banner_filename = \ _get_image_file(base_dir, 'banner', account_dir, theme) return banner_file, banner_filename def get_profile_background_file(base_dir: str, nickname: str, domain: str, theme: str) -> (str, str): """Gets the image for the profile background """ account_dir = acct_dir(base_dir, nickname, domain) banner_file, banner_filename = \ _get_image_file(base_dir, 'image', account_dir, theme) return banner_file, banner_filename def get_search_banner_file(base_dir: str, nickname: str, domain: str, theme: str) -> (str, str): """Gets the image for the search banner """ account_dir = acct_dir(base_dir, nickname, domain) banner_file, banner_filename = \ _get_image_file(base_dir, 'search_banner', account_dir, theme) return banner_file, banner_filename def get_left_image_file(base_dir: str, nickname: str, domain: str, theme: str) -> (str, str): """Gets the image for the left column """ account_dir = acct_dir(base_dir, nickname, domain) banner_file, banner_filename = \ _get_image_file(base_dir, 'left_col_image', account_dir, theme) return banner_file, banner_filename def get_right_image_file(base_dir: str, nickname: str, domain: str, theme: str) -> (str, str): """Gets the image for the right column """ account_dir = acct_dir(base_dir, nickname, domain) banner_file, banner_filename = \ _get_image_file(base_dir, 'right_col_image', account_dir, theme) return banner_file, banner_filename def html_header_with_external_style(css_filename: str, instance_title: str, metadata: str, lang='en') -> str: if metadata is None: metadata = '' css_file = '/' + css_filename.split('/')[-1] pwa_theme_color, pwa_theme_background_color = \ get_pwa_theme_colors(css_filename) html_str = \ '\n' + \ '\n' + \ '\n' + \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' + \ metadata + \ ' \n' + \ ' ' + instance_title + '\n' + \ ' \n' + \ ' \n' return html_str def html_header_with_person_markup(css_filename: str, instance_title: str, actor_json: {}, city: str, content_license_url: str, lang='en') -> str: """html header which includes person markup https://schema.org/Person """ if not actor_json: html_str = \ html_header_with_external_style(css_filename, instance_title, None, lang) return html_str city_markup = '' if city: city = city.lower().title() add_comma = '' country_markup = '' if ',' in city: country = city.split(',', 1)[1].strip().title() city = city.split(',', 1)[0] country_markup = \ ' "addressCountry": "' + country + '"\n' add_comma = ',' city_markup = \ ' "address": {\n' + \ ' "@type": "PostalAddress",\n' + \ ' "addressLocality": "' + city + '"' + \ add_comma + '\n' + country_markup + ' },\n' skills_markup = '' if actor_json.get('hasOccupation'): if isinstance(actor_json['hasOccupation'], list): skills_markup = ' "hasOccupation": [\n' first_entry = True for skill_dict in actor_json['hasOccupation']: if skill_dict['@type'] == 'Role': if not first_entry: skills_markup += ',\n' skl = skill_dict['hasOccupation'] role_name = skl['name'] if not role_name: role_name = 'member' category = \ skl['occupationalCategory']['codeValue'] category_url = \ 'https://www.onetonline.org/link/summary/' + category skills_markup += \ ' {\n' + \ ' "@type": "Role",\n' + \ ' "hasOccupation": {\n' + \ ' "@type": "Occupation",\n' + \ ' "name": "' + role_name + '",\n' + \ ' "description": ' + \ '"Fediverse instance role",\n' + \ ' "occupationLocation": {\n' + \ ' "@type": "City",\n' + \ ' "name": "' + city + '"\n' + \ ' },\n' + \ ' "occupationalCategory": {\n' + \ ' "@type": "CategoryCode",\n' + \ ' "inCodeSet": {\n' + \ ' "@type": "CategoryCodeSet",\n' + \ ' "name": "O*Net-SOC",\n' + \ ' "dateModified": "2019",\n' + \ ' ' + \ '"url": "https://www.onetonline.org/"\n' + \ ' },\n' + \ ' "codeValue": "' + category + '",\n' + \ ' "url": "' + category_url + '"\n' + \ ' }\n' + \ ' }\n' + \ ' }' elif skill_dict['@type'] == 'Occupation': if not first_entry: skills_markup += ',\n' oc_name = skill_dict['name'] if not oc_name: oc_name = 'member' skills_list = skill_dict['skills'] skills_list_str = '[' for skill_str in skills_list: if skills_list_str != '[': skills_list_str += ', ' skills_list_str += '"' + skill_str + '"' skills_list_str += ']' skills_markup += \ ' {\n' + \ ' "@type": "Occupation",\n' + \ ' "name": "' + oc_name + '",\n' + \ ' "description": ' + \ '"Fediverse instance occupation",\n' + \ ' "occupationLocation": {\n' + \ ' "@type": "City",\n' + \ ' "name": "' + city + '"\n' + \ ' },\n' + \ ' "skills": ' + skills_list_str + '\n' + \ ' }' first_entry = False skills_markup += '\n ],\n' description = '' if actor_json.get('summary'): description = remove_html(actor_json['summary']) name_str = remove_html(actor_json['name']) domain_full = actor_json['id'].split('://')[1].split('/')[0] handle = actor_json['preferredUsername'] + '@' + domain_full url_str = get_url_from_post(actor_json['icon']['url']) icon_url = remove_html(url_str) person_markup = \ ' "about": {\n' + \ ' "@type" : "Person",\n' + \ ' "name": "' + name_str + '",\n' + \ ' "image": "' + icon_url + '",\n' + \ ' "description": "' + description + '",\n' + \ city_markup + skills_markup + \ ' "url": "' + actor_json['id'] + '"\n' + \ ' },\n' profile_markup = \ ' \n' description = remove_html(description) url_str = get_url_from_post(actor_json['url']) actor2_url = remove_html(url_str) og_metadata = \ " \n" + \ " \n" + \ " \n" + \ " \n" + \ " \n" + \ " \n" + \ " \n" + \ " \n" + \ " \n" + \ " \n" + \ " \n" if actor_json.get('attachment'): og_tags = ( 'email', 'openpgp', 'blog', 'xmpp', 'matrix', 'briar', 'cwtch', 'languages' ) for attach_json in actor_json['attachment']: if not attach_json.get('name'): if not attach_json.get('schema:name'): continue prop_value_name, _ = get_attachment_property_value(attach_json) if not prop_value_name: continue if attach_json.get('name'): name = attach_json['name'].lower() else: name = attach_json['schema:name'].lower() value = attach_json[prop_value_name] for og_tag in og_tags: if name != og_tag: continue og_metadata += \ " \n" html_str = \ html_header_with_external_style(css_filename, instance_title, og_metadata + profile_markup, lang) return html_str def html_header_with_website_markup(css_filename: str, instance_title: str, http_prefix: str, domain: str, system_language: str) -> str: """html header which includes website markup https://schema.org/WebSite """ license_url = 'https://www.gnu.org/licenses/agpl-3.0.rdf' # social networking category genre_url = 'http://vocab.getty.edu/aat/300312270' website_markup = \ ' \n' og_metadata = \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' html_str = \ html_header_with_external_style(css_filename, instance_title, og_metadata + website_markup, system_language) return html_str def html_header_with_blog_markup(css_filename: str, instance_title: str, http_prefix: str, domain: str, nickname: str, system_language: str, published: str, modified: str, title: str, snippet: str, url: str, content_license_url: str) -> str: """html header which includes blog post markup https://schema.org/BlogPosting """ author_url = local_actor_url(http_prefix, nickname, domain) about_url = http_prefix + '://' + domain + '/about.html' # license for content on the site may be different from # the software license blog_markup = \ ' \n' og_metadata = \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' + \ ' \n' html_str = \ html_header_with_external_style(css_filename, instance_title, og_metadata + blog_markup, system_language) return html_str def html_footer() -> str: html_str = ' \n' html_str += '\n' return html_str def load_individual_post_as_html_from_cache(base_dir: str, nickname: str, domain: str, post_json_object: {}) -> str: """If a cached html version of the given post exists then load it and return the html text This is much quicker than generating the html from the json object """ cached_post_filename = \ get_cached_post_filename(base_dir, nickname, domain, post_json_object) post_html = '' if not cached_post_filename: return post_html if not os.path.isfile(cached_post_filename): return post_html tries = 0 while tries < 3: try: with open(cached_post_filename, 'r', encoding='utf-8') as file: post_html = file.read() break except OSError as ex: print('ERROR: load_individual_post_as_html_from_cache ' + str(tries) + ' ' + str(ex)) # no sleep tries += 1 if post_html: return post_html def add_emoji_to_display_name(session, base_dir: str, http_prefix: str, nickname: str, domain: str, display_name: str, in_profile_name: bool, translate: {}) -> str: """Adds emoji icons to display names or CW on individual posts """ if ':' not in display_name: return display_name display_name = display_name.replace('

', '').replace('

', '') emoji_tags = {} # print('TAG: display_name before tags: ' + display_name) display_name = \ add_html_tags(base_dir, http_prefix, nickname, domain, display_name, [], emoji_tags, translate) display_name = display_name.replace('

', '').replace('

', '') # print('TAG: display_name after tags: ' + display_name) # convert the emoji dictionary to a list emoji_tags_list = [] for _, tag in emoji_tags.items(): emoji_tags_list.append(tag) # print('TAG: emoji tags list: ' + str(emoji_tags_list)) if not in_profile_name: display_name = \ replace_emoji_from_tags(session, base_dir, display_name, emoji_tags_list, 'post header', False, False) else: display_name = \ replace_emoji_from_tags(session, base_dir, display_name, emoji_tags_list, 'profile', False, False) # print('TAG: display_name after tags 2: ' + display_name) # remove any stray emoji while ':' in display_name: if '://' in display_name: break emoji_str = display_name.split(':')[1] prev_display_name = display_name display_name = display_name.replace(':' + emoji_str + ':', '').strip() if prev_display_name == display_name: break # print('TAG: display_name after tags 3: ' + display_name) # print('TAG: display_name after tag replacements: ' + display_name) return display_name def _is_image_mime_type(mime_type: str) -> bool: """Is the given mime type an image? """ if mime_type == 'image/svg+xml': return True if not mime_type.startswith('image/'): return False extensions = get_image_extensions() ext = mime_type.split('/')[1] if ext in extensions: return True return False def _is_video_mime_type(mime_type: str) -> bool: """Is the given mime type a video? """ if not mime_type.startswith('video/'): return False extensions = get_video_extensions() ext = mime_type.split('/')[1] if ext in extensions: return True return False def _is_audio_mime_type(mime_type: str) -> bool: """Is the given mime type an audio file? """ if mime_type == 'audio/mpeg': return True if not mime_type.startswith('audio/'): return False extensions = get_audio_extensions() ext = mime_type.split('/')[1] if ext in extensions: return True return False def _is_attached_image(attachment_filename: str) -> bool: """Is the given attachment filename an image? """ if '.' not in attachment_filename: return False image_ext = get_image_extensions() ext = attachment_filename.split('.')[-1] if ext in image_ext: return True return False def _is_attached_video(attachment_filename: str) -> bool: """Is the given attachment filename a video? """ if '.' not in attachment_filename: return False video_ext = ( 'mp4', 'webm', 'ogv' ) ext = attachment_filename.split('.')[-1] if ext in video_ext: return True return False def _is_nsfw(content: str) -> bool: """Does the given content indicate nsfw? """ content_lower = content.lower() nsfw_tags = ( 'nsfw', 'porn', 'pr0n', 'explicit', 'lewd', 'nude', 'boob', 'erotic', 'sex' ) for tag_name in nsfw_tags: if tag_name in content_lower: return True return False def get_post_attachments_as_html(base_dir: str, nickname: str, domain: str, domain_full: str, post_json_object: {}, box_name: str, translate: {}, is_muted: bool, avatar_link: str, reply_str: str, announce_str: str, like_str: str, bookmark_str: str, delete_str: str, mute_str: str, content: str, minimize_all_images: bool, system_language: str) -> (str, str): """Returns a string representing any attachments """ attachment_str = '' attachment_ctr = 0 gallery_str = '' attachment_dict = [] # handle peertube-style video posts, where the media links # are stored in the url field if post_json_object.get('object'): media_type, media_url, _, _ = \ get_media_url_from_video(post_json_object['object']) else: media_type, media_url, _, _ = \ get_media_url_from_video(post_json_object) if media_url and media_type: attachment_dict = [{ 'mediaType': media_type, 'name': content, 'type': 'Document', 'url': media_url }] post_attachments = get_post_attachments(post_json_object) if not post_attachments: post_json_object['object']['attachment'] = \ attachment_dict post_attachments = get_post_attachments(post_json_object) if not post_attachments: return attachment_str, gallery_str attachment_dict += post_attachments media_style_added = False post_id = None if post_json_object['object'].get('id'): post_id = post_json_object['object']['id'] post_id = remove_id_ending(post_id).replace('/', '--') # chat links # https://codeberg.org/fediverse/fep/src/branch/main/fep/1970/fep-1970.md attached_urls = [] for attach in attachment_dict: if not attach.get('type') or \ not attach.get('name') or \ not attach.get('href') or \ not attach.get('rel'): continue if not isinstance(attach['type'], str) or \ not isinstance(attach['name'], str) or \ not isinstance(attach['href'], str) or \ not isinstance(attach['rel'], str): continue if attach['type'] != 'Link' or \ attach['name'] != 'Chat' or \ attach['rel'] != 'discussion' or \ '://' not in attach['href'] or \ '.' not in attach['href']: continue # get the domain for the chat link chat_domain_str = '' attach_url = remove_html(attach['href']) if attach_url in attached_urls: continue attached_urls.append(attach_url) chat_domain, _ = get_domain_from_actor(attach_url) if chat_domain: if local_network_host(chat_domain): print('REJECT: local network chat link ' + attach['href']) continue chat_domain_str = ' (' + chat_domain + ')' # avoid displaying very long domains if len(chat_domain_str) > 50: chat_domain_str = '' chat_url = remove_html(attach['href']) attachment_str += \ '

' + \ '💬 ' + translate['Chat'] + chat_domain_str + '

' # obtain transcripts transcripts = {} for attach in attachment_dict: if not attach.get('mediaType'): continue if attach['mediaType'] != 'text/vtt': continue name = None if attach.get('name'): name = attach['name'] if attach.get('nameMap'): for name_lang, name_value in attach['nameMap'].items(): if not isinstance(name_value, str): continue if name_lang.startswith(system_language): name = name_value if not name and attach.get('hreflang'): name = attach['hreflang'] url = None if attach.get('url'): url = get_url_from_post(attach['url']) elif attach.get('href'): url = attach['href'] if name and url: transcripts[name] = remove_html(url) for attach in attachment_dict: if not (attach.get('mediaType') and attach.get('url')): continue media_license = '' if attach.get('schema:license'): if not dangerous_markup(attach['schema:license'], False, []): if not is_filtered(base_dir, nickname, domain, attach['schema:license'], system_language): if '://' not in attach['schema:license']: if len(attach['schema:license']) < 60: media_license = attach['schema:license'] else: media_license = attach['schema:license'] elif attach.get('license'): if not dangerous_markup(attach['license'], False, []): if not is_filtered(base_dir, nickname, domain, attach['license'], system_language): if '://' not in attach['license']: if len(attach['license']) < 60: media_license = attach['license'] else: media_license = attach['license'] media_creator = '' if attach.get('schema:creator'): if len(attach['schema:creator']) < 120: if not dangerous_markup(attach['schema:creator'], False, []): if not is_filtered(base_dir, nickname, domain, attach['schema:creator'], system_language): media_creator = attach['schema:creator'] elif attach.get('attribution'): if isinstance(attach['attribution'], list): if len(attach['attribution']) > 0: attrib_str = attach['attribution'][0] if not dangerous_markup(attrib_str, False, []): if not is_filtered(base_dir, nickname, domain, attrib_str, system_language): media_creator = attrib_str media_type = attach['mediaType'] image_description = '' if attach.get('name'): image_description = attach['name'].replace('"', "'") image_description = remove_html(image_description) if _is_image_mime_type(media_type): url_str = get_url_from_post(attach['url']) image_url = remove_html(url_str) if image_url in attached_urls: continue attached_urls.append(image_url) # display svg images if they have first been rendered harmless svg_harmless = True if 'svg' in media_type: svg_harmless = False if '://' + domain_full + '/' in image_url: svg_harmless = True else: if post_id: if '/' in image_url: im_filename = image_url.split('/')[-1] else: im_filename = image_url cached_svg_filename = \ base_dir + '/media/' + post_id + '_' + im_filename if os.path.isfile(cached_svg_filename): svg_harmless = True if _is_attached_image(image_url) and svg_harmless: if not attachment_str: attachment_str += '
\n' media_style_added = True if attachment_ctr > 0: attachment_str += '
' if box_name == 'tlmedia': gallery_str += '\n' # optionally hide the image attributed_actor = None minimize_images = False if minimize_all_images: minimize_images = True if post_json_object['object'].get('attributedTo'): attrib_field = post_json_object['object']['attributedTo'] attributed_actor = get_attributed_to(attrib_field) if attributed_actor: following_nickname = \ get_nickname_from_actor(attributed_actor) following_domain, _ = \ get_domain_from_actor(attributed_actor) if minimize_all_images: minimize_images = True else: minimize_images = \ minimizing_attached_images(base_dir, nickname, domain, following_nickname, following_domain) # minimize any NSFW images if not minimize_images and content: if _is_nsfw(content): minimize_images = True if minimize_images: show_img_str = 'SHOW MEDIA' if translate: show_img_str = translate['SHOW MEDIA'] attachment_str += \ '
' + \ show_img_str + '' + \ '
\n' attachment_str += \ '' if media_license and media_creator: attachment_str += '
' attachment_str += \ '' + image_description + '\n' if media_license and media_creator: license_str = '' attachment_str += '
' media_license = remove_html(media_license) if resembles_url(media_license): license_str += \ '©' else: license_str += media_license license_str += ' ' + media_creator attachment_str += license_str + '
' if minimize_images: attachment_str += '
\n' attachment_ctr += 1 elif _is_video_mime_type(media_type): video_url = remove_html(attach['url']) if video_url in attached_urls: continue attached_urls.append(video_url) if _is_attached_video(video_url): extension = video_url.split('.')[-1] if attachment_ctr > 0: attachment_str += '
' if box_name == 'tlmedia': gallery_str += '\n' attachment_str += \ '
\n' + \ '
' attachment_ctr += 1 elif _is_audio_mime_type(media_type): extension = '.mp3' url_str = get_url_from_post(attach['url']) audio_url = remove_html(url_str) if audio_url in attached_urls: continue attached_urls.append(audio_url) if audio_url.endswith('.ogg'): extension = '.ogg' elif audio_url.endswith('.wav'): extension = '.wav' elif audio_url.endswith('.opus'): extension = '.opus' elif audio_url.endswith('.spx'): extension = '.spx' elif audio_url.endswith('.flac'): extension = '.flac' if audio_url.endswith(extension): if attachment_ctr > 0: attachment_str += '
' if box_name == 'tlmedia': gallery_str += '\n' attachment_str += '
\n\n
\n' attachment_ctr += 1 if media_style_added: attachment_str += '

' return attachment_str, gallery_str def html_post_separator(base_dir: str, column: str) -> str: """Returns the html for a timeline post separator image """ theme = get_config_param(base_dir, 'theme') if not theme: theme = 'default' filename = 'separator.png' separator_class = "postSeparatorImage" if column: separator_class = "postSeparatorImage" + column.title() filename = 'separator_' + column + '.png' separator_image_filename = \ base_dir + '/theme/' + theme + '/icons/' + filename separator_str = '' if os.path.isfile(separator_image_filename): separator_str = \ '
' + \ '
\n' return separator_str def html_highlight_label(label: str, highlight: bool) -> str: """If the given text should be highlighted then return the appropriate markup. This is so that in shell browsers, like lynx, it's possible to see if the replies or DM button are highlighted. """ if not highlight: return label return '*' + str(label) + '*' def get_avatar_image_url(session, base_dir: str, http_prefix: str, post_actor: str, person_cache: {}, avatar_url: str, allow_downloads: bool, signing_priv_key_pem: str) -> str: """Returns the avatar image url """ # get the avatar image url for the post actor if not avatar_url: avatar_url = \ get_person_avatar_url(base_dir, post_actor, person_cache) avatar_url = \ update_avatar_image_cache(signing_priv_key_pem, session, base_dir, http_prefix, post_actor, avatar_url, person_cache, allow_downloads) else: update_avatar_image_cache(signing_priv_key_pem, session, base_dir, http_prefix, post_actor, avatar_url, person_cache, allow_downloads) if not avatar_url: avatar_url = post_actor + '/avatar.png' return avatar_url def html_hide_from_screen_reader(html_str: str) -> str: """Returns html which is hidden from screen readers """ return '' def html_keyboard_navigation(banner: str, links: {}, access_keys: {}, sub_heading: str, users_path: str, translate: {}, follow_approvals: bool) -> str: """Given a set of links return the html for keyboard navigation """ html_str = '
\n' return html_str def begin_edit_section(label: str) -> str: """returns the html for begining a dropdown section on edit profile screen """ return \ '
' + label + '\n' + \ '
' def end_edit_section() -> str: """returns the html for ending a dropdown section on edit profile screen """ return '
\n' def edit_text_field(label: str, name: str, value: str = "", placeholder: str = "", required: bool = False) -> str: """Returns html for editing a text field """ if value is None: value = '' placeholder_str = '' if placeholder: placeholder_str = ' placeholder="' + placeholder + '"' required_str = '' if required: required_str = ' required' text_field_str = '' if label: text_field_str = \ '
\n' text_field_str += \ ' \n' return text_field_str def edit_number_field(label: str, name: str, value: int, min_value: int, max_value: int, placeholder: int) -> str: """Returns html for editing an integer number field """ if value is None: value = '' placeholder_str = '' if placeholder: placeholder_str = ' placeholder="' + str(placeholder) + '"' return \ '
\n' + \ ' \n' def edit_currency_field(label: str, name: str, value: str, placeholder: str, required: bool) -> str: """Returns html for editing a currency field """ if value is None: value = '0.00' placeholder_str = '' if placeholder: if placeholder.isdigit(): placeholder_str = ' placeholder="' + str(placeholder) + '"' required_str = '' if required: required_str = ' required' return \ '
\n' + \ ' \n' def edit_check_box(label: str, name: str, checked: bool) -> str: """Returns html for editing a checkbox field """ checked_str = '' if checked: checked_str = ' checked' return \ ' ' + label + '
\n' def edit_text_area(label: str, subtitle: str, name: str, value: str, height: int, placeholder: str, spellcheck: bool) -> str: """Returns html for editing a textarea field """ if value is None: value = '' text = '' if label: text = '
\n' if subtitle: text += subtitle + '
\n' text += \ ' \n' return text def html_search_result_share(base_dir: str, shared_item: {}, translate: {}, http_prefix: str, domain_full: str, contact_nickname: str, item_id: str, actor: str, shares_file_type: str, category: str, publicly_visible: bool) -> str: """Returns the html for an individual shared item """ shared_items_form = '
\n' shared_items_form += \ '

' + shared_item['displayName'] + '

\n' if shared_item.get('imageUrl'): shared_items_form += \ '\n' shared_items_form += \ 'Item image\n' shared_items_form += '

' + shared_item['summary'] + '

\n

' if shared_item.get('itemQty'): if shared_item['itemQty'] > 1: shared_items_form += \ '' + translate['Quantity'] + \ ': ' + str(shared_item['itemQty']) + '
' shared_items_form += \ '' + translate['Type'] + ': ' + shared_item['itemType'] + '
' shared_items_form += \ '' + translate['Category'] + ': ' + \ shared_item['category'] + '
' if shared_item.get('location'): shared_items_form += \ '' + translate['Location'] + ': ' + \ shared_item['location'] + '
' contact_title_str = translate['Contact'] if shared_item.get('itemPrice') and \ shared_item.get('itemCurrency'): if is_float(shared_item['itemPrice']): if float(shared_item['itemPrice']) > 0: shared_items_form += \ ' ' + translate['Price'] + \ ': ' + shared_item['itemPrice'] + \ ' ' + shared_item['itemCurrency'] contact_title_str = translate['Buy'] shared_items_form += '

\n' contact_actor = \ local_actor_url(http_prefix, contact_nickname, domain_full) button_style_str = 'button' if category == 'accommodation': contact_title_str = translate['Request to stay'] button_style_str = 'contactbutton' if not publicly_visible: shared_items_form += \ '

' + \ '' + \ '\n' + \ '\n' else: shared_items_form += \ '\n' # should the remove button be shown? show_remove_button = False nickname = get_nickname_from_actor(actor) if not nickname: return '' if actor.endswith('/users/' + contact_nickname): show_remove_button = True elif is_moderator(base_dir, nickname): show_remove_button = True else: admin_nickname = get_config_param(base_dir, 'admin') if admin_nickname: if actor.endswith('/users/' + admin_nickname): show_remove_button = True if show_remove_button and not publicly_visible: if shares_file_type == 'shares': shared_items_form += \ ' \n' else: shared_items_form += \ ' \n' shared_items_form += '

\n' return shared_items_form def html_show_share(base_dir: str, domain: str, nickname: str, http_prefix: str, domain_full: str, item_id: str, translate: {}, shared_items_federated_domains: [], default_timeline: str, theme: str, shares_file_type: str, category: str, publicly_visible: bool) -> str: """Shows an individual shared item after selecting it from the left column """ shares_json = None share_url = item_id.replace('___', '://').replace('--', '/') contact_nickname = get_nickname_from_actor(share_url) if not contact_nickname: return None if '://' + domain_full + '/' in share_url: # shared item on this instance shares_filename = \ acct_dir(base_dir, contact_nickname, domain) + '/' + \ shares_file_type + '.json' if not os.path.isfile(shares_filename): return None shares_json = load_json(shares_filename) else: # federated shared item if shares_file_type == 'shares': catalogs_dir = base_dir + '/cache/catalogs' else: catalogs_dir = base_dir + '/cache/wantedItems' if not os.path.isdir(catalogs_dir): return None for _, _, files in os.walk(catalogs_dir): for fname in files: if '#' in fname: continue if not fname.endswith('.' + shares_file_type + '.json'): continue federated_domain = fname.split('.')[0] if federated_domain not in shared_items_federated_domains: continue shares_filename = catalogs_dir + '/' + fname shares_json = load_json(shares_filename) if not shares_json: continue if shares_json.get(item_id): break break if not shares_json: return None if not shares_json.get(item_id): return None shared_item = shares_json[item_id] actor = local_actor_url(http_prefix, nickname, domain_full) # filename of the banner shown at the top banner_file, _ = \ get_banner_file(base_dir, nickname, domain, theme) share_str = \ '
\n' + \ '\n' share_str += '\n' + \ '

\n' share_str += \ html_search_result_share(base_dir, shared_item, translate, http_prefix, domain_full, contact_nickname, item_id, actor, shares_file_type, category, publicly_visible) css_filename = base_dir + '/epicyon-profile.css' if os.path.isfile(base_dir + '/epicyon.css'): css_filename = base_dir + '/epicyon.css' instance_title = \ get_config_param(base_dir, 'instanceTitle') return html_header_with_external_style(css_filename, instance_title, None) + \ share_str + html_footer() def set_custom_background(base_dir: str, background: str, new_background: str) -> str: """Sets a custom background Returns the extension, if found """ ext = 'jpg' if os.path.isfile(base_dir + '/img/' + background + '.' + ext): if not new_background: new_background = background dir_str = data_dir(base_dir) if not os.path.isfile(dir_str + '/' + new_background + '.' + ext): copyfile(base_dir + '/img/' + background + '.' + ext, dir_str + '/' + new_background + '.' + ext) return ext return None def html_common_emoji(base_dir: str, no_of_emoji: int) -> str: """Shows common emoji """ emojis_filename = base_dir + '/emoji/emoji.json' if not os.path.isfile(emojis_filename): emojis_filename = base_dir + '/emoji/default_emoji.json' emojis_json = load_json(emojis_filename) common_emoji_filename = data_dir(base_dir) + '/common_emoji.txt' if not os.path.isfile(common_emoji_filename): return '' common_emoji = None try: with open(common_emoji_filename, 'r', encoding='utf-8') as fp_emoji: common_emoji = fp_emoji.readlines() except OSError: print('EX: html_common_emoji unable to load file') return '' if not common_emoji: return '' line_ctr = 0 ctr = 0 html_str = '' while ctr < no_of_emoji and line_ctr < len(common_emoji): emoji_name1 = common_emoji[line_ctr].split(' ')[1] emoji_name = remove_eol(emoji_name1) emoji_icon_name = emoji_name emoji_filename = base_dir + '/emoji/' + emoji_name + '.png' if not os.path.isfile(emoji_filename): emoji_filename = base_dir + '/customemoji/' + emoji_name + '.png' if not os.path.isfile(emoji_filename): # load the emojis index if not emojis_json: emojis_json = load_json(emojis_filename) # lookup the name within the index to get the hex code if emojis_json: for emoji_tag, emoji_code in emojis_json.items(): if emoji_tag == emoji_name: # get the filename based on the hex code emoji_filename = \ base_dir + '/emoji/' + emoji_code + '.png' emoji_icon_name = emoji_code break if os.path.isfile(emoji_filename): # NOTE: deliberately no alt text, so that without graphics only # the emoji name shows html_str += \ '\n' ctr += 1 line_ctr += 1 return html_str def text_mode_browser(ua_str: str) -> bool: """Does the user agent indicate a text mode browser? """ if ua_str: text_mode_agents = ('Lynx/', 'w3m/', 'Links (', 'Emacs/', 'ELinks') for agent in text_mode_agents: if agent in ua_str: return True return False def get_default_path(media_instance: bool, blogs_instance: bool, nickname: str) -> str: """Returns the default timeline """ if blogs_instance: path = '/users/' + nickname + '/tlblogs' elif media_instance: path = '/users/' + nickname + '/tlmedia' else: path = '/users/' + nickname + '/inbox' return path def html_following_data_list(base_dir: str, nickname: str, domain: str, domain_full: str, following_type: str, use_petnames: bool) -> str: """Returns a datalist of handles being followed followingHandles, followersHandles """ list_str = '\n' following_filename = \ acct_dir(base_dir, nickname, domain) + '/' + following_type + '.txt' msg = '' if os.path.isfile(following_filename): try: with open(following_filename, 'r', encoding='utf-8') as following_file: msg = following_file.read() # add your own handle, so that you can send DMs # to yourself as reminders msg += nickname + '@' + domain_full + '\n' except OSError: print('EX: html_following_data_list unable to read ' + following_filename) if msg: # include petnames petnames_filename = \ acct_dir(base_dir, nickname, domain) + '/petnames.txt' if use_petnames and os.path.isfile(petnames_filename): following_list = [] try: with open(petnames_filename, 'r', encoding='utf-8') as fp_petnames: pet_str = fp_petnames.read() # extract each petname and append it petnames_list = pet_str.split('\n') for pet in petnames_list: following_list.append(pet.split(' ')[0]) except OSError: print('EX: html_following_data_list unable to read ' + petnames_filename) # add the following.txt entries following_list += msg.split('\n') else: # no petnames list exists - just use following.txt following_list = msg.split('\n') following_list.sort() if following_list: for following_address in following_list: if not following_address: continue if '@' not in following_address and \ '://' not in following_address: continue list_str += '\n' list_str += '\n' return list_str def html_following_dropdown(base_dir: str, nickname: str, domain: str, domain_full: str, following_type: str, use_petnames: bool) -> str: """Returns a select list of handles being followed or of followers """ list_str = '\n' return list_str def get_buy_links(post_json_object: str, translate: {}, buy_sites: {}) -> {}: """Returns any links to buy something from an external site """ post_attachments = get_post_attachments(post_json_object) if not post_attachments: return {} links = {} buy_strings = [] for buy_str in ('Buy', 'Purchase', 'Subscribe'): if translate.get(buy_str): buy_str = translate[buy_str] buy_strings += buy_str.lower() buy_strings += ('Paypal', 'Stripe', 'Cashapp', 'Venmo') for item in post_attachments: if not isinstance(item, dict): continue if not item.get('name'): continue if not isinstance(item['name'], str): continue if not item.get('type'): continue if not item.get('href'): continue if not isinstance(item['type'], str): continue if not isinstance(item['href'], str): continue if item['type'] != 'Link': continue if not item.get('mediaType'): continue if not isinstance(item['mediaType'], str): continue if 'html' not in item['mediaType']: continue item_name = item['name'] # The name should not be excessively long if len(item_name) > 32: continue # there should be no html in the name if remove_html(item_name) != item_name: continue # there should be no html in the link if string_contains(item['href'], ('<', '://', ' ')): continue if item.get('rel'): if isinstance(item['rel'], str): if item['rel'] in ('payment', 'pay', 'donate', 'donation', 'buy', 'purchase', 'support'): links[item_name] = remove_html(item['href']) continue if buy_sites: # limited to an allowlist of buying sites for site, buy_domain in buy_sites.items(): if buy_domain in item['href']: links[site.title()] = remove_html(item['href']) continue else: # The name only needs to indicate that this is a buy link for buy_str in buy_strings: if buy_str in item_name.lower(): links[item_name] = remove_html(item['href']) continue return links def load_buy_sites(base_dir: str) -> {}: """Loads domains from which buying is permitted """ buy_sites_filename = data_dir(base_dir) + '/buy_sites.json' if os.path.isfile(buy_sites_filename): buy_sites_json = load_json(buy_sites_filename) if buy_sites_json: return buy_sites_json return {}