__filename__ = "webinterface.py" __author__ = "Bob Mottram" __license__ = "AGPL3+" __version__ = "1.1.0" __maintainer__ = "Bob Mottram" __email__ = "bob@freedombone.net" __status__ = "Production" import json import time import os from collections import OrderedDict from datetime import datetime from datetime import date from dateutil.parser import parse from shutil import copyfile from shutil import copyfileobj from pprint import pprint from person import personBoxJson from person import isPersonSnoozed from pgp import getEmailAddress from pgp import getPGPpubKey from xmpp import getXmppAddress from ssb import getSSBAddress from matrix import getMatrixAddress from donate import getDonationUrl from utils import isBlogPost from utils import updateRecentPostsCache from utils import getNicknameFromActor from utils import getDomainFromActor from utils import locatePost from utils import noOfAccounts from utils import isPublicPost from utils import isPublicPostFromUrl from utils import getDisplayName from utils import getCachedPostDirectory from utils import getCachedPostFilename from utils import loadJson from utils import saveJson from follow import isFollowingActor from webfinger import webfingerHandle from posts import isDM from posts import getPersonBox from posts import getUserUrl from posts import parseUserFeed from posts import populateRepliesJson from posts import isModerator from posts import outboxMessageCreateWrap from posts import downloadAnnounce from session import getJson from auth import createPassword from like import likedByPerson from like import noOfLikes from bookmarks import bookmarkedByPerson from announce import announcedByPerson from blocking import isBlocked from blocking import isBlockedHashtag from content import switchWords from content import getMentionsFromHtml from content import addHtmlTags from content import replaceEmojiFromTags from content import removeLongWords from config import getConfigParam from skills import getSkills from cache import getPersonFromCache from cache import storePersonInCache from shares import getValidSharedItemID from happening import todaysEventsCheck from happening import thisWeeksEventsCheck from happening import getCalendarEvents from happening import getTodaysEvents def updateAvatarImageCache(session,baseDir: str,httpPrefix: str, \ actor: str,avatarUrl: str, \ personCache: {},force=False) -> str: """Updates the cached avatar for the given actor """ if not avatarUrl: return None actorStr=actor.replace('/','-') avatarImagePath=baseDir+'/cache/avatars/'+actorStr if avatarUrl.endswith('.png') or '.png?' in avatarUrl: sessionHeaders = {'Accept': 'image/png'} avatarImageFilename=avatarImagePath+'.png' elif avatarUrl.endswith('.jpg') or avatarUrl.endswith('.jpeg') or \ '.jpg?' in avatarUrl or '.jpeg?' in avatarUrl: sessionHeaders = {'Accept': 'image/jpeg'} avatarImageFilename=avatarImagePath+'.jpg' elif avatarUrl.endswith('.gif') or '.gif?' in avatarUrl: sessionHeaders = {'Accept': 'image/gif'} avatarImageFilename=avatarImagePath+'.gif' elif avatarUrl.endswith('.webp') or '.webp?' in avatarUrl: sessionHeaders = {'Accept': 'image/webp'} avatarImageFilename=avatarImagePath+'.webp' else: return None if not os.path.isfile(avatarImageFilename) or force: try: print('avatar image url: '+avatarUrl) result=session.get(avatarUrl, headers=sessionHeaders, params=None) if result.status_code<200 or result.status_code>202: print('Avatar image download failed with status '+ \ str(result.status_code)) # remove partial download if os.path.isfile(avatarImageFilename): os.remove(avatarImageFilename) else: with open(avatarImageFilename, 'wb') as f: f.write(result.content) print('avatar image downloaded for '+actor) return avatarImageFilename.replace(baseDir+'/cache','') except Exception as e: print('Failed to download avatar image: '+str(avatarUrl)) print(e) if '/channel/' not in actor: sessionHeaders = { 'Accept': 'application/activity+json; profile="https://www.w3.org/ns/activitystreams"' } else: sessionHeaders = { 'Accept': 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"' } personJson = \ getJson(session,actor,sessionHeaders,None,__version__, \ httpPrefix,None) if personJson: if not personJson.get('id'): return None if not personJson.get('publicKey'): return None if not personJson['publicKey'].get('publicKeyPem'): return None if personJson['id']!=actor: return None if not personCache.get(actor): return None if personCache[actor]['actor']['publicKey']['publicKeyPem']!= \ personJson['publicKey']['publicKeyPem']: print("ERROR: public keys don't match when downloading actor for "+actor) return None storePersonInCache(baseDir,actor,personJson,personCache) return getPersonAvatarUrl(baseDir,actor,personCache) return None return avatarImageFilename.replace(baseDir+'/cache','') def getPersonAvatarUrl(baseDir: str,personUrl: str,personCache: {}) -> str: """Returns the avatar url for the person """ personJson = getPersonFromCache(baseDir,personUrl,personCache) if not personJson: return None # get from locally stored image actorStr=personJson['id'].replace('/','-') avatarImagePath=baseDir+'/cache/avatars/'+actorStr if os.path.isfile(avatarImagePath+'.png'): return '/avatars/'+actorStr+'.png' if os.path.isfile(avatarImagePath+'.jpg'): return '/avatars/'+actorStr+'.jpg' if os.path.isfile(avatarImagePath+'.gif'): return '/avatars/'+actorStr+'.gif' if os.path.isfile(avatarImagePath+'.webp'): return '/avatars/'+actorStr+'.webp' if os.path.isfile(avatarImagePath): return '/avatars/'+actorStr if personJson.get('icon'): if personJson['icon'].get('url'): return personJson['icon']['url'] return None def htmlSearchEmoji(translate: {},baseDir: str,httpPrefix: str, \ searchStr: str) -> str: """Search results for emoji """ # emoji.json is generated so that it can be customized and the changes # will be retained even if default_emoji.json is subsequently updated if not os.path.isfile(baseDir+'/emoji/emoji.json'): copyfile(baseDir+'/emoji/default_emoji.json',baseDir+'/emoji/emoji.json') searchStr=searchStr.lower().replace(':','').strip('\n') cssFilename=baseDir+'/epicyon-profile.css' if os.path.isfile(baseDir+'/epicyon.css'): cssFilename=baseDir+'/epicyon.css' with open(cssFilename, 'r') as cssFile: emojiCSS=cssFile.read() if httpPrefix!='https': emojiCSS=emojiCSS.replace('https://',httpPrefix+'://') emojiLookupFilename=baseDir+'/emoji/emoji.json' # create header emojiForm=htmlHeader(cssFilename,emojiCSS) emojiForm+='

