__filename__ = "webapp_post.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
import time
from dateutil.parser import parse
from auth import createPassword
from git import isGitPatch
from datetime import datetime
from cache import getPersonFromCache
from bookmarks import bookmarkedByPerson
from like import likedByPerson
from like import noOfLikes
from follow import isFollowingActor
from posts import isEditor
from posts import postIsMuted
from posts import getPersonBox
from posts import isDM
from posts import downloadAnnounce
from posts import populateRepliesJson
from utils import locatePost
from utils import loadJson
from utils import getCachedPostDirectory
from utils import getCachedPostFilename
from utils import getProtocolPrefixes
from utils import isNewsPost
from utils import isBlogPost
from utils import getDisplayName
from utils import isPublicPost
from utils import updateRecentPostsCache
from utils import removeIdEnding
from utils import getNicknameFromActor
from utils import getDomainFromActor
from utils import isEventPost
from content import replaceEmojiFromTags
from content import htmlReplaceQuoteMarks
from content import htmlReplaceEmailQuote
from content import removeTextFormatting
from content import removeLongWords
from content import getMentionsFromHtml
from content import switchWords
from person import isPersonSnoozed
from announce import announcedByPerson
from webapp_utils import getPersonAvatarUrl
from webapp_utils import updateAvatarImageCache
from webapp_utils import loadIndividualPostAsHtmlFromCache
from webapp_utils import addEmojiToDisplayName
from webapp_utils import postContainsPublic
from webapp_utils import getContentWarningButton
from webapp_utils import getPostAttachmentsAsHtml
from webapp_utils import getIconsWebPath
from webapp_utils import htmlHeaderWithExternalStyle
from webapp_utils import htmlFooter
from webapp_media import addEmbeddedElements
from webapp_question import insertQuestion
from devices import E2EEdecryptMessageFromDevice
def preparePostFromHtmlCache(postHtml: str, boxName: str,
                             pageNumber: int) -> str:
    """Substitutes the given page number into the html of a cached post

    Every ';-999;' and '?page=-999' placeholder within the html is
    replaced using pageNumber. On the bookmarks timelines links back to
    the inbox are first pointed at the bookmarks timeline, and a page
    number which is already present within the html is reset to the
    -999 placeholder so that it gets replaced too.
    """
    pageStr = str(pageNumber)

    if boxName in ('tlbookmarks', 'bookmarks'):
        # remain on the bookmarks timeline rather than jumping to the inbox
        postHtml = postHtml.replace('?tl=inbox', '?tl=tlbookmarks')
        if '?page=' in postHtml:
            # the page value runs up to the next '?' parameter, if any
            stalePageStr = postHtml.split('?page=')[1].split('?')[0]
            postHtml = \
                postHtml.replace('?page=' + stalePageStr, '?page=-999')

    substitutions = (
        (';-999;', ';' + pageStr + ';'),
        ('?page=-999', '?page=' + pageStr)
    )
    for placeholder, replacement in substitutions:
        postHtml = postHtml.replace(placeholder, replacement)
    return postHtml
def saveIndividualPostAsHtmlToCache(baseDir: str,
                                    nickname: str, domain: str,
                                    postJsonObject: {},
                                    postHtml: str) -> bool:
    """Saves the given html for a post to a cache file
    This is so that it can be quickly reloaded on subsequent
    refresh of the timeline

    The cache directory and filename are obtained from
    getCachedPostDirectory and getCachedPostFilename. The directory is
    created if it does not yet exist.

    Returns True if the html was written, or False if the directory
    could not be created or the file could not be written. In the
    failure case the error is printed.
    """
    htmlPostCacheDir = \
        getCachedPostDirectory(baseDir, nickname, domain)
    cachedPostFilename = \
        getCachedPostFilename(baseDir, nickname, domain, postJsonObject)

    try:
        # exist_ok avoids the race between checking for the directory
        # and creating it when another thread gets there first, and
        # makedirs also creates any missing parent directories
        os.makedirs(htmlPostCacheDir, exist_ok=True)
        with open(cachedPostFilename, 'w+') as fp:
            fp.write(postHtml)
    except Exception as e:
        # NOTE(review): kept deliberately broad, as in the original, so
        # that a failure to cache is reported via the return value
        print('ERROR: saving post to cache ' + str(e))
        return False
    return True
