Check for inverted text

main
Bob Mottram 2022-09-25 18:26:11 +01:00
parent 6fb410276c
commit 947715bfae
13 changed files with 154 additions and 59 deletions

View File

@ -5001,7 +5001,8 @@ class PubServer(BaseHTTPRequestHandler):
def _set_hashtag_category(self, calling_domain: str, cookie: str,
path: str, base_dir: str,
domain: str, debug: bool) -> None:
domain: str, debug: bool,
system_language: str) -> None:
"""On the screen after selecting a hashtag from the swarm, this sets
the category for that tag
"""
@ -5089,7 +5090,8 @@ class PubServer(BaseHTTPRequestHandler):
if fields.get('hashtagCategory'):
category_str = fields['hashtagCategory'].lower()
if not is_blocked_hashtag(base_dir, category_str) and \
not is_filtered(base_dir, nickname, domain, category_str):
not is_filtered(base_dir, nickname, domain, category_str,
system_language):
set_hashtag_category(base_dir, hashtag,
category_str, False)
else:
@ -5813,7 +5815,8 @@ class PubServer(BaseHTTPRequestHandler):
if not skill_name:
skill_ctr += 1
continue
if is_filtered(base_dir, nickname, domain, skill_name):
if is_filtered(base_dir, nickname, domain, skill_name,
system_language):
skill_ctr += 1
continue
skill_value = \
@ -5876,7 +5879,8 @@ class PubServer(BaseHTTPRequestHandler):
remove_html(fields['displayNickname'])
if not is_filtered(base_dir,
nickname, domain,
display_name):
display_name,
system_language):
actor_json['name'] = display_name
else:
actor_json['name'] = nickname
@ -6439,7 +6443,8 @@ class PubServer(BaseHTTPRequestHandler):
if fields['bio'] != actor_json['summary']:
bio_str = remove_html(fields['bio'])
if not is_filtered(base_dir,
nickname, domain, bio_str):
nickname, domain, bio_str,
system_language):
actor_tags = {}
actor_json['summary'] = \
add_html_tags(base_dir,
@ -14905,7 +14910,8 @@ class PubServer(BaseHTTPRequestHandler):
access_keys,
default_reply_interval_hrs,
self.server.cw_lists,
self.server.lists_enabled)
self.server.lists_enabled,
self.server.system_language)
if msg:
msg = msg.encode('utf-8')
msglen = len(msg)
@ -20043,7 +20049,8 @@ class PubServer(BaseHTTPRequestHandler):
self.path,
self.server.base_dir,
self.server.domain,
self.server.debug)
self.server.debug,
self.server.system_language)
self.server.postreq_busy = False
return

View File

@ -1242,9 +1242,12 @@ def _command_options() -> None:
if argb.rss:
timeout_sec = 20
session = create_session(None)
if not argb.language:
argb.language = 'en'
test_rss = get_rss(base_dir, domain, session, argb.rss,
False, False, 1000, 1000, 1000, 1000, debug,
preferred_podcast_formats, timeout_sec)
preferred_podcast_formats, timeout_sec,
argb.language)
pprint(test_rss)
sys.exit()

View File

