# Module metadata: the file name, author, license, version, maintainer
# contact details, release status and the group of modules which this
# file belongs to
__filename__ = "flags.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.5.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Core"

import os
import re
from utils import acct_dir
from utils import date_utcnow
from utils import date_epoch
from utils import data_dir
from utils import get_config_param
from utils import get_image_extensions
from utils import evil_incarnate
from utils import get_local_network_addresses
from utils import get_attributed_to
from utils import is_dm
from utils import has_object_dict
from utils import locate_post
from utils import load_json
from utils import has_object_string_type
from utils import date_from_string_format
from utils import get_reply_to
from utils import text_in_file
from utils import get_group_paths
from utils import get_quote_toot_url


def is_featured_writer(base_dir: str, nickname: str, domain: str) -> bool:
    """Returns true if the given account is a featured writer, whose
    posts can appear in the features timeline on news instances.
    An account is featured unless a .nofeatures file exists within
    its account directory
    """
    account_dir = acct_dir(base_dir, nickname, domain)
    features_blocked = os.path.isfile(account_dir + '/.nofeatures')
    return not features_blocked


def is_dormant(base_dir: str, nickname: str, domain: str, actor: str,
               dormant_months: int) -> bool:
    """Is the given followed actor dormant, from the standpoint
    of the given account

    The last seen file for the actor contains the number of days since
    the epoch at which they were last seen. The actor is dormant if
    at least dormant_months (approximated as 30 days each) have
    passed since then.
    Returns false if the last seen file is missing, unreadable,
    empty or does not contain a whole number
    """
    last_seen_filename = acct_dir(base_dir, nickname, domain) + \
        '/lastseen/' + actor.replace('/', '#') + '.txt'

    if not os.path.isfile(last_seen_filename):
        return False

    try:
        with open(last_seen_filename, 'r',
                  encoding='utf-8') as fp_last_seen:
            days_since_epoch_str = fp_last_seen.read()
    except OSError:
        print('EX: failed to read last seen ' + last_seen_filename)
        return False

    if not days_since_epoch_str:
        return False

    # a corrupted or whitespace-only file should not raise an exception
    try:
        days_since_epoch = int(days_since_epoch_str)
    except ValueError:
        print('EX: is_dormant invalid last seen value within ' +
              last_seen_filename)
        return False

    curr_days_since_epoch = (date_utcnow() - date_epoch()).days
    # months are approximated as 30 days
    time_diff_months = \
        int((curr_days_since_epoch - days_since_epoch) / 30)
    return time_diff_months >= dormant_months


def is_editor(base_dir: str, nickname: str) -> bool:
    """Is the given nickname an editor?
    Editors are listed one per line within editors.txt in the data
    directory. If that file does not exist, or no lines could be read
    from it, then only the admin is considered to be an editor
    """
    editors_filename = data_dir(base_dir) + '/editors.txt'

    if not os.path.isfile(editors_filename):
        admin_nickname = get_config_param(base_dir, 'admin')
        return bool(admin_nickname) and admin_nickname == nickname

    editor_lines = []
    try:
        with open(editors_filename, 'r', encoding='utf-8') as fp_editors:
            editor_lines = fp_editors.readlines()
    except OSError:
        print('EX: is_editor unable to read ' + editors_filename)

    if not editor_lines:
        # nothing listed, so fall back to the admin
        admin_nickname = get_config_param(base_dir, 'admin')
        return bool(admin_nickname) and admin_nickname == nickname

    return any(line.strip('\n').strip('\r') == nickname
               for line in editor_lines)


def is_artist(base_dir: str, nickname: str) -> bool:
    """Is the given nickname an artist?
    Artists are listed one per line within artists.txt in the data
    directory. If that file does not exist, or no lines could be read
    from it, then only the admin is considered to be an artist
    """
    artists_filename = data_dir(base_dir) + '/artists.txt'
    file_exists = os.path.isfile(artists_filename)

    artist_lines = []
    if file_exists:
        try:
            with open(artists_filename, 'r',
                      encoding='utf-8') as fp_artists:
                artist_lines = fp_artists.readlines()
        except OSError:
            print('EX: is_artist unable to read ' + artists_filename)

    if not artist_lines:
        # no file, or nothing listed, so fall back to the admin
        admin_nickname = get_config_param(base_dir, 'admin')
        if not admin_nickname:
            return False
        return admin_nickname == nickname

    for line in artist_lines:
        if line.strip('\n').strip('\r') == nickname:
            return True
    return False


