epicyon/webapp_utils.py

392 lines
13 KiB
Python
Raw Normal View History

2020-11-09 15:22:59 +00:00
__filename__ = "webapp_utils.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
from collections import OrderedDict
2020-11-09 15:25:18 +00:00
from session import getJson
2020-11-09 15:22:59 +00:00
from utils import getProtocolPrefixes
from utils import loadJson
from utils import getConfigParam
from cache import getPersonFromCache
from cache import storePersonInCache
def getAltPath(actor: str, domainFull: str, callingDomain: str) -> str:
    """Returns the actor url rewritten for an onion/i2p calling domain
    eg. https://clearnetdomain/path becomes http://oniondomain/path

    The actor is returned unchanged unless it contains domainFull, does
    not already contain callingDomain, and callingDomain ends with
    .onion or .i2p
    """
    # nothing to do if the actor already refers to the calling domain
    # or does not belong to this instance
    if callingDomain in actor or domainFull not in actor:
        return actor
    # only onion and i2p addresses get an alternate path
    if not callingDomain.endswith(('.onion', '.i2p')):
        return actor
    pathAfterDomain = actor.split(domainFull)[1]
    altActor = 'http://' + callingDomain + pathAfterDomain
    print('Changed POST domain from ' + actor + ' to ' + altActor)
    return altActor
def getContentWarningButton(postID: str, translate: {},
                            content: str) -> str:
    """Returns the html markup for a content warning button

    The content is placed inside a collapsed <details> element whose
    summary is the translated 'SHOW MORE' string, and is wrapped in a
    <div> whose id is postID
    """
    summaryLabel = translate['SHOW MORE']
    markupParts = (
        ' <details><summary><b>',
        summaryLabel,
        '</b></summary>',
        '<div id="',
        postID,
        '">',
        content,
        '</div></details>\n'
    )
    return ''.join(markupParts)
def getActorPropertyUrl(actorJson: {}, propertyName: str) -> str:
    """Returns the url stored within a named actor attachment

    Looks through actorJson['attachment'] for the first PropertyValue
    entry whose name begins with propertyName (case insensitive) and
    whose value looks like a url. Returns an empty string if there
    is no such entry.
    Note that the value of an examined entry is stripped of
    surrounding whitespace in place.
    """
    attachments = actorJson.get('attachment')
    if not attachments:
        return ''
    wantedName = propertyName.lower()
    for entry in attachments:
        entryName = entry.get('name')
        if not entryName or not entryName.lower().startswith(wantedName):
            continue
        if not entry.get('type') or not entry.get('value'):
            continue
        if entry['type'] != 'PropertyValue':
            continue
        # normalise the stored value before validating it
        url = entry['value'].strip()
        entry['value'] = url
        # must begin with one of the known protocol prefixes
        if not any(url.startswith(prefix)
                   for prefix in getProtocolPrefixes()):
            continue
        # must contain a dot and no spaces or commas
        if '.' not in url or ' ' in url or ',' in url:
            continue
        return url
    return ''
def getBlogAddress(actorJson: {}) -> str:
    """Returns the blog url attached to the given actor,
    or an empty string if there isn't one
    """
    blogUrl = getActorPropertyUrl(actorJson, 'Blog')
    return blogUrl