'+translate['Emoji Search']+'

' # does the lookup file exist? if not os.path.isfile(emojiLookupFilename): emojiForm+='
'+translate['No results']+'
' emojiForm+=htmlFooter() return emojiForm emojiJson=loadJson(emojiLookupFilename) if emojiJson: results={} for emojiName,filename in emojiJson.items(): if searchStr in emojiName: results[emojiName] = filename+'.png' for emojiName,filename in emojiJson.items(): if emojiName in searchStr: results[emojiName] = filename+'.png' headingShown=False emojiForm+='
' for emojiName,filename in results.items(): if os.path.isfile(baseDir+'/emoji/'+filename): if not headingShown: emojiForm+= \ '
'+ \ translate['Copy the text then paste it into your post']+ \ '
' headingShown=True emojiForm+= \ '

:'+emojiName+ \ ':

' emojiForm+='
' emojiForm+=htmlFooter() return emojiForm def getIconsDir(baseDir: str) -> str: """Returns the directory where icons exist """ iconsDir='icons' theme=getConfigParam(baseDir,'theme') if theme: if os.path.isdir(baseDir+'/img/icons/'+theme): iconsDir='icons/'+theme return iconsDir def htmlSearchSharedItems(translate: {}, \ baseDir: str,searchStr: str, \ pageNumber: int, \ resultsPerPage: int, \ httpPrefix: str, \ domainFull: str,actor: str) -> str: """Search results for shared items """ iconsDir=getIconsDir(baseDir) currPage=1 ctr=0 sharedItemsForm='' searchStrLower= \ searchStr.replace('%2B','+').replace('%40','@').replace('%3A',':').replace('%23','#').lower().strip('\n') searchStrLowerList=searchStrLower.split('+') cssFilename=baseDir+'/epicyon-profile.css' if os.path.isfile(baseDir+'/epicyon.css'): cssFilename=baseDir+'/epicyon.css' with open(cssFilename, 'r') as cssFile: sharedItemsCSS=cssFile.read() if httpPrefix!='https': sharedItemsCSS= \ sharedItemsCSS.replace('https://',httpPrefix+'://') sharedItemsForm=htmlHeader(cssFilename,sharedItemsCSS) sharedItemsForm+= \ '

