__filename__ = "webinterface.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import time
import os
import urllib.parse
from collections import OrderedDict
from datetime import datetime
from datetime import date
from dateutil.parser import parse
from shutil import copyfile
from pprint import pprint
from person import personBoxJson
from person import isPersonSnoozed
from pgp import getEmailAddress
from pgp import getPGPpubKey
from pgp import getPGPfingerprint
from xmpp import getXmppAddress
from ssb import getSSBAddress
from tox import getToxAddress
from matrix import getMatrixAddress
from donate import getDonationUrl
from utils import getCSS
from utils import isSystemAccount
from utils import removeIdEnding
from utils import getProtocolPrefixes
from utils import searchBoxPosts
from utils import isEventPost
from utils import isBlogPost
from utils import isNewsPost
from utils import updateRecentPostsCache
from utils import getNicknameFromActor
from utils import getDomainFromActor
from utils import locatePost
from utils import noOfAccounts
from utils import isPublicPost
from utils import isPublicPostFromUrl
from utils import getDisplayName
from utils import getCachedPostDirectory
from utils import getCachedPostFilename
from utils import loadJson
from utils import getConfigParam
from utils import votesOnNewswireItem
from utils import removeHtml
from follow import isFollowingActor
from webfinger import webfingerHandle
from posts import isDM
from posts import getPersonBox
from posts import getUserUrl
from posts import parseUserFeed
from posts import populateRepliesJson
from posts import isModerator
from posts import isEditor
from posts import downloadAnnounce
from session import getJson
from auth import createPassword
from like import likedByPerson
from like import noOfLikes
from bookmarks import bookmarkedByPerson
from announce import announcedByPerson
from blocking import isBlocked
from blocking import isBlockedHashtag
from content import htmlReplaceEmailQuote
from content import htmlReplaceQuoteMarks
from content import removeTextFormatting
from content import switchWords
from content import getMentionsFromHtml
from content import addHtmlTags
from content import replaceEmojiFromTags
from content import removeLongWords
from skills import getSkills
from cache import getPersonFromCache
from cache import storePersonInCache
from shares import getValidSharedItemID
from happening import todaysEventsCheck
from happening import thisWeeksEventsCheck
from happening import getCalendarEvents
from happening import getTodaysEvents
from git import isGitPatch
from theme import getThemesList
from petnames import getPetName
from followingCalendar import receivingCalendarEvents
from devices import E2EEdecryptMessageFromDevice
def getAltPath(actor: str, domainFull: str, callingDomain: str) -> str:
    """Returns alternate path from the actor
    eg. https://clearnetdomain/path becomes http://oniondomain/path

    actor: url of the actor, expected to contain domainFull
    domainFull: the domain of this instance as it appears within actor
    callingDomain: the domain through which the request arrived

    The actor is only rewritten when domainFull appears within it,
    callingDomain does not already appear within it, and callingDomain
    ends with .onion or .i2p. Otherwise it is returned unchanged.
    """
    if callingDomain in actor or domainFull not in actor:
        return actor
    if not callingDomain.endswith(('.onion', '.i2p')):
        return actor
    # Split on the first occurrence only, so that a path which happens
    # to contain the domain again is kept whole rather than truncated
    actorPath = actor.split(domainFull, 1)[1]
    postActor = 'http://' + callingDomain + actorPath
    print('Changed POST domain from ' + actor + ' to ' + postActor)
    return postActor
def getContentWarningButton(postID: str, translate: {},
                            content: str) -> str:
    """Returns the markup for a content warning button

    postID: identifier of the post; it is not used when building the result
    translate: dictionary which must contain a 'SHOW MORE' entry
    content: the text placed after the 'SHOW MORE' label

    The result is a space, the translated 'SHOW MORE' label, a newline,
    the content and two trailing newlines.
    """
    # The line breaks inside the literals are written as \n escapes,
    # because a single-quoted string may not span physical lines.
    # NOTE(review): postID is unused and the literals contain no tags,
    # which suggests that the surrounding markup has been lost from these
    # strings - confirm against the upstream version of this function.
    return ' ' + \
        translate['SHOW MORE'] + \
        '\n' + content + \
        '\n\n'