def setActorPropertyUrl(actorJson: {}, propertyName: str, url: str) -> None:
    """Sets the url held by a named actor attachment

    The first existing attachment whose name begins with propertyName
    (case insensitive) is always removed. If the url then fails
    validation (known protocol prefix, contains a dot, no spaces or
    commas) nothing further happens, so an invalid url has the effect
    of clearing the property.
    Otherwise any remaining matching PropertyValue is updated, or a
    new PropertyValue attachment is appended.
    """
    if not actorJson.get('attachment'):
        actorJson['attachment'] = []
    attachments = actorJson['attachment']
    propertyNameLower = propertyName.lower()

    # remove any existing value
    for index, entry in enumerate(attachments):
        if not entry.get('name') or not entry.get('type'):
            continue
        if entry['name'].lower().startswith(propertyNameLower):
            del attachments[index]
            break

    # validate the new url
    if not any(url.startswith(prefix) for prefix in getProtocolPrefixes()):
        return
    if '.' not in url or ' ' in url or ',' in url:
        return

    # update another matching attachment if one remains
    for entry in attachments:
        if not entry.get('name') or not entry.get('type'):
            continue
        if not entry['name'].lower().startswith(propertyNameLower):
            continue
        if entry['type'] == 'PropertyValue':
            entry['value'] = url
            return

    # otherwise add a new attachment
    attachments.append({
        "name": propertyName,
        "type": "PropertyValue",
        "value": url
    })
def setBlogAddress(actorJson: {}, blogAddress: str) -> None:
    """Sets the blog url attached to the given actor
    """
    setActorPropertyUrl(actorJson=actorJson,
                        propertyName='Blog',
                        url=blogAddress)
def updateAvatarImageCache(session, baseDir: str, httpPrefix: str,
                           actor: str, avatarUrl: str,
                           personCache: {}, allowDownloads: bool,
                           force=False) -> str:
    """Updates the cached avatar image for the given actor

    The image is stored as baseDir/cache/avatars/<actor>.<ext>, where
    the slashes within the actor url are replaced by dashes and the
    extension is chosen from the avatar url (.jpeg is stored as .jpg).

    session: object providing get(url, headers=..., params=...)
    force: download even if a cached image file already exists

    Returns the image path relative to the cache directory
    (eg. /avatars/name.png), or None if avatarUrl is empty or has an
    unrecognised extension.
    If the image download does not succeed then the actor is fetched
    again and, provided that its public key matches the one held in
    personCache, it is stored and the result of getPersonAvatarUrl
    is returned. Otherwise None is returned.
    """
    if not avatarUrl:
        return None
    actorStr = actor.replace('/', '-')
    avatarImagePath = baseDir + '/cache/avatars/' + actorStr

    # (url extensions, mime type, extension of the cached file)
    imageFormats = (
        (('.png',), 'image/png', '.png'),
        (('.jpg', '.jpeg'), 'image/jpeg', '.jpg'),
        (('.gif',), 'image/gif', '.gif'),
        (('.webp',), 'image/webp', '.webp'),
        (('.avif',), 'image/avif', '.avif')
    )
    sessionHeaders = None
    avatarImageFilename = None
    for urlExtensions, mimeType, fileExtension in imageFormats:
        # the extension may be followed by a query string
        if any(avatarUrl.endswith(ext) or ext + '?' in avatarUrl
               for ext in urlExtensions):
            sessionHeaders = {
                'Accept': mimeType
            }
            avatarImageFilename = avatarImagePath + fileExtension
            break
    if not avatarImageFilename:
        return None

    if (not os.path.isfile(avatarImageFilename) or force) and allowDownloads:
        try:
            print('avatar image url: ' + avatarUrl)
            result = session.get(avatarUrl,
                                 headers=sessionHeaders,
                                 params=None)
            if result.status_code < 200 or \
               result.status_code > 202:
                print('Avatar image download failed with status ' +
                      str(result.status_code))
                # remove partial download
                if os.path.isfile(avatarImageFilename):
                    os.remove(avatarImageFilename)
            else:
                with open(avatarImageFilename, 'wb') as f:
                    f.write(result.content)
                print('avatar image downloaded for ' + actor)
                return avatarImageFilename.replace(baseDir + '/cache', '')
        except Exception as e:
            # best effort: fall through to re-fetching the actor
            print('Failed to download avatar image: ' + str(avatarUrl))
            print(e)

        # The image could not be downloaded, so fetch the actor again
        # in case its avatar has changed.
        # Actors whose url contains /channel/ or /accounts/ are
        # requested as ld+json and all others as activity+json.
        # The previous test joined the two conditions with "or", which
        # is true for practically every url and so made the ld+json
        # branch unreachable.
        prof = 'https://www.w3.org/ns/activitystreams'
        if '/channel/' not in actor and '/accounts/' not in actor:
            sessionHeaders = {
                'Accept': 'application/activity+json; profile="' + prof + '"'
            }
        else:
            sessionHeaders = {
                'Accept': 'application/ld+json; profile="' + prof + '"'
            }
        personJson = \
            getJson(session, actor, sessionHeaders, None, __version__,
                    httpPrefix, None)
        if personJson:
            if not personJson.get('id'):
                return None
            if not personJson.get('publicKey'):
                return None
            if not personJson['publicKey'].get('publicKeyPem'):
                return None
            if personJson['id'] != actor:
                return None
            if not personCache.get(actor):
                return None
            # only accept the downloaded actor if its key is unchanged
            if personCache[actor]['actor']['publicKey']['publicKeyPem'] != \
               personJson['publicKey']['publicKeyPem']:
                print("ERROR: " +
                      "public keys don't match when downloading actor for " +
                      actor)
                return None
            storePersonInCache(baseDir, actor, personJson, personCache,
                               allowDownloads)
            return getPersonAvatarUrl(baseDir, actor, personCache,
                                      allowDownloads)
        return None
    return avatarImageFilename.replace(baseDir + '/cache', '')
