__filename__ = "webapp_post.py" __author__ = "Bob Mottram" __license__ = "AGPL3+" __version__ = "1.2.0" __maintainer__ = "Bob Mottram" __email__ = "bob@libreserver.org" __status__ = "Production" __module_group__ = "Web Interface" import os import time from dateutil.parser import parse from auth import createPassword from git import isGitPatch from datetime import datetime from cache import getPersonFromCache from bookmarks import bookmarkedByPerson from like import likedByPerson from like import noOfLikes from follow import isFollowingActor from posts import postIsMuted from posts import getPersonBox from posts import downloadAnnounce from posts import populateRepliesJson from utils import getActorLanguagesList from utils import getBaseContentFromPost from utils import getContentFromPost from utils import hasObjectDict from utils import updateAnnounceCollection from utils import isPGPEncrypted from utils import isDM from utils import rejectPostId from utils import isRecentPost from utils import getConfigParam from utils import getFullDomain from utils import isEditor from utils import locatePost from utils import loadJson from utils import getCachedPostDirectory from utils import getCachedPostFilename from utils import getProtocolPrefixes from utils import isNewsPost from utils import isBlogPost from utils import getDisplayName from utils import isPublicPost from utils import updateRecentPostsCache from utils import removeIdEnding from utils import getNicknameFromActor from utils import getDomainFromActor from utils import acctDir from utils import localActorUrl from content import limitRepeatedWords from content import replaceEmojiFromTags from content import htmlReplaceQuoteMarks from content import htmlReplaceEmailQuote from content import removeTextFormatting from content import removeLongWords from content import getMentionsFromHtml from content import switchWords from person import isPersonSnoozed from person import getPersonAvatarUrl from announce import announcedByPerson from webapp_utils import getAvatarImageUrl from webapp_utils import updateAvatarImageCache from webapp_utils import loadIndividualPostAsHtmlFromCache from webapp_utils import addEmojiToDisplayName from webapp_utils import postContainsPublic from webapp_utils import getContentWarningButton from webapp_utils import getPostAttachmentsAsHtml from webapp_utils import htmlHeaderWithExternalStyle from webapp_utils import htmlFooter from webapp_utils import getBrokenLinkSubstitute from webapp_media import addEmbeddedElements from webapp_question import insertQuestion from devices import E2EEdecryptMessageFromDevice from webfinger import webfingerHandle from speaker import updateSpeaker from languages import autoTranslatePost def _logPostTiming(enableTimingLog: bool, postStartTime, debugId: str) -> None: """Create a log of timings for performance tuning """ if not enableTimingLog: return timeDiff = int((time.time() - postStartTime) * 1000) if timeDiff > 100: print('TIMING INDIV ' + debugId + ' = ' + str(timeDiff)) def prepareHtmlPostNickname(nickname: str, postHtml: str) -> str: """html posts stored in memory are for all accounts on the instance and they're indexed by id. However, some incoming posts may be destined for multiple accounts (followers). This creates a problem where the icon links whose urls begin with href="/users/nickname? need to be changed for different nicknames to display correctly within their timelines. This function changes the nicknames for the icon links. """ # replace the nickname usersStr = ' href="/users/' if usersStr not in postHtml: return postHtml userFound = True postStr = postHtml newPostStr = '' while userFound: if usersStr not in postStr: newPostStr += postStr break # the next part, after href="/users/nickname? nextStr = postStr.split(usersStr, 1)[1] if '?' in nextStr: nextStr = nextStr.split('?', 1)[1] else: newPostStr += postStr break # append the previous text to the result newPostStr += postStr.split(usersStr)[0] newPostStr += usersStr + nickname + '?' # post is now the next part postStr = nextStr return newPostStr def preparePostFromHtmlCache(nickname: str, postHtml: str, boxName: str, pageNumber: int) -> str: """Sets the page number on a cached html post """ # if on the bookmarks timeline then remain there if boxName == 'tlbookmarks' or boxName == 'bookmarks': postHtml = postHtml.replace('?tl=inbox', '?tl=tlbookmarks') if '?page=' in postHtml: pageNumberStr = postHtml.split('?page=')[1] if '?' in pageNumberStr: pageNumberStr = pageNumberStr.split('?')[0] postHtml = postHtml.replace('?page=' + pageNumberStr, '?page=-999') withPageNumber = postHtml.replace(';-999;', ';' + str(pageNumber) + ';') withPageNumber = withPageNumber.replace('?page=-999', '?page=' + str(pageNumber)) return prepareHtmlPostNickname(nickname, withPageNumber) def _saveIndividualPostAsHtmlToCache(baseDir: str, nickname: str, domain: str, postJsonObject: {}, postHtml: str) -> bool: """Saves the given html for a post to a cache file This is so that it can be quickly reloaded on subsequent refresh of the timeline """ htmlPostCacheDir = \ getCachedPostDirectory(baseDir, nickname, domain) cachedPostFilename = \ getCachedPostFilename(baseDir, nickname, domain, postJsonObject) # create the cache directory if needed if not os.path.isdir(htmlPostCacheDir): os.mkdir(htmlPostCacheDir) try: with open(cachedPostFilename, 'w+') as fp: fp.write(postHtml) return True except Exception as e: print('ERROR: saving post to cache, ' + str(e)) return False def _getPostFromRecentCache(session, baseDir: str, httpPrefix: str, nickname: str, domain: str, postJsonObject: {}, postActor: str, personCache: {}, allowDownloads: bool, showPublicOnly: bool, storeToCache: bool, boxName: str, avatarUrl: str, enableTimingLog: bool, postStartTime, pageNumber: int, recentPostsCache: {}, maxRecentPosts: int, signingPrivateKeyPem: str) -> str: """Attempts to get the html post from the recent posts cache in memory """ if boxName == 'tlmedia': return None if showPublicOnly: return None tryCache = False bmTimeline = boxName == 'bookmarks' or boxName == 'tlbookmarks' if storeToCache or bmTimeline: tryCache = True if not tryCache: return None # update avatar if needed if not avatarUrl: avatarUrl = \ getPersonAvatarUrl(baseDir, postActor, personCache, allowDownloads) _logPostTiming(enableTimingLog, postStartTime, '2.1') updateAvatarImageCache(signingPrivateKeyPem, session, baseDir, httpPrefix, postActor, avatarUrl, personCache, allowDownloads) _logPostTiming(enableTimingLog, postStartTime, '2.2') postHtml = \ loadIndividualPostAsHtmlFromCache(baseDir, nickname, domain, postJsonObject) if not postHtml: return None postHtml = \ preparePostFromHtmlCache(nickname, postHtml, boxName, pageNumber) updateRecentPostsCache(recentPostsCache, maxRecentPosts, postJsonObject, postHtml) _logPostTiming(enableTimingLog, postStartTime, '3') return postHtml def _getAvatarImageHtml(showAvatarOptions: bool, nickname: str, domainFull: str, avatarUrl: str, postActor: str, translate: {}, avatarPosition: str, pageNumber: int, messageIdStr: str) -> str: """Get html for the avatar image """ avatarLink = '' if '/users/news/' not in avatarUrl: avatarLink = ' ' showProfileStr = 'Show profile' if translate.get(showProfileStr): showProfileStr = translate[showProfileStr] avatarLink += \ ' \n' if showAvatarOptions and \ domainFull + '/users/' + nickname not in postActor: showOptionsForThisPersonStr = 'Show options for this person' if translate.get(showOptionsForThisPersonStr): showOptionsForThisPersonStr = \ translate[showOptionsForThisPersonStr] if '/users/news/' not in avatarUrl: avatarLink = \ ' \n' avatarLink += \ ' \n' else: # don't link to the person options for the news account avatarLink += \ ' \n' return avatarLink.strip() def _getReplyIconHtml(nickname: str, isPublicRepeat: bool, showIcons: bool, commentsEnabled: bool, postJsonObject: {}, pageNumberParam: str, translate: {}, systemLanguage: str, conversationId: str) -> str: """Returns html for the reply icon/button """ replyStr = '' if not (showIcons and commentsEnabled): return replyStr # reply is permitted - create reply icon replyToLink = postJsonObject['object']['id'] if postJsonObject['object'].get('attributedTo'): if isinstance(postJsonObject['object']['attributedTo'], str): replyToLink += \ '?mention=' + postJsonObject['object']['attributedTo'] content = getBaseContentFromPost(postJsonObject, systemLanguage) if content: mentionedActors = getMentionsFromHtml(content) if mentionedActors: for actorUrl in mentionedActors: if '?mention=' + actorUrl not in replyToLink: replyToLink += '?mention=' + actorUrl if len(replyToLink) > 500: break replyToLink += pageNumberParam replyStr = '' replyToThisPostStr = 'Reply to this post' if translate.get(replyToThisPostStr): replyToThisPostStr = translate[replyToThisPostStr] conversationStr = '' if conversationId: conversationStr = '?conversationId=' + conversationId if isPublicRepeat: replyStr += \ ' \n' else: if isDM(postJsonObject): replyStr += \ ' ' + \ '\n' else: replyStr += \ ' ' + \ '\n' replyStr += \ ' ' + \ '' + replyToThisPostStr + \
        ' |\n' return replyStr def _getEditIconHtml(baseDir: str, nickname: str, domainFull: str, postJsonObject: {}, actorNickname: str, translate: {}, isEvent: bool) -> str: """Returns html for the edit icon/button """ editStr = '' actor = postJsonObject['actor'] # This should either be a post which you created, # or it could be generated from the newswire (see # _addBlogsToNewswire) in which case anyone with # editor status should be able to alter it if (actor.endswith('/' + domainFull + '/users/' + nickname) or (isEditor(baseDir, nickname) and actor.endswith('/' + domainFull + '/users/news'))): postId = postJsonObject['object']['id'] if '/statuses/' not in postId: return editStr if isBlogPost(postJsonObject): editBlogPostStr = 'Edit blog post' if translate.get(editBlogPostStr): editBlogPostStr = translate[editBlogPostStr] if not isNewsPost(postJsonObject): editStr += \ ' ' + \ '' + \ '' + editBlogPostStr + \
                    ' |\n' else: editStr += \ ' ' + \ '' + \ '' + editBlogPostStr + \
                    ' |\n' elif isEvent: editEventStr = 'Edit event' if translate.get(editEventStr): editEventStr = translate[editEventStr] editStr += \ ' ' + \ '' + \ '' + editEventStr + \
                ' |\n' return editStr def _getAnnounceIconHtml(isAnnounced: bool, postActor: str, nickname: str, domainFull: str, announceJsonObject: {}, postJsonObject: {}, isPublicRepeat: bool, isModerationPost: bool, showRepeatIcon: bool, translate: {}, pageNumberParam: str, timelinePostBookmark: str, boxName: str) -> str: """Returns html for announce icon/button """ announceStr = '' if not showRepeatIcon: return announceStr if isModerationPost: return announceStr # don't allow announce/repeat of your own posts announceIcon = 'repeat_inactive.png' announceLink = 'repeat' announceEmoji = '' if not isPublicRepeat: announceLink = 'repeatprivate' repeatThisPostStr = 'Repeat this post' if translate.get(repeatThisPostStr): repeatThisPostStr = translate[repeatThisPostStr] announceTitle = repeatThisPostStr unannounceLinkStr = '' if announcedByPerson(isAnnounced, postActor, nickname, domainFull): announceIcon = 'repeat.png' announceEmoji = '🔁 ' announceLink = 'unrepeat' if not isPublicRepeat: announceLink = 'unrepeatprivate' undoTheRepeatStr = 'Undo the repeat' if translate.get(undoTheRepeatStr): undoTheRepeatStr = translate[undoTheRepeatStr] announceTitle = undoTheRepeatStr if announceJsonObject: unannounceLinkStr = '?unannounce=' + \ removeIdEnding(announceJsonObject['id']) announceLinkStr = '?' + \ announceLink + '=' + postJsonObject['object']['id'] + pageNumberParam announceStr = \ ' \n' announceStr += \ ' ' + \ '' + announceEmoji + announceTitle + \
        ' |\n' return announceStr def _getLikeIconHtml(nickname: str, domainFull: str, isModerationPost: bool, showLikeButton: bool, postJsonObject: {}, enableTimingLog: bool, postStartTime, translate: {}, pageNumberParam: str, timelinePostBookmark: str, boxName: str, maxLikeCount: int) -> str: """Returns html for like icon/button """ likeStr = '' if not isModerationPost and showLikeButton: likeIcon = 'like_inactive.png' likeLink = 'like' likeTitle = 'Like this post' if translate.get(likeTitle): likeTitle = translate[likeTitle] likeEmoji = '' likeCount = noOfLikes(postJsonObject) _logPostTiming(enableTimingLog, postStartTime, '12.1') likeCountStr = '' if likeCount > 0: if likeCount <= maxLikeCount: likeCountStr = ' (' + str(likeCount) + ')' else: likeCountStr = ' (' + str(maxLikeCount) + '+)' if likedByPerson(postJsonObject, nickname, domainFull): if likeCount == 1: # liked by the reader only likeCountStr = '' likeIcon = 'like.png' likeLink = 'unlike' likeTitle = 'Undo the like' if translate.get(likeTitle): likeTitle = translate[likeTitle] likeEmoji = '👍 ' _logPostTiming(enableTimingLog, postStartTime, '12.2') likeStr = '' if likeCountStr: # show the number of likes next to icon likeStr += '\n' likeStr += \ ' \n' likeStr += \ ' ' + \ '' + likeEmoji + likeTitle + \
            ' |\n' return likeStr def _getBookmarkIconHtml(nickname: str, domainFull: str, postJsonObject: {}, isModerationPost: bool, translate: {}, enableTimingLog: bool, postStartTime, boxName: str, pageNumberParam: str, timelinePostBookmark: str) -> str: """Returns html for bookmark icon/button """ bookmarkStr = '' if isModerationPost: return bookmarkStr bookmarkIcon = 'bookmark_inactive.png' bookmarkLink = 'bookmark' bookmarkEmoji = '' bookmarkTitle = 'Bookmark this post' if translate.get(bookmarkTitle): bookmarkTitle = translate[bookmarkTitle] if bookmarkedByPerson(postJsonObject, nickname, domainFull): bookmarkIcon = 'bookmark.png' bookmarkLink = 'unbookmark' bookmarkEmoji = '🔖 ' bookmarkTitle = 'Undo the bookmark' if translate.get(bookmarkTitle): bookmarkTitle = translate[bookmarkTitle] _logPostTiming(enableTimingLog, postStartTime, '12.6') bookmarkStr = \ ' \n' bookmarkStr += \ ' ' + \ '' + \
        bookmarkEmoji + bookmarkTitle + ' |\n' return bookmarkStr def _getMuteIconHtml(isMuted: bool, postActor: str, messageId: str, nickname: str, domainFull: str, allowDeletion: bool, pageNumberParam: str, boxName: str, timelinePostBookmark: str, translate: {}) -> str: """Returns html for mute icon/button """ muteStr = '' if (allowDeletion or ('/' + domainFull + '/' in postActor and messageId.startswith(postActor))): return muteStr if not isMuted: muteThisPostStr = 'Mute this post' if translate.get('Mute this post'): muteThisPostStr = translate[muteThisPostStr] muteStr = \ ' \n' muteStr += \ ' ' + \ '' + \
            muteThisPostStr + \
            ' |\n' else: undoMuteStr = 'Undo mute' if translate.get(undoMuteStr): undoMuteStr = translate[undoMuteStr] muteStr = \ ' \n' muteStr += \ ' ' + \ '🔇 ' + undoMuteStr + \
            ' |\n' return muteStr def _getDeleteIconHtml(nickname: str, domainFull: str, allowDeletion: bool, postActor: str, messageId: str, postJsonObject: {}, pageNumberParam: str, translate: {}) -> str: """Returns html for delete icon/button """ deleteStr = '' if (allowDeletion or ('/' + domainFull + '/' in postActor and messageId.startswith(postActor))): if '/users/' + nickname + '/' in messageId: if not isNewsPost(postJsonObject): deleteThisPostStr = 'Delete this post' if translate.get(deleteThisPostStr): deleteThisPostStr = translate[deleteThisPostStr] deleteStr = \ ' \n' deleteStr += \ ' ' + \ '' + \
                    deleteThisPostStr + \
                    ' |\n' return deleteStr def _getPublishedDateStr(postJsonObject: {}, showPublishedDateOnly: bool) -> str: """Return the html for the published date on a post """ publishedStr = '' if not postJsonObject['object'].get('published'): return publishedStr publishedStr = postJsonObject['object']['published'] if '.' not in publishedStr: if '+' not in publishedStr: datetimeObject = \ datetime.strptime(publishedStr, "%Y-%m-%dT%H:%M:%SZ") else: datetimeObject = \ datetime.strptime(publishedStr.split('+')[0] + 'Z', "%Y-%m-%dT%H:%M:%SZ") else: publishedStr = \ publishedStr.replace('T', ' ').split('.')[0] datetimeObject = parse(publishedStr) if not showPublishedDateOnly: publishedStr = datetimeObject.strftime("%a %b %d, %H:%M") else: publishedStr = datetimeObject.strftime("%a %b %d") # if the post has replies then append a symbol to indicate this if postJsonObject.get('hasReplies'): if postJsonObject['hasReplies'] is True: publishedStr = '[' + publishedStr + ']' return publishedStr def _getBlogCitationsHtml(boxName: str, postJsonObject: {}, translate: {}) -> str: """Returns blog citations as html """ # show blog citations citationsStr = '' if not (boxName == 'tlblogs' or boxName == 'tlfeatures'): return citationsStr if not postJsonObject['object'].get('tag'): return citationsStr for tagJson in postJsonObject['object']['tag']: if not isinstance(tagJson, dict): continue if not tagJson.get('type'): continue if tagJson['type'] != 'Article': continue if not tagJson.get('name'): continue if not tagJson.get('url'): continue citationsStr += \ '
  • ' + \ '' + tagJson['name'] + '
  • \n' if citationsStr: translatedCitationsStr = 'Citations' if translate.get(translatedCitationsStr): translatedCitationsStr = translate[translatedCitationsStr] citationsStr = '

    ' + translatedCitationsStr + ':

    ' + \ '\n' return citationsStr def _boostOwnPostHtml(translate: {}) -> str: """The html title for announcing your own post """ announcesStr = 'announces' if translate.get(announcesStr): announcesStr = translate[announcesStr] return ' ' + announcesStr + \
        '\n' def _announceUnattributedHtml(translate: {}, postJsonObject: {}) -> str: """Returns the html for an announce title where there is no attribution on the announced post """ announcesStr = 'announces' if translate.get(announcesStr): announcesStr = translate[announcesStr] return ' ' + \
        announcesStr + '\n' + \ ' @unattributed\n' def _announceWithDisplayNameHtml(translate: {}, postJsonObject: {}, announceDisplayName: str) -> str: """Returns html for an announce having a display name """ announcesStr = 'announces' if translate.get(announcesStr): announcesStr = translate[announcesStr] return ' ' + \
        announcesStr + '\n' + \ ' ' + announceDisplayName + '\n' def _getPostTitleAnnounceHtml(baseDir: str, httpPrefix: str, nickname: str, domain: str, showRepeatIcon: bool, isAnnounced: bool, postJsonObject: {}, postActor: str, translate: {}, enableTimingLog: bool, postStartTime, boxName: str, personCache: {}, allowDownloads: bool, avatarPosition: str, pageNumber: int, messageIdStr: str, containerClassIcons: str, containerClass: str) -> (str, str, str, str): """Returns the announce title of a post containing names of participants x announces y """ titleStr = '' replyAvatarImageInPost = '' objJson = postJsonObject['object'] # has no attribution if not objJson.get('attributedTo'): titleStr += _announceUnattributedHtml(translate, postJsonObject) return (titleStr, replyAvatarImageInPost, containerClassIcons, containerClass) attributedTo = '' if isinstance(objJson['attributedTo'], str): attributedTo = objJson['attributedTo'] # boosting your own post if attributedTo.startswith(postActor): titleStr += _boostOwnPostHtml(translate) return (titleStr, replyAvatarImageInPost, containerClassIcons, containerClass) # boosting another person's post _logPostTiming(enableTimingLog, postStartTime, '13.2') announceNickname = None if attributedTo: announceNickname = getNicknameFromActor(attributedTo) if not announceNickname: titleStr += _announceUnattributedHtml(translate, postJsonObject) return (titleStr, replyAvatarImageInPost, containerClassIcons, containerClass) announceDomain, announcePort = getDomainFromActor(attributedTo) getPersonFromCache(baseDir, attributedTo, personCache, allowDownloads) announceDisplayName = getDisplayName(baseDir, attributedTo, personCache) if not announceDisplayName: announceDisplayName = announceNickname + '@' + announceDomain _logPostTiming(enableTimingLog, postStartTime, '13.3') # add any emoji to the display name if ':' in announceDisplayName: announceDisplayName = \ addEmojiToDisplayName(baseDir, httpPrefix, nickname, domain, announceDisplayName, False) _logPostTiming(enableTimingLog, postStartTime, '13.3.1') titleStr += \ _announceWithDisplayNameHtml(translate, postJsonObject, announceDisplayName) # show avatar of person replied to announceActor = attributedTo announceAvatarUrl = \ getPersonAvatarUrl(baseDir, announceActor, personCache, allowDownloads) _logPostTiming(enableTimingLog, postStartTime, '13.4') if not announceAvatarUrl: announceAvatarUrl = '' idx = 'Show options for this person' if '/users/news/' not in announceAvatarUrl: showOptionsForThisPersonStr = idx if translate.get(idx): showOptionsForThisPersonStr = translate[idx] replyAvatarImageInPost = \ '
    \n' \ ' ' \ ' \n
    \n' return (titleStr, replyAvatarImageInPost, containerClassIcons, containerClass) def _replyToYourselfHtml(translate: {}) -> str: """Returns html for a title which is a reply to yourself """ replyingToThemselvesStr = 'replying to themselves' if translate.get(replyingToThemselvesStr): replyingToThemselvesStr = translate[replyingToThemselvesStr] return ' ' + replyingToThemselvesStr + \
        '\n' def _replyToUnknownHtml(translate: {}, postJsonObject: {}) -> str: """Returns the html title for a reply to an unknown handle """ replyingToStr = 'replying to' if translate.get(replyingToStr): replyingToStr = translate[replyingToStr] return ' ' + \
        replyingToStr + '\n' + \ ' @unknown\n' def _replyWithUnknownPathHtml(translate: {}, postJsonObject: {}, postDomain: str) -> str: """Returns html title for a reply with an unknown path eg. does not contain /statuses/ """ replyingToStr = 'replying to' if translate.get(replyingToStr): replyingToStr = translate[replyingToStr] return ' ' + replyingToStr + \
        '\n' + \ ' ' + \ postDomain + '\n' def _getReplyHtml(translate: {}, inReplyTo: str, replyDisplayName: str) -> str: """Returns html title for a reply """ replyingToStr = 'replying to' if translate.get(replyingToStr): replyingToStr = translate[replyingToStr] return ' ' + \ '' + \
        replyingToStr + '\n' + \ ' ' + \ replyDisplayName + '\n' def _getPostTitleReplyHtml(baseDir: str, httpPrefix: str, nickname: str, domain: str, showRepeatIcon: bool, isAnnounced: bool, postJsonObject: {}, postActor: str, translate: {}, enableTimingLog: bool, postStartTime, boxName: str, personCache: {}, allowDownloads: bool, avatarPosition: str, pageNumber: int, messageIdStr: str, containerClassIcons: str, containerClass: str) -> (str, str, str, str): """Returns the reply title of a post containing names of participants x replies to y """ titleStr = '' replyAvatarImageInPost = '' objJson = postJsonObject['object'] # not a reply if not objJson.get('inReplyTo'): return (titleStr, replyAvatarImageInPost, containerClassIcons, containerClass) containerClassIcons = 'containericons darker' containerClass = 'container darker' # reply to self if objJson['inReplyTo'].startswith(postActor): titleStr += _replyToYourselfHtml(translate) return (titleStr, replyAvatarImageInPost, containerClassIcons, containerClass) # has a reply if '/statuses/' not in objJson['inReplyTo']: postDomain = objJson['inReplyTo'] prefixes = getProtocolPrefixes() for prefix in prefixes: postDomain = postDomain.replace(prefix, '') if '/' in postDomain: postDomain = postDomain.split('/', 1)[0] if postDomain: titleStr += \ _replyWithUnknownPathHtml(translate, postJsonObject, postDomain) return (titleStr, replyAvatarImageInPost, containerClassIcons, containerClass) inReplyTo = objJson['inReplyTo'] replyActor = inReplyTo.split('/statuses/')[0] replyNickname = getNicknameFromActor(replyActor) if not replyNickname: titleStr += _replyToUnknownHtml(translate, postJsonObject) return (titleStr, replyAvatarImageInPost, containerClassIcons, containerClass) replyDomain, replyPort = getDomainFromActor(replyActor) if not (replyNickname and replyDomain): titleStr += _replyToUnknownHtml(translate, postJsonObject) return (titleStr, replyAvatarImageInPost, containerClassIcons, containerClass) getPersonFromCache(baseDir, replyActor, personCache, allowDownloads) replyDisplayName = getDisplayName(baseDir, replyActor, personCache) if not replyDisplayName: replyDisplayName = replyNickname + '@' + replyDomain # add emoji to the display name if ':' in replyDisplayName: _logPostTiming(enableTimingLog, postStartTime, '13.5') replyDisplayName = \ addEmojiToDisplayName(baseDir, httpPrefix, nickname, domain, replyDisplayName, False) _logPostTiming(enableTimingLog, postStartTime, '13.6') titleStr += _getReplyHtml(translate, inReplyTo, replyDisplayName) _logPostTiming(enableTimingLog, postStartTime, '13.7') # show avatar of person replied to replyAvatarUrl = \ getPersonAvatarUrl(baseDir, replyActor, personCache, allowDownloads) _logPostTiming(enableTimingLog, postStartTime, '13.8') if replyAvatarUrl: showProfileStr = 'Show profile' if translate.get(showProfileStr): showProfileStr = translate[showProfileStr] replyAvatarImageInPost = \ '
    \n' + \ ' \n' + \ '  \n
    \n' return (titleStr, replyAvatarImageInPost, containerClassIcons, containerClass) def _getPostTitleHtml(baseDir: str, httpPrefix: str, nickname: str, domain: str, showRepeatIcon: bool, isAnnounced: bool, postJsonObject: {}, postActor: str, translate: {}, enableTimingLog: bool, postStartTime, boxName: str, personCache: {}, allowDownloads: bool, avatarPosition: str, pageNumber: int, messageIdStr: str, containerClassIcons: str, containerClass: str) -> (str, str, str, str): """Returns the title of a post containing names of participants x replies to y, x announces y, etc """ if not isAnnounced and boxName == 'search' and \ postJsonObject.get('object'): if postJsonObject['object'].get('attributedTo'): if postJsonObject['object']['attributedTo'] != postActor: isAnnounced = True if isAnnounced: return _getPostTitleAnnounceHtml(baseDir, httpPrefix, nickname, domain, showRepeatIcon, isAnnounced, postJsonObject, postActor, translate, enableTimingLog, postStartTime, boxName, personCache, allowDownloads, avatarPosition, pageNumber, messageIdStr, containerClassIcons, containerClass) return _getPostTitleReplyHtml(baseDir, httpPrefix, nickname, domain, showRepeatIcon, isAnnounced, postJsonObject, postActor, translate, enableTimingLog, postStartTime, boxName, personCache, allowDownloads, avatarPosition, pageNumber, messageIdStr, containerClassIcons, containerClass) def _getFooterWithIcons(showIcons: bool, containerClassIcons: str, replyStr: str, announceStr: str, likeStr: str, bookmarkStr: str, deleteStr: str, muteStr: str, editStr: str, postJsonObject: {}, publishedLink: str, timeClass: str, publishedStr: str) -> str: """Returns the html for a post footer containing icons """ if not showIcons: return None footerStr = '\n \n' return footerStr def individualPostAsHtml(signingPrivateKeyPem: str, allowDownloads: bool, recentPostsCache: {}, maxRecentPosts: int, translate: {}, pageNumber: int, baseDir: str, session, cachedWebfingers: {}, personCache: {}, nickname: str, domain: str, port: int, postJsonObject: {}, avatarUrl: str, showAvatarOptions: bool, allowDeletion: bool, httpPrefix: str, projectVersion: str, boxName: str, YTReplacementDomain: str, showPublishedDateOnly: bool, peertubeInstances: [], allowLocalNetworkAccess: bool, themeName: str, systemLanguage: str, maxLikeCount: int, showRepeats: bool = True, showIcons: bool = False, manuallyApprovesFollowers: bool = False, showPublicOnly: bool = False, storeToCache: bool = True, useCacheOnly: bool = False) -> str: """ Shows a single post as html """ if not postJsonObject: return '' # benchmark postStartTime = time.time() postActor = postJsonObject['actor'] # ZZZzzz if isPersonSnoozed(baseDir, nickname, domain, postActor): return '' # if downloads of avatar images aren't enabled then we can do more # accurate timing of different parts of the code enableTimingLog = not allowDownloads _logPostTiming(enableTimingLog, postStartTime, '1') avatarPosition = '' messageId = '' if postJsonObject.get('id'): messageId = removeIdEnding(postJsonObject['id']) _logPostTiming(enableTimingLog, postStartTime, '2') messageIdStr = '' if messageId: messageIdStr = ';' + messageId domainFull = getFullDomain(domain, port) pageNumberParam = '' if pageNumber: pageNumberParam = '?page=' + str(pageNumber) # get the html post from the recent posts cache if it exists there postHtml = \ _getPostFromRecentCache(session, baseDir, httpPrefix, nickname, domain, postJsonObject, postActor, personCache, allowDownloads, showPublicOnly, storeToCache, boxName, avatarUrl, enableTimingLog, postStartTime, pageNumber, recentPostsCache, maxRecentPosts, signingPrivateKeyPem) if postHtml: return postHtml if useCacheOnly and postJsonObject['type'] != 'Announce': return '' _logPostTiming(enableTimingLog, postStartTime, '4') avatarUrl = \ getAvatarImageUrl(session, baseDir, httpPrefix, postActor, personCache, avatarUrl, allowDownloads, signingPrivateKeyPem) _logPostTiming(enableTimingLog, postStartTime, '5') # get the display name if domainFull not in postActor: # lookup the correct webfinger for the postActor postActorNickname = getNicknameFromActor(postActor) postActorDomain, postActorPort = getDomainFromActor(postActor) postActorDomainFull = getFullDomain(postActorDomain, postActorPort) postActorHandle = postActorNickname + '@' + postActorDomainFull postActorWf = \ webfingerHandle(session, postActorHandle, httpPrefix, cachedWebfingers, domain, __version__, False, False, signingPrivateKeyPem) avatarUrl2 = None displayName = None if postActorWf: (inboxUrl, pubKeyId, pubKey, fromPersonId, sharedInbox, avatarUrl2, displayName) = getPersonBox(signingPrivateKeyPem, baseDir, session, postActorWf, personCache, projectVersion, httpPrefix, nickname, domain, 'outbox', 72367) _logPostTiming(enableTimingLog, postStartTime, '6') if avatarUrl2: avatarUrl = avatarUrl2 if displayName: # add any emoji to the display name if ':' in displayName: displayName = \ addEmojiToDisplayName(baseDir, httpPrefix, nickname, domain, displayName, False) _logPostTiming(enableTimingLog, postStartTime, '7') avatarLink = \ _getAvatarImageHtml(showAvatarOptions, nickname, domainFull, avatarUrl, postActor, translate, avatarPosition, pageNumber, messageIdStr) avatarImageInPost = \ '
    ' + avatarLink + '
    \n' timelinePostBookmark = removeIdEnding(postJsonObject['id']) timelinePostBookmark = timelinePostBookmark.replace('://', '-') timelinePostBookmark = timelinePostBookmark.replace('/', '-') # If this is the inbox timeline then don't show the repeat icon on any DMs showRepeatIcon = showRepeats isPublicRepeat = False postIsDM = isDM(postJsonObject) if showRepeats: if postIsDM: showRepeatIcon = False else: if not isPublicPost(postJsonObject): isPublicRepeat = True titleStr = '' galleryStr = '' isAnnounced = False announceJsonObject = None if postJsonObject['type'] == 'Announce': announceJsonObject = postJsonObject.copy() postJsonAnnounce = \ downloadAnnounce(session, baseDir, httpPrefix, nickname, domain, postJsonObject, projectVersion, translate, YTReplacementDomain, allowLocalNetworkAccess, recentPostsCache, False, systemLanguage, domainFull, personCache, signingPrivateKeyPem) if not postJsonAnnounce: # if the announce could not be downloaded then mark it as rejected rejectPostId(baseDir, nickname, domain, postJsonObject['id'], recentPostsCache) return '' postJsonObject = postJsonAnnounce announceFilename = \ locatePost(baseDir, nickname, domain, postJsonObject['id']) if announceFilename: updateAnnounceCollection(recentPostsCache, baseDir, announceFilename, postActor, nickname, domainFull, False) # create a file for use by text-to-speech if isRecentPost(postJsonObject): if postJsonObject.get('actor'): if not os.path.isfile(announceFilename + '.tts'): updateSpeaker(baseDir, httpPrefix, nickname, domain, domainFull, postJsonObject, personCache, translate, postJsonObject['actor'], themeName) with open(announceFilename + '.tts', 'w+') as ttsFile: ttsFile.write('\n') isAnnounced = True _logPostTiming(enableTimingLog, postStartTime, '8') if not hasObjectDict(postJsonObject): return '' # if this post should be public then check its recipients if showPublicOnly: if not postContainsPublic(postJsonObject): return '' isModerationPost = False if postJsonObject['object'].get('moderationStatus'): isModerationPost = True containerClass = 'container' containerClassIcons = 'containericons' timeClass = 'time-right' actorNickname = getNicknameFromActor(postActor) if not actorNickname: # single user instance actorNickname = 'dev' actorDomain, actorPort = getDomainFromActor(postActor) displayName = getDisplayName(baseDir, postActor, personCache) if displayName: if ':' in displayName: displayName = \ addEmojiToDisplayName(baseDir, httpPrefix, nickname, domain, displayName, False) titleStr += \ ' ' + displayName + '\n' else: if not messageId: # pprint(postJsonObject) print('ERROR: no messageId') if not actorNickname: # pprint(postJsonObject) print('ERROR: no actorNickname') if not actorDomain: # pprint(postJsonObject) print('ERROR: no actorDomain') titleStr += \ ' @' + actorNickname + '@' + actorDomain + '\n' # benchmark 9 _logPostTiming(enableTimingLog, postStartTime, '9') # Show a DM icon for DMs in the inbox timeline if postIsDM: titleStr = \ titleStr + ' \n' # check if replying is permitted commentsEnabled = True if isinstance(postJsonObject['object'], dict) and \ 'commentsEnabled' in postJsonObject['object']: if postJsonObject['object']['commentsEnabled'] is False: commentsEnabled = False elif 'rejectReplies' in postJsonObject['object']: if postJsonObject['object']['rejectReplies']: commentsEnabled = False conversationId = None if isinstance(postJsonObject['object'], dict) and \ 'conversation' in postJsonObject['object']: if postJsonObject['object']['conversation']: conversationId = postJsonObject['object']['conversation'] replyStr = _getReplyIconHtml(nickname, isPublicRepeat, showIcons, commentsEnabled, postJsonObject, pageNumberParam, translate, systemLanguage, conversationId) _logPostTiming(enableTimingLog, postStartTime, '10') editStr = _getEditIconHtml(baseDir, nickname, domainFull, postJsonObject, actorNickname, translate, False) _logPostTiming(enableTimingLog, postStartTime, '11') announceStr = \ _getAnnounceIconHtml(isAnnounced, postActor, nickname, domainFull, announceJsonObject, postJsonObject, isPublicRepeat, isModerationPost, showRepeatIcon, translate, pageNumberParam, timelinePostBookmark, boxName) _logPostTiming(enableTimingLog, postStartTime, '12') # whether to show a like button hideLikeButtonFile = \ acctDir(baseDir, nickname, domain) + '/.hideLikeButton' showLikeButton = True if os.path.isfile(hideLikeButtonFile): showLikeButton = False likeStr = _getLikeIconHtml(nickname, domainFull, isModerationPost, showLikeButton, postJsonObject, enableTimingLog, postStartTime, translate, pageNumberParam, timelinePostBookmark, boxName, maxLikeCount) _logPostTiming(enableTimingLog, postStartTime, '12.5') bookmarkStr = \ _getBookmarkIconHtml(nickname, domainFull, postJsonObject, isModerationPost, translate, enableTimingLog, postStartTime, boxName, pageNumberParam, timelinePostBookmark) _logPostTiming(enableTimingLog, postStartTime, '12.9') isMuted = postIsMuted(baseDir, nickname, domain, postJsonObject, messageId) _logPostTiming(enableTimingLog, postStartTime, '13') muteStr = \ _getMuteIconHtml(isMuted, postActor, messageId, nickname, domainFull, allowDeletion, pageNumberParam, boxName, timelinePostBookmark, translate) deleteStr = \ _getDeleteIconHtml(nickname, domainFull, allowDeletion, postActor, messageId, postJsonObject, pageNumberParam, translate) _logPostTiming(enableTimingLog, postStartTime, '13.1') # get the title: x replies to y, x announces y, etc (titleStr2, replyAvatarImageInPost, containerClassIcons, containerClass) = _getPostTitleHtml(baseDir, httpPrefix, nickname, domain, showRepeatIcon, isAnnounced, postJsonObject, postActor, translate, enableTimingLog, postStartTime, boxName, personCache, allowDownloads, avatarPosition, pageNumber, messageIdStr, containerClassIcons, containerClass) titleStr += titleStr2 _logPostTiming(enableTimingLog, postStartTime, '14') attachmentStr, galleryStr = \ getPostAttachmentsAsHtml(postJsonObject, boxName, translate, isMuted, avatarLink, replyStr, announceStr, likeStr, bookmarkStr, deleteStr, muteStr) publishedStr = \ _getPublishedDateStr(postJsonObject, showPublishedDateOnly) _logPostTiming(enableTimingLog, postStartTime, '15') publishedLink = messageId # blog posts should have no /statuses/ in their link if isBlogPost(postJsonObject): # is this a post to the local domain? if '://' + domain in messageId: publishedLink = messageId.replace('/statuses/', '/') # if this is a local link then make it relative so that it works # on clearnet or onion address if domain + '/users/' in publishedLink or \ domain + ':' + str(port) + '/users/' in publishedLink: publishedLink = '/users/' + publishedLink.split('/users/')[1] if not isNewsPost(postJsonObject): footerStr = '' + publishedStr + '\n' else: footerStr = '' + publishedStr + '\n' # change the background color for DMs in inbox timeline if postIsDM: containerClassIcons = 'containericons dm' containerClass = 'container dm' newFooterStr = _getFooterWithIcons(showIcons, containerClassIcons, replyStr, announceStr, likeStr, bookmarkStr, deleteStr, muteStr, editStr, postJsonObject, publishedLink, timeClass, publishedStr) if newFooterStr: footerStr = newFooterStr postIsSensitive = False if postJsonObject['object'].get('sensitive'): # sensitive posts should have a summary if postJsonObject['object'].get('summary'): postIsSensitive = postJsonObject['object']['sensitive'] else: # add a generic summary if none is provided sensitiveStr = 'Sensitive' if translate.get(sensitiveStr): sensitiveStr = translate[sensitiveStr] postJsonObject['object']['summary'] = sensitiveStr # add an extra line if there is a content warning, # for better vertical spacing on mobile if postIsSensitive: footerStr = '
    ' + footerStr if not postJsonObject['object'].get('summary'): postJsonObject['object']['summary'] = '' if postJsonObject['object'].get('cipherText'): postJsonObject['object']['content'] = \ E2EEdecryptMessageFromDevice(postJsonObject['object']) postJsonObject['object']['contentMap'][systemLanguage] = \ postJsonObject['object']['content'] domainFull = getFullDomain(domain, port) personUrl = localActorUrl(httpPrefix, nickname, domainFull) actorJson = \ getPersonFromCache(baseDir, personUrl, personCache, False) languagesUnderstood = [] if actorJson: languagesUnderstood = getActorLanguagesList(actorJson) contentStr = getContentFromPost(postJsonObject, systemLanguage, languagesUnderstood) if not contentStr: contentStr = \ autoTranslatePost(baseDir, postJsonObject, systemLanguage, translate) if not contentStr: return '' isPatch = isGitPatch(baseDir, nickname, domain, postJsonObject['object']['type'], postJsonObject['object']['summary'], contentStr) _logPostTiming(enableTimingLog, postStartTime, '16') if not isPGPEncrypted(contentStr): if not isPatch: objectContent = \ removeLongWords(contentStr, 40, []) objectContent = removeTextFormatting(objectContent) objectContent = limitRepeatedWords(objectContent, 6) objectContent = \ switchWords(baseDir, nickname, domain, objectContent) objectContent = htmlReplaceEmailQuote(objectContent) objectContent = htmlReplaceQuoteMarks(objectContent) else: objectContent = contentStr else: encryptedStr = 'Encrypted' if translate.get(encryptedStr): encryptedStr = translate[encryptedStr] objectContent = '🔒 ' + encryptedStr objectContent = '
    ' + objectContent + '
    ' if not postIsSensitive: contentStr = objectContent + attachmentStr contentStr = addEmbeddedElements(translate, contentStr, peertubeInstances) contentStr = insertQuestion(baseDir, translate, nickname, domain, port, contentStr, postJsonObject, pageNumber) else: postID = 'post' + str(createPassword(8)) contentStr = '' if postJsonObject['object'].get('summary'): cwStr = str(postJsonObject['object']['summary']) cwStr = \ addEmojiToDisplayName(baseDir, httpPrefix, nickname, domain, cwStr, False) contentStr += \ '\n ' if isModerationPost: containerClass = 'container report' # get the content warning text cwContentStr = objectContent + attachmentStr if not isPatch: cwContentStr = addEmbeddedElements(translate, cwContentStr, peertubeInstances) cwContentStr = \ insertQuestion(baseDir, translate, nickname, domain, port, cwContentStr, postJsonObject, pageNumber) cwContentStr = \ switchWords(baseDir, nickname, domain, cwContentStr) if not isBlogPost(postJsonObject): # get the content warning button contentStr += \ getContentWarningButton(postID, translate, cwContentStr) else: contentStr += cwContentStr _logPostTiming(enableTimingLog, postStartTime, '17') if postJsonObject['object'].get('tag') and not isPatch: contentStr = \ replaceEmojiFromTags(contentStr, postJsonObject['object']['tag'], 'content') if isMuted: contentStr = '' else: if not isPatch: contentStr = '
    ' + \ contentStr + \ '
    \n' else: contentStr = \ '
    ' + contentStr + \
                    '
    \n' # show blog citations citationsStr = \ _getBlogCitationsHtml(boxName, postJsonObject, translate) postHtml = '' if boxName != 'tlmedia': postHtml = '
    \n' postHtml += avatarImageInPost postHtml += '
    \n' + \ ' ' + titleStr + \ replyAvatarImageInPost + '
    \n' postHtml += contentStr + citationsStr + footerStr + '\n' postHtml += '
    \n' else: postHtml = galleryStr _logPostTiming(enableTimingLog, postStartTime, '18') # save the created html to the recent posts cache if not showPublicOnly and storeToCache and \ boxName != 'tlmedia' and boxName != 'tlbookmarks' and \ boxName != 'bookmarks': _saveIndividualPostAsHtmlToCache(baseDir, nickname, domain, postJsonObject, postHtml) updateRecentPostsCache(recentPostsCache, maxRecentPosts, postJsonObject, postHtml) _logPostTiming(enableTimingLog, postStartTime, '19') return postHtml def htmlIndividualPost(cssCache: {}, recentPostsCache: {}, maxRecentPosts: int, translate: {}, baseDir: str, session, cachedWebfingers: {}, personCache: {}, nickname: str, domain: str, port: int, authorized: bool, postJsonObject: {}, httpPrefix: str, projectVersion: str, likedBy: str, YTReplacementDomain: str, showPublishedDateOnly: bool, peertubeInstances: [], allowLocalNetworkAccess: bool, themeName: str, systemLanguage: str, maxLikeCount: int, signingPrivateKeyPem: str) -> str: """Show an individual post as html """ postStr = '' if likedBy: likedByNickname = getNicknameFromActor(likedBy) likedByDomain, likedByPort = getDomainFromActor(likedBy) likedByDomain = getFullDomain(likedByDomain, likedByPort) likedByHandle = likedByNickname + '@' + likedByDomain likedByStr = 'Liked by' if translate.get(likedByStr): likedByStr = translate[likedByStr] postStr += \ '

    ' + likedByStr + ' @' + \ likedByHandle + '\n' domainFull = getFullDomain(domain, port) actor = '/users/' + nickname followStr = '

    \n' followStr += \ ' \n' followStr += \ ' \n' if not isFollowingActor(baseDir, nickname, domainFull, likedBy): translateFollowStr = 'Follow' if translate.get(translateFollowStr): translateFollowStr = translate[translateFollowStr] followStr += ' \n' goBackStr = 'Go Back' if translate.get(goBackStr): goBackStr = translate[goBackStr] followStr += ' \n' followStr += '
    \n' postStr += followStr + '

    \n' postStr += \ individualPostAsHtml(signingPrivateKeyPem, True, recentPostsCache, maxRecentPosts, translate, None, baseDir, session, cachedWebfingers, personCache, nickname, domain, port, postJsonObject, None, True, False, httpPrefix, projectVersion, 'inbox', YTReplacementDomain, showPublishedDateOnly, peertubeInstances, allowLocalNetworkAccess, themeName, systemLanguage, maxLikeCount, False, authorized, False, False, False) messageId = removeIdEnding(postJsonObject['id']) # show the previous posts if hasObjectDict(postJsonObject): while postJsonObject['object'].get('inReplyTo'): postFilename = \ locatePost(baseDir, nickname, domain, postJsonObject['object']['inReplyTo']) if not postFilename: break postJsonObject = loadJson(postFilename) if postJsonObject: postStr = \ individualPostAsHtml(signingPrivateKeyPem, True, recentPostsCache, maxRecentPosts, translate, None, baseDir, session, cachedWebfingers, personCache, nickname, domain, port, postJsonObject, None, True, False, httpPrefix, projectVersion, 'inbox', YTReplacementDomain, showPublishedDateOnly, peertubeInstances, allowLocalNetworkAccess, themeName, systemLanguage, maxLikeCount, False, authorized, False, False, False) + postStr # show the following posts postFilename = locatePost(baseDir, nickname, domain, messageId) if postFilename: # is there a replies file for this post? repliesFilename = postFilename.replace('.json', '.replies') if os.path.isfile(repliesFilename): # get items from the replies file repliesJson = { 'orderedItems': [] } populateRepliesJson(baseDir, nickname, domain, repliesFilename, authorized, repliesJson) # add items to the html output for item in repliesJson['orderedItems']: postStr += \ individualPostAsHtml(signingPrivateKeyPem, True, recentPostsCache, maxRecentPosts, translate, None, baseDir, session, cachedWebfingers, personCache, nickname, domain, port, item, None, True, False, httpPrefix, projectVersion, 'inbox', YTReplacementDomain, showPublishedDateOnly, peertubeInstances, allowLocalNetworkAccess, themeName, systemLanguage, maxLikeCount, False, authorized, False, False, False) cssFilename = baseDir + '/epicyon-profile.css' if os.path.isfile(baseDir + '/epicyon.css'): cssFilename = baseDir + '/epicyon.css' instanceTitle = \ getConfigParam(baseDir, 'instanceTitle') return htmlHeaderWithExternalStyle(cssFilename, instanceTitle) + \ postStr + htmlFooter() def htmlPostReplies(cssCache: {}, recentPostsCache: {}, maxRecentPosts: int, translate: {}, baseDir: str, session, cachedWebfingers: {}, personCache: {}, nickname: str, domain: str, port: int, repliesJson: {}, httpPrefix: str, projectVersion: str, YTReplacementDomain: str, showPublishedDateOnly: bool, peertubeInstances: [], allowLocalNetworkAccess: bool, themeName: str, systemLanguage: str, maxLikeCount: int, signingPrivateKeyPem: str) -> str: """Show the replies to an individual post as html """ repliesStr = '' if repliesJson.get('orderedItems'): for item in repliesJson['orderedItems']: repliesStr += \ individualPostAsHtml(signingPrivateKeyPem, True, recentPostsCache, maxRecentPosts, translate, None, baseDir, session, cachedWebfingers, personCache, nickname, domain, port, item, None, True, False, httpPrefix, projectVersion, 'inbox', YTReplacementDomain, showPublishedDateOnly, peertubeInstances, allowLocalNetworkAccess, themeName, systemLanguage, maxLikeCount, False, False, False, False, False) cssFilename = baseDir + '/epicyon-profile.css' if os.path.isfile(baseDir + '/epicyon.css'): cssFilename = baseDir + '/epicyon.css' instanceTitle = \ getConfigParam(baseDir, 'instanceTitle') return htmlHeaderWithExternalStyle(cssFilename, instanceTitle) + \ repliesStr + htmlFooter()