Merge branch 'main' of gitlab.com:bashrc2/epicyon

commit 42f5c97601 (merge-requests/30/head)
Bob Mottram, 2022-01-14 20:25:37 +00:00
6 changed files with 124 additions and 55 deletions

content.py

@@ -486,6 +486,22 @@ def add_web_links(content: str) -> str:
     return content
 
 
+def safe_web_text(arbitrary_html: str) -> str:
+    """Turns arbitrary html into something safe.
+    So if the arbitrary html contains attack scripts those will be removed
+    """
+    # first remove the markup, so that we have something safe
+    safe_text = remove_html(arbitrary_html)
+    if not safe_text:
+        return ''
+    # remove any spurious characters found in podcast descriptions
+    remove_chars = ('Œ', 'â€', 'ğŸ', '�', ']]', '__')
+    for remchar in remove_chars:
+        safe_text = safe_text.replace(remchar, '')
+    # recreate any url links safely
+    return add_web_links(safe_text)
+
+
 def _add_hash_tags(word_str: str, http_prefix: str, domain: str,
                    replace_hashtags: {}, post_hashtags: {}) -> bool:
     """Detects hashtags and adds them to the replacements dict

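For context, a compact sketch of how safe_web_text behaves end to end. The remove_html and add_web_links stand-ins below are simplified assumptions for illustration only; Epicyon's real implementations in utils.py and content.py do considerably more.

    import re

    def remove_html(text: str) -> str:
        """Simplified stand-in: strip tags but keep their inner text."""
        return re.sub(r'<[^>]*>', '', text)

    def add_web_links(content: str) -> str:
        """Simplified stand-in: wrap bare urls in anchor tags."""
        return re.sub(r'(https?://\S+)', r'<a href="\1">\1</a>', content)

    def safe_web_text(arbitrary_html: str) -> str:
        """Strip markup first, scrub stray characters, then re-link urls."""
        safe_text = remove_html(arbitrary_html)
        if not safe_text:
            return ''
        for remchar in ('Œ', 'â€', 'ğŸ', '�', ']]', '__'):
            safe_text = safe_text.replace(remchar, '')
        return add_web_links(safe_text)

    print(safe_web_text('<p>See https://some.site/some-path</p>'))
    # See <a href="https://some.site/some-path">https://some.site/some-path</a>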
follow.py

@@ -999,20 +999,20 @@ def send_follow_requestViaServer(base_dir: str, session,
     # get the actor inbox for the To handle
     origin_domain = from_domain
-    (inboxUrl, _, _, fromPersonId, sharedInbox, avatarUrl,
-     displayName, _) = get_person_box(signing_priv_key_pem, origin_domain,
-                                      base_dir, session, wf_request,
-                                      person_cache,
-                                      project_version, http_prefix,
-                                      from_nickname,
-                                      from_domain, post_to_box, 52025)
-    if not inboxUrl:
+    (inbox_url, _, _, from_person_id, _, _,
+     _, _) = get_person_box(signing_priv_key_pem, origin_domain,
+                            base_dir, session, wf_request,
+                            person_cache,
+                            project_version, http_prefix,
+                            from_nickname,
+                            from_domain, post_to_box, 52025)
+    if not inbox_url:
         if debug:
             print('DEBUG: follow request no ' + post_to_box +
                   ' was found for ' + handle)
         return 3
-    if not fromPersonId:
+    if not from_person_id:
         if debug:
             print('DEBUG: follow request no actor was found for ' + handle)
         return 4
@@ -1026,10 +1026,10 @@ def send_follow_requestViaServer(base_dir: str, session,
     }
     post_result = \
         post_json(http_prefix, from_domain_full,
-                  session, new_follow_json, [], inboxUrl, headers, 3, True)
+                  session, new_follow_json, [], inbox_url, headers, 3, True)
     if not post_result:
         if debug:
-            print('DEBUG: POST follow request failed for c2s to ' + inboxUrl)
+            print('DEBUG: POST follow request failed for c2s to ' + inbox_url)
         return 5
 
     if debug:
@@ -1095,22 +1095,22 @@ def send_unfollow_request_via_server(base_dir: str, session,
     # get the actor inbox for the To handle
     origin_domain = from_domain
-    (inboxUrl, pubKeyId, pubKey, fromPersonId, sharedInbox, avatarUrl,
-     displayName, _) = get_person_box(signing_priv_key_pem,
-                                      origin_domain,
-                                      base_dir, session,
-                                      wf_request, person_cache,
-                                      project_version, http_prefix,
-                                      from_nickname,
-                                      from_domain, post_to_box,
-                                      76536)
-    if not inboxUrl:
+    (inbox_url, _, _, from_person_id, _, _,
+     _, _) = get_person_box(signing_priv_key_pem,
+                            origin_domain,
+                            base_dir, session,
+                            wf_request, person_cache,
+                            project_version, http_prefix,
+                            from_nickname,
+                            from_domain, post_to_box,
+                            76536)
+    if not inbox_url:
         if debug:
             print('DEBUG: unfollow no ' + post_to_box +
                   ' was found for ' + handle)
         return 3
-    if not fromPersonId:
+    if not from_person_id:
         if debug:
             print('DEBUG: unfollow no actor was found for ' + handle)
         return 4
@@ -1124,10 +1124,10 @@ def send_unfollow_request_via_server(base_dir: str, session,
     }
     post_result = \
         post_json(http_prefix, from_domain_full,
-                  session, unfollow_json, [], inboxUrl, headers, 3, True)
+                  session, unfollow_json, [], inbox_url, headers, 3, True)
     if not post_result:
         if debug:
-            print('DEBUG: POST unfollow failed for c2s to ' + inboxUrl)
+            print('DEBUG: POST unfollow failed for c2s to ' + inbox_url)
         return 5
 
     if debug:
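These hunks rename camelCase locals to PEP 8 snake_case and stop binding tuple fields that are never read (key id, key, shared inbox, avatar, display name), discarding them with '_' placeholders instead. A minimal sketch of the pattern; the stub and its values are hypothetical, only the 8-tuple shape mirrors get_person_box:

    def get_person_box_stub():
        """Hypothetical stand-in returning an 8-tuple like get_person_box."""
        return ('https://example.net/users/alice/inbox', 'keyId', 'pubKey',
                'personId', 'sharedInbox', 'avatarUrl', 'displayName', None)

    # bind only the two fields that are read; '_' discards the rest
    (inbox_url, _, _, from_person_id, _, _,
     _, _) = get_person_box_stub()
    assert inbox_url.endswith('/inbox')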

newswire.py

@@ -229,8 +229,8 @@ def _add_newswire_dict_entry(base_dir: str, domain: str,
     # Include tags from podcast categories
     if podcast_properties:
         if podcast_properties.get('explicit'):
-            if '#NSFW' not in post_tags:
-                post_tags.append('#NSFW')
+            if '#nsfw' not in post_tags:
+                post_tags.append('#nsfw')
 
         post_tags += podcast_properties['categories']
@@ -446,7 +446,8 @@ def xml_podcast_to_dict(xml_item: str, xml_str: str) -> {}:
     """
     if '<podcast:' not in xml_item:
         if '<itunes:' not in xml_item:
-            return {}
+            if '<media:thumbnail' not in xml_item:
+                return {}
 
     podcast_properties = {
         "locations": [],
@@ -506,7 +507,7 @@ def xml_podcast_to_dict(xml_item: str, xml_str: str) -> {}:
     # get the image for the podcast, if it exists
     podcast_episode_image = None
-    episode_image_tags = ['<itunes:image']
+    episode_image_tags = ['<itunes:image', '<media:thumbnail']
     for image_tag in episode_image_tags:
         item_str = xml_item
         if image_tag not in xml_item:
@@ -515,20 +516,28 @@ def xml_podcast_to_dict(xml_item: str, xml_str: str) -> {}:
             item_str = xml_str
 
         episode_image = item_str.split(image_tag)[1]
+        if image_tag + ' ' in item_str and '>' in episode_image:
+            episode_image = episode_image.split('>')[0]
         if 'href="' in episode_image:
             episode_image = episode_image.split('href="')[1]
             if '"' in episode_image:
                 episode_image = episode_image.split('"')[0]
                 podcast_episode_image = episode_image
                 break
-        else:
-            if '>' in episode_image:
-                episode_image = episode_image.split('>')[1]
-                if '<' in episode_image:
-                    episode_image = episode_image.split('<')[0]
-                if '://' in episode_image and '.' in episode_image:
-                    podcast_episode_image = episode_image
-                    break
+        elif 'url="' in episode_image:
+            episode_image = episode_image.split('url="')[1]
+            if '"' in episode_image:
+                episode_image = episode_image.split('"')[0]
+                podcast_episode_image = episode_image
+                break
+        elif '>' in episode_image:
+            episode_image = episode_image.split('>')[1]
+            if '<' in episode_image:
+                episode_image = episode_image.split('<')[0]
+            if '://' in episode_image and '.' in episode_image:
+                podcast_episode_image = episode_image
+                break
 
     # get categories if they exist. These can be turned into hashtags
     podcast_categories = _get_podcast_categories(xml_item, xml_str)
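The image hunks above teach the parser a second tag: <itunes:image> carries its url in an href= attribute, while <media:thumbnail> from the RSS media namespace uses url=, and the new guard first trims the split down to the tag's own attribute block. A self-contained sketch of the same split-based extraction; the sample tags are illustrative:

    def episode_image_url(item_str: str, image_tag: str) -> str:
        """Extract an image url with the same splits as the hunk above."""
        episode_image = item_str.split(image_tag)[1]
        # trim to this tag's attributes if the tag carries any
        if image_tag + ' ' in item_str and '>' in episode_image:
            episode_image = episode_image.split('>')[0]
        for attr in ('href="', 'url="'):
            if attr in episode_image:
                return episode_image.split(attr)[1].split('"')[0]
        return ''

    itunes = '<item><itunes:image href="https://pod.example/a.jpg"/></item>'
    media = '<item><media:thumbnail url="https://pod.example/b.jpg"/></item>'
    print(episode_image_url(itunes, '<itunes:image'))    # https://pod.example/a.jpg
    print(episode_image_url(media, '<media:thumbnail'))  # https://pod.example/b.jpg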
@@ -1024,9 +1033,15 @@ def _atom_feed_yt_to_dict(base_dir: str, domain: str, xml_str: str,
         description = atom_item.split('<summary>')[1]
         description = description.split('</summary>')[0]
         description = remove_html(description)
+
+        link, _ = get_link_from_rss_item(atom_item)
+        if not link:
+            link = atom_item.split('<yt:videoId>')[1]
+            link = link.split('</yt:videoId>')[0]
+            link = 'https://www.youtube.com/watch?v=' + link.strip()
+        if not link:
+            continue
 
         pub_date = atom_item.split('<published>')[1]
         pub_date = pub_date.split('</published>')[0]
@@ -1035,13 +1050,16 @@ def _atom_feed_yt_to_dict(base_dir: str, domain: str, xml_str: str,
         if _valid_feed_date(pub_date_str):
             post_filename = ''
             votes_status = []
+            podcast_properties = xml_podcast_to_dict(atom_item, xml_str)
+            if podcast_properties:
+                podcast_properties['linkMimeType'] = 'video/youtube'
             _add_newswire_dict_entry(base_dir, domain,
                                      result, pub_date_str,
                                      title, link,
                                      votes_status, post_filename,
                                      description, moderated, mirrored,
                                      [], 32, session, debug,
-                                     None)
+                                     podcast_properties)
             post_ctr += 1
             if post_ctr >= max_posts_per_source:
                 break
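The YouTube feed parser now prefers whatever link the entry itself declares, via the existing get_link_from_rss_item helper, and only rebuilds a watch url from <yt:videoId> as a fallback; entries with no recoverable link are skipped, and podcast properties travel onward with a 'video/youtube' MIME hint for the web layer. A sketch of the fallback; the sample entry is illustrative and the stub only mimics the helper's two-value return shape:

    def get_link_from_rss_item_stub(atom_item: str) -> (str, str):
        """Hypothetical stand-in returning (link, mime_type)."""
        if '<link href="' not in atom_item:
            return '', ''
        return atom_item.split('<link href="')[1].split('"')[0], ''

    atom_item = '<entry><yt:videoId>dQw4w9WgXcQ</yt:videoId></entry>'
    link, _ = get_link_from_rss_item_stub(atom_item)
    if not link:
        # fall back to reconstructing the watch url from the video id
        link = atom_item.split('<yt:videoId>')[1]
        link = link.split('</yt:videoId>')[0]
        link = 'https://www.youtube.com/watch?v=' + link.strip()
    print(link)  # https://www.youtube.com/watch?v=dQw4w9WgXcQ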

tests.py

@@ -128,6 +128,7 @@ from inbox import json_post_allows_comments
 from inbox import valid_inbox
 from inbox import valid_inbox_filenames
 from categories import guess_hashtag_category
+from content import safe_web_text
 from content import words_similarity
 from content import get_price_from_string
 from content import limit_repeated_words
@@ -6488,6 +6489,30 @@ def _test_get_link_from_rss_item() -> None:
     assert link.startswith('https://test.link/creativecommons')
 
 
+def _test_safe_webtext() -> None:
+    print('test_safe_webtext')
+    web_text = '<p>Some text including a link https://some.site/some-path</p>'
+    expected_text = 'Some text including a link ' + \
+        '<a href="https://some.site/some-path"'
+    safe_text = safe_web_text(web_text)
+    if expected_text not in safe_text:
+        print('Original html: ' + web_text)
+        print('Expected html: ' + expected_text)
+        print('Actual html: ' + safe_text)
+    assert expected_text in safe_text
+    assert '<p>' not in safe_text
+    assert '</p>' not in safe_text
+
+    web_text = 'Some text with <script>some script</script>'
+    expected_text = 'Some text with some script'
+    safe_text = safe_web_text(web_text)
+    if expected_text != safe_text:
+        print('Original html: ' + web_text)
+        print('Expected html: ' + expected_text)
+        print('Actual html: ' + safe_text)
+    assert expected_text == safe_text
+
+
 def run_all_tests():
     base_dir = os.getcwd()
     print('Running tests...')
@@ -6504,6 +6529,7 @@ def run_all_tests():
                             'message_json', 'liked_post_json'])
     _test_checkbox_names()
     _test_functions()
+    _test_safe_webtext()
     _test_get_link_from_rss_item()
     _test_xml_podcast_dict()
     _test_get_actor_from_in_reply_to()
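The new test prints the original, expected, and actual strings before each assert, so a failing run shows what diverged rather than a bare AssertionError. A hypothetical helper distilling that print-then-assert pattern (the test above inlines it instead):

    from content import safe_web_text  # as imported in tests.py above

    def assert_contains(expected: str, actual: str, original: str) -> None:
        """Print context before failing, in the style of the new test."""
        if expected not in actual:
            print('Original html: ' + original)
            print('Expected html: ' + expected)
            print('Actual html: ' + actual)
        assert expected in actual

    web_text = '<p>a link https://some.site/some-path</p>'
    assert_contains('<a href="https://some.site/some-path"',
                    safe_web_text(web_text), web_text)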

webapp_post.py

@@ -39,8 +39,8 @@ def _add_embedded_video_from_sites(translate: {}, content: str,
         url = content.split('>vimeo.com/')[1]
         if '<' in url:
             url = url.split('<')[0]
-        content = \
-            content + "<center>\n<iframe loading=\"lazy\" " + \
+        content += \
+            "<center>\n<iframe loading=\"lazy\" " + \
             "src=\"https://player.vimeo.com/video/" + \
             url + "\" width=\"" + str(width) + \
             "\" height=\"" + str(height) + \
@@ -57,8 +57,8 @@ def _add_embedded_video_from_sites(translate: {}, content: str,
                 url = url.split('&')[0]
             if '?utm_' in url:
                 url = url.split('?utm_')[0]
-            content = \
-                content + "<center>\n<iframe loading=\"lazy\" src=\"" + \
+            content += \
+                "<center>\n<iframe loading=\"lazy\" src=\"" + \
                 video_site + url + "\" width=\"" + str(width) + \
                 "\" height=\"" + str(height) + \
                 "\" frameborder=\"0\" allow=\"autoplay; fullscreen\" " + \
@@ -88,8 +88,8 @@ def _add_embedded_video_from_sites(translate: {}, content: str,
                 url = url.split('&')[0]
             if '?utm_' in url:
                 url = url.split('?utm_')[0]
-            content = \
-                content + "<center>\n<iframe loading=\"lazy\" src=\"" + \
+            content += \
+                "<center>\n<iframe loading=\"lazy\" src=\"" + \
                 video_site + url + "\" width=\"" + \
                 str(width) + "\" height=\"" + str(height) + \
                 "\" frameborder=\"0\" allow=\"autoplay; fullscreen\" " + \
@@ -103,8 +103,8 @@ def _add_embedded_video_from_sites(translate: {}, content: str,
                 url = url.split('"')[0]
             if not url.endswith('/oembed'):
                 url = url + '/oembed'
-            content = \
-                content + "<center>\n<iframe loading=\"lazy\" src=\"" + \
+            content += \
+                "<center>\n<iframe loading=\"lazy\" src=\"" + \
                 video_site + url + "\" width=\"" + \
                 str(width) + "\" height=\"" + str(height) + \
                 "\" frameborder=\"0\" allow=\"fullscreen\" " + \
@@ -153,8 +153,8 @@ def _add_embedded_video_from_sites(translate: {}, content: str,
                 if '"' not in url:
                     continue
                 url = url.split('"')[0].replace('/watch/', '/embed/')
-                content = \
-                    content + "<center>\n<iframe loading=\"lazy\" " + \
+                content += \
+                    "<center>\n<iframe loading=\"lazy\" " + \
                     "sandbox=\"allow-same-origin " + \
                     "allow-scripts\" src=\"https://" + \
                     site + url + "\" width=\"" + str(width) + \
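All five hunks are behavior-preserving. Python strings are immutable, so content += x rebinds the name to the concatenation exactly as content = content + x did; the augmented form just drops one continuation line per call site. For instance:

    content = '<p>post</p>'
    url = '12345'
    width, height = 400, 300

    # equivalent to: content = content + "<center>..." + ...
    content += \
        "<center>\n<iframe loading=\"lazy\" " + \
        "src=\"https://player.vimeo.com/video/" + \
        url + "\" width=\"" + str(width) + \
        "\" height=\"" + str(height) + "\"></iframe>\n</center>\n"

    assert content.startswith('<p>post</p><center>')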

webapp_podcast.py

@@ -14,6 +14,7 @@ from shutil import copyfile
 from utils import get_config_param
 from utils import remove_html
 from media import path_is_audio
+from content import safe_web_text
 from webapp_utils import get_broken_link_substitute
 from webapp_utils import html_header_with_external_style
 from webapp_utils import html_footer
@@ -189,7 +190,18 @@ def html_podcast_episode(css_cache: {}, translate: {},
             translate['Your browser does not support the audio element.'] + \
             '\n      </audio>\n'
     elif podcast_properties.get('linkMimeType'):
-        if 'video' in podcast_properties['linkMimeType']:
+        if '/youtube' in podcast_properties['linkMimeType']:
+            url = link_url.replace('/watch?v=', '/embed/')
+            if '&' in url:
+                url = url.split('&')[0]
+            if '?utm_' in url:
+                url = url.split('?utm_')[0]
+            podcast_str += \
+                "      <iframe loading=\"lazy\" src=\"" + \
+                url + "\" width=\"400\" height=\"300\" " + \
+                "frameborder=\"0\" allow=\"autoplay; fullscreen\" " + \
+                "allowfullscreen>\n      </iframe>\n"
+        elif 'video' in podcast_properties['linkMimeType']:
             video_mime_type = podcast_properties['linkMimeType']
             video_msg = 'Your browser does not support the video element.'
             podcast_str += \
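The new branch keys off the 'video/youtube' MIME hint set by the newswire parser and rewrites the episode link into YouTube's embeddable player url, trimming extra query and tracking parameters. The transformation in isolation, on an illustrative url:

    link_url = 'https://www.youtube.com/watch?v=dQw4w9WgXcQ&t=42s'
    url = link_url.replace('/watch?v=', '/embed/')
    if '&' in url:
        url = url.split('&')[0]
    if '?utm_' in url:
        url = url.split('?utm_')[0]
    print(url)  # https://www.youtube.com/embed/dQw4w9WgXcQ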
@@ -209,11 +221,8 @@ def html_podcast_episode(css_cache: {}, translate: {},
     if newswire_item[4]:
         podcast_description = \
             html.unescape(urllib.parse.unquote_plus(newswire_item[4]))
-        podcast_description = remove_html(podcast_description)
+        podcast_description = safe_web_text(podcast_description)
         if podcast_description:
-            remove_chars = ('Œ', 'â€', 'ğŸ', '�')
-            for remchar in remove_chars:
-                podcast_description = podcast_description.replace(remchar, '')
             podcast_str += '<p>' + podcast_description + '</p>\n'
 
     # donate button
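With safe_web_text doing the scrubbing, the hard-coded character list disappears from this file and lives in one place. The resulting description pipeline end to end, runnable from the Epicyon source tree; the percent-encoded sample stands in for newswire_item[4]:

    import html
    import urllib.parse

    from content import safe_web_text  # the helper added in content.py

    raw = 'Visit%20https%3A%2F%2Fpod.example%2Fshow%20%3Cb%3Ebold%3C%2Fb%3E'
    podcast_description = html.unescape(urllib.parse.unquote_plus(raw))
    podcast_description = safe_web_text(podcast_description)
    if podcast_description:
        podcast_str = '<p>' + podcast_description + '</p>\n'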