def getPostFromRecentCache(session,
                           baseDir: str,
                           httpPrefix: str,
                           nickname: str, domain: str,
                           postJsonObject: {},
                           postActor: str,
                           personCache: {},
                           allowDownloads: bool,
                           showPublicOnly: bool,
                           storeToCache: bool,
                           boxName: str,
                           avatarUrl: str,
                           enableTimingLog: bool,
                           postStartTime,
                           pageNumber: int,
                           recentPostsCache: {},
                           maxRecentPosts: int) -> str:
    """Tries to obtain previously cached html for a post

    Returns None without doing any work for the media timeline, for
    public-only views, and when neither storeToCache is set nor a
    bookmarks timeline is being shown. Otherwise updateAvatarImageCache
    is called for the post actor and the cached html is loaded. If
    there is none then None is returned. Loaded html has its page
    number set via preparePostFromHtmlCache, is passed to
    updateRecentPostsCache and is then returned.
    """
    def logIfSlow(benchmarkStr: str) -> None:
        # prints the elapsed time since postStartTime, but only when
        # timing is enabled and the scaled difference exceeds 100
        if not enableTimingLog:
            return
        elapsed = int((time.time() - postStartTime) * 1000)
        if elapsed > 100:
            print('TIMING INDIV ' + boxName +
                  benchmarkStr + str(elapsed))

    if boxName == 'tlmedia' or showPublicOnly:
        return None

    onBookmarksTimeline = boxName in ('bookmarks', 'tlbookmarks')
    if not (storeToCache or onBookmarksTimeline):
        return None

    # look up the avatar if the caller did not supply one
    if not avatarUrl:
        avatarUrl = \
            getPersonAvatarUrl(baseDir, postActor, personCache,
                               allowDownloads)
        # benchmark 2.1
        logIfSlow(' 2.1 = ')

    updateAvatarImageCache(session, baseDir, httpPrefix,
                           postActor, avatarUrl, personCache,
                           allowDownloads)
    # benchmark 2.2
    logIfSlow(' 2.2 = ')

    cachedHtml = \
        loadIndividualPostAsHtmlFromCache(baseDir, nickname, domain,
                                          postJsonObject)
    if not cachedHtml:
        return None

    cachedHtml = preparePostFromHtmlCache(cachedHtml, boxName, pageNumber)
    updateRecentPostsCache(recentPostsCache, maxRecentPosts,
                           postJsonObject, cachedHtml)
    # benchmark 3
    logIfSlow(' 3 = ')
    return cachedHtml
def getAvatarImageUrl(session,
                      baseDir: str, httpPrefix: str,
                      postActor: str, personCache: {},
                      avatarUrl: str, allowDownloads: bool) -> str:
    """Returns the url of the avatar image for the actor of a post

    If no avatarUrl is supplied then it is looked up with
    getPersonAvatarUrl and the value returned by updateAvatarImageCache
    is used. A supplied avatarUrl is kept as it is, with
    updateAvatarImageCache still being called. If no url results then
    postActor + '/avatar.png' is returned.
    """
    urlWasSupplied = bool(avatarUrl)
    if not urlWasSupplied:
        avatarUrl = \
            getPersonAvatarUrl(baseDir, postActor, personCache,
                               allowDownloads)

    cachedAvatarUrl = \
        updateAvatarImageCache(session, baseDir, httpPrefix,
                               postActor, avatarUrl, personCache,
                               allowDownloads)
    if not urlWasSupplied:
        # only a looked up url is replaced by the image cache result
        avatarUrl = cachedAvatarUrl

    return avatarUrl or postActor + '/avatar.png'
