Break up inbox into separate modules

main
Bob Mottram 2024-08-31 23:05:31 +01:00
parent 87bccf1c70
commit 4c2daf2705
5 changed files with 3190 additions and 3101 deletions

3378
inbox.py

File diff suppressed because it is too large Load Diff

2051
inbox_receive.py 100644

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,609 @@
__filename__ = "inbox_receive_undo.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.5.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Timeline"
import os
from utils import undo_announce_collection_entry
from utils import has_object_dict
from utils import remove_domain_port
from utils import remove_id_ending
from utils import get_url_from_post
from utils import undo_reaction_collection_entry
from utils import remove_html
from utils import get_account_timezone
from utils import is_dm
from utils import get_cached_post_filename
from utils import load_json
from utils import undo_likes_collection_entry
from utils import locate_post
from utils import acct_handle_dir
from utils import has_object_string_object
from utils import has_object_string_type
from utils import has_actor
from utils import has_group_type
from utils import get_full_domain
from utils import get_actor_from_post
from utils import has_users_path
from utils import get_domain_from_actor
from utils import get_nickname_from_actor
from follow import unfollower_of_account
from follow import follower_approval_active
from bookmarks import undo_bookmarks_collection_entry
from webapp_post import individual_post_as_html
def _receive_undo_follow(base_dir: str, message_json: {},
                         debug: bool, domain: str,
                         onion_domain: str, i2p_domain: str) -> bool:
    """Processes an Undo activity which wraps a Follow, asking
    unfollower_of_account to remove the follower.

    Expected shape of message_json:
    {
      "type": "Undo",
      "actor": "https://some.instance/@someone",
      "object": {
        "type": "Follow",
        "actor": "https://some.instance/@someone",
        "object": "https://social.example/@somenickname"
      }
    }

    The followed account may be given either as a string or as a
    dict having a string "id".
    Returns True only if unfollower_of_account reports success,
    otherwise False.
    """
    follow_json = message_json['object']
    if not follow_json.get('object'):
        return False
    if not follow_json.get('actor'):
        if debug:
            print('DEBUG: undo follow request has no actor within object')
        return False

    follower_actor = get_actor_from_post(follow_json)
    if not has_users_path(follower_actor):
        if debug:
            print('DEBUG: undo follow request "users" or "profile" missing ' +
                  'from actor within object')
        return False
    # the actor of the Undo must be the same as the actor of the Follow
    if follower_actor != get_actor_from_post(message_json):
        if debug:
            print('DEBUG: undo follow request actors do not match')
        return False

    follower_nick = get_nickname_from_actor(follower_actor)
    if not follower_nick:
        print('WARN: undo follow request unable to find nickname in ' +
              follower_actor)
        return False
    follower_domain, follower_port = get_domain_from_actor(follower_actor)
    if not follower_domain:
        print('WARN: undo follow request unable to find domain in ' +
              follower_actor)
        return False
    follower_domain_full = get_full_domain(follower_domain, follower_port)

    # who was being followed? Either a url string or a dict with an id
    followed = follow_json['object']
    followed_actor = None
    if isinstance(followed, str):
        followed_actor = followed
    elif isinstance(followed, dict):
        followed_id = followed.get('id')
        if followed_id and isinstance(followed_id, str):
            followed_actor = followed_id
    if not followed_actor:
        print('WARN: undo follow without following actor')
        return False

    followed_nick = get_nickname_from_actor(followed_actor)
    if not followed_nick:
        print('WARN: undo follow request unable to find nickname in ' +
              followed_actor)
        return False
    followed_domain, followed_port = get_domain_from_actor(followed_actor)
    if not followed_domain:
        print('WARN: undo follow request unable to find domain in ' +
              followed_actor)
        return False

    # if the followed account was addressed via the onion or i2p
    # domain then use the given domain instead
    for alt_domain in (onion_domain, i2p_domain):
        if alt_domain and followed_domain.endswith(alt_domain):
            followed_domain = domain
    followed_domain_full = get_full_domain(followed_domain, followed_port)

    follower_handle = follower_nick + '@' + follower_domain_full
    is_group = has_group_type(base_dir, follower_actor, None)
    removed = unfollower_of_account(base_dir,
                                    followed_nick, followed_domain_full,
                                    follower_nick, follower_domain_full,
                                    debug, is_group)
    if removed:
        print(followed_nick + '@' + followed_domain_full + ': ' +
              'Follower ' + follower_handle + ' was removed')
        return True

    if debug:
        print('DEBUG: Follower ' + follower_handle + ' was not removed')
    return False
