Variable types

main
bashrc 2026-05-03 14:15:02 +01:00
parent 60385ab4f9
commit e1f1469635
2 changed files with 120 additions and 118 deletions

View File

@ -452,7 +452,7 @@ def _create_news_mirror(base_dir: str, domain: str,
if '|' in url or '>' in url: if '|' in url or '>' in url:
return True return True
mirror_dir = data_dir(base_dir) + '/newsmirror' mirror_dir: str = data_dir(base_dir) + '/newsmirror'
if not is_a_dir(mirror_dir): if not is_a_dir(mirror_dir):
makedir(mirror_dir) makedir(mirror_dir)
@ -462,7 +462,7 @@ def _create_news_mirror(base_dir: str, domain: str,
no_of_dirs = len(dirs) no_of_dirs = len(dirs)
break break
mirror_index_filename = data_dir(base_dir) + '/newsmirror.txt' mirror_index_filename: str = data_dir(base_dir) + '/newsmirror.txt'
if max_mirrored_articles > 0 and no_of_dirs > max_mirrored_articles: if max_mirrored_articles > 0 and no_of_dirs > max_mirrored_articles:
if not is_a_file(mirror_index_filename): if not is_a_file(mirror_index_filename):
@ -480,11 +480,11 @@ def _create_news_mirror(base_dir: str, domain: str,
# escape valve # escape valve
break break
post_id = fp_index.readline() post_id: str = fp_index.readline()
if not post_id: if not post_id:
continue continue
post_id = post_id.strip() post_id = post_id.strip()
mirror_article_dir = mirror_dir + '/' + post_id mirror_article_dir: str = mirror_dir + '/' + post_id
if is_a_dir(mirror_article_dir): if is_a_dir(mirror_article_dir):
rmtree(mirror_article_dir, rmtree(mirror_article_dir,
ignore_errors=False, onexc=None) ignore_errors=False, onexc=None)
@ -508,18 +508,18 @@ def _create_news_mirror(base_dir: str, domain: str,
'EX: _create_news_mirror unable to write ' + 'EX: _create_news_mirror unable to write ' +
mirror_index_filename) mirror_index_filename)
mirror_article_dir = mirror_dir + '/' + post_id_number mirror_article_dir: str = mirror_dir + '/' + post_id_number
if is_a_dir(mirror_article_dir): if is_a_dir(mirror_article_dir):
# already mirrored # already mirrored
return True return True
# for onion instances mirror via tor # for onion instances mirror via tor
prefix_str = '' prefix_str: str = ''
if domain.endswith('.onion'): if domain.endswith('.onion'):
prefix_str = '/usr/bin/torsocks ' prefix_str = '/usr/bin/torsocks '
# download the files # download the files
command_str = \ command_str: str = \
prefix_str + '/usr/bin/wget -mkEpnp -e robots=off ' + url + \ prefix_str + '/usr/bin/wget -mkEpnp -e robots=off ' + url + \
' -P ' + mirror_article_dir ' -P ' + mirror_article_dir
proc = Popen(command_str, shell=True) proc = Popen(command_str, shell=True)

View File

@ -78,7 +78,7 @@ def rss2header(http_prefix: str,
title: str, translate: {}) -> str: title: str, translate: {}) -> str:
"""Header for an RSS 2.0 feed """Header for an RSS 2.0 feed
""" """
rss_str = \ rss_str: str = \
"<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + \ "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + \
"<rss version=\"2.0\">" + \ "<rss version=\"2.0\">" + \
'<channel>' '<channel>'
@ -94,7 +94,7 @@ def rss2header(http_prefix: str,
' <link>' + http_prefix + '://' + domain_full + \ ' <link>' + http_prefix + '://' + domain_full + \
'/blog/rss.xml' + '</link>' '/blog/rss.xml' + '</link>'
else: else:
title_str = escape_text(translate[title]) title_str: str = escape_text(translate[title])
rss_str += \ rss_str += \
' <title>' + title_str + '</title>' + \ ' <title>' + title_str + '</title>' + \
' <link>' + \ ' <link>' + \
@ -106,8 +106,7 @@ def rss2header(http_prefix: str,
def rss2footer() -> str: def rss2footer() -> str:
"""Footer for an RSS 2.0 feed """Footer for an RSS 2.0 feed
""" """
rss_str = '</channel></rss>' return '</channel></rss>'
return rss_str
def get_newswire_tags(text: str, max_tags: int) -> []: def get_newswire_tags(text: str, max_tags: int) -> []:
@ -117,12 +116,12 @@ def get_newswire_tags(text: str, max_tags: int) -> []:
return [] return []
if ' ' not in text: if ' ' not in text:
return [] return []
text_simplified = \ text_simplified: str = \
text.replace(',', ' ').replace(';', ' ').replace('- ', ' ') text.replace(',', ' ').replace(';', ' ').replace('- ', ' ')
text_simplified = text_simplified.replace('. ', ' ').strip() text_simplified = text_simplified.replace('. ', ' ').strip()
if text_simplified.endswith('.'): if text_simplified.endswith('.'):
text_simplified = text_simplified[:len(text_simplified)-1] text_simplified = text_simplified[:len(text_simplified)-1]
words = text_simplified.split(' ') words: list[str] = text_simplified.split(' ')
tags: list[str] = [] tags: list[str] = []
for wrd in words: for wrd in words:
if not wrd.startswith('#'): if not wrd.startswith('#'):
@ -143,8 +142,8 @@ def limit_word_lengths(text: str, max_word_length: int) -> str:
""" """
if ' ' not in text: if ' ' not in text:
return text return text
words = text.split(' ') words: list[str] = text.split(' ')
result = '' result: str = ''
for wrd in words: for wrd in words:
if len(wrd) > max_word_length: if len(wrd) > max_word_length:
wrd = wrd[:max_word_length] wrd = wrd[:max_word_length]
@ -162,7 +161,7 @@ def get_newswire_favicon_url(url: str) -> str:
if url.startswith('http://'): if url.startswith('http://'):
if not (url.endswith('.onion') or url.endswith('.i2p')): if not (url.endswith('.onion') or url.endswith('.i2p')):
return '/newswire_favicon.ico' return '/newswire_favicon.ico'
domain = url.split('://')[1] domain: str = url.split('://')[1]
if '/' not in domain: if '/' not in domain:
return url + '/favicon.ico' return url + '/favicon.ico'
domain = domain.split('/')[0] domain = domain.split('/')[0]
@ -173,7 +172,7 @@ def _download_newswire_feed_favicon(session, base_dir: str,
link: str, debug: bool) -> bool: link: str, debug: bool) -> bool:
"""Downloads the favicon for the given feed link """Downloads the favicon for the given feed link
""" """
fav_url = get_newswire_favicon_url(link) fav_url: str = get_newswire_favicon_url(link)
if '://' not in link: if '://' not in link:
return False return False
timeout_sec: int = 10 timeout_sec: int = 10
@ -183,7 +182,7 @@ def _download_newswire_feed_favicon(session, base_dir: str,
return False return False
# update the favicon url # update the favicon url
extensions_to_mime = image_mime_types_dict() extensions_to_mime: dict = image_mime_types_dict()
for ext, mime_ext in extensions_to_mime.items(): for ext, mime_ext in extensions_to_mime.items():
if 'image/' + mime_ext in mime_type: if 'image/' + mime_ext in mime_type:
fav_url = fav_url.replace('.ico', '.' + ext) fav_url = fav_url.replace('.ico', '.' + ext)
@ -200,7 +199,7 @@ def _download_newswire_feed_favicon(session, base_dir: str,
return False return False
# save to the cache # save to the cache
fav_filename = get_fav_filename_from_url(base_dir, fav_url) fav_filename: str = get_fav_filename_from_url(base_dir, fav_url)
if is_a_file(fav_filename): if is_a_file(fav_filename):
return True return True
if not save_binary(image_data, fav_filename, if not save_binary(image_data, fav_filename,
@ -225,10 +224,10 @@ def _add_newswire_dict_entry(base_dir: str,
"""Update the newswire dictionary """Update the newswire dictionary
""" """
# remove any markup # remove any markup
title = remove_html(title) title: str = remove_html(title)
description = remove_html(description) description: str = remove_html(description)
all_text = title + ' ' + description all_text: str = title + ' ' + description
# check that none of the text is filtered against # check that none of the text is filtered against
if is_filtered(base_dir, None, None, all_text, system_language): if is_filtered(base_dir, None, None, all_text, system_language):
@ -240,7 +239,7 @@ def _add_newswire_dict_entry(base_dir: str,
tags: list[str] = [] tags: list[str] = []
# extract hashtags from the text of the feed post # extract hashtags from the text of the feed post
post_tags = get_newswire_tags(all_text, max_tags) post_tags: list[str] = get_newswire_tags(all_text, max_tags)
# Include tags from podcast categories # Include tags from podcast categories
if podcast_properties: if podcast_properties:
@ -282,15 +281,15 @@ def _add_newswire_dict_entry(base_dir: str,
def _valid_feed_date(pub_date: str, debug: bool = False) -> bool: def _valid_feed_date(pub_date: str, debug: bool = False) -> bool:
""" convert from YY-MM-DD HH:MM:SS+00:00 to YY-MM-DDTHH:MM:SSZ """ convert from YY-MM-DD HH:MM:SS+00:00 to YY-MM-DDTHH:MM:SSZ
""" """
post_date = pub_date.replace(' ', 'T').replace('+00:00', 'Z') post_date: str = pub_date.replace(' ', 'T').replace('+00:00', 'Z')
if '.' in post_date: if '.' in post_date:
ending = post_date.split('.')[1] ending: str = post_date.split('.')[1]
timezone_str = '' timezone_str: str = ''
for ending_char in ending: for ending_char in ending:
if not ending_char.isdigit(): if not ending_char.isdigit():
timezone_str += ending_char timezone_str += ending_char
if timezone_str: if timezone_str:
post_date = post_date.split('.')[0] + timezone_str post_date: str = post_date.split('.')[0] + timezone_str
return valid_post_date(post_date, 90, debug) return valid_post_date(post_date, 90, debug)
@ -303,9 +302,9 @@ def parse_feed_date(pub_date: str, unique_string_identifier: str) -> str:
# If this was published exactly on the hour then assign a # If this was published exactly on the hour then assign a
# random minute and second to make this item relatively unique # random minute and second to make this item relatively unique
randgen = random.Random(unique_string_identifier) randgen = random.Random(unique_string_identifier)
rand_min = randgen.randint(0, 59) rand_min: int = randgen.randint(0, 59)
rand_sec = randgen.randint(0, 59) rand_sec: int = randgen.randint(0, 59)
replace_time_str = \ replace_time_str: str = \
':' + str(rand_min).zfill(2) + ':' + str(rand_sec).zfill(2) ':' + str(rand_min).zfill(2) + ':' + str(rand_sec).zfill(2)
pub_date = pub_date.replace(':00:00', replace_time_str) pub_date = pub_date.replace(':00:00', replace_time_str)
@ -339,7 +338,7 @@ def parse_feed_date(pub_date: str, unique_string_identifier: str) -> str:
"%a, %d %b %Y %H:%M:%S", "%a, %d %b %Y %H:%M:%S",
"%d %b %Y %H:%M:%S") "%d %b %Y %H:%M:%S")
published_date = None published_date = None
timezone_endings = ( timezone_endings: list[str] = (
',', 'Z', 'GMT', 'EST', 'PST', 'AST', 'CST', 'MST', 'AKST', 'HST', ',', 'Z', 'GMT', 'EST', 'PST', 'AST', 'CST', 'MST', 'AKST', 'HST',
'UT' 'UT'
) )
@ -406,9 +405,10 @@ def load_hashtag_categories(base_dir: str, language: str) -> None:
if not is_a_file(hashtag_categories_filename): if not is_a_file(hashtag_categories_filename):
return return
xml_str = load_string(hashtag_categories_filename, xml_str: str = \
'EX: load_hashtag_categories unable to read ' + load_string(hashtag_categories_filename,
hashtag_categories_filename) 'EX: load_hashtag_categories unable to read ' +
hashtag_categories_filename)
if xml_str: if xml_str:
_xml2str_to_hashtag_categories(base_dir, xml_str, 1024, True) _xml2str_to_hashtag_categories(base_dir, xml_str, 1024, True)
@ -464,20 +464,20 @@ def _get_podcast_categories(xml_item: str, xml_str: str) -> str:
# convert keywords to hashtags # convert keywords to hashtags
if '<itunes:keywords' in xml_item: if '<itunes:keywords' in xml_item:
keywords_str = xml_item.split('<itunes:keywords')[1] keywords_str: str = xml_item.split('<itunes:keywords')[1]
if '>' in keywords_str: if '>' in keywords_str:
keywords_str = keywords_str.split('>')[1] keywords_str = keywords_str.split('>')[1]
if '<' in keywords_str: if '<' in keywords_str:
keywords_str = keywords_str.split('<')[0] keywords_str = keywords_str.split('<')[0]
keywords_str = remove_html(keywords_str) keywords_str = remove_html(keywords_str)
keywords_list = keywords_str.split(',') keywords_list: list[str] = keywords_str.split(',')
for keyword in keywords_list: for keyword in keywords_list:
keyword_hashtag = '#' + keyword.strip() keyword_hashtag = '#' + keyword.strip()
if keyword_hashtag not in podcast_categories: if keyword_hashtag not in podcast_categories:
if valid_hash_tag(keyword): if valid_hash_tag(keyword):
podcast_categories.append(keyword_hashtag) podcast_categories.append(keyword_hashtag)
episode_category_tags = ['<itunes:category', '<category'] episode_category_tags: list[str] = ['<itunes:category', '<category']
for category_tag in episode_category_tags: for category_tag in episode_category_tags:
item_str = xml_item item_str = xml_item
if category_tag not in xml_item: if category_tag not in xml_item:
@ -485,7 +485,7 @@ def _get_podcast_categories(xml_item: str, xml_str: str) -> str:
continue continue
item_str = xml_str item_str = xml_str
category_list = item_str.split(category_tag) category_list: list[str] = item_str.split(category_tag)
first_category: bool = True first_category: bool = True
for episode_category in category_list: for episode_category in category_list:
if first_category: if first_category:
@ -493,26 +493,26 @@ def _get_podcast_categories(xml_item: str, xml_str: str) -> str:
continue continue
if 'text="' in episode_category: if 'text="' in episode_category:
episode_category = episode_category.split('text="')[1] episode_category: str = episode_category.split('text="')[1]
if '"' in episode_category: if '"' in episode_category:
episode_category = episode_category.split('"')[0] episode_category = episode_category.split('"')[0]
episode_category = \ episode_category = \
episode_category.lower().replace(' ', '') episode_category.lower().replace(' ', '')
episode_category = episode_category.replace('#', '') episode_category = episode_category.replace('#', '')
episode_category_hashtag = '#' + episode_category episode_category_hashtag: str = '#' + episode_category
if episode_category_hashtag not in podcast_categories: if episode_category_hashtag not in podcast_categories:
if valid_hash_tag(episode_category): if valid_hash_tag(episode_category):
podcast_categories.append(episode_category_hashtag) podcast_categories.append(episode_category_hashtag)
continue continue
if '>' in episode_category: if '>' in episode_category:
episode_category = episode_category.split('>')[1] episode_category: str = episode_category.split('>')[1]
if '<' in episode_category: if '<' in episode_category:
episode_category = episode_category.split('<')[0] episode_category = episode_category.split('<')[0]
episode_category = \ episode_category = \
episode_category.lower().replace(' ', '') episode_category.lower().replace(' ', '')
episode_category = episode_category.replace('#', '') episode_category = episode_category.replace('#', '')
episode_category_hashtag = '#' + episode_category episode_category_hashtag: str = '#' + episode_category
if episode_category_hashtag not in podcast_categories: if episode_category_hashtag not in podcast_categories:
if valid_hash_tag(episode_category): if valid_hash_tag(episode_category):
podcast_categories.append(episode_category_hashtag) podcast_categories.append(episode_category_hashtag)
@ -524,21 +524,23 @@ def _get_podcast_author(xml_item: str, xml_str: str) -> str:
""" get podcast author if specified. """ get podcast author if specified.
""" """
author = None author = None
episode_author_tags = ['<podcast:person', '<itunes:author', '<author'] episode_author_tags: list[str] = [
'<podcast:person', '<itunes:author', '<author'
]
for author_tag in episode_author_tags: for author_tag in episode_author_tags:
item_str = xml_item item_str: str = xml_item
if author_tag not in xml_item: if author_tag not in xml_item:
if author_tag not in xml_str: if author_tag not in xml_str:
continue continue
item_str = xml_str item_str = xml_str
author_str = item_str.split(author_tag)[1] author_str: str = item_str.split(author_tag)[1]
if '>' not in author_str: if '>' not in author_str:
continue continue
author_str = author_str.split('>')[1] author_str = author_str.split('>')[1]
if '<' not in author_str: if '<' not in author_str:
continue continue
author = item_str.split('>')[0] author: str = item_str.split('>')[0]
return remove_html(author).strip() return remove_html(author).strip()
return author return author
@ -566,17 +568,17 @@ def _valid_podcast_entry(base_dir: str, key: str, entry: {}) -> bool:
if not isinstance(entry['uri'], str): if not isinstance(entry['uri'], str):
print('podcast uri is not a string ' + str(entry)) print('podcast uri is not a string ' + str(entry))
return False return False
post_url = remove_html(entry['uri']) post_url: str = remove_html(entry['uri'])
elif entry.get('url'): elif entry.get('url'):
if not isinstance(entry['url'], str): if not isinstance(entry['url'], str):
print('podcast url is not a string ' + str(entry)) print('podcast url is not a string ' + str(entry))
return False return False
post_url = remove_html(entry['url']) post_url: str = remove_html(entry['url'])
else: else:
if not isinstance(entry['text'], str): if not isinstance(entry['text'], str):
print('podcast text is not a string ' + str(entry)) print('podcast text is not a string ' + str(entry))
return False return False
post_url = entry['text'] post_url: str = entry['text']
if '://' not in post_url: if '://' not in post_url:
return False return False
post_domain, _ = get_domain_from_actor(post_url) post_domain, _ = get_domain_from_actor(post_url)
@ -599,7 +601,7 @@ def xml_podcast_to_dict(base_dir: str, xml_item: str, xml_str: str) -> {}:
if '<media:thumbnail' not in xml_item: if '<media:thumbnail' not in xml_item:
return {} return {}
podcast_properties = { podcast_properties: dict = {
"locations": [], "locations": [],
"persons": [], "persons": [],
"soundbites": [], "soundbites": [],
@ -612,15 +614,15 @@ def xml_podcast_to_dict(base_dir: str, xml_item: str, xml_str: str) -> {}:
"socialInteract": [], "socialInteract": [],
} }
pod_lines: list = xml_item.split('<podcast:') pod_lines: list[str] = xml_item.split('<podcast:')
ctr: int = 0 ctr: int = 0
for pod_line in pod_lines: for pod_line in pod_lines:
if ctr == 0 or '>' not in pod_line: if ctr == 0 or '>' not in pod_line:
ctr += 1 ctr += 1
continue continue
if ' ' not in pod_line.split('>')[0]: if ' ' not in pod_line.split('>')[0]:
pod_key = pod_line.split('>')[0].strip() pod_key: str = pod_line.split('>')[0].strip()
pod_val = pod_line.split('>', 1)[1].strip() pod_val: str = pod_line.split('>', 1)[1].strip()
if '<' in pod_val: if '<' in pod_val:
pod_val = pod_val.split('<')[0] pod_val = pod_val.split('<')[0]
if pod_key in podcast_properties: if pod_key in podcast_properties:
@ -629,7 +631,7 @@ def xml_podcast_to_dict(base_dir: str, xml_item: str, xml_str: str) -> {}:
continue continue
pod_key = pod_line.split(' ')[0] pod_key = pod_line.split(' ')[0]
pod_fields = ( pod_fields: list[str] = (
'url', 'geo', 'osm', 'type', 'method', 'group', 'url', 'geo', 'osm', 'type', 'method', 'group',
'owner', 'srcset', 'img', 'role', 'address', 'suggested', 'owner', 'srcset', 'img', 'role', 'address', 'suggested',
'startTime', 'duration', 'href', 'name', 'pubdate', 'startTime', 'duration', 'href', 'name', 'pubdate',
@ -637,14 +639,14 @@ def xml_podcast_to_dict(base_dir: str, xml_item: str, xml_str: str) -> {}:
'accountId', 'priority', 'podcastAccountId', 'accountId', 'priority', 'podcastAccountId',
'podcastAccountUrl' 'podcastAccountUrl'
) )
pod_entry = {} pod_entry: dict = {}
for pod_field in pod_fields: for pod_field in pod_fields:
if pod_field + '="' not in pod_line: if pod_field + '="' not in pod_line:
continue continue
pod_str = pod_line.split(pod_field + '="')[1] pod_str: str = pod_line.split(pod_field + '="')[1]
if '"' not in pod_str: if '"' not in pod_str:
continue continue
pod_val = pod_str.split('"')[0] pod_val: str = pod_str.split('"')[0]
pod_entry[pod_field] = pod_val pod_entry[pod_field] = pod_val
pod_text = pod_line.split('>')[1] pod_text = pod_line.split('>')[1]
@ -668,7 +670,7 @@ def xml_podcast_to_dict(base_dir: str, xml_item: str, xml_str: str) -> {}:
# itunes:duration rather than podcast:duration # itunes:duration rather than podcast:duration
if 'duration' not in podcast_properties: if 'duration' not in podcast_properties:
if '<itunes:duration' in xml_item: if '<itunes:duration' in xml_item:
duration = xml_item.split('<itunes:duration')[1] duration: str = xml_item.split('<itunes:duration')[1]
if '>' in duration: if '>' in duration:
duration = duration.split('>')[1] duration = duration.split('>')[1]
if '<' in duration: if '<' in duration:
@ -677,7 +679,7 @@ def xml_podcast_to_dict(base_dir: str, xml_item: str, xml_str: str) -> {}:
# get the image for the podcast, if it exists # get the image for the podcast, if it exists
podcast_episode_image = None podcast_episode_image = None
episode_image_tags = ['<itunes:image', '<media:thumbnail'] episode_image_tags: list[str] = ['<itunes:image', '<media:thumbnail']
for image_tag in episode_image_tags: for image_tag in episode_image_tags:
item_str = xml_item item_str = xml_item
if image_tag not in xml_item: if image_tag not in xml_item:
@ -685,7 +687,7 @@ def xml_podcast_to_dict(base_dir: str, xml_item: str, xml_str: str) -> {}:
continue continue
item_str = xml_str item_str = xml_str
episode_image = item_str.split(image_tag)[1] episode_image: str = item_str.split(image_tag)[1]
if image_tag + ' ' in item_str and '>' in episode_image: if image_tag + ' ' in item_str and '>' in episode_image:
episode_image = episode_image.split('>')[0] episode_image = episode_image.split('>')[0]
@ -710,10 +712,10 @@ def xml_podcast_to_dict(base_dir: str, xml_item: str, xml_str: str) -> {}:
break break
# get categories if they exist. These can be turned into hashtags # get categories if they exist. These can be turned into hashtags
podcast_categories = _get_podcast_categories(xml_item, xml_str) podcast_categories: str = _get_podcast_categories(xml_item, xml_str)
# get the author name # get the author name
podcast_author = _get_podcast_author(xml_item, xml_str) podcast_author: str = _get_podcast_author(xml_item, xml_str)
if podcast_author: if podcast_author:
podcast_properties['author'] = podcast_author podcast_properties['author'] = podcast_author
@ -739,7 +741,8 @@ def get_link_from_rss_item(rss_item: str,
proxy_type: str) -> (str, str): proxy_type: str) -> (str, str):
"""Extracts rss link from rss item string """Extracts rss link from rss item string
""" """
mime_type = None mime_type: str = None
link: str = None
if preferred_mime_types and '<podcast:alternateEnclosure ' in rss_item: if preferred_mime_types and '<podcast:alternateEnclosure ' in rss_item:
enclosures: list = rss_item.split('<podcast:alternateEnclosure ') enclosures: list = rss_item.split('<podcast:alternateEnclosure ')
@ -751,17 +754,17 @@ def get_link_from_rss_item(rss_item: str,
ctr += 1 ctr += 1
if '</podcast:alternateEnclosure' not in enclosure: if '</podcast:alternateEnclosure' not in enclosure:
continue continue
enclosure = enclosure.split('</podcast:alternateEnclosure')[0] enclosure: str = enclosure.split('</podcast:alternateEnclosure')[0]
if 'type="' not in enclosure: if 'type="' not in enclosure:
continue continue
mime_type = enclosure.split('type="')[1] mime_type: str = enclosure.split('type="')[1]
if '"' in mime_type: if '"' in mime_type:
mime_type = mime_type.split('"')[0] mime_type = mime_type.split('"')[0]
if mime_type not in preferred_mime_types: if mime_type not in preferred_mime_types:
continue continue
if 'uri="' not in enclosure: if 'uri="' not in enclosure:
continue continue
uris = enclosure.split('uri="') uris: str = enclosure.split('uri="')
ctr2: int = 0 ctr2: int = 0
for uri in uris: for uri in uris:
if ctr2 == 0: if ctr2 == 0:
@ -790,16 +793,16 @@ def get_link_from_rss_item(rss_item: str,
if '<enclosure ' in rss_item: if '<enclosure ' in rss_item:
# get link from audio or video enclosure # get link from audio or video enclosure
enclosure = rss_item.split('<enclosure ')[1] enclosure: str = rss_item.split('<enclosure ')[1]
if '>' in enclosure: if '>' in enclosure:
enclosure = enclosure.split('>')[0] enclosure = enclosure.split('>')[0]
if ' type="' in enclosure: if ' type="' in enclosure:
mime_type = enclosure.split(' type="')[1] mime_type: str = enclosure.split(' type="')[1]
if '"' in mime_type: if '"' in mime_type:
mime_type = mime_type.split('"')[0] mime_type = mime_type.split('"')[0]
if 'url="' in enclosure and \ if 'url="' in enclosure and \
('"audio/' in enclosure or '"video/' in enclosure): ('"audio/' in enclosure or '"video/' in enclosure):
link_str = enclosure.split('url="')[1] link_str: str = enclosure.split('url="')[1]
if '"' in link_str: if '"' in link_str:
link = link_str.split('"')[0] link = link_str.split('"')[0]
if resembles_url(link): if resembles_url(link):
@ -811,7 +814,7 @@ def get_link_from_rss_item(rss_item: str,
if '://' not in link: if '://' not in link:
return None, None return None, None
elif '<link ' in rss_item: elif '<link ' in rss_item:
link_str = rss_item.split('<link ')[1] link_str: str = rss_item.split('<link ')[1]
if '>' in link_str: if '>' in link_str:
link_str = link_str.split('>')[0] link_str = link_str.split('>')[0]
if 'href="' in link_str: if 'href="' in link_str:
@ -920,7 +923,7 @@ def _xml2str_to_dict(base_dir: str, domain: str, xml_str: str,
continue continue
post_filename = '' post_filename = ''
votes_status: list[str] = [] votes_status: list[str] = []
podcast_properties = \ podcast_properties: dict = \
xml_podcast_to_dict(base_dir, rss_item, xml_str) xml_podcast_to_dict(base_dir, rss_item, xml_str)
if podcast_properties: if podcast_properties:
podcast_properties['linkMimeType'] = link_mime_type podcast_properties['linkMimeType'] = link_mime_type
@ -1041,7 +1044,7 @@ def _xml1str_to_dict(base_dir: str, domain: str, xml_str: str,
continue continue
post_filename = '' post_filename = ''
votes_status: list[str] = [] votes_status: list[str] = []
podcast_properties = \ podcast_properties: dict = \
xml_podcast_to_dict(base_dir, rss_item, xml_str) xml_podcast_to_dict(base_dir, rss_item, xml_str)
if podcast_properties: if podcast_properties:
podcast_properties['linkMimeType'] = link_mime_type podcast_properties['linkMimeType'] = link_mime_type
@ -1233,11 +1236,11 @@ def _json_feed_v1to_dict(base_dir: str, xml_str: str,
if '"items"' not in xml_str: if '"items"' not in xml_str:
return {} return {}
try: try:
feed_json = json.loads(xml_str) feed_json: dict = json.loads(xml_str)
except BaseException: except BaseException:
print('EX: _json_feed_v1to_dict unable to load json ' + str(xml_str)) print('EX: _json_feed_v1to_dict unable to load json ' + str(xml_str))
return {} return {}
max_bytes = max_feed_item_size_kb * 1024 max_bytes: int = max_feed_item_size_kb * 1024
if not feed_json.get('version'): if not feed_json.get('version'):
return {} return {}
if not feed_json['version'].startswith('https://jsonfeed.org/version/1'): if not feed_json['version'].startswith('https://jsonfeed.org/version/1'):
@ -1267,15 +1270,15 @@ def _json_feed_v1to_dict(base_dir: str, xml_str: str,
if json_feed_item.get('content_html'): if json_feed_item.get('content_html'):
if not isinstance(json_feed_item['content_html'], str): if not isinstance(json_feed_item['content_html'], str):
continue continue
title = remove_html(json_feed_item['content_html']) title: str = remove_html(json_feed_item['content_html'])
else: else:
if not isinstance(json_feed_item['content_text'], str): if not isinstance(json_feed_item['content_text'], str):
continue continue
title = remove_html(json_feed_item['content_text']) title: str = remove_html(json_feed_item['content_text'])
if len(title) > max_bytes: if len(title) > max_bytes:
print('WARN: json feed title is too long') print('WARN: json feed title is too long')
continue continue
description = '' description: str = ''
if json_feed_item.get('description'): if json_feed_item.get('description'):
if not isinstance(json_feed_item['description'], str): if not isinstance(json_feed_item['description'], str):
continue continue
@ -1295,13 +1298,13 @@ def _json_feed_v1to_dict(base_dir: str, xml_str: str,
if tag_name not in description: if tag_name not in description:
description += ' ' + tag_name description += ' ' + tag_name
link = remove_html(url_str) link: str = remove_html(url_str)
if '://' not in link: if '://' not in link:
continue continue
if len(link) > max_bytes: if len(link) > max_bytes:
print('WARN: json feed link is too long') print('WARN: json feed link is too long')
continue continue
item_domain = link.split('://')[1] item_domain: str = link.split('://')[1]
if '/' in item_domain: if '/' in item_domain:
item_domain = item_domain.split('/')[0] item_domain = item_domain.split('/')[0]
if is_blocked_domain(base_dir, item_domain, None, None): if is_blocked_domain(base_dir, item_domain, None, None):
@ -1309,21 +1312,21 @@ def _json_feed_v1to_dict(base_dir: str, xml_str: str,
if json_feed_item.get('date_published'): if json_feed_item.get('date_published'):
if not isinstance(json_feed_item['date_published'], str): if not isinstance(json_feed_item['date_published'], str):
continue continue
pub_date = json_feed_item['date_published'] pub_date: str = json_feed_item['date_published']
else: else:
if not isinstance(json_feed_item['date_modified'], str): if not isinstance(json_feed_item['date_modified'], str):
continue continue
pub_date = json_feed_item['date_modified'] pub_date: str = json_feed_item['date_modified']
unique_string_identifier = title + ' ' + link unique_string_identifier: str = title + ' ' + link
pub_date_str = parse_feed_date(pub_date, unique_string_identifier) pub_date_str: str = parse_feed_date(pub_date, unique_string_identifier)
if not pub_date_str: if not pub_date_str:
continue continue
if not _valid_feed_date(pub_date_str): if not _valid_feed_date(pub_date_str):
continue continue
post_filename = '' post_filename: str = ''
votes_status: list[str] = [] votes_status: list[str] = []
fediverse_handle = '' fediverse_handle: str = ''
extra_links: list[str] = [] extra_links: list[str] = []
_add_newswire_dict_entry(base_dir, _add_newswire_dict_entry(base_dir,
result, pub_date_str, result, pub_date_str,
@ -1382,11 +1385,11 @@ def _atom_feed_yt_to_dict(base_dir: str, xml_str: str,
continue continue
if '</yt:videoId>' not in atom_item: if '</yt:videoId>' not in atom_item:
continue continue
title = atom_item.split('<title>')[1] title: str = atom_item.split('<title>')[1]
title = _remove_cdata(title.split('</title>')[0]) title = _remove_cdata(title.split('</title>')[0])
title = remove_script(title, None, None, None) title = remove_script(title, None, None, None)
title = unescaped_text(title) title = unescaped_text(title)
description = '' description: str = ''
if '<media:description>' in atom_item and \ if '<media:description>' in atom_item and \
'</media:description>' in atom_item: '</media:description>' in atom_item:
description = atom_item.split('<media:description>')[1] description = atom_item.split('<media:description>')[1]
@ -1416,7 +1419,7 @@ def _atom_feed_yt_to_dict(base_dir: str, xml_str: str,
if not link: if not link:
continue continue
pub_date = atom_item.split('<published>')[1] pub_date: str = atom_item.split('<published>')[1]
pub_date = pub_date.split('</published>')[0] pub_date = pub_date.split('</published>')[0]
unique_string_identifier = title + ' ' + link unique_string_identifier = title + ' ' + link
@ -1425,13 +1428,13 @@ def _atom_feed_yt_to_dict(base_dir: str, xml_str: str,
continue continue
if not _valid_feed_date(pub_date_str): if not _valid_feed_date(pub_date_str):
continue continue
post_filename = '' post_filename: str = ''
votes_status: list[str] = [] votes_status: list[str] = []
podcast_properties = \ podcast_properties: dict = \
xml_podcast_to_dict(base_dir, atom_item, xml_str) xml_podcast_to_dict(base_dir, atom_item, xml_str)
if podcast_properties: if podcast_properties:
podcast_properties['linkMimeType'] = 'video/youtube' podcast_properties['linkMimeType'] = 'video/youtube'
fediverse_handle = '' fediverse_handle: str = ''
extra_links: list[str] = [] extra_links: list[str] = []
_add_newswire_dict_entry(base_dir, _add_newswire_dict_entry(base_dir,
result, pub_date_str, result, pub_date_str,
@ -1502,8 +1505,8 @@ def _yt_channel_to_atom_feed(url: str) -> str:
""" """
if 'youtube.com/channel/' not in url: if 'youtube.com/channel/' not in url:
return url return url
channel_id = url.split('youtube.com/channel/')[1].strip() channel_id: str = url.split('youtube.com/channel/')[1].strip()
channel_url = \ channel_url: str = \
'https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id 'https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id
print('YouTube feed: ' + channel_url) print('YouTube feed: ' + channel_url)
return channel_url return channel_url
@ -1522,12 +1525,12 @@ def get_rss(base_dir: str, domain: str, session, url: str,
print('url: ' + str(url)) print('url: ' + str(url))
print('ERROR: get_rss url should be a string') print('ERROR: get_rss url should be a string')
return None return None
headers = { headers: dict = {
'Accept': 'text/xml, application/xml; charset=UTF-8' 'Accept': 'text/xml, application/xml; charset=UTF-8'
} }
params = None params = None
session_params = {} session_params: dict = {}
session_headers = {} session_headers: dict = {}
if headers: if headers:
session_headers = headers session_headers = headers
if params: if params:
@ -1536,9 +1539,9 @@ def get_rss(base_dir: str, domain: str, session, url: str,
'Mozilla/5.0 (X11; Linux x86_64; rv:81.0) Gecko/20100101 Firefox/81.0' 'Mozilla/5.0 (X11; Linux x86_64; rv:81.0) Gecko/20100101 Firefox/81.0'
if not session: if not session:
print('WARN: no session specified for get_rss') print('WARN: no session specified for get_rss')
url = _yt_channel_to_atom_feed(url) url: str = _yt_channel_to_atom_feed(url)
try: try:
result = \ result: str = \
session.get(url, headers=session_headers, session.get(url, headers=session_headers,
params=session_params, params=session_params,
timeout=timeout_sec, timeout=timeout_sec,
@ -1581,14 +1584,13 @@ def get_rss_from_dict(newswire: {},
"""Returns an rss feed from the current newswire dict. """Returns an rss feed from the current newswire dict.
This allows other instances to subscribe to the same newswire This allows other instances to subscribe to the same newswire
""" """
rss_str = rss2header(http_prefix, rss_str: str = \
None, domain_full, rss2header(http_prefix, None, domain_full, 'Newswire', translate)
'Newswire', translate)
if not newswire: if not newswire:
return '' return ''
for published, fields in newswire.items(): for published, fields in newswire.items():
if '+00:00' in published: if '+00:00' in published:
published = published.replace('+00:00', 'Z').strip() published: str = published.replace('+00:00', 'Z').strip()
published = published.replace(' ', 'T') published = published.replace(' ', 'T')
else: else:
published_with_offset = \ published_with_offset = \
@ -1603,16 +1605,16 @@ def get_rss_from_dict(newswire: {},
rss_str += \ rss_str += \
'<item>\n' + \ '<item>\n' + \
' <title>' + escape_text(fields[0]) + '</title>\n' ' <title>' + escape_text(fields[0]) + '</title>\n'
description = remove_html(first_paragraph_from_string(fields[4])) description: str = remove_html(first_paragraph_from_string(fields[4]))
rss_str += \ rss_str += \
' <description>' + escape_text(description) + '</description>\n' ' <description>' + escape_text(description) + '</description>\n'
url = fields[1] url: str = fields[1]
if '://' not in url: if '://' not in url:
if domain_full not in url: if domain_full not in url:
url = http_prefix + '://' + domain_full + url url = http_prefix + '://' + domain_full + url
rss_str += ' <link>' + url + '</link>\n' rss_str += ' <link>' + url + '</link>\n'
rss_date_str = pub_date.strftime("%a, %d %b %Y %H:%M:%S UT") rss_date_str: str = pub_date.strftime("%a, %d %b %Y %H:%M:%S UT")
rss_str += \ rss_str += \
' <pubDate>' + rss_date_str + '</pubDate>\n' + \ ' <pubDate>' + rss_date_str + '</pubDate>\n' + \
'</item>\n' '</item>\n'
@ -1676,14 +1678,14 @@ def _add_account_blogs_to_newswire(base_dir: str, nickname: str, domain: str,
moderated: bool = False moderated: bool = False
# local blogs can potentially be moderated # local blogs can potentially be moderated
moderated_filename = \ moderated_filename: str = \
acct_dir(base_dir, nickname, domain) + '/.newswiremoderated' acct_dir(base_dir, nickname, domain) + '/.newswiremoderated'
if is_a_file(moderated_filename): if is_a_file(moderated_filename):
moderated = True moderated = True
try: try:
with open(index_filename, 'r', encoding='utf-8') as fp_index: with open(index_filename, 'r', encoding='utf-8') as fp_index:
post_filename = 'start' post_filename: str = 'start'
ctr: int = 0 ctr: int = 0
while post_filename: while post_filename:
post_filename = fp_index.readline() post_filename = fp_index.readline()
@ -1700,11 +1702,11 @@ def _add_account_blogs_to_newswire(base_dir: str, nickname: str, domain: str,
# filename of the post without any extension or path # filename of the post without any extension or path
# This should also correspond to any index entry in # This should also correspond to any index entry in
# the posts cache # the posts cache
post_url = remove_eol(post_filename) post_url: str = remove_eol(post_filename)
post_url = post_url.replace('.json', '').strip() post_url = post_url.replace('.json', '').strip()
# read the post from file # read the post from file
full_post_filename = \ full_post_filename: str = \
locate_post(base_dir, nickname, locate_post(base_dir, nickname,
domain, post_url, False) domain, post_url, False)
if not full_post_filename: if not full_post_filename:
@ -1718,24 +1720,24 @@ def _add_account_blogs_to_newswire(base_dir: str, nickname: str, domain: str,
if full_post_filename: if full_post_filename:
post_json_object = load_json(full_post_filename) post_json_object = load_json(full_post_filename)
if _is_newswire_blog_post(post_json_object): if _is_newswire_blog_post(post_json_object):
published = post_json_object['object']['published'] published: str = post_json_object['object']['published']
published = published.replace('T', ' ') published = published.replace('T', ' ')
published = published.replace('Z', '+00:00') published = published.replace('Z', '+00:00')
votes: list[str] = [] votes: list[str] = []
if is_a_file(full_post_filename + '.votes'): if is_a_file(full_post_filename + '.votes'):
votes = load_json(full_post_filename + '.votes') votes = load_json(full_post_filename + '.votes')
content = \ content: str = \
get_base_content_from_post(post_json_object, get_base_content_from_post(post_json_object,
system_language) system_language)
description = first_paragraph_from_string(content) description: str = first_paragraph_from_string(content)
description = remove_html(description) description = remove_html(description)
tags_from_post = \ tags_from_post: list[str] = \
_get_hashtags_from_post(post_json_object) _get_hashtags_from_post(post_json_object)
summary = post_json_object['object']['summary'] summary: str = post_json_object['object']['summary']
url2 = post_json_object['object']['url'] url2: str = post_json_object['object']['url']
url_str = get_url_from_post(url2) url_str: str = get_url_from_post(url2)
url3 = remove_html(url_str) url3: str = remove_html(url_str)
fediverse_handle = '' fediverse_handle: str = ''
extra_links: list[str] = [] extra_links: list[str] = []
_add_newswire_dict_entry(base_dir, _add_newswire_dict_entry(base_dir,
newswire, published, newswire, published,