def is_image_file(filename: str) -> bool:
    """Returns true if the given filename ends with one of the
    known image extensions
    """
    return any(filename.endswith('.' + extension)
               for extension in get_image_extensions())


def is_system_account(nickname: str) -> bool:
    """Is the given nickname one of the system accounts,
    which are news and inbox?
    """
    return nickname in ('news', 'inbox')


def is_memorial_account(base_dir: str, nickname: str) -> bool:
    """Is the given nickname a memorial account?
    Memorial accounts are listed one per line within the memorial
    file in the data directory
    """
    memorial_filename = data_dir(base_dir) + '/memorial'
    if not os.path.isfile(memorial_filename):
        return False

    memorial_nicknames = []
    try:
        with open(memorial_filename, 'r',
                  encoding='utf-8') as fp_memorial:
            memorial_nicknames = fp_memorial.read().split('\n')
    except OSError:
        print('EX: unable to read ' + memorial_filename)

    return nickname in memorial_nicknames


def is_suspended(base_dir: str, nickname: str) -> bool:
    """Is the given nickname suspended?
    Suspended nicknames are listed one per line within suspended.txt
    in the data directory. The admin is never considered to be
    suspended, and nobody is suspended if there is no admin
    """
    admin_nickname = get_config_param(base_dir, 'admin')
    if not admin_nickname or nickname == admin_nickname:
        return False

    suspended_filename = data_dir(base_dir) + '/suspended.txt'
    if not os.path.isfile(suspended_filename):
        return False

    suspended_lines = []
    try:
        with open(suspended_filename, 'r', encoding='utf-8') as fp_susp:
            suspended_lines = fp_susp.readlines()
    except OSError:
        print('EX: is_suspended unable to read ' + suspended_filename)

    return any(line.strip('\n').strip('\r') == nickname
               for line in suspended_lines)


def is_evil(domain: str) -> bool:
    """ https://www.youtube.com/watch?v=5qw1hcevmdU
    Returns true if the given domain is not a string, contains one of
    a set of hostile substrings, or ends with one of the domains
    returned by evil_incarnate
    """
    if not isinstance(domain, str):
        print('WARN: Malformed domain ' + str(domain))
        return True

    # a domain containing any of these strings is
    # declaring itself to be hostile
    hostile_substrings = (
        'nazi', 'extremis', 'extreemis', 'gendercritic',
        'kiwifarm', 'illegal', 'raplst', 'rapist',
        'rapl.st', 'rapi.st', 'antivax', 'plandemic', 'terror'
    )
    if any(fragment in domain for fragment in hostile_substrings):
        return True

    return any(domain.endswith(bad_domain)
               for bad_domain in evil_incarnate())


def is_local_network_address(ip_address: str) -> bool:
    """Returns true if the given ip address begins with one of the
    local network address prefixes
    """
    return any(ip_address.startswith(local_prefix)
               for local_prefix in get_local_network_addresses())


def is_reminder(post_json_object: {}) -> bool:
    """Returns true if the given post is a reminder

    A reminder is a direct message having exactly one recipient, which
    is the same as the account that the post is attributed to, and
    which has at least one tag of type Event.
    Malformed tags (a tag field which is not a list, entries which are
    not dicts or which have no type) are ignored rather than raising
    an exception
    """
    if not is_dm(post_json_object):
        return False

    post_obj = post_json_object['object']
    recipients = post_obj.get('to')
    attributed_to = post_obj.get('attributedTo')
    tags = post_obj.get('tag')
    if not recipients or not attributed_to or not tags:
        return False

    # there should be exactly one recipient
    if not isinstance(recipients, list) or len(recipients) != 1:
        return False
    # and that recipient should be the author of the post
    if recipients[0] != get_attributed_to(attributed_to):
        return False

    if not isinstance(tags, list):
        return False
    for tag in tags:
        if not isinstance(tag, dict):
            continue
        if tag.get('type') == 'Event':
            return True
    return False


def is_public_post_from_url(base_dir: str, nickname: str, domain: str,
                            post_url: str) -> bool:
    """Is the post with the given url public?
    Returns false if the post cannot be located or loaded
    """
    post_json_object = None
    post_filename = locate_post(base_dir, nickname, domain, post_url)
    if post_filename:
        post_json_object = load_json(post_filename)
    if post_json_object:
        return is_public_post(post_json_object)
    return False