def receive_undo(base_dir: str, message_json: {}, debug: bool,
                 domain: str, onion_domain: str, i2p_domain: str) -> bool:
    """Handles an incoming activity whose type begins with Undo and
    whose object type is Follow or Join.

    Returns True if the activity was an undo of a Follow or Join
    (regardless of whether a follower was actually removed),
    and False for anything else.
    """
    if not message_json['type'].startswith('Undo'):
        return False
    if debug:
        print('DEBUG: Undo activity received')
    if not has_actor(message_json, debug):
        return False

    undo_actor = get_actor_from_post(message_json)
    if not has_users_path(undo_actor):
        if debug:
            print('DEBUG: "users" or "profile" missing from actor')
        return False
    if not has_object_string_type(message_json, debug):
        return False

    if message_json['object']['type'] not in ('Follow', 'Join'):
        return False

    # the result of the unfollow itself is deliberately not returned
    _receive_undo_follow(base_dir, message_json,
                         debug, domain, onion_domain, i2p_domain)
    return True
def receive_undo_like(recent_posts_cache: {},
                      session, handle: str, base_dir: str,
                      http_prefix: str, domain: str, port: int,
                      cached_webfingers: {},
                      person_cache: {}, message_json: {},
                      debug: bool,
                      signing_priv_key_pem: str,
                      max_recent_posts: int, translate: {},
                      allow_deletion: bool,
                      yt_replace_domain: str,
                      twitter_replacement_domain: str,
                      peertube_instances: [],
                      allow_local_network_access: bool,
                      theme_name: str, system_language: str,
                      max_like_count: int, cw_lists: {},
                      lists_enabled: str,
                      bold_reading: bool, dogwhistles: {},
                      min_images_for_accounts: [],
                      buy_sites: {},
                      auto_cw_cache: {}) -> bool:
    """Handles an incoming Undo of a Like.

    Returns False if the activity is not a usable undo like.
    Returns True once the activity has been recognised, even when
    the liked post could not be located. When the post is found
    the like is removed via undo_likes_collection_entry and
    individual_post_as_html is called for the post.
    """
    if message_json['type'] != 'Undo':
        return False
    if not has_actor(message_json, debug):
        return False
    if not has_object_string_type(message_json, debug):
        return False
    if message_json['object']['type'] != 'Like':
        return False
    if not has_object_string_object(message_json, debug):
        return False

    actor_url = get_actor_from_post(message_json)
    if not has_users_path(actor_url):
        if debug:
            print('DEBUG: "users" or "profile" missing from actor in ' +
                  message_json['type'] + ' like')
        return False

    liked_url = message_json['object']['object']
    if '/statuses/' not in liked_url:
        if debug:
            print('DEBUG: "statuses" missing from like object in ' +
                  message_json['type'])
        return False

    # an unknown recipient is reported but is not treated as fatal
    if not os.path.isdir(acct_handle_dir(base_dir, handle)):
        print('DEBUG: unknown recipient of undo like - ' + handle)

    # is this post in the outbox of the person?
    handle_sections = handle.split('@')
    handle_name = handle_sections[0]
    handle_dom = handle_sections[1]
    post_filename = \
        locate_post(base_dir, handle_name, handle_dom, liked_url)
    if not post_filename:
        if debug:
            print('DEBUG: unliked post not found in inbox or outbox')
            print(liked_url)
        return True
    if debug:
        print('DEBUG: liked post found in inbox. Now undoing.')

    like_actor = get_actor_from_post(message_json)
    undo_likes_collection_entry(recent_posts_cache, base_dir, post_filename,
                                like_actor, domain, debug, None)

    # regenerate the html
    liked_post_json = load_json(post_filename)
    if not liked_post_json:
        return True

    # if the located post is an announce of a url then also undo
    # the like on the announced post, if it can be located
    announced = liked_post_json.get('object')
    if liked_post_json.get('type') == 'Announce' and announced:
        if isinstance(announced, str):
            announced_filename = \
                locate_post(base_dir, handle_name, domain, announced)
            if announced_filename:
                post_filename = announced_filename
                undo_likes_collection_entry(recent_posts_cache,
                                            base_dir,
                                            post_filename,
                                            like_actor, domain, debug,
                                            None)

    if debug:
        cached_post_filename = \
            get_cached_post_filename(base_dir, handle_name, domain,
                                     liked_post_json)
        print('Unliked post json: ' + str(liked_post_json))
        print('Unliked post nickname: ' + handle_name + ' ' + domain)
        print('Unliked post cache: ' + str(cached_post_filename))

    page_number = 1
    show_published_date_only = False
    show_individual_post_icons = True
    manually_approve_followers = \
        follower_approval_active(base_dir, handle_name, domain)
    not_dm = not is_dm(liked_post_json)
    timezone = get_account_timezone(base_dir, handle_name, domain)
    # a .mitm file alongside the post is passed on as the mitm flag
    mitm = os.path.isfile(post_filename.replace('.json', '') + '.mitm')
    minimize_all_images = handle_name in min_images_for_accounts
    individual_post_as_html(signing_priv_key_pem, False,
                            recent_posts_cache, max_recent_posts,
                            translate, page_number, base_dir,
                            session, cached_webfingers, person_cache,
                            handle_name, domain, port, liked_post_json,
                            None, True, allow_deletion,
                            http_prefix, __version__,
                            'inbox',
                            yt_replace_domain,
                            twitter_replacement_domain,
                            show_published_date_only,
                            peertube_instances,
                            allow_local_network_access,
                            theme_name, system_language,
                            max_like_count, not_dm,
                            show_individual_post_icons,
                            manually_approve_followers,
                            False, True, False, cw_lists,
                            lists_enabled, timezone, mitm,
                            bold_reading, dogwhistles,
                            minimize_all_images, None,
                            buy_sites, auto_cw_cache)
    return True
