__filename__ = "blog.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.5.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "ActivityPub"
import os
from content import replace_emoji_from_tags
from webapp_utils import html_header_with_external_style
from webapp_utils import html_header_with_blog_markup
from webapp_utils import html_footer
from webapp_utils import get_post_attachments_as_html
from webapp_utils import edit_text_area
from webapp_media import add_embedded_elements
from utils import remove_link_tracking
from utils import get_url_from_post
from utils import date_from_string_format
from utils import get_attributed_to
from utils import remove_eol
from utils import text_in_file
from utils import local_actor_url
from utils import get_actor_languages_list
from utils import get_base_content_from_post
from utils import get_content_from_post
from utils import is_account_dir
from utils import remove_html
from utils import get_config_param
from utils import get_full_domain
from utils import get_media_formats
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import locate_post
from utils import load_json
from utils import first_paragraph_from_string
from utils import get_actor_property_url
from utils import acct_dir
from utils import escape_text
from posts import create_blogs_timeline
from newswire import rss2header
from newswire import rss2footer
from cache import get_person_from_cache
def _no_of_blog_replies(base_dir: str, http_prefix: str, translate: {},
                        nickname: str, domain: str, domain_full: str,
                        post_id: str, depth: int) -> int:
    """Returns the number of replies on the post
    This is recursive, so can handle replies to replies

    Arguments:
    base_dir, nickname, domain -- passed to acct_dir and locate_post to
        find the files belonging to the account
    http_prefix, translate, domain_full -- not used here other than
        being handed on to the recursive call
    post_id -- id of the post; '/' is replaced by '#' to form filenames
    depth -- current recursion depth; 0 is returned beyond depth 4

    Side effect: entries of the .replies file whose post can no longer
    be found by locate_post are removed from that file.
    """
    if depth > 4 or not post_id:
        return 0

    account_dir = acct_dir(base_dir, nickname, domain)
    post_file_base = post_id.replace('/', '#')
    try_post_box = ('tlblogs', 'inbox', 'outbox')

    # the first box containing a .replies file for this post wins
    replies_filename = None
    for post_box in try_post_box:
        candidate = \
            account_dir + '/' + post_box + '/' + post_file_base + '.replies'
        if os.path.isfile(candidate):
            replies_filename = candidate
            break

    if replies_filename is None:
        # post may exist but has no replies
        for post_box in try_post_box:
            post_filename = \
                account_dir + '/' + post_box + '/' + post_file_base
            if os.path.isfile(post_filename):
                return 1
        return 0

    lines = []
    try:
        with open(replies_filename, 'r', encoding='utf-8') as post_file:
            lines = post_file.readlines()
    except OSError:
        print('EX: failed to read blog ' + replies_filename)

    replies = 0
    # raw entries (without line ending) whose post still exists
    kept_lines = []
    removed = 0
    for line in lines:
        line = remove_eol(line)
        reply_post_id = line.replace('.json', '')
        if locate_post(base_dir, nickname, domain, reply_post_id):
            kept_lines.append(line)
            reply_post_id = reply_post_id.replace('.replies', '')
            replies += \
                1 + _no_of_blog_replies(base_dir, http_prefix, translate,
                                        nickname, domain, domain_full,
                                        reply_post_id, depth+1)
        else:
            # post no longer exists
            removed += 1

    # remove posts from .replies file if they don't exist.
    # The raw entry is what gets kept or dropped, so an entry written
    # with a .json suffix is pruned just like one without it.
    if removed:
        print('Rewriting ' + replies_filename + ' to remove ' +
              str(removed) + ' entries')
        try:
            with open(replies_filename, 'w+', encoding='utf-8') as post_file:
                for line in kept_lines:
                    post_file.write(line + '\n')
        except OSError as ex:
            print('EX: unable to remove replies from post ' +
                  replies_filename + ' ' + str(ex))

    return replies