def getBlogAddress(actorJson: {}) -> str:
    """Looks through the attachments of the actor for a blog address

    The first attachment is returned whose name begins with 'blog'
    (in any letter case), whose type is PropertyValue and whose value
    starts with one of the known protocol prefixes, contains a dot and
    has no spaces or commas. The value stored on a candidate attachment
    has its surrounding whitespace stripped in place.
    An empty string is returned when no attachment qualifies.
    """
    attachments = actorJson.get('attachment')
    if not attachments:
        return ''
    for attached in attachments:
        fieldName = attached.get('name')
        if not fieldName or not fieldName.lower().startswith('blog'):
            continue
        if not attached.get('type') or not attached.get('value'):
            continue
        if attached['type'] != 'PropertyValue':
            continue
        # the tidied value is written back onto the attachment
        address = attached['value'].strip()
        attached['value'] = address
        knownPrefixes = getProtocolPrefixes()
        if not any(address.startswith(prefix) for prefix in knownPrefixes):
            continue
        if '.' not in address or ' ' in address or ',' in address:
            continue
        return address
    return ''
def setBlogAddress(actorJson: {}, blogAddress: str) -> None:
    """Sets a blog address for the given actor

    Any existing blog attachment is removed first, which means that
    supplying an address which fails validation clears the current one.
    A valid address starts with a known protocol prefix, contains a dot
    and has no spaces or commas.
    """
    if not actorJson.get('attachment'):
        actorJson['attachment'] = []
    attachments = actorJson['attachment']

    # discard the first existing blog entry, if there is one
    staleEntry = next(
        (item for item in attachments
         if item.get('name') and item.get('type') and
         item['name'].lower().startswith('blog')),
        None)
    if staleEntry:
        attachments.remove(staleEntry)

    # validate the new address before storing it
    knownPrefixes = getProtocolPrefixes()
    if not any(blogAddress.startswith(prefix) for prefix in knownPrefixes):
        return
    if '.' not in blogAddress or ' ' in blogAddress or ',' in blogAddress:
        return

    # reuse a remaining blog PropertyValue entry when one exists
    for item in attachments:
        if not item.get('name') or not item.get('type'):
            continue
        if not item['name'].lower().startswith('blog'):
            continue
        if item['type'] != 'PropertyValue':
            continue
        item['value'] = blogAddress
        return

    attachments.append({
        "name": "Blog",
        "type": "PropertyValue",
        "value": blogAddress
    })
def _getAvatarImageType(avatarUrl: str):
    """Returns a (mime type, file extension) tuple for the image which
    the avatar url refers to, or None if the format is not recognised
    """
    # checked in this order, so that the first matching format wins
    imageTypes = (
        (('png',), 'image/png', 'png'),
        (('jpg', 'jpeg'), 'image/jpeg', 'jpg'),
        (('gif',), 'image/gif', 'gif'),
        (('webp',), 'image/webp', 'webp'),
        (('avif',), 'image/avif', 'avif')
    )
    for urlExtensions, mimeType, fileExtension in imageTypes:
        for ext in urlExtensions:
            # either the url ends with the extension or the extension
            # is followed by a query string
            if avatarUrl.endswith('.' + ext) or \
               '.' + ext + '?' in avatarUrl:
                return mimeType, fileExtension
    return None


