# Module metadata: source file name, author and maintainer contact details,
# license, version, release status and the module group for this file.
__filename__ = "blog.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.3.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "ActivityPub"
import os
from datetime import datetime
from content import replace_emoji_from_tags
from webapp_utils import html_header_with_external_style
from webapp_utils import html_header_with_blog_markup
from webapp_utils import html_footer
from webapp_utils import get_post_attachments_as_html
from webapp_utils import edit_text_area
from webapp_media import add_embedded_elements
from utils import remove_eol
from utils import text_in_file
from utils import local_actor_url
from utils import get_actor_languages_list
from utils import get_base_content_from_post
from utils import get_content_from_post
from utils import is_account_dir
from utils import remove_html
from utils import get_config_param
from utils import get_full_domain
from utils import get_media_formats
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import locate_post
from utils import load_json
from utils import first_paragraph_from_string
from utils import get_actor_property_url
from utils import acct_dir
from utils import escape_text
from posts import create_blogs_timeline
from newswire import rss2header
from newswire import rss2footer
from cache import get_person_from_cache
def _no_of_blog_replies(base_dir: str, http_prefix: str, translate: {},
nickname: str, domain: str, domain_full: str,
post_id: str, depth: int = 0) -> int:
"""Returns the number of replies on the post
This is recursive, so can handle replies to replies
"""
if depth > 4:
return 0
if not post_id:
return 0
try_post_box = ('tlblogs', 'inbox', 'outbox')
box_found = False
for post_box in try_post_box:
post_filename = \
acct_dir(base_dir, nickname, domain) + '/' + post_box + '/' + \
post_id.replace('/', '#') + '.replies'
if os.path.isfile(post_filename):
box_found = True
break
if not box_found:
# post may exist but has no replies
for post_box in try_post_box:
post_filename = \
acct_dir(base_dir, nickname, domain) + '/' + post_box + '/' + \
post_id.replace('/', '#')
if os.path.isfile(post_filename):
return 1
return 0
removals = []
replies = 0
lines = []
try:
with open(post_filename, 'r', encoding='utf-8') as post_file:
lines = post_file.readlines()
except OSError:
print('EX: failed to read blog ' + post_filename)
for reply_post_id in lines:
reply_post_id = remove_eol(reply_post_id)
reply_post_id = reply_post_id.replace('.json', '')
if locate_post(base_dir, nickname, domain, reply_post_id):
reply_post_id = reply_post_id.replace('.replies', '')
replies += \
1 + _no_of_blog_replies(base_dir, http_prefix, translate,
nickname, domain, domain_full,
reply_post_id, depth+1)
else:
# remove post which no longer exists
removals.append(reply_post_id)
# remove posts from .replies file if they don't exist
if lines and removals:
print('Rewriting ' + post_filename + ' to remove ' +
str(len(removals)) + ' entries')
try:
with open(post_filename, 'w+', encoding='utf-8') as post_file:
for reply_post_id in lines:
reply_post_id = remove_eol(reply_post_id)
if reply_post_id not in removals:
post_file.write(reply_post_id + '\n')
except OSError as ex:
print('EX: unable to remove replies from post ' +
post_filename + ' ' + str(ex))
return replies
def _get_blog_replies(base_dir: str, http_prefix: str, translate: {},
nickname: str, domain: str, domain_full: str,
post_id: str, depth: int = 0) -> str:
"""Returns a string containing html blog posts
"""
if depth > 4:
return ''
if not post_id:
return ''
try_post_box = ('tlblogs', 'inbox', 'outbox')
box_found = False
for post_box in try_post_box:
post_filename = \
acct_dir(base_dir, nickname, domain) + '/' + post_box + '/' + \
post_id.replace('/', '#') + '.replies'
if os.path.isfile(post_filename):
box_found = True
break
if not box_found:
# post may exist but has no replies
for post_box in try_post_box:
post_filename = \
acct_dir(base_dir, nickname, domain) + '/' + post_box + '/' + \
post_id.replace('/', '#') + '.json'
if os.path.isfile(post_filename):
post_filename = acct_dir(base_dir, nickname, domain) + \
'/postcache/' + \
post_id.replace('/', '#') + '.html'
if os.path.isfile(post_filename):
try:
with open(post_filename, 'r',
encoding='utf-8') as post_file:
return post_file.read() + '\n'
except OSError:
print('EX: unable to read blog 3 ' + post_filename)
return ''
lines = []
try:
with open(post_filename, 'r', encoding='utf-8') as post_file:
lines = post_file.readlines()
except OSError:
print('EX: unable to read blog 4 ' + post_filename)
if lines:
replies_str = ''
for reply_post_id in lines:
reply_post_id = remove_eol(reply_post_id)
reply_post_id = reply_post_id.replace('.json', '')
reply_post_id = reply_post_id.replace('.replies', '')
post_filename = acct_dir(base_dir, nickname, domain) + \
'/postcache/' + \
reply_post_id.replace('/', '#') + '.html'
if not os.path.isfile(post_filename):
continue
try:
with open(post_filename, 'r', encoding='utf-8') as post_file:
replies_str += post_file.read() + '\n'
except OSError:
print('EX: unable to read blog replies ' + post_filename)
rply = _get_blog_replies(base_dir, http_prefix, translate,
nickname, domain, domain_full,
reply_post_id, depth+1)
if rply not in replies_str:
replies_str += rply
# indicate the reply indentation level
indent_str = '>'
indent_level = 0
while indent_level < depth:
indent_str += ' >'
indent_level += 1
replies_str = replies_str.replace(translate['SHOW MORE'], indent_str)
return replies_str.replace('?tl=outbox', '?tl=tlblogs')
return ''
# NOTE(review): several string literals in this function are split across
# lines with their contents missing (the blog_separator default and most of
# the values appended to blog_str), and the trailing 'break' and
# html_footer() return have no visible enclosing loop. The lost text cannot
# be reconstructed from this file alone - restore it before relying on this
# code.
def _html_blog_post_content(debug: bool, session, authorized: bool,
base_dir: str, http_prefix: str, translate: {},
nickname: str, domain: str, domain_full: str,
post_json_object: {},
handle: str, restrict_to_domain: bool,
peertube_instances: [],
system_language: str,
person_cache: {},
blog_separator: str = '
') -> str:
"""Returns the content for a single blog post
"""
linked_author = False
actor = ''
blog_str = ''
message_link = ''
# link to the post: the object id with '/statuses/' replaced by '/'
if post_json_object['object'].get('id'):
message_link = \
post_json_object['object']['id'].replace('/statuses/', '/')
title_str = ''
article_added = False
# the post summary, when present, is used as the title
if post_json_object['object'].get('summary'):
title_str = post_json_object['object']['summary']
blog_str += '
\n'
article_added = True
# get the handle of the author
if post_json_object['object'].get('attributedTo'):
author_nickname = None
if isinstance(post_json_object['object']['attributedTo'], str):
actor = post_json_object['object']['attributedTo']
author_nickname = get_nickname_from_actor(actor)
if author_nickname:
author_domain, _ = get_domain_from_actor(actor)
if author_domain:
# author must be from the given domain
if restrict_to_domain and author_domain != domain:
return ''
handle = author_nickname + '@' + author_domain
else:
# posts from the domain are expected to have an attributedTo field
if restrict_to_domain:
return ''
# only published values containing a 'T' are handled here
if post_json_object['object'].get('published'):
if 'T' in post_json_object['object']['published']:
blog_str += '
\n'
# number of replies to this post, including replies to replies
replies = _no_of_blog_replies(base_dir, http_prefix, translate,
nickname, domain, domain_full,
post_json_object['object']['id'])
# separator between blogs should be centered
if '
' not in blog_separator:
blog_separator = '
' + blog_separator + '
'
# without replies only the separator is appended before returning
if replies == 0:
blog_str += blog_separator + '\n'
return blog_str
# NOTE(review): presumably extra markup for viewers who are not
# authorized - the appended text is missing, confirm against upstream
if not authorized:
blog_str += '
'
# NOTE(review): this 'break' is not inside any visible loop, and the footer
# return below may belong to a different function - verify
break
return blog_str + html_footer()
def html_edit_blog(media_instance: bool, translate: {},
base_dir: str, path: str, page_number: int,
nickname: str, domain: str,
post_url: str, system_language: str) -> str:
"""Edit a blog post after it was created
"""
post_filename = locate_post(base_dir, nickname, domain, post_url)
if not post_filename:
print('Edit blog: filename not found for ' + post_url)
return None
post_json_object = load_json(post_filename)
if not post_json_object:
print('Edit blog: json not loaded for ' + post_filename)
return None
edit_blog_text = \
'
' + translate['Write your post text below.'] + '
'
if os.path.isfile(base_dir + '/accounts/newpost.txt'):
try:
with open(base_dir + '/accounts/newpost.txt', 'r',
encoding='utf-8') as file:
edit_blog_text = '
' + file.read() + '
'
except OSError:
print('EX: unable to read ' + base_dir + '/accounts/newpost.txt')
css_filename = base_dir + '/epicyon-profile.css'
if os.path.isfile(base_dir + '/epicyon.css'):
css_filename = base_dir + '/epicyon.css'
if '?' in path:
path = path.split('?')[0]
path_base = path
edit_blog_image_section = '