def _get_blog_replies(base_dir: str, http_prefix: str, translate: {},
                      nickname: str, domain: str, domain_full: str,
                      post_id: str, depth: int) -> str:
    """Returns a string containing html blog posts

    The html is read from the account's postcache directory for every
    id listed in the post's .replies file, recursing into replies of
    replies. An empty string is returned beyond depth 4, for an empty
    post_id, or when nothing could be read.
    """
    if depth > 4 or not post_id:
        return ''

    boxes = ('tlblogs', 'inbox', 'outbox')
    flat_id = post_id.replace('/', '#')

    # look for a .replies file in each box in turn
    replies_filename = None
    for box_name in boxes:
        candidate = \
            acct_dir(base_dir, nickname, domain) + '/' + box_name + '/' + \
            flat_id + '.replies'
        if os.path.isfile(candidate):
            replies_filename = candidate
            break

    if replies_filename is None:
        # post may exist but has no replies
        for box_name in boxes:
            post_json_filename = \
                acct_dir(base_dir, nickname, domain) + '/' + box_name + \
                '/' + flat_id + '.json'
            if not os.path.isfile(post_json_filename):
                continue
            cached_filename = acct_dir(base_dir, nickname, domain) + \
                '/postcache/' + flat_id + '.html'
            if not os.path.isfile(cached_filename):
                continue
            try:
                with open(cached_filename, 'r',
                          encoding='utf-8') as cached_file:
                    return cached_file.read() + '\n'
            except OSError:
                print('EX: unable to read blog 3 ' + cached_filename)
        return ''

    reply_lines = []
    try:
        with open(replies_filename, 'r', encoding='utf-8') as replies_file:
            reply_lines = replies_file.readlines()
    except OSError:
        print('EX: unable to read blog 4 ' + replies_filename)

    if not reply_lines:
        return ''

    html_str = ''
    for raw_line in reply_lines:
        reply_id = remove_eol(raw_line)
        reply_id = reply_id.replace('.json', '').replace('.replies', '')
        cached_filename = acct_dir(base_dir, nickname, domain) + \
            '/postcache/' + reply_id.replace('/', '#') + '.html'
        if not os.path.isfile(cached_filename):
            continue
        try:
            with open(cached_filename, 'r', encoding='utf-8') as cached_file:
                html_str += cached_file.read() + '\n'
        except OSError:
            print('EX: unable to read blog replies ' + cached_filename)
        nested_html = _get_blog_replies(base_dir, http_prefix, translate,
                                        nickname, domain, domain_full,
                                        reply_id, depth+1)
        # avoid appending the same nested html twice
        if nested_html not in html_str:
            html_str += nested_html

    # indicate the reply indentation level: one marker plus one more
    # for every level of depth
    indent_marker = '>' + ' >' * depth
    html_str = html_str.replace(translate['SHOW MORE'], indent_marker)
    return html_str.replace('?tl=outbox', '?tl=tlblogs')
