__filename__ = "blog.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"

import json
import time
import os
from collections import OrderedDict
from datetime import datetime
from datetime import date
from dateutil.parser import parse
from shutil import copyfile
from shutil import copyfileobj
from pprint import pprint

from content import replaceEmojiFromTags
from webinterface import contentWarningScriptOpen
from webinterface import getIconsDir
from webinterface import getPostAttachmentsAsHtml
from webinterface import htmlHeader
from webinterface import htmlFooter
from webinterface import addEmbeddedElements
from utils import getNicknameFromActor
from utils import getDomainFromActor
from posts import createBlogsTimeline

# NOTE(review): several string literals below which are concatenated into
# html or rss output only contain whitespace and plain text in the reviewed
# copy of this file. They look like they have lost their markup. They are
# reproduced piece by piece exactly as found (line breaks written as \n) so
# that the markup can be restored in place - confirm against the upstream
# file before release.


def _getFullDomain(domain: str, port: int) -> str:
    """Returns the domain, with the port appended if it is set and is
    neither 80 nor 443
    """
    if port and port != 80 and port != 443:
        return domain + ':' + str(port)
    return domain


def _blogHtmlHeader(baseDir: str) -> str:
    """Returns the html header for blog pages
    epicyon.css is used if it exists within baseDir,
    otherwise epicyon-profile.css
    Raises OSError if the chosen css file cannot be read
    """
    cssFilename = baseDir + '/epicyon-profile.css'
    if os.path.isfile(baseDir + '/epicyon.css'):
        cssFilename = baseDir + '/epicyon.css'
    with open(cssFilename, 'r') as cssFile:
        blogCSS = cssFile.read()
    return htmlHeader(cssFilename, blogCSS)


def _blogAccounts(baseDir: str):
    """Yields (account directory name, blogs index filename) for every
    account directory which contains a tlblogs.index file
    """
    accountsDir = baseDir + '/accounts'
    for subdir, dirs, files in os.walk(accountsDir):
        for acct in dirs:
            if '@' not in acct:
                continue
            if 'inbox@' in acct:
                continue
            blogsIndex = os.path.join(accountsDir, acct) + '/tlblogs.index'
            if os.path.isfile(blogsIndex):
                yield acct, blogsIndex
        # Account directories are resolved against accountsDir itself, so
        # only the top level is relevant. Stopping here avoids walking
        # every subdirectory of every account.
        break


def _findPostFile(baseDir: str, nickname: str, domain: str,
                  postId: str, extension: str) -> str:
    """Returns the first existing filename for the given post id within
    the tlblogs, inbox or outbox of the account, or None if there is none
    extension is appended to the post id ('/' replaced by '#')
    """
    accountDir = baseDir + '/accounts/' + nickname + '@' + domain
    for postBox in ('tlblogs', 'inbox', 'outbox'):
        postFilename = \
            accountDir + '/' + postBox + '/' + \
            postId.replace('/', '#') + extension
        if os.path.isfile(postFilename):
            return postFilename
    return None


def _postCacheFilename(baseDir: str, nickname: str, domain: str,
                       postId: str) -> str:
    """Returns the filename of the cached html for the given post id
    """
    return baseDir + '/accounts/' + nickname + '@' + domain + \
        '/postcache/' + postId.replace('/', '#') + '.html'


def _replyIdFromLine(line: str) -> str:
    """Returns the post id from a line of a .replies file
    """
    return line.replace('\n', '').replace('.json', '').replace('.replies', '')


def noOfBlogReplies(baseDir: str, httpPrefix: str, translate: {},
                    nickname: str, domain: str, domainFull: str,
                    postId: str, depth=0) -> int:
    """Returns the number of replies on the post
    This is recursive, so can handle replies to replies
    Recursion stops beyond a depth of 4
    """
    if depth > 4:
        return 0
    if not postId:
        return 0

    repliesFilename = \
        _findPostFile(baseDir, nickname, domain, postId, '.replies')
    if not repliesFilename:
        # post may exist but has no replies
        # NOTE(review): 1 is returned here although no replies file was
        # found - confirm that this is intended
        if _findPostFile(baseDir, nickname, domain, postId, ''):
            return 1
        return 0

    # read everything first so that the file is not held open
    # during recursion
    with open(repliesFilename, 'r') as f:
        lines = f.readlines()

    replies = 0
    for line in lines:
        replyPostId = _replyIdFromLine(line)
        if not replyPostId:
            # a blank line is not a reply
            continue
        replies += \
            1 + \
            noOfBlogReplies(baseDir, httpPrefix, translate,
                            nickname, domain, domainFull,
                            replyPostId, depth + 1)
    return replies