def getAvatarImageHtml(showAvatarOptions: bool,
nickname: str, domainFull: str,
avatarUrl: str, postActor: str,
translate: {}, avatarPosition: str,
pageNumber: int, messageIdStr: str) -> str:
"""Get html for the avatar image

The plain avatar markup is only built when avatarUrl does not
contain '/users/news/'. When showAvatarOptions is set and postActor
does not contain domainFull + '/users/' + nickname, the person
options variant is used instead: it replaces the markup for ordinary
accounts, while for the news account a separate variant is appended.

Returns the markup with leading and trailing whitespace stripped.
"""
avatarLink = ''
# the news account does not get the plain avatar markup
if '/users/news/' not in avatarUrl:
avatarLink = ' '
avatarLink += \
' \n'
# person options are only offered on posts from other accounts
if showAvatarOptions and \
domainFull + '/users/' + nickname not in postActor:
if '/users/news/' not in avatarUrl:
avatarLink = \
' \n'
avatarLink += \
'
\n'
else:
# don't link to the person options for the news account
avatarLink += \
'
\n'
return avatarLink.strip()
def getReplyIconHtml(nickname: str, isPublicRepeat: bool,
showIcons: bool, commentsEnabled: bool,
postJsonObject: {}, pageNumberParam: str,
iconsPath: str, translate: {}) -> str:
"""Returns html for the reply icon/button

An empty string is returned unless both showIcons and
commentsEnabled are set.

The reply link starts from the id of the post object. A '?mention='
parameter is added for attributedTo, when that is a string, and for
each actor returned by getMentionsFromHtml for the post content
which is not already within the link. pageNumberParam is appended
last.

The markup which is built depends upon isPublicRepeat and upon
whether isDM reports the post to be a direct message.
"""
replyStr = ''
# no reply button when icons are hidden or comments are disabled
if not (showIcons and commentsEnabled):
return replyStr
# reply is permitted - create reply icon
replyToLink = postJsonObject['object']['id']
# mention the actor which the post is attributed to
if postJsonObject['object'].get('attributedTo'):
if isinstance(postJsonObject['object']['attributedTo'], str):
replyToLink += \
'?mention=' + postJsonObject['object']['attributedTo']
# also mention the actors which are mentioned within the content
if postJsonObject['object'].get('content'):
mentionedActors = \
getMentionsFromHtml(postJsonObject['object']['content'])
if mentionedActors:
for actorUrl in mentionedActors:
if '?mention=' + actorUrl not in replyToLink:
replyToLink += '?mention=' + actorUrl
# stop adding mentions once the link exceeds 500 characters
if len(replyToLink) > 500:
break
replyToLink += pageNumberParam
replyStr = ''
replyToThisPostStr = translate['Reply to this post']
# separate markup for publicly repeatable posts, DMs and other posts
if isPublicRepeat:
replyStr += \
' \n'
else:
if isDM(postJsonObject):
replyStr += \
' ' + \
'\n'
else:
replyStr += \
' ' + \
'\n'
replyStr += \
' ' + \
'
\n'
return replyStr
def getEditIconHtml(baseDir: str, nickname: str, domainFull: str,
postJsonObject: {}, actorNickname: str,
translate: {}, iconsPath: str, isEvent: bool) -> str:
"""Returns html for the edit icon/button

Edit markup is only produced when the post actor ends with
domainFull + '/users/' + nickname, or when isEditor(baseDir, nickname)
is true and the post actor ends with domainFull + '/users/news'.
In addition the id of the post object must contain '/statuses/'.

Blog posts use the 'Edit blog post' translation, with different
markup depending upon isNewsPost. Otherwise, if isEvent is set, the
'Edit event' translation is used. In all other cases an empty string
is returned.
"""
editStr = ''
actor = postJsonObject['actor']
# editable by its own author, or by an editor if from the news account
if (actor.endswith(domainFull + '/users/' + nickname) or
(isEditor(baseDir, nickname) and
actor.endswith(domainFull + '/users/news'))):
postId = postJsonObject['object']['id']
# only posts having /statuses/ within their id get an edit button
if '/statuses/' in postId:
if isBlogPost(postJsonObject):
editBlogPostStr = translate['Edit blog post']
if not isNewsPost(postJsonObject):
editStr += \
' ' + \
'' + \
'
\n'
else:
editStr += \
' ' + \
'' + \
'
\n'
elif isEvent:
editEventStr = translate['Edit event']
editStr += \
' ' + \
'' + \
'
\n'
return editStr
def getAnnounceIconHtml(nickname: str, domainFull: str,
postJsonObject: {},
isPublicRepeat: bool,
isModerationPost: bool,
showRepeatIcon: bool,
translate: {},
pageNumberParam: str,
timelinePostBookmark: str,
boxName: str, iconsPath: str) -> str:
"""Returns html for announce icon/button

An empty string is returned for moderation posts and when
showRepeatIcon is not set.

The icon starts as 'repeat_inactive.png' with the link name 'repeat',
or 'repeatprivate' when isPublicRepeat is not set. If
announcedByPerson reports that nickname at domainFull has already
announced the post then the icon becomes 'repeat.png', the title
becomes the 'Undo the repeat' translation and, for a non-public
repeat, the link name becomes 'unrepeatprivate'.
"""
announceStr = ''
if not isModerationPost and showRepeatIcon:
# don't allow announce/repeat of your own posts
announceIcon = 'repeat_inactive.png'
announceLink = 'repeat'
if not isPublicRepeat:
announceLink = 'repeatprivate'
announceTitle = translate['Repeat this post']
# already announced by this account, so show the undo state
# NOTE(review): for a public repeat the link name stays as 'repeat'
# here - confirm that this is intended
if announcedByPerson(postJsonObject, nickname, domainFull):
announceIcon = 'repeat.png'
if not isPublicRepeat:
announceLink = 'unrepeatprivate'
announceTitle = translate['Undo the repeat']
announceStr = \
' \n'
announceStr += \
' ' + \
'
\n'
return announceStr
def getLikeIconHtml(nickname: str, domainFull: str,
isModerationPost: bool,
showLikeButton: bool,
postJsonObject: {},
enableTimingLog: bool,
postStartTime,
translate: {}, pageNumberParam: str,
timelinePostBookmark: str,
boxName: str, iconsPath: str) -> str:
"""Returns html for like icon/button

An empty string is returned for moderation posts and when
showLikeButton is not set.

The number of likes comes from noOfLikes. It is shown as ' (n)' for
counts from 1 to 10 and as ' (10+)' above that. When likedByPerson
reports that nickname at domainFull has liked the post then the icon
becomes 'like.png', the link name becomes 'unlike' and the title
becomes the 'Undo the like' translation. In that case a count of
exactly 1 is not shown.

When enableTimingLog is set, the time elapsed since postStartTime is
printed at benchmarks 12.1 and 12.2 if the scaled value exceeds 100
(milliseconds, assuming that postStartTime came from time.time()).
"""
likeStr = ''
if not isModerationPost and showLikeButton:
likeIcon = 'like_inactive.png'
likeLink = 'like'
likeTitle = translate['Like this post']
likeCount = noOfLikes(postJsonObject)
# benchmark 12.1
if enableTimingLog:
timeDiff = int((time.time() - postStartTime) * 1000)
if timeDiff > 100:
print('TIMING INDIV ' + boxName + ' 12.1 = ' + str(timeDiff))
likeCountStr = ''
# show the exact count up to 10 and then only 10+
if likeCount > 0:
if likeCount <= 10:
likeCountStr = ' (' + str(likeCount) + ')'
else:
likeCountStr = ' (10+)'
# already liked by this account, so the button undoes the like
if likedByPerson(postJsonObject, nickname, domainFull):
if likeCount == 1:
# liked by the reader only
likeCountStr = ''
likeIcon = 'like.png'
likeLink = 'unlike'
likeTitle = translate['Undo the like']
# benchmark 12.2
if enableTimingLog:
timeDiff = int((time.time() - postStartTime) * 1000)
if timeDiff > 100:
print('TIMING INDIV ' + boxName + ' 12.2 = ' + str(timeDiff))
likeStr = ''
if likeCountStr:
# show the number of likes next to icon
likeStr += '\n'
likeStr += \
' \n'
likeStr += \
' ' + \
'
\n'
return likeStr
def getBookmarkIconHtml(nickname: str, domainFull: str,
postJsonObject: {},
isModerationPost: bool,
translate: {},
enableTimingLog: bool,
postStartTime, boxName: str,
pageNumberParam: str,
timelinePostBookmark: str,
iconsPath: str) -> str:
"""Returns html for bookmark icon/button

An empty string is returned for moderation posts.

The icon starts as 'bookmark_inactive.png' with the link name
'bookmark'. When bookmarkedByPerson reports that nickname at
domainFull has bookmarked the post then the icon becomes
'bookmark.png', the link name becomes 'unbookmark' and the title
becomes the 'Undo the bookmark' translation.

When enableTimingLog is set, the time elapsed since postStartTime is
printed at benchmark 12.6 if the scaled value exceeds 100
(milliseconds, assuming that postStartTime came from time.time()).
"""
bookmarkStr = ''
if not isModerationPost:
bookmarkIcon = 'bookmark_inactive.png'
bookmarkLink = 'bookmark'
bookmarkTitle = translate['Bookmark this post']
# already bookmarked by this account, so the button undoes it
if bookmarkedByPerson(postJsonObject, nickname, domainFull):
bookmarkIcon = 'bookmark.png'
bookmarkLink = 'unbookmark'
bookmarkTitle = translate['Undo the bookmark']
# benchmark 12.6
if enableTimingLog:
timeDiff = int((time.time() - postStartTime) * 1000)
if timeDiff > 100:
print('TIMING INDIV ' + boxName + ' 12.6 = ' + str(timeDiff))
bookmarkStr = \
' \n'
bookmarkStr += \
' ' + \
'
\n'
return bookmarkStr
def getMuteIconHtml(isMuted: bool,
postActor: str,
messageId: str,
nickname: str, domainFull: str,
allowDeletion: bool,
pageNumberParam: str,
iconsPath: str,
boxName: str,
timelinePostBookmark: str,
translate: {}) -> str:
"""Returns html for mute icon/button

An empty string is returned when allowDeletion is set, or when
postActor contains '/' + domainFull + '/' and messageId starts with
postActor. This is the same condition under which getDeleteIconHtml
considers showing a delete button.

Otherwise one of two variants of the markup is built, selected by
isMuted.
"""
muteStr = ''
# no mute button when deletion is allowed or when the message id
# begins with the url of an actor on this domain
if (allowDeletion or
('/' + domainFull + '/' in postActor and
messageId.startswith(postActor))):
return muteStr
# separate markup for posts which are not yet muted and those which are
if not isMuted:
muteStr = \
' \n'
muteStr += \
' ' + \
'
\n'
else:
muteStr = \
' \n'
muteStr += \
' ' + \
'
\n'
return muteStr
def getDeleteIconHtml(nickname: str, domainFull: str,
allowDeletion: bool,
postActor: str,
messageId: str,
postJsonObject: {},
pageNumberParam: str,
iconsPath: str,
translate: {}) -> str:
"""Returns html for delete icon/button

Delete markup is only produced when all of the following hold:
allowDeletion is set, or postActor contains '/' + domainFull + '/'
and messageId starts with postActor; messageId contains
'/users/' + nickname + '/'; and isNewsPost is false for the post.
Otherwise an empty string is returned.
"""
deleteStr = ''
# deletion was explicitly allowed, or the message id begins with the
# url of an actor on this domain
if (allowDeletion or
('/' + domainFull + '/' in postActor and
messageId.startswith(postActor))):
# the message must belong to this account
if '/users/' + nickname + '/' in messageId:
# news posts do not get a delete button
if not isNewsPost(postJsonObject):
deleteStr = \
' \n'
deleteStr += \
' ' + \
'
\n'
return deleteStr
def getPublishedDateStr(postJsonObject: {},
                        showPublishedDateOnly: bool) -> str:
    """Returns the published date of a post as displayable text

    An empty string is returned if the post object has no published
    value. The date is shown as weekday, month and day, followed by
    hours and minutes unless showPublishedDateOnly is set. If the post
    has hasReplies set to True then the text is wrapped in square
    brackets.
    """
    postObject = postJsonObject['object']
    if not postObject.get('published'):
        return ''

    published = postObject['published']
    if '.' in published:
        # fractional seconds are dropped before parsing
        publishedDate = parse(published.replace('T', ' ').split('.')[0])
    else:
        if '+' in published:
            # anything from the + of a timezone offset onward is discarded
            published = published.split('+')[0] + 'Z'
        publishedDate = \
            datetime.strptime(published, "%Y-%m-%dT%H:%M:%SZ")

    if showPublishedDateOnly:
        dateFormat = "%a %b %d"
    else:
        dateFormat = "%a %b %d, %H:%M"
    publishedStr = publishedDate.strftime(dateFormat)

    # if the post has replies then add brackets to indicate this
    if postJsonObject.get('hasReplies') is True:
        publishedStr = '[' + publishedStr + ']'
    return publishedStr