def receive_undo_reaction(recent_posts_cache: {},
                          session, handle: str, base_dir: str,
                          http_prefix: str, domain: str, port: int,
                          cached_webfingers: {},
                          person_cache: {}, message_json: {},
                          debug: bool,
                          signing_priv_key_pem: str,
                          max_recent_posts: int, translate: {},
                          allow_deletion: bool,
                          yt_replace_domain: str,
                          twitter_replacement_domain: str,
                          peertube_instances: [],
                          allow_local_network_access: bool,
                          theme_name: str, system_language: str,
                          max_like_count: int, cw_lists: {},
                          lists_enabled: str,
                          bold_reading: bool, dogwhistles: {},
                          min_images_for_accounts: [],
                          buy_sites: {},
                          auto_cw_cache: {}) -> bool:
    """Handles an incoming Undo of an EmojiReact.

    Returns False if the activity is not a usable undo reaction.
    Returns True once the activity has been recognised, even when
    the post could not be located or the emoji content is empty
    after html removal. When the post is found the reaction is
    removed via undo_reaction_collection_entry and
    individual_post_as_html is called for the post.
    """
    if message_json['type'] != 'Undo':
        return False
    if not has_actor(message_json, debug):
        return False
    if not has_object_string_type(message_json, debug):
        return False
    if message_json['object']['type'] != 'EmojiReact':
        return False
    if not has_object_string_object(message_json, debug):
        return False

    # the reaction must carry its emoji as string content
    if 'content' not in message_json['object']:
        if debug:
            print('DEBUG: ' + message_json['type'] + ' has no "content"')
        return False
    if not isinstance(message_json['object']['content'], str):
        if debug:
            print('DEBUG: ' + message_json['type'] + ' content is not string')
        return False

    actor_url = get_actor_from_post(message_json)
    if not has_users_path(actor_url):
        if debug:
            print('DEBUG: "users" or "profile" missing from actor in ' +
                  message_json['type'] + ' reaction')
        return False

    reacted_url = message_json['object']['object']
    if '/statuses/' not in reacted_url:
        if debug:
            print('DEBUG: "statuses" missing from reaction object in ' +
                  message_json['type'])
        return False

    # an unknown recipient is reported but is not treated as fatal
    if not os.path.isdir(acct_handle_dir(base_dir, handle)):
        print('DEBUG: unknown recipient of undo reaction - ' + handle)

    # is this post in the outbox of the person?
    handle_sections = handle.split('@')
    handle_name = handle_sections[0]
    handle_dom = handle_sections[1]
    post_filename = \
        locate_post(base_dir, handle_name, handle_dom, reacted_url)
    if not post_filename:
        if debug:
            print('DEBUG: unreaction post not found in inbox or outbox')
            print(reacted_url)
        return True
    if debug:
        print('DEBUG: reaction post found in inbox. Now undoing.')

    reaction_actor = actor_url
    emoji_content = remove_html(message_json['object']['content'])
    if not emoji_content:
        if debug:
            print('DEBUG: unreaction has no content')
        return True
    undo_reaction_collection_entry(recent_posts_cache, base_dir,
                                   post_filename,
                                   reaction_actor, domain,
                                   debug, None, emoji_content)

    # regenerate the html
    reaction_post_json = load_json(post_filename)
    if not reaction_post_json:
        return True

    # if the located post is an announce of a url then also undo
    # the reaction on the announced post, if it can be located
    announced = reaction_post_json.get('object')
    if reaction_post_json.get('type') == 'Announce' and announced:
        if isinstance(announced, str):
            announced_filename = \
                locate_post(base_dir, handle_name, domain, announced)
            if announced_filename:
                post_filename = announced_filename
                undo_reaction_collection_entry(recent_posts_cache,
                                               base_dir,
                                               post_filename,
                                               reaction_actor,
                                               domain,
                                               debug, None,
                                               emoji_content)

    if debug:
        cached_post_filename = \
            get_cached_post_filename(base_dir, handle_name, domain,
                                     reaction_post_json)
        print('Unreaction post json: ' + str(reaction_post_json))
        print('Unreaction post nickname: ' +
              handle_name + ' ' + domain)
        print('Unreaction post cache: ' + str(cached_post_filename))

    page_number = 1
    show_published_date_only = False
    show_individual_post_icons = True
    manually_approve_followers = \
        follower_approval_active(base_dir, handle_name, domain)
    not_dm = not is_dm(reaction_post_json)
    timezone = get_account_timezone(base_dir, handle_name, domain)
    # a .mitm file alongside the post is passed on as the mitm flag
    mitm = os.path.isfile(post_filename.replace('.json', '') + '.mitm')
    minimize_all_images = handle_name in min_images_for_accounts
    individual_post_as_html(signing_priv_key_pem, False,
                            recent_posts_cache, max_recent_posts,
                            translate, page_number, base_dir,
                            session, cached_webfingers, person_cache,
                            handle_name, domain, port,
                            reaction_post_json,
                            None, True, allow_deletion,
                            http_prefix, __version__,
                            'inbox',
                            yt_replace_domain,
                            twitter_replacement_domain,
                            show_published_date_only,
                            peertube_instances,
                            allow_local_network_access,
                            theme_name, system_language,
                            max_like_count, not_dm,
                            show_individual_post_icons,
                            manually_approve_followers,
                            False, True, False, cw_lists,
                            lists_enabled, timezone, mitm,
                            bold_reading, dogwhistles,
                            minimize_all_images, None,
                            buy_sites, auto_cw_cache)
    return True