def getBlogReplies(baseDir: str, httpPrefix: str, translate: {},
                   nickname: str, domain: str, domainFull: str,
                   postId: str, depth=0) -> str:
    """Returns the cached html of the replies on the post
    This is recursive, so replies to replies are appended after the
    reply which they refer to. Recursion stops beyond a depth of 4
    If the post has no replies file but exists, its own cached html
    is returned
    """
    if depth > 4:
        return ''
    if not postId:
        return ''

    repliesFilename = \
        _findPostFile(baseDir, nickname, domain, postId, '.replies')
    if not repliesFilename:
        # post may exist but has no replies
        if _findPostFile(baseDir, nickname, domain, postId, ''):
            cachedFilename = \
                _postCacheFilename(baseDir, nickname, domain, postId)
            if os.path.isfile(cachedFilename):
                with open(cachedFilename, 'r') as postFile:
                    return postFile.read() + '\n'
        return ''

    with open(repliesFilename, 'r') as f:
        lines = f.readlines()

    repliesStr = ''
    for line in lines:
        replyPostId = _replyIdFromLine(line)
        cachedFilename = \
            _postCacheFilename(baseDir, nickname, domain, replyPostId)
        if not os.path.isfile(cachedFilename):
            # replies without cached html are not shown
            continue
        with open(cachedFilename, 'r') as postFile:
            repliesStr += postFile.read() + '\n'
        repliesStr += \
            getBlogReplies(baseDir, httpPrefix, translate,
                           nickname, domain, domainFull,
                           replyPostId, depth + 1)

    # indicate the reply indentation level
    indentStr = '>' + (' >' * depth)

    repliesStr = repliesStr.replace(translate['SHOW MORE'], indentStr)
    return repliesStr.replace('?tl=outbox', '?tl=tlblogs')


def htmlBlogPostContent(authorized: bool,
                        baseDir: str, httpPrefix: str, translate: {},
                        nickname: str, domain: str, domainFull: str,
                        postJsonObject: {},
                        handle: str, restrictToDomain: bool) -> str:
    """Returns the content for a single blog post
    An empty string is returned if restrictToDomain is set and the post
    has no attributedTo field or its author is from another domain
    Replies are only included in full when authorized, otherwise
    just the number of replies is shown
    """
    postObject = postJsonObject['object']
    linkedAuthor = False
    actor = ''
    blogStr = ''
    # NOTE(review): messageLink is not referenced again in the reviewed
    # copy; presumably it belongs to the markup around the title
    messageLink = ''
    if postObject.get('id'):
        messageLink = postObject['id'].replace('/statuses/', '/')
    titleStr = ''
    if postObject.get('summary'):
        titleStr = postObject['summary']
        blogStr += '\n\n' + titleStr + '\n\n\n'

    # get the handle of the author
    if postObject.get('attributedTo'):
        actor = postObject['attributedTo']
        authorNickname = getNicknameFromActor(actor)
        if authorNickname:
            authorDomain, authorPort = getDomainFromActor(actor)
            if authorDomain:
                # author must be from the given domain
                if restrictToDomain and authorDomain != domain:
                    return ''
                handle = authorNickname + '@' + authorDomain
    else:
        # posts from the domain are expected to have an attributedTo field
        if restrictToDomain:
            return ''

    if postObject.get('published'):
        if 'T' in postObject['published']:
            blogStr += '\n\n' + postObject['published'].split('T')[0]
            if handle:
                if handle.startswith(nickname + '@' + domain):
                    blogStr += \
                        ' ' + handle + ''
                    linkedAuthor = True
                else:
                    # this previously tested an undefined name (author),
                    # which raised NameError for any other author
                    if actor:
                        blogStr += \
                            ' ' + handle + ''
                        linkedAuthor = True
                    else:
                        blogStr += ' ' + handle
            blogStr += '\n\n\n'

    # none of the timeline buttons are shown on blog posts
    avatarLink = ''
    replyStr = ''
    announceStr = ''
    likeStr = ''
    bookmarkStr = ''
    deleteStr = ''
    muteStr = ''
    isMuted = False
    attachmentStr, galleryStr = \
        getPostAttachmentsAsHtml(postJsonObject, 'tlblogs', translate,
                                 isMuted, avatarLink,
                                 replyStr, announceStr, likeStr,
                                 bookmarkStr, deleteStr, muteStr)
    if attachmentStr:
        blogStr += '\n' + attachmentStr + '\n'

    if postObject.get('content'):
        contentStr = \
            addEmbeddedElements(translate, postObject['content'])
        if postObject.get('tag'):
            contentStr = \
                replaceEmojiFromTags(contentStr, postObject['tag'],
                                     'content')
        blogStr += '\n' + contentStr + '\n'

    blogStr += '\n\n\n'

    if not linkedAuthor:
        blogStr += \
            '\n\n' + translate['About the author'] + '\n\n\n'

    replies = \
        noOfBlogReplies(baseDir, httpPrefix, translate,
                        nickname, domain, domainFull,
                        postObject['id'])
    if replies > 0:
        if not authorized:
            blogStr += \
                '\n\n' + \
                translate['Replies'].lower() + ': ' + str(replies) + \
                '\n\n\n'
        else:
            # NOTE(review): indentation was lost in the reviewed copy;
            # showing the replies only when authorized is assumed
            blogStr += '\n\n' + translate['Replies'] + '\n\n\n'
            blogStr += '\n'
            repliesStr = \
                getBlogReplies(baseDir, httpPrefix, translate,
                               nickname, domain, domainFull,
                               postObject['id'])
            if titleStr:
                # don't repeat the title of the post within its replies
                repliesStr = repliesStr.replace('>' + titleStr + '<', '')
            blogStr += repliesStr
            blogStr += '\n\n\n'
    return blogStr