def _html_blog_post_content(debug: bool, session, authorized: bool,
                            base_dir: str, http_prefix: str, translate: {},
                            nickname: str, domain: str, domain_full: str,
                            post_json_object: {},
                            handle: str, restrict_to_domain: bool,
                            peertube_instances: [],
                            system_language: str,
                            person_cache: {},
                            blog_separator: str) -> str:
    """Returns the content for a single blog post

    The returned string is assembled from the fields of
    post_json_object['object']: the published date, the author handle,
    attachments, the content, citations taken from tags of type
    'Article', and finally either a reply count (when not authorized)
    or the replies themselves (when authorized).

    An empty string is returned when restrict_to_domain is set and the
    post either has no attributedTo field or its author domain differs
    from the given domain.

    NOTE(review): several string literals in this function are empty or
    are split across a physical line break, which a single-quoted
    literal cannot contain. Markup appears to be missing from them;
    confirm against the upstream file before relying on the output.
    """
    linked_author = False
    actor = ''
    blog_str = ''
    # message_link is not referenced again in the code as shown
    message_link = ''
    if post_json_object['object'].get('id'):
        message_link = \
            post_json_object['object']['id'].replace('/statuses/', '/')
    title_str = ''
    article_added = False
    # the summary field of the post is used as its title
    if post_json_object['object'].get('summary'):
        title_str = post_json_object['object']['summary']
        blog_str += '\n'
        article_added = True

    # get the handle of the author
    if post_json_object['object'].get('attributedTo'):
        author_nickname = None
        actor_str = \
            get_attributed_to(post_json_object['object']['attributedTo'])
        if actor_str:
            actor = actor_str
            author_nickname = get_nickname_from_actor(actor)
        if author_nickname:
            author_domain, _ = get_domain_from_actor(actor)
            if author_domain:
                # author must be from the given domain
                if restrict_to_domain and author_domain != domain:
                    return ''
                # replaces whatever handle the caller passed in
                handle = author_nickname + '@' + author_domain
    else:
        # posts from the domain are expected to have an attributedTo field
        if restrict_to_domain:
            return ''

    if post_json_object['object'].get('published'):
        if 'T' in post_json_object['object']['published']:
            # only the part before the 'T' separator is shown
            blog_str += '' + \
                post_json_object['object']['published'].split('T')[0]
            if handle:
                if handle.startswith(nickname + '@' + domain):
                    blog_str += ' ' + handle + ' '
                    linked_author = True
                else:
                    if actor:
                        blog_str += ' ' + \
                            handle + ' '
                        linked_author = True
                    else:
                        blog_str += ' ' + handle
            blog_str += ' \n'

    # these are all passed unchanged to get_post_attachments_as_html
    avatar_link = ''
    reply_str = ''
    announce_str = ''
    like_str = ''
    bookmark_str = ''
    delete_str = ''
    mute_str = ''
    is_muted = False

    # languages understood by the account, taken from the cached actor
    person_url = local_actor_url(http_prefix, nickname, domain_full)
    actor_json = \
        get_person_from_cache(base_dir, person_url, person_cache)
    languages_understood = []
    if actor_json:
        languages_understood = get_actor_languages_list(actor_json)
    json_content = get_content_from_post(post_json_object, system_language,
                                         languages_understood, "content")
    minimize_all_images = False
    attachment_str, _ = \
        get_post_attachments_as_html(base_dir, nickname, domain,
                                     domain_full, post_json_object,
                                     'tlblogs', translate,
                                     is_muted, avatar_link,
                                     reply_str, announce_str,
                                     like_str, bookmark_str,
                                     delete_str, mute_str,
                                     json_content,
                                     minimize_all_images,
                                     system_language)
    if attachment_str:
        blog_str += '' + attachment_str + ' '
    if json_content:
        content_str = add_embedded_elements(translate, json_content,
                                            peertube_instances, domain_full)
        if post_json_object['object'].get('tag'):
            post_json_object_tags = post_json_object['object']['tag']
            content_str = replace_emoji_from_tags(session, base_dir,
                                                  content_str,
                                                  post_json_object_tags,
                                                  'content', debug, True)
        if article_added:
            blog_str += ' ' + content_str + ' \n'
        else:
            blog_str += '' + content_str + ' \n'

    # collect citations from tags of type 'Article' having name and url
    citations_str = ''
    if post_json_object['object'].get('tag'):
        for tag_json in post_json_object['object']['tag']:
            if not isinstance(tag_json, dict):
                continue
            if not tag_json.get('type'):
                continue
            if tag_json['type'] != 'Article':
                continue
            if not tag_json.get('name'):
                continue
            if not tag_json.get('url'):
                continue
            url_str = get_url_from_post(tag_json['url'])
            # citation_url is not referenced again in the code as shown
            citation_url = remove_html(url_str)
            citation_name = remove_html(tag_json['name'])
            citations_str += \
                '
' + \
                '' + citation_name + ' \n'
        if citations_str:
            citations_str = '' + translate['Citations'] + \
                ':
' + \
                '\n' + citations_str + ' \n'

    blog_str += ' \n' + citations_str

    if not linked_author:
        blog_str += '' + translate['About the author'] + \
            '
