Define function first

merge-requests/30/head
Bob Mottram 2023-03-06 11:26:56 +00:00
parent 171808f7ee
commit 1acf4987ac
1 changed files with 161 additions and 161 deletions

322
inbox.py
View File

@ -1282,6 +1282,167 @@ def _receive_update_to_question(recent_posts_cache: {}, message_json: {},
return True
def _valid_post_content(base_dir: str, nickname: str, domain: str,
message_json: {}, max_mentions: int, max_emoji: int,
allow_local_network_access: bool, debug: bool,
system_language: str,
http_prefix: str, domain_full: str,
person_cache: {},
max_hashtags: int) -> bool:
"""Is the content of a received post valid?
Check for bad html
Check for hellthreads
Check that the language is understood
Check if it's a git patch
Check number of tags and mentions is reasonable
"""
if not has_object_dict(message_json):
return True
if 'content' not in message_json['object']:
return True
if not message_json['object'].get('published'):
return False
published = message_json['object']['published']
if 'T' not in published:
return False
if 'Z' not in published:
print('REJECT inbox post does not use Zulu time format. ' +
published)
return False
if '.' in published:
# converts 2022-03-30T17:37:58.734Z into 2022-03-30T17:37:58Z
published = published.split('.')[0] + 'Z'
message_json['object']['published'] = published
if not valid_post_date(published, 90, debug):
return False
# if the post has been edited then check its edit date
if message_json['object'].get('updated'):
published_update = message_json['object']['updated']
if 'T' not in published_update:
return False
if 'Z' not in published_update:
return False
if '.' in published_update:
# converts 2022-03-30T17:37:58.734Z into 2022-03-30T17:37:58Z
published_update = published_update.split('.')[0] + 'Z'
message_json['object']['updated'] = published_update
if not valid_post_date(published_update, 90, debug):
return False
summary = None
if message_json['object'].get('summary'):
summary = message_json['object']['summary']
if not isinstance(summary, str):
print('WARN: content warning is not a string')
return False
if summary != valid_content_warning(summary):
print('WARN: invalid content warning ' + summary)
return False
if dangerous_markup(summary, allow_local_network_access):
if message_json['object'].get('id'):
print('REJECT ARBITRARY HTML: ' + message_json['object']['id'])
print('REJECT ARBITRARY HTML: bad string in summary - ' +
summary)
return False
# check for patches before dangeousMarkup, which excludes code
if is_git_patch(base_dir, nickname, domain,
message_json['object']['type'],
summary,
message_json['object']['content']):
return True
if is_question(message_json):
if is_question_filtered(base_dir, nickname, domain,
system_language, message_json):
print('REJECT: incoming question options filter')
return False
if dangerous_question(message_json, allow_local_network_access):
print('REJECT: incoming question markup filter')
return False
content_str = get_base_content_from_post(message_json, system_language)
if dangerous_markup(content_str, allow_local_network_access):
if message_json['object'].get('id'):
print('REJECT ARBITRARY HTML: ' + message_json['object']['id'])
print('REJECT ARBITRARY HTML: bad string in post - ' +
content_str)
return False
# check (rough) number of mentions
mentions_est = _estimate_number_of_mentions(content_str)
if mentions_est > max_mentions:
if message_json['object'].get('id'):
print('REJECT HELLTHREAD: ' + message_json['object']['id'])
print('REJECT HELLTHREAD: Too many mentions in post - ' +
content_str)
return False
if _estimate_number_of_emoji(content_str) > max_emoji:
if message_json['object'].get('id'):
print('REJECT EMOJI OVERLOAD: ' + message_json['object']['id'])
print('REJECT EMOJI OVERLOAD: Too many emoji in post - ' +
content_str)
return False
if _estimate_number_of_hashtags(content_str) > max_hashtags:
if message_json['object'].get('id'):
print('REJECT HASHTAG OVERLOAD: ' + message_json['object']['id'])
print('REJECT HASHTAG OVERLOAD: Too many hashtags in post - ' +
content_str)
return False
# check number of tags
if message_json['object'].get('tag'):
if not isinstance(message_json['object']['tag'], list):
message_json['object']['tag'] = []
else:
if len(message_json['object']['tag']) > int(max_mentions * 2):
if message_json['object'].get('id'):
print('REJECT: ' + message_json['object']['id'])
print('REJECT: Too many tags in post - ' +
message_json['object']['tag'])
return False
# check that the post is in a language suitable for this account
if not understood_post_language(base_dir, nickname,
message_json, system_language,
http_prefix, domain_full,
person_cache):
return False
# check for urls which are too long
if not valid_url_lengths(content_str, 2048):
print('REJECT: url within content too long')
return False
# check for filtered content
media_descriptions = get_media_descriptions_from_post(message_json)
content_all = content_str
if summary:
content_all = summary + ' ' + content_str + ' ' + media_descriptions
if is_filtered(base_dir, nickname, domain, content_all,
system_language):
print('REJECT: content filtered')
return False
if message_json['object'].get('inReplyTo'):
if isinstance(message_json['object']['inReplyTo'], str):
original_post_id = message_json['object']['inReplyTo']
post_post_filename = locate_post(base_dir, nickname, domain,
original_post_id)
if post_post_filename:
if not _post_allow_comments(post_post_filename):
print('REJECT: reply to post which does not ' +
'allow comments: ' + original_post_id)
return False
if invalid_ciphertext(message_json['object']['content']):
print('REJECT: malformed ciphertext in content ' +
message_json['object']['id'] + ' ' +
message_json['object']['content'])
return False
if debug:
print('ACCEPT: post content is valid')
return True
def receive_edit_to_post(recent_posts_cache: {}, message_json: {},
base_dir: str,
nickname: str, domain: str,
@ -3066,167 +3227,6 @@ def _estimate_number_of_hashtags(content: str) -> int:
return content.count('>#<')
def _valid_post_content(base_dir: str, nickname: str, domain: str,
message_json: {}, max_mentions: int, max_emoji: int,
allow_local_network_access: bool, debug: bool,
system_language: str,
http_prefix: str, domain_full: str,
person_cache: {},
max_hashtags: int) -> bool:
"""Is the content of a received post valid?
Check for bad html
Check for hellthreads
Check that the language is understood
Check if it's a git patch
Check number of tags and mentions is reasonable
"""
if not has_object_dict(message_json):
return True
if 'content' not in message_json['object']:
return True
if not message_json['object'].get('published'):
return False
published = message_json['object']['published']
if 'T' not in published:
return False
if 'Z' not in published:
print('REJECT inbox post does not use Zulu time format. ' +
published)
return False
if '.' in published:
# converts 2022-03-30T17:37:58.734Z into 2022-03-30T17:37:58Z
published = published.split('.')[0] + 'Z'
message_json['object']['published'] = published
if not valid_post_date(published, 90, debug):
return False
# if the post has been edited then check its edit date
if message_json['object'].get('updated'):
published_update = message_json['object']['updated']
if 'T' not in published_update:
return False
if 'Z' not in published_update:
return False
if '.' in published_update:
# converts 2022-03-30T17:37:58.734Z into 2022-03-30T17:37:58Z
published_update = published_update.split('.')[0] + 'Z'
message_json['object']['updated'] = published_update
if not valid_post_date(published_update, 90, debug):
return False
summary = None
if message_json['object'].get('summary'):
summary = message_json['object']['summary']
if not isinstance(summary, str):
print('WARN: content warning is not a string')
return False
if summary != valid_content_warning(summary):
print('WARN: invalid content warning ' + summary)
return False
if dangerous_markup(summary, allow_local_network_access):
if message_json['object'].get('id'):
print('REJECT ARBITRARY HTML: ' + message_json['object']['id'])
print('REJECT ARBITRARY HTML: bad string in summary - ' +
summary)
return False
# check for patches before dangeousMarkup, which excludes code
if is_git_patch(base_dir, nickname, domain,
message_json['object']['type'],
summary,
message_json['object']['content']):
return True
if is_question(message_json):
if is_question_filtered(base_dir, nickname, domain,
system_language, message_json):
print('REJECT: incoming question options filter')
return False
if dangerous_question(message_json, allow_local_network_access):
print('REJECT: incoming question markup filter')
return False
content_str = get_base_content_from_post(message_json, system_language)
if dangerous_markup(content_str, allow_local_network_access):
if message_json['object'].get('id'):
print('REJECT ARBITRARY HTML: ' + message_json['object']['id'])
print('REJECT ARBITRARY HTML: bad string in post - ' +
content_str)
return False
# check (rough) number of mentions
mentions_est = _estimate_number_of_mentions(content_str)
if mentions_est > max_mentions:
if message_json['object'].get('id'):
print('REJECT HELLTHREAD: ' + message_json['object']['id'])
print('REJECT HELLTHREAD: Too many mentions in post - ' +
content_str)
return False
if _estimate_number_of_emoji(content_str) > max_emoji:
if message_json['object'].get('id'):
print('REJECT EMOJI OVERLOAD: ' + message_json['object']['id'])
print('REJECT EMOJI OVERLOAD: Too many emoji in post - ' +
content_str)
return False
if _estimate_number_of_hashtags(content_str) > max_hashtags:
if message_json['object'].get('id'):
print('REJECT HASHTAG OVERLOAD: ' + message_json['object']['id'])
print('REJECT HASHTAG OVERLOAD: Too many hashtags in post - ' +
content_str)
return False
# check number of tags
if message_json['object'].get('tag'):
if not isinstance(message_json['object']['tag'], list):
message_json['object']['tag'] = []
else:
if len(message_json['object']['tag']) > int(max_mentions * 2):
if message_json['object'].get('id'):
print('REJECT: ' + message_json['object']['id'])
print('REJECT: Too many tags in post - ' +
message_json['object']['tag'])
return False
# check that the post is in a language suitable for this account
if not understood_post_language(base_dir, nickname,
message_json, system_language,
http_prefix, domain_full,
person_cache):
return False
# check for urls which are too long
if not valid_url_lengths(content_str, 2048):
print('REJECT: url within content too long')
return False
# check for filtered content
media_descriptions = get_media_descriptions_from_post(message_json)
content_all = content_str
if summary:
content_all = summary + ' ' + content_str + ' ' + media_descriptions
if is_filtered(base_dir, nickname, domain, content_all,
system_language):
print('REJECT: content filtered')
return False
if message_json['object'].get('inReplyTo'):
if isinstance(message_json['object']['inReplyTo'], str):
original_post_id = message_json['object']['inReplyTo']
post_post_filename = locate_post(base_dir, nickname, domain,
original_post_id)
if post_post_filename:
if not _post_allow_comments(post_post_filename):
print('REJECT: reply to post which does not ' +
'allow comments: ' + original_post_id)
return False
if invalid_ciphertext(message_json['object']['content']):
print('REJECT: malformed ciphertext in content ' +
message_json['object']['id'] + ' ' +
message_json['object']['content'])
return False
if debug:
print('ACCEPT: post content is valid')
return True
def _obtain_avatar_for_reply_post(session, base_dir: str, http_prefix: str,
domain: str, onion_domain: str,
i2p_domain: str,