def htmlBlogPostRSS(authorized: bool,
                    baseDir: str, httpPrefix: str, translate: {},
                    nickname: str, domain: str, domainFull: str,
                    postJsonObject: {},
                    handle: str, restrictToDomain: bool) -> str:
    """Returns the RSS feed for a single blog post
    An empty string is returned if the post has no id or no summary, or
    if restrictToDomain is set and the id does not contain the domain
    """
    # start with an empty result so that posts which are skipped
    # don't raise UnboundLocalError
    rssStr = ''
    postObject = postJsonObject['object']
    if not postObject.get('id'):
        return rssStr
    messageLink = postObject['id'].replace('/statuses/', '/')
    if restrictToDomain and '/' + domain not in messageLink:
        return rssStr
    if postObject.get('summary'):
        rssStr = ' '
        rssStr += ' ' + postObject['summary'] + ''
        rssStr += ' ' + messageLink + ''
        rssStr += ' '
    return rssStr


def htmlBlogPost(authorized: bool,
                 baseDir: str, httpPrefix: str, translate: {},
                 nickname: str, domain: str, domainFull: str,
                 postJsonObject: {}) -> str:
    """Returns a html blog post
    Raises OSError if the css file cannot be read
    """
    blogStr = _blogHtmlHeader(baseDir)
    blogStr = blogStr.replace('.cwText', '.cwTextInactive')
    blogStr += \
        htmlBlogPostContent(authorized, baseDir, httpPrefix, translate,
                            nickname, domain, domainFull, postJsonObject,
                            None, False)
    return blogStr + htmlFooter()


def htmlBlogPage(authorized: bool, session,
                 baseDir: str, httpPrefix: str, translate: {},
                 nickname: str, domain: str, port: int,
                 noOfItems: int, pageNumber: int) -> str:
    """Returns a html blog page containing posts
    None is returned if the nickname contains a space, @ or newline
    If the account has no blogs index or no timeline then only the
    header and footer are returned
    """
    if ' ' in nickname or '@' in nickname or '\n' in nickname:
        return None

    blogStr = _blogHtmlHeader(baseDir)
    blogStr = blogStr.replace('.cwText', '.cwTextInactive')

    blogsIndex = \
        baseDir + '/accounts/' + nickname + '@' + domain + '/tlblogs.index'
    if not os.path.isfile(blogsIndex):
        return blogStr + htmlFooter()

    timelineJson = \
        createBlogsTimeline(session, baseDir,
                            nickname, domain, port, httpPrefix,
                            noOfItems, False, False, pageNumber)
    if not timelineJson:
        return blogStr + htmlFooter()

    domainFull = _getFullDomain(domain, port)
    pageIsFull = len(timelineJson['orderedItems']) >= noOfItems

    # show previous and next buttons
    # navigateStr starts empty so that the second use below doesn't raise
    # UnboundLocalError when there is no page number
    navigateStr = ''
    if pageNumber is not None:
        # NOTE(review): iconsDir is not referenced again in the reviewed
        # copy; presumably it belongs to the markup of the buttons
        iconsDir = getIconsDir(baseDir)
        navigateStr = '\n\n'
        if pageNumber > 1:
            # show previous button
            navigateStr += \
                '' + \
                '<\n'
        if pageIsFull:
            # show next button
            navigateStr += \
                '' + \
                '>\n'
        navigateStr += '\n\n'
        blogStr += navigateStr

    for item in timelineJson['orderedItems']:
        if item['type'] != 'Create':
            continue
        blogStr += \
            htmlBlogPostContent(authorized, baseDir, httpPrefix, translate,
                                nickname, domain, domainFull, item,
                                None, True)

    # repeat the buttons at the bottom of a full page
    if pageIsFull:
        blogStr += navigateStr

    # show rss link
    blogStr += '\n\n'
    blogStr += ''
    blogStr += 'RSS'
    blogStr += '\n\n'
    return blogStr + htmlFooter()