def receive_undo_bookmark(recent_posts_cache: {},
                          session, handle: str, base_dir: str,
                          http_prefix: str, domain: str, port: int,
                          cached_webfingers: {},
                          person_cache: {}, message_json: {},
                          debug: bool, signing_priv_key_pem: str,
                          max_recent_posts: int, translate: {},
                          allow_deletion: bool,
                          yt_replace_domain: str,
                          twitter_replacement_domain: str,
                          peertube_instances: [],
                          allow_local_network_access: bool,
                          theme_name: str, system_language: str,
                          max_like_count: int, cw_lists: {},
                          lists_enabled: str, bold_reading: bool,
                          dogwhistles: {},
                          min_images_for_accounts: [],
                          buy_sites: {},
                          auto_cw_cache: {}) -> bool:
    """Handles an incoming Remove activity which takes a post out of
    the bookmarks of the account given by handle.

    The activity must be of type Remove, have a string target ending
    with the actor url followed by /tlbookmarks, and have a Document
    object whose url contains /statuses/. The actor must end with
    the full domain, /users/ and the nickname taken from handle.

    Returns False if any of those checks fail, otherwise True
    (including when the bookmarked post could not be located).
    """
    # a missing or empty type is never equal to Remove
    if message_json.get('type') != 'Remove':
        return False
    if not has_actor(message_json, debug):
        return False
    if not message_json.get('target'):
        if debug:
            print('DEBUG: no target in inbox undo bookmark Remove')
        return False
    if not has_object_string_type(message_json, debug):
        return False
    target = message_json['target']
    if not isinstance(target, str):
        if debug:
            print('DEBUG: inbox Remove bookmark target is not string')
        return False

    domain_full = get_full_domain(domain, port)
    nickname = handle.split('@')[0]
    actor_url = get_actor_from_post(message_json)
    expected_actor_ending = domain_full + '/users/' + nickname
    if not actor_url.endswith(expected_actor_ending):
        if debug:
            print('DEBUG: inbox undo bookmark Remove unexpected actor')
        return False
    if not target.endswith(actor_url + '/tlbookmarks'):
        if debug:
            print('DEBUG: inbox undo bookmark Remove target invalid ' +
                  target)
        return False

    if message_json['object']['type'] != 'Document':
        if debug:
            print('DEBUG: inbox undo bookmark Remove type is not Document')
        return False
    if not message_json['object'].get('url'):
        if debug:
            print('DEBUG: inbox undo bookmark Remove missing url')
        return False
    url_str = get_url_from_post(message_json['object']['url'])
    if '/statuses/' not in url_str:
        if debug:
            print('DEBUG: inbox undo bookmark Remove missing statuses un url')
        return False
    if debug:
        print('DEBUG: c2s inbox Remove bookmark ' +
              'request arrived in outbox')

    message_url = remove_id_ending(remove_html(url_str))
    # from here on the domain is used without any port
    domain = remove_domain_port(domain)
    post_filename = locate_post(base_dir, nickname, domain, message_url)
    if not post_filename:
        if debug:
            print('DEBUG: c2s inbox like post not found in inbox or outbox')
            print(message_url)
        return True

    undo_bookmarks_collection_entry(recent_posts_cache, base_dir,
                                    post_filename,
                                    actor_url, domain, debug)

    # regenerate the html
    bookmarked_post_json = load_json(post_filename)
    if not bookmarked_post_json:
        return True

    if debug:
        cached_post_filename = \
            get_cached_post_filename(base_dir, nickname, domain,
                                     bookmarked_post_json)
        print('Unbookmarked post json: ' + str(bookmarked_post_json))
        print('Unbookmarked post nickname: ' + nickname + ' ' + domain)
        print('Unbookmarked post cache: ' + str(cached_post_filename))

    page_number = 1
    show_published_date_only = False
    show_individual_post_icons = True
    manually_approve_followers = \
        follower_approval_active(base_dir, nickname, domain)
    not_dm = not is_dm(bookmarked_post_json)
    timezone = get_account_timezone(base_dir, nickname, domain)
    # a .mitm file alongside the post is passed on as the mitm flag
    mitm = os.path.isfile(post_filename.replace('.json', '') + '.mitm')
    minimize_all_images = nickname in min_images_for_accounts
    individual_post_as_html(signing_priv_key_pem, False,
                            recent_posts_cache, max_recent_posts,
                            translate, page_number, base_dir,
                            session, cached_webfingers, person_cache,
                            nickname, domain, port, bookmarked_post_json,
                            None, True, allow_deletion,
                            http_prefix, __version__,
                            'inbox',
                            yt_replace_domain,
                            twitter_replacement_domain,
                            show_published_date_only,
                            peertube_instances,
                            allow_local_network_access,
                            theme_name, system_language,
                            max_like_count, not_dm,
                            show_individual_post_icons,
                            manually_approve_followers,
                            False, True, False, cw_lists, lists_enabled,
                            timezone, mitm, bold_reading,
                            dogwhistles, minimize_all_images, None,
                            buy_sites, auto_cw_cache)
    return True