'+translate['Shared Items Search']+'

' resultsExist=False for subdir, dirs, files in os.walk(baseDir+'/accounts'): for handle in dirs: if '@' not in handle: continue contactNickname=handle.split('@')[0] sharesFilename=baseDir+'/accounts/'+handle+'/shares.json' if not os.path.isfile(sharesFilename): continue sharesJson=loadJson(sharesFilename) if not sharesJson: continue for name,sharedItem in sharesJson.items(): matched=True for searchSubstr in searchStrLowerList: subStrMatched=False searchSubstr=searchSubstr.strip() if searchSubstr in sharedItem['location'].lower(): subStrMatched=True elif searchSubstr in sharedItem['summary'].lower(): subStrMatched=True elif searchSubstr in sharedItem['displayName'].lower(): subStrMatched=True elif searchSubstr in sharedItem['category'].lower(): subStrMatched=True if not subStrMatched: matched=False break if matched: if currPage==pageNumber: sharedItemsForm+='
' sharedItemsForm+= \ '

'+sharedItem['displayName']+'

' if sharedItem.get('imageUrl'): sharedItemsForm+='' sharedItemsForm+= \ 'Item image' sharedItemsForm+='

'+sharedItem['summary']+'

' sharedItemsForm+='

'+translate['Type']+': '+sharedItem['itemType']+' ' sharedItemsForm+=''+translate['Category']+': '+sharedItem['category']+' ' sharedItemsForm+=''+translate['Location']+': '+sharedItem['location']+'

' contactActor=httpPrefix+'://'+domainFull+'/users/'+contactNickname sharedItemsForm+= \ '

' if actor.endswith('/users/'+contactNickname): sharedItemsForm+= \ ' ' sharedItemsForm+='

' if not resultsExist and currPage>1: # previous page link, needs to be a POST sharedItemsForm+= \ '
' sharedItemsForm+= \ ' ' sharedItemsForm+= \ '
' sharedItemsForm+= \ '
' sharedItemsForm+= \ ' '+translate['Page up']+'' sharedItemsForm+='
' sharedItemsForm+='
' resultsExist=True ctr+=1 if ctr>=resultsPerPage: currPage+=1 if currPage>pageNumber: # next page link, needs to be a POST sharedItemsForm+= \ '
' sharedItemsForm+= \ ' ' sharedItemsForm+= \ '
' sharedItemsForm+= \ '
' sharedItemsForm+= \ ' '+translate['Page down']+'' sharedItemsForm+='
' sharedItemsForm+='
' break ctr=0 if not resultsExist: sharedItemsForm+='
'+translate['No results']+'
' sharedItemsForm+=htmlFooter() return sharedItemsForm def htmlModerationInfo(translate: {},baseDir: str,httpPrefix: str) -> str: infoForm='' cssFilename=baseDir+'/epicyon-profile.css' if os.path.isfile(baseDir+'/epicyon.css'): cssFilename=baseDir+'/epicyon.css' with open(cssFilename, 'r') as cssFile: infoCSS=cssFile.read() if httpPrefix!='https': infoCSS=infoCSS.replace('https://',httpPrefix+'://') infoForm=htmlHeader(cssFilename,infoCSS) infoForm+= \ '

'+translate['Moderation Information']+'

