epicyon/flags.py

626 lines
20 KiB
Python

__filename__ = "flags.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.5.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Core"
import os
import re
from utils import acct_dir
from utils import date_utcnow
from utils import date_epoch
from utils import data_dir
from utils import get_config_param
from utils import get_image_extensions
from utils import evil_incarnate
from utils import get_local_network_addresses
from utils import get_attributed_to
from utils import is_dm
from utils import has_object_dict
from utils import locate_post
from utils import load_json
from utils import has_object_string_type
from utils import date_from_string_format
from utils import get_reply_to
from utils import text_in_file
from utils import get_group_paths
from utils import get_quote_toot_url
def is_featured_writer(base_dir: str, nickname: str, domain: str) -> bool:
"""Is the given account a featured writer, appearing in the features
timeline on news instances?
"""
features_blocked_filename = \
acct_dir(base_dir, nickname, domain) + '/.nofeatures'
return not os.path.isfile(features_blocked_filename)
def is_dormant(base_dir: str, nickname: str, domain: str, actor: str,
dormant_months: int) -> bool:
"""Is the given followed actor dormant, from the standpoint
of the given account
"""
last_seen_filename = acct_dir(base_dir, nickname, domain) + \
'/lastseen/' + actor.replace('/', '#') + '.txt'
if not os.path.isfile(last_seen_filename):
return False
days_since_epoch_str = None
try:
with open(last_seen_filename, 'r',
encoding='utf-8') as fp_last_seen:
days_since_epoch_str = fp_last_seen.read()
except OSError:
print('EX: failed to read last seen ' + last_seen_filename)
return False
if days_since_epoch_str:
days_since_epoch = int(days_since_epoch_str)
curr_time = date_utcnow()
curr_days_since_epoch = (curr_time - date_epoch()).days
time_diff_months = \
int((curr_days_since_epoch - days_since_epoch) / 30)
if time_diff_months >= dormant_months:
return True
return False
def is_editor(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is an editor
"""
editors_file = data_dir(base_dir) + '/editors.txt'
if not os.path.isfile(editors_file):
admin_name = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
return False
lines = []
try:
with open(editors_file, 'r', encoding='utf-8') as fp_editors:
lines = fp_editors.readlines()
except OSError:
print('EX: is_editor unable to read ' + editors_file)
if len(lines) == 0:
admin_name = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
for editor in lines:
editor = editor.strip('\n').strip('\r')
if editor == nickname:
return True
return False
def is_artist(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is an artist
"""
artists_file = data_dir(base_dir) + '/artists.txt'
if not os.path.isfile(artists_file):
admin_name = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
return False
lines = []
try:
with open(artists_file, 'r', encoding='utf-8') as fp_artists:
lines = fp_artists.readlines()
except OSError:
print('EX: is_artist unable to read ' + artists_file)
if len(lines) == 0:
admin_name = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
for artist in lines:
artist = artist.strip('\n').strip('\r')
if artist == nickname:
return True
return False
def is_image_file(filename: str) -> bool:
"""Is the given filename an image?
"""
for ext in get_image_extensions():
if filename.endswith('.' + ext):
return True
return False
def is_system_account(nickname: str) -> bool:
"""Returns true if the given nickname is a system account
"""
if nickname in ('news', 'inbox'):
return True
return False
def is_memorial_account(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is a memorial account
"""
memorial_file = data_dir(base_dir) + '/memorial'
if not os.path.isfile(memorial_file):
return False
memorial_list = []
try:
with open(memorial_file, 'r', encoding='utf-8') as fp_memorial:
memorial_list = fp_memorial.read().split('\n')
except OSError:
print('EX: unable to read ' + memorial_file)
if nickname in memorial_list:
return True
return False
def is_suspended(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is suspended
"""
admin_nickname = get_config_param(base_dir, 'admin')
if not admin_nickname:
return False
if nickname == admin_nickname:
return False
suspended_filename = data_dir(base_dir) + '/suspended.txt'
if os.path.isfile(suspended_filename):
lines = []
try:
with open(suspended_filename, 'r', encoding='utf-8') as fp_susp:
lines = fp_susp.readlines()
except OSError:
print('EX: is_suspended unable to read ' + suspended_filename)
for suspended in lines:
if suspended.strip('\n').strip('\r') == nickname:
return True
return False
def is_evil(domain: str) -> bool:
""" https://www.youtube.com/watch?v=5qw1hcevmdU
"""
if not isinstance(domain, str):
print('WARN: Malformed domain ' + str(domain))
return True
# if a domain contains any of these strings then it is
# declaring itself to be hostile
evil_emporium = (
'nazi', 'extremis', 'extreemis', 'gendercritic',
'kiwifarm', 'illegal', 'raplst', 'rapist',
'rapl.st', 'rapi.st', 'antivax', 'plandemic', 'terror'
)
for hostile_str in evil_emporium:
if hostile_str in domain:
return True
evil_domains = evil_incarnate()
for concentrated_evil in evil_domains:
if domain.endswith(concentrated_evil):
return True
return False
def is_local_network_address(ip_address: str) -> bool:
"""Is the given ip address local?
"""
local_ips = get_local_network_addresses()
for ip_addr in local_ips:
if ip_address.startswith(ip_addr):
return True
return False
def is_reminder(post_json_object: {}) -> bool:
"""Returns true if the given post is a reminder
"""
if not is_dm(post_json_object):
return False
if not post_json_object['object'].get('to'):
return False
if not post_json_object['object'].get('attributedTo'):
return False
if not post_json_object['object'].get('tag'):
return False
if not isinstance(post_json_object['object']['to'], list):
return False
if len(post_json_object['object']['to']) != 1:
return False
if post_json_object['object']['to'][0] != \
get_attributed_to(post_json_object['object']['attributedTo']):
return False
for tag in post_json_object['object']['tag']:
if tag['type'] == 'Event':
return True
return False
def is_public_post_from_url(base_dir: str, nickname: str, domain: str,
post_url: str) -> bool:
"""Returns whether the given url is a public post
"""
post_filename = locate_post(base_dir, nickname, domain, post_url)
if not post_filename:
return False
post_json_object = load_json(post_filename)
if not post_json_object:
return False
return is_public_post(post_json_object)
def is_public_post(post_json_object: {}) -> bool:
"""Returns true if the given post is public
"""
if not post_json_object.get('type'):
return False
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if not post_json_object['object'].get('to'):
return False
if isinstance(post_json_object['object']['to'], list):
for recipient in post_json_object['object']['to']:
if recipient.endswith('#Public') or \
recipient == 'as:Public' or \
recipient == 'Public':
return True
elif isinstance(post_json_object['object']['to'], str):
if post_json_object['object']['to'].endswith('#Public'):
return True
return False
def is_followers_post(post_json_object: {}) -> bool:
"""Returns true if the given post is to followers
"""
if not post_json_object.get('type'):
return False
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if not post_json_object['object'].get('to'):
return False
if isinstance(post_json_object['object']['to'], list):
for recipient in post_json_object['object']['to']:
if recipient.endswith('/followers'):
return True
elif isinstance(post_json_object['object']['to'], str):
if post_json_object['object']['to'].endswith('/followers'):
return True
return False
def is_unlisted_post(post_json_object: {}) -> bool:
"""Returns true if the given post is unlisted
"""
if not post_json_object.get('type'):
return False
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if not post_json_object['object'].get('to'):
return False
if not post_json_object['object'].get('cc'):
return False
has_followers = False
if isinstance(post_json_object['object']['to'], list):
for recipient in post_json_object['object']['to']:
if recipient.endswith('/followers'):
has_followers = True
break
elif isinstance(post_json_object['object']['to'], str):
if post_json_object['object']['to'].endswith('/followers'):
has_followers = True
if not has_followers:
return False
if isinstance(post_json_object['object']['cc'], list):
for recipient in post_json_object['object']['cc']:
if recipient.endswith('#Public') or \
recipient == 'as:Public' or \
recipient == 'Public':
return True
elif isinstance(post_json_object['object']['cc'], str):
if post_json_object['object']['cc'].endswith('#Public'):
return True
return False
def is_blog_post(post_json_object: {}) -> bool:
"""Is the given post a blog post?
"""
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if not has_object_string_type(post_json_object, False):
return False
if 'content' not in post_json_object['object']:
return False
if post_json_object['object']['type'] != 'Article':
return False
return True
def is_news_post(post_json_object: {}) -> bool:
"""Is the given post a blog post?
"""
return post_json_object.get('news')
def is_recent_post(post_json_object: {}, max_days: int) -> bool:
""" Is the given post recent?
"""
if not has_object_dict(post_json_object):
return False
if not post_json_object['object'].get('published'):
return False
if not isinstance(post_json_object['object']['published'], str):
return False
curr_time = date_utcnow()
days_since_epoch = (curr_time - date_epoch()).days
recently = days_since_epoch - max_days
published_date_str = post_json_object['object']['published']
if '.' in published_date_str:
published_date_str = published_date_str.split('.')[0] + 'Z'
published_date = \
date_from_string_format(published_date_str,
["%Y-%m-%dT%H:%M:%S%z"])
if not published_date:
print('EX: is_recent_post unrecognized published date ' +
str(published_date_str))
return False
published_days_since_epoch = \
(published_date - date_epoch()).days
if published_days_since_epoch < recently:
return False
return True
def is_chat_message(post_json_object: {}) -> bool:
"""Returns true if the given post is a chat message
Note that is_dm should be checked before calling this
"""
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if post_json_object['object']['type'] != 'ChatMessage':
return False
return True
def is_reply(post_json_object: {}, actor: str) -> bool:
"""Returns true if the given post is a reply to the given actor
"""
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if post_json_object['object'].get('moderationStatus'):
return False
if post_json_object['object']['type'] not in ('Note', 'Event', 'Page',
'EncryptedMessage',
'ChatMessage', 'Article'):
return False
reply_id = get_reply_to(post_json_object['object'])
if reply_id:
if isinstance(reply_id, str):
if reply_id.startswith(actor):
return True
if not post_json_object['object'].get('tag'):
return False
if not isinstance(post_json_object['object']['tag'], list):
return False
for tag in post_json_object['object']['tag']:
if not tag.get('type'):
continue
if tag['type'] == 'Mention':
if not tag.get('href'):
continue
if actor in tag['href']:
return True
return False
def is_pgp_encrypted(content: str) -> bool:
"""Returns true if the given content is PGP encrypted
"""
if '--BEGIN PGP MESSAGE--' in content:
if '--END PGP MESSAGE--' in content:
return True
return False
def invalid_ciphertext(content: str) -> bool:
"""Returns true if the given content contains an invalid key
"""
if '----BEGIN ' in content or '----END ' in content:
if not contains_pgp_public_key(content) and \
not is_pgp_encrypted(content):
return True
return False
def contains_pgp_public_key(content: str) -> bool:
"""Returns true if the given content contains a PGP public key
"""
if '--BEGIN PGP PUBLIC KEY BLOCK--' in content:
if '--END PGP PUBLIC KEY BLOCK--' in content:
return True
return False
def contains_private_key(content: str) -> bool:
"""Returns true if the given content contains a PGP private key
"""
if '--BEGIN PGP PRIVATE KEY BLOCK--' in content:
if '--END PGP PRIVATE KEY BLOCK--' in content:
return True
if '--BEGIN RSA PRIVATE KEY--' in content:
if '--END RSA PRIVATE KEY--' in content:
return True
return False
def is_float(value) -> bool:
"""Is the given value a float?
"""
try:
float(value)
return True
except ValueError:
return False
def is_group_actor(base_dir: str, actor: str, person_cache: {},
debug: bool = False) -> bool:
"""Is the given actor a group?
"""
person_cache_actor = None
if person_cache:
if person_cache.get(actor):
if person_cache[actor].get('actor'):
person_cache_actor = person_cache[actor]['actor']
if person_cache_actor:
if person_cache_actor.get('type'):
if person_cache_actor['type'] == 'Group':
if debug:
print('Cached actor ' + actor + ' has Group type')
return True
return False
if debug:
print('Actor ' + actor + ' not in cache')
cached_actor_filename = \
base_dir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
if not os.path.isfile(cached_actor_filename):
if debug:
print('Cached actor file not found ' + cached_actor_filename)
return False
if text_in_file('"type": "Group"', cached_actor_filename):
if debug:
print('Group type found in ' + cached_actor_filename)
return True
return False
def is_group_account(base_dir: str, nickname: str, domain: str) -> bool:
"""Returns true if the given account is a group
"""
account_filename = acct_dir(base_dir, nickname, domain) + '.json'
if not os.path.isfile(account_filename):
return False
if text_in_file('"type": "Group"', account_filename):
return True
return False
def has_group_type(base_dir: str, actor: str, person_cache: {},
debug: bool = False) -> bool:
"""Does the given actor url have a group type?
"""
# does the actor path clearly indicate that this is a group?
# eg. https://lemmy/c/groupname
group_paths = get_group_paths()
for grp_path in group_paths:
if grp_path in actor:
if debug:
print('grpPath ' + grp_path + ' in ' + actor)
return True
# is there a cached actor which can be examined for Group type?
return is_group_actor(base_dir, actor, person_cache, debug)
def is_quote_toot(post_json_object: str, content: str) -> bool:
"""Returns true if the given post is a quote toot / quote tweet
"""
if get_quote_toot_url(post_json_object):
return True
# Twitter-style indicator
if content:
if 'QT: ' in content:
return True
return False
def is_right_to_left_text(text: str) -> bool:
"""Is the given text right to left?
Persian \u0600-\u06FF
Arabic \u0627-\u064a
Hebrew/Yiddish \u0590-\u05FF\uFB2A-\uFB4E
"""
unicode_str = '[\u0627-\u064a]|[\u0600-\u06FF]|' + \
'[\u0590-\u05FF\uFB2A-\uFB4E]'
pattern = re.compile(unicode_str)
return len(re.findall(pattern, text)) > (len(text)/2)
def is_valid_date(date_str: str) -> bool:
"""is the given date valid?
"""
if not isinstance(date_str, str):
return False
if '-' not in date_str:
return False
date_sections = date_str.split('-')
if len(date_sections) != 3:
return False
date_sect_ctr = 0
for section_str in date_sections:
if not section_str.isdigit():
return False
if date_sect_ctr == 0:
date_year = int(section_str)
if date_year < 1920 or date_year > 3000:
return False
elif date_sect_ctr == 1:
date_month = int(section_str)
if date_month < 1 or date_month > 12:
return False
elif date_sect_ctr == 2:
date_day = int(section_str)
if date_day < 1 or date_day > 31:
return False
date_sect_ctr += 1
return True
def is_premium_account(base_dir: str, nickname: str, domain: str) -> bool:
""" Is the given account a premium one?
"""
premium_filename = acct_dir(base_dir, nickname, domain) + '/.premium'
return os.path.isfile(premium_filename)
def url_permitted(url: str, federation_list: []) -> bool:
"""is the given url permitted?
"""
if is_evil(url):
return False
if not federation_list:
return True
for domain in federation_list:
if domain in url:
return True
return False