def receive_undo_announce(recent_posts_cache: {},
                          handle: str, base_dir: str, domain: str,
                          message_json: {}, debug: bool) -> bool:
    """Handles an incoming Undo of an Announce (repeat/boost).

    recent_posts_cache, base_dir, domain and debug are passed through
    to undo_announce_collection_entry. handle is the nickname@domain
    of the receiving account and message_json is the Undo activity.

    Returns False if the activity is not a usable undo announce, or
    if the located post has no type. Returns True once the activity
    has been recognised, including when the referenced post cannot
    be located. When the post is found
    undo_announce_collection_entry is called and the located post
    file is then deleted.
    """
    if message_json['type'] != 'Undo':
        return False
    if not has_actor(message_json, debug):
        return False
    if not has_object_dict(message_json):
        return False
    if not has_object_string_object(message_json, debug):
        return False
    # use get() so that an object without any type is rejected
    # rather than raising KeyError
    if message_json['object'].get('type') != 'Announce':
        return False
    actor_url = get_actor_from_post(message_json)
    if not has_users_path(actor_url):
        if debug:
            print('DEBUG: "users" or "profile" missing from actor in ' +
                  message_json['type'] + ' announce')
        return False

    # an unknown recipient is reported but is not treated as fatal
    handle_dir = acct_handle_dir(base_dir, handle)
    if not os.path.isdir(handle_dir):
        print('DEBUG: unknown recipient of undo announce - ' + handle)

    # is this post in the outbox of the person?
    handle_name = handle.split('@')[0]
    handle_dom = handle.split('@')[1]
    post_filename = locate_post(base_dir, handle_name, handle_dom,
                                message_json['object']['object'])
    if not post_filename:
        if debug:
            print('DEBUG: undo announce post not found in inbox or outbox')
            print(message_json['object']['object'])
        return True
    if debug:
        print('DEBUG: announced/repeated post to be undone found in inbox')

    post_json_object = load_json(post_filename)
    if post_json_object:
        # A post with no usable type is refused. Previously a missing
        # "type" key raised KeyError here, while an empty type
        # already returned False.
        # NOTE(review): posts having a type other than Announce are
        # not rejected by this check - confirm whether that is intended
        if not post_json_object.get('type'):
            if debug:
                print("DEBUG: Attempt to undo something " +
                      "which isn't an announcement")
            return False

    undo_announce_collection_entry(recent_posts_cache, base_dir,
                                   post_filename,
                                   actor_url, domain, debug)
    if os.path.isfile(post_filename):
        try:
            os.remove(post_filename)
        except OSError:
            # failure to delete is reported but does not change
            # the returned result
            print('EX: _receive_undo_announce unable to delete ' +
                  str(post_filename))
    return True