' infoShown=False suspendedFilename=baseDir+'/accounts/suspended.txt' if os.path.isfile(suspendedFilename): with open(suspendedFilename, "r") as f: suspendedStr = f.read() infoForm+='
' infoForm+='
'+translate['Suspended accounts']+'' infoForm+='
'+translate['These are currently suspended'] infoForm+= \ ' ' infoForm+='
' infoShown=True blockingFilename=baseDir+'/accounts/blocking.txt' if os.path.isfile(blockingFilename): with open(blockingFilename, "r") as f: blockedStr = f.read() infoForm+='
' infoForm+= \ '
'+translate['Blocked accounts and hashtags']+'' infoForm+= \ '
'+translate['These are globally blocked for all accounts on this instance'] infoForm+= \ ' ' infoForm+='
' infoShown=True if not infoShown: infoForm+= \ '

'+ \ translate['Any blocks or suspensions made by moderators will be shown here.']+ \ '

' infoForm+=htmlFooter() return infoForm def htmlHashtagSearch(nickname: str,domain: str,port: int, \ recentPostsCache: {},maxRecentPosts: int, \ translate: {}, \ baseDir: str,hashtag: str,pageNumber: int, \ postsPerPage: int, \ session,wfRequest: {},personCache: {}, \ httpPrefix: str,projectVersion: str) -> str: """Show a page containing search results for a hashtag """ iconsDir=getIconsDir(baseDir) if hashtag.startswith('#'): hashtag=hashtag[1:] hashtagIndexFile=baseDir+'/tags/'+hashtag+'.txt' if not os.path.isfile(hashtagIndexFile): return None # check that the directory for the nickname exists if nickname: if not os.path.isdir(baseDir+'/accounts/'+nickname+'@'+domain): nickname=None # read the index with open(hashtagIndexFile, "r") as f: lines = f.readlines() # read the css cssFilename=baseDir+'/epicyon-profile.css' if os.path.isfile(baseDir+'/epicyon.css'): cssFilename=baseDir+'/epicyon.css' with open(cssFilename, 'r') as cssFile: hashtagSearchCSS = cssFile.read() if httpPrefix!='https': hashtagSearchCSS= \ hashtagSearchCSS.replace('https://',httpPrefix+'://') # ensure that the page number is in bounds if not pageNumber: pageNumber=1 elif pageNumber<1: pageNumber=1 # get the start end end within the index file startIndex=int((pageNumber-1)*postsPerPage) endIndex=startIndex+postsPerPage noOfLines=len(lines) if endIndex>=noOfLines and noOfLines>0: endIndex=noOfLines-1 # add the page title hashtagSearchForm=htmlHeader(cssFilename,hashtagSearchCSS) hashtagSearchForm+='' hashtagSearchForm+='

#'+hashtag+'

