__filename__ = "webinterface.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "0.0.1"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import json
import time
import os
import commentjson
from datetime import datetime
from dateutil.parser import parse
from shutil import copyfile
from pprint import pprint
from person import personBoxJson
from utils import getNicknameFromActor
from utils import getDomainFromActor
from utils import locatePost
from utils import noOfAccounts
from utils import isPublicPost
from follow import isFollowingActor
from webfinger import webfingerHandle
from posts import getPersonBox
from posts import getUserUrl
from posts import parseUserFeed
from posts import populateRepliesJson
from posts import isModerator
from posts import outboxMessageCreateWrap
from session import getJson
from auth import createPassword
from like import likedByPerson
from announce import announcedByPerson
from blocking import isBlocked
from content import getMentionsFromHtml
from config import getConfigParam
from skills import getSkills
from cache import getPersonFromCache
def getPersonAvatarUrl(baseDir: str,personUrl: str,personCache: {}) -> str:
"""Returns the avatar url for the person
"""
personJson = getPersonFromCache(baseDir,personUrl,personCache)
if personJson:
if personJson.get('icon'):
if personJson['icon'].get('url'):
return personJson['icon']['url']
return None
def htmlSearchEmoji(baseDir: str,searchStr: str) -> str:
"""Search results for emoji
"""
if not os.path.isfile(baseDir+'/emoji/emoji.json'):
copyfile(baseDir+'/emoji/default_emoji.json',baseDir+'/emoji/emoji.json')
searchStr=searchStr.lower().replace(':','').strip('\n')
with open(baseDir+'/epicyon-profile.css', 'r') as cssFile:
emojiCSS=cssFile.read()
emojiLookupFilename=baseDir+'/emoji/emoji.json'
# create header
emojiForm=htmlHeader(emojiCSS)
emojiForm+='
Emoji Search
'
# does the lookup file exist?
if not os.path.isfile(emojiLookupFilename):
emojiForm+='
No results
'
emojiForm+=htmlFooter()
return emojiForm
with open(emojiLookupFilename, 'r') as fp:
emojiJson=commentjson.load(fp)
results={}
for emojiName,filename in emojiJson.items():
if searchStr in emojiName:
results[emojiName] = filename+'.png'
for emojiName,filename in emojiJson.items():
if emojiName in searchStr:
results[emojiName] = filename+'.png'
headingShown=False
emojiForm+='
'
for emojiName,filename in results.items():
if os.path.isfile(baseDir+'/emoji/'+filename):
if not headingShown:
emojiForm+='
'
resultsExist=False
for subdir, dirs, files in os.walk(baseDir+'/accounts'):
for handle in dirs:
if '@' not in handle:
continue
sharesFilename=baseDir+'/accounts/'+handle+'/shares.json'
if not os.path.isfile(sharesFilename):
continue
with open(sharesFilename, 'r') as fp:
sharesJson=commentjson.load(fp)
for name,sharedItem in sharesJson.items():
matched=True
for searchSubstr in searchStrLowerList:
subStrMatched=False
searchSubstr=searchSubstr.strip()
if searchSubstr in sharedItem['location'].lower():
subStrMatched=True
elif searchSubstr in sharedItem['summary'].lower():
subStrMatched=True
elif searchSubstr in sharedItem['displayName'].lower():
subStrMatched=True
elif searchSubstr in sharedItem['category'].lower():
subStrMatched=True
if not subStrMatched:
matched=False
break
if matched:
if currPage==pageNumber:
sharedItemsForm+='
'
if not resultsExist and currPage>1:
# previous page link, needs to be a POST
sharedItemsForm+= \
''
resultsExist=True
ctr+=1
if ctr>=resultsPerPage:
currPage+=1
if currPage>pageNumber:
# next page link, needs to be a POST
sharedItemsForm+= \
''
break
ctr=0
if not resultsExist:
sharedItemsForm+='
No results
'
sharedItemsForm+=htmlFooter()
return sharedItemsForm
def htmlModerationInfo(baseDir: str) -> str:
infoForm=''
with open(baseDir+'/epicyon-profile.css', 'r') as cssFile:
infoCSS=cssFile.read()
infoForm=htmlHeader(infoCSS)
infoForm+='
Moderation Information
'
infoShown=False
suspendedFilename=baseDir+'/accounts/suspended.txt'
if os.path.isfile(suspendedFilename):
with open(suspendedFilename, "r") as f:
suspendedStr = f.read()
infoForm+= \
'
' \
' Suspended accounts' \
' These are currently suspended' \
' ' \
'
'
infoShown=True
blockingFilename=baseDir+'/accounts/blocking.txt'
if os.path.isfile(blockingFilename):
with open(blockingFilename, "r") as f:
blockedStr = f.read()
infoForm+= \
'
' \
' Blocked accounts and hashtags' \
' These are globally blocked for all accounts on this instance' \
' ' \
'
'
infoShown=True
if not infoShown:
infoForm+='
Any blocks or suspensions made by moderators will be shown here.
'
infoForm+=htmlFooter()
return infoForm
def htmlHashtagSearch(baseDir: str,hashtag: str,pageNumber: int,postsPerPage: int,
session,wfRequest: {},personCache: {}, \
httpPrefix: str,projectVersion: str) -> str:
"""Show a page containing search results for a hashtag
"""
if hashtag.startswith('#'):
hashtag=hashtag[1:]
hashtagIndexFile=baseDir+'/tags/'+hashtag+'.txt'
if not os.path.isfile(hashtagIndexFile):
return None
# read the index
with open(hashtagIndexFile, "r") as f:
lines = f.readlines()
with open(baseDir+'/epicyon-profile.css', 'r') as cssFile:
hashtagSearchCSS = cssFile.read()
startIndex=len(lines)-1-int(pageNumber*postsPerPage)
if startIndex<0:
startIndex=len(lines)-1
endIndex=startIndex-postsPerPage
if endIndex<0:
endIndex=0
hashtagSearchForm=htmlHeader(hashtagSearchCSS)
hashtagSearchForm+='
#'+hashtag+'
'
if startIndex!=len(lines)-1:
# previous page link
hashtagSearchForm+='
'
index=startIndex
while index>=endIndex:
postId=lines[index].strip('\n')
nickname=getNicknameFromActor(postId)
if not nickname:
index-=1
continue
domain,port=getDomainFromActor(postId)
if not domain:
index-=1
continue
postFilename=locatePost(baseDir,nickname,domain,postId)
if not postFilename:
index-=1
continue
with open(postFilename, 'r') as fp:
postJsonObject=commentjson.load(fp)
if not isPublicPost(postJsonObject):
index-=1
continue
hashtagSearchForm+= \
individualPostAsHtml(baseDir,session,wfRequest,personCache, \
nickname,domain,port,postJsonObject, \
None,True,False, \
httpPrefix,projectVersion, \
False)
index-=1
if endIndex>0:
# next page link
hashtagSearchForm+='
'
hashtagSearchForm+=htmlFooter()
return hashtagSearchForm
def htmlEditProfile(baseDir: str,path: str,domain: str,port: int) -> str:
"""Shows the edit profile screen
"""
pathOriginal=path
path=path.replace('/inbox','').replace('/outbox','').replace('/shares','')
nickname=getNicknameFromActor(path)
domainFull=domain
if port:
if port!=80 and port!=443:
if ':' not in domain:
domainFull=domain+':'+str(port)
actorFilename=baseDir+'/accounts/'+nickname+'@'+domain+'.json'
if not os.path.isfile(actorFilename):
return ''
isBot=''
preferredNickname=nickname
bioStr=''
manuallyApprovesFollowers=''
with open(actorFilename, 'r') as fp:
actorJson=commentjson.load(fp)
if actorJson.get('preferredUsername'):
preferredNickname=actorJson['preferredUsername']
if actorJson.get('summary'):
bioStr=actorJson['summary'].replace('
','').replace('
','')
if actorJson.get('manuallyApprovesFollowers'):
if actorJson['manuallyApprovesFollowers']:
manuallyApprovesFollowers='checked'
else:
manuallyApprovesFollowers=''
if actorJson.get('type'):
if actorJson['type']=='Service':
isBot='checked'
filterStr=''
filterFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/filters.txt'
if os.path.isfile(filterFilename):
with open(filterFilename, 'r') as filterfile:
filterStr=filterfile.read()
blockedStr=''
blockedFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/blocking.txt'
if os.path.isfile(blockedFilename):
with open(blockedFilename, 'r') as blockedfile:
blockedStr=blockedfile.read()
allowedInstancesStr=''
allowedInstancesFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/allowedinstances.txt'
if os.path.isfile(allowedInstancesFilename):
with open(allowedInstancesFilename, 'r') as allowedInstancesFile:
allowedInstancesStr=allowedInstancesFile.read()
skills=getSkills(baseDir,nickname,domain)
skillsStr=''
skillCtr=1
if skills:
for skillDesc,skillValue in skills.items():
skillsStr+='
'
skillsStr+='
'
skillCtr+=1
skillsStr+='
'
skillsStr+='
' \
with open(baseDir+'/epicyon-profile.css', 'r') as cssFile:
editProfileCSS = cssFile.read()
moderatorsStr=''
adminNickname=getConfigParam(baseDir,'admin')
if path.startswith('/users/'+adminNickname+'/'):
moderators=''
moderatorsFile=baseDir+'/accounts/moderators.txt'
if os.path.isfile(moderatorsFile):
with open(moderatorsFile, "r") as f:
moderators = f.read()
moderatorsStr= \
'
' \
' Moderators ' \
' A list of moderator nicknames. One per line.' \
' ' \
'
'
editProfileForm=htmlHeader(editProfileCSS)
editProfileForm+= \
''
editProfileForm+=htmlFooter()
return editProfileForm
def htmlGetLoginCredentials(loginParams: str,lastLoginTime: int) -> (str,str,bool):
"""Receives login credentials via HTTPServer POST
"""
if not loginParams.startswith('username='):
return None,None,None
# minimum time between login attempts
currTime=int(time.time())
if currTime str:
"""Shows the login screen
"""
accounts=noOfAccounts(baseDir)
if not os.path.isfile(baseDir+'/accounts/login.png'):
copyfile(baseDir+'/img/login.png',baseDir+'/accounts/login.png')
if os.path.isfile(baseDir+'/img/login-background.png'):
if not os.path.isfile(baseDir+'/accounts/login-background.png'):
copyfile(baseDir+'/img/login-background.png',baseDir+'/accounts/login-background.png')
if accounts>0:
loginText='
Welcome. Please enter your login details below.
'
else:
loginText='
Please enter some credentials
You will become the admin of this site.
'
if os.path.isfile(baseDir+'/accounts/login.txt'):
# custom login message
with open(baseDir+'/accounts/login.txt', 'r') as file:
loginText = '
'+file.read()+'
'
with open(baseDir+'/epicyon-login.css', 'r') as cssFile:
loginCSS = cssFile.read()
# show the register button
registerButtonStr=''
if getConfigParam(baseDir,'registration')=='open':
if int(getConfigParam(baseDir,'registrationsRemaining'))>0:
if accounts>0:
loginText='
'
loginButtonStr=''
if accounts>0:
loginButtonStr=''
loginForm=htmlHeader(loginCSS)
loginForm+= \
''
loginForm+=htmlFooter()
return loginForm
def htmlTermsOfService(baseDir: str,httpPrefix: str,domainFull: str) -> str:
"""Show the terms of service screen
"""
adminNickname = getConfigParam(baseDir,'admin')
if not os.path.isfile(baseDir+'/accounts/tos.txt'):
copyfile(baseDir+'/default_tos.txt',baseDir+'/accounts/tos.txt')
if os.path.isfile(baseDir+'/img/login-background.png'):
if not os.path.isfile(baseDir+'/accounts/login-background.png'):
copyfile(baseDir+'/img/login-background.png',baseDir+'/accounts/login-background.png')
TOSText='Terms of Service go here.'
if os.path.isfile(baseDir+'/accounts/tos.txt'):
with open(baseDir+'/accounts/tos.txt', 'r') as file:
TOSText = file.read()
TOSForm=''
with open(baseDir+'/epicyon-profile.css', 'r') as cssFile:
termsCSS = cssFile.read()
TOSForm=htmlHeader(termsCSS)
TOSForm+='
'+TOSText+'
'
if adminNickname:
adminActor=httpPrefix+'://'+domainFull+'/users/'+adminNickname
TOSForm+='
'
suspendedForm+=htmlFooter()
return suspendedForm
def htmlNewPost(baseDir: str,path: str,inReplyTo: str,mentions: []) -> str:
"""New post screen
"""
reportUrl=None
if '/newreport?=' in path:
reportUrl=path.split('/newreport?=')[1]
path=path.split('/newreport?=')[0]
replyStr=''
if not path.endswith('/newshare'):
if not path.endswith('/newreport'):
if not inReplyTo:
newPostText='
'
# custom report header with any additional instructions
if os.path.isfile(baseDir+'/accounts/report.txt'):
with open(baseDir+'/accounts/report.txt', 'r') as file:
customReportText=file.read()
if '' not in customReportText:
customReportText='
'+customReportText+'
'
customReportText=customReportText.replace('
','
')
newPostText+=customReportText
newPostText+='
This message only goes to moderators, even if it mentions other fediverse addresses.
You can also refer to points within the Terms of Service if necessary.
'
else:
newPostText='
Enter the details for your shared item below.
'
if os.path.isfile(baseDir+'/accounts/newpost.txt'):
with open(baseDir+'/accounts/newpost.txt', 'r') as file:
newPostText = '
'+file.read()+'
'
with open(baseDir+'/epicyon-profile.css', 'r') as cssFile:
newPostCSS = cssFile.read()
pathBase=path.replace('/newreport','').replace('/newpost','').replace('/newshare','').replace('/newunlisted','').replace('/newfollowers','').replace('/newdm','')
scopeIcon='scope_public.png'
scopeDescription='Public'
placeholderSubject='Subject or Content Warning (optional)...'
placeholderMessage='Write something...'
extraFields=''
endpoint='newpost'
if path.endswith('/newunlisted'):
scopeIcon='scope_unlisted.png'
scopeDescription='Unlisted'
endpoint='newunlisted'
if path.endswith('/newfollowers'):
scopeIcon='scope_followers.png'
scopeDescription='Followers Only'
endpoint='newfollowers'
if path.endswith('/newdm'):
scopeIcon='scope_dm.png'
scopeDescription='Direct Message'
endpoint='newdm'
if path.endswith('/newreport'):
scopeIcon='scope_report.png'
scopeDescription='Report'
endpoint='newreport'
if path.endswith('/newshare'):
scopeIcon='scope_share.png'
scopeDescription='Shared Item'
placeholderSubject='Name of the shared item...'
placeholderMessage='Description of the item being shared...'
endpoint='newshare'
extraFields= \
'
' \
' ' \
' ' \
' ' \
'
' \
''
newPostForm=htmlHeader(newPostCSS)
# only show the share option if this is not a reply
shareOptionOnDropdown=''
if not replyStr:
shareOptionOnDropdown='Share Describe a shared item'
mentionsStr=''
for m in mentions:
mentionNickname=getNicknameFromActor(m)
if not mentionNickname:
continue
mentionDomain,mentionPort=getDomainFromActor(m)
if not mentionDomain:
continue
if mentionPort:
mentionsStr+='@'+mentionNickname+'@'+mentionDomain+':'+str(mentionPort)+' '
else:
mentionsStr+='@'+mentionNickname+'@'+mentionDomain+' '
reportOptionOnDropdown='Report Send to moderators'
# For moderation reports add a link to the post reported
if reportUrl:
mentionStr='Reported link: '+reportUrl+'\n\n'
reportOptionOnDropdown='Report Send to moderators'
newPostForm+= \
''
newPostForm+=htmlFooter()
return newPostForm
def htmlHeader(css=None,lang='en') -> str:
if not css:
htmlStr= \
'\n' \
'\n' \
' \n' \
' \n' \
' \n'
else:
htmlStr= \
'\n' \
'\n' \
' \n' \
' \n' \
' \n'
return htmlStr
def htmlFooter() -> str:
htmlStr= \
' \n' \
'\n'
return htmlStr
def htmlProfilePosts(baseDir: str,httpPrefix: str, \
authorized: bool,ocapAlways: bool, \
nickname: str,domain: str,port: int, \
session,wfRequest: {},personCache: {}, \
projectVersion: str) -> str:
"""Shows posts on the profile screen
"""
profileStr=''
outboxFeed= \
personBoxJson(baseDir,domain, \
port,'/users/'+nickname+'/outbox?page=1', \
httpPrefix, \
4, 'outbox', \
authorized, \
ocapAlways)
profileStr+=''
for item in outboxFeed['orderedItems']:
if item['type']=='Create' or item['type']=='Announce':
profileStr+= \
individualPostAsHtml(baseDir,session,wfRequest,personCache, \
nickname,domain,port,item,None,True,False, \
httpPrefix,projectVersion, \
False)
return profileStr
def htmlProfileFollowing(baseDir: str,httpPrefix: str, \
authorized: bool,ocapAlways: bool, \
nickname: str,domain: str,port: int, \
session,wfRequest: {},personCache: {}, \
followingJson: {},projectVersion: str, \
buttons: []) -> str:
"""Shows following on the profile screen
"""
profileStr=''
for item in followingJson['orderedItems']:
profileStr+= \
individualFollowAsHtml(baseDir,session, \
wfRequest,personCache, \
domain,item,authorized,nickname, \
httpPrefix,projectVersion, \
buttons)
return profileStr
def htmlProfileRoles(nickname: str,domain: str,rolesJson: {}) -> str:
"""Shows roles on the profile screen
"""
profileStr=''
for project,rolesList in rolesJson.items():
profileStr+='
'+project+'
'
for role in rolesList:
profileStr+='
'+role+'
'
profileStr+='
'
if len(profileStr)==0:
profileStr+='
@'+nickname+'@'+domain+' has no roles assigned
'
else:
profileStr='
'+profileStr+'
'
return profileStr
def htmlProfileSkills(nickname: str,domain: str,skillsJson: {}) -> str:
"""Shows skills on the profile screen
"""
profileStr=''
for skill,level in skillsJson.items():
profileStr+='
'+skill+'
'
if len(profileStr)==0:
profileStr+='
@'+nickname+'@'+domain+' has no skills assigned
'
else:
profileStr='
'+profileStr+'
'
return profileStr
def htmlProfileShares(nickname: str,domain: str,sharesJson: {}) -> str:
"""Shows shares on the profile screen
"""
profileStr=''
for item in sharesJson['orderedItems']:
profileStr+='
'
return profileStr
def htmlProfile(projectVersion: str, \
baseDir: str,httpPrefix: str,authorized: bool, \
ocapAlways: bool,profileJson: {},selected: str, \
session,wfRequest: {},personCache: {}, \
extraJson=None) -> str:
"""Show the profile page as html
"""
nickname=profileJson['name']
if not nickname:
return ""
preferredName=profileJson['preferredUsername']
domain,port=getDomainFromActor(profileJson['id'])
if not domain:
return ""
domainFull=domain
if port:
domainFull=domain+':'+str(port)
profileDescription=profileJson['summary']
postsButton='button'
followingButton='button'
followersButton='button'
rolesButton='button'
skillsButton='button'
sharesButton='button'
if selected=='posts':
postsButton='buttonselected'
elif selected=='following':
followingButton='buttonselected'
elif selected=='followers':
followersButton='buttonselected'
elif selected=='roles':
rolesButton='buttonselected'
elif selected=='skills':
skillsButton='buttonselected'
elif selected=='shares':
sharesButton='buttonselected'
loginButton=''
followApprovalsSection=''
followApprovals=False
linkToTimelineStart=''
linkToTimelineEnd=''
editProfileStr=''
actor=profileJson['id']
if not authorized:
loginButton=' '
else:
editProfileStr=''
linkToTimelineStart=''
linkToTimelineEnd=''
# are there any follow requests?
followRequestsFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/followrequests.txt'
if os.path.isfile(followRequestsFilename):
with open(followRequestsFilename,'r') as f:
for line in f:
if len(line)>0:
followApprovals=True
followersButton='buttonhighlighted'
if selected=='followers':
followersButton='buttonselectedhighlighted'
break
if selected=='followers':
if followApprovals:
with open(followRequestsFilename,'r') as f:
for followerHandle in f:
if len(line)>0:
if '://' in followerHandle:
followerActor=followerHandle
else:
followerActor=httpPrefix+'://'+followerHandle.split('@')[1]+'/users/'+followerHandle.split('@')[0]
basePath=httpPrefix+'://'+domainFull+'/users/'+nickname
followApprovalsSection+='
\n'
def contentWarningScript() -> str:
"""Returns a script used for content warnings
"""
script= \
'function showContentWarning(postID) {' \
' var x = document.getElementById(postID);' \
' if (x.style.display === "none") {' \
' x.style.display = "block";' \
' } else {' \
' x.style.display = "none";' \
' }' \
'}'
return script
def htmlRemplaceEmojiFromTags(content: str,tag: {}) -> str:
"""Uses the tags to replace :emoji: with html image markup
"""
for tagItem in tag:
if not tagItem.get('type'):
continue
if tagItem['type']!='Emoji':
continue
if not tagItem.get('name'):
continue
if not tagItem.get('icon'):
continue
if not tagItem['icon'].get('url'):
continue
if tagItem['name'] not in content:
continue
emojiHtml=""
content=content.replace(tagItem['name'],emojiHtml)
return content
def addEmbeddedVideo(content: str,width=400,height=300) -> str:
"""Adds embedded videos
"""
if '>vimeo.com/' in content:
url=content.split('>vimeo.com/')[1]
if '<' in url:
url=url.split('<')[0]
content=content+"
"
return content
videoSite='https://www.youtube.com'
if '"'+videoSite in content:
url=content.split('"'+videoSite)[1]
if '"' in url:
url=url.split('"')[0].replace('/watch?v=','/embed/')
content=content+"
"
return content
videoSite='https://media.ccc.de'
if '"'+videoSite in content:
url=content.split('"'+videoSite)[1]
if '"' in url:
url=url.split('"')[0]
if not url.endswith('/oembed'):
url=url+'/oembed'
content=content+"
"
return content
# A selection of the current larger peertube sites, mostly French and German language
# These have been chosen based on reported numbers of users and the content of each has not been reviewed, so mileage could vary
peerTubeSites=['peertube.mastodon.host','open.tube','share.tube','tube.tr4sk.me','videos.elbinario.net','hkvideo.live','peertube.snargol.com','tube.22decembre.eu','tube.fabrigli.fr','libretube.net','libre.video','peertube.linuxrocks.online','spacepub.space','video.ploud.jp','video.omniatv.com','peertube.servebeer.com','tube.tchncs.de','tubee.fr','video.alternanet.fr','devtube.dev-wiki.de','video.samedi.pm','https://video.irem.univ-paris-diderot.fr','peertube.openstreetmap.fr','video.antopie.org','scitech.video','tube.4aem.com','video.ploud.fr','peervideo.net','video.valme.io','videos.pair2jeux.tube','vault.mle.party','hostyour.tv','diode.zone','visionon.tv','artitube.artifaille.fr','peertube.fr','peertube.live','tube.ac-lyon.fr','www.yiny.org','betamax.video','tube.piweb.be','pe.ertu.be','peertube.social','videos.lescommuns.org','peertube.nogafa.org','skeptikon.fr','video.tedomum.net','tube.p2p.legal','sikke.fi','exode.me','peertube.video']
for site in peerTubeSites:
if '"https://'+site in content:
url=content.split('"https://'+site)[1]
if '"' in url:
url=url.split('"')[0].replace('/watch/','/embed/')
content=content+"
"
return content
return content
def individualPostAsHtml(baseDir: str, \
session,wfRequest: {},personCache: {}, \
nickname: str,domain: str,port: int, \
postJsonObject: {}, \
avatarUrl: str, showAvatarDropdown: bool,
allowDeletion: bool, \
httpPrefix: str, projectVersion: str, \
showIcons=False) -> str:
""" Shows a single post as html
"""
titleStr=''
isAnnounced=False
if postJsonObject['type']=='Announce':
if postJsonObject.get('object'):
if isinstance(postJsonObject['object'], str):
# get the announced post
announceCacheDir=baseDir+'/cache/announce/'+nickname
if not os.path.isdir(announceCacheDir):
os.mkdir(announceCacheDir)
announceFilename=announceCacheDir+'/'+postJsonObject['object'].replace('/','#')+'.json'
print('announceFilename: '+announceFilename)
if os.path.isfile(announceFilename):
print('Reading cached Announce content for '+postJsonObject['object'])
with open(announceFilename, 'r') as fp:
postJsonObject=commentjson.load(fp)
isAnnounced=True
else:
print('Downloading Announce content for '+postJsonObject['object'])
asHeader = {'Accept': 'application/activity+json; profile="https://www.w3.org/ns/activitystreams"'}
actorNickname=getNicknameFromActor(postJsonObject['actor'])
actorDomain,actorPort=getDomainFromActor(postJsonObject['actor'])
announcedJson = getJson(session,postJsonObject['object'],asHeader,None,projectVersion,httpPrefix,domain)
if announcedJson:
if not announcedJson.get('type'):
pprint(announcedJson)
return ''
if announcedJson['type']!='Note':
pprint(announcedJson)
return ''
# wrap in create to be consistent with other posts
announcedJson= \
outboxMessageCreateWrap(httpPrefix, \
actorNickname,actorDomain,actorPort, \
announcedJson)
if announcedJson['type']!='Create':
pprint(announcedJson)
return ''
# set the id to the original status
announcedJson['id']=postJsonObject['object']
announcedJson['object']['id']=postJsonObject['object']
postJsonObject=announcedJson
with open(announceFilename, 'w') as fp:
commentjson.dump(postJsonObject, fp, indent=4, sort_keys=False)
isAnnounced=True
else:
return ''
else:
return ''
if not isinstance(postJsonObject['object'], dict):
return ''
isModerationPost=False
if postJsonObject['object'].get('moderationStatus'):
isModerationPost=True
avatarPosition=''
containerClass='container'
containerClassIcons='containericons'
timeClass='time-right'
actorNickname=getNicknameFromActor(postJsonObject['actor'])
actorDomain,actorPort=getDomainFromActor(postJsonObject['actor'])
messageId=''
if postJsonObject.get('id'):
messageId=postJsonObject['id'].replace('/activity','')
titleStr+='@'+actorNickname+'@'+actorDomain+''
if isAnnounced and postJsonObject['object'].get('attributedTo'):
announceNickname=getNicknameFromActor(postJsonObject['object']['attributedTo'])
announceDomain,announcePort=getDomainFromActor(postJsonObject['object']['attributedTo'])
titleStr+=' announced@'+announceNickname+'@'+announceDomain+''
if not isAnnounced and postJsonObject['object'].get('inReplyTo'):
containerClassIcons='containericons darker'
containerClass='container darker'
avatarPosition=' class="right"'
#timeClass='time-left'
if '/statuses/' in postJsonObject['object']['inReplyTo']:
replyNickname=getNicknameFromActor(postJsonObject['object']['inReplyTo'])
replyDomain,replyPort=getDomainFromActor(postJsonObject['object']['inReplyTo'])
if replyNickname and replyDomain:
titleStr+=' replying to@'+replyNickname+'@'+replyDomain+''
else:
postDomain=postJsonObject['object']['inReplyTo'].replace('https://','').replace('http://','').replace('dat://','')
if '/' in postDomain:
postDomain=postDomain.split('/',1)[0]
titleStr+=' replying to'+postDomain+''
attachmentStr=''
if postJsonObject['object']['attachment']:
if isinstance(postJsonObject['object']['attachment'], list):
attachmentCtr=0
attachmentStr+='
'
for attach in postJsonObject['object']['attachment']:
if attach.get('mediaType') and attach.get('url'):
mediaType=attach['mediaType']
imageDescription=''
if attach.get('name'):
imageDescription=attach['name']
if mediaType=='image/png' or \
mediaType=='image/jpeg' or \
mediaType=='image/gif':
if attach['url'].endswith('.png') or \
attach['url'].endswith('.jpg') or \
attach['url'].endswith('.jpeg') or \
attach['url'].endswith('.gif'):
if attachmentCtr>0:
attachmentStr+=' '
attachmentStr+= \
'' \
'\n'
attachmentCtr+=1
attachmentStr+='
'
if not avatarUrl:
avatarUrl=getPersonAvatarUrl(baseDir,postJsonObject['actor'],personCache)
if not avatarUrl:
avatarUrl=postJsonObject['actor']+'/avatar.png'
fullDomain=domain
if port:
if port!=80 and port!=443:
if ':' not in domain:
fullDomain=domain+':'+str(port)
if fullDomain not in postJsonObject['actor']:
inboxUrl,pubKeyId,pubKey,fromPersonId,sharedInbox,capabilityAcquisition,avatarUrl2,preferredName = \
getPersonBox(baseDir,session,wfRequest,personCache, \
projectVersion,httpPrefix,domain,'outbox')
if avatarUrl2:
avatarUrl=avatarUrl2
if preferredName:
titleStr=preferredName+' '+titleStr
avatarDropdown= \
' ' \
' '
if showAvatarDropdown and fullDomain+'/users/'+nickname not in postJsonObject['actor']:
# if not following then show "Follow" in the dropdown
followUnfollowStr='Follow'
# if following then show "Unfollow" in the dropdown
if isFollowingActor(baseDir,nickname,domain,postJsonObject['actor']):
followUnfollowStr='Unfollow'
blockUnblockStr='Block'
# if blocking then show "Unblock" in the dropdown
actorDomainFull=actorDomain
if actorPort:
if actorPort!=80 and actorPort!=443:
if ':' not in actorDomain:
actorDomainFull=actorDomain+':'+str(actorPort)
if isBlocked(baseDir,nickname,domain,actorNickname,actorDomainFull):
blockUnblockStr='Unblock'
reportStr=''
if messageId:
reportStr='Report'
avatarDropdown= \
'
'
publishedStr=postJsonObject['object']['published']
if '.' not in publishedStr:
datetimeObject = datetime.strptime(publishedStr,"%Y-%m-%dT%H:%M:%SZ")
else:
publishedStr=publishedStr.replace('T',' ').split('.')[0]
datetimeObject = parse(publishedStr)
publishedStr=datetimeObject.strftime("%a %b %d, %H:%M")
footerStr=''+publishedStr+'\n'
announceStr=''
if not isModerationPost:
# don't allow announce/repeat of your own posts
announceIcon='repeat_inactive.png'
announceLink='repeat'
announceTitle='Repeat this post'
if announcedByPerson(postJsonObject,nickname,fullDomain):
announceIcon='repeat.png'
announceLink='unrepeat'
announceTitle='Undo the repeat this post'
announceStr= \
'' \
''
likeStr=''
if not isModerationPost:
likeIcon='like_inactive.png'
likeLink='like'
likeTitle='Like this post'
if likedByPerson(postJsonObject,nickname,fullDomain):
likeIcon='like.png'
likeLink='unlike'
likeTitle='Undo the like of this post'
likeStr= \
'' \
''
deleteStr=''
if allowDeletion or \
('/'+fullDomain+'/' in postJsonObject['actor'] and \
postJsonObject['object']['id'].startswith(postJsonObject['actor'])):
if '/users/'+nickname+'/' in postJsonObject['object']['id']:
deleteStr= \
'' \
''
if showIcons:
replyToLink=postJsonObject['object']['id']
if postJsonObject['object'].get('attributedTo'):
replyToLink+='?mention='+postJsonObject['object']['attributedTo']
if postJsonObject['object'].get('content'):
mentionedActors=getMentionsFromHtml(postJsonObject['object']['content'])
if mentionedActors:
for actorUrl in mentionedActors:
if '?mention='+actorUrl not in replyToLink:
replyToLink+='?mention='+actorUrl
if len(replyToLink)>500:
break
footerStr='
'
if not isModerationPost:
footerStr+=''
else:
footerStr+=''
footerStr+=''
footerStr+=announceStr+likeStr+deleteStr
footerStr+=''+publishedStr+''
footerStr+='
'
if not postJsonObject['object']['sensitive']:
contentStr=postJsonObject['object']['content']+attachmentStr
else:
postID='post'+str(createPassword(8))
contentStr=''
if postJsonObject['object'].get('summary'):
contentStr+=''+postJsonObject['object']['summary']+' '
if isModerationPost:
containerClass='container report'
else:
contentStr+='Sensitive '
contentStr+=''
contentStr+='
'
# second row of buttons for moderator actions
if moderator and boxName=='moderation':
tlStr+= \
''
# add the javascript for content warnings
tlStr+=''
# page up arrow
if pageNumber>1:
tlStr+='
'
# show the posts
itemCtr=0
for item in timelineJson['orderedItems']:
if item['type']=='Create' or item['type']=='Announce':
itemCtr+=1
avatarUrl=getPersonAvatarUrl(baseDir,item['actor'],personCache)
tlStr+=individualPostAsHtml(baseDir,session,wfRequest,personCache, \
nickname,domain,port,item,avatarUrl,True, \
allowDeletion, \
httpPrefix,projectVersion,
showIndividualPostIcons)
# page down arrow
if itemCtr>=itemsPerPage:
tlStr+='
'
tlStr+=htmlFooter()
return tlStr
def htmlInbox(pageNumber: int,itemsPerPage: int, \
session,baseDir: str,wfRequest: {},personCache: {}, \
nickname: str,domain: str,port: int,inboxJson: {}, \
allowDeletion: bool, \
httpPrefix: str,projectVersion: str) -> str:
"""Show the inbox as html
"""
return htmlTimeline(pageNumber,itemsPerPage,session,baseDir,wfRequest,personCache, \
nickname,domain,port,inboxJson,'inbox',allowDeletion, \
httpPrefix,projectVersion)
def htmlModeration(pageNumber: int,itemsPerPage: int, \
session,baseDir: str,wfRequest: {},personCache: {}, \
nickname: str,domain: str,port: int,inboxJson: {}, \
allowDeletion: bool, \
httpPrefix: str,projectVersion: str) -> str:
"""Show the moderation feed as html
"""
return htmlTimeline(pageNumber,itemsPerPage,session,baseDir,wfRequest,personCache, \
nickname,domain,port,inboxJson,'moderation',allowDeletion, \
httpPrefix,projectVersion)
def htmlOutbox(pageNumber: int,itemsPerPage: int, \
session,baseDir: str,wfRequest: {},personCache: {}, \
nickname: str,domain: str,port: int,outboxJson: {}, \
allowDeletion: bool,
httpPrefix: str,projectVersion: str) -> str:
"""Show the Outbox as html
"""
return htmlTimeline(pageNumber,itemsPerPage,session,baseDir,wfRequest,personCache, \
nickname,domain,port,outboxJson,'outbox',allowDeletion, \
httpPrefix,projectVersion)
def htmlIndividualPost(baseDir: str,session,wfRequest: {},personCache: {}, \
nickname: str,domain: str,port: int,authorized: bool, \
postJsonObject: {},httpPrefix: str,projectVersion: str) -> str:
"""Show an individual post as html
"""
postStr=''
postStr+= \
individualPostAsHtml(baseDir,session,wfRequest,personCache, \
nickname,domain,port,postJsonObject,None,True,False, \
httpPrefix,projectVersion,False)
messageId=postJsonObject['id'].replace('/activity','')
# show the previous posts
while postJsonObject['object'].get('inReplyTo'):
postFilename=locatePost(baseDir,nickname,domain,postJsonObject['object']['inReplyTo'])
if not postFilename:
break
with open(postFilename, 'r') as fp:
postJsonObject=commentjson.load(fp)
postStr= \
individualPostAsHtml(baseDir,session,wfRequest,personCache, \
nickname,domain,port,postJsonObject, \
None,True,False, \
httpPrefix,projectVersion, \
False)+postStr
# show the following posts
postFilename=locatePost(baseDir,nickname,domain,messageId)
if postFilename:
# is there a replies file for this post?
repliesFilename=postFilename.replace('.json','.replies')
if os.path.isfile(repliesFilename):
# get items from the replies file
repliesJson={'orderedItems': []}
populateRepliesJson(baseDir,nickname,domain,repliesFilename,authorized,repliesJson)
# add items to the html output
for item in repliesJson['orderedItems']:
postStr+= \
individualPostAsHtml(baseDir,session,wfRequest,personCache, \
nickname,domain,port,item,None,True,False, \
httpPrefix,projectVersion,False)
return htmlHeader()+postStr+htmlFooter()
def htmlPostReplies(baseDir: str,session,wfRequest: {},personCache: {}, \
nickname: str,domain: str,port: int,repliesJson: {}, \
httpPrefix: str,projectVersion: str) -> str:
"""Show the replies to an individual post as html
"""
repliesStr=''
if repliesJson.get('orderedItems'):
for item in repliesJson['orderedItems']:
repliesStr+=individualPostAsHtml(baseDir,session,wfRequest,personCache, \
nickname,domain,port,item,None,True,False, \
httpPrefix,projectVersion,False)
return htmlHeader()+repliesStr+htmlFooter()
def htmlFollowConfirm(baseDir: str,originPathStr: str,followActor: str,followProfileUrl: str) -> str:
"""Asks to confirm a follow
"""
followDomain,port=getDomainFromActor(followActor)
if os.path.isfile(baseDir+'/img/follow-background.png'):
if not os.path.isfile(baseDir+'/accounts/follow-background.png'):
copyfile(baseDir+'/img/follow-background.png',baseDir+'/accounts/follow-background.png')
with open(baseDir+'/epicyon-follow.css', 'r') as cssFile:
profileStyle = cssFile.read()
followStr=htmlHeader(profileStyle)
followStr+='
'
followStr+=htmlFooter()
return followStr
def htmlUnfollowConfirm(baseDir: str,originPathStr: str,followActor: str,followProfileUrl: str) -> str:
"""Asks to confirm unfollowing an actor
"""
followDomain,port=getDomainFromActor(followActor)
if os.path.isfile(baseDir+'/img/follow-background.png'):
if not os.path.isfile(baseDir+'/accounts/follow-background.png'):
copyfile(baseDir+'/img/follow-background.png',baseDir+'/accounts/follow-background.png')
with open(baseDir+'/epicyon-follow.css', 'r') as cssFile:
profileStyle = cssFile.read()
followStr=htmlHeader(profileStyle)
followStr+='
Stop following '+getNicknameFromActor(followActor)+'@'+followDomain+' ?
'
followStr+= \
' '
followStr+='
'
followStr+='
'
followStr+='
'
followStr+=htmlFooter()
return followStr
def htmlBlockConfirm(baseDir: str,originPathStr: str,blockActor: str,blockProfileUrl: str) -> str:
"""Asks to confirm a block
"""
blockDomain,port=getDomainFromActor(blockActor)
if os.path.isfile(baseDir+'/img/block-background.png'):
if not os.path.isfile(baseDir+'/accounts/block-background.png'):
copyfile(baseDir+'/img/block-background.png',baseDir+'/accounts/block-background.png')
with open(baseDir+'/epicyon-follow.css', 'r') as cssFile:
profileStyle = cssFile.read()
blockStr=htmlHeader(profileStyle)
blockStr+='
'
blockStr+=htmlFooter()
return blockStr
def htmlUnblockConfirm(baseDir: str,originPathStr: str,blockActor: str,blockProfileUrl: str) -> str:
"""Asks to confirm unblocking an actor
"""
blockDomain,port=getDomainFromActor(blockActor)
if os.path.isfile(baseDir+'/img/block-background.png'):
if not os.path.isfile(baseDir+'/accounts/block-background.png'):
copyfile(baseDir+'/img/block-background.png',baseDir+'/accounts/block-background.png')
with open(baseDir+'/epicyon-follow.css', 'r') as cssFile:
profileStyle = cssFile.read()
blockStr=htmlHeader(profileStyle)
blockStr+='
'
blockStr+=htmlFooter()
return blockStr
def htmlSearchEmojiTextEntry(baseDir: str,path: str) -> str:
"""Search for an emoji by name
"""
if not os.path.isfile(baseDir+'/emoji/emoji.json'):
copyfile(baseDir+'/emoji/default_emoji.json',baseDir+'/emoji/emoji.json')
actor=path.replace('/search','')
nickname=getNicknameFromActor(actor)
domain,port=getDomainFromActor(actor)
if os.path.isfile(baseDir+'/img/search-background.png'):
if not os.path.isfile(baseDir+'/accounts/search-background.png'):
copyfile(baseDir+'/img/search-background.png',baseDir+'/accounts/search-background.png')
with open(baseDir+'/epicyon-follow.css', 'r') as cssFile:
profileStyle = cssFile.read()
emojiStr=htmlHeader(profileStyle)
emojiStr+='
'
emojiStr+='
'
emojiStr+='
'
emojiStr+='
Enter an emoji name to search for
'
emojiStr+= \
' '
emojiStr+='
'
emojiStr+='
'
emojiStr+='
'
emojiStr+=htmlFooter()
return emojiStr
def htmlSearch(baseDir: str,path: str) -> str:
"""Search called from the timeline icon
"""
actor=path.replace('/search','')
nickname=getNicknameFromActor(actor)
domain,port=getDomainFromActor(actor)
if os.path.isfile(baseDir+'/img/search-background.png'):
if not os.path.isfile(baseDir+'/accounts/search-background.png'):
copyfile(baseDir+'/img/search-background.png',baseDir+'/accounts/search-background.png')
with open(baseDir+'/epicyon-follow.css', 'r') as cssFile:
profileStyle = cssFile.read()
followStr=htmlHeader(profileStyle)
followStr+='
'
followStr+='
'
followStr+='
'
followStr+='
Enter an address, shared item, #hashtag or :emoji: to search for
'
followStr+= \
' '
followStr+='
'
followStr+='
'
followStr+='
'
followStr+=htmlFooter()
return followStr
def htmlProfileAfterSearch(baseDir: str,path: str,httpPrefix: str, \
nickname: str,domain: str,port: int, \
profileHandle: str, \
session,wfRequest: {},personCache: {},
debug: bool,projectVersion: str) -> str:
"""Show a profile page after a search for a fediverse address
"""
if '/users/' in profileHandle or '/@' in profileHandle:
searchNickname=getNicknameFromActor(profileHandle)
searchDomain,searchPort=getDomainFromActor(profileHandle)
else:
if '@' not in profileHandle:
if debug:
print('DEBUG: no @ in '+profileHandle)
return None
if profileHandle.startswith('@'):
profileHandle=profileHandle[1:]
if '@' not in profileHandle:
if debug:
print('DEBUG: no @ in '+profileHandle)
return None
searchNickname=profileHandle.split('@')[0]
searchDomain=profileHandle.split('@')[1]
searchPort=None
if ':' in searchDomain:
searchPort=int(searchDomain.split(':')[1])
searchDomain=searchDomain.split(':')[0]
if not searchNickname:
if debug:
print('DEBUG: No nickname found in '+profileHandle)
return None
if not searchDomain:
if debug:
print('DEBUG: No domain found in '+profileHandle)
return None
searchDomainFull=searchDomain
if searchPort:
if searchPort!=80 and searchPort!=443:
if ':' not in searchDomain:
searchDomainFull=searchDomain+':'+str(searchPort)
profileStr=''
with open(baseDir+'/epicyon-profile.css', 'r') as cssFile:
wf = webfingerHandle(session,searchNickname+'@'+searchDomainFull,httpPrefix,wfRequest, \
domain,projectVersion)
if not wf:
if debug:
print('DEBUG: Unable to webfinger '+searchNickname+'@'+searchDomainFull)
return None
asHeader = {'Accept': 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'}
personUrl = getUserUrl(wf)
profileJson = getJson(session,personUrl,asHeader,None,projectVersion,httpPrefix,domain)
if not profileJson:
if debug:
print('DEBUG: No actor returned from '+personUrl)
return None
avatarUrl=''
if profileJson.get('icon'):
if profileJson['icon'].get('url'):
avatarUrl=profileJson['icon']['url']
if not avatarUrl:
avatarUrl=getPersonAvatarUrl(baseDir,personUrl,personCache)
preferredName=searchNickname
if profileJson.get('preferredUsername'):
preferredName=profileJson['preferredUsername']
profileDescription=''
if profileJson.get('summary'):
profileDescription=profileJson['summary']
outboxUrl=None
if not profileJson.get('outbox'):
if debug:
pprint(profileJson)
print('DEBUG: No outbox found')
return None
outboxUrl=profileJson['outbox']
profileBackgroundImage=''
if profileJson.get('image'):
if profileJson['image'].get('url'):
profileBackgroundImage=profileJson['image']['url']
profileStyle = cssFile.read().replace('image.png',profileBackgroundImage)
# url to return to
backUrl=path
if not backUrl.endswith('/inbox'):
backUrl+='/inbox'
profileStr= \
'
' \
'
' \
' ' \
'
'+preferredName+'
' \
'
@'+searchNickname+'@'+searchDomainFull+'
' \
'
'+profileDescription+'
'+ \
'
' \
'
'+ \
'
\n' \
' ' \
'
'
profileStr+=''
result = []
i = 0
for item in parseUserFeed(session,outboxUrl,asHeader, \
projectVersion,httpPrefix,domain):
if not item.get('type'):
continue
if item['type']!='Create' and item['type']!='Announce':
continue
if not item.get('object'):
continue
profileStr+= \
individualPostAsHtml(baseDir, \
session,wfRequest,personCache, \
nickname,domain,port, \
item,avatarUrl,False,False, \
httpPrefix,projectVersion,False)
i+=1
if i>=20:
break
return htmlHeader(profileStyle)+profileStr+htmlFooter()