def updateAvatarImageCache(session, baseDir: str, httpPrefix: str,
                           actor: str, avatarUrl: str,
                           personCache: {}, allowDownloads: bool,
                           force=False) -> str:
    """Updates the cached avatar for the given actor

    session: object with a get(url, headers=, params=) method which
        returns a response having status_code and content
    baseDir: base directory, beneath which cache/avatars is used
    httpPrefix: passed through to getJson when the actor is fetched
    actor: url of the actor; slashes are replaced by dashes to make
        the cached filename
    avatarUrl: url of the avatar image
    personCache: cache of actors, indexed by actor url
    allowDownloads: whether downloading is permitted
    force: download even if a cached image already exists

    Returns the path of the image relative to the cache directory
    (eg. /avatars/name.png). None is returned if there is no avatar url
    or its format is not recognised. If the download does not succeed
    then the actor is fetched again and either the avatar url of the
    stored actor or None is returned.
    """
    if not avatarUrl:
        return None
    actorStr = actor.replace('/', '-')
    imageType = _getAvatarImageType(avatarUrl)
    if not imageType:
        return None
    mimeType, fileExtension = imageType
    sessionHeaders = {
        'Accept': mimeType
    }
    avatarImageFilename = \
        baseDir + '/cache/avatars/' + actorStr + '.' + fileExtension
    cachedImagePath = avatarImageFilename.replace(baseDir + '/cache', '')

    # nothing to download if an image is already cached (unless forced)
    # or if downloads are not permitted
    alreadyCached = os.path.isfile(avatarImageFilename)
    if not allowDownloads or (alreadyCached and not force):
        return cachedImagePath

    try:
        print('avatar image url: ' + avatarUrl)
        result = session.get(avatarUrl,
                             headers=sessionHeaders,
                             params=None)
        if result.status_code < 200 or \
           result.status_code > 202:
            print('Avatar image download failed with status ' +
                  str(result.status_code))
            # remove any previously cached image
            if os.path.isfile(avatarImageFilename):
                os.remove(avatarImageFilename)
        else:
            with open(avatarImageFilename, 'wb') as f:
                f.write(result.content)
            print('avatar image downloaded for ' + actor)
            return cachedImagePath
    except Exception as e:
        # best effort: fall through to fetching the actor again
        print('Failed to download avatar image: ' + str(avatarUrl))
        print(e)

    # The download did not succeed, so fetch the actor again.
    # ld+json is requested when either /channel/ or /accounts/ appears
    # within the actor url. Previously the test was
    # "'/channel/' not in actor or '/accounts/' not in actor", which
    # only chose ld+json when both substrings were present.
    prof = 'https://www.w3.org/ns/activitystreams'
    if '/channel/' in actor or '/accounts/' in actor:
        acceptType = 'application/ld+json'
    else:
        acceptType = 'application/activity+json'
    sessionHeaders = {
        'Accept': acceptType + '; profile="' + prof + '"'
    }
    personJson = \
        getJson(session, actor, sessionHeaders, None, __version__,
                httpPrefix, None)
    if not personJson:
        return None
    if not personJson.get('id'):
        return None
    if not personJson.get('publicKey'):
        return None
    if not personJson['publicKey'].get('publicKeyPem'):
        return None
    if personJson['id'] != actor:
        return None
    if not personCache.get(actor):
        return None
    # only accept the downloaded actor if its public key is the same
    # as the one which is already cached
    if personCache[actor]['actor']['publicKey']['publicKeyPem'] != \
       personJson['publicKey']['publicKeyPem']:
        print("ERROR: " +
              "public keys don't match when downloading actor for " +
              actor)
        return None
    storePersonInCache(baseDir, actor, personJson, personCache,
                       allowDownloads)
    return getPersonAvatarUrl(baseDir, actor, personCache,
                              allowDownloads)
def getPersonAvatarUrl(baseDir: str, personUrl: str, personCache: {},
                       allowDownloads: bool) -> str:
    """Returns the avatar url for the person

    A locally cached image beneath cache/avatars is preferred, tried
    with the actor id as given and then in lower case for each known
    image extension. Failing that the icon url of the actor is
    returned. None is returned if the person is not available from the
    cache or has no icon url.
    """
    cachedPerson = \
        getPersonFromCache(baseDir, personUrl, personCache, allowDownloads)
    if not cachedPerson:
        return None

    # look for a locally stored image first
    actorName = cachedPerson['id'].replace('/', '-')
    localImagePath = baseDir + '/cache/avatars/' + actorName
    for extension in ('png', 'jpg', 'jpeg', 'gif', 'webp', 'avif'):
        suffix = '.' + extension
        if os.path.isfile(localImagePath + suffix):
            return '/avatars/' + actorName + suffix
        if os.path.isfile(localImagePath.lower() + suffix):
            return '/avatars/' + actorName.lower() + suffix

    # otherwise use the icon which the actor itself specifies
    icon = cachedPerson.get('icon')
    if icon and icon.get('url'):
        return icon['url']
    return None
