__filename__ = "blog.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "ActivityPub"
import os
from datetime import datetime
from content import replaceEmojiFromTags
from webapp_utils import htmlHeaderWithExternalStyle
from webapp_utils import htmlHeaderWithBlogMarkup
from webapp_utils import htmlFooter
from webapp_utils import getPostAttachmentsAsHtml
from webapp_utils import editTextArea
from webapp_media import addEmbeddedElements
from utils import local_actor_url
from utils import get_actor_languages_list
from utils import get_base_content_from_post
from utils import get_content_from_post
from utils import is_account_dir
from utils import removeHtml
from utils import get_config_param
from utils import get_full_domain
from utils import getMediaFormats
from utils import getNicknameFromActor
from utils import getDomainFromActor
from utils import locatePost
from utils import load_json
from utils import firstParagraphFromString
from utils import get_actor_property_url
from utils import acct_dir
from posts import createBlogsTimeline
from newswire import rss2Header
from newswire import rss2Footer
from cache import getPersonFromCache
def _noOfBlogReplies(base_dir: str, http_prefix: str, translate: {},
nickname: str, domain: str, domain_full: str,
postId: str, depth=0) -> int:
"""Returns the number of replies on the post
This is recursive, so can handle replies to replies
"""
if depth > 4:
return 0
if not postId:
return 0
tryPostBox = ('tlblogs', 'inbox', 'outbox')
boxFound = False
for postBox in tryPostBox:
postFilename = \
acct_dir(base_dir, nickname, domain) + '/' + postBox + '/' + \
postId.replace('/', '#') + '.replies'
if os.path.isfile(postFilename):
boxFound = True
break
if not boxFound:
# post may exist but has no replies
for postBox in tryPostBox:
postFilename = \
acct_dir(base_dir, nickname, domain) + '/' + postBox + '/' + \
postId.replace('/', '#')
if os.path.isfile(postFilename):
return 1
return 0
removals = []
replies = 0
lines = []
try:
with open(postFilename, 'r') as f:
lines = f.readlines()
except OSError:
print('EX: failed to read blog ' + postFilename)
for replyPostId in lines:
replyPostId = replyPostId.replace('\n', '').replace('\r', '')
replyPostId = replyPostId.replace('.json', '')
if locatePost(base_dir, nickname, domain, replyPostId):
replyPostId = replyPostId.replace('.replies', '')
replies += \
1 + _noOfBlogReplies(base_dir, http_prefix, translate,
nickname, domain, domain_full,
replyPostId, depth+1)
else:
# remove post which no longer exists
removals.append(replyPostId)
# remove posts from .replies file if they don't exist
if lines and removals:
print('Rewriting ' + postFilename + ' to remove ' +
str(len(removals)) + ' entries')
try:
with open(postFilename, 'w+') as f:
for replyPostId in lines:
replyPostId = \
replyPostId.replace('\n', '').replace('\r', '')
if replyPostId not in removals:
f.write(replyPostId + '\n')
except OSError as ex:
print('EX: unable to remove replies from post ' +
postFilename + ' ' + str(ex))
return replies
def _getBlogReplies(base_dir: str, http_prefix: str, translate: {},
nickname: str, domain: str, domain_full: str,
postId: str, depth=0) -> str:
"""Returns a string containing html blog posts
"""
if depth > 4:
return ''
if not postId:
return ''
tryPostBox = ('tlblogs', 'inbox', 'outbox')
boxFound = False
for postBox in tryPostBox:
postFilename = \
acct_dir(base_dir, nickname, domain) + '/' + postBox + '/' + \
postId.replace('/', '#') + '.replies'
if os.path.isfile(postFilename):
boxFound = True
break
if not boxFound:
# post may exist but has no replies
for postBox in tryPostBox:
postFilename = \
acct_dir(base_dir, nickname, domain) + '/' + postBox + '/' + \
postId.replace('/', '#') + '.json'
if os.path.isfile(postFilename):
postFilename = acct_dir(base_dir, nickname, domain) + \
'/postcache/' + \
postId.replace('/', '#') + '.html'
if os.path.isfile(postFilename):
try:
with open(postFilename, 'r') as postFile:
return postFile.read() + '\n'
except OSError:
print('EX: unable to read blog 3 ' + postFilename)
return ''
lines = []
try:
with open(postFilename, 'r') as f:
lines = f.readlines()
except OSError:
print('EX: unable to read blog 4 ' + postFilename)
if lines:
repliesStr = ''
for replyPostId in lines:
replyPostId = replyPostId.replace('\n', '').replace('\r', '')
replyPostId = replyPostId.replace('.json', '')
replyPostId = replyPostId.replace('.replies', '')
postFilename = acct_dir(base_dir, nickname, domain) + \
'/postcache/' + \
replyPostId.replace('/', '#') + '.html'
if not os.path.isfile(postFilename):
continue
try:
with open(postFilename, 'r') as postFile:
repliesStr += postFile.read() + '\n'
except OSError:
print('EX: unable to read blog replies ' + postFilename)
rply = _getBlogReplies(base_dir, http_prefix, translate,
nickname, domain, domain_full,
replyPostId, depth+1)
if rply not in repliesStr:
repliesStr += rply
# indicate the reply indentation level
indentStr = '>'
for indentLevel in range(depth):
indentStr += ' >'
repliesStr = repliesStr.replace(translate['SHOW MORE'], indentStr)
return repliesStr.replace('?tl=outbox', '?tl=tlblogs')
return ''
def _htmlBlogPostContent(debug: bool, session, authorized: bool,
base_dir: str, http_prefix: str, translate: {},
nickname: str, domain: str, domain_full: str,
post_json_object: {},
handle: str, restrictToDomain: bool,
peertube_instances: [],
system_language: str,
person_cache: {},
blogSeparator: str = '
') -> str:
"""Returns the content for a single blog post
"""
linkedAuthor = False
actor = ''
blogStr = ''
messageLink = ''
if post_json_object['object'].get('id'):
messageLink = \
post_json_object['object']['id'].replace('/statuses/', '/')
titleStr = ''
articleAdded = False
if post_json_object['object'].get('summary'):
titleStr = post_json_object['object']['summary']
blogStr += '
\n'
articleAdded = True
# get the handle of the author
if post_json_object['object'].get('attributedTo'):
authorNickname = None
if isinstance(post_json_object['object']['attributedTo'], str):
actor = post_json_object['object']['attributedTo']
authorNickname = getNicknameFromActor(actor)
if authorNickname:
authorDomain, authorPort = getDomainFromActor(actor)
if authorDomain:
# author must be from the given domain
if restrictToDomain and authorDomain != domain:
return ''
handle = authorNickname + '@' + authorDomain
else:
# posts from the domain are expected to have an attributedTo field
if restrictToDomain:
return ''
if post_json_object['object'].get('published'):
if 'T' in post_json_object['object']['published']:
blogStr += '
\n'
replies = _noOfBlogReplies(base_dir, http_prefix, translate,
nickname, domain, domain_full,
post_json_object['object']['id'])
# separator between blogs should be centered
if '
' not in blogSeparator:
blogSeparator = '
' + blogSeparator + '
'
if replies == 0:
blogStr += blogSeparator + '\n'
return blogStr
if not authorized:
blogStr += '
'
break
return blogStr + htmlFooter()
def htmlEditBlog(media_instance: bool, translate: {},
base_dir: str, http_prefix: str,
path: str,
pageNumber: int,
nickname: str, domain: str,
postUrl: str, system_language: str) -> str:
"""Edit a blog post after it was created
"""
postFilename = locatePost(base_dir, nickname, domain, postUrl)
if not postFilename:
print('Edit blog: Filename not found for ' + postUrl)
return None
post_json_object = load_json(postFilename)
if not post_json_object:
print('Edit blog: json not loaded for ' + postFilename)
return None
editBlogText = '
' + translate['Write your post text below.'] + '
'
if os.path.isfile(base_dir + '/accounts/newpost.txt'):
try:
with open(base_dir + '/accounts/newpost.txt', 'r') as file:
editBlogText = '
' + file.read() + '
'
except OSError:
print('EX: unable to read ' + base_dir + '/accounts/newpost.txt')
cssFilename = base_dir + '/epicyon-profile.css'
if os.path.isfile(base_dir + '/epicyon.css'):
cssFilename = base_dir + '/epicyon.css'
if '?' in path:
path = path.split('?')[0]
pathBase = path
editBlogImageSection = '