' if startIndex>0: # previous page link hashtagSearchForm+= \ '
'+translate['Page up']+'
' index=startIndex while index<=endIndex: postId=lines[index].strip('\n') if ' ' not in postId: nickname=getNicknameFromActor(postId) if not nickname: index+=1 continue else: postFields=postId.split(' ') if len(postFields)!=3: index=+1 continue postDaysSinceEposh=int(postFields[0]) nickname=postFields[1] postId=postFields[2] postFilename=locatePost(baseDir,nickname,domain,postId) if not postFilename: index+=1 continue postJsonObject=loadJson(postFilename) if postJsonObject: if not isPublicPost(postJsonObject): index+=1 continue showIndividualPostIcons=False if nickname: showIndividualPostIcons=True allowDeletion=False hashtagSearchForm+= \ individualPostAsHtml(recentPostsCache,maxRecentPosts, \ iconsDir,translate,None, \ baseDir,session,wfRequest,personCache, \ nickname,domain,port,postJsonObject, \ None,True,allowDeletion, \ httpPrefix,projectVersion,'search', \ showIndividualPostIcons, \ showIndividualPostIcons, \ False,False,False) index+=1 if endIndex'+translate['Page down']+'' hashtagSearchForm+=htmlFooter() return hashtagSearchForm def htmlSkillsSearch(translate: {},baseDir: str, \ httpPrefix: str, \ skillsearch: str,instanceOnly: bool, \ postsPerPage: int) -> str: """Show a page containing search results for a skill """ if skillsearch.startswith('*'): skillsearch=skillsearch[1:].strip() skillsearch=skillsearch.lower().strip('\n') results=[] # search instance accounts for subdir, dirs, files in os.walk(baseDir+'/accounts/'): for f in files: if not f.endswith('.json'): continue if '@' not in f: continue if f.startswith('inbox@'): continue actorFilename = os.path.join(subdir, f) actorJson=loadJson(actorFilename) if actorJson: if actorJson.get('id') and \ actorJson.get('skills') and \ actorJson.get('name') and \ actorJson.get('icon'): actor=actorJson['id'] for skillName,skillLevel in actorJson['skills'].items(): skillName=skillName.lower() if skillName in skillsearch or skillsearch in skillName: skillLevelStr=str(skillLevel) if skillLevel<100: skillLevelStr='0'+skillLevelStr if skillLevel<10: skillLevelStr='0'+skillLevelStr indexStr= \ skillLevelStr+';'+actor+';'+actorJson['name']+ \ ';'+actorJson['icon']['url'] if indexStr not in results: results.append(indexStr) if not instanceOnly: # search actor cache for subdir, dirs, files in os.walk(baseDir+'/cache/actors/'): for f in files: if not f.endswith('.json'): continue if '@' not in f: continue if f.startswith('inbox@'): continue actorFilename = os.path.join(subdir, f) cachedActorJson=loadJson(actorFilename) if cachedActorJson: if cachedActorJson.get('actor'): actorJson=cachedActorJson['actor'] if actorJson.get('id') and \ actorJson.get('skills') and \ actorJson.get('name') and \ actorJson.get('icon'): actor=actorJson['id'] for skillName,skillLevel in actorJson['skills'].items(): skillName=skillName.lower() if skillName in skillsearch or skillsearch in skillName: skillLevelStr=str(skillLevel) if skillLevel<100: skillLevelStr='0'+skillLevelStr if skillLevel<10: skillLevelStr='0'+skillLevelStr indexStr= \ skillLevelStr+';'+actor+';'+actorJson['name']+ \ ';'+actorJson['icon']['url'] if indexStr not in results: results.append(indexStr) results.sort(reverse=True) cssFilename=baseDir+'/epicyon-profile.css' if os.path.isfile(baseDir+'/epicyon.css'): cssFilename=baseDir+'/epicyon.css' with open(cssFilename, 'r') as cssFile: skillSearchCSS = cssFile.read() if httpPrefix!='https': skillSearchCSS=skillSearchCSS.replace('https://',httpPrefix+'://') skillSearchForm=htmlHeader(cssFilename,skillSearchCSS) skillSearchForm+= \ '

'+translate['Skills search']+': '+ \ skillsearch+'