def htmlFollowingList(cssCache: {}, baseDir: str,
                      followingFilename: str) -> str:
    """Returns a list of handles being followed

    cssCache: cache passed through to getCSS
    baseDir: base directory in which the css files are looked up
    followingFilename: file containing one followed handle per line

    If no css could be obtained then the unmodified text of the
    following file is returned. Otherwise the sorted handles, each
    prefixed with @, are placed between the html header and footer.
    """
    with open(followingFilename, 'r') as followingFile:
        msg = followingFile.read()

    cssFilename = baseDir + '/epicyon-profile.css'
    if os.path.isfile(baseDir + '/epicyon.css'):
        cssFilename = baseDir + '/epicyon.css'
    profileCSS = getCSS(baseDir, cssFilename, cssCache)
    if not profileCSS:
        return msg

    # The line breaks inside the literals are written as \n escapes,
    # because a single-quoted string may not span physical lines.
    # NOTE(review): the literals around each handle contain no tags,
    # which suggests that markup has been lost from these strings -
    # confirm against the upstream version of this function.
    followingList = sorted(msg.split('\n'))
    handlesHtml = ''.join('\n@' + followingAddress + '\n'
                          for followingAddress in followingList
                          if followingAddress)
    return htmlHeader(cssFilename, profileCSS) + handlesHtml + htmlFooter()
def htmlFollowingDataList(baseDir: str, nickname: str,
                          domain: str, domainFull: str) -> str:
    """Returns a datalist of handles being followed

    As written none of the arguments are consulted and the result is
    always a single newline character.
    """
    # NOTE(review): no datalist markup or handles are produced here,
    # which looks incomplete - confirm against the upstream version.
    return '\n'
