mirror of https://gitlab.com/bashrc2/epicyon
Variable types
parent
e1f1469635
commit
eee7c0db62
92
newswire.py
92
newswire.py
|
|
@ -745,7 +745,7 @@ def get_link_from_rss_item(rss_item: str,
|
|||
link: str = None
|
||||
|
||||
if preferred_mime_types and '<podcast:alternateEnclosure ' in rss_item:
|
||||
enclosures: list = rss_item.split('<podcast:alternateEnclosure ')
|
||||
enclosures: list[str] = rss_item.split('<podcast:alternateEnclosure ')
|
||||
ctr: int = 0
|
||||
for enclosure in enclosures:
|
||||
if ctr == 0:
|
||||
|
|
@ -869,14 +869,14 @@ def _xml2str_to_dict(base_dir: str, domain: str, xml_str: str,
|
|||
if '</pubDate>' not in rss_item:
|
||||
continue
|
||||
|
||||
title = rss_item.split('<title>')[1]
|
||||
title: str = rss_item.split('<title>')[1]
|
||||
title = _remove_cdata(title.split('</title>')[0])
|
||||
title = unescaped_text(title)
|
||||
title = remove_script(title, None, None, None)
|
||||
title = remove_html(title)
|
||||
title = title.replace('\n', '')
|
||||
|
||||
description = ''
|
||||
description: str = ''
|
||||
if '<description>' in rss_item and '</description>' in rss_item:
|
||||
description = rss_item.split('<description>')[1]
|
||||
description = description.split('</description>')[0]
|
||||
|
|
@ -892,7 +892,7 @@ def _xml2str_to_dict(base_dir: str, domain: str, xml_str: str,
|
|||
description = remove_script(description, None, None, None)
|
||||
description = remove_html(description)
|
||||
|
||||
proxy_type = None
|
||||
proxy_type: str = None
|
||||
if domain.endswith('.onion'):
|
||||
proxy_type = 'tor'
|
||||
elif domain.endswith('.i2p'):
|
||||
|
|
@ -906,28 +906,28 @@ def _xml2str_to_dict(base_dir: str, domain: str, xml_str: str,
|
|||
if not link:
|
||||
continue
|
||||
|
||||
item_domain = link.split('://')[1]
|
||||
item_domain: str = link.split('://')[1]
|
||||
if '/' in item_domain:
|
||||
item_domain = item_domain.split('/')[0]
|
||||
|
||||
if is_blocked_domain(base_dir, item_domain, None, None):
|
||||
continue
|
||||
pub_date = rss_item.split('<pubDate>')[1]
|
||||
pub_date: str = rss_item.split('<pubDate>')[1]
|
||||
pub_date = pub_date.split('</pubDate>')[0]
|
||||
|
||||
unique_string_identifier = title + ' ' + link
|
||||
pub_date_str = parse_feed_date(pub_date, unique_string_identifier)
|
||||
unique_string_identifier: str = title + ' ' + link
|
||||
pub_date_str: str = parse_feed_date(pub_date, unique_string_identifier)
|
||||
if not pub_date_str:
|
||||
continue
|
||||
if not _valid_feed_date(pub_date_str):
|
||||
continue
|
||||
post_filename = ''
|
||||
post_filename: str = ''
|
||||
votes_status: list[str] = []
|
||||
podcast_properties: dict = \
|
||||
xml_podcast_to_dict(base_dir, rss_item, xml_str)
|
||||
if podcast_properties:
|
||||
podcast_properties['linkMimeType'] = link_mime_type
|
||||
fediverse_handle = ''
|
||||
fediverse_handle: str = ''
|
||||
extra_links: list[str] = []
|
||||
_add_newswire_dict_entry(base_dir,
|
||||
result, pub_date_str,
|
||||
|
|
@ -956,7 +956,7 @@ def _xml1str_to_dict(base_dir: str, domain: str, xml_str: str,
|
|||
"""Converts an xml RSS 1.0 string to a dictionary
|
||||
https://validator.w3.org/feed/docs/rss1.html
|
||||
"""
|
||||
item_str = '<item'
|
||||
item_str: str = '<item'
|
||||
if item_str not in xml_str:
|
||||
return {}
|
||||
result: dict = {}
|
||||
|
|
@ -992,12 +992,12 @@ def _xml1str_to_dict(base_dir: str, domain: str, xml_str: str,
|
|||
continue
|
||||
if '</dc:date>' not in rss_item:
|
||||
continue
|
||||
title = rss_item.split('<title>')[1]
|
||||
title: str = rss_item.split('<title>')[1]
|
||||
title = _remove_cdata(title.split('</title>')[0])
|
||||
title = unescaped_text(title)
|
||||
title = remove_script(title, None, None, None)
|
||||
title = remove_html(title)
|
||||
description = ''
|
||||
description: str = ''
|
||||
if '<description>' in rss_item and '</description>' in rss_item:
|
||||
description = rss_item.split('<description>')[1]
|
||||
description = description.split('</description>')[0]
|
||||
|
|
@ -1013,7 +1013,7 @@ def _xml1str_to_dict(base_dir: str, domain: str, xml_str: str,
|
|||
description = remove_script(description, None, None, None)
|
||||
description = remove_html(description)
|
||||
|
||||
proxy_type = None
|
||||
proxy_type: str = None
|
||||
if domain.endswith('.onion'):
|
||||
proxy_type = 'tor'
|
||||
elif domain.endswith('.i2p'):
|
||||
|
|
@ -1027,28 +1027,28 @@ def _xml1str_to_dict(base_dir: str, domain: str, xml_str: str,
|
|||
if not link:
|
||||
continue
|
||||
|
||||
item_domain = link.split('://')[1]
|
||||
item_domain: str = link.split('://')[1]
|
||||
if '/' in item_domain:
|
||||
item_domain = item_domain.split('/')[0]
|
||||
|
||||
if is_blocked_domain(base_dir, item_domain, None, None):
|
||||
continue
|
||||
pub_date = rss_item.split('<dc:date>')[1]
|
||||
pub_date: str = rss_item.split('<dc:date>')[1]
|
||||
pub_date = pub_date.split('</dc:date>')[0]
|
||||
|
||||
unique_string_identifier = title + ' ' + link
|
||||
pub_date_str = parse_feed_date(pub_date, unique_string_identifier)
|
||||
unique_string_identifier: str = title + ' ' + link
|
||||
pub_date_str: str = parse_feed_date(pub_date, unique_string_identifier)
|
||||
if not pub_date_str:
|
||||
continue
|
||||
if not _valid_feed_date(pub_date_str):
|
||||
continue
|
||||
post_filename = ''
|
||||
post_filename: str = ''
|
||||
votes_status: list[str] = []
|
||||
podcast_properties: dict = \
|
||||
xml_podcast_to_dict(base_dir, rss_item, xml_str)
|
||||
if podcast_properties:
|
||||
podcast_properties['linkMimeType'] = link_mime_type
|
||||
fediverse_handle = ''
|
||||
fediverse_handle: str = ''
|
||||
extra_links: list[str] = []
|
||||
_add_newswire_dict_entry(base_dir,
|
||||
result, pub_date_str,
|
||||
|
|
@ -1102,12 +1102,12 @@ def _atom_feed_to_dict(base_dir: str, domain: str, xml_str: str,
|
|||
continue
|
||||
if '</updated>' not in atom_item:
|
||||
continue
|
||||
title = atom_item.split('<title>')[1]
|
||||
title: str = atom_item.split('<title>')[1]
|
||||
title = _remove_cdata(title.split('</title>')[0])
|
||||
title = unescaped_text(title)
|
||||
title = remove_script(title, None, None, None)
|
||||
title = remove_html(title)
|
||||
description = ''
|
||||
description: str = ''
|
||||
if '<summary>' in atom_item and '</summary>' in atom_item:
|
||||
description = atom_item.split('<summary>')[1]
|
||||
description = unescaped_text(description.split('</summary>')[0])
|
||||
|
|
@ -1129,18 +1129,18 @@ def _atom_feed_to_dict(base_dir: str, domain: str, xml_str: str,
|
|||
description = remove_html(description)
|
||||
|
||||
# is there a fediverse handle
|
||||
fediverse_handle = ''
|
||||
fediverse_handle: str = ''
|
||||
if '<author>' in atom_item and '</author>' in atom_item:
|
||||
actor_str = atom_item.split('<author>')[1]
|
||||
actor_str: str = atom_item.split('<author>')[1]
|
||||
actor_str = unescaped_text(actor_str.split('</author>')[0])
|
||||
actor_str = remove_script(actor_str, None, None, None)
|
||||
if '<activity:object-type>' in actor_str and \
|
||||
'</activity:object-type>' in actor_str and \
|
||||
'<uri>' in actor_str and '</uri>' in actor_str:
|
||||
obj_type = actor_str.split('<activity:object-type>')[1]
|
||||
obj_type: str = actor_str.split('<activity:object-type>')[1]
|
||||
obj_type = obj_type.split('</activity:object-type>')[0]
|
||||
if obj_type == 'Person':
|
||||
actor_uri = actor_str.split('<uri>')[1]
|
||||
actor_uri: str = actor_str.split('<uri>')[1]
|
||||
actor_uri = actor_uri.split('</uri>')[0]
|
||||
if resembles_url(actor_uri) and \
|
||||
not is_local_network_address(actor_uri):
|
||||
|
|
@ -1150,18 +1150,18 @@ def _atom_feed_to_dict(base_dir: str, domain: str, xml_str: str,
|
|||
extra_links: list[str] = []
|
||||
if '<activity:object>' in atom_item and \
|
||||
'</activity:object>' in atom_item:
|
||||
obj_str = atom_item.split('<activity:object>')[1]
|
||||
obj_str: str = atom_item.split('<activity:object>')[1]
|
||||
obj_str = \
|
||||
unescaped_text(obj_str.split('</activity:object>')[0])
|
||||
obj_str = remove_script(obj_str, None, None, None)
|
||||
sections = obj_str.split('<link ')
|
||||
sections: list[str] = obj_str.split('<link ')
|
||||
ctr: int = 0
|
||||
for section_str in sections:
|
||||
if ctr == 0:
|
||||
ctr = 1
|
||||
continue
|
||||
if '>' in section_str:
|
||||
link_str = section_str.split('>')[0]
|
||||
link_str: str = section_str.split('>')[0]
|
||||
if 'href="' in link_str and \
|
||||
'rel="preview"' not in link_str:
|
||||
link_str = link_str.split('href="')[1]
|
||||
|
|
@ -1173,7 +1173,7 @@ def _atom_feed_to_dict(base_dir: str, domain: str, xml_str: str,
|
|||
if link_str not in extra_links:
|
||||
extra_links.append(link_str)
|
||||
|
||||
proxy_type = None
|
||||
proxy_type: str = None
|
||||
if domain.endswith('.onion'):
|
||||
proxy_type = 'tor'
|
||||
elif domain.endswith('.i2p'):
|
||||
|
|
@ -1187,24 +1187,24 @@ def _atom_feed_to_dict(base_dir: str, domain: str, xml_str: str,
|
|||
if not link:
|
||||
continue
|
||||
|
||||
item_domain = link.split('://')[1]
|
||||
item_domain: str = link.split('://')[1]
|
||||
if '/' in item_domain:
|
||||
item_domain = item_domain.split('/')[0]
|
||||
|
||||
if is_blocked_domain(base_dir, item_domain, None, None):
|
||||
continue
|
||||
pub_date = atom_item.split('<updated>')[1]
|
||||
pub_date: str = atom_item.split('<updated>')[1]
|
||||
pub_date = pub_date.split('</updated>')[0]
|
||||
|
||||
unique_string_identifier = title + ' ' + link
|
||||
pub_date_str = parse_feed_date(pub_date, unique_string_identifier)
|
||||
unique_string_identifier: str = title + ' ' + link
|
||||
pub_date_str: str = parse_feed_date(pub_date, unique_string_identifier)
|
||||
if not pub_date_str:
|
||||
continue
|
||||
if not _valid_feed_date(pub_date_str):
|
||||
continue
|
||||
post_filename = ''
|
||||
post_filename: str = ''
|
||||
votes_status: list[str] = []
|
||||
podcast_properties = \
|
||||
podcast_properties: dict = \
|
||||
xml_podcast_to_dict(base_dir, atom_item, xml_str)
|
||||
if podcast_properties:
|
||||
podcast_properties['linkMimeType'] = link_mime_type
|
||||
|
|
@ -1859,13 +1859,14 @@ def get_dict_from_newswire(session, base_dir: str, domain: str,
|
|||
mirrored = True
|
||||
url = url.replace('!', '').strip()
|
||||
|
||||
items_list = get_rss(base_dir, domain, session, url,
|
||||
moderated, mirrored,
|
||||
max_posts_per_source, max_feed_size_kb,
|
||||
max_feed_item_size_kb,
|
||||
max_categories_feed_item_size_kb, debug,
|
||||
preferred_podcast_formats,
|
||||
timeout_sec, system_language)
|
||||
items_list: dict = \
|
||||
get_rss(base_dir, domain, session, url,
|
||||
moderated, mirrored,
|
||||
max_posts_per_source, max_feed_size_kb,
|
||||
max_feed_item_size_kb,
|
||||
max_categories_feed_item_size_kb, debug,
|
||||
preferred_podcast_formats,
|
||||
timeout_sec, system_language)
|
||||
if items_list:
|
||||
for date_str, item in items_list.items():
|
||||
result[date_str] = item
|
||||
|
|
@ -1877,10 +1878,11 @@ def get_dict_from_newswire(session, base_dir: str, domain: str,
|
|||
session, debug)
|
||||
|
||||
# sort into chronological order, latest first
|
||||
sorted_result = OrderedDict(sorted(result.items(), reverse=True))
|
||||
sorted_result: dict = \
|
||||
OrderedDict(sorted(result.items(), reverse=True))
|
||||
|
||||
# are there too many posts? If so then remove the oldest ones
|
||||
no_of_posts = len(sorted_result.items())
|
||||
no_of_posts: int = len(sorted_result.items())
|
||||
if no_of_posts > max_newswire_posts:
|
||||
ctr: int = 0
|
||||
removals: list[str] = []
|
||||
|
|
|
|||
Loading…
Reference in New Issue