def getImageExtensions() -> []:
    """Returns the possible image file extensions,
    without a leading dot, as a tuple
    """
    imageExtensions = ('png', 'jpg', 'jpeg', 'gif', 'webp', 'avif')
    return imageExtensions
def getPersonAvatarUrl(baseDir: str, personUrl: str, personCache: {},
                       allowDownloads: bool) -> str:
    """Returns the avatar url for the person

    A locally cached image under baseDir/cache/avatars is preferred,
    returned as a path of the form /avatars/<actor>.<ext>
    Failing that the icon url from the cached actor is returned,
    or None if the person is not in the cache or has no icon url.
    """
    cachedPerson = \
        getPersonFromCache(baseDir, personUrl, personCache, allowDownloads)
    if not cachedPerson:
        return None

    # look for a locally stored image
    actorStr = cachedPerson['id'].replace('/', '-')
    avatarImagePath = baseDir + '/cache/avatars/' + actorStr
    for ext in getImageExtensions():
        suffix = '.' + ext
        if os.path.isfile(avatarImagePath + suffix):
            return '/avatars/' + actorStr + suffix
        # NOTE(review): this lowercases the whole path including
        # baseDir, not just the actor part - confirm that is intended
        if os.path.isfile(avatarImagePath.lower() + suffix):
            return '/avatars/' + actorStr.lower() + suffix

    # fall back to the remote icon
    icon = cachedPerson.get('icon')
    if icon and icon.get('url'):
        return icon['url']
    return None
def getIconsDir(baseDir: str) -> str:
    """Returns the directory where icons exist

    This is 'icons/<theme>' if a theme is configured and the directory
    baseDir/img/icons/<theme> exists, otherwise it is 'icons'
    """
    theme = getConfigParam(baseDir, 'theme')
    if theme and os.path.isdir(baseDir + '/img/icons/' + theme):
        return 'icons/' + theme
    return 'icons'
def scheduledPostsExist(baseDir: str, nickname: str, domain: str) -> bool:
    """Returns true if there are posts scheduled to be delivered

    This is the case when the schedule.index file within the account
    directory baseDir/accounts/nickname@domain exists and contains
    the text '#users#'
    """
    scheduleIndexFilename = \
        baseDir + '/accounts/' + nickname + '@' + domain + '/schedule.index'
    if not os.path.isfile(scheduleIndexFilename):
        return False
    # use a context manager so that the file handle is always closed,
    # and stop reading at the first matching line
    with open(scheduleIndexFilename) as indexFile:
        return any('#users#' in line for line in indexFile)