' if len(results)==0: skillSearchForm+='
'+translate['No results']+'
' else: skillSearchForm+='
' ctr=0 for skillMatch in results: skillMatchFields=skillMatch.split(';') if len(skillMatchFields)==4: actor=skillMatchFields[1] actorName=skillMatchFields[2] avatarUrl=skillMatchFields[3] skillSearchForm+='' ctr+=1 if ctr>=postsPerPage: break skillSearchForm+='
' skillSearchForm+=htmlFooter() return skillSearchForm def scheduledPostsExist(baseDir: str,nickname: str,domain: str) -> bool: """Returns true if there are posts scheduled to be delivered """ scheduleIndexFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/schedule.index' if not os.path.isfile(scheduleIndexFilename): return False if '#users#' in open(scheduleIndexFilename).read(): return True return False def htmlEditProfile(translate: {},baseDir: str,path: str, \ domain: str,port: int,httpPrefix: str) -> str: """Shows the edit profile screen """ imageFormats='.png, .jpg, .jpeg, .gif, .webp' pathOriginal=path path=path.replace('/inbox','').replace('/outbox','').replace('/shares','') nickname=getNicknameFromActor(path) if not nickname: return '' domainFull=domain if port: if port!=80 and port!=443: if ':' not in domain: domainFull=domain+':'+str(port) actorFilename=baseDir+'/accounts/'+nickname+'@'+domain+'.json' if not os.path.isfile(actorFilename): return '' isBot='' isGroup='' followDMs='' removeTwitter='' mediaInstanceStr='' displayNickname=nickname bioStr='' donateUrl='' emailAddress='' PGPpubKey='' xmppAddress='' matrixAddress='' ssbAddress='' manuallyApprovesFollowers='' actorJson=loadJson(actorFilename) if actorJson: donateUrl=getDonationUrl(actorJson) xmppAddress=getXmppAddress(actorJson) matrixAddress=getMatrixAddress(actorJson) ssbAddress=getSSBAddress(actorJson) emailAddress=getEmailAddress(actorJson) PGPpubKey=getPGPpubKey(actorJson) if actorJson.get('name'): displayNickname=actorJson['name'] if actorJson.get('summary'): bioStr=actorJson['summary'].replace('

','').replace('

','') if actorJson.get('manuallyApprovesFollowers'): if actorJson['manuallyApprovesFollowers']: manuallyApprovesFollowers='checked' else: manuallyApprovesFollowers='' if actorJson.get('type'): if actorJson['type']=='Service': isBot='checked' isGroup='' elif actorJson['type']=='Group': isGroup='checked' isBot='' if os.path.isfile(baseDir+'/accounts/'+nickname+'@'+domain+'/.followDMs'): followDMs='checked' if os.path.isfile(baseDir+'/accounts/'+nickname+'@'+domain+'/.removeTwitter'): removeTwitter='checked' mediaInstance=getConfigParam(baseDir,"mediaInstance") if mediaInstance: if mediaInstance==True: mediaInstanceStr='checked' filterStr='' filterFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/filters.txt' if os.path.isfile(filterFilename): with open(filterFilename, 'r') as filterfile: filterStr=filterfile.read() switchStr='' switchFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/replacewords.txt' if os.path.isfile(switchFilename): with open(switchFilename, 'r') as switchfile: switchStr=switchfile.read() blockedStr='' blockedFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/blocking.txt' if os.path.isfile(blockedFilename): with open(blockedFilename, 'r') as blockedfile: blockedStr=blockedfile.read() allowedInstancesStr='' allowedInstancesFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/allowedinstances.txt' if os.path.isfile(allowedInstancesFilename): with open(allowedInstancesFilename, 'r') as allowedInstancesFile: allowedInstancesStr=allowedInstancesFile.read() skills=getSkills(baseDir,nickname,domain) skillsStr='' skillCtr=1 if skills: for skillDesc,skillValue in skills.items(): skillsStr+= \ '

' skillsStr+= \ '

' skillCtr+=1 skillsStr+= \ '

' skillsStr+= \ '

' \ cssFilename=baseDir+'/epicyon-profile.css' if os.path.isfile(baseDir+'/epicyon.css'): cssFilename=baseDir+'/epicyon.css' with open(cssFilename, 'r') as cssFile: editProfileCSS = cssFile.read() if httpPrefix!='https': editProfileCSS=editProfileCSS.replace('https://',httpPrefix+'://') instanceStr='' moderatorsStr='' themesDropdown='' adminNickname=getConfigParam(baseDir,'admin') if path.startswith('/users/'+adminNickname+'/'): instanceDescription=getConfigParam(baseDir,'instanceDescription') instanceDescriptionShort=getConfigParam(baseDir,'instanceDescriptionShort') instanceTitle=getConfigParam(baseDir,'instanceTitle') instanceStr='
' instanceStr+= \ ' ' instanceStr+= \ '
' instanceStr+= \ ' ' instanceStr+= \ '
' instanceStr+= \ ' ' instanceStr+= \ ' ' instanceStr+= \ ' ' instanceStr+= \ ' ' instanceStr+='
' moderators='' moderatorsFile=baseDir+'/accounts/moderators.txt' if os.path.isfile(moderatorsFile): with open(moderatorsFile, "r") as f: moderators = f.read() moderatorsStr='
' moderatorsStr+=' '+translate['Moderators']+'
' moderatorsStr+=' '+translate['A list of moderator nicknames. One per line.'] moderatorsStr+= \ ' ' moderatorsStr+='
' themesDropdown= '
' themesDropdown+=' '+translate['Theme']+'
' themesDropdown+='
' themesDropdown+='
' themeName=getConfigParam(baseDir,'theme') themesDropdown= \ themesDropdown.replace('