# Module metadata. __version__ is also passed to getJson by
# updateAvatarImageCache when it fetches an actor.
__filename__ = "webinterface.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import json
import time
import os
from collections import OrderedDict
from datetime import datetime
from datetime import date
from dateutil.parser import parse
from shutil import copyfile
from shutil import copyfileobj
from pprint import pprint
from person import personBoxJson
from person import isPersonSnoozed
from pgp import getEmailAddress
from pgp import getPGPpubKey
from xmpp import getXmppAddress
from matrix import getMatrixAddress
from donate import getDonationUrl
from utils import updateRecentPostsCache
from utils import getNicknameFromActor
from utils import getDomainFromActor
from utils import locatePost
from utils import noOfAccounts
from utils import isPublicPost
from utils import isPublicPostFromUrl
from utils import getDisplayName
from utils import getCachedPostDirectory
from utils import getCachedPostFilename
from utils import loadJson
from utils import saveJson
from follow import isFollowingActor
from webfinger import webfingerHandle
from posts import isDM
from posts import getPersonBox
from posts import getUserUrl
from posts import parseUserFeed
from posts import populateRepliesJson
from posts import isModerator
from posts import outboxMessageCreateWrap
from posts import downloadAnnounce
from session import getJson
from auth import createPassword
from like import likedByPerson
from like import noOfLikes
from bookmarks import bookmarkedByPerson
from announce import announcedByPerson
from blocking import isBlocked
from blocking import isBlockedHashtag
from content import switchWords
from content import getMentionsFromHtml
from content import addHtmlTags
from content import replaceEmojiFromTags
from content import removeLongWords
from config import getConfigParam
from skills import getSkills
from cache import getPersonFromCache
from cache import storePersonInCache
from shares import getValidSharedItemID
def updateAvatarImageCache(session,baseDir: str,httpPrefix: str,actor: str,avatarUrl: str,personCache: {},force=False) -> str:
    """Makes sure that an avatar image for the actor is held in the cache

    The image type is taken from the extension within avatarUrl (png,
    jpg/jpeg, gif or webp, optionally followed by a '?' query). The
    image is stored as baseDir/cache/avatars/<actor with '/' replaced
    by '-'><extension>.

    If the file is already cached and force is not set then nothing is
    downloaded. Otherwise the image is fetched using session.get. If
    the download does not succeed the actor is fetched with getJson and,
    provided that its id equals actor and its public key equals the one
    held in personCache, it is stored in the person cache and the result
    of getPersonAvatarUrl is returned.

    Returns the image filename relative to the cache directory
    (beginning with /avatars/), or None if no avatar could be obtained.
    """
    if not avatarUrl:
        return None

    # (extension used for the cached file, mime type requested,
    #  extensions recognised within the url)
    imageTypes = (
        ('.png', 'image/png', ('.png',)),
        ('.jpg', 'image/jpeg', ('.jpg', '.jpeg')),
        ('.gif', 'image/gif', ('.gif',)),
        ('.webp', 'image/webp', ('.webp',)),
    )
    cachedBase = baseDir + '/cache/avatars/' + actor.replace('/', '-')
    cachedFilename = None
    imageHeaders = None
    for cachedExt, mimeType, urlExts in imageTypes:
        if any(avatarUrl.endswith(ext) or ext + '?' in avatarUrl
               for ext in urlExts):
            imageHeaders = {'Accept': mimeType}
            cachedFilename = cachedBase + cachedExt
            break
    if cachedFilename is None:
        # not a recognised image type
        return None

    relativeFilename = cachedFilename.replace(baseDir + '/cache', '')

    # already cached and no refresh was requested
    if os.path.isfile(cachedFilename) and not force:
        return relativeFilename

    try:
        print('avatar image url: ' + avatarUrl)
        response = session.get(avatarUrl, headers=imageHeaders, params=None)
        if 200 <= response.status_code <= 202:
            with open(cachedFilename, 'wb') as imageFile:
                imageFile.write(response.content)
            print('avatar image downloaded for ' + actor)
            return relativeFilename
        print('Avatar image download failed with status ' + str(response.status_code))
        # remove partial download
        if os.path.isfile(cachedFilename):
            os.remove(cachedFilename)
    except Exception as ex:
        print('Failed to download avatar image: ' + str(avatarUrl))
        print(ex)

    # the download failed, so fetch the actor again
    if '/channel/' in actor:
        actorHeaders = {'Accept': 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'}
    else:
        actorHeaders = {'Accept': 'application/activity+json; profile="https://www.w3.org/ns/activitystreams"'}
    personJson = getJson(session, actor, actorHeaders, None, __version__, httpPrefix, None)
    if not personJson:
        return None
    if not personJson.get('id'):
        return None
    fetchedKey = personJson.get('publicKey')
    if not fetchedKey:
        return None
    if not fetchedKey.get('publicKeyPem'):
        return None
    if personJson['id'] != actor:
        return None
    if not personCache.get(actor):
        return None
    # only accept the fetched actor if its key matches the cached one
    if personCache[actor]['actor']['publicKey']['publicKeyPem'] != fetchedKey['publicKeyPem']:
        print("ERROR: public keys don't match when downloading actor for " + actor)
        return None
    storePersonInCache(baseDir, actor, personJson, personCache)
    return getPersonAvatarUrl(baseDir, actor, personCache)
def getPersonAvatarUrl(baseDir: str,personUrl: str,personCache: {}) -> str:
    """Returns the avatar url for the given person

    The person is looked up with getPersonFromCache. A locally cached
    image under baseDir/cache/avatars is preferred (png, jpg, gif, webp
    or a file without extension, in that order), returned as a path
    beginning with /avatars/. Otherwise the url of the icon within the
    cached actor is returned. Returns None if neither is available.
    """
    cachedPerson = getPersonFromCache(baseDir, personUrl, personCache)
    if not cachedPerson:
        return None

    # get from locally stored image
    avatarName = cachedPerson['id'].replace('/', '-')
    localImagePath = baseDir + '/cache/avatars/' + avatarName
    for extension in ('.png', '.jpg', '.gif', '.webp', ''):
        if os.path.isfile(localImagePath + extension):
            return '/avatars/' + avatarName + extension

    # fall back to the icon url stored with the actor
    icon = cachedPerson.get('icon')
    if icon and icon.get('url'):
        return icon['url']
    return None
def htmlSearchEmoji(translate: {},baseDir: str,httpPrefix: str,searchStr: str) -> str:
"""Builds the page of emoji search results

translate supplies the displayed text for the keys 'Emoji Search',
'No results' and 'Copy the text then paste it into your post'.
searchStr is lowercased and has ':' characters and surrounding
newlines removed before matching. An emoji matches when the search
text occurs within its name, or its name occurs within the search
text, and it is only listed if its .png file exists under
baseDir/emoji.
Returns the accumulated page text, which begins with htmlHeader.

NOTE(review): several string literals in this function look truncated
(the markup they contained appears to be missing) - confirm against a
known good copy of the file before editing.
"""
# emoji.json is generated so that it can be customized and the changes
# will be retained even if default_emoji.json is subsequently updated
if not os.path.isfile(baseDir+'/emoji/emoji.json'):
copyfile(baseDir+'/emoji/default_emoji.json',baseDir+'/emoji/emoji.json')
# normalise the query: lowercase, no ':' delimiters, no surrounding newlines
searchStr=searchStr.lower().replace(':','').strip('\n')
# epicyon.css is used in preference to epicyon-profile.css when it exists
cssFilename=baseDir+'/epicyon-profile.css'
if os.path.isfile(baseDir+'/epicyon.css'):
cssFilename=baseDir+'/epicyon.css'
with open(cssFilename, 'r') as cssFile:
emojiCSS=cssFile.read()
# replace https:// within the css when another prefix is in use
if httpPrefix!='https':
emojiCSS=emojiCSS.replace('https://',httpPrefix+'://')
emojiLookupFilename=baseDir+'/emoji/emoji.json'
# create header
emojiForm=htmlHeader(cssFilename,emojiCSS)
emojiForm+='
'+translate['Emoji Search']+'
'
# does the lookup file exist?
if not os.path.isfile(emojiLookupFilename):
emojiForm+='
'+translate['No results']+'
'
emojiForm+=htmlFooter()
return emojiForm
emojiJson=loadJson(emojiLookupFilename)
if emojiJson:
# results maps emoji name to image filename, so an emoji found by
# both passes below only appears once
results={}
# first pass: emoji names which contain the search text
for emojiName,filename in emojiJson.items():
if searchStr in emojiName:
results[emojiName] = filename+'.png'
# second pass: emoji names which are contained within the search text
for emojiName,filename in emojiJson.items():
if emojiName in searchStr:
results[emojiName] = filename+'.png'
headingShown=False
emojiForm+='
'
for emojiName,filename in results.items():
# skip any emoji whose image file is missing
if os.path.isfile(baseDir+'/emoji/'+filename):
# the instructions heading is added once, before the first result
if not headingShown:
emojiForm+='
'+translate['Copy the text then paste it into your post']+'
'
headingShown=True
emojiForm+='
:'+emojiName+':
'
emojiForm+='
'
emojiForm+=htmlFooter()
return emojiForm
def getIconsDir(baseDir: str) -> str:
    """Returns the icons directory, relative to the img directory

    This is 'icons/<theme>' when a theme is configured and
    baseDir/img/icons/<theme> exists, otherwise 'icons'.
    """
    themeName = getConfigParam(baseDir, 'theme')
    if themeName and os.path.isdir(baseDir + '/img/icons/' + themeName):
        return 'icons/' + themeName
    return 'icons'
def htmlSearchSharedItems(translate: {}, \
baseDir: str,searchStr: str, \
pageNumber: int, \
resultsPerPage: int, \
httpPrefix: str, \
domainFull: str,actor: str) -> str:
"""Builds the page of search results for shared items

searchStr has the escapes %2B, %40, %3A and %23 decoded, is
lowercased and is then split on '+'. An item matches only when every
resulting term occurs in its location, summary, displayName or
category (compared in lowercase).
Items are read from the shares.json file of each directory under
baseDir/accounts whose name contains '@'.
pageNumber and resultsPerPage select which matches are written out.
Returns the accumulated page text, which begins with htmlHeader.

NOTE(review): several string literals in this function look truncated
(the markup they contained appears to be missing) - confirm against a
known good copy of the file before editing.
"""
iconsDir=getIconsDir(baseDir)
# currPage is the page which the next match belongs to and ctr is the
# number of matches counted so far on that page
currPage=1
ctr=0
sharedItemsForm=''
searchStrLower=searchStr.replace('%2B','+').replace('%40','@').replace('%3A',':').replace('%23','#').lower().strip('\n')
searchStrLowerList=searchStrLower.split('+')
# epicyon.css is used in preference to epicyon-profile.css when it exists
cssFilename=baseDir+'/epicyon-profile.css'
if os.path.isfile(baseDir+'/epicyon.css'):
cssFilename=baseDir+'/epicyon.css'
with open(cssFilename, 'r') as cssFile:
sharedItemsCSS=cssFile.read()
# replace https:// within the css when another prefix is in use
if httpPrefix!='https':
sharedItemsCSS=sharedItemsCSS.replace('https://',httpPrefix+'://')
sharedItemsForm=htmlHeader(cssFilename,sharedItemsCSS)
sharedItemsForm+='
'+translate['Shared Items Search']+'
'
resultsExist=False
for subdir, dirs, files in os.walk(baseDir+'/accounts'):
for handle in dirs:
# only directories with '@' in the name are examined
if '@' not in handle:
continue
contactNickname=handle.split('@')[0]
sharesFilename=baseDir+'/accounts/'+handle+'/shares.json'
if not os.path.isfile(sharesFilename):
continue
sharesJson=loadJson(sharesFilename)
if not sharesJson:
continue
for name,sharedItem in sharesJson.items():
# every search term must be found in at least one field of the item
matched=True
for searchSubstr in searchStrLowerList:
subStrMatched=False
searchSubstr=searchSubstr.strip()
if searchSubstr in sharedItem['location'].lower():
subStrMatched=True
elif searchSubstr in sharedItem['summary'].lower():
subStrMatched=True
elif searchSubstr in sharedItem['displayName'].lower():
subStrMatched=True
elif searchSubstr in sharedItem['category'].lower():
subStrMatched=True
if not subStrMatched:
matched=False
break
if matched:
# only matches belonging to the requested page are written out
if currPage==pageNumber:
sharedItemsForm+='
'
sharedItemsForm+='
'+sharedItem['displayName']+'
'
if sharedItem.get('imageUrl'):
sharedItemsForm+=''
sharedItemsForm+=''
sharedItemsForm+='
'
# applies when the given actor is the account which shared the item
if actor.endswith('/users/'+contactNickname):
sharedItemsForm+=' '
sharedItemsForm+='
'
if not resultsExist and currPage>1:
# previous page link, needs to be a POST
sharedItemsForm+=''
resultsExist=True
# count the match and move on to the next page once this one is full
ctr+=1
if ctr>=resultsPerPage:
currPage+=1
if currPage>pageNumber:
# next page link, needs to be a POST
sharedItemsForm+=''
break
ctr=0
if not resultsExist:
sharedItemsForm+='
'+translate['No results']+'
'
sharedItemsForm+=htmlFooter()
return sharedItemsForm
def htmlModerationInfo(translate: {},baseDir: str,httpPrefix: str) -> str:
"""Builds the moderation information page

A section is added for baseDir/accounts/suspended.txt and another for
baseDir/accounts/blocking.txt, for whichever of those files exist.
If neither file exists then an explanatory message is shown instead.
Returns the accumulated page text, which begins with htmlHeader and
ends with htmlFooter.

NOTE(review): several string literals in this function look truncated
(the markup they contained appears to be missing) - confirm against a
known good copy of the file before editing.
"""
infoForm=''
# epicyon.css is used in preference to epicyon-profile.css when it exists
cssFilename=baseDir+'/epicyon-profile.css'
if os.path.isfile(baseDir+'/epicyon.css'):
cssFilename=baseDir+'/epicyon.css'
with open(cssFilename, 'r') as cssFile:
infoCSS=cssFile.read()
# replace https:// within the css when another prefix is in use
if httpPrefix!='https':
infoCSS=infoCSS.replace('https://',httpPrefix+'://')
infoForm=htmlHeader(cssFilename,infoCSS)
infoForm+='
'+translate['Moderation Information']+'
'
# becomes True once either of the two sections has been added
infoShown=False
suspendedFilename=baseDir+'/accounts/suspended.txt'
if os.path.isfile(suspendedFilename):
# NOTE(review): suspendedStr is not referenced again in the visible text;
# presumably it was embedded in the markup below - confirm
with open(suspendedFilename, "r") as f:
suspendedStr = f.read()
infoForm+='
'
infoForm+=' '+translate['Suspended accounts']+''
infoForm+=' '+translate['These are currently suspended']
infoForm+=' '
infoForm+='
'
infoShown=True
blockingFilename=baseDir+'/accounts/blocking.txt'
if os.path.isfile(blockingFilename):
# NOTE(review): blockedStr is not referenced again in the visible text;
# presumably it was embedded in the markup below - confirm
with open(blockingFilename, "r") as f:
blockedStr = f.read()
infoForm+='
'
infoForm+=' '+translate['Blocked accounts and hashtags']+''
infoForm+=' '+translate['These are globally blocked for all accounts on this instance']
infoForm+=' '
infoForm+='
'
infoShown=True
if not infoShown:
infoForm+='
'+translate['Any blocks or suspensions made by moderators will be shown here.']+'
'
infoForm+=htmlFooter()
return infoForm
def htmlHashtagSearch(nickname: str,domain: str,port: int, \
recentPostsCache: {},maxRecentPosts: int, \
translate: {}, \
baseDir: str,hashtag: str,pageNumber: int, \
postsPerPage: int, \
session,wfRequest: {},personCache: {}, \
httpPrefix: str,projectVersion: str) -> str:
"""Show a page containing search results for a hashtag

The posts are taken from the index file baseDir/tags/<hashtag>.txt,
with any leading '#' removed from hashtag. Each index line is either
a post id on its own or three space separated fields, of which the
second is used as the nickname and the third as the post id.
Each post is located with locatePost, loaded, skipped unless
isPublicPost is true and then rendered with individualPostAsHtml.
pageNumber and postsPerPage select the range of index lines shown.
Returns None if the index file does not exist, otherwise the
accumulated page text, which begins with htmlHeader.

NOTE(review): several string literals in this function look truncated
(the markup they contained appears to be missing) and the statement
following the while loop is cut short - confirm against a known good
copy of the file before editing.
"""
iconsDir=getIconsDir(baseDir)
if hashtag.startswith('#'):
hashtag=hashtag[1:]
hashtagIndexFile=baseDir+'/tags/'+hashtag+'.txt'
if not os.path.isfile(hashtagIndexFile):
return None
# check that the directory for the nickname exists
if nickname:
if not os.path.isdir(baseDir+'/accounts/'+nickname+'@'+domain):
nickname=None
# read the index
with open(hashtagIndexFile, "r") as f:
lines = f.readlines()
# read the css
cssFilename=baseDir+'/epicyon-profile.css'
if os.path.isfile(baseDir+'/epicyon.css'):
cssFilename=baseDir+'/epicyon.css'
with open(cssFilename, 'r') as cssFile:
hashtagSearchCSS = cssFile.read()
if httpPrefix!='https':
hashtagSearchCSS=hashtagSearchCSS.replace('https://',httpPrefix+'://')
# ensure that the page number is in bounds
if not pageNumber:
pageNumber=1
elif pageNumber<1:
pageNumber=1
# get the start and end within the index file
startIndex=int((pageNumber-1)*postsPerPage)
endIndex=startIndex+postsPerPage
noOfLines=len(lines)
# NOTE(review): endIndex is only clamped when the index file has lines,
# so with an empty index file lines[index] below would raise IndexError.
# The loop below also includes endIndex itself - confirm both are intended
if endIndex>=noOfLines and noOfLines>0:
endIndex=noOfLines-1
# add the page title
hashtagSearchForm=htmlHeader(cssFilename,hashtagSearchCSS)
hashtagSearchForm+=''
hashtagSearchForm+='
#'+hashtag+'
'
if startIndex>0:
# previous page link
hashtagSearchForm+= \
'
'
index=startIndex
while index<=endIndex:
postId=lines[index].strip('\n')
# NOTE(review): nickname is reassigned for each index line, which
# replaces the nickname passed in by the caller and so also affects
# showIndividualPostIcons below - confirm that this is intended
if ' ' not in postId:
# a line containing only a post id
nickname=getNicknameFromActor(postId)
if not nickname:
index+=1
continue
else:
# a line containing three space separated fields
postFields=postId.split(' ')
if len(postFields)!=3:
# NOTE(review): 'index=+1' assigns the value 1 rather than incrementing
# (every other skip path uses 'index+=1'). This looks like a typo which
# could restart or stall the loop - confirm and fix
index=+1
continue
# postDaysSinceEposh is not used again within the visible code
postDaysSinceEposh=int(postFields[0])
nickname=postFields[1]
postId=postFields[2]
postFilename=locatePost(baseDir,nickname,domain,postId)
if not postFilename:
index+=1
continue
postJsonObject=loadJson(postFilename)
if postJsonObject:
# only public posts appear within the search results
if not isPublicPost(postJsonObject):
index+=1
continue
showIndividualPostIcons=False
if nickname:
showIndividualPostIcons=True
allowDeletion=False
hashtagSearchForm+= \
individualPostAsHtml(recentPostsCache,maxRecentPosts, \
iconsDir,translate,None, \
baseDir,session,wfRequest,personCache, \
nickname,domain,port,postJsonObject, \
None,True,allowDeletion, \
httpPrefix,projectVersion,'search', \
showIndividualPostIcons, \
showIndividualPostIcons, \
False,False,False)
index+=1
if endIndex'
hashtagSearchForm+=htmlFooter()
return hashtagSearchForm
def htmlSkillsSearch(translate: {},baseDir: str, \
httpPrefix: str, \
skillsearch: str,instanceOnly: bool, \
postsPerPage: int) -> str:
"""Show a page containing search results for a skill

skillsearch has any leading '*' removed and is lowercased. A skill
matches when its lowercased name occurs within the search text or
the search text occurs within its name.
Actors are read from the .json files under baseDir/accounts whose
names contain '@' and do not begin with 'inbox@' and, unless
instanceOnly is set, also from those under baseDir/cache/actors.
Only actors having an id, skills, name and icon are considered.
Matches are sorted with the highest skill level first and at most
postsPerPage of them are shown.
Returns the accumulated page text, which begins with htmlHeader and
ends with htmlFooter.

NOTE(review): several string literals in this function look truncated
(the markup they contained appears to be missing) - confirm against a
known good copy of the file before editing.
"""
if skillsearch.startswith('*'):
skillsearch=skillsearch[1:].strip()
skillsearch=skillsearch.lower().strip('\n')
# each result is a string of the form level;actor;name;icon url
results=[]
# search instance accounts
for subdir, dirs, files in os.walk(baseDir+'/accounts/'):
for f in files:
if not f.endswith('.json'):
continue
if '@' not in f:
continue
if f.startswith('inbox@'):
continue
actorFilename = os.path.join(subdir, f)
actorJson=loadJson(actorFilename)
if actorJson:
if actorJson.get('id') and \
actorJson.get('skills') and \
actorJson.get('name') and \
actorJson.get('icon'):
actor=actorJson['id']
for skillName,skillLevel in actorJson['skills'].items():
skillName=skillName.lower()
if skillName in skillsearch or skillsearch in skillName:
# left pad the level with zeros to three digits so that sorting
# the result strings orders them by level
skillLevelStr=str(skillLevel)
if skillLevel<100:
skillLevelStr='0'+skillLevelStr
if skillLevel<10:
skillLevelStr='0'+skillLevelStr
indexStr=skillLevelStr+';'+actor+';'+actorJson['name']+';'+actorJson['icon']['url']
if indexStr not in results:
results.append(indexStr)
if not instanceOnly:
# search actor cache
for subdir, dirs, files in os.walk(baseDir+'/cache/actors/'):
for f in files:
if not f.endswith('.json'):
continue
if '@' not in f:
continue
if f.startswith('inbox@'):
continue
actorFilename = os.path.join(subdir, f)
cachedActorJson=loadJson(actorFilename)
if cachedActorJson:
# within a cache file the actor is held under the 'actor' key
if cachedActorJson.get('actor'):
actorJson=cachedActorJson['actor']
if actorJson.get('id') and \
actorJson.get('skills') and \
actorJson.get('name') and \
actorJson.get('icon'):
actor=actorJson['id']
for skillName,skillLevel in actorJson['skills'].items():
skillName=skillName.lower()
if skillName in skillsearch or skillsearch in skillName:
# same zero padding as for the instance accounts above
skillLevelStr=str(skillLevel)
if skillLevel<100:
skillLevelStr='0'+skillLevelStr
if skillLevel<10:
skillLevelStr='0'+skillLevelStr
indexStr=skillLevelStr+';'+actor+';'+actorJson['name']+';'+actorJson['icon']['url']
if indexStr not in results:
results.append(indexStr)
# highest skill level first
results.sort(reverse=True)
# epicyon.css is used in preference to epicyon-profile.css when it exists
cssFilename=baseDir+'/epicyon-profile.css'
if os.path.isfile(baseDir+'/epicyon.css'):
cssFilename=baseDir+'/epicyon.css'
with open(cssFilename, 'r') as cssFile:
skillSearchCSS = cssFile.read()
if httpPrefix!='https':
skillSearchCSS=skillSearchCSS.replace('https://',httpPrefix+'://')
skillSearchForm=htmlHeader(cssFilename,skillSearchCSS)
skillSearchForm+='
'+translate['Skills search']+': '+skillsearch+'
'
if len(results)==0:
skillSearchForm+='
'+translate['No results']+'
'
else:
skillSearchForm+='
'
ctr=0
for skillMatch in results:
# results which do not split into exactly four fields are skipped,
# for example when one of the fields itself contains ';'
skillMatchFields=skillMatch.split(';')
if len(skillMatchFields)==4:
actor=skillMatchFields[1]
actorName=skillMatchFields[2]
avatarUrl=skillMatchFields[3]
skillSearchForm+='
'
ctr+=1
if ctr>=postsPerPage:
break
skillSearchForm+='
'
skillSearchForm+=htmlFooter()
return skillSearchForm
def scheduledPostsExist(baseDir: str,nickname: str,domain: str) -> bool:
    """Returns true if there are posts scheduled to be delivered

    The schedule index for the account is
    baseDir/accounts/<nickname>@<domain>/schedule.index and posts are
    considered to be scheduled if that file contains the text '#users#'.
    Returns False if the index file does not exist.
    """
    scheduleIndexFilename = \
        baseDir+'/accounts/'+nickname+'@'+domain+'/schedule.index'
    if not os.path.isfile(scheduleIndexFilename):
        return False
    # read within a context manager so that the file is always closed,
    # rather than leaving the handle to be collected later
    with open(scheduleIndexFilename, 'r') as scheduleIndexFile:
        return '#users#' in scheduleIndexFile.read()
def htmlEditProfile(translate: {},baseDir: str,path: str,domain: str,port: int,httpPrefix: str) -> str:
"""Shows the edit profile screen
"""
imageFormats='.png, .jpg, .jpeg, .gif, .webp'
pathOriginal=path
path=path.replace('/inbox','').replace('/outbox','').replace('/shares','')
nickname=getNicknameFromActor(path)
if not nickname:
return ''
domainFull=domain
if port:
if port!=80 and port!=443:
if ':' not in domain:
domainFull=domain+':'+str(port)
actorFilename=baseDir+'/accounts/'+nickname+'@'+domain+'.json'
if not os.path.isfile(actorFilename):
return ''
isBot=''
isGroup=''
followDMs=''
removeTwitter=''
mediaInstanceStr=''
displayNickname=nickname
bioStr=''
donateUrl=''
emailAddress=''
PGPpubKey=''
xmppAddress=''
matrixAddress=''
manuallyApprovesFollowers=''
actorJson=loadJson(actorFilename)
if actorJson:
donateUrl=getDonationUrl(actorJson)
xmppAddress=getXmppAddress(actorJson)
matrixAddress=getMatrixAddress(actorJson)
emailAddress=getEmailAddress(actorJson)
PGPpubKey=getPGPpubKey(actorJson)
if actorJson.get('name'):
displayNickname=actorJson['name']
if actorJson.get('summary'):
bioStr=actorJson['summary'].replace('
','').replace('
','')
if actorJson.get('manuallyApprovesFollowers'):
if actorJson['manuallyApprovesFollowers']:
manuallyApprovesFollowers='checked'
else:
manuallyApprovesFollowers=''
if actorJson.get('type'):
if actorJson['type']=='Service':
isBot='checked'
isGroup=''
elif actorJson['type']=='Group':
isGroup='checked'
isBot=''
if os.path.isfile(baseDir+'/accounts/'+nickname+'@'+domain+'/.followDMs'):
followDMs='checked'
if os.path.isfile(baseDir+'/accounts/'+nickname+'@'+domain+'/.removeTwitter'):
removeTwitter='checked'
mediaInstance=getConfigParam(baseDir,"mediaInstance")
if mediaInstance:
if mediaInstance==True:
mediaInstanceStr='checked'
filterStr=''
filterFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/filters.txt'
if os.path.isfile(filterFilename):
with open(filterFilename, 'r') as filterfile:
filterStr=filterfile.read()
blockedStr=''
blockedFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/blocking.txt'
if os.path.isfile(blockedFilename):
with open(blockedFilename, 'r') as blockedfile:
blockedStr=blockedfile.read()
allowedInstancesStr=''
allowedInstancesFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/allowedinstances.txt'
if os.path.isfile(allowedInstancesFilename):
with open(allowedInstancesFilename, 'r') as allowedInstancesFile:
allowedInstancesStr=allowedInstancesFile.read()
skills=getSkills(baseDir,nickname,domain)
skillsStr=''
skillCtr=1
if skills:
for skillDesc,skillValue in skills.items():
skillsStr+='
'
skillsStr+='
'
skillCtr+=1
skillsStr+='
'
skillsStr+='
' \
cssFilename=baseDir+'/epicyon-profile.css'
if os.path.isfile(baseDir+'/epicyon.css'):
cssFilename=baseDir+'/epicyon.css'
with open(cssFilename, 'r') as cssFile:
editProfileCSS = cssFile.read()
if httpPrefix!='https':
editProfileCSS=editProfileCSS.replace('https://',httpPrefix+'://')
instanceStr=''
moderatorsStr=''
themesDropdown=''
adminNickname=getConfigParam(baseDir,'admin')
if path.startswith('/users/'+adminNickname+'/'):
instanceDescription=getConfigParam(baseDir,'instanceDescription')
instanceDescriptionShort=getConfigParam(baseDir,'instanceDescriptionShort')
instanceTitle=getConfigParam(baseDir,'instanceTitle')
instanceStr='
'
moderators=''
moderatorsFile=baseDir+'/accounts/moderators.txt'
if os.path.isfile(moderatorsFile):
with open(moderatorsFile, "r") as f:
moderators = f.read()
moderatorsStr='
'
moderatorsStr+=' '+translate['Moderators']+' '
moderatorsStr+=' '+translate['A list of moderator nicknames. One per line.']
moderatorsStr+=' '
moderatorsStr+='