@ -12,6 +12,7 @@ from utils import acct_dir
from utils import text_in_file
from utils import remove_eol
from utils import standardize_text
from utils import remove_inverted_text
def add_filter(base_dir: str, nickname: str, domain: str, words: str) -> bool:
@ -115,13 +116,16 @@ def _is_twitter_post(content: str) -> bool:
return False
def _is_filtered_base(filename: str, content: str) -> bool:
def _is_filtered_base(filename: str, content: str,
system_language: str) -> bool:
"""Uses the given file containing filtered words to check
the given content
"""
if not os.path.isfile(filename):
return False
content = remove_inverted_text(content, system_language)
# convert any fancy characters to ordinary ones
content = standardize_text(content)
@ -147,20 +151,23 @@ def _is_filtered_base(filename: str, content: str) -> bool:
return False
def is_filtered_globally(base_dir: str, content: str) -> bool:
def is_filtered_globally(base_dir: str, content: str,
system_language: str) -> bool:
"""Is the given content globally filtered?
"""
global_filters_filename = base_dir + '/accounts/filters.txt'
if _is_filtered_base(global_filters_filename, content):
if _is_filtered_base(global_filters_filename, content,
system_language):
return True
return False
def is_filtered_bio(base_dir: str,
nickname: str, domain: str, bio: str) -> bool:
nickname: str, domain: str, bio: str,
system_language: str) -> bool:
"""Should the given actor bio be filtered out?
"""
if is_filtered_globally(base_dir, bio):
if is_filtered_globally(base_dir, bio, system_language):
return True
if not nickname or not domain:
@ -168,17 +175,17 @@ def is_filtered_bio(base_dir: str,
account_filters_filename = \
acct_dir(base_dir, nickname, domain) + '/filters_bio.txt'
return _is_filtered_base(account_filters_filename, bio)
return _is_filtered_base(account_filters_filename, bio, system_language)
def is_filtered(base_dir: str, nickname: str, domain: str,
content: str) -> bool:
content: str, system_language: str) -> bool:
"""Should the given content be filtered out?
This is a simple type of filter which just matches words, not a regex
You can add individual words or use word1+word2 to indicate that two
words must be present although not necessarily adjacent
"""
if is_filtered_globally(base_dir, content):
if is_filtered_globally(base_dir, content, system_language):
return True
if not nickname or not domain:
@ -192,4 +199,5 @@ def is_filtered(base_dir: str, nickname: str, domain: str,
account_filters_filename = \
acct_dir(base_dir, nickname, domain) + '/filters.txt'
return _is_filtered_base(account_filters_filename, content)
return _is_filtered_base(account_filters_filename, content,
system_language)

View File

@ -854,7 +854,8 @@ def _dav_store_event(base_dir: str, nickname: str, domain: str,
return False
# check that the description is valid
if is_filtered(base_dir, nickname, domain, description):
if is_filtered(base_dir, nickname, domain, description,
system_language):
return False
# convert to the expected time format

View File

@ -685,7 +685,8 @@ def save_post_to_inbox_queue(base_dir: str, http_prefix: str,
get_media_descriptions_from_post(post_json_object)
content_all = \
summary_str + ' ' + content_str + ' ' + media_descriptions
if is_filtered(base_dir, nickname, domain, content_all):
if is_filtered(base_dir, nickname, domain, content_all,
system_language):
if debug:
print('WARN: post was filtered out due to content')
return None
@ -2924,7 +2925,8 @@ def _valid_post_content(base_dir: str, nickname: str, domain: str,
content_all = content_str
if summary:
content_all = summary + ' ' + content_str + ' ' + media_descriptions
if is_filtered(base_dir, nickname, domain, content_all):
if is_filtered(base_dir, nickname, domain, content_all,
system_language):
print('REJECT: content filtered')
return False
if message_json['object'].get('inReplyTo'):
@ -4222,7 +4224,8 @@ def _inbox_after_initial(server, inbox_start_time,
# is the sending actor valid?
if not valid_sending_actor(session, base_dir, nickname, domain,
person_cache, post_json_object,
signing_priv_key_pem, debug, unit_test):
signing_priv_key_pem, debug, unit_test,
system_language):
if debug:
print('Inbox sending actor is not valid ' +
str(post_json_object))
@ -4856,7 +4859,7 @@ def _receive_follow_request(session, session_onion, session_i2p,
max_followers: int,
this_domain: str, onion_domain: str,
i2p_domain: str, signing_priv_key_pem: str,
unit_test: bool) -> bool:
unit_test: bool, system_language: str) -> bool:
"""Receives a follow request within the POST section of HTTPServer
"""
if not message_json['type'].startswith('Follow'):
@ -4972,7 +4975,8 @@ def _receive_follow_request(session, session_onion, session_i2p,
if not valid_sending_actor(curr_session, base_dir,
nickname_to_follow, domain_to_follow,
person_cache, message_json,
signing_priv_key_pem, debug, unit_test):
signing_priv_key_pem, debug, unit_test,
system_language):
print('REJECT spam follow request ' + approve_handle)
return False
@ -5536,7 +5540,8 @@ def run_inbox_queue(server,
debug, project_version,
max_followers, domain,
onion_domain, i2p_domain,
signing_priv_key_pem, unit_test):
signing_priv_key_pem, unit_test,
system_language):
if os.path.isfile(queue_filename):
try:
os.remove(queue_filename)

View File

@ -210,7 +210,8 @@ def _add_newswire_dict_entry(base_dir: str, domain: str,
mirrored: bool,
tags: [],
max_tags: int, session, debug: bool,
podcast_properties: {}) -> None:
podcast_properties: {},
system_language: str) -> None:
"""Update the newswire dictionary
"""
# remove any markup
@ -220,7 +221,7 @@ def _add_newswire_dict_entry(base_dir: str, domain: str,
all_text = title + ' ' + description
# check that none of the text is filtered against
if is_filtered(base_dir, None, None, all_text):
if is_filtered(base_dir, None, None, all_text, system_language):
return
title = limit_word_lengths(title, 13)
@ -723,7 +724,8 @@ def _xml2str_to_dict(base_dir: str, domain: str, xml_str: str,
max_feed_item_size_kb: int,
max_categories_feed_item_size_kb: int,
session, debug: bool,
preferred_podcast_formats: []) -> {}:
preferred_podcast_formats: [],
system_language: str) -> {}:
"""Converts an xml RSS 2.0 string to a dictionary
"""
if '<item>' not in xml_str:
@ -812,7 +814,7 @@ def _xml2str_to_dict(base_dir: str, domain: str, xml_str: str,
votes_status, post_filename,
description, moderated,
mirrored, [], 32, session, debug,
podcast_properties)
podcast_properties, system_language)
post_ctr += 1
if post_ctr >= max_posts_per_source:
break
@ -827,7 +829,8 @@ def _xml1str_to_dict(base_dir: str, domain: str, xml_str: str,
max_feed_item_size_kb: int,
max_categories_feed_item_size_kb: int,
session, debug: bool,
preferred_podcast_formats: []) -> {}:
preferred_podcast_formats: [],
system_language: str) -> {}:
"""Converts an xml RSS 1.0 string to a dictionary
https://validator.w3.org/feed/docs/rss1.html
"""
@ -918,7 +921,7 @@ def _xml1str_to_dict(base_dir: str, domain: str, xml_str: str,
votes_status, post_filename,
description, moderated,
mirrored, [], 32, session, debug,
podcast_properties)
podcast_properties, system_language)
post_ctr += 1
if post_ctr >= max_posts_per_source:
break
@ -932,7 +935,8 @@ def _atom_feed_to_dict(base_dir: str, domain: str, xml_str: str,
max_posts_per_source: int,
max_feed_item_size_kb: int,
session, debug: bool,
preferred_podcast_formats: []) -> {}:
preferred_podcast_formats: [],
system_language: str) -> {}:
"""Converts an atom feed string to a dictionary
"""
if '<entry>' not in xml_str:
@ -1012,7 +1016,7 @@ def _atom_feed_to_dict(base_dir: str, domain: str, xml_str: str,
votes_status, post_filename,
description, moderated,
mirrored, [], 32, session, debug,
podcast_properties)
podcast_properties, system_language)
post_ctr += 1
if post_ctr >= max_posts_per_source:
break
@ -1025,7 +1029,8 @@ def _json_feed_v1to_dict(base_dir: str, domain: str, xml_str: str,
moderated: bool, mirrored: bool,
max_posts_per_source: int,
max_feed_item_size_kb: int,
session, debug: bool) -> {}:
session, debug: bool,
system_language: str) -> {}:
"""Converts a json feed string to a dictionary
See https://jsonfeed.org/version/1.1
"""
@ -1125,7 +1130,7 @@ def _json_feed_v1to_dict(base_dir: str, domain: str, xml_str: str,
votes_status, post_filename,
description, moderated,
mirrored, [], 32, session, debug,
None)
None, system_language)
post_ctr += 1
if post_ctr >= max_posts_per_source:
break
@ -1139,7 +1144,8 @@ def _atom_feed_yt_to_dict(base_dir: str, domain: str, xml_str: str,
moderated: bool, mirrored: bool,
max_posts_per_source: int,
max_feed_item_size_kb: int,
session, debug: bool) -> {}:
session, debug: bool,
system_language: str) -> {}:
"""Converts an atom-style YouTube feed string to a dictionary
"""
if '<entry>' not in xml_str:
@ -1214,7 +1220,7 @@ def _atom_feed_yt_to_dict(base_dir: str, domain: str, xml_str: str,
votes_status, post_filename,
description, moderated, mirrored,
[], 32, session, debug,
podcast_properties)
podcast_properties, system_language)
post_ctr += 1
if post_ctr >= max_posts_per_source:
break
@ -1229,7 +1235,8 @@ def _xml_str_to_dict(base_dir: str, domain: str, xml_str: str,
max_feed_item_size_kb: int,
max_categories_feed_item_size_kb: int,
session, debug: bool,
preferred_podcast_formats: []) -> {}:
preferred_podcast_formats: [],
system_language: str) -> {}:
"""Converts an xml string to a dictionary
"""
if '<yt:videoId>' in xml_str and '<yt:channelId>' in xml_str:
@ -1238,31 +1245,35 @@ def _xml_str_to_dict(base_dir: str, domain: str, xml_str: str,
xml_str, moderated, mirrored,
max_posts_per_source,
max_feed_item_size_kb,
session, debug)
session, debug,
system_language)
if 'rss version="2.0"' in xml_str:
return _xml2str_to_dict(base_dir, domain,
xml_str, moderated, mirrored,
max_posts_per_source, max_feed_item_size_kb,
max_categories_feed_item_size_kb,
session, debug,
preferred_podcast_formats)
preferred_podcast_formats,
system_language)
if '<?xml version="1.0"' in xml_str:
return _xml1str_to_dict(base_dir, domain,
xml_str, moderated, mirrored,
max_posts_per_source, max_feed_item_size_kb,
max_categories_feed_item_size_kb,
session, debug, preferred_podcast_formats)
session, debug, preferred_podcast_formats,
system_language)
if 'xmlns="http://www.w3.org/2005/Atom"' in xml_str:
return _atom_feed_to_dict(base_dir, domain,
xml_str, moderated, mirrored,
max_posts_per_source, max_feed_item_size_kb,
session, debug, preferred_podcast_formats)
session, debug, preferred_podcast_formats,
system_language)
if 'https://jsonfeed.org/version/1' in xml_str:
return _json_feed_v1to_dict(base_dir, domain,
xml_str, moderated, mirrored,
max_posts_per_source,
max_feed_item_size_kb,
session, debug)
session, debug, system_language)
return {}
@ -1284,7 +1295,7 @@ def get_rss(base_dir: str, domain: str, session, url: str,
max_feed_item_size_kb: int,
max_categories_feed_item_size_kb: int, debug: bool,
preferred_podcast_formats: [],
timeout_sec: int) -> {}:
timeout_sec: int, system_language: str) -> {}:
"""Returns an RSS url as a dict
"""
if not isinstance(url, str):
@ -1321,7 +1332,8 @@ def get_rss(base_dir: str, domain: str, session, url: str,
max_feed_item_size_kb,
max_categories_feed_item_size_kb,
session, debug,
preferred_podcast_formats)
preferred_podcast_formats,
system_language)
print('WARN: feed is too large, ' +
'or contains invalid characters: ' + url)
else:
@ -1498,7 +1510,7 @@ def _add_account_blogs_to_newswire(base_dir: str, nickname: str, domain: str,
description, moderated, False,
tags_from_post,
max_tags, session, debug,
None)
None, system_language)
ctr += 1
if ctr >= max_blogs_per_account:
@ -1609,7 +1621,7 @@ def get_dict_from_newswire(session, base_dir: str, domain: str,
max_feed_item_size_kb,
max_categories_feed_item_size_kb, debug,
preferred_podcast_formats,
timeout_sec)
timeout_sec, system_language)
if items_list:
for date_str, item in items_list.items():
result[date_str] = item