def htmlSearchEmoji(cssCache: {}, translate: {},
baseDir: str, httpPrefix: str,
searchStr: str) -> str:
"""Search results for emoji

If emoji/emoji.json does not yet exist beneath baseDir then it is
created as a copy of emoji/default_emoji.json before searching.
"""
# emoji.json is generated so that it can be customized and the changes
# will be retained even if default_emoji.json is subsequently updated
if not os.path.isfile(baseDir + '/emoji/emoji.json'):
copyfile(baseDir + '/emoji/default_emoji.json',
baseDir + '/emoji/emoji.json')
# normalise the search text: lower case, colons removed, then newline
# and carriage return characters stripped from both ends
searchStr = searchStr.lower().replace(':', '').strip('\n').strip('\r')
# epicyon.css within the base directory is used if it exists,
# otherwise epicyon-profile.css
cssFilename = baseDir + '/epicyon-profile.css'
if os.path.isfile(baseDir + '/epicyon.css'):
cssFilename = baseDir + '/epicyon.css'
emojiCSS = getCSS(baseDir, cssFilename, cssCache)
if emojiCSS:
# when the instance is not https, rewrite https urls within the css
if httpPrefix != 'https':
emojiCSS = emojiCSS.replace('https://',
httpPrefix + '://')
emojiLookupFilename = baseDir + '/emoji/emoji.json'
# create header
emojiForm = htmlHeader(cssFilename, emojiCSS)
emojiForm += '
' + \
translate['Emoji Search'] + \
'
'
# does the lookup file exist?
if not os.path.isfile(emojiLookupFilename):
emojiForm += '
' + \
translate['No results'] + '
'
emojiForm += htmlFooter()
return emojiForm
emojiJson = loadJson(emojiLookupFilename)
if emojiJson:
# results maps each matching emoji name to its png filename
results = {}
# first pass: emoji names which contain the search text
for emojiName, filename in emojiJson.items():
if searchStr in emojiName:
results[emojiName] = filename + '.png'
# second pass: emoji names which are contained within the search text
for emojiName, filename in emojiJson.items():
if emojiName in searchStr:
results[emojiName] = filename + '.png'
headingShown = False
emojiForm += '
'
msgStr1 = translate['Copy the text then paste it into your post']
msgStr2 = ':'
emojiForm += '
'
# NOTE(review): from this point on the code reads names which are
# neither parameters of this function nor assigned earlier within it
# (searchStrLowerList, currPage, pageNumber, resultsPerPage, ctr,
# sharedItemsForm, actor, domainFull, callingDomain) and it returns
# sharedItemsForm rather than emojiForm. It looks like the body of a
# shared items search has been joined onto the emoji search - confirm
# against the upstream file before relying on anything below.
# NOTE(review): several string literals in this function are split
# across physical lines and appear to have lost their markup - confirm.
resultsExist = False
# visits the directories beneath baseDir/accounts, skipping any
# whose name does not contain an @
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for handle in dirs:
if '@' not in handle:
continue
contactNickname = handle.split('@')[0]
sharesFilename = baseDir + '/accounts/' + handle + \
'/shares.json'
if not os.path.isfile(sharesFilename):
continue
sharesJson = loadJson(sharesFilename)
if not sharesJson:
continue
# a shared item matches only if every search substring appears within
# its location, summary, displayName or category (compared in lower case)
for name, sharedItem in sharesJson.items():
matched = True
for searchSubstr in searchStrLowerList:
subStrMatched = False
searchSubstr = searchSubstr.strip()
if searchSubstr in sharedItem['location'].lower():
subStrMatched = True
elif searchSubstr in sharedItem['summary'].lower():
subStrMatched = True
elif searchSubstr in sharedItem['displayName'].lower():
subStrMatched = True
elif searchSubstr in sharedItem['category'].lower():
subStrMatched = True
if not subStrMatched:
matched = False
break
if matched:
if currPage == pageNumber:
sharedItemsForm += '
\n'
if not resultsExist and currPage > 1:
postActor = \
getAltPath(actor, domainFull,
callingDomain)
# previous page link, needs to be a POST
sharedItemsForm += \
'\n'
resultsExist = True
# count the match; once resultsPerPage is reached move on to the
# next page and reset the counter
ctr += 1
if ctr >= resultsPerPage:
currPage += 1
if currPage > pageNumber:
postActor = \
getAltPath(actor, domainFull,
callingDomain)
# next page link, needs to be a POST
sharedItemsForm += \
'\n'
break
ctr = 0
if not resultsExist:
sharedItemsForm += \
'
' + translate['No results'] + '
\n'
sharedItemsForm += htmlFooter()
return sharedItemsForm
def htmlModerationInfo(cssCache: {}, translate: {},
baseDir: str, httpPrefix: str) -> str:
msgStr1 = \
'These are globally blocked for all accounts on this instance'
msgStr2 = \
'Any blocks or suspensions made by moderators will be shown here.'
infoForm = ''
cssFilename = baseDir + '/epicyon-profile.css'
if os.path.isfile(baseDir + '/epicyon.css'):
cssFilename = baseDir + '/epicyon.css'
infoCSS = getCSS(baseDir, cssFilename, cssCache)
if infoCSS:
if httpPrefix != 'https':
infoCSS = infoCSS.replace('https://',
httpPrefix + '://')
infoForm = htmlHeader(cssFilename, infoCSS)
infoForm += \
'
' + \
translate['Moderation Information'] + \
'
'
infoShown = False
suspendedFilename = baseDir + '/accounts/suspended.txt'
if os.path.isfile(suspendedFilename):
with open(suspendedFilename, "r") as f:
suspendedStr = f.read()
infoForm += '