def is_public_post(post_json_object: {}) -> bool:
    """Returns true if the given post is public

    The post must be a Create activity whose object has a 'to' field
    containing the public collection, expressed either as a url ending
    with #Public or as the compact forms as:Public or Public.
    The 'to' field may be a list or a single string, and both are
    checked in the same way. Recipients which are not strings
    are ignored
    """
    if post_json_object.get('type') != 'Create':
        return False
    if not has_object_dict(post_json_object):
        return False

    recipients = post_json_object['object'].get('to')
    if not recipients:
        return False
    if isinstance(recipients, str):
        # a single recipient is handled as a list of one
        recipients = [recipients]
    elif not isinstance(recipients, list):
        return False

    for recipient in recipients:
        if not isinstance(recipient, str):
            continue
        if recipient.endswith('#Public') or \
           recipient in ('as:Public', 'Public'):
            return True
    return False


def is_followers_post(post_json_object: {}) -> bool:
    """Is the given post addressed to followers?
    This is a Create activity whose object has a 'to' field, either
    a list or a single string, with a recipient ending in /followers
    """
    if post_json_object.get('type') != 'Create':
        return False
    if not has_object_dict(post_json_object):
        return False

    recipients = post_json_object['object'].get('to')
    if not recipients:
        return False
    if isinstance(recipients, list):
        return any(recipient.endswith('/followers')
                   for recipient in recipients)
    if isinstance(recipients, str):
        return recipients.endswith('/followers')
    return False


def is_unlisted_post(post_json_object: {}) -> bool:
    """Returns true if the given post is unlisted

    An unlisted post is a Create activity whose object is sent 'to'
    a followers collection and has the public collection within 'cc'.
    The public collection may be a url ending with #Public or the
    compact forms as:Public or Public.
    Both 'to' and 'cc' may be a list or a single string, and both
    forms are checked in the same way. Recipients which are not
    strings are ignored
    """
    if post_json_object.get('type') != 'Create':
        return False
    if not has_object_dict(post_json_object):
        return False

    post_obj = post_json_object['object']
    to_recipients = post_obj.get('to')
    cc_recipients = post_obj.get('cc')
    if not to_recipients or not cc_recipients:
        return False

    # a single recipient is handled as a list of one
    if isinstance(to_recipients, str):
        to_recipients = [to_recipients]
    elif not isinstance(to_recipients, list):
        return False
    if isinstance(cc_recipients, str):
        cc_recipients = [cc_recipients]
    elif not isinstance(cc_recipients, list):
        return False

    has_followers = False
    for recipient in to_recipients:
        if isinstance(recipient, str) and \
           recipient.endswith('/followers'):
            has_followers = True
            break
    if not has_followers:
        return False

    for recipient in cc_recipients:
        if not isinstance(recipient, str):
            continue
        if recipient.endswith('#Public') or \
           recipient in ('as:Public', 'Public'):
            return True
    return False


def is_blog_post(post_json_object: {}) -> bool:
    """Is the given post a blog post?

    A blog post is a Create activity whose object has content and is
    of type Article.
    Returns false, rather than raising an exception, if the post or
    its object has no type
    """
    if post_json_object.get('type') != 'Create':
        return False
    if not has_object_dict(post_json_object):
        return False
    if not has_object_string_type(post_json_object, False):
        return False
    post_obj = post_json_object['object']
    if 'content' not in post_obj:
        return False
    return post_obj.get('type') == 'Article'


def is_news_post(post_json_object: {}) -> bool:
    """Is the given post a news post?
    This is indicated by a 'news' field on the post which is set
    to a non-empty value. The result is always a boolean
    """
    return bool(post_json_object.get('news'))


def is_recent_post(post_json_object: {}, max_days: int) -> bool:
    """ Returns true if the given post was published within the
    last max_days days.
    Returns false if there is no published date string, or if it
    is not in a recognized format
    """
    if not has_object_dict(post_json_object):
        return False
    published_str = post_json_object['object'].get('published')
    if not published_str or not isinstance(published_str, str):
        return False

    today_days_since_epoch = (date_utcnow() - date_epoch()).days
    oldest_recent_day = today_days_since_epoch - max_days

    # remove any fractions of a second
    if '.' in published_str:
        published_str = published_str.split('.')[0] + 'Z'

    published_date = \
        date_from_string_format(published_str,
                                ["%Y-%m-%dT%H:%M:%S%z"])
    if not published_date:
        print('EX: is_recent_post unrecognized published date ' +
              str(published_str))
        return False

    published_days_since_epoch = \
        (published_date - date_epoch()).days
    return published_days_since_epoch >= oldest_recent_day