def getBlogCitationsHtml(boxName: str,
postJsonObject: {},
translate: {}) -> str:
"""Returns blog citations as html
"""
# show blog citations
citationsStr = ''
if boxName == 'tlblogs' or boxName == 'tlfeatures':
if postJsonObject['object'].get('tag'):
for tagJson in postJsonObject['object']['tag']:
if not isinstance(tagJson, dict):
continue
if not tagJson.get('type'):
continue
if tagJson['type'] != 'Article':
continue
if not tagJson.get('name'):
continue
if not tagJson.get('url'):
continue
citationsStr += \
'
' + translate['Citations'] + \ ':
' + \ '' + contentStr + \
'
' + translate['Liked by'] + \ ' @' + \ likedByHandle + '\n' domainFull = domain if port: if port != 80 and port != 443: domainFull = domain + ':' + str(port) actor = '/users/' + nickname followStr = '
\n' postStr += followStr + '\n' postStr += \ individualPostAsHtml(True, recentPostsCache, maxRecentPosts, iconsPath, translate, None, baseDir, session, wfRequest, personCache, nickname, domain, port, postJsonObject, None, True, False, httpPrefix, projectVersion, 'inbox', YTReplacementDomain, showPublishedDateOnly, False, authorized, False, False, False) messageId = removeIdEnding(postJsonObject['id']) # show the previous posts if isinstance(postJsonObject['object'], dict): while postJsonObject['object'].get('inReplyTo'): postFilename = \ locatePost(baseDir, nickname, domain, postJsonObject['object']['inReplyTo']) if not postFilename: break postJsonObject = loadJson(postFilename) if postJsonObject: postStr = \ individualPostAsHtml(True, recentPostsCache, maxRecentPosts, iconsPath, translate, None, baseDir, session, wfRequest, personCache, nickname, domain, port, postJsonObject, None, True, False, httpPrefix, projectVersion, 'inbox', YTReplacementDomain, showPublishedDateOnly, False, authorized, False, False, False) + postStr # show the following posts postFilename = locatePost(baseDir, nickname, domain, messageId) if postFilename: # is there a replies file for this post? 
repliesFilename = postFilename.replace('.json', '.replies') if os.path.isfile(repliesFilename): # get items from the replies file repliesJson = { 'orderedItems': [] } populateRepliesJson(baseDir, nickname, domain, repliesFilename, authorized, repliesJson) # add items to the html output for item in repliesJson['orderedItems']: postStr += \ individualPostAsHtml(True, recentPostsCache, maxRecentPosts, iconsPath, translate, None, baseDir, session, wfRequest, personCache, nickname, domain, port, item, None, True, False, httpPrefix, projectVersion, 'inbox', YTReplacementDomain, showPublishedDateOnly, False, authorized, False, False, False) cssFilename = baseDir + '/epicyon-profile.css' if os.path.isfile(baseDir + '/epicyon.css'): cssFilename = baseDir + '/epicyon.css' return htmlHeaderWithExternalStyle(cssFilename) + postStr + htmlFooter() def htmlPostReplies(cssCache: {}, recentPostsCache: {}, maxRecentPosts: int, translate: {}, baseDir: str, session, wfRequest: {}, personCache: {}, nickname: str, domain: str, port: int, repliesJson: {}, httpPrefix: str, projectVersion: str, YTReplacementDomain: str, showPublishedDateOnly: bool) -> str: """Show the replies to an individual post as html """ iconsPath = getIconsWebPath(baseDir) repliesStr = '' if repliesJson.get('orderedItems'): for item in repliesJson['orderedItems']: repliesStr += \ individualPostAsHtml(True, recentPostsCache, maxRecentPosts, iconsPath, translate, None, baseDir, session, wfRequest, personCache, nickname, domain, port, item, None, True, False, httpPrefix, projectVersion, 'inbox', YTReplacementDomain, showPublishedDateOnly, False, False, False, False, False) cssFilename = baseDir + '/epicyon-profile.css' if os.path.isfile(baseDir + '/epicyon.css'): cssFilename = baseDir + '/epicyon.css' return htmlHeaderWithExternalStyle(cssFilename) + repliesStr + htmlFooter()