__filename__ = "webapp_post.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Web Interface"
import os
import time
from dateutil.parser import parse
from auth import createPassword
from git import isGitPatch
from datetime import datetime
from cache import getPersonFromCache
from bookmarks import bookmarkedByPerson
from like import likedByPerson
from like import noOfLikes
from follow import isFollowingActor
from posts import postIsMuted
from posts import getPersonBox
from posts import downloadAnnounce
from posts import populateRepliesJson
from utils import getActorLanguagesList
from utils import getBaseContentFromPost
from utils import getContentFromPost
from utils import hasObjectDict
from utils import updateAnnounceCollection
from utils import isPGPEncrypted
from utils import isDM
from utils import rejectPostId
from utils import isRecentPost
from utils import getConfigParam
from utils import getFullDomain
from utils import isEditor
from utils import locatePost
from utils import loadJson
from utils import getCachedPostDirectory
from utils import getCachedPostFilename
from utils import getProtocolPrefixes
from utils import isNewsPost
from utils import isBlogPost
from utils import getDisplayName
from utils import isPublicPost
from utils import updateRecentPostsCache
from utils import removeIdEnding
from utils import getNicknameFromActor
from utils import getDomainFromActor
from utils import acctDir
from utils import localActorUrl
from content import limitRepeatedWords
from content import replaceEmojiFromTags
from content import htmlReplaceQuoteMarks
from content import htmlReplaceEmailQuote
from content import removeTextFormatting
from content import removeLongWords
from content import getMentionsFromHtml
from content import switchWords
from person import isPersonSnoozed
from person import getPersonAvatarUrl
from announce import announcedByPerson
from webapp_utils import getAvatarImageUrl
from webapp_utils import updateAvatarImageCache
from webapp_utils import loadIndividualPostAsHtmlFromCache
from webapp_utils import addEmojiToDisplayName
from webapp_utils import postContainsPublic
from webapp_utils import getContentWarningButton
from webapp_utils import getPostAttachmentsAsHtml
from webapp_utils import htmlHeaderWithExternalStyle
from webapp_utils import htmlFooter
from webapp_utils import getBrokenLinkSubstitute
from webapp_media import addEmbeddedElements
from webapp_question import insertQuestion
from devices import E2EEdecryptMessageFromDevice
from webfinger import webfingerHandle
from speaker import updateSpeaker
from languages import autoTranslatePost
from blocking import isBlocked
def _logPostTiming(enableTimingLog: bool, postStartTime, debugId: str) -> None:
"""Create a log of timings for performance tuning
"""
if not enableTimingLog:
return
timeDiff = int((time.time() - postStartTime) * 1000)
if timeDiff > 100:
print('TIMING INDIV ' + debugId + ' = ' + str(timeDiff))
def prepareHtmlPostNickname(nickname: str, postHtml: str) -> str:
"""html posts stored in memory are for all accounts on the instance
and they're indexed by id. However, some incoming posts may be
destined for multiple accounts (followers). This creates a problem
where the icon links whose urls begin with href="/users/nickname?
need to be changed for different nicknames to display correctly
within their timelines.
This function changes the nicknames for the icon links.
"""
# replace the nickname
usersStr = ' href="/users/'
if usersStr not in postHtml:
return postHtml
userFound = True
postStr = postHtml
newPostStr = ''
while userFound:
if usersStr not in postStr:
newPostStr += postStr
break
# the next part, after href="/users/nickname?
nextStr = postStr.split(usersStr, 1)[1]
if '?' in nextStr:
nextStr = nextStr.split('?', 1)[1]
else:
newPostStr += postStr
break
# append the previous text to the result
newPostStr += postStr.split(usersStr)[0]
newPostStr += usersStr + nickname + '?'
# post is now the next part
postStr = nextStr
return newPostStr
def preparePostFromHtmlCache(nickname: str, postHtml: str, boxName: str,
pageNumber: int) -> str:
"""Sets the page number on a cached html post
"""
# if on the bookmarks timeline then remain there
if boxName == 'tlbookmarks' or boxName == 'bookmarks':
postHtml = postHtml.replace('?tl=inbox', '?tl=tlbookmarks')
if '?page=' in postHtml:
pageNumberStr = postHtml.split('?page=')[1]
if '?' in pageNumberStr:
pageNumberStr = pageNumberStr.split('?')[0]
postHtml = postHtml.replace('?page=' + pageNumberStr, '?page=-999')
withPageNumber = postHtml.replace(';-999;', ';' + str(pageNumber) + ';')
withPageNumber = withPageNumber.replace('?page=-999',
'?page=' + str(pageNumber))
return prepareHtmlPostNickname(nickname, withPageNumber)
def _saveIndividualPostAsHtmlToCache(baseDir: str,
nickname: str, domain: str,
postJsonObject: {},
postHtml: str) -> bool:
"""Saves the given html for a post to a cache file
This is so that it can be quickly reloaded on subsequent
refresh of the timeline
"""
htmlPostCacheDir = \
getCachedPostDirectory(baseDir, nickname, domain)
cachedPostFilename = \
getCachedPostFilename(baseDir, nickname, domain, postJsonObject)
# create the cache directory if needed
if not os.path.isdir(htmlPostCacheDir):
os.mkdir(htmlPostCacheDir)
try:
with open(cachedPostFilename, 'w+') as fp:
fp.write(postHtml)
return True
except Exception as e:
print('ERROR: saving post to cache, ' + str(e))
return False
def _getPostFromRecentCache(session,
baseDir: str,
httpPrefix: str,
nickname: str, domain: str,
postJsonObject: {},
postActor: str,
personCache: {},
allowDownloads: bool,
showPublicOnly: bool,
storeToCache: bool,
boxName: str,
avatarUrl: str,
enableTimingLog: bool,
postStartTime,
pageNumber: int,
recentPostsCache: {},
maxRecentPosts: int,
signingPrivateKeyPem: str) -> str:
"""Attempts to get the html post from the recent posts cache in memory
"""
if boxName == 'tlmedia':
return None
if showPublicOnly:
return None
tryCache = False
bmTimeline = boxName == 'bookmarks' or boxName == 'tlbookmarks'
if storeToCache or bmTimeline:
tryCache = True
if not tryCache:
return None
# update avatar if needed
if not avatarUrl:
avatarUrl = \
getPersonAvatarUrl(baseDir, postActor, personCache,
allowDownloads)
_logPostTiming(enableTimingLog, postStartTime, '2.1')
updateAvatarImageCache(signingPrivateKeyPem,
session, baseDir, httpPrefix,
postActor, avatarUrl, personCache,
allowDownloads)
_logPostTiming(enableTimingLog, postStartTime, '2.2')
postHtml = \
loadIndividualPostAsHtmlFromCache(baseDir, nickname, domain,
postJsonObject)
if not postHtml:
return None
postHtml = \
preparePostFromHtmlCache(nickname, postHtml, boxName, pageNumber)
updateRecentPostsCache(recentPostsCache, maxRecentPosts,
postJsonObject, postHtml)
_logPostTiming(enableTimingLog, postStartTime, '3')
return postHtml
def _getAvatarImageHtml(showAvatarOptions: bool,
nickname: str, domainFull: str,
avatarUrl: str, postActor: str,
translate: {}, avatarPosition: str,
pageNumber: int, messageIdStr: str) -> str:
"""Get html for the avatar image
"""
avatarLink = ''
if '/users/news/' not in avatarUrl:
avatarLink = ' '
showProfileStr = 'Show profile'
if translate.get(showProfileStr):
showProfileStr = translate[showProfileStr]
avatarLink += \
'\n'
if showAvatarOptions and \
domainFull + '/users/' + nickname not in postActor:
showOptionsForThisPersonStr = 'Show options for this person'
if translate.get(showOptionsForThisPersonStr):
showOptionsForThisPersonStr = \
translate[showOptionsForThisPersonStr]
if '/users/news/' not in avatarUrl:
avatarLink = \
' \n'
avatarLink += \
' \n'
else:
# don't link to the person options for the news account
avatarLink += \
' \n'
return avatarLink.strip()
def _getReplyIconHtml(baseDir: str, nickname: str, domain: str,
isPublicRepeat: bool,
showIcons: bool, commentsEnabled: bool,
postJsonObject: {}, pageNumberParam: str,
translate: {}, systemLanguage: str,
conversationId: str) -> str:
"""Returns html for the reply icon/button
"""
replyStr = ''
if not (showIcons and commentsEnabled):
return replyStr
# reply is permitted - create reply icon
replyToLink = removeIdEnding(postJsonObject['object']['id'])
# see Mike MacGirvin's replyTo suggestion
if postJsonObject['object'].get('replyTo'):
# check that the alternative replyTo url is not blocked
blockNickname = \
getNicknameFromActor(postJsonObject['object']['replyTo'])
blockDomain, _ = \
getDomainFromActor(postJsonObject['object']['replyTo'])
if not isBlocked(baseDir, nickname, domain,
blockNickname, blockDomain, {}):
replyToLink = postJsonObject['object']['replyTo']
if postJsonObject['object'].get('attributedTo'):
if isinstance(postJsonObject['object']['attributedTo'], str):
replyToLink += \
'?mention=' + postJsonObject['object']['attributedTo']
content = getBaseContentFromPost(postJsonObject, systemLanguage)
if content:
mentionedActors = getMentionsFromHtml(content)
if mentionedActors:
for actorUrl in mentionedActors:
if '?mention=' + actorUrl not in replyToLink:
replyToLink += '?mention=' + actorUrl
if len(replyToLink) > 500:
break
replyToLink += pageNumberParam
replyStr = ''
replyToThisPostStr = 'Reply to this post'
if translate.get(replyToThisPostStr):
replyToThisPostStr = translate[replyToThisPostStr]
conversationStr = ''
if conversationId:
conversationStr = '?conversationId=' + conversationId
if isPublicRepeat:
replyStr += \
' \n'
else:
if isDM(postJsonObject):
replyStr += \
' ' + \
'\n'
else:
replyStr += \
' ' + \
'\n'
replyStr += \
' ' + \
'\n'
return replyStr
def _getEditIconHtml(baseDir: str, nickname: str, domainFull: str,
postJsonObject: {}, actorNickname: str,
translate: {}, isEvent: bool) -> str:
"""Returns html for the edit icon/button
"""
editStr = ''
actor = postJsonObject['actor']
# This should either be a post which you created,
# or it could be generated from the newswire (see
# _addBlogsToNewswire) in which case anyone with
# editor status should be able to alter it
if (actor.endswith('/' + domainFull + '/users/' + nickname) or
(isEditor(baseDir, nickname) and
actor.endswith('/' + domainFull + '/users/news'))):
postId = removeIdEnding(postJsonObject['object']['id'])
if '/statuses/' not in postId:
return editStr
if isBlogPost(postJsonObject):
editBlogPostStr = 'Edit blog post'
if translate.get(editBlogPostStr):
editBlogPostStr = translate[editBlogPostStr]
if not isNewsPost(postJsonObject):
editStr += \
' ' + \
'' + \
'\n'
else:
editStr += \
' ' + \
'' + \
'\n'
elif isEvent:
editEventStr = 'Edit event'
if translate.get(editEventStr):
editEventStr = translate[editEventStr]
editStr += \
' ' + \
'' + \
'\n'
return editStr
def _getAnnounceIconHtml(isAnnounced: bool,
postActor: str,
nickname: str, domainFull: str,
announceJsonObject: {},
postJsonObject: {},
isPublicRepeat: bool,
isModerationPost: bool,
showRepeatIcon: bool,
translate: {},
pageNumberParam: str,
timelinePostBookmark: str,
boxName: str) -> str:
"""Returns html for announce icon/button
"""
announceStr = ''
if not showRepeatIcon:
return announceStr
if isModerationPost:
return announceStr
# don't allow announce/repeat of your own posts
announceIcon = 'repeat_inactive.png'
announceLink = 'repeat'
announceEmoji = ''
if not isPublicRepeat:
announceLink = 'repeatprivate'
repeatThisPostStr = 'Repeat this post'
if translate.get(repeatThisPostStr):
repeatThisPostStr = translate[repeatThisPostStr]
announceTitle = repeatThisPostStr
unannounceLinkStr = ''
if announcedByPerson(isAnnounced,
postActor, nickname, domainFull):
announceIcon = 'repeat.png'
announceEmoji = '🔁 '
announceLink = 'unrepeat'
if not isPublicRepeat:
announceLink = 'unrepeatprivate'
undoTheRepeatStr = 'Undo the repeat'
if translate.get(undoTheRepeatStr):
undoTheRepeatStr = translate[undoTheRepeatStr]
announceTitle = undoTheRepeatStr
if announceJsonObject:
unannounceLinkStr = '?unannounce=' + \
removeIdEnding(announceJsonObject['id'])
announcePostId = removeIdEnding(postJsonObject['object']['id'])
announceLinkStr = '?' + \
announceLink + '=' + announcePostId + pageNumberParam
announceStr = \
' \n'
announceStr += \
' ' + \
'\n'
return announceStr
def _getLikeIconHtml(nickname: str, domainFull: str,
isModerationPost: bool,
showLikeButton: bool,
postJsonObject: {},
enableTimingLog: bool,
postStartTime,
translate: {}, pageNumberParam: str,
timelinePostBookmark: str,
boxName: str,
maxLikeCount: int) -> str:
"""Returns html for like icon/button
"""
if not showLikeButton or isModerationPost:
return ''
likeStr = ''
likeIcon = 'like_inactive.png'
likeLink = 'like'
likeTitle = 'Like this post'
if translate.get(likeTitle):
likeTitle = translate[likeTitle]
likeEmoji = ''
likeCount = noOfLikes(postJsonObject)
_logPostTiming(enableTimingLog, postStartTime, '12.1')
likeCountStr = ''
if likeCount > 0:
if likeCount <= maxLikeCount:
likeCountStr = ' (' + str(likeCount) + ')'
else:
likeCountStr = ' (' + str(maxLikeCount) + '+)'
if likedByPerson(postJsonObject, nickname, domainFull):
if likeCount == 1:
# liked by the reader only
likeCountStr = ''
likeIcon = 'like.png'
likeLink = 'unlike'
likeTitle = 'Undo the like'
if translate.get(likeTitle):
likeTitle = translate[likeTitle]
likeEmoji = '👍 '
_logPostTiming(enableTimingLog, postStartTime, '12.2')
likeStr = ''
if likeCountStr:
# show the number of likes next to icon
likeStr += '\n'
likePostId = removeIdEnding(postJsonObject['id'])
likeStr += \
' \n'
likeStr += \
' ' + \
'\n'
return likeStr
def _getBookmarkIconHtml(nickname: str, domainFull: str,
postJsonObject: {},
isModerationPost: bool,
translate: {},
enableTimingLog: bool,
postStartTime, boxName: str,
pageNumberParam: str,
timelinePostBookmark: str) -> str:
"""Returns html for bookmark icon/button
"""
bookmarkStr = ''
if isModerationPost:
return bookmarkStr
bookmarkIcon = 'bookmark_inactive.png'
bookmarkLink = 'bookmark'
bookmarkEmoji = ''
bookmarkTitle = 'Bookmark this post'
if translate.get(bookmarkTitle):
bookmarkTitle = translate[bookmarkTitle]
if bookmarkedByPerson(postJsonObject, nickname, domainFull):
bookmarkIcon = 'bookmark.png'
bookmarkLink = 'unbookmark'
bookmarkEmoji = '🔖 '
bookmarkTitle = 'Undo the bookmark'
if translate.get(bookmarkTitle):
bookmarkTitle = translate[bookmarkTitle]
_logPostTiming(enableTimingLog, postStartTime, '12.6')
bookmarkPostId = removeIdEnding(postJsonObject['object']['id'])
bookmarkStr = \
' \n'
bookmarkStr += \
' ' + \
'\n'
return bookmarkStr
def _getMuteIconHtml(isMuted: bool,
postActor: str,
messageId: str,
nickname: str, domainFull: str,
allowDeletion: bool,
pageNumberParam: str,
boxName: str,
timelinePostBookmark: str,
translate: {}) -> str:
"""Returns html for mute icon/button
"""
muteStr = ''
if (allowDeletion or
('/' + domainFull + '/' in postActor and
messageId.startswith(postActor))):
return muteStr
if not isMuted:
muteThisPostStr = 'Mute this post'
if translate.get('Mute this post'):
muteThisPostStr = translate[muteThisPostStr]
muteStr = \
' \n'
muteStr += \
' ' + \
'\n'
else:
undoMuteStr = 'Undo mute'
if translate.get(undoMuteStr):
undoMuteStr = translate[undoMuteStr]
muteStr = \
' \n'
muteStr += \
' ' + \
'\n'
return muteStr
def _getDeleteIconHtml(nickname: str, domainFull: str,
allowDeletion: bool,
postActor: str,
messageId: str,
postJsonObject: {},
pageNumberParam: str,
translate: {}) -> str:
"""Returns html for delete icon/button
"""
deleteStr = ''
if (allowDeletion or
('/' + domainFull + '/' in postActor and
messageId.startswith(postActor))):
if '/users/' + nickname + '/' in messageId:
if not isNewsPost(postJsonObject):
deleteThisPostStr = 'Delete this post'
if translate.get(deleteThisPostStr):
deleteThisPostStr = translate[deleteThisPostStr]
deleteStr = \
' \n'
deleteStr += \
' ' + \
'\n'
return deleteStr
def _getPublishedDateStr(postJsonObject: {},
showPublishedDateOnly: bool) -> str:
"""Return the html for the published date on a post
"""
publishedStr = ''
if not postJsonObject['object'].get('published'):
return publishedStr
publishedStr = postJsonObject['object']['published']
if '.' not in publishedStr:
if '+' not in publishedStr:
datetimeObject = \
datetime.strptime(publishedStr, "%Y-%m-%dT%H:%M:%SZ")
else:
datetimeObject = \
datetime.strptime(publishedStr.split('+')[0] + 'Z',
"%Y-%m-%dT%H:%M:%SZ")
else:
publishedStr = \
publishedStr.replace('T', ' ').split('.')[0]
datetimeObject = parse(publishedStr)
if not showPublishedDateOnly:
publishedStr = datetimeObject.strftime("%a %b %d, %H:%M")
else:
publishedStr = datetimeObject.strftime("%a %b %d")
# if the post has replies then append a symbol to indicate this
if postJsonObject.get('hasReplies'):
if postJsonObject['hasReplies'] is True:
publishedStr = '[' + publishedStr + ']'
return publishedStr
def _getBlogCitationsHtml(boxName: str,
postJsonObject: {},
translate: {}) -> str:
"""Returns blog citations as html
"""
# show blog citations
citationsStr = ''
if not (boxName == 'tlblogs' or boxName == 'tlfeatures'):
return citationsStr
if not postJsonObject['object'].get('tag'):
return citationsStr
for tagJson in postJsonObject['object']['tag']:
if not isinstance(tagJson, dict):
continue
if not tagJson.get('type'):
continue
if tagJson['type'] != 'Article':
continue
if not tagJson.get('name'):
continue
if not tagJson.get('url'):
continue
citationsStr += \
'