View File

@ -1749,7 +1749,8 @@ def valid_sending_actor(session, base_dir: str,
person_cache: {},
post_json_object: {},
signing_priv_key_pem: str,
debug: bool, unit_test: bool) -> bool:
debug: bool, unit_test: bool,
system_language: str) -> bool:
"""When a post arrives in the inbox this is used to check that
the sending actor is valid
"""
@ -1827,7 +1828,8 @@ def valid_sending_actor(session, base_dir: str,
if contains_invalid_chars(bio_str):
print('REJECT: post actor bio contains invalid characters')
return False
if is_filtered_bio(base_dir, nickname, domain, bio_str):
if is_filtered_bio(base_dir, nickname, domain, bio_str,
system_language):
print('REJECT: post actor bio contains filtered text')
return False
else:

View File

@ -432,7 +432,7 @@ def get_person_box(signing_priv_key_pem: str, origin_domain: str,
display_name = '*ADVERSARY*'
elif is_filtered(base_dir,
nickname, domain,
display_name):
display_name, 'en'):
display_name = '*FILTERED*'
# have they moved?
if person_json.get('movedTo'):
@ -5189,7 +5189,8 @@ def download_announce(session, base_dir: str, http_prefix: str,
if summary_str:
content_all = \
summary_str + ' ' + content_str + ' ' + media_descriptions
if is_filtered(base_dir, nickname, domain, content_all):
if is_filtered(base_dir, nickname, domain, content_all,
system_language):
print('WARN: announced post has been filtered ' +
str(announced_json))
_reject_announce(announce_filename,

View File

@ -315,7 +315,8 @@ def add_share(base_dir: str,
"""
if is_filtered_globally(base_dir,
display_name + ' ' + summary + ' ' +
item_type + ' ' + item_category):
item_type + ' ' + item_category,
system_language):
print('Shared item was filtered due to content')
return
shares_filename = \
@ -1826,7 +1827,7 @@ def _dfc_to_shares_format(catalog_json: {},
all_text = \
item['DFC:description'] + ' ' + item_type + ' ' + item_category
if is_filtered_globally(base_dir, all_text):
if is_filtered_globally(base_dir, all_text, system_language):
continue
dfc_id = None

View File

@ -54,6 +54,7 @@ from follow import clear_followers
from follow import send_follow_request_via_server
from follow import send_unfollow_request_via_server
from siteactive import site_is_active
from utils import remove_inverted_text
from utils import standardize_text
from utils import remove_eol
from utils import text_in_file
@ -7550,6 +7551,18 @@ def _test_hashtag_maps():
assert len(map_links) == 2
def _test_uninvert():
    """Checks that upside-down text is converted back into ordinary
    readable text by remove_inverted_text
    """
    print('test_uninvert')
    inverted = 'ʇsǝʇ ɐ sı sıɥʇ'
    upright = "this is a test"
    actual = remove_inverted_text(inverted, 'en')
    if not actual == upright:
        # show the details of the mismatch before the assertion fires
        diagnostics = (
            ('text: ', inverted),
            ('expected: ', upright),
            ('result: ', actual)
        )
        for label, value in diagnostics:
            print(label + value)
    assert actual == upright
def run_all_tests():
base_dir = os.getcwd()
print('Running tests...')
@ -7567,6 +7580,7 @@ def run_all_tests():
_test_checkbox_names()
_test_thread_functions()
_test_functions()
_test_uninvert()
_test_hashtag_maps()
_test_combine_lines()
_test_text_standardize()

View File

@ -3872,3 +3872,36 @@ def get_json_content_from_accept(accept: str) -> str:
if 'application/ld+json' in accept:
protocol_str = 'application/ld+json'
return protocol_str
def remove_inverted_text(text: str, system_language: str) -> str:
    """Converts upside-down (inverted) text back into ordinary text

    Inverted text is written with look-alike glyphs in reverse order,
    so that it reads correctly when rotated by 180 degrees. When more
    than one kind of distinctive inverted glyph is found the whole text
    is mapped back to upright letters and reversed.

    text: the text to be checked
    system_language: language code. Only 'en' is handled, and for any
                     other language the text is returned unchanged

    Returns the text with inverted glyphs replaced. If only a single
    kind of inverted glyph is found then it is replaced in place and
    the order of the text is not altered.
    """
    if system_language != 'en':
        return text
    if not text:
        return text

    # inverted glyphs, listed in z..a and Z..A order
    inverted_lower = "zʎxʍʌnʇsɹbdouɯʃʞɾıɥƃɟǝpɔqɐ"
    inverted_upper = \
        "Z⅄XMᴧ∩⊥Sᴚ\u038cԀOᴎW⅂⋊ſIH⅁ℲƎ◖Ↄ\U00010412∀"
    upright_lower = 'zyxwvutsrqponmlkjihgfedcba'
    upright_upper = upright_lower.upper()

    # a single translation table is used so that all substitutions
    # happen at the same time. Sequential replacement would collapse
    # the swapped pairs n/u, b/q, d/p and M/W into a single letter
    full_map = {}
    glyph_sets = (
        (inverted_lower, upright_lower),
        (inverted_upper, upright_upper)
    )
    for glyphs, letters in glyph_sets:
        for glyph, letter in zip(glyphs, letters):
            full_map[ord(glyph)] = letter

    # Some inverted glyphs are ordinary ascii letters (eg. an inverted
    # "u" is "n"), so those say nothing about whether text is inverted.
    # Only the non-ascii glyphs are used as evidence, otherwise normal
    # English text would be mangled
    distinctive_map = {
        code: letter for code, letter in full_map.items() if code > 127
    }

    glyph_kinds_found = 0
    for code in distinctive_map:
        if chr(code) in text:
            glyph_kinds_found += 1

    if glyph_kinds_found > 1:
        # the text is inverted, so also swap the ambiguous ascii
        # letters and restore the reading order
        return text.translate(full_map)[::-1]

    # not enough evidence of inversion, so only replace the
    # unambiguous glyph (if any) and keep the text order
    return text.translate(distinctive_map)

View File

@ -73,9 +73,11 @@ def convert_video_to_note(base_dir: str, nickname: str, domain: str,
return None
# check that the content is valid
if is_filtered(base_dir, nickname, domain, post_json_object['name']):
if is_filtered(base_dir, nickname, domain, post_json_object['name'],
system_language):
return None
if is_filtered(base_dir, nickname, domain, post_json_object['content']):
if is_filtered(base_dir, nickname, domain, post_json_object['content'],
system_language):
return None
# get the content
@ -84,7 +86,8 @@ def convert_video_to_note(base_dir: str, nickname: str, domain: str,
if isinstance(post_json_object['license'], dict):
if post_json_object['license'].get('name'):
if is_filtered(base_dir, nickname, domain,
post_json_object['license']['name']):
post_json_object['license']['name'],
system_language):
return None
content += '<p>' + post_json_object['license']['name'] + '</p>'
post_content = post_json_object['content']

View File

@ -1615,12 +1615,14 @@ def _html_edit_profile_skills(base_dir: str, nickname: str, domain: str,
translate: {}) -> str:
"""skills section of Edit Profile screen
"""
system_language = 'en'
skills = get_skills(base_dir, nickname, domain)
skills_str = ''
skill_ctr = 1
if skills:
for skill_desc, skill_value in skills.items():
if is_filtered(base_dir, nickname, domain, skill_desc):
if is_filtered(base_dir, nickname, domain, skill_desc,
system_language):
continue
skills_str += \
'<p><input type="text" placeholder="' + translate['Skill'] + \
@ -2296,7 +2298,8 @@ def html_edit_profile(server, translate: {},
crawlers_allowed: [],
access_keys: {},
default_reply_interval_hrs: int,
cw_lists: {}, lists_enabled: str) -> str:
cw_lists: {}, lists_enabled: str,
system_language: str) -> str:
"""Shows the edit profile screen
"""
path = path.replace('/inbox', '').replace('/outbox', '')
@ -2348,12 +2351,14 @@ def html_edit_profile(server, translate: {},
pgp_pub_key = get_pgp_pub_key(actor_json)
pgp_fingerprint = get_pgp_fingerprint(actor_json)
if actor_json.get('name'):
if not is_filtered(base_dir, nickname, domain, actor_json['name']):
if not is_filtered(base_dir, nickname, domain, actor_json['name'],
system_language):
display_nickname = actor_json['name']
if actor_json.get('summary'):
bio_str = \
actor_json['summary'].replace('<p>', '').replace('</p>', '')
if is_filtered(base_dir, nickname, domain, bio_str):
if is_filtered(base_dir, nickname, domain, bio_str,
system_language):
bio_str = ''
bio_str = remove_html(bio_str)
if actor_json.get('manuallyApprovesFollowers'):