__filename__ = "blog.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
from datetime import datetime
from content import replaceEmojiFromTags
from webapp_utils import getIconsWebPath
from webapp_utils import htmlHeaderWithExternalStyle
from webapp_utils import htmlFooter
from webapp_utils import getPostAttachmentsAsHtml
from webapp_media import addEmbeddedElements
from utils import getMediaFormats
from utils import getNicknameFromActor
from utils import getDomainFromActor
from utils import locatePost
from utils import loadJson
from utils import firstParagraphFromString
from posts import createBlogsTimeline
from newswire import rss2Header
from newswire import rss2Footer
def noOfBlogReplies(baseDir: str, httpPrefix: str, translate: {},
nickname: str, domain: str, domainFull: str,
postId: str, depth=0) -> int:
"""Returns the number of replies on the post
This is recursive, so can handle replies to replies
"""
if depth > 4:
return 0
if not postId:
return 0
tryPostBox = ('tlblogs', 'inbox', 'outbox')
boxFound = False
for postBox in tryPostBox:
postFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/' + postBox + '/' + \
postId.replace('/', '#') + '.replies'
if os.path.isfile(postFilename):
boxFound = True
break
if not boxFound:
# post may exist but has no replies
for postBox in tryPostBox:
postFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/' + postBox + '/' + \
postId.replace('/', '#')
if os.path.isfile(postFilename):
return 1
return 0
removals = []
replies = 0
lines = []
with open(postFilename, "r") as f:
lines = f.readlines()
for replyPostId in lines:
replyPostId = replyPostId.replace('\n', '').replace('\r', '')
replyPostId = replyPostId.replace('.json', '')
if locatePost(baseDir, nickname, domain, replyPostId):
replyPostId = replyPostId.replace('.replies', '')
replies += 1 + noOfBlogReplies(baseDir, httpPrefix, translate,
nickname, domain, domainFull,
replyPostId, depth+1)
else:
# remove post which no longer exists
removals.append(replyPostId)
# remove posts from .replies file if they don't exist
if lines and removals:
print('Rewriting ' + postFilename + ' to remove ' +
str(len(removals)) + ' entries')
with open(postFilename, 'w+') as f:
for replyPostId in lines:
replyPostId = replyPostId.replace('\n', '').replace('\r', '')
if replyPostId not in removals:
f.write(replyPostId + '\n')
return replies
def getBlogReplies(baseDir: str, httpPrefix: str, translate: {},
nickname: str, domain: str, domainFull: str,
postId: str, depth=0) -> str:
"""Returns a string containing html blog posts
"""
if depth > 4:
return ''
if not postId:
return ''
tryPostBox = ('tlblogs', 'inbox', 'outbox')
boxFound = False
for postBox in tryPostBox:
postFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/' + postBox + '/' + \
postId.replace('/', '#') + '.replies'
if os.path.isfile(postFilename):
boxFound = True
break
if not boxFound:
# post may exist but has no replies
for postBox in tryPostBox:
postFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/' + postBox + '/' + \
postId.replace('/', '#') + '.json'
if os.path.isfile(postFilename):
postFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + \
'/postcache/' + \
postId.replace('/', '#') + '.html'
if os.path.isfile(postFilename):
with open(postFilename, "r") as postFile:
return postFile.read() + '\n'
return ''
with open(postFilename, "r") as f:
lines = f.readlines()
repliesStr = ''
for replyPostId in lines:
replyPostId = replyPostId.replace('\n', '').replace('\r', '')
replyPostId = replyPostId.replace('.json', '')
replyPostId = replyPostId.replace('.replies', '')
postFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + \
'/postcache/' + \
replyPostId.replace('/', '#') + '.html'
if not os.path.isfile(postFilename):
continue
with open(postFilename, "r") as postFile:
repliesStr += postFile.read() + '\n'
rply = getBlogReplies(baseDir, httpPrefix, translate,
nickname, domain, domainFull,
replyPostId, depth+1)
if rply not in repliesStr:
repliesStr += rply
# indicate the reply indentation level
indentStr = '>'
for indentLevel in range(depth):
indentStr += ' >'
repliesStr = repliesStr.replace(translate['SHOW MORE'], indentStr)
return repliesStr.replace('?tl=outbox', '?tl=tlblogs')
return ''
def htmlBlogPostContent(authorized: bool,
baseDir: str, httpPrefix: str, translate: {},
nickname: str, domain: str, domainFull: str,
postJsonObject: {},
handle: str, restrictToDomain: bool,
blogSeparator='
') -> str:
"""Returns the content for a single blog post
"""
linkedAuthor = False
actor = ''
blogStr = ''
messageLink = ''
if postJsonObject['object'].get('id'):
messageLink = postJsonObject['object']['id'].replace('/statuses/', '/')
titleStr = ''
articleAdded = False
if postJsonObject['object'].get('summary'):
titleStr = postJsonObject['object']['summary']
blogStr += '
\n'
articleAdded = True
# get the handle of the author
if postJsonObject['object'].get('attributedTo'):
authorNickname = None
if isinstance(postJsonObject['object']['attributedTo'], str):
actor = postJsonObject['object']['attributedTo']
authorNickname = getNicknameFromActor(actor)
if authorNickname:
authorDomain, authorPort = getDomainFromActor(actor)
if authorDomain:
# author must be from the given domain
if restrictToDomain and authorDomain != domain:
return ''
handle = authorNickname + '@' + authorDomain
else:
# posts from the domain are expected to have an attributedTo field
if restrictToDomain:
return ''
if postJsonObject['object'].get('published'):
if 'T' in postJsonObject['object']['published']:
blogStr += '
'
if postJsonObject['object'].get('content'):
contentStr = addEmbeddedElements(translate,
postJsonObject['object']['content'])
if postJsonObject['object'].get('tag'):
contentStr = replaceEmojiFromTags(contentStr,
postJsonObject['object']['tag'],
'content')
if articleAdded:
blogStr += ' ' + contentStr + '\n'
else:
blogStr += ' ' + contentStr + '\n'
citationsStr = ''
if postJsonObject['object'].get('tag'):
for tagJson in postJsonObject['object']['tag']:
if not isinstance(tagJson, dict):
continue
if not tagJson.get('type'):
continue
if tagJson['type'] != 'Article':
continue
if not tagJson.get('name'):
continue
if not tagJson.get('url'):
continue
citationsStr += \
'
\n'
replies = noOfBlogReplies(baseDir, httpPrefix, translate,
nickname, domain, domainFull,
postJsonObject['object']['id'])
# separator between blogs should be centered
if '
' not in blogSeparator:
blogSeparator = '
' + blogSeparator + '
'
if replies == 0:
blogStr += blogSeparator + '\n'
return blogStr
if not authorized:
blogStr += '
'
return blogStr + htmlFooter()
def htmlBlogPageRSS2(authorized: bool, session,
baseDir: str, httpPrefix: str, translate: {},
nickname: str, domain: str, port: int,
noOfItems: int, pageNumber: int,
includeHeader: bool) -> str:
"""Returns an RSS version 2 feed containing posts
"""
if ' ' in nickname or '@' in nickname or \
'\n' in nickname or '\r' in nickname:
return None
domainFull = domain
if port:
if port != 80 and port != 443:
domainFull = domain + ':' + str(port)
blogRSS2 = ''
if includeHeader:
blogRSS2 = rss2Header(httpPrefix, nickname, domainFull,
'Blog', translate)
blogsIndex = baseDir + '/accounts/' + \
nickname + '@' + domain + '/tlblogs.index'
if not os.path.isfile(blogsIndex):
if includeHeader:
return blogRSS2 + rss2Footer()
else:
return blogRSS2
timelineJson = createBlogsTimeline(session, baseDir,
nickname, domain, port,
httpPrefix,
noOfItems, False,
pageNumber)
if not timelineJson:
if includeHeader:
return blogRSS2 + rss2Footer()
else:
return blogRSS2
if pageNumber is not None:
for item in timelineJson['orderedItems']:
if item['type'] != 'Create':
continue
blogRSS2 += \
htmlBlogPostRSS2(authorized, baseDir,
httpPrefix, translate,
nickname, domain,
domainFull, item,
None, True)
if includeHeader:
return blogRSS2 + rss2Footer()
else:
return blogRSS2
def htmlBlogPageRSS3(authorized: bool, session,
baseDir: str, httpPrefix: str, translate: {},
nickname: str, domain: str, port: int,
noOfItems: int, pageNumber: int) -> str:
"""Returns an RSS version 3 feed containing posts
"""
if ' ' in nickname or '@' in nickname or \
'\n' in nickname or '\r' in nickname:
return None
domainFull = domain
if port:
if port != 80 and port != 443:
domainFull = domain + ':' + str(port)
blogRSS3 = ''
blogsIndex = baseDir + '/accounts/' + \
nickname + '@' + domain + '/tlblogs.index'
if not os.path.isfile(blogsIndex):
return blogRSS3
timelineJson = createBlogsTimeline(session, baseDir,
nickname, domain, port,
httpPrefix,
noOfItems, False,
pageNumber)
if not timelineJson:
return blogRSS3
if pageNumber is not None:
for item in timelineJson['orderedItems']:
if item['type'] != 'Create':
continue
blogRSS3 += \
htmlBlogPostRSS3(authorized, baseDir,
httpPrefix, translate,
nickname, domain,
domainFull, item,
None, True)
return blogRSS3
def getBlogIndexesForAccounts(baseDir: str) -> {}:
""" Get the index files for blogs for each account
and add them to a dict
"""
blogIndexes = {}
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for acct in dirs:
if '@' not in acct:
continue
if 'inbox@' in acct:
continue
accountDir = os.path.join(baseDir + '/accounts', acct)
blogsIndex = accountDir + '/tlblogs.index'
if os.path.isfile(blogsIndex):
blogIndexes[acct] = blogsIndex
return blogIndexes
def noOfBlogAccounts(baseDir: str) -> int:
"""Returns the number of blog accounts
"""
ctr = 0
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for acct in dirs:
if '@' not in acct:
continue
if 'inbox@' in acct:
continue
accountDir = os.path.join(baseDir + '/accounts', acct)
blogsIndex = accountDir + '/tlblogs.index'
if os.path.isfile(blogsIndex):
ctr += 1
return ctr
def singleBlogAccountNickname(baseDir: str) -> str:
"""Returns the nickname of a single blog account
"""
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for acct in dirs:
if '@' not in acct:
continue
if 'inbox@' in acct:
continue
accountDir = os.path.join(baseDir + '/accounts', acct)
blogsIndex = accountDir + '/tlblogs.index'
if os.path.isfile(blogsIndex):
return acct.split('@')[0]
return None
def htmlBlogView(authorized: bool,
session, baseDir: str, httpPrefix: str,
translate: {}, domain: str, port: int,
noOfItems: int) -> str:
"""Show the blog main page
"""
blogStr = ''
cssFilename = baseDir + '/epicyon-profile.css'
if os.path.isfile(baseDir + '/epicyon.css'):
cssFilename = baseDir + '/epicyon.css'
blogStr = htmlHeaderWithExternalStyle(cssFilename)
if noOfBlogAccounts(baseDir) <= 1:
nickname = singleBlogAccountNickname(baseDir)
if nickname:
return htmlBlogPage(authorized, session,
baseDir, httpPrefix, translate,
nickname, domain, port,
noOfItems, 1)
domainFull = domain
if port:
if port != 80 and port != 443:
domainFull = domain + ':' + str(port)
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for acct in dirs:
if '@' not in acct:
continue
if 'inbox@' in acct:
continue
accountDir = os.path.join(baseDir + '/accounts', acct)
blogsIndex = accountDir + '/tlblogs.index'
if os.path.isfile(blogsIndex):
blogStr += '
'
return blogStr + htmlFooter()
def htmlEditBlog(mediaInstance: bool, translate: {},
baseDir: str, httpPrefix: str,
path: str,
pageNumber: int,
nickname: str, domain: str,
postUrl: str) -> str:
"""Edit a blog post after it was created
"""
postFilename = locatePost(baseDir, nickname, domain, postUrl)
if not postFilename:
print('Edit blog: Filename not found for ' + postUrl)
return None
postJsonObject = loadJson(postFilename)
if not postJsonObject:
print('Edit blog: json not loaded for ' + postFilename)
return None
iconsPath = getIconsWebPath(baseDir)
editBlogText = '
' + translate['Write your post text below.'] + '
'
if os.path.isfile(baseDir + '/accounts/newpost.txt'):
with open(baseDir + '/accounts/newpost.txt', 'r') as file:
editBlogText = '
' + file.read() + '
'
cssFilename = baseDir + '/epicyon-profile.css'
if os.path.isfile(baseDir + '/epicyon.css'):
cssFilename = baseDir + '/epicyon.css'
if '?' in path:
path = path.split('?')[0]
pathBase = path
editBlogImageSection = '