View File

@ -47,7 +47,6 @@ from media import replace_you_tube
from media import replace_twitter
from media import get_media_path
from media import create_media_dirs
from inbox import inbox_update_index
from announce import outbox_announce
from announce import outbox_undo_announce
from follow import outbox_undo_follow
@ -68,6 +67,7 @@ from webapp_hashtagswarm import store_hash_tags
from speaker import update_speaker
from reading import store_book_events
from reading import has_edition_tag
from inbox_receive import inbox_update_index
def _localonly_not_local(message_json: {}, domain_full: str) -> bool:

251
posts.py
View File

@ -109,6 +109,8 @@ from content import add_html_tags
from content import replace_emoji_from_tags
from content import remove_text_formatting
from content import add_auto_cw
from content import contains_invalid_local_links
from content import valid_url_lengths
from auth import create_basic_auth_header
from blocking import is_blocked_hashtag
from blocking import is_blocked
@ -116,6 +118,7 @@ from blocking import is_blocked_domain
from filters import is_filtered
from filters import is_question_filtered
from git import convert_post_to_patch
from git import is_git_patch
from linked_data_sig import generate_json_signature
from petnames import resolve_petnames
from video import convert_video_to_note
@ -126,6 +129,7 @@ from keys import get_person_key
from markdown import markdown_to_html
from followerSync import update_followers_sync_cache
from question import is_question
from question import dangerous_question
from pyjsonld import JsonLdError
@ -7040,3 +7044,250 @@ def json_post_allows_comments(post_json_object: {}) -> bool:
return not post_json_object['object']['rejectReplies']
return True
def _estimate_number_of_mentions(content: str) -> int:
    """Approximates how many mentions are within the given content
    by counting occurrences of the text >@<
    """
    mention_marker = '>@<'
    estimate = content.count(mention_marker)
    return estimate
def _estimate_number_of_emoji(content: str) -> int:
    """Approximates how many emoji are within the given content
    by counting occurrences of a space followed by a colon
    """
    emoji_marker = ' :'
    estimate = content.count(emoji_marker)
    return estimate