def rssHeader(httpPrefix: str, nickname: str, domainFull: str,
              translate: {}) -> str:
    """Returns the start of the rss feed for the given account, containing
    the translated blog title and the address of the feed
    """
    rssStr = ""
    rssStr += ""
    rssStr += ''
    rssStr += ' ' + translate['Blog'] + ''
    rssStr += \
        ' ' + httpPrefix + '://' + domainFull + \
        '/users/' + nickname + '/rss.xml' + ''
    return rssStr


def rssFooter() -> str:
    """Returns the end of the rss feed
    """
    rssStr = ''
    rssStr += ''
    return rssStr


def htmlBlogPageRSS(authorized: bool, session,
                    baseDir: str, httpPrefix: str, translate: {},
                    nickname: str, domain: str, port: int,
                    noOfItems: int, pageNumber: int) -> str:
    """Returns an rss feed containing posts
    None is returned if the nickname contains a space, @ or newline
    If the account has no blogs index or no timeline then only the
    rss header and footer are returned
    """
    if ' ' in nickname or '@' in nickname or '\n' in nickname:
        return None

    domainFull = _getFullDomain(domain, port)

    blogRSS = rssHeader(httpPrefix, nickname, domainFull, translate)

    blogsIndex = \
        baseDir + '/accounts/' + nickname + '@' + domain + '/tlblogs.index'
    if not os.path.isfile(blogsIndex):
        return blogRSS + rssFooter()

    timelineJson = \
        createBlogsTimeline(session, baseDir,
                            nickname, domain, port, httpPrefix,
                            noOfItems, False, False, pageNumber)
    if not timelineJson:
        return blogRSS + rssFooter()

    if pageNumber is not None:
        for item in timelineJson['orderedItems']:
            if item['type'] != 'Create':
                continue
            blogRSS += \
                htmlBlogPostRSS(authorized, baseDir, httpPrefix, translate,
                                nickname, domain, domainFull, item,
                                None, True)

    return blogRSS + rssFooter()


def getBlogIndexesForAccounts(baseDir: str) -> {}:
    """ Get the index files for blogs for each account
    and add them to a dict
    The dict maps the account directory name (nickname@domain)
    to the filename of its tlblogs.index
    """
    return dict(_blogAccounts(baseDir))


def noOfBlogAccounts(baseDir: str) -> int:
    """Returns the number of blog accounts
    """
    return sum(1 for _ in _blogAccounts(baseDir))


def singleBlogAccountNickname(baseDir: str) -> str:
    """Returns the nickname of a single blog account
    This is the first blog account found, or None if there are none
    """
    for acct, blogsIndex in _blogAccounts(baseDir):
        return acct.split('@')[0]
    return None


def htmlBlogView(authorized: bool,
                 session, baseDir: str, httpPrefix: str,
                 translate: {}, domain: str, port: int,
                 noOfItems: int) -> str:
    """Show the blog main page
    If there is only one blog account then the first page of its blog
    is returned, otherwise one entry for each blog account
    Raises OSError if the css file cannot be read
    """
    blogStr = _blogHtmlHeader(baseDir)

    if noOfBlogAccounts(baseDir) <= 1:
        nickname = singleBlogAccountNickname(baseDir)
        if nickname:
            return htmlBlogPage(authorized, session,
                                baseDir, httpPrefix, translate,
                                nickname, domain, port,
                                noOfItems, 1)

    # NOTE(review): domainFull is not referenced again in the reviewed
    # copy; presumably it belongs to the markup of the account links
    domainFull = _getFullDomain(domain, port)

    for acct, blogsIndex in _blogAccounts(baseDir):
        blogStr += '\n\n'
        blogStr += \
            '' + acct + ''
        blogStr += '\n\n'

    return blogStr + htmlFooter()