\n'

    replies = _no_of_blog_replies(base_dir, http_prefix, translate,
                                  nickname, domain, domain_full,
                                  post_json_object['object']['id'], 0)

    # separator between blogs should be centered
    if '' not in blog_separator:
        blog_separator = '' + blog_separator + ' '

    # without replies the post simply ends with the separator
    if replies == 0:
        blog_str += blog_separator + '\n'
        return blog_str

    if not authorized:
        # only the number of replies is shown
        blog_str += '' + \
            translate['Replies'].lower() + ': ' + str(replies) + '
'
        blog_str += ' ' + blog_separator + '\n'
    else:
        blog_str += blog_separator + '' + translate['Replies'] + ' \n'
        if not title_str:
            blog_str += \
                _get_blog_replies(base_dir, http_prefix, translate,
                                  nickname, domain, domain_full,
                                  post_json_object['object']['id'], 0)
        else:
            obj_id = post_json_object['object']['id']
            blog_replies_str = \
                _get_blog_replies(base_dir, http_prefix,
                                  translate, nickname,
                                  domain, domain_full, obj_id, 0)
            # remove occurrences of the title enclosed by '>' and '<'
            # from the replies html
            blog_str += blog_replies_str.replace('>' + title_str + '<', '')

    return blog_str
def _html_blog_post_rss2(domain: str, post_json_object: {},
                         restrict_to_domain: bool,
                         system_language: str) -> str:
    """Returns the RSS version 2 feed for a single blog post

    An empty string is returned when restrict_to_domain is set and
    '/' + domain does not occur in the post link, or when the post has
    no summary or no published field.

    NOTE(review): one string literal below is split across a physical
    line break and others contain only spaces; the feed markup appears
    to be missing. Confirm against the upstream file.
    """
    rss_str = ''
    message_link = ''
    if post_json_object['object'].get('id'):
        message_link = \
            post_json_object['object']['id'].replace('/statuses/', '/')
        if not restrict_to_domain or \
           (restrict_to_domain and '/' + domain in message_link):
            if post_json_object['object'].get('summary') and \
               post_json_object['object'].get('published'):
                published = post_json_object['object']['published']
                pub_date = date_from_string_format(published,
                                                   ["%Y-%m-%dT%H:%M:%S%z"])
                # title and description are passed through escape_text
                title_str = escape_text(post_json_object['object']['summary'])
                rss_date_str = pub_date.strftime("%a, %d %b %Y %H:%M:%S UT")
                content = \
                    get_base_content_from_post(post_json_object,
                                               system_language)
                description = first_paragraph_from_string(content)
                description = escape_text(description)
                rss_str = ' - '
                rss_str += '
' + title_str + ' '
                rss_str += ' ' + message_link + ''
                rss_str += \
                    ' ' + description + ' '
                rss_str += ' ' + rss_date_str + ' '
                rss_str += ' '
    return rss_str
def _html_blog_post_rss3(domain: str, post_json_object: {},
                         restrict_to_domain: bool,
                         system_language: str) -> str:
    """Returns the RSS version 3 feed for a single blog post

    The entry consists of title, link, description and created lines
    followed by a blank line. An empty string is returned when the post
    has no id, no summary or no published field, or when
    restrict_to_domain is set and '/' + domain is not in the post link.
    """
    post_obj = post_json_object['object']
    if not post_obj.get('id'):
        return ''
    message_link = post_obj['id'].replace('/statuses/', '/')

    # optionally only accept posts whose link contains the domain
    if restrict_to_domain and '/' + domain not in message_link:
        return ''

    if not (post_obj.get('summary') and post_obj.get('published')):
        return ''

    pub_date = date_from_string_format(post_obj['published'],
                                       ["%Y-%m-%dT%H:%M:%S%z"])
    title_str = post_obj['summary']
    rss_date_str = pub_date.strftime("%a, %d %b %Y %H:%M:%S UT")
    content = get_base_content_from_post(post_json_object, system_language)
    description = first_paragraph_from_string(content)

    return 'title: ' + title_str + '\n' + \
        'link: ' + message_link + '\n' + \
        'description: ' + description + '\n' + \
        'created: ' + rss_date_str + '\n\n'
