__filename__ = "blog.py" __author__ = "Bob Mottram" __license__ = "AGPL3+" __version__ = "1.2.0" __maintainer__ = "Bob Mottram" __email__ = "bob@libreserver.org" __status__ = "Production" __module_group__ = "ActivityPub" import os from datetime import datetime from content import replaceEmojiFromTags from webapp_utils import htmlHeaderWithExternalStyle from webapp_utils import htmlHeaderWithBlogMarkup from webapp_utils import htmlFooter from webapp_utils import getPostAttachmentsAsHtml from webapp_utils import editTextArea from webapp_media import addEmbeddedElements from utils import localActorUrl from utils import getActorLanguagesList from utils import getBaseContentFromPost from utils import getContentFromPost from utils import isAccountDir from utils import removeHtml from utils import getConfigParam from utils import getFullDomain from utils import getMediaFormats from utils import getNicknameFromActor from utils import getDomainFromActor from utils import locatePost from utils import loadJson from utils import firstParagraphFromString from utils import getActorPropertyUrl from utils import acctDir from posts import createBlogsTimeline from newswire import rss2Header from newswire import rss2Footer from cache import getPersonFromCache def _noOfBlogReplies(baseDir: str, httpPrefix: str, translate: {}, nickname: str, domain: str, domainFull: str, postId: str, depth=0) -> int: """Returns the number of replies on the post This is recursive, so can handle replies to replies """ if depth > 4: return 0 if not postId: return 0 tryPostBox = ('tlblogs', 'inbox', 'outbox') boxFound = False for postBox in tryPostBox: postFilename = \ acctDir(baseDir, nickname, domain) + '/' + postBox + '/' + \ postId.replace('/', '#') + '.replies' if os.path.isfile(postFilename): boxFound = True break if not boxFound: # post may exist but has no replies for postBox in tryPostBox: postFilename = \ acctDir(baseDir, nickname, domain) + '/' + postBox + '/' + \ postId.replace('/', '#') if os.path.isfile(postFilename): return 1 return 0 removals = [] replies = 0 lines = [] with open(postFilename, 'r') as f: lines = f.readlines() for replyPostId in lines: replyPostId = replyPostId.replace('\n', '').replace('\r', '') replyPostId = replyPostId.replace('.json', '') if locatePost(baseDir, nickname, domain, replyPostId): replyPostId = replyPostId.replace('.replies', '') replies += \ 1 + _noOfBlogReplies(baseDir, httpPrefix, translate, nickname, domain, domainFull, replyPostId, depth+1) else: # remove post which no longer exists removals.append(replyPostId) # remove posts from .replies file if they don't exist if lines and removals: print('Rewriting ' + postFilename + ' to remove ' + str(len(removals)) + ' entries') with open(postFilename, 'w+') as f: for replyPostId in lines: replyPostId = replyPostId.replace('\n', '').replace('\r', '') if replyPostId not in removals: f.write(replyPostId + '\n') return replies def _getBlogReplies(baseDir: str, httpPrefix: str, translate: {}, nickname: str, domain: str, domainFull: str, postId: str, depth=0) -> str: """Returns a string containing html blog posts """ if depth > 4: return '' if not postId: return '' tryPostBox = ('tlblogs', 'inbox', 'outbox') boxFound = False for postBox in tryPostBox: postFilename = \ acctDir(baseDir, nickname, domain) + '/' + postBox + '/' + \ postId.replace('/', '#') + '.replies' if os.path.isfile(postFilename): boxFound = True break if not boxFound: # post may exist but has no replies for postBox in tryPostBox: postFilename = \ acctDir(baseDir, nickname, domain) + '/' + postBox + '/' + \ postId.replace('/', '#') + '.json' if os.path.isfile(postFilename): postFilename = acctDir(baseDir, nickname, domain) + \ '/postcache/' + \ postId.replace('/', '#') + '.html' if os.path.isfile(postFilename): with open(postFilename, 'r') as postFile: return postFile.read() + '\n' return '' with open(postFilename, 'r') as f: lines = f.readlines() repliesStr = '' for replyPostId in lines: replyPostId = replyPostId.replace('\n', '').replace('\r', '') replyPostId = replyPostId.replace('.json', '') replyPostId = replyPostId.replace('.replies', '') postFilename = acctDir(baseDir, nickname, domain) + \ '/postcache/' + \ replyPostId.replace('/', '#') + '.html' if not os.path.isfile(postFilename): continue with open(postFilename, 'r') as postFile: repliesStr += postFile.read() + '\n' rply = _getBlogReplies(baseDir, httpPrefix, translate, nickname, domain, domainFull, replyPostId, depth+1) if rply not in repliesStr: repliesStr += rply # indicate the reply indentation level indentStr = '>' for indentLevel in range(depth): indentStr += ' >' repliesStr = repliesStr.replace(translate['SHOW MORE'], indentStr) return repliesStr.replace('?tl=outbox', '?tl=tlblogs') return '' def _htmlBlogPostContent(debug: bool, session, authorized: bool, baseDir: str, httpPrefix: str, translate: {}, nickname: str, domain: str, domainFull: str, postJsonObject: {}, handle: str, restrictToDomain: bool, peertubeInstances: [], systemLanguage: str, personCache: {}, blogSeparator: str = '
') -> str: """Returns the content for a single blog post """ linkedAuthor = False actor = '' blogStr = '' messageLink = '' if postJsonObject['object'].get('id'): messageLink = postJsonObject['object']['id'].replace('/statuses/', '/') titleStr = '' articleAdded = False if postJsonObject['object'].get('summary'): titleStr = postJsonObject['object']['summary'] blogStr += '