def _estimate_number_of_hashtags(content: str) -> int:
    """Approximates how many hashtags are within the given content
    by counting occurrences of the text >#<
    """
    hashtag_marker = '>#<'
    estimate = content.count(hashtag_marker)
    return estimate
def post_allow_comments(post_filename: str) -> bool:
    """Loads the post with the given filename and returns the result
    of json_post_allows_comments for it.
    Returns False if the post could not be loaded.
    """
    loaded_post = load_json(post_filename)
    if loaded_post:
        return json_post_allows_comments(loaded_post)
    return False
def valid_post_content(base_dir: str, nickname: str, domain: str,
                       message_json: {}, max_mentions: int, max_emoji: int,
                       allow_local_network_access: bool, debug: bool,
                       system_language: str,
                       http_prefix: str, domain_full: str,
                       person_cache: {},
                       max_hashtags: int,
                       onion_domain: str, i2p_domain: str) -> bool:
    """Is the content of a received post valid?
    Check the published and updated dates
    Check the content warning
    Check for bad html
    Check for hellthreads
    Check that the language is understood
    Check if it's a git patch
    Check number of tags, emoji, hashtags and mentions is reasonable
    Check for filtered words, including within media descriptions
    Check that replies are to posts which allow comments

    base_dir, nickname and domain identify the receiving account.
    message_json is the received activity. Its object may be altered:
    fractional seconds are removed from the published and updated
    dates, and a tag field which is not a list is replaced by an
    empty list.
    max_mentions, max_emoji and max_hashtags are the limits for the
    rough estimates made from the content. The number of tags may
    not exceed twice max_mentions.

    Returns True if the post is acceptable, or if it has no object
    dict or no content. Returns False if the post is rejected.
    """
    if not has_object_dict(message_json):
        return True
    post_obj = message_json['object']
    if 'content' not in post_obj:
        return True
    # only used within the reject messages
    post_id = post_obj.get('id')

    if not post_obj.get('published'):
        if post_id:
            print('REJECT inbox post does not have a published date. ' +
                  str(post_id))
        return False
    published = post_obj['published']
    # a date which is not a string is rejected rather than raising
    # TypeError within the checks below
    if not isinstance(published, str) or 'T' not in published:
        if post_id:
            print('REJECT inbox post does not use expected time format. ' +
                  str(published) + ' ' + str(post_id))
        return False
    if 'Z' not in published:
        if post_id:
            print('REJECT inbox post does not use Zulu time format. ' +
                  published + ' ' + str(post_id))
        return False
    if '.' in published:
        # converts 2022-03-30T17:37:58.734Z into 2022-03-30T17:37:58Z
        published = published.split('.')[0] + 'Z'
        post_obj['published'] = published
    if not valid_post_date(published, 90, debug):
        if post_id:
            print('REJECT: invalid post published date ' +
                  str(published) + ' ' + str(post_id))
        return False

    # if the post has been edited then check its edit date
    if post_obj.get('updated'):
        published_update = post_obj['updated']
        if not isinstance(published_update, str) or \
           'T' not in published_update:
            if post_id:
                print('REJECT: invalid post update date format ' +
                      str(published_update) + ' ' + str(post_id))
            return False
        if 'Z' not in published_update:
            if post_id:
                print('REJECT: post update date not in Zulu time ' +
                      str(published_update) + ' ' + str(post_id))
            return False
        if '.' in published_update:
            # converts 2022-03-30T17:37:58.734Z into 2022-03-30T17:37:58Z
            published_update = published_update.split('.')[0] + 'Z'
            post_obj['updated'] = published_update
        if not valid_post_date(published_update, 90, debug):
            if post_id:
                print('REJECT: invalid post update date ' +
                      str(published_update) + ' ' + str(post_id))
            return False

    # check the content warning, if there is one
    summary = None
    if post_obj.get('summary'):
        summary = post_obj['summary']
        if not isinstance(summary, str):
            if post_id:
                print('REJECT: content warning is not a string ' +
                      str(summary) + ' ' + str(post_id))
            return False
        if summary != valid_content_warning(summary):
            if post_id:
                print('REJECT: invalid content warning ' + summary + ' ' +
                      str(post_id))
            return False
        if dangerous_markup(summary, allow_local_network_access, []):
            if post_id:
                # str() because the id is not known to be a string
                print('REJECT ARBITRARY HTML 1: ' + str(post_id))
            print('REJECT ARBITRARY HTML: bad string in summary - ' +
                  summary)
            return False

    # check for patches before dangeousMarkup, which excludes code
    if is_git_patch(base_dir, nickname, domain,
                    post_obj['type'],
                    summary,
                    post_obj['content']):
        return True

    if is_question(message_json):
        if is_question_filtered(base_dir, nickname, domain,
                                system_language, message_json):
            print('REJECT: incoming question options filter')
            return False
        if dangerous_question(message_json, allow_local_network_access):
            print('REJECT: incoming question markup filter')
            return False

    content_str = get_base_content_from_post(message_json, system_language)
    if dangerous_markup(content_str, allow_local_network_access, ['pre']):
        if post_id:
            print('REJECT ARBITRARY HTML 2: ' + str(post_id))
        if debug:
            print('REJECT ARBITRARY HTML: bad string in post - ' +
                  content_str)
        return False

    if contains_invalid_local_links(domain_full,
                                    onion_domain, i2p_domain,
                                    content_str):
        if post_id:
            print('REJECT: post contains invalid local links ' +
                  str(post_id) + ' ' + str(content_str))
        return False

    # check (rough) number of mentions
    mentions_est = _estimate_number_of_mentions(content_str)
    if mentions_est > max_mentions:
        if post_id:
            print('REJECT HELLTHREAD: ' + str(post_id))
        if debug:
            print('REJECT HELLTHREAD: Too many mentions in post - ' +
                  content_str)
        return False
    if _estimate_number_of_emoji(content_str) > max_emoji:
        if post_id:
            print('REJECT EMOJI OVERLOAD: ' + str(post_id))
        if debug:
            print('REJECT EMOJI OVERLOAD: Too many emoji in post - ' +
                  content_str)
        return False
    if _estimate_number_of_hashtags(content_str) > max_hashtags:
        if post_id:
            print('REJECT HASHTAG OVERLOAD: ' + str(post_id))
        if debug:
            print('REJECT HASHTAG OVERLOAD: Too many hashtags in post - ' +
                  content_str)
        return False

    # check number of tags
    if post_obj.get('tag'):
        if not isinstance(post_obj['tag'], list):
            post_obj['tag'] = []
        else:
            if len(post_obj['tag']) > int(max_mentions * 2):
                if post_id:
                    # str() because the id is not known to be a string
                    print('REJECT: ' + str(post_id))
                print('REJECT: Too many tags in post - ' +
                      str(post_obj['tag']))
                return False

    # check that the post is in a language suitable for this account
    if not understood_post_language(base_dir, nickname,
                                    message_json, system_language,
                                    http_prefix, domain_full,
                                    person_cache):
        if post_id:
            print('REJECT: content not understood ' + str(post_id))
        return False

    # check for urls which are too long
    if not valid_url_lengths(content_str, 2048):
        print('REJECT: url within content too long')
        return False

    # check for filtered content
    # Media descriptions are always included in the filtered text.
    # Previously they were only included if there was a summary
    media_descriptions = get_media_descriptions_from_post(message_json)
    if summary:
        content_all = summary + ' ' + content_str + ' ' + media_descriptions
    elif media_descriptions:
        content_all = content_str + ' ' + media_descriptions
    else:
        content_all = content_str
    if is_filtered(base_dir, nickname, domain, content_all,
                   system_language):
        if post_id:
            print('REJECT: content filtered ' + str(post_id))
        return False

    reply_id = get_reply_to(post_obj)
    if reply_id:
        if isinstance(reply_id, str):
            # this is a reply
            original_post_id = reply_id
            post_post_filename = locate_post(base_dir, nickname, domain,
                                             original_post_id)
            if post_post_filename:
                if not post_allow_comments(post_post_filename):
                    print('REJECT: reply to post which does not ' +
                          'allow comments: ' + original_post_id)
                    return False

    if contains_private_key(post_obj['content']):
        if post_id:
            print('REJECT: someone posted their private key ' +
                  str(post_id) + ' ' + post_obj['content'])
        return False
    if invalid_ciphertext(post_obj['content']):
        if post_id:
            print('REJECT: malformed ciphertext in content ' +
                  str(post_id) + ' ' + post_obj['content'])
        return False
    if debug:
        print('ACCEPT: post content is valid')
    return True