def _html_blog_remove_cw_button(blog_str: str, translate: {}) -> str:
    """Removes the CW button from blog posts, where the
    summary field is instead used as the blog title

    Each pattern is replaced by an empty string, in the order listed.
    Replacing an empty pattern leaves the text unchanged.
    """
    patterns = ('', ' ', '', ' ', translate['SHOW MORE'])
    for pattern in patterns:
        blog_str = blog_str.replace(pattern, '')
    return blog_str
def _get_snippet_from_blog_content(post_json_object: {},
                                   system_language: str) -> str:
    """Returns a snippet of text from the blog post as a preview

    The content is passed through remove_html, cut at the first newline
    and, when 256 or more characters long, truncated to 252 characters
    followed by '...'.

    NOTE(review): the literals used for the membership tests and splits
    below are empty or split across a physical line break; the markers
    they searched for appear to be missing. Confirm against the
    upstream file.
    """
    content = get_base_content_from_post(post_json_object, system_language)
    if '' in content:
        content = content.split('
', 1)[1]
        if '
' in content:
            content = content.split('', 1)[0]
    content = remove_html(content)
    # only the first line is kept
    if '\n' in content:
        content = content.split('\n')[0]
    # limit the length of the preview
    if len(content) >= 256:
        content = content[:252] + '...'
    return content
def html_blog_post(session, authorized: bool,
                   base_dir: str, http_prefix: str, translate: {},
                   nickname: str, domain: str, domain_full: str,
                   post_json_object: {},
                   peertube_instances: [],
                   system_language: str, person_cache: {},
                   debug: bool, content_license_url: str) -> str:
    """Returns a html blog post

    The page consists of the blog markup header, the content of the
    single post and the footer. The post's 'object' must contain the
    'published' and 'summary' fields.
    """
    # a blog.css within the base directory takes precedence
    css_filename = base_dir + '/epicyon-blog.css'
    custom_css_filename = base_dir + '/blog.css'
    if os.path.isfile(custom_css_filename):
        css_filename = custom_css_filename
    instance_title = get_config_param(base_dir, 'instanceTitle')

    post_obj = post_json_object['object']
    published = post_obj['published']
    # the modified date falls back to the published date
    modified = post_obj.get('updated') or published
    title = post_obj['summary']

    url = ''
    if post_obj.get('url'):
        url = remove_html(get_url_from_post(post_obj['url']))

    snippet = _get_snippet_from_blog_content(post_json_object,
                                             system_language)
    page_str = html_header_with_blog_markup(css_filename, instance_title,
                                            http_prefix, domain_full,
                                            nickname, system_language,
                                            published, modified,
                                            title, snippet, url,
                                            content_license_url)
    # NOTE(review): the returned string is not assigned, so the header
    # is left as it was. Confirm whether the result was meant to be kept.
    _html_blog_remove_cw_button(page_str, translate)

    page_str += _html_blog_post_content(debug, session, authorized,
                                        base_dir, http_prefix, translate,
                                        nickname, domain, domain_full,
                                        post_json_object,
                                        None, False,
                                        peertube_instances,
                                        system_language,
                                        person_cache, ' ')

    # show rss links
    page_str += ''

    return page_str + html_footer()
def html_blog_page(authorized: bool, session,
                   base_dir: str, http_prefix: str, translate: {},
                   nickname: str, domain: str, port: int,
                   no_of_items: int, page_number: int,
                   peertube_instances: [], system_language: str,
                   person_cache: {}, debug: bool) -> str:
    """Returns a html blog page containing posts

    None is returned when the nickname contains a space, '@', newline
    or carriage return. When the account has no tlblogs.index file, or
    no timeline could be created, only the header and footer are
    returned.

    NOTE(review): the navigation literals below are empty or split
    across a physical line break; the button markup appears to be
    missing. The nesting under the page_number check is reconstructed.
    Confirm both against the upstream file.
    """
    if ' ' in nickname or '@' in nickname or \
       '\n' in nickname or '\r' in nickname:
        return None
    blog_str = ''

    # an epicyon.css within the base directory takes precedence
    css_filename = base_dir + '/epicyon-profile.css'
    if os.path.isfile(base_dir + '/epicyon.css'):
        css_filename = base_dir + '/epicyon.css'
    instance_title = \
        get_config_param(base_dir, 'instanceTitle')
    blog_str = \
        html_header_with_external_style(css_filename, instance_title, None)
    # the returned string is not assigned, so blog_str is left as it was
    _html_blog_remove_cw_button(blog_str, translate)

    blogs_index = acct_dir(base_dir, nickname, domain) + '/tlblogs.index'
    if not os.path.isfile(blogs_index):
        return blog_str + html_footer()

    timeline_json = \
        create_blogs_timeline(base_dir,
                              nickname, domain, port, http_prefix,
                              no_of_items, False, page_number)

    if not timeline_json:
        return blog_str + html_footer()

    domain_full = get_full_domain(domain, port)

    # show previous and next buttons
    if page_number is not None:
        navigate_str = ''
        if page_number > 1:
            # show previous button
            navigate_str += '' + \
                ' \n'
        # a full page of items suggests that there may be a further page
        if len(timeline_json['orderedItems']) >= no_of_items:
            # show next button
            navigate_str += '' + \
                ' \n'
        navigate_str += '
'
        blog_str += navigate_str

        # only 'Create' items are rendered
        for item in timeline_json['orderedItems']:
            if item['type'] != 'Create':
                continue

            blog_str += \
                _html_blog_post_content(debug, session, authorized,
                                        base_dir, http_prefix, translate,
                                        nickname, domain, domain_full, item,
                                        None, True, peertube_instances,
                                        system_language, person_cache,
                                        ' ')

        # repeat the navigation below the posts on a full page
        if len(timeline_json['orderedItems']) >= no_of_items:
            blog_str += navigate_str

        # show rss link
        blog_str += ''

    return blog_str + html_footer()
def html_blog_page_rss2(base_dir: str, http_prefix: str, translate: {},
                        nickname: str, domain: str, port: int,
                        no_of_items: int, page_number: int,
                        include_header: bool, system_language: str) -> str:
    """Returns an RSS version 2 feed containing posts

    None is returned when the nickname contains a space, '@', newline
    or carriage return. The feed is enclosed by the RSS header and
    footer only when include_header is set.
    """
    if any(char in nickname for char in (' ', '@', '\n', '\r')):
        return None

    domain_full = get_full_domain(domain, port)

    feed_str = ''
    if include_header:
        feed_str = rss2header(http_prefix, nickname, domain_full,
                              'Blog', translate)

    # the timeline is only created for accounts having a blogs index
    timeline_json = None
    index_filename = acct_dir(base_dir, nickname, domain) + '/tlblogs.index'
    if os.path.isfile(index_filename):
        timeline_json = create_blogs_timeline(base_dir,
                                              nickname, domain, port,
                                              http_prefix,
                                              no_of_items, False,
                                              page_number)

    if timeline_json and page_number is not None:
        for post in timeline_json['orderedItems']:
            # only 'Create' items become feed entries
            if post['type'] == 'Create':
                feed_str += \
                    _html_blog_post_rss2(domain, post, True,
                                         system_language)

    if include_header:
        feed_str = feed_str + rss2footer()
    return feed_str
def html_blog_page_rss3(base_dir: str, http_prefix: str,
                        nickname: str, domain: str, port: int,
                        no_of_items: int, page_number: int,
                        system_language: str) -> str:
    """Returns an RSS version 3 feed containing posts

    None is returned when the nickname contains a space, '@', newline
    or carriage return. An empty string is returned when the account
    has no blogs index or no timeline could be created.
    """
    if any(char in nickname for char in (' ', '@', '\n', '\r')):
        return None

    feed_str = ''

    index_filename = acct_dir(base_dir, nickname, domain) + '/tlblogs.index'
    if not os.path.isfile(index_filename):
        return feed_str

    timeline_json = \
        create_blogs_timeline(base_dir,
                              nickname, domain, port, http_prefix,
                              no_of_items, False, page_number)
    if not timeline_json or page_number is None:
        return feed_str

    for post in timeline_json['orderedItems']:
        # only 'Create' items become feed entries
        if post['type'] == 'Create':
            feed_str += \
                _html_blog_post_rss3(domain, post, True, system_language)
    return feed_str
def _no_of_blog_accounts(base_dir: str) -> int:
    """Returns the number of blog accounts

    An account is counted when its directory directly below
    base_dir/accounts is accepted by is_account_dir and contains a
    tlblogs.index file.
    """
    accounts_root = base_dir + '/accounts'
    # only the top level of the directory tree is examined
    top_level = next(os.walk(accounts_root), None)
    if top_level is None:
        return 0

    count = 0
    for acct in top_level[1]:
        if not is_account_dir(acct):
            continue
        index_filename = \
            os.path.join(accounts_root, acct) + '/tlblogs.index'
        if os.path.isfile(index_filename):
            count += 1
    return count
def _single_blog_account_nickname(base_dir: str) -> str:
    """Returns the nickname of a single blog account

    This is the part before '@' of the first account directory found
    directly below base_dir/accounts which contains a tlblogs.index
    file, or None when there is no such directory.
    """
    accounts_root = base_dir + '/accounts'
    # only the top level of the directory tree is examined
    top_level = next(os.walk(accounts_root), None)
    if top_level is None:
        return None

    for acct in top_level[1]:
        if not is_account_dir(acct):
            continue
        index_filename = \
            os.path.join(accounts_root, acct) + '/tlblogs.index'
        if os.path.isfile(index_filename):
            return acct.split('@')[0]
    return None
def html_blog_view(authorized: bool,
                   session, base_dir: str, http_prefix: str,
                   translate: {}, domain: str, port: int,
                   no_of_items: int,
                   peertube_instances: [], system_language: str,
                   person_cache: {}, debug: bool) -> str:
    """Show the blog main page

    When at most one account has a blog, and its nickname can be
    found, the first blog page of that account is returned. Otherwise
    the accounts having a tlblogs.index file are listed.

    NOTE(review): the literals used for the account list are empty or
    split across a physical line break; the link markup appears to be
    missing. Confirm against the upstream file.
    """
    blog_str = ''

    # an epicyon.css within the base directory takes precedence
    css_filename = base_dir + '/epicyon-profile.css'
    if os.path.isfile(base_dir + '/epicyon.css'):
        css_filename = base_dir + '/epicyon.css'
    instance_title = \
        get_config_param(base_dir, 'instanceTitle')
    blog_str = \
        html_header_with_external_style(css_filename, instance_title, None)

    if _no_of_blog_accounts(base_dir) <= 1:
        nickname = _single_blog_account_nickname(base_dir)
        if nickname:
            return html_blog_page(authorized, session,
                                  base_dir, http_prefix, translate,
                                  nickname, domain, port,
                                  no_of_items, 1, peertube_instances,
                                  system_language, person_cache, debug)

    # domain_full is not referenced again in the code as shown
    domain_full = get_full_domain(domain, port)

    for _, dirs, _ in os.walk(base_dir + '/accounts'):
        for acct in dirs:
            if not is_account_dir(acct):
                continue
            account_dir = os.path.join(base_dir + '/accounts', acct)
            blogs_index = account_dir + '/tlblogs.index'
            if os.path.isfile(blogs_index):
                blog_str += ''
                blog_str += '' + acct + ' '
                blog_str += '
'
        # only the top level of the accounts directory is examined
        break

    return blog_str + html_footer()
def html_edit_blog(media_instance: bool, translate: {},
                   base_dir: str, path: str, page_number: int,
                   nickname: str, domain: str,
                   post_url: str, system_language: str) -> str:
    """Edit a blog post after it was created

    None is returned when the post cannot be located or its json
    cannot be loaded. Otherwise the returned page is the external
    style header followed by the edit form and the footer.

    NOTE(review): the form literals below are empty, contain only
    spaces, or are split across a physical line break, and most of the
    local variables are assigned but not referenced again in the code
    as shown. The form markup appears to be missing; confirm against
    the upstream file.
    """
    post_filename = locate_post(base_dir, nickname, domain, post_url)
    if not post_filename:
        print('Edit blog: filename not found for ' + post_url)
        return None

    post_json_object = load_json(post_filename)
    if not post_json_object:
        print('Edit blog: json not loaded for ' + post_filename)
        return None

    edit_blog_text = \
        '' + translate['Write your post text below.'] + ' '

    # text from accounts/newpost.txt replaces the default, if present
    if os.path.isfile(base_dir + '/accounts/newpost.txt'):
        try:
            with open(base_dir + '/accounts/newpost.txt', 'r',
                      encoding='utf-8') as file:
                edit_blog_text = '' + file.read() + '
'
        except OSError:
            print('EX: unable to read ' + base_dir + '/accounts/newpost.txt')

    # an epicyon.css within the base directory takes precedence
    css_filename = base_dir + '/epicyon-profile.css'
    if os.path.isfile(base_dir + '/epicyon.css'):
        css_filename = base_dir + '/epicyon.css'

    # drop any query string from the path
    if '?' in path:
        path = path.split('?')[0]
    path_base = path

    edit_blog_image_section = ' '
    edit_blog_image_section += ' ' + \
        translate['Image description'] + ' '
    edit_blog_image_section += \
        ' '
    edit_blog_image_section += \
        ' '
    edit_blog_image_section += '
'

    placeholder_message = translate['Write something'] + '...'
    endpoint = 'editblogpost'
    placeholder_subject = translate['Title']
    scope_icon = 'scope_blog.png'
    scope_description = translate['Blog']

    date_and_location = ''
    date_and_location = ''

    date_and_location += ''
    date_and_location += \
        '' + translate['Location'] + ': '
    date_and_location += ' '
    date_and_location += '
'

    instance_title = \
        get_config_param(base_dir, 'instanceTitle')
    edit_blog_form = \
        html_header_with_external_style(css_filename, instance_title, None)

    edit_blog_form += \
        ''

    edit_blog_form += html_footer()
    return edit_blog_form
def path_contains_blog_link(base_dir: str,
                            http_prefix: str, domain: str,
                            domain_full: str, path: str) -> (str, str):
    """If the path contains a blog entry then return its filename

    Returns the result of locate_post together with the nickname, or
    (None, None) when the path is not of the form
    .../users/<nickname>/<status number> or when the status number is
    not listed within the account's blogs index.
    """
    not_found = (None, None)

    if '/users/' not in path:
        return not_found

    # exactly a nickname and a status number are expected after /users/
    segments = path.split('/users/', 1)[1].split('/')
    if len(segments) != 2:
        return not_found
    nickname, status_number = segments

    # the length is checked before any surrounding whitespace is removed
    if len(status_number) < 14:
        return not_found
    status_number = status_number.strip()
    if not status_number.isdigit():
        return not_found

    # check for blog posts
    blog_index_filename = \
        acct_dir(base_dir, nickname, domain) + '/tlblogs.index'
    if not os.path.isfile(blog_index_filename):
        return not_found
    if not text_in_file('#' + status_number + '.', blog_index_filename):
        return not_found

    message_id = local_actor_url(http_prefix, nickname, domain_full) + \
        '/statuses/' + status_number
    return locate_post(base_dir, nickname, domain, message_id), nickname
def get_blog_address(actor_json: {}) -> str:
    """Returns blog address for the given actor

    The 'Blog' property of the actor is tried first and then 'My Blog'.
    The value found is passed through remove_html and
    remove_link_tracking.
    """
    blog_url = \
        get_actor_property_url(actor_json, 'Blog') or \
        get_actor_property_url(actor_json, 'My Blog')
    return remove_link_tracking(remove_html(blog_url))
def account_has_blog(base_dir: str, nickname: str, domain: str) -> bool:
    """Returns true if the given account has a blog

    This is the case when the tlblogs directory of the account exists
    and contains at least one entry whose name ends with .json
    """
    blogs_dir = acct_dir(base_dir, nickname, domain) + '/tlblogs'
    if not os.path.isdir(blogs_dir):
        return False
    return any(entry.endswith('.json') for entry in os.listdir(blogs_dir))