Function to get an rss feed

main
Bob Mottram 2020-10-03 22:50:05 +01:00
parent ef6452a524
commit e6b66d5cb5
2 changed files with 95 additions and 1 deletions

View File

@ -28,6 +28,7 @@ from posts import getUserUrl
from posts import checkDomains
from session import createSession
from session import getJson
from session import getRSS
from filters import addFilter
from filters import removeFilter
import os
@ -176,6 +177,8 @@ parser.add_argument('--postsraw', dest='postsraw', type=str,
help='Show raw json of posts for the given handle')
parser.add_argument('--json', dest='json', type=str, default=None,
help='Show the json for a given activitypub url')
parser.add_argument('--rss', dest='rss', type=str, default=None,
help='Show an rss feed for a given url')
parser.add_argument('-f', '--federate', nargs='+', dest='federationList',
help='Specify federation list separated by spaces')
parser.add_argument("--repliesEnabled", "--commentsEnabled",
@ -595,6 +598,12 @@ if args.json:
pprint(testJson)
sys.exit()
if args.rss:
session = createSession(None)
testRSS = getRSS(session, args.rss)
pprint(testRSS)
sys.exit()
# create cache for actors
if not os.path.isdir(baseDir + '/cache'):
os.mkdir(baseDir + '/cache')

View File

@ -12,6 +12,7 @@ from utils import urlPermitted
import json
from socket import error as SocketError
import errno
from datetime import datetime
baseDirectory = None
@ -54,7 +55,7 @@ def createSession(proxyType: str):
def getJson(session, url: str, headers: {}, params: {},
version='1.0.0', httpPrefix='https',
version='1.1.0', httpPrefix='https',
domain='testdomain') -> {}:
if not isinstance(url, str):
print('url: ' + str(url))
@ -92,6 +93,90 @@ def getJson(session, url: str, headers: {}, params: {},
return None
def xml2StrToDict(xmlStr: str) -> {}:
"""Converts an xml 2.0 string to a dictionary
"""
if '<item>' not in xmlStr:
return {}
result = {}
rssItems = xmlStr.split('<item>')
for rssItem in rssItems:
if '<title>' not in rssItem:
continue
if '</title>' not in rssItem:
continue
if '<link>' not in rssItem:
continue
if '</link>' not in rssItem:
continue
if '<pubDate>' not in rssItem:
continue
if '</pubDate>' not in rssItem:
continue
title = rssItem.split('<title>')[1]
title = title.split('</title>')[0]
link = rssItem.split('<link>')[1]
link = link.split('</link>')[0]
pubDate = rssItem.split('<pubDate>')[1]
pubDate = pubDate.split('</pubDate>')[0]
try:
publishedDate = \
datetime.strptime(pubDate, "%a, %d %b %Y %H:%M:%S %z")
result[str(publishedDate)] = [title, link]
except BaseException:
pass
return result
def xmlStrToDict(xmlStr: str) -> {}:
"""Converts an xml string to a dictionary
"""
if 'rss version="2.0"' in xmlStr:
return xml2StrToDict(xmlStr)
return {}
def getRSS(session, url: str) -> {}:
"""Returns an RSS url as a dict
"""
if not isinstance(url, str):
print('url: ' + str(url))
print('ERROR: getRSS url should be a string')
return None
headers = {
'Accept': 'text/xml; charset=UTF-8'
}
params = None
sessionParams = {}
sessionHeaders = {}
if headers:
sessionHeaders = headers
if params:
sessionParams = params
sessionHeaders['User-Agent'] = \
'Mozilla/5.0 (X11; Linux x86_64; rv:81.0) Gecko/20100101 Firefox/81.0'
if not session:
print('WARN: no session specified for getRSS')
try:
result = session.get(url, headers=sessionHeaders, params=sessionParams)
return xmlStrToDict(result.text)
except requests.exceptions.RequestException as e:
print('ERROR: getRSS failed\nurl: ' + str(url) + '\n' +
'headers: ' + str(sessionHeaders) + '\n' +
'params: ' + str(sessionParams) + '\n')
print(e)
except ValueError as e:
print('ERROR: getRSS failed\nurl: ' + str(url) + '\n' +
'headers: ' + str(sessionHeaders) + '\n' +
'params: ' + str(sessionParams) + '\n')
print(e)
except SocketError as e:
if e.errno == errno.ECONNRESET:
print('WARN: connection was reset during getRSS')
print(e)
return None
def postJson(session, postJsonObject: {}, federationList: [],
inboxUrl: str, headers: {}) -> str:
"""Post a json message to the inbox of another person