' + \ titleStr + '

\n' articleAdded = True # get the handle of the author if postJsonObject['object'].get('attributedTo'): authorNickname = None if isinstance(postJsonObject['object']['attributedTo'], str): actor = postJsonObject['object']['attributedTo'] authorNickname = getNicknameFromActor(actor) if authorNickname: authorDomain, authorPort = getDomainFromActor(actor) if authorDomain: # author must be from the given domain if restrictToDomain and authorDomain != domain: return '' handle = authorNickname + '@' + authorDomain else: # posts from the domain are expected to have an attributedTo field if restrictToDomain: return '' if postJsonObject['object'].get('published'): if 'T' in postJsonObject['object']['published']: blogStr += '

' + \ postJsonObject['object']['published'].split('T')[0] if handle: if handle.startswith(nickname + '@' + domain): blogStr += ' ' + handle + '' linkedAuthor = True else: if actor: blogStr += ' ' + \ handle + '' linkedAuthor = True else: blogStr += ' ' + handle blogStr += '

\n' avatarLink = '' replyStr = '' announceStr = '' likeStr = '' bookmarkStr = '' deleteStr = '' muteStr = '' isMuted = False attachmentStr, galleryStr = getPostAttachmentsAsHtml(postJsonObject, 'tlblogs', translate, isMuted, avatarLink, replyStr, announceStr, likeStr, bookmarkStr, deleteStr, muteStr) if attachmentStr: blogStr += '
' + attachmentStr + '
' personUrl = localActorUrl(httpPrefix, nickname, domainFull) actorJson = \ getPersonFromCache(baseDir, personUrl, personCache, False) languagesUnderstood = [] if actorJson: languagesUnderstood = getActorLanguagesList(actorJson) jsonContent = getContentFromPost(postJsonObject, systemLanguage, languagesUnderstood) if jsonContent: contentStr = addEmbeddedElements(translate, jsonContent, peertubeInstances) if postJsonObject['object'].get('tag'): contentStr = replaceEmojiFromTags(session, baseDir, contentStr, postJsonObject['object']['tag'], 'content', debug) if articleAdded: blogStr += '
' + contentStr + '
\n' else: blogStr += '
' + contentStr + '
\n' citationsStr = '' if postJsonObject['object'].get('tag'): for tagJson in postJsonObject['object']['tag']: if not isinstance(tagJson, dict): continue if not tagJson.get('type'): continue if tagJson['type'] != 'Article': continue if not tagJson.get('name'): continue if not tagJson.get('url'): continue citationsStr += \ '
  • ' + \ '' + tagJson['name'] + '
  • \n' if citationsStr: citationsStr = '

    ' + translate['Citations'] + \ ':

    ' + \ '\n' blogStr += '
    \n' + citationsStr if not linkedAuthor: blogStr += '

    ' + translate['About the author'] + \ '

    \n' replies = _noOfBlogReplies(baseDir, httpPrefix, translate, nickname, domain, domainFull, postJsonObject['object']['id']) # separator between blogs should be centered if '
    ' not in blogSeparator: blogSeparator = '
    ' + blogSeparator + '
    ' if replies == 0: blogStr += blogSeparator + '\n' return blogStr if not authorized: blogStr += '

    ' + \ translate['Replies'].lower() + ': ' + str(replies) + '

    ' blogStr += '


    ' + blogSeparator + '\n' else: blogStr += blogSeparator + '

    ' + translate['Replies'] + '

    \n' if not titleStr: blogStr += _getBlogReplies(baseDir, httpPrefix, translate, nickname, domain, domainFull, postJsonObject['object']['id']) else: blogRepliesStr = _getBlogReplies(baseDir, httpPrefix, translate, nickname, domain, domainFull, postJsonObject['object']['id']) blogStr += blogRepliesStr.replace('>' + titleStr + '<', '') return blogStr def _htmlBlogPostRSS2(authorized: bool, baseDir: str, httpPrefix: str, translate: {}, nickname: str, domain: str, domainFull: str, postJsonObject: {}, handle: str, restrictToDomain: bool, systemLanguage: str) -> str: """Returns the RSS version 2 feed for a single blog post """ rssStr = '' messageLink = '' if postJsonObject['object'].get('id'): messageLink = postJsonObject['object']['id'].replace('/statuses/', '/') if not restrictToDomain or \ (restrictToDomain and '/' + domain in messageLink): if postJsonObject['object'].get('summary') and \ postJsonObject['object'].get('published'): published = postJsonObject['object']['published'] pubDate = datetime.strptime(published, "%Y-%m-%dT%H:%M:%SZ") titleStr = postJsonObject['object']['summary'] rssDateStr = pubDate.strftime("%a, %d %b %Y %H:%M:%S UT") content = \ getBaseContentFromPost(postJsonObject, systemLanguage) description = firstParagraphFromString(content) rssStr = ' ' rssStr += ' ' + titleStr + '' rssStr += ' ' + messageLink + '' rssStr += \ ' ' + description + '' rssStr += ' ' + rssDateStr + '' rssStr += ' ' return rssStr def _htmlBlogPostRSS3(authorized: bool, baseDir: str, httpPrefix: str, translate: {}, nickname: str, domain: str, domainFull: str, postJsonObject: {}, handle: str, restrictToDomain: bool, systemLanguage: str) -> str: """Returns the RSS version 3 feed for a single blog post """ rssStr = '' messageLink = '' if postJsonObject['object'].get('id'): messageLink = postJsonObject['object']['id'].replace('/statuses/', '/') if not restrictToDomain or \ (restrictToDomain and '/' + domain in messageLink): if postJsonObject['object'].get('summary') and \ postJsonObject['object'].get('published'): published = postJsonObject['object']['published'] pubDate = datetime.strptime(published, "%Y-%m-%dT%H:%M:%SZ") titleStr = postJsonObject['object']['summary'] rssDateStr = pubDate.strftime("%a, %d %b %Y %H:%M:%S UT") content = \ getBaseContentFromPost(postJsonObject, systemLanguage) description = firstParagraphFromString(content) rssStr = 'title: ' + titleStr + '\n' rssStr += 'link: ' + messageLink + '\n' rssStr += 'description: ' + description + '\n' rssStr += 'created: ' + rssDateStr + '\n\n' return rssStr def _htmlBlogRemoveCwButton(blogStr: str, translate: {}) -> str: """Removes the CW button from blog posts, where the summary field is instead used as the blog title """ blogStr = blogStr.replace('
    ', '') blogStr = blogStr.replace('
    ', '') blogStr = blogStr.replace('', '') blogStr = blogStr.replace('', '') blogStr = blogStr.replace(translate['SHOW MORE'], '') return blogStr def _getSnippetFromBlogContent(postJsonObject: {}, systemLanguage: str) -> str: """Returns a snippet of text from the blog post as a preview """ content = getBaseContentFromPost(postJsonObject, systemLanguage) if '

    ' in content: content = content.split('

    ', 1)[1] if '

    ' in content: content = content.split('

    ', 1)[0] content = removeHtml(content) if '\n' in content: content = content.split('\n')[0] if len(content) >= 256: content = content[:252] + '...' return content def htmlBlogPost(session, authorized: bool, baseDir: str, httpPrefix: str, translate: {}, nickname: str, domain: str, domainFull: str, postJsonObject: {}, peertubeInstances: [], systemLanguage: str, personCache: {}, debug: bool, contentLicenseUrl: str) -> str: """Returns a html blog post """ blogStr = '' cssFilename = baseDir + '/epicyon-blog.css' if os.path.isfile(baseDir + '/blog.css'): cssFilename = baseDir + '/blog.css' instanceTitle = \ getConfigParam(baseDir, 'instanceTitle') published = postJsonObject['object']['published'] modified = published if postJsonObject['object'].get('updated'): modified = postJsonObject['object']['updated'] title = postJsonObject['object']['summary'] url = '' if postJsonObject['object'].get('url'): url = postJsonObject['object']['url'] snippet = _getSnippetFromBlogContent(postJsonObject, systemLanguage) blogStr = htmlHeaderWithBlogMarkup(cssFilename, instanceTitle, httpPrefix, domainFull, nickname, systemLanguage, published, modified, title, snippet, translate, url, contentLicenseUrl) _htmlBlogRemoveCwButton(blogStr, translate) blogStr += _htmlBlogPostContent(debug, session, authorized, baseDir, httpPrefix, translate, nickname, domain, domainFull, postJsonObject, None, False, peertubeInstances, systemLanguage, personCache) # show rss links blogStr += '

    ' blogStr += '' blogStr += 'RSS 2.0' # blogStr += '' # blogStr += 'RSS 3.0' blogStr += '

    ' return blogStr + htmlFooter() def htmlBlogPage(authorized: bool, session, baseDir: str, httpPrefix: str, translate: {}, nickname: str, domain: str, port: int, noOfItems: int, pageNumber: int, peertubeInstances: [], systemLanguage: str, personCache: {}, debug: bool) -> str: """Returns a html blog page containing posts """ if ' ' in nickname or '@' in nickname or \ '\n' in nickname or '\r' in nickname: return None blogStr = '' cssFilename = baseDir + '/epicyon-profile.css' if os.path.isfile(baseDir + '/epicyon.css'): cssFilename = baseDir + '/epicyon.css' instanceTitle = \ getConfigParam(baseDir, 'instanceTitle') blogStr = htmlHeaderWithExternalStyle(cssFilename, instanceTitle, None) _htmlBlogRemoveCwButton(blogStr, translate) blogsIndex = acctDir(baseDir, nickname, domain) + '/tlblogs.index' if not os.path.isfile(blogsIndex): return blogStr + htmlFooter() timelineJson = createBlogsTimeline(session, baseDir, nickname, domain, port, httpPrefix, noOfItems, False, pageNumber) if not timelineJson: return blogStr + htmlFooter() domainFull = getFullDomain(domain, port) # show previous and next buttons if pageNumber is not None: navigateStr = '

    ' if pageNumber > 1: # show previous button navigateStr += '' + \ '<\n' if len(timelineJson['orderedItems']) >= noOfItems: # show next button navigateStr += '' + \ '>\n' navigateStr += '

    ' blogStr += navigateStr for item in timelineJson['orderedItems']: if item['type'] != 'Create': continue blogStr += _htmlBlogPostContent(debug, session, authorized, baseDir, httpPrefix, translate, nickname, domain, domainFull, item, None, True, peertubeInstances, systemLanguage, personCache) if len(timelineJson['orderedItems']) >= noOfItems: blogStr += navigateStr # show rss link blogStr += '

    ' blogStr += '' blogStr += 'RSS 2.0' # blogStr += '' # blogStr += 'RSS 3.0' blogStr += '

    ' return blogStr + htmlFooter() def htmlBlogPageRSS2(authorized: bool, session, baseDir: str, httpPrefix: str, translate: {}, nickname: str, domain: str, port: int, noOfItems: int, pageNumber: int, includeHeader: bool, systemLanguage: str) -> str: """Returns an RSS version 2 feed containing posts """ if ' ' in nickname or '@' in nickname or \ '\n' in nickname or '\r' in nickname: return None domainFull = getFullDomain(domain, port) blogRSS2 = '' if includeHeader: blogRSS2 = rss2Header(httpPrefix, nickname, domainFull, 'Blog', translate) blogsIndex = acctDir(baseDir, nickname, domain) + '/tlblogs.index' if not os.path.isfile(blogsIndex): if includeHeader: return blogRSS2 + rss2Footer() else: return blogRSS2 timelineJson = createBlogsTimeline(session, baseDir, nickname, domain, port, httpPrefix, noOfItems, False, pageNumber) if not timelineJson: if includeHeader: return blogRSS2 + rss2Footer() else: return blogRSS2 if pageNumber is not None: for item in timelineJson['orderedItems']: if item['type'] != 'Create': continue blogRSS2 += \ _htmlBlogPostRSS2(authorized, baseDir, httpPrefix, translate, nickname, domain, domainFull, item, None, True, systemLanguage) if includeHeader: return blogRSS2 + rss2Footer() else: return blogRSS2 def htmlBlogPageRSS3(authorized: bool, session, baseDir: str, httpPrefix: str, translate: {}, nickname: str, domain: str, port: int, noOfItems: int, pageNumber: int, systemLanguage: str) -> str: """Returns an RSS version 3 feed containing posts """ if ' ' in nickname or '@' in nickname or \ '\n' in nickname or '\r' in nickname: return None domainFull = getFullDomain(domain, port) blogRSS3 = '' blogsIndex = acctDir(baseDir, nickname, domain) + '/tlblogs.index' if not os.path.isfile(blogsIndex): return blogRSS3 timelineJson = createBlogsTimeline(session, baseDir, nickname, domain, port, httpPrefix, noOfItems, False, pageNumber) if not timelineJson: return blogRSS3 if pageNumber is not None: for item in timelineJson['orderedItems']: if item['type'] != 'Create': continue blogRSS3 += \ _htmlBlogPostRSS3(authorized, baseDir, httpPrefix, translate, nickname, domain, domainFull, item, None, True, systemLanguage) return blogRSS3 def _noOfBlogAccounts(baseDir: str) -> int: """Returns the number of blog accounts """ ctr = 0 for subdir, dirs, files in os.walk(baseDir + '/accounts'): for acct in dirs: if not isAccountDir(acct): continue accountDir = os.path.join(baseDir + '/accounts', acct) blogsIndex = accountDir + '/tlblogs.index' if os.path.isfile(blogsIndex): ctr += 1 break return ctr def _singleBlogAccountNickname(baseDir: str) -> str: """Returns the nickname of a single blog account """ for subdir, dirs, files in os.walk(baseDir + '/accounts'): for acct in dirs: if not isAccountDir(acct): continue accountDir = os.path.join(baseDir + '/accounts', acct) blogsIndex = accountDir + '/tlblogs.index' if os.path.isfile(blogsIndex): return acct.split('@')[0] break return None def htmlBlogView(authorized: bool, session, baseDir: str, httpPrefix: str, translate: {}, domain: str, port: int, noOfItems: int, peertubeInstances: [], systemLanguage: str, personCache: {}, debug: bool) -> str: """Show the blog main page """ blogStr = '' cssFilename = baseDir + '/epicyon-profile.css' if os.path.isfile(baseDir + '/epicyon.css'): cssFilename = baseDir + '/epicyon.css' instanceTitle = \ getConfigParam(baseDir, 'instanceTitle') blogStr = htmlHeaderWithExternalStyle(cssFilename, instanceTitle, None) if _noOfBlogAccounts(baseDir) <= 1: nickname = _singleBlogAccountNickname(baseDir) if nickname: return htmlBlogPage(authorized, session, baseDir, httpPrefix, translate, nickname, domain, port, noOfItems, 1, peertubeInstances, systemLanguage, personCache, debug) domainFull = getFullDomain(domain, port) for subdir, dirs, files in os.walk(baseDir + '/accounts'): for acct in dirs: if not isAccountDir(acct): continue accountDir = os.path.join(baseDir + '/accounts', acct) blogsIndex = accountDir + '/tlblogs.index' if os.path.isfile(blogsIndex): blogStr += '

    ' blogStr += '' + acct + '' blogStr += '

    ' break return blogStr + htmlFooter() def htmlEditBlog(mediaInstance: bool, translate: {}, baseDir: str, httpPrefix: str, path: str, pageNumber: int, nickname: str, domain: str, postUrl: str, systemLanguage: str) -> str: """Edit a blog post after it was created """ postFilename = locatePost(baseDir, nickname, domain, postUrl) if not postFilename: print('Edit blog: Filename not found for ' + postUrl) return None postJsonObject = loadJson(postFilename) if not postJsonObject: print('Edit blog: json not loaded for ' + postFilename) return None editBlogText = '' + translate['Write your post text below.'] + '' if os.path.isfile(baseDir + '/accounts/newpost.txt'): with open(baseDir + '/accounts/newpost.txt', 'r') as file: editBlogText = '

    ' + file.read() + '

    ' cssFilename = baseDir + '/epicyon-profile.css' if os.path.isfile(baseDir + '/epicyon.css'): cssFilename = baseDir + '/epicyon.css' if '?' in path: path = path.split('?')[0] pathBase = path editBlogImageSection = '
    ' editBlogImageSection += ' ' editBlogImageSection += ' ' editBlogImageSection += \ ' ' editBlogImageSection += '
    ' placeholderMessage = translate['Write something'] + '...' endpoint = 'editblogpost' placeholderSubject = translate['Title'] scopeIcon = 'scope_blog.png' scopeDescription = translate['Blog'] dateAndLocation = '' dateAndLocation = '
    ' dateAndLocation += \ '

    ' dateAndLocation += \ '

    ' dateAndLocation += \ '' dateAndLocation += '' dateAndLocation += '

    ' dateAndLocation += '
    ' dateAndLocation += '
    ' dateAndLocation += \ '
    ' dateAndLocation += '' dateAndLocation += '
    ' instanceTitle = \ getConfigParam(baseDir, 'instanceTitle') editBlogForm = \ htmlHeaderWithExternalStyle(cssFilename, instanceTitle, None) editBlogForm += \ '
    ' editBlogForm += \ ' ' editBlogForm += \ ' ' editBlogForm += '
    ' editBlogForm += \ ' ' editBlogForm += '
    ' editBlogForm += '
    ' editBlogForm += \ ' ' + \ scopeDescription + '' editBlogForm += '
    ' editBlogForm += ' ' + \
        translate['Search for emoji'] + '' editBlogForm += '
    ' editBlogForm += '
    ' editBlogForm += ' ' editBlogForm += ' ' editBlogForm += '
    ' if mediaInstance: editBlogForm += editBlogImageSection editBlogForm += \ '
    ' titleStr = '' if postJsonObject['object'].get('summary'): titleStr = postJsonObject['object']['summary'] editBlogForm += \ ' ' editBlogForm += '' editBlogForm += '
    ' messageBoxHeight = 800 contentStr = getBaseContentFromPost(postJsonObject, systemLanguage) contentStr = contentStr.replace('

    ', '').replace('

    ', '\n') editBlogForm += \ editTextArea(placeholderMessage, 'message', contentStr, messageBoxHeight, '', True) editBlogForm += dateAndLocation if not mediaInstance: editBlogForm += editBlogImageSection editBlogForm += '
    ' editBlogForm += '
    ' editBlogForm = editBlogForm.replace('', '') editBlogForm += htmlFooter() return editBlogForm def pathContainsBlogLink(baseDir: str, httpPrefix: str, domain: str, domainFull: str, path: str) -> (str, str): """If the path contains a blog entry then return its filename """ if '/users/' not in path: return None, None userEnding = path.split('/users/', 1)[1] if '/' not in userEnding: return None, None userEnding2 = userEnding.split('/') nickname = userEnding2[0] if len(userEnding2) != 2: return None, None if len(userEnding2[1]) < 14: return None, None userEnding2[1] = userEnding2[1].strip() if not userEnding2[1].isdigit(): return None, None # check for blog posts blogIndexFilename = acctDir(baseDir, nickname, domain) + '/tlblogs.index' if not os.path.isfile(blogIndexFilename): return None, None if '#' + userEnding2[1] + '.' not in open(blogIndexFilename).read(): return None, None messageId = localActorUrl(httpPrefix, nickname, domainFull) + \ '/statuses/' + userEnding2[1] return locatePost(baseDir, nickname, domain, messageId), nickname def getBlogAddress(actorJson: {}) -> str: """Returns blog address for the given actor """ return getActorPropertyUrl(actorJson, 'Blog')