epicyon/webapp_utils.py

743 lines
28 KiB
Python
Raw Normal View History

2020-11-09 15:22:59 +00:00
__filename__ = "webapp_utils.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
from collections import OrderedDict
2020-11-09 15:25:18 +00:00
from session import getJson
2020-11-09 15:22:59 +00:00
from utils import getProtocolPrefixes
from utils import loadJson
2020-11-09 19:41:01 +00:00
from utils import getCachedPostFilename
2020-11-09 15:22:59 +00:00
from utils import getConfigParam
from cache import getPersonFromCache
from cache import storePersonInCache
2020-11-09 19:41:01 +00:00
from content import addHtmlTags
from content import replaceEmojiFromTags
2020-11-09 15:22:59 +00:00
def getAltPath(actor: str, domainFull: str, callingDomain: str) -> str:
"""Returns alternate path from the actor
eg. https://clearnetdomain/path becomes http://oniondomain/path
"""
postActor = actor
if callingDomain not in actor and domainFull in actor:
if callingDomain.endswith('.onion') or \
callingDomain.endswith('.i2p'):
postActor = \
'http://' + callingDomain + actor.split(domainFull)[1]
print('Changed POST domain from ' + actor + ' to ' + postActor)
return postActor
def getContentWarningButton(postID: str, translate: {},
content: str) -> str:
"""Returns the markup for a content warning button
"""
return ' <details><summary><b>' + \
translate['SHOW MORE'] + '</b></summary>' + \
'<div id="' + postID + '">' + content + \
'</div></details>\n'
def getActorPropertyUrl(actorJson: {}, propertyName: str) -> str:
"""Returns a url property from an actor
"""
if not actorJson.get('attachment'):
return ''
propertyName = propertyName.lower()
for propertyValue in actorJson['attachment']:
if not propertyValue.get('name'):
continue
if not propertyValue['name'].lower().startswith(propertyName):
continue
if not propertyValue.get('type'):
continue
if not propertyValue.get('value'):
continue
if propertyValue['type'] != 'PropertyValue':
continue
propertyValue['value'] = propertyValue['value'].strip()
prefixes = getProtocolPrefixes()
prefixFound = False
for prefix in prefixes:
if propertyValue['value'].startswith(prefix):
prefixFound = True
break
if not prefixFound:
continue
if '.' not in propertyValue['value']:
continue
if ' ' in propertyValue['value']:
continue
if ',' in propertyValue['value']:
continue
return propertyValue['value']
return ''
def getBlogAddress(actorJson: {}) -> str:
"""Returns blog address for the given actor
"""
return getActorPropertyUrl(actorJson, 'Blog')
def setActorPropertyUrl(actorJson: {}, propertyName: str, url: str) -> None:
"""Sets a url for the given actor property
"""
if not actorJson.get('attachment'):
actorJson['attachment'] = []
propertyNameLower = propertyName.lower()
# remove any existing value
propertyFound = None
for propertyValue in actorJson['attachment']:
if not propertyValue.get('name'):
continue
if not propertyValue.get('type'):
continue
if not propertyValue['name'].lower().startswith(propertyNameLower):
continue
propertyFound = propertyValue
break
if propertyFound:
actorJson['attachment'].remove(propertyFound)
prefixes = getProtocolPrefixes()
prefixFound = False
for prefix in prefixes:
if url.startswith(prefix):
prefixFound = True
break
if not prefixFound:
return
if '.' not in url:
return
if ' ' in url:
return
if ',' in url:
return
for propertyValue in actorJson['attachment']:
if not propertyValue.get('name'):
continue
if not propertyValue.get('type'):
continue
if not propertyValue['name'].lower().startswith(propertyNameLower):
continue
if propertyValue['type'] != 'PropertyValue':
continue
propertyValue['value'] = url
return
newAddress = {
"name": propertyName,
"type": "PropertyValue",
"value": url
}
actorJson['attachment'].append(newAddress)
def setBlogAddress(actorJson: {}, blogAddress: str) -> None:
"""Sets an blog address for the given actor
"""
setActorPropertyUrl(actorJson, 'Blog', blogAddress)
def updateAvatarImageCache(session, baseDir: str, httpPrefix: str,
actor: str, avatarUrl: str,
personCache: {}, allowDownloads: bool,
force=False) -> str:
"""Updates the cached avatar for the given actor
"""
if not avatarUrl:
return None
actorStr = actor.replace('/', '-')
avatarImagePath = baseDir + '/cache/avatars/' + actorStr
if avatarUrl.endswith('.png') or \
'.png?' in avatarUrl:
sessionHeaders = {
'Accept': 'image/png'
}
avatarImageFilename = avatarImagePath + '.png'
elif (avatarUrl.endswith('.jpg') or
avatarUrl.endswith('.jpeg') or
'.jpg?' in avatarUrl or
'.jpeg?' in avatarUrl):
sessionHeaders = {
'Accept': 'image/jpeg'
}
avatarImageFilename = avatarImagePath + '.jpg'
elif avatarUrl.endswith('.gif') or '.gif?' in avatarUrl:
sessionHeaders = {
'Accept': 'image/gif'
}
avatarImageFilename = avatarImagePath + '.gif'
elif avatarUrl.endswith('.webp') or '.webp?' in avatarUrl:
sessionHeaders = {
'Accept': 'image/webp'
}
avatarImageFilename = avatarImagePath + '.webp'
elif avatarUrl.endswith('.avif') or '.avif?' in avatarUrl:
sessionHeaders = {
'Accept': 'image/avif'
}
avatarImageFilename = avatarImagePath + '.avif'
else:
return None
if (not os.path.isfile(avatarImageFilename) or force) and allowDownloads:
try:
print('avatar image url: ' + avatarUrl)
result = session.get(avatarUrl,
headers=sessionHeaders,
params=None)
if result.status_code < 200 or \
result.status_code > 202:
print('Avatar image download failed with status ' +
str(result.status_code))
# remove partial download
if os.path.isfile(avatarImageFilename):
os.remove(avatarImageFilename)
else:
with open(avatarImageFilename, 'wb') as f:
f.write(result.content)
print('avatar image downloaded for ' + actor)
return avatarImageFilename.replace(baseDir + '/cache', '')
except Exception as e:
print('Failed to download avatar image: ' + str(avatarUrl))
print(e)
prof = 'https://www.w3.org/ns/activitystreams'
if '/channel/' not in actor or '/accounts/' not in actor:
sessionHeaders = {
'Accept': 'application/activity+json; profile="' + prof + '"'
}
else:
sessionHeaders = {
'Accept': 'application/ld+json; profile="' + prof + '"'
}
personJson = \
getJson(session, actor, sessionHeaders, None, __version__,
httpPrefix, None)
if personJson:
if not personJson.get('id'):
return None
if not personJson.get('publicKey'):
return None
if not personJson['publicKey'].get('publicKeyPem'):
return None
if personJson['id'] != actor:
return None
if not personCache.get(actor):
return None
if personCache[actor]['actor']['publicKey']['publicKeyPem'] != \
personJson['publicKey']['publicKeyPem']:
print("ERROR: " +
"public keys don't match when downloading actor for " +
actor)
return None
storePersonInCache(baseDir, actor, personJson, personCache,
allowDownloads)
return getPersonAvatarUrl(baseDir, actor, personCache,
allowDownloads)
return None
return avatarImageFilename.replace(baseDir + '/cache', '')
def getImageExtensions() -> []:
"""Returns a list of the possible image file extensions
"""
return ('png', 'jpg', 'jpeg', 'gif', 'webp', 'avif')
def getPersonAvatarUrl(baseDir: str, personUrl: str, personCache: {},
allowDownloads: bool) -> str:
"""Returns the avatar url for the person
"""
personJson = \
getPersonFromCache(baseDir, personUrl, personCache, allowDownloads)
if not personJson:
return None
# get from locally stored image
actorStr = personJson['id'].replace('/', '-')
avatarImagePath = baseDir + '/cache/avatars/' + actorStr
imageExtension = getImageExtensions()
for ext in imageExtension:
if os.path.isfile(avatarImagePath + '.' + ext):
return '/avatars/' + actorStr + '.' + ext
elif os.path.isfile(avatarImagePath.lower() + '.' + ext):
return '/avatars/' + actorStr.lower() + '.' + ext
if personJson.get('icon'):
if personJson['icon'].get('url'):
return personJson['icon']['url']
return None
def getIconsDir(baseDir: str) -> str:
"""Returns the directory where icons exist
"""
iconsDir = 'icons'
theme = getConfigParam(baseDir, 'theme')
if theme:
if os.path.isdir(baseDir + '/img/icons/' + theme):
iconsDir = 'icons/' + theme
return iconsDir
def scheduledPostsExist(baseDir: str, nickname: str, domain: str) -> bool:
"""Returns true if there are posts scheduled to be delivered
"""
scheduleIndexFilename = \
baseDir + '/accounts/' + nickname + '@' + domain + '/schedule.index'
if not os.path.isfile(scheduleIndexFilename):
return False
if '#users#' in open(scheduleIndexFilename).read():
return True
return False
def sharesTimelineJson(actor: str, pageNumber: int, itemsPerPage: int,
baseDir: str, maxSharesPerAccount: int) -> ({}, bool):
"""Get a page on the shared items timeline as json
maxSharesPerAccount helps to avoid one person dominating the timeline
by sharing a large number of things
"""
allSharesJson = {}
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for handle in dirs:
if '@' in handle:
accountDir = baseDir + '/accounts/' + handle
sharesFilename = accountDir + '/shares.json'
if os.path.isfile(sharesFilename):
sharesJson = loadJson(sharesFilename)
if not sharesJson:
continue
nickname = handle.split('@')[0]
# actor who owns this share
owner = actor.split('/users/')[0] + '/users/' + nickname
ctr = 0
for itemID, item in sharesJson.items():
# assign owner to the item
item['actor'] = owner
allSharesJson[str(item['published'])] = item
ctr += 1
if ctr >= maxSharesPerAccount:
break
# sort the shared items in descending order of publication date
sharesJson = OrderedDict(sorted(allSharesJson.items(), reverse=True))
lastPage = False
startIndex = itemsPerPage * pageNumber
maxIndex = len(sharesJson.items())
if maxIndex < itemsPerPage:
lastPage = True
if startIndex >= maxIndex - itemsPerPage:
lastPage = True
startIndex = maxIndex - itemsPerPage
if startIndex < 0:
startIndex = 0
ctr = 0
resultJson = {}
for published, item in sharesJson.items():
if ctr >= startIndex + itemsPerPage:
break
if ctr < startIndex:
ctr += 1
continue
resultJson[published] = item
ctr += 1
return resultJson, lastPage
def postContainsPublic(postJsonObject: {}) -> bool:
"""Does the given post contain #Public
"""
containsPublic = False
if not postJsonObject['object'].get('to'):
return containsPublic
for toAddress in postJsonObject['object']['to']:
if toAddress.endswith('#Public'):
containsPublic = True
break
if not containsPublic:
if postJsonObject['object'].get('cc'):
for toAddress in postJsonObject['object']['cc']:
if toAddress.endswith('#Public'):
containsPublic = True
break
return containsPublic
2020-11-09 15:40:24 +00:00
def getImageFile(baseDir: str, name: str, directory: str,
nickname: str, domain: str) -> (str, str):
2020-11-09 15:22:59 +00:00
"""
2020-11-09 15:40:24 +00:00
returns the filenames for an image with the given name
"""
bannerExtensions = getImageExtensions()
bannerFile = ''
bannerFilename = ''
for ext in bannerExtensions:
bannerFile = name + '.' + ext
bannerFilename = directory + '/' + bannerFile
if os.path.isfile(bannerFilename):
break
return bannerFile, bannerFilename
def getBannerFile(baseDir: str,
nickname: str, domain: str) -> (str, str):
return getImageFile(baseDir, 'banner',
baseDir + '/accounts/' + nickname + '@' + domain,
nickname, domain)
def getSearchBannerFile(baseDir: str,
nickname: str, domain: str) -> (str, str):
return getImageFile(baseDir, 'search_banner',
baseDir + '/accounts/' + nickname + '@' + domain,
nickname, domain)
def getLeftImageFile(baseDir: str,
nickname: str, domain: str) -> (str, str):
return getImageFile(baseDir, 'left_col_image',
baseDir + '/accounts/' + nickname + '@' + domain,
nickname, domain)
def getRightImageFile(baseDir: str,
nickname: str, domain: str) -> (str, str):
return getImageFile(baseDir, 'right_col_image',
baseDir + '/accounts/' + nickname + '@' + domain,
nickname, domain)
2020-11-09 19:41:01 +00:00
def htmlHeader(cssFilename: str, css: str, lang='en') -> str:
htmlStr = '<!DOCTYPE html>\n'
htmlStr += '<html lang="' + lang + '">\n'
htmlStr += ' <head>\n'
htmlStr += ' <meta charset="utf-8">\n'
fontName, fontFormat = getFontFromCss(css)
if fontName:
htmlStr += ' <link rel="preload" as="font" type="' + \
fontFormat + '" href="' + fontName + '" crossorigin>\n'
htmlStr += ' <style>\n' + css + '</style>\n'
htmlStr += ' <link rel="manifest" href="/manifest.json">\n'
htmlStr += ' <meta name="theme-color" content="grey">\n'
htmlStr += ' <title>Epicyon</title>\n'
htmlStr += ' </head>\n'
htmlStr += ' <body>\n'
return htmlStr
def htmlFooter() -> str:
htmlStr = ' </body>\n'
htmlStr += '</html>\n'
return htmlStr
def getFontFromCss(css: str) -> (str, str):
"""Returns the font name and format
"""
if ' url(' not in css:
return None, None
fontName = css.split(" url(")[1].split(")")[0].replace("'", '')
fontFormat = css.split(" format('")[1].split("')")[0]
return fontName, fontFormat
def loadIndividualPostAsHtmlFromCache(baseDir: str,
nickname: str, domain: str,
postJsonObject: {}) -> str:
"""If a cached html version of the given post exists then load it and
return the html text
This is much quicker than generating the html from the json object
"""
cachedPostFilename = \
getCachedPostFilename(baseDir, nickname, domain, postJsonObject)
postHtml = ''
if not cachedPostFilename:
return postHtml
if not os.path.isfile(cachedPostFilename):
return postHtml
tries = 0
while tries < 3:
try:
with open(cachedPostFilename, 'r') as file:
postHtml = file.read()
break
except Exception as e:
print(e)
# no sleep
tries += 1
if postHtml:
return postHtml
def addEmojiToDisplayName(baseDir: str, httpPrefix: str,
nickname: str, domain: str,
displayName: str, inProfileName: bool) -> str:
"""Adds emoji icons to display names on individual posts
"""
if ':' not in displayName:
return displayName
displayName = displayName.replace('<p>', '').replace('</p>', '')
emojiTags = {}
print('TAG: displayName before tags: ' + displayName)
displayName = \
addHtmlTags(baseDir, httpPrefix,
nickname, domain, displayName, [], emojiTags)
displayName = displayName.replace('<p>', '').replace('</p>', '')
print('TAG: displayName after tags: ' + displayName)
# convert the emoji dictionary to a list
emojiTagsList = []
for tagName, tag in emojiTags.items():
emojiTagsList.append(tag)
print('TAG: emoji tags list: ' + str(emojiTagsList))
if not inProfileName:
displayName = \
replaceEmojiFromTags(displayName, emojiTagsList, 'post header')
else:
displayName = \
replaceEmojiFromTags(displayName, emojiTagsList, 'profile')
print('TAG: displayName after tags 2: ' + displayName)
# remove any stray emoji
while ':' in displayName:
if '://' in displayName:
break
emojiStr = displayName.split(':')[1]
prevDisplayName = displayName
displayName = displayName.replace(':' + emojiStr + ':', '').strip()
if prevDisplayName == displayName:
break
print('TAG: displayName after tags 3: ' + displayName)
print('TAG: displayName after tag replacements: ' + displayName)
return displayName
def getPostAttachmentsAsHtml(postJsonObject: {}, boxName: str, translate: {},
isMuted: bool, avatarLink: str,
replyStr: str, announceStr: str, likeStr: str,
bookmarkStr: str, deleteStr: str,
muteStr: str) -> (str, str):
"""Returns a string representing any attachments
"""
attachmentStr = ''
galleryStr = ''
if not postJsonObject['object'].get('attachment'):
return attachmentStr, galleryStr
if not isinstance(postJsonObject['object']['attachment'], list):
return attachmentStr, galleryStr
attachmentCtr = 0
attachmentStr += '<div class="media">\n'
for attach in postJsonObject['object']['attachment']:
if not (attach.get('mediaType') and attach.get('url')):
continue
mediaType = attach['mediaType']
imageDescription = ''
if attach.get('name'):
imageDescription = attach['name'].replace('"', "'")
if mediaType == 'image/png' or \
mediaType == 'image/jpeg' or \
mediaType == 'image/webp' or \
mediaType == 'image/avif' or \
mediaType == 'image/gif':
if attach['url'].endswith('.png') or \
attach['url'].endswith('.jpg') or \
attach['url'].endswith('.jpeg') or \
attach['url'].endswith('.webp') or \
attach['url'].endswith('.avif') or \
attach['url'].endswith('.gif'):
if attachmentCtr > 0:
attachmentStr += '<br>'
if boxName == 'tlmedia':
galleryStr += '<div class="gallery">\n'
if not isMuted:
galleryStr += ' <a href="' + attach['url'] + '">\n'
galleryStr += \
' <img loading="lazy" src="' + \
attach['url'] + '" alt="" title="">\n'
galleryStr += ' </a>\n'
if postJsonObject['object'].get('url'):
imagePostUrl = postJsonObject['object']['url']
else:
imagePostUrl = postJsonObject['object']['id']
if imageDescription and not isMuted:
galleryStr += \
' <a href="' + imagePostUrl + \
'" class="gallerytext"><div ' + \
'class="gallerytext">' + \
imageDescription + '</div></a>\n'
else:
galleryStr += \
'<label class="transparent">---</label><br>'
galleryStr += ' <div class="mediaicons">\n'
galleryStr += \
' ' + replyStr+announceStr + likeStr + \
bookmarkStr + deleteStr + muteStr + '\n'
galleryStr += ' </div>\n'
galleryStr += ' <div class="mediaavatar">\n'
galleryStr += ' ' + avatarLink + '\n'
galleryStr += ' </div>\n'
galleryStr += '</div>\n'
attachmentStr += '<a href="' + attach['url'] + '">'
attachmentStr += \
'<img loading="lazy" src="' + attach['url'] + \
'" alt="' + imageDescription + '" title="' + \
imageDescription + '" class="attachment"></a>\n'
attachmentCtr += 1
elif (mediaType == 'video/mp4' or
mediaType == 'video/webm' or
mediaType == 'video/ogv'):
extension = '.mp4'
if attach['url'].endswith('.webm'):
extension = '.webm'
elif attach['url'].endswith('.ogv'):
extension = '.ogv'
if attach['url'].endswith(extension):
if attachmentCtr > 0:
attachmentStr += '<br>'
if boxName == 'tlmedia':
galleryStr += '<div class="gallery">\n'
if not isMuted:
galleryStr += ' <a href="' + attach['url'] + '">\n'
galleryStr += \
' <video width="600" height="400" controls>\n'
galleryStr += \
' <source src="' + attach['url'] + \
'" alt="' + imageDescription + \
'" title="' + imageDescription + \
'" class="attachment" type="video/' + \
extension.replace('.', '') + '">'
idx = 'Your browser does not support the video tag.'
galleryStr += translate[idx]
galleryStr += ' </video>\n'
galleryStr += ' </a>\n'
if postJsonObject['object'].get('url'):
videoPostUrl = postJsonObject['object']['url']
else:
videoPostUrl = postJsonObject['object']['id']
if imageDescription and not isMuted:
galleryStr += \
' <a href="' + videoPostUrl + \
'" class="gallerytext"><div ' + \
'class="gallerytext">' + \
imageDescription + '</div></a>\n'
else:
galleryStr += \
'<label class="transparent">---</label><br>'
galleryStr += ' <div class="mediaicons">\n'
galleryStr += \
' ' + replyStr + announceStr + likeStr + \
bookmarkStr + deleteStr + muteStr + '\n'
galleryStr += ' </div>\n'
galleryStr += ' <div class="mediaavatar">\n'
galleryStr += ' ' + avatarLink + '\n'
galleryStr += ' </div>\n'
galleryStr += '</div>\n'
attachmentStr += \
'<center><video width="400" height="300" controls>'
attachmentStr += \
'<source src="' + attach['url'] + '" alt="' + \
imageDescription + '" title="' + imageDescription + \
'" class="attachment" type="video/' + \
extension.replace('.', '') + '">'
attachmentStr += \
translate['Your browser does not support the video tag.']
attachmentStr += '</video></center>'
attachmentCtr += 1
elif (mediaType == 'audio/mpeg' or
mediaType == 'audio/ogg'):
extension = '.mp3'
if attach['url'].endswith('.ogg'):
extension = '.ogg'
if attach['url'].endswith(extension):
if attachmentCtr > 0:
attachmentStr += '<br>'
if boxName == 'tlmedia':
galleryStr += '<div class="gallery">\n'
if not isMuted:
galleryStr += ' <a href="' + attach['url'] + '">\n'
galleryStr += ' <audio controls>\n'
galleryStr += \
' <source src="' + attach['url'] + \
'" alt="' + imageDescription + \
'" title="' + imageDescription + \
'" class="attachment" type="audio/' + \
extension.replace('.', '') + '">'
idx = 'Your browser does not support the audio tag.'
galleryStr += translate[idx]
galleryStr += ' </audio>\n'
galleryStr += ' </a>\n'
if postJsonObject['object'].get('url'):
audioPostUrl = postJsonObject['object']['url']
else:
audioPostUrl = postJsonObject['object']['id']
if imageDescription and not isMuted:
galleryStr += \
' <a href="' + audioPostUrl + \
'" class="gallerytext"><div ' + \
'class="gallerytext">' + \
imageDescription + '</div></a>\n'
else:
galleryStr += \
'<label class="transparent">---</label><br>'
galleryStr += ' <div class="mediaicons">\n'
galleryStr += \
' ' + replyStr + announceStr + \
likeStr + bookmarkStr + \
deleteStr + muteStr+'\n'
galleryStr += ' </div>\n'
galleryStr += ' <div class="mediaavatar">\n'
galleryStr += ' ' + avatarLink + '\n'
galleryStr += ' </div>\n'
galleryStr += '</div>\n'
attachmentStr += '<center>\n<audio controls>\n'
attachmentStr += \
'<source src="' + attach['url'] + '" alt="' + \
imageDescription + '" title="' + imageDescription + \
'" class="attachment" type="audio/' + \
extension.replace('.', '') + '">'
attachmentStr += \
translate['Your browser does not support the audio tag.']
attachmentStr += '</audio>\n</center>\n'
attachmentCtr += 1
attachmentStr += '</div>'
return attachmentStr, galleryStr
def htmlPostSeparator(baseDir: str, column: str) -> str:
"""Returns the html for a timeline post separator image
"""
iconsDir = getIconsDir(baseDir)
filename = 'separator.png'
if column:
filename = 'separator_' + column + '.png'
separatorImageFilename = baseDir + '/img/' + iconsDir + '/' + filename
separatorStr = ''
if os.path.isfile(separatorImageFilename):
separatorStr = \
'<div class="postSeparatorImage"><center>' + \
'<img src="/' + iconsDir + '/' + filename + '"/>' + \
'</center></div>\n'
return separatorStr