def is_chat_message(post_json_object: {}) -> bool:
    """Returns true if the given post is a chat message
    Note that is_dm should be checked before calling this

    A chat message is a Create activity whose object is of type
    ChatMessage.
    Returns false, rather than raising an exception, if the post or
    its object has no type
    """
    if post_json_object.get('type') != 'Create':
        return False
    if not has_object_dict(post_json_object):
        return False
    return post_json_object['object'].get('type') == 'ChatMessage'


def is_reply(post_json_object: {}, actor: str) -> bool:
    """Returns true if the given post is a reply to the given actor

    The post must be a Create activity, not under moderation, whose
    object is one of the known post types. It is a reply if the post
    which it replies to begins with the actor url, or if the actor
    appears within the href of one of its Mention tags.
    Posts or objects with no type, and tags which are not dicts,
    are handled without raising an exception
    """
    if post_json_object.get('type') != 'Create':
        return False
    if not has_object_dict(post_json_object):
        return False

    post_obj = post_json_object['object']
    if post_obj.get('moderationStatus'):
        return False
    if post_obj.get('type') not in ('Note', 'Event', 'Page',
                                    'EncryptedMessage',
                                    'ChatMessage', 'Article'):
        return False

    # is this a direct reply to a post by the actor?
    reply_id = get_reply_to(post_obj)
    if reply_id and isinstance(reply_id, str):
        if reply_id.startswith(actor):
            return True

    # otherwise, is the actor mentioned?
    tags = post_obj.get('tag')
    if not tags or not isinstance(tags, list):
        return False
    for tag in tags:
        if not isinstance(tag, dict):
            continue
        if tag.get('type') != 'Mention':
            continue
        mention_href = tag.get('href')
        if not mention_href:
            continue
        if actor in mention_href:
            return True
    return False


def is_pgp_encrypted(content: str) -> bool:
    """Is the given content PGP encrypted?
    It should contain both the begin and end PGP message markers
    """
    has_begin = '--BEGIN PGP MESSAGE--' in content
    return has_begin and '--END PGP MESSAGE--' in content


def invalid_ciphertext(content: str) -> bool:
    """Returns true if the given content has a begin or end marker
    but contains neither a complete PGP public key nor a complete
    PGP encrypted message
    """
    has_marker = '----BEGIN ' in content or '----END ' in content
    if not has_marker:
        return False
    return not (contains_pgp_public_key(content) or
                is_pgp_encrypted(content))


def contains_pgp_public_key(content: str) -> bool:
    """Does the given content contain a PGP public key?
    Both the begin and end public key block markers should be present
    """
    key_markers = (
        '--BEGIN PGP PUBLIC KEY BLOCK--',
        '--END PGP PUBLIC KEY BLOCK--'
    )
    return all(marker in content for marker in key_markers)


def contains_private_key(content: str) -> bool:
    """Does the given content contain a private key?
    This may be either a PGP private key block or an RSA private key,
    and both the begin and end markers should be present
    """
    private_key_markers = (
        ('--BEGIN PGP PRIVATE KEY BLOCK--',
         '--END PGP PRIVATE KEY BLOCK--'),
        ('--BEGIN RSA PRIVATE KEY--',
         '--END RSA PRIVATE KEY--')
    )
    return any(begin_marker in content and end_marker in content
               for begin_marker, end_marker in private_key_markers)


def is_float(value) -> bool:
    """Is the given value a float?
    Returns true if the value can be converted to a float.
    Values of an unsupported type, such as None or a list, and
    integers too large to be represented as a float return false
    rather than raising an exception
    """
    try:
        float(value)
    except (TypeError, ValueError, OverflowError):
        return False
    return True