def sharesTimelineJson(actor: str, pageNumber: int, itemsPerPage: int,
                       baseDir: str, maxSharesPerAccount: int) -> ({}, bool):
    """Get a page on the shared items timeline as json
    maxSharesPerAccount helps to avoid one person dominating the timeline
    by sharing a large number of things

    Shared items are read from the shares.json file of each account
    directory (a directory under baseDir/accounts whose name
    contains @). Each item is assigned an 'actor', built from the
    part of the given actor url before /users/ and the nickname of
    the owning account, and is keyed by its published value as
    a string.

    Returns a tuple of the page of items, in descending order of
    published key, and a boolean which is true if this is the last
    page. The start index is itemsPerPage * pageNumber, limited so
    that the final page begins no later than itemsPerPage items
    before the end.
    """
    allSharesJson = {}
    for subdir, dirs, files in os.walk(baseDir + '/accounts'):
        for handle in dirs:
            if '@' not in handle:
                continue
            accountDir = baseDir + '/accounts/' + handle
            sharesFilename = accountDir + '/shares.json'
            if not os.path.isfile(sharesFilename):
                continue
            sharesJson = loadJson(sharesFilename)
            if not sharesJson:
                continue
            nickname = handle.split('@')[0]
            # actor who owns this share
            owner = actor.split('/users/')[0] + '/users/' + nickname
            ctr = 0
            for item in sharesJson.values():
                # assign owner to the item
                item['actor'] = owner
                allSharesJson[str(item['published'])] = item
                ctr += 1
                if ctr >= maxSharesPerAccount:
                    break
        # Account directories are direct children of the accounts
        # directory and are always addressed by that path above, so
        # there is no need to descend into each account's subtree
        break

    # sort the shared items in descending order of publication date
    sharesJson = OrderedDict(sorted(allSharesJson.items(), reverse=True))

    lastPage = False
    startIndex = itemsPerPage * pageNumber
    maxIndex = len(sharesJson)
    if maxIndex < itemsPerPage:
        lastPage = True
    if startIndex >= maxIndex - itemsPerPage:
        # show the final full page of items
        lastPage = True
        startIndex = maxIndex - itemsPerPage
        if startIndex < 0:
            startIndex = 0

    # copy the items which are within the page
    resultJson = {}
    for ctr, (published, item) in enumerate(sharesJson.items()):
        if ctr >= startIndex + itemsPerPage:
            break
        if ctr < startIndex:
            continue
        resultJson[published] = item
    return resultJson, lastPage
def postContainsPublic(postJsonObject: {}) -> bool:
    """Does the given post contain #Public

    Returns true if any address within the 'to' or 'cc' fields of
    the post object ends with #Public.
    Returns false if the post has no object, or if its object is not
    a dict (for example when it is just the id of another post).
    """
    postObject = postJsonObject.get('object')
    if not isinstance(postObject, dict):
        return False
    # cc is checked even when there are no 'to' addresses
    for fieldName in ('to', 'cc'):
        recipients = postObject.get(fieldName)
        if not recipients:
            continue
        # a single address may be given as a string rather than a list
        if isinstance(recipients, str):
            recipients = [recipients]
        for address in recipients:
            if isinstance(address, str) and address.endswith('#Public'):
                return True
    return False
def isQuestion(postObjectJson: {}) -> bool:
    """Is the given post a question?

    True for a Create or Update activity whose object is a dict of
    type Question having a non-empty oneOf list of options
    """
    if postObjectJson['type'] not in ('Create', 'Update'):
        return False
    questionObject = postObjectJson['object']
    if not isinstance(questionObject, dict):
        return False
    # a missing or empty type is also not a Question
    if questionObject.get('type') != 'Question':
        return False
    options = questionObject.get('oneOf')
    return bool(options) and isinstance(options, list)