def is_group_actor(base_dir: str, actor: str, person_cache: {},
                   debug: bool = False) -> bool:
    """Returns true if the given actor has Group type.
    The actor within the person cache is checked first. If it is not
    there then the cached actor file is searched for the Group type
    """
    cached_actor_json = None
    if person_cache:
        cache_entry = person_cache.get(actor)
        if cache_entry:
            cached_actor_json = cache_entry.get('actor')

    if cached_actor_json:
        # the actor is in memory, so its type decides the result
        if cached_actor_json.get('type') and \
           cached_actor_json['type'] == 'Group':
            if debug:
                print('Cached actor ' + actor + ' has Group type')
            return True
        return False

    if debug:
        print('Actor ' + actor + ' not in cache')
    actor_filename = \
        base_dir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
    if not os.path.isfile(actor_filename):
        if debug:
            print('Cached actor file not found ' + actor_filename)
        return False
    if not text_in_file('"type": "Group"', actor_filename):
        return False
    if debug:
        print('Group type found in ' + actor_filename)
    return True


def is_group_account(base_dir: str, nickname: str, domain: str) -> bool:
    """Is the given account a group?
    The account json file is searched for the Group type
    """
    account_filename = acct_dir(base_dir, nickname, domain) + '.json'
    if os.path.isfile(account_filename):
        if text_in_file('"type": "Group"', account_filename):
            return True
    return False


def has_group_type(base_dir: str, actor: str, person_cache: {},
                   debug: bool = False) -> bool:
    """Returns true if the given actor url has a group type, either
    because its path indicates a group or because the cached actor
    has Group type
    """
    # does the actor url contain a path which clearly indicates
    # a group? eg. https://lemmy/c/groupname
    for path_fragment in get_group_paths():
        if path_fragment not in actor:
            continue
        if debug:
            print('grpPath ' + path_fragment + ' in ' + actor)
        return True
    # otherwise examine any cached actor for the Group type
    return is_group_actor(base_dir, actor, person_cache, debug)


def is_quote_toot(post_json_object: {}, content: str) -> bool:
    """Returns true if the given post is a quote toot / quote tweet

    This is the case if get_quote_toot_url returns a url for the post,
    or if the given content includes the Twitter-style 'QT: ' indicator
    """
    if get_quote_toot_url(post_json_object):
        return True
    # Twitter-style indicator
    if not content:
        return False
    return 'QT: ' in content


def is_right_to_left_text(text: str) -> bool:
    """Returns true if more than half of the characters within the
    given text are from right to left scripts
    Persian U+0600 to U+06FF
    Arabic U+0627 to U+064A
    Hebrew/Yiddish U+0590 to U+05FF and U+FB2A to U+FB4E
    """
    rtl_chars = '[\u0627-\u064a]|[\u0600-\u06FF]|' + \
        '[\u0590-\u05FF\uFB2A-\uFB4E]'
    rtl_matches = re.compile(rtl_chars).findall(text)
    half_length = len(text) / 2
    return len(rtl_matches) > half_length


def is_valid_date(date_str: str) -> bool:
    """is the given date valid?

    The date should be a string in the form year-month-day, with the
    year between 1920 and 3000, the month between 1 and 12 and the
    day between 1 and 31. The day is not checked against the length
    of the particular month.
    Returns false, rather than raising an exception, for sections
    which can't be converted to a number
    """
    if not isinstance(date_str, str):
        return False
    date_sections = date_str.split('-')
    if len(date_sections) != 3:
        return False

    # isdecimal is used rather than isdigit, because isdigit accepts
    # characters such as superscript digits which int() rejects
    for section_str in date_sections:
        if not section_str.isdecimal():
            return False

    try:
        date_year, date_month, date_day = \
            (int(section_str) for section_str in date_sections)
    except ValueError:
        # eg. too many digits to be converted
        return False

    if date_year < 1920 or date_year > 3000:
        return False
    if date_month < 1 or date_month > 12:
        return False
    return 1 <= date_day <= 31


def is_premium_account(base_dir: str, nickname: str, domain: str) -> bool:
    """ Returns true if the given account is a premium one, which is
    indicated by a .premium file within its account directory
    """
    account_dir = acct_dir(base_dir, nickname, domain)
    return os.path.isfile(account_dir + '/.premium')


def url_permitted(url: str, federation_list: []) -> bool:
    """Returns true if the given url is permitted.
    Evil urls are never permitted. Otherwise, if there is a federation
    list then the url must contain one of its domains
    """
    if is_evil(url):
        return False
    if not federation_list:
        # no federation list means that anything is permitted
        return True
    return any(permitted_domain in url
               for permitted_domain in federation_list)