Move flag functions to their own module

main
Bob Mottram 2024-09-13 14:58:14 +01:00
parent 8fa9189518
commit a876c9871d
56 changed files with 755 additions and 724 deletions

View File

@ -10,18 +10,18 @@ __status__ = "Production"
__module_group__ = "ActivityPub"
import os
from flags import has_group_type
from flags import url_permitted
from utils import get_user_paths
from utils import text_in_file
from utils import has_object_string_object
from utils import has_users_path
from utils import get_full_domain
from utils import url_permitted
from utils import get_domain_from_actor
from utils import get_nickname_from_actor
from utils import domain_permitted
from utils import follow_person
from utils import acct_dir
from utils import has_group_type
from utils import local_actor_url
from utils import has_actor
from utils import has_object_string_type

View File

@ -10,10 +10,11 @@ __status__ = "Production"
__module_group__ = "ActivityPub"
import os
from flags import has_group_type
from flags import url_permitted
from utils import text_in_file
from utils import get_user_paths
from utils import has_object_string_object
from utils import has_group_type
from utils import has_object_dict
from utils import remove_domain_port
from utils import remove_id_ending
@ -21,7 +22,6 @@ from utils import has_users_path
from utils import get_full_domain
from utils import get_status_number
from utils import create_outbox_dir
from utils import url_permitted
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import locate_post

View File

@ -12,9 +12,9 @@ import hashlib
import binascii
import os
import secrets
from flags import is_system_account
from flags import is_memorial_account
from utils import data_dir
from utils import is_system_account
from utils import is_memorial_account
from utils import has_users_path
from utils import text_in_file
from utils import remove_eol

View File

@ -12,6 +12,7 @@ import json
import time
from session import get_json_valid
from session import create_session
from flags import is_evil
from utils import get_user_paths
from utils import contains_statuses
from utils import data_dir
@ -33,7 +34,6 @@ from utils import set_config_param
from utils import has_users_path
from utils import get_full_domain
from utils import remove_id_ending
from utils import is_evil
from utils import locate_post
from utils import evil_incarnate
from utils import get_domain_from_actor

View File

@ -11,13 +11,13 @@ import os
from pprint import pprint
from webfinger import webfinger_handle
from auth import create_basic_auth_header
from flags import url_permitted
from utils import get_url_from_post
from utils import remove_domain_port
from utils import has_users_path
from utils import get_full_domain
from utils import remove_id_ending
from utils import remove_post_from_cache
from utils import url_permitted
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import locate_post

View File

@ -12,7 +12,7 @@ from session import download_image
from session import url_exists
from session import get_json
from session import get_json_valid
from utils import url_permitted
from flags import url_permitted
from utils import remove_html
from utils import get_url_from_post
from utils import data_dir

View File

@ -15,6 +15,10 @@ import email.parser
import urllib.parse
from shutil import copyfile
from dateutil.parser import parse
from flags import is_pgp_encrypted
from flags import contains_pgp_public_key
from flags import is_float
from flags import is_right_to_left_text
from utils import replace_strings
from utils import data_dir
from utils import remove_link_tracking
@ -22,7 +26,6 @@ from utils import string_contains
from utils import string_ends_with
from utils import is_account_dir
from utils import get_url_from_post
from utils import is_right_to_left_text
from utils import language_right_to_left
from utils import binary_is_image
from utils import get_content_from_post
@ -39,10 +42,7 @@ from utils import save_json
from utils import file_last_modified
from utils import get_link_prefixes
from utils import dangerous_markup
from utils import is_pgp_encrypted
from utils import contains_pgp_public_key
from utils import acct_dir
from utils import is_float
from utils import get_currencies
from utils import remove_html
from utils import remove_eol

View File

@ -87,6 +87,9 @@ from httpcodes import http_304
from httpcodes import http_400
from httpcodes import http_503
from httpcodes import write2
from flags import is_image_file
from flags import is_artist
from flags import is_blog_post
from utils import date_utcnow
from utils import replace_strings
from utils import contains_invalid_chars
@ -97,9 +100,6 @@ from utils import local_network_host
from utils import permitted_dir
from utils import has_users_path
from utils import media_file_mime_type
from utils import is_image_file
from utils import is_artist
from utils import is_blog_post
from utils import replace_users_with_at
from utils import remove_id_ending
from utils import local_actor_url

View File

@ -24,7 +24,7 @@ from utils import media_file_mime_type
from utils import get_image_mime_type
from utils import get_image_extensions
from utils import acct_dir
from utils import is_image_file
from flags import is_image_file
from daemon_utils import etag_exists
from fitnessFunctions import fitness_performance
from person import save_person_qrcode

View File

@ -10,9 +10,10 @@ __module_group__ = "Core GET"
import os
import json
from webapp_conversation import html_conversation_view
from utils import is_premium_account
from flags import is_public_post_from_url
from flags import is_public_post
from flags import is_premium_account
from utils import get_instance_url
from utils import is_public_post_from_url
from utils import local_actor_url
from utils import locate_post
from utils import get_config_param
@ -23,7 +24,6 @@ from utils import acct_dir
from utils import get_json_content_from_accept
from utils import convert_domains
from utils import has_object_dict
from utils import is_public_post
from utils import load_json
from session import establish_session
from languages import get_understood_languages

View File

@ -10,8 +10,8 @@ __module_group__ = "Core GET"
import json
from securemode import secure_mode
from posts import is_moderator
from utils import is_artist
from utils import is_editor
from flags import is_artist
from flags import is_editor
from utils import convert_domains
from utils import get_json_content_from_accept
from httpheaders import set_headers

View File

@ -10,9 +10,9 @@ __module_group__ = "Core"
import os
import datetime
from hashlib import md5
from flags import is_image_file
from utils import media_file_mime_type
from utils import data_dir
from utils import is_image_file
from utils import string_contains
from utils import decoded_host
from utils import check_bad_path

View File

@ -10,12 +10,12 @@ __module_group__ = "Core POST"
import errno
import urllib.parse
from socket import error as SocketError
from flags import has_group_type
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import get_full_domain
from utils import local_actor_url
from utils import get_status_number
from utils import has_group_type
from follow import unfollow_account
from follow import send_follow_request
from follow import remove_follower

View File

@ -15,7 +15,7 @@ from httpcodes import http_404
from httpheaders import redirect_headers
from utils import get_instance_url
from utils import get_nickname_from_actor
from utils import is_editor
from flags import is_editor
from content import extract_text_fields_in_post
from blocking import is_blocked_hashtag
from filters import is_filtered

View File

@ -10,11 +10,11 @@ __module_group__ = "Core POST"
import os
import errno
from socket import error as SocketError
from flags import is_editor
from utils import data_dir
from utils import dangerous_markup
from utils import get_instance_url
from utils import get_nickname_from_actor
from utils import is_editor
from utils import get_config_param
from httpheaders import redirect_headers
from content import extract_text_fields_in_post

View File

@ -25,13 +25,13 @@ from httpheaders import redirect_headers
from httpheaders import clear_login_details
from webapp_login import html_get_login_credentials
from webapp_suspended import html_suspended
from flags import is_suspended
from flags import is_local_network_address
from utils import data_dir
from utils import acct_dir
from utils import is_suspended
from utils import is_local_network_address
from utils import get_instance_url
from utils import valid_password
from utils import is_system_account
from flags import is_system_account
from person import person_upgrade_actor
from person import activate_account2
from person import register_account

View File

@ -10,6 +10,7 @@ __module_group__ = "Core POST"
import os
import errno
from socket import error as SocketError
from flags import is_editor
from utils import data_dir
from utils import clear_from_post_caches
from utils import remove_id_ending
@ -18,7 +19,6 @@ from utils import first_paragraph_from_string
from utils import date_from_string_format
from utils import load_json
from utils import locate_post
from utils import is_editor
from utils import acct_dir
from utils import get_instance_url
from utils import get_nickname_from_actor

View File

@ -14,11 +14,12 @@ from socket import error as SocketError
from blocking import save_blocked_military
from httpheaders import redirect_headers
from httpheaders import clear_login_details
from flags import is_artist
from flags import is_memorial_account
from flags import is_premium_account
from utils import data_dir
from utils import set_premium_account
from utils import is_premium_account
from utils import remove_avatar_from_cache
from utils import is_memorial_account
from utils import save_json
from utils import save_reverse_timeline
from utils import set_minimize_all_images
@ -35,7 +36,6 @@ from utils import remove_eol
from utils import remove_html
from utils import get_url_from_post
from utils import load_json
from utils import is_artist
from utils import acct_dir
from utils import get_config_param
from utils import get_instance_url

View File

@ -28,8 +28,9 @@ from media import process_meta_data
from media import convert_image_to_low_bandwidth
from media import attach_media
from city import get_spoofed_city
from flags import is_image_file
from flags import is_float
from utils import get_instance_url
from utils import is_float
from utils import save_json
from utils import remove_post_from_cache
from utils import load_json
@ -39,7 +40,6 @@ from utils import get_base_content_from_post
from utils import license_link_from_name
from utils import get_config_param
from utils import acct_dir
from utils import is_image_file
from posts import create_reading_post
from posts import create_question_post
from posts import create_report_post

View File

@ -38,7 +38,7 @@ from utils import get_domain_from_actor
from utils import get_actor_from_post
from utils import has_actor
from utils import resembles_url
from utils import is_system_account
from flags import is_system_account
from cache import check_for_changed_actor
from cache import get_person_from_cache
from website import get_website

View File

@ -16,6 +16,7 @@ import webbrowser
import urllib.parse
from pathlib import Path
from random import randint
from flags import is_pgp_encrypted
from utils import replace_strings
from utils import get_post_attachments
from utils import get_url_from_post
@ -33,7 +34,6 @@ from utils import is_dm
from utils import load_translations_from_file
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import is_pgp_encrypted
from utils import local_actor_url
from utils import get_reply_to
from utils import get_actor_from_post

625
flags.py 100644
View File

@ -0,0 +1,625 @@
__filename__ = "flags.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.5.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Core"
import os
import re
from utils import acct_dir
from utils import date_utcnow
from utils import date_epoch
from utils import data_dir
from utils import get_config_param
from utils import get_image_extensions
from utils import evil_incarnate
from utils import get_local_network_addresses
from utils import get_attributed_to
from utils import is_dm
from utils import has_object_dict
from utils import locate_post
from utils import load_json
from utils import has_object_string_type
from utils import date_from_string_format
from utils import get_reply_to
from utils import text_in_file
from utils import get_group_paths
from utils import get_quote_toot_url
def is_featured_writer(base_dir: str, nickname: str, domain: str) -> bool:
"""Is the given account a featured writer, appearing in the features
timeline on news instances?
"""
features_blocked_filename = \
acct_dir(base_dir, nickname, domain) + '/.nofeatures'
return not os.path.isfile(features_blocked_filename)
def is_dormant(base_dir: str, nickname: str, domain: str, actor: str,
dormant_months: int) -> bool:
"""Is the given followed actor dormant, from the standpoint
of the given account
"""
last_seen_filename = acct_dir(base_dir, nickname, domain) + \
'/lastseen/' + actor.replace('/', '#') + '.txt'
if not os.path.isfile(last_seen_filename):
return False
days_since_epoch_str = None
try:
with open(last_seen_filename, 'r',
encoding='utf-8') as fp_last_seen:
days_since_epoch_str = fp_last_seen.read()
except OSError:
print('EX: failed to read last seen ' + last_seen_filename)
return False
if days_since_epoch_str:
days_since_epoch = int(days_since_epoch_str)
curr_time = date_utcnow()
curr_days_since_epoch = (curr_time - date_epoch()).days
time_diff_months = \
int((curr_days_since_epoch - days_since_epoch) / 30)
if time_diff_months >= dormant_months:
return True
return False
def is_editor(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is an editor
"""
editors_file = data_dir(base_dir) + '/editors.txt'
if not os.path.isfile(editors_file):
admin_name = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
return False
lines = []
try:
with open(editors_file, 'r', encoding='utf-8') as fp_editors:
lines = fp_editors.readlines()
except OSError:
print('EX: is_editor unable to read ' + editors_file)
if len(lines) == 0:
admin_name = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
for editor in lines:
editor = editor.strip('\n').strip('\r')
if editor == nickname:
return True
return False
def is_artist(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is an artist
"""
artists_file = data_dir(base_dir) + '/artists.txt'
if not os.path.isfile(artists_file):
admin_name = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
return False
lines = []
try:
with open(artists_file, 'r', encoding='utf-8') as fp_artists:
lines = fp_artists.readlines()
except OSError:
print('EX: is_artist unable to read ' + artists_file)
if len(lines) == 0:
admin_name = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
for artist in lines:
artist = artist.strip('\n').strip('\r')
if artist == nickname:
return True
return False
def is_image_file(filename: str) -> bool:
"""Is the given filename an image?
"""
for ext in get_image_extensions():
if filename.endswith('.' + ext):
return True
return False
def is_system_account(nickname: str) -> bool:
"""Returns true if the given nickname is a system account
"""
if nickname in ('news', 'inbox'):
return True
return False
def is_memorial_account(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is a memorial account
"""
memorial_file = data_dir(base_dir) + '/memorial'
if not os.path.isfile(memorial_file):
return False
memorial_list = []
try:
with open(memorial_file, 'r', encoding='utf-8') as fp_memorial:
memorial_list = fp_memorial.read().split('\n')
except OSError:
print('EX: unable to read ' + memorial_file)
if nickname in memorial_list:
return True
return False
def is_suspended(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is suspended
"""
admin_nickname = get_config_param(base_dir, 'admin')
if not admin_nickname:
return False
if nickname == admin_nickname:
return False
suspended_filename = data_dir(base_dir) + '/suspended.txt'
if os.path.isfile(suspended_filename):
lines = []
try:
with open(suspended_filename, 'r', encoding='utf-8') as fp_susp:
lines = fp_susp.readlines()
except OSError:
print('EX: is_suspended unable to read ' + suspended_filename)
for suspended in lines:
if suspended.strip('\n').strip('\r') == nickname:
return True
return False
def is_evil(domain: str) -> bool:
""" https://www.youtube.com/watch?v=5qw1hcevmdU
"""
if not isinstance(domain, str):
print('WARN: Malformed domain ' + str(domain))
return True
# if a domain contains any of these strings then it is
# declaring itself to be hostile
evil_emporium = (
'nazi', 'extremis', 'extreemis', 'gendercritic',
'kiwifarm', 'illegal', 'raplst', 'rapist',
'rapl.st', 'rapi.st', 'antivax', 'plandemic', 'terror'
)
for hostile_str in evil_emporium:
if hostile_str in domain:
return True
evil_domains = evil_incarnate()
for concentrated_evil in evil_domains:
if domain.endswith(concentrated_evil):
return True
return False
def is_local_network_address(ip_address: str) -> bool:
"""Is the given ip address local?
"""
local_ips = get_local_network_addresses()
for ip_addr in local_ips:
if ip_address.startswith(ip_addr):
return True
return False
def is_reminder(post_json_object: {}) -> bool:
"""Returns true if the given post is a reminder
"""
if not is_dm(post_json_object):
return False
if not post_json_object['object'].get('to'):
return False
if not post_json_object['object'].get('attributedTo'):
return False
if not post_json_object['object'].get('tag'):
return False
if not isinstance(post_json_object['object']['to'], list):
return False
if len(post_json_object['object']['to']) != 1:
return False
if post_json_object['object']['to'][0] != \
get_attributed_to(post_json_object['object']['attributedTo']):
return False
for tag in post_json_object['object']['tag']:
if tag['type'] == 'Event':
return True
return False
def is_public_post_from_url(base_dir: str, nickname: str, domain: str,
post_url: str) -> bool:
"""Returns whether the given url is a public post
"""
post_filename = locate_post(base_dir, nickname, domain, post_url)
if not post_filename:
return False
post_json_object = load_json(post_filename)
if not post_json_object:
return False
return is_public_post(post_json_object)
def is_public_post(post_json_object: {}) -> bool:
"""Returns true if the given post is public
"""
if not post_json_object.get('type'):
return False
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if not post_json_object['object'].get('to'):
return False
if isinstance(post_json_object['object']['to'], list):
for recipient in post_json_object['object']['to']:
if recipient.endswith('#Public') or \
recipient == 'as:Public' or \
recipient == 'Public':
return True
elif isinstance(post_json_object['object']['to'], str):
if post_json_object['object']['to'].endswith('#Public'):
return True
return False
def is_followers_post(post_json_object: {}) -> bool:
"""Returns true if the given post is to followers
"""
if not post_json_object.get('type'):
return False
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if not post_json_object['object'].get('to'):
return False
if isinstance(post_json_object['object']['to'], list):
for recipient in post_json_object['object']['to']:
if recipient.endswith('/followers'):
return True
elif isinstance(post_json_object['object']['to'], str):
if post_json_object['object']['to'].endswith('/followers'):
return True
return False
def is_unlisted_post(post_json_object: {}) -> bool:
"""Returns true if the given post is unlisted
"""
if not post_json_object.get('type'):
return False
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if not post_json_object['object'].get('to'):
return False
if not post_json_object['object'].get('cc'):
return False
has_followers = False
if isinstance(post_json_object['object']['to'], list):
for recipient in post_json_object['object']['to']:
if recipient.endswith('/followers'):
has_followers = True
break
elif isinstance(post_json_object['object']['to'], str):
if post_json_object['object']['to'].endswith('/followers'):
has_followers = True
if not has_followers:
return False
if isinstance(post_json_object['object']['cc'], list):
for recipient in post_json_object['object']['cc']:
if recipient.endswith('#Public') or \
recipient == 'as:Public' or \
recipient == 'Public':
return True
elif isinstance(post_json_object['object']['cc'], str):
if post_json_object['object']['cc'].endswith('#Public'):
return True
return False
def is_blog_post(post_json_object: {}) -> bool:
"""Is the given post a blog post?
"""
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if not has_object_string_type(post_json_object, False):
return False
if 'content' not in post_json_object['object']:
return False
if post_json_object['object']['type'] != 'Article':
return False
return True
def is_news_post(post_json_object: {}) -> bool:
"""Is the given post a blog post?
"""
return post_json_object.get('news')
def is_recent_post(post_json_object: {}, max_days: int) -> bool:
""" Is the given post recent?
"""
if not has_object_dict(post_json_object):
return False
if not post_json_object['object'].get('published'):
return False
if not isinstance(post_json_object['object']['published'], str):
return False
curr_time = date_utcnow()
days_since_epoch = (curr_time - date_epoch()).days
recently = days_since_epoch - max_days
published_date_str = post_json_object['object']['published']
if '.' in published_date_str:
published_date_str = published_date_str.split('.')[0] + 'Z'
published_date = \
date_from_string_format(published_date_str,
["%Y-%m-%dT%H:%M:%S%z"])
if not published_date:
print('EX: is_recent_post unrecognized published date ' +
str(published_date_str))
return False
published_days_since_epoch = \
(published_date - date_epoch()).days
if published_days_since_epoch < recently:
return False
return True
def is_chat_message(post_json_object: {}) -> bool:
"""Returns true if the given post is a chat message
Note that is_dm should be checked before calling this
"""
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if post_json_object['object']['type'] != 'ChatMessage':
return False
return True
def is_reply(post_json_object: {}, actor: str) -> bool:
"""Returns true if the given post is a reply to the given actor
"""
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if post_json_object['object'].get('moderationStatus'):
return False
if post_json_object['object']['type'] not in ('Note', 'Event', 'Page',
'EncryptedMessage',
'ChatMessage', 'Article'):
return False
reply_id = get_reply_to(post_json_object['object'])
if reply_id:
if isinstance(reply_id, str):
if reply_id.startswith(actor):
return True
if not post_json_object['object'].get('tag'):
return False
if not isinstance(post_json_object['object']['tag'], list):
return False
for tag in post_json_object['object']['tag']:
if not tag.get('type'):
continue
if tag['type'] == 'Mention':
if not tag.get('href'):
continue
if actor in tag['href']:
return True
return False
def is_pgp_encrypted(content: str) -> bool:
"""Returns true if the given content is PGP encrypted
"""
if '--BEGIN PGP MESSAGE--' in content:
if '--END PGP MESSAGE--' in content:
return True
return False
def invalid_ciphertext(content: str) -> bool:
"""Returns true if the given content contains an invalid key
"""
if '----BEGIN ' in content or '----END ' in content:
if not contains_pgp_public_key(content) and \
not is_pgp_encrypted(content):
return True
return False
def contains_pgp_public_key(content: str) -> bool:
"""Returns true if the given content contains a PGP public key
"""
if '--BEGIN PGP PUBLIC KEY BLOCK--' in content:
if '--END PGP PUBLIC KEY BLOCK--' in content:
return True
return False
def contains_private_key(content: str) -> bool:
"""Returns true if the given content contains a PGP private key
"""
if '--BEGIN PGP PRIVATE KEY BLOCK--' in content:
if '--END PGP PRIVATE KEY BLOCK--' in content:
return True
if '--BEGIN RSA PRIVATE KEY--' in content:
if '--END RSA PRIVATE KEY--' in content:
return True
return False
def is_float(value) -> bool:
"""Is the given value a float?
"""
try:
float(value)
return True
except ValueError:
return False
def is_group_actor(base_dir: str, actor: str, person_cache: {},
debug: bool = False) -> bool:
"""Is the given actor a group?
"""
person_cache_actor = None
if person_cache:
if person_cache.get(actor):
if person_cache[actor].get('actor'):
person_cache_actor = person_cache[actor]['actor']
if person_cache_actor:
if person_cache_actor.get('type'):
if person_cache_actor['type'] == 'Group':
if debug:
print('Cached actor ' + actor + ' has Group type')
return True
return False
if debug:
print('Actor ' + actor + ' not in cache')
cached_actor_filename = \
base_dir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
if not os.path.isfile(cached_actor_filename):
if debug:
print('Cached actor file not found ' + cached_actor_filename)
return False
if text_in_file('"type": "Group"', cached_actor_filename):
if debug:
print('Group type found in ' + cached_actor_filename)
return True
return False
def is_group_account(base_dir: str, nickname: str, domain: str) -> bool:
"""Returns true if the given account is a group
"""
account_filename = acct_dir(base_dir, nickname, domain) + '.json'
if not os.path.isfile(account_filename):
return False
if text_in_file('"type": "Group"', account_filename):
return True
return False
def has_group_type(base_dir: str, actor: str, person_cache: {},
debug: bool = False) -> bool:
"""Does the given actor url have a group type?
"""
# does the actor path clearly indicate that this is a group?
# eg. https://lemmy/c/groupname
group_paths = get_group_paths()
for grp_path in group_paths:
if grp_path in actor:
if debug:
print('grpPath ' + grp_path + ' in ' + actor)
return True
# is there a cached actor which can be examined for Group type?
return is_group_actor(base_dir, actor, person_cache, debug)
def is_quote_toot(post_json_object: str, content: str) -> bool:
"""Returns true if the given post is a quote toot / quote tweet
"""
if get_quote_toot_url(post_json_object):
return True
# Twitter-style indicator
if content:
if 'QT: ' in content:
return True
return False
def is_right_to_left_text(text: str) -> bool:
"""Is the given text right to left?
Persian \u0600-\u06FF
Arabic \u0627-\u064a
Hebrew/Yiddish \u0590-\u05FF\uFB2A-\uFB4E
"""
unicode_str = '[\u0627-\u064a]|[\u0600-\u06FF]|' + \
'[\u0590-\u05FF\uFB2A-\uFB4E]'
pattern = re.compile(unicode_str)
return len(re.findall(pattern, text)) > (len(text)/2)
def is_valid_date(date_str: str) -> bool:
"""is the given date valid?
"""
if not isinstance(date_str, str):
return False
if '-' not in date_str:
return False
date_sections = date_str.split('-')
if len(date_sections) != 3:
return False
date_sect_ctr = 0
for section_str in date_sections:
if not section_str.isdigit():
return False
if date_sect_ctr == 0:
date_year = int(section_str)
if date_year < 1920 or date_year > 3000:
return False
elif date_sect_ctr == 1:
date_month = int(section_str)
if date_month < 1 or date_month > 12:
return False
elif date_sect_ctr == 2:
date_day = int(section_str)
if date_day < 1 or date_day > 31:
return False
date_sect_ctr += 1
return True
def is_premium_account(base_dir: str, nickname: str, domain: str) -> bool:
""" Is the given account a premium one?
"""
premium_filename = acct_dir(base_dir, nickname, domain) + '/.premium'
return os.path.isfile(premium_filename)
def url_permitted(url: str, federation_list: []) -> bool:
"""is the given url permitted?
"""
if is_evil(url):
return False
if not federation_list:
return True
for domain in federation_list:
if domain in url:
return True
return False

View File

@ -9,6 +9,7 @@ __module_group__ = "ActivityPub"
import os
from pprint import pprint
from flags import has_group_type
from utils import get_user_paths
from utils import acct_handle_dir
from utils import has_object_string_object
@ -29,7 +30,6 @@ from utils import load_json
from utils import save_json
from utils import is_account_dir
from utils import acct_dir
from utils import has_group_type
from utils import local_actor_url
from utils import text_in_file
from utils import remove_eol

View File

@ -12,11 +12,12 @@ from uuid import UUID
from hashlib import md5
from datetime import datetime
from datetime import timedelta
from flags import is_reminder
from flags import is_public_post
from utils import replace_strings
from utils import date_from_numbers
from utils import date_from_string_format
from utils import acct_handle_dir
from utils import is_public_post
from utils import load_json
from utils import save_json
from utils import locate_post
@ -29,7 +30,6 @@ from utils import get_status_number
from utils import get_full_domain
from utils import text_in_file
from utils import remove_eol
from utils import is_reminder
from filters import is_filtered
from context import get_individual_post_context
from session import get_method

View File

@ -14,6 +14,14 @@ import time
import random
from shutil import copyfile
from linked_data_sig import verify_json_signature
from flags import is_system_account
from flags import is_blog_post
from flags import is_recent_post
from flags import is_reply
from flags import is_group_account
from flags import has_group_type
from flags import is_quote_toot
from flags import url_permitted
from utils import harmless_markup
from utils import quote_toots_allowed
from utils import lines_in_file
@ -21,15 +29,12 @@ from utils import date_epoch
from utils import date_utcnow
from utils import contains_statuses
from utils import get_actor_from_post_id
from utils import is_quote_toot
from utils import acct_handle_dir
from utils import text_in_file
from utils import get_media_descriptions_from_post
from utils import get_summary_from_post
from utils import get_account_timezone
from utils import domain_permitted
from utils import is_group_account
from utils import is_system_account
from utils import get_reply_interval_hours
from utils import can_reply_to
from utils import get_base_content_from_post
@ -38,13 +43,10 @@ from utils import remove_domain_port
from utils import get_port_from_domain
from utils import has_object_dict
from utils import dm_allowed_from_domain
from utils import is_recent_post
from utils import get_config_param
from utils import has_users_path
from utils import get_full_domain
from utils import remove_id_ending
from utils import is_blog_post
from utils import url_permitted
from utils import create_inbox_queue_dir
from utils import get_status_number
from utils import get_domain_from_actor
@ -53,14 +55,12 @@ from utils import locate_post
from utils import delete_post
from utils import load_json
from utils import save_json
from utils import has_group_type
from utils import local_actor_url
from utils import get_attributed_to
from utils import get_reply_to
from utils import get_actor_from_post
from utils import data_dir
from utils import is_dm
from utils import is_reply
from utils import has_actor
from httpsig import get_digest_algorithm_from_headers
from httpsig import verify_post_headers

View File

@ -9,7 +9,7 @@ __module_group__ = "Timeline"
import os
import time
from utils import is_recent_post
from flags import is_recent_post
from utils import get_actor_from_post_id
from utils import contains_invalid_actor_url_chars
from utils import get_attributed_to

View File

@ -8,6 +8,7 @@ __status__ = "Production"
__module_group__ = "Timeline"
import os
from flags import has_group_type
from utils import undo_announce_collection_entry
from utils import has_object_dict
from utils import remove_domain_port
@ -25,7 +26,6 @@ from utils import acct_handle_dir
from utils import has_object_string_object
from utils import has_object_string_type
from utils import has_actor
from utils import has_group_type
from utils import get_full_domain
from utils import get_actor_from_post
from utils import has_users_path

View File

@ -9,6 +9,8 @@ __module_group__ = "ActivityPub"
import os
from pprint import pprint
from flags import has_group_type
from flags import url_permitted
from utils import has_object_string
from utils import has_object_string_object
from utils import has_object_string_type
@ -17,12 +19,10 @@ from utils import has_object_dict
from utils import has_users_path
from utils import get_full_domain
from utils import remove_id_ending
from utils import url_permitted
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import locate_post
from utils import undo_likes_collection_entry
from utils import has_group_type
from utils import local_actor_url
from utils import load_json
from utils import save_json

View File

@ -9,8 +9,8 @@ __module_group__ = "Core"
import os
from flags import is_float
from utils import get_url_from_post
from utils import is_float
from utils import acct_dir
from utils import load_json
from utils import save_json

View File

@ -8,12 +8,12 @@ __status__ = "Production"
__module_group__ = "Core"
import os
from flags import has_group_type
from utils import data_dir
from utils import is_account_dir
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import acct_dir
from utils import has_group_type
from webfinger import webfinger_handle
from blocking import is_blocked
from posts import get_user_url

View File

@ -19,7 +19,9 @@ from datetime import timezone
from collections import OrderedDict
from utils import valid_post_date
from categories import set_hashtag_category
from utils import is_local_network_address
from flags import is_suspended
from flags import is_local_network_address
from flags import is_public_post
from utils import data_dir
from utils import string_contains
from utils import image_mime_types_dict
@ -36,11 +38,9 @@ from utils import get_fav_filename_from_url
from utils import get_base_content_from_post
from utils import has_object_dict
from utils import first_paragraph_from_string
from utils import is_public_post
from utils import locate_post
from utils import load_json
from utils import save_json
from utils import is_suspended
from utils import contains_invalid_chars
from utils import remove_html
from utils import is_account_dir

View File

@ -15,6 +15,8 @@ from posts import outbox_message_create_wrap
from posts import save_post_to_box
from posts import send_to_followers_thread
from posts import send_to_named_addresses_thread
from flags import is_featured_writer
from flags import is_quote_toot
from utils import data_dir
from utils import quote_toots_allowed
from utils import get_post_attachments
@ -30,13 +32,11 @@ from utils import get_full_domain
from utils import remove_id_ending
from utils import get_domain_from_actor
from utils import dangerous_markup
from utils import is_featured_writer
from utils import load_json
from utils import save_json
from utils import acct_dir
from utils import local_actor_url
from utils import has_actor
from utils import is_quote_toot
from utils import get_actor_from_post
from blocking import is_blocked_domain
from blocking import outbox_block

View File

@ -37,6 +37,7 @@ from roles import set_role
from roles import actor_roles_from_list
from roles import get_actor_roles_list
from media import process_meta_data
from flags import is_image_file
from utils import account_is_indexable
from utils import get_image_mime_type
from utils import get_instance_url
@ -66,7 +67,6 @@ from utils import refresh_newswire
from utils import get_protocol_prefixes
from utils import has_users_path
from utils import get_image_extensions
from utils import is_image_file
from utils import acct_dir
from utils import get_user_paths
from utils import get_group_paths

4
pgp.py
View File

@ -12,11 +12,11 @@ import base64
import subprocess
from pathlib import Path
from person import get_actor_json
from flags import is_pgp_encrypted
from flags import contains_pgp_public_key
from utils import get_occupation_skills
from utils import get_url_from_post
from utils import safe_system_string
from utils import contains_pgp_public_key
from utils import is_pgp_encrypted
from utils import get_full_domain
from utils import get_status_number
from utils import local_actor_url

View File

@ -34,13 +34,18 @@ from webfinger import webfinger_handle
from httpsig import create_signed_header
from siteactive import site_is_active
from languages import understood_post_language
from flags import is_evil
from flags import is_public_post
from flags import invalid_ciphertext
from flags import contains_private_key
from flags import has_group_type
from flags import is_premium_account
from flags import url_permitted
from utils import replace_strings
from utils import valid_content_warning
from utils import get_actor_from_post_id
from utils import string_contains
from utils import get_post_attachments
from utils import is_premium_account
from utils import contains_private_key
from utils import get_url_from_post
from utils import date_from_string_format
from utils import date_epoch
@ -57,11 +62,9 @@ from utils import valid_hash_tag
from utils import get_audio_extensions
from utils import get_summary_from_post
from utils import get_user_paths
from utils import invalid_ciphertext
from utils import has_object_string_type
from utils import remove_id_ending
from utils import replace_users_with_at
from utils import has_group_type
from utils import get_base_content_from_post
from utils import remove_domain_port
from utils import get_port_from_domain
@ -69,15 +72,12 @@ from utils import has_object_dict
from utils import reject_post_id
from utils import remove_invalid_chars
from utils import file_last_modified
from utils import is_public_post
from utils import has_users_path
from utils import valid_post_date
from utils import get_full_domain
from utils import get_followers_list
from utils import is_evil
from utils import get_status_number
from utils import create_person_dir
from utils import url_permitted
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import delete_post

View File

@ -11,6 +11,8 @@ import os
import re
import urllib.parse
from pprint import pprint
from flags import has_group_type
from flags import url_permitted
from utils import data_dir
from utils import has_object_string
from utils import has_object_string_object
@ -20,12 +22,10 @@ from utils import has_object_dict
from utils import has_users_path
from utils import get_full_domain
from utils import remove_id_ending
from utils import url_permitted
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import locate_post
from utils import undo_reaction_collection_entry
from utils import has_group_type
from utils import local_actor_url
from utils import load_json
from utils import save_json

View File

@ -8,9 +8,9 @@ __status__ = "Production"
__module_group__ = "Core"
import os
from flags import is_dormant
from utils import data_dir
from utils import get_user_paths
from utils import is_dormant
from utils import acct_dir
from utils import valid_nickname
from utils import get_full_domain

View File

@ -9,7 +9,7 @@ __module_group__ = "Security"
from httpsig import verify_post_headers
from session import establish_session
from utils import url_permitted
from flags import url_permitted
from httpsig import signed_get_key_id
from cache import get_person_pub_key

View File

@ -9,17 +9,17 @@ __module_group__ = "Session"
import os
import requests
import json
import errno
from socket import error as SocketError
from http.client import HTTPConnection
from flags import is_image_file
from flags import url_permitted
from utils import text_in_file
from utils import acct_dir
from utils import url_permitted
from utils import is_image_file
from utils import binary_is_image
from utils import image_mime_types_dict
from httpsig import create_signed_header
import json
from socket import error as SocketError
import errno
from http.client import HTTPConnection
def create_session(proxy_type: str):

View File

@ -23,6 +23,7 @@ from session import post_json
from session import post_image
from session import create_session
from session import get_json_valid
from flags import is_float
from utils import replace_strings
from utils import data_dir
from utils import resembles_url
@ -44,7 +45,6 @@ from utils import get_image_extensions
from utils import remove_domain_port
from utils import is_account_dir
from utils import acct_dir
from utils import is_float
from utils import get_category_types
from utils import get_shares_files_list
from utils import local_actor_url

View File

@ -11,12 +11,13 @@ import os
import html
import random
import urllib.parse
from flags import is_reply
from flags import is_pgp_encrypted
from utils import data_dir
from utils import get_post_attachments
from utils import get_cached_post_filename
from utils import remove_id_ending
from utils import is_dm
from utils import is_reply
from utils import camel_case_split
from utils import get_domain_from_actor
from utils import get_nickname_from_actor
@ -25,7 +26,6 @@ from utils import get_display_name
from utils import remove_html
from utils import load_json
from utils import save_json
from utils import is_pgp_encrypted
from utils import has_object_dict
from utils import acct_dir
from utils import local_actor_url

View File

@ -57,6 +57,10 @@ from follow import clear_followers
from follow import send_follow_request_via_server
from follow import send_unfollow_request_via_server
from siteactive import site_is_active
from flags import contains_pgp_public_key
from flags import is_group_actor
from flags import is_group_account
from flags import is_right_to_left_text
from utils import replace_strings
from utils import valid_content_warning
from utils import data_dir
@ -66,7 +70,6 @@ from utils import uninvert_text
from utils import get_url_from_post
from utils import date_from_string_format
from utils import date_utcnow
from utils import is_right_to_left_text
from utils import remove_markup_tag
from utils import remove_style_within_html
from utils import html_tag_has_closing
@ -80,12 +83,10 @@ from utils import convert_to_snake_case
from utils import get_sha_256
from utils import dangerous_svg
from utils import can_reply_to
from utils import is_group_account
from utils import get_actor_languages_list
from utils import get_category_types
from utils import get_supported_languages
from utils import set_config_param
from utils import is_group_actor
from utils import date_string_to_seconds
from utils import date_seconds_to_string
from utils import valid_password
@ -111,7 +112,6 @@ from utils import dangerous_markup
from utils import acct_dir
from pgp import extract_pgp_public_key
from pgp import pgp_public_key_upload
from utils import contains_pgp_public_key
from follow import add_follower_of_person
from follow import unfollow_account
from follow import unfollower_of_account

614
utils.py
View File

@ -48,6 +48,16 @@ INVALID_ACTOR_URL_CHARACTERS = (
)
def is_account_dir(dir_name: str) -> bool:
"""Is the given directory an account within /accounts ?
"""
if '@' not in dir_name:
return False
if 'inbox@' in dir_name or 'news@' in dir_name or 'Actor@' in dir_name:
return False
return True
def remove_zero_length_strings(text: str) -> str:
"""removes zero length strings from text
"""
@ -658,15 +668,6 @@ def acct_handle_dir(base_dir: str, handle: str) -> str:
return data_dir(base_dir) + '/' + handle
def is_featured_writer(base_dir: str, nickname: str, domain: str) -> bool:
"""Is the given account a featured writer, appearing in the features
timeline on news instances?
"""
features_blocked_filename = \
acct_dir(base_dir, nickname, domain) + '/.nofeatures'
return not os.path.isfile(features_blocked_filename)
def refresh_newswire(base_dir: str):
"""Causes the newswire to be updates after a change to user accounts
"""
@ -795,100 +796,6 @@ def get_full_domain(domain: str, port: int) -> str:
return domain + ':' + str(port)
def is_dormant(base_dir: str, nickname: str, domain: str, actor: str,
dormant_months: int) -> bool:
"""Is the given followed actor dormant, from the standpoint
of the given account
"""
last_seen_filename = acct_dir(base_dir, nickname, domain) + \
'/lastseen/' + actor.replace('/', '#') + '.txt'
if not os.path.isfile(last_seen_filename):
return False
days_since_epoch_str = None
try:
with open(last_seen_filename, 'r',
encoding='utf-8') as fp_last_seen:
days_since_epoch_str = fp_last_seen.read()
except OSError:
print('EX: failed to read last seen ' + last_seen_filename)
return False
if days_since_epoch_str:
days_since_epoch = int(days_since_epoch_str)
curr_time = date_utcnow()
curr_days_since_epoch = \
(curr_time - date_epoch()).days
time_diff_months = \
int((curr_days_since_epoch - days_since_epoch) / 30)
if time_diff_months >= dormant_months:
return True
return False
def is_editor(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is an editor
"""
editors_file = data_dir(base_dir) + '/editors.txt'
if not os.path.isfile(editors_file):
admin_name = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
return False
lines = []
try:
with open(editors_file, 'r', encoding='utf-8') as fp_editors:
lines = fp_editors.readlines()
except OSError:
print('EX: is_editor unable to read ' + editors_file)
if len(lines) == 0:
admin_name = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
for editor in lines:
editor = editor.strip('\n').strip('\r')
if editor == nickname:
return True
return False
def is_artist(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is an artist
"""
artists_file = data_dir(base_dir) + '/artists.txt'
if not os.path.isfile(artists_file):
admin_name = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
return False
lines = []
try:
with open(artists_file, 'r', encoding='utf-8') as fp_artists:
lines = fp_artists.readlines()
except OSError:
print('EX: is_artist unable to read ' + artists_file)
if len(lines) == 0:
admin_name = get_config_param(base_dir, 'admin')
if admin_name:
if admin_name == nickname:
return True
for artist in lines:
artist = artist.strip('\n').strip('\r')
if artist == nickname:
return True
return False
def get_video_extensions() -> []:
"""Returns a list of the possible video file extensions
"""
@ -976,15 +883,6 @@ def get_image_formats() -> str:
return image_formats
def is_image_file(filename: str) -> bool:
"""Is the given filename an image?
"""
for ext in get_image_extensions():
if filename.endswith('.' + ext):
return True
return False
def get_media_formats() -> str:
"""Returns a string of permissable media formats
used when selecting an attachment for a new post
@ -1070,14 +968,6 @@ def first_paragraph_from_string(content: str) -> str:
return remove_html(paragraph)
def is_system_account(nickname: str) -> bool:
"""Returns true if the given nickname is a system account
"""
if nickname in ('news', 'inbox'):
return True
return False
def get_memorials(base_dir: str) -> str:
"""Returns the nicknames for memorial accounts
"""
@ -1116,23 +1006,6 @@ def set_memorials(base_dir: str, domain: str, memorial_str) -> None:
print('EX: unable to write ' + memorial_file)
def is_memorial_account(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is a memorial account
"""
memorial_file = data_dir(base_dir) + '/memorial'
if not os.path.isfile(memorial_file):
return False
memorial_list = []
try:
with open(memorial_file, 'r', encoding='utf-8') as fp_memorial:
memorial_list = fp_memorial.read().split('\n')
except OSError:
print('EX: unable to read ' + memorial_file)
if nickname in memorial_list:
return True
return False
def _create_config(base_dir: str) -> None:
"""Creates a configuration file
"""
@ -1171,30 +1044,6 @@ def get_config_param(base_dir: str, variable_name: str):
return None
def is_suspended(base_dir: str, nickname: str) -> bool:
"""Returns true if the given nickname is suspended
"""
admin_nickname = get_config_param(base_dir, 'admin')
if not admin_nickname:
return False
if nickname == admin_nickname:
return False
suspended_filename = data_dir(base_dir) + '/suspended.txt'
if os.path.isfile(suspended_filename):
lines = []
try:
with open(suspended_filename, 'r', encoding='utf-8') as fp_susp:
lines = fp_susp.readlines()
except OSError:
print('EX: is_suspended unable to read ' + suspended_filename)
for suspended in lines:
if suspended.strip('\n').strip('\r') == nickname:
return True
return False
def get_followers_list(base_dir: str,
nickname: str, domain: str,
follow_file='following.txt') -> []:
@ -1444,29 +1293,6 @@ def evil_incarnate() -> []:
'kiwifarms.cc')
def is_evil(domain: str) -> bool:
""" https://www.youtube.com/watch?v=5qw1hcevmdU
"""
if not isinstance(domain, str):
print('WARN: Malformed domain ' + str(domain))
return True
# if a domain contains any of these strings then it is
# declaring itself to be hostile
evil_emporium = (
'nazi', 'extremis', 'extreemis', 'gendercritic',
'kiwifarm', 'illegal', 'raplst', 'rapist',
'rapl.st', 'rapi.st', 'antivax', 'plandemic', 'terror'
)
for hostile_str in evil_emporium:
if hostile_str in domain:
return True
evil_domains = evil_incarnate()
for concentrated_evil in evil_domains:
if domain.endswith(concentrated_evil):
return True
return False
def contains_invalid_chars(json_str: str) -> bool:
"""Does the given json string contain invalid characters?
"""
@ -1533,33 +1359,12 @@ def domain_permitted(domain: str, federation_list: []) -> bool:
return False
def url_permitted(url: str, federation_list: []):
if is_evil(url):
return False
if not federation_list:
return True
for domain in federation_list:
if domain in url:
return True
return False
def get_local_network_addresses() -> []:
"""Returns patterns for local network address detection
"""
return ('localhost', '127.0.', '192.168', '10.0.')
def is_local_network_address(ip_address: str) -> bool:
"""Is the given ip address local?
"""
local_ips = get_local_network_addresses()
for ip_addr in local_ips:
if ip_address.startswith(ip_addr):
return True
return False
def _is_dangerous_string_tag(content: str, allow_local_network_access: bool,
separators: [], invalid_strings: []) -> bool:
"""Returns true if the given string is dangerous
@ -2744,30 +2549,6 @@ def is_dm(post_json_object: {}) -> bool:
return True
def is_reminder(post_json_object: {}) -> bool:
"""Returns true if the given post is a reminder
"""
if not is_dm(post_json_object):
return False
if not post_json_object['object'].get('to'):
return False
if not post_json_object['object'].get('attributedTo'):
return False
if not post_json_object['object'].get('tag'):
return False
if not isinstance(post_json_object['object']['to'], list):
return False
if len(post_json_object['object']['to']) != 1:
return False
if post_json_object['object']['to'][0] != \
get_attributed_to(post_json_object['object']['attributedTo']):
return False
for tag in post_json_object['object']['tag']:
if tag['type'] == 'Event':
return True
return False
def _is_remote_dm(domain_full: str, post_json_object: {}) -> bool:
"""Is the given post a DM from a different domain?
"""
@ -3060,99 +2841,6 @@ def no_of_active_accounts_monthly(base_dir: str, months: int) -> bool:
return account_ctr
def is_public_post_from_url(base_dir: str, nickname: str, domain: str,
post_url: str) -> bool:
"""Returns whether the given url is a public post
"""
post_filename = locate_post(base_dir, nickname, domain, post_url)
if not post_filename:
return False
post_json_object = load_json(post_filename)
if not post_json_object:
return False
return is_public_post(post_json_object)
def is_public_post(post_json_object: {}) -> bool:
"""Returns true if the given post is public
"""
if not post_json_object.get('type'):
return False
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if not post_json_object['object'].get('to'):
return False
if isinstance(post_json_object['object']['to'], list):
for recipient in post_json_object['object']['to']:
if recipient.endswith('#Public') or \
recipient == 'as:Public' or \
recipient == 'Public':
return True
elif isinstance(post_json_object['object']['to'], str):
if post_json_object['object']['to'].endswith('#Public'):
return True
return False
def is_followers_post(post_json_object: {}) -> bool:
"""Returns true if the given post is to followers
"""
if not post_json_object.get('type'):
return False
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if not post_json_object['object'].get('to'):
return False
if isinstance(post_json_object['object']['to'], list):
for recipient in post_json_object['object']['to']:
if recipient.endswith('/followers'):
return True
elif isinstance(post_json_object['object']['to'], str):
if post_json_object['object']['to'].endswith('/followers'):
return True
return False
def is_unlisted_post(post_json_object: {}) -> bool:
"""Returns true if the given post is unlisted
"""
if not post_json_object.get('type'):
return False
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if not post_json_object['object'].get('to'):
return False
if not post_json_object['object'].get('cc'):
return False
has_followers = False
if isinstance(post_json_object['object']['to'], list):
for recipient in post_json_object['object']['to']:
if recipient.endswith('/followers'):
has_followers = True
break
elif isinstance(post_json_object['object']['to'], str):
if post_json_object['object']['to'].endswith('/followers'):
has_followers = True
if not has_followers:
return False
if isinstance(post_json_object['object']['cc'], list):
for recipient in post_json_object['object']['cc']:
if recipient.endswith('#Public') or \
recipient == 'as:Public' or \
recipient == 'Public':
return True
elif isinstance(post_json_object['object']['cc'], str):
if post_json_object['object']['cc'].endswith('#Public'):
return True
return False
def copytree(src: str, dst: str, symlinks: str, ignore: bool):
"""Copy a directory
"""
@ -3249,28 +2937,6 @@ def get_css(base_dir: str, css_filename: str) -> str:
return None
def is_blog_post(post_json_object: {}) -> bool:
"""Is the given post a blog post?
"""
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if not has_object_string_type(post_json_object, False):
return False
if 'content' not in post_json_object['object']:
return False
if post_json_object['object']['type'] != 'Article':
return False
return True
def is_news_post(post_json_object: {}) -> bool:
"""Is the given post a blog post?
"""
return post_json_object.get('news')
def _search_virtual_box_posts(base_dir: str, nickname: str, domain: str,
search_str: str, max_results: int,
box_name: str) -> []:
@ -3726,38 +3392,6 @@ def time_days_ago(datestr: str) -> int:
return date_diff.days
def is_recent_post(post_json_object: {}, max_days: int) -> bool:
""" Is the given post recent?
"""
if not has_object_dict(post_json_object):
return False
if not post_json_object['object'].get('published'):
return False
if not isinstance(post_json_object['object']['published'], str):
return False
curr_time = date_utcnow()
days_since_epoch = (curr_time - date_epoch()).days
recently = days_since_epoch - max_days
published_date_str = post_json_object['object']['published']
if '.' in published_date_str:
published_date_str = published_date_str.split('.')[0] + 'Z'
published_date = \
date_from_string_format(published_date_str,
["%Y-%m-%dT%H:%M:%S%z"])
if not published_date:
print('EX: is_recent_post unrecognized published date ' +
str(published_date_str))
return False
published_days_since_epoch = \
(published_date - date_epoch()).days
if published_days_since_epoch < recently:
return False
return True
def camel_case_split(text: str) -> str:
""" Splits CamelCase into "Camel Case"
"""
@ -3830,92 +3464,6 @@ def reject_post_id(base_dir: str, nickname: str, domain: str,
post_filename + '.reject')
def is_chat_message(post_json_object: {}) -> bool:
"""Returns true if the given post is a chat message
Note that is_dm should be checked before calling this
"""
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if post_json_object['object']['type'] != 'ChatMessage':
return False
return True
def is_reply(post_json_object: {}, actor: str) -> bool:
"""Returns true if the given post is a reply to the given actor
"""
if post_json_object['type'] != 'Create':
return False
if not has_object_dict(post_json_object):
return False
if post_json_object['object'].get('moderationStatus'):
return False
if post_json_object['object']['type'] not in ('Note', 'Event', 'Page',
'EncryptedMessage',
'ChatMessage', 'Article'):
return False
reply_id = get_reply_to(post_json_object['object'])
if reply_id:
if isinstance(reply_id, str):
if reply_id.startswith(actor):
return True
if not post_json_object['object'].get('tag'):
return False
if not isinstance(post_json_object['object']['tag'], list):
return False
for tag in post_json_object['object']['tag']:
if not tag.get('type'):
continue
if tag['type'] == 'Mention':
if not tag.get('href'):
continue
if actor in tag['href']:
return True
return False
def contains_pgp_public_key(content: str) -> bool:
"""Returns true if the given content contains a PGP public key
"""
if '--BEGIN PGP PUBLIC KEY BLOCK--' in content:
if '--END PGP PUBLIC KEY BLOCK--' in content:
return True
return False
def contains_private_key(content: str) -> bool:
"""Returns true if the given content contains a PGP private key
"""
if '--BEGIN PGP PRIVATE KEY BLOCK--' in content:
if '--END PGP PRIVATE KEY BLOCK--' in content:
return True
if '--BEGIN RSA PRIVATE KEY--' in content:
if '--END RSA PRIVATE KEY--' in content:
return True
return False
def is_pgp_encrypted(content: str) -> bool:
"""Returns true if the given content is PGP encrypted
"""
if '--BEGIN PGP MESSAGE--' in content:
if '--END PGP MESSAGE--' in content:
return True
return False
def invalid_ciphertext(content: str) -> bool:
"""Returns true if the given content contains an invalid key
"""
if '----BEGIN ' in content or '----END ' in content:
if not contains_pgp_public_key(content) and \
not is_pgp_encrypted(content):
return True
return False
def load_translations_from_file(base_dir: str, language: str) -> ({}, str):
"""Returns the translations dictionary
"""
@ -4047,16 +3595,6 @@ def set_occupation_skills_list(actor_json: {}, skills_list: []) -> bool:
return False
def is_account_dir(dir_name: str) -> bool:
"""Is the given directory an account within /accounts ?
"""
if '@' not in dir_name:
return False
if 'inbox@' in dir_name or 'news@' in dir_name or 'Actor@' in dir_name:
return False
return True
def permitted_dir(path: str) -> bool:
"""These are special paths which should not be accessible
directly via GET or POST
@ -4203,16 +3741,6 @@ def valid_password(password: str, debug: bool) -> bool:
return True
def is_float(value) -> bool:
"""Is the given value a float?
"""
try:
float(value)
return True
except ValueError:
return False
def date_string_to_seconds(date_str: str) -> int:
"""Converts a date string (eg "published") into seconds since epoch
"""
@ -4236,66 +3764,6 @@ def date_seconds_to_string(date_sec: int) -> str:
return this_date_tz.strftime("%Y-%m-%dT%H:%M:%SZ")
def has_group_type(base_dir: str, actor: str, person_cache: {},
debug: bool = False) -> bool:
"""Does the given actor url have a group type?
"""
# does the actor path clearly indicate that this is a group?
# eg. https://lemmy/c/groupname
group_paths = get_group_paths()
for grp_path in group_paths:
if grp_path in actor:
if debug:
print('grpPath ' + grp_path + ' in ' + actor)
return True
# is there a cached actor which can be examined for Group type?
return is_group_actor(base_dir, actor, person_cache, debug)
def is_group_actor(base_dir: str, actor: str, person_cache: {},
debug: bool = False) -> bool:
"""Is the given actor a group?
"""
person_cache_actor = None
if person_cache:
if person_cache.get(actor):
if person_cache[actor].get('actor'):
person_cache_actor = person_cache[actor]['actor']
if person_cache_actor:
if person_cache_actor.get('type'):
if person_cache_actor['type'] == 'Group':
if debug:
print('Cached actor ' + actor + ' has Group type')
return True
return False
if debug:
print('Actor ' + actor + ' not in cache')
cached_actor_filename = \
base_dir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
if not os.path.isfile(cached_actor_filename):
if debug:
print('Cached actor file not found ' + cached_actor_filename)
return False
if text_in_file('"type": "Group"', cached_actor_filename):
if debug:
print('Group type found in ' + cached_actor_filename)
return True
return False
def is_group_account(base_dir: str, nickname: str, domain: str) -> bool:
"""Returns true if the given account is a group
"""
account_filename = acct_dir(base_dir, nickname, domain) + '.json'
if not os.path.isfile(account_filename):
return False
if text_in_file('"type": "Group"', account_filename):
return True
return False
def get_currencies() -> {}:
"""Returns a dictionary of currencies
"""
@ -5046,18 +4514,6 @@ def get_quote_toot_url(post_json_object: str) -> str:
return ''
def is_quote_toot(post_json_object: str, content: str) -> bool:
"""Returns true if the given post is a quote toot / quote tweet
"""
if get_quote_toot_url(post_json_object):
return True
# Twitter-style indicator
if content:
if 'QT: ' in content:
return True
return False
def quote_toots_allowed(base_dir: str, nickname: str, domain: str,
sender_nickname: str, sender_domain: str) -> bool:
""" Returns true if quote toots are allowed by the given account
@ -5251,19 +4707,6 @@ def language_right_to_left(language: str) -> bool:
return False
def is_right_to_left_text(text: str) -> bool:
"""Is the given text right to left?
Persian \u0600-\u06FF
Arabic \u0627-\u064a
Hebrew/Yiddish \u0590-\u05FF\uFB2A-\uFB4E
"""
unicode_str = '[\u0627-\u064a]|[\u0600-\u06FF]|' + \
'[\u0590-\u05FF\uFB2A-\uFB4E]'
pattern = re.compile(unicode_str)
return len(re.findall(pattern, text)) > (len(text)/2)
def binary_is_image(filename: str, media_binary) -> bool:
"""Returns true if the given file binary data contains an image
"""
@ -5395,36 +4838,6 @@ def get_reply_to(post_json_object: {}) -> str:
return ''
def is_valid_date(date_str: str) -> bool:
"""is the given date valid?
"""
if not isinstance(date_str, str):
return False
if '-' not in date_str:
return False
date_sections = date_str.split('-')
if len(date_sections) != 3:
return False
date_sect_ctr = 0
for section_str in date_sections:
if not section_str.isdigit():
return False
if date_sect_ctr == 0:
date_year = int(section_str)
if date_year < 1920 or date_year > 3000:
return False
elif date_sect_ctr == 1:
date_month = int(section_str)
if date_month < 1 or date_month > 12:
return False
elif date_sect_ctr == 2:
date_day = int(section_str)
if date_day < 1 or date_day > 31:
return False
date_sect_ctr += 1
return True
def resembles_url(text: str) -> bool:
"""Does the given text look like a url?
"""
@ -5565,13 +4978,6 @@ def check_bad_path(path: str):
return False
def is_premium_account(base_dir: str, nickname: str, domain: str) -> bool:
""" Is the given account a premium one?
"""
premium_filename = acct_dir(base_dir, nickname, domain) + '/.premium'
return os.path.isfile(premium_filename)
def set_premium_account(base_dir: str, nickname: str, domain: str,
flag_state: bool) -> bool:
""" Set or clear the premium account flag

View File

@ -8,12 +8,12 @@ __status__ = "Production"
__module_group__ = "Web Interface Columns"
import os
from flags import is_editor
from flags import is_artist
from utils import replace_strings
from utils import data_dir
from utils import get_config_param
from utils import get_nickname_from_actor
from utils import is_editor
from utils import is_artist
from utils import remove_domain_port
from utils import local_actor_url
from webapp_utils import shares_timeline_json

View File

@ -10,6 +10,7 @@ __module_group__ = "Web Interface Columns"
import os
from content import remove_long_words
from content import limit_repeated_words
from flags import is_editor
from utils import replace_strings
from utils import data_dir
from utils import get_image_extensions
@ -20,7 +21,6 @@ from utils import locate_post
from utils import load_json
from utils import votes_on_newswire_item
from utils import get_nickname_from_actor
from utils import is_editor
from utils import get_config_param
from utils import remove_domain_port
from utils import acct_dir

View File

@ -10,11 +10,11 @@ __module_group__ = "Timeline"
import os
from conversation import download_conversation_posts
from flags import is_public_post
from utils import remove_id_ending
from utils import get_config_param
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import is_public_post
from utils import get_attributed_to
from blocking import is_blocked
from webapp_utils import text_mode_browser

View File

@ -8,8 +8,9 @@ __status__ = "Production"
__module_group__ = "Web Interface"
import os
from flags import is_public_post_from_url
from flags import is_premium_account
from utils import data_dir
from utils import is_premium_account
from utils import dangerous_markup
from utils import remove_html
from utils import get_content_from_post
@ -17,7 +18,6 @@ from utils import has_object_dict
from utils import load_json
from utils import locate_post
from utils import get_new_post_endpoints
from utils import is_public_post_from_url
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import get_media_formats

View File

@ -8,7 +8,7 @@ __status__ = "Production"
__module_group__ = "Timeline"
import os
from utils import is_system_account
from flags import is_system_account
from utils import get_domain_from_actor
from utils import get_config_param
from utils import get_account_timezone

View File

@ -9,11 +9,11 @@ __module_group__ = "Web Interface"
import os
from datetime import datetime, timezone
from flags import is_public_post
from utils import valid_hash_tag
from utils import remove_id_ending
from utils import resembles_url
from utils import has_object_dict
from utils import is_public_post
from utils import local_actor_url
from utils import date_from_string_format
from utils import file_last_modified

View File

@ -8,13 +8,13 @@ __status__ = "Production"
__module_group__ = "Moderation"
import os
from flags import is_editor
from flags import is_artist
from utils import data_dir
from utils import get_url_from_post
from utils import remove_html
from utils import is_artist
from utils import is_account_dir
from utils import get_full_domain
from utils import is_editor
from utils import load_json
from utils import get_nickname_from_actor
from utils import get_domain_from_actor

View File

@ -12,15 +12,15 @@ from shutil import copyfile
from petnames import get_pet_name
from person import is_person_snoozed
from posts import is_moderator
from flags import is_featured_writer
from flags import is_dormant
from utils import data_dir
from utils import quote_toots_allowed
from utils import get_full_domain
from utils import get_config_param
from utils import is_dormant
from utils import remove_html
from utils import get_domain_from_actor
from utils import get_nickname_from_actor
from utils import is_featured_writer
from utils import acct_dir
from utils import text_in_file
from utils import remove_domain_port

View File

@ -24,6 +24,16 @@ from posts import post_is_muted
from posts import get_person_box
from posts import download_announce
from posts import populate_replies_json
from flags import is_editor
from flags import is_reminder
from flags import is_public_post
from flags import is_followers_post
from flags import is_unlisted_post
from flags import is_blog_post
from flags import is_news_post
from flags import is_recent_post
from flags import is_chat_message
from flags import is_pgp_encrypted
from utils import get_actor_from_post_id
from utils import contains_statuses
from utils import data_dir
@ -49,33 +59,23 @@ from utils import get_language_from_post
from utils import get_summary_from_post
from utils import has_object_dict
from utils import update_announce_collection
from utils import is_pgp_encrypted
from utils import is_dm
from utils import is_reminder
from utils import is_chat_message
from utils import reject_post_id
from utils import is_recent_post
from utils import get_config_param
from utils import get_full_domain
from utils import is_editor
from utils import locate_post
from utils import load_json
from utils import get_cached_post_directory
from utils import get_cached_post_filename
from utils import get_protocol_prefixes
from utils import is_news_post
from utils import is_blog_post
from utils import get_display_name
from utils import display_name_is_emoji
from utils import is_public_post
from utils import is_followers_post
from utils import update_recent_posts_cache
from utils import remove_id_ending
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import acct_dir
from utils import local_actor_url
from utils import is_unlisted_post
from utils import language_right_to_left
from utils import get_attributed_to
from utils import get_reply_to

View File

@ -10,9 +10,14 @@ __module_group__ = "Web Interface"
import os
from pprint import pprint
from webfinger import webfinger_handle
from flags import is_dormant
from flags import is_artist
from flags import is_system_account
from flags import is_group_account
from flags import is_valid_date
from flags import is_premium_account
from utils import replace_strings
from utils import data_dir
from utils import is_premium_account
from utils import time_days_ago
from utils import uninvert_text
from utils import get_attributed_to
@ -24,16 +29,12 @@ from utils import ap_proxy_type
from utils import remove_id_ending
from utils import standardize_text
from utils import get_display_name
from utils import is_group_account
from utils import has_object_dict
from utils import get_occupation_name
from utils import get_locked_account
from utils import get_full_domain
from utils import is_artist
from utils import is_dormant
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import is_system_account
from utils import remove_html
from utils import load_json
from utils import get_config_param
@ -44,7 +45,6 @@ from utils import local_actor_url
from utils import get_reply_interval_hours
from utils import get_account_timezone
from utils import remove_eol
from utils import is_valid_date
from utils import get_actor_from_post
from utils import resembles_url
from languages import get_actor_languages

View File

@ -10,6 +10,8 @@ __module_group__ = "Web Interface"
import os
from shutil import copyfile
import urllib.parse
from flags import is_editor
from flags import is_public_post
from utils import data_dir
from utils import get_post_attachments
from utils import get_url_from_post
@ -25,11 +27,9 @@ from utils import get_base_content_from_post
from utils import is_account_dir
from utils import get_config_param
from utils import get_full_domain
from utils import is_editor
from utils import load_json
from utils import get_nickname_from_actor
from utils import locate_post
from utils import is_public_post
from utils import first_paragraph_from_string
from utils import search_box_posts
from utils import get_alt_path

View File

@ -10,15 +10,15 @@ __module_group__ = "Timeline"
import os
import time
from shutil import copyfile
from flags import is_editor
from flags import is_artist
from flags import is_float
from utils import data_dir
from utils import is_artist
from utils import dangerous_markup
from utils import get_config_param
from utils import get_full_domain
from utils import is_editor
from utils import remove_id_ending
from utils import acct_dir
from utils import is_float
from utils import local_actor_url
from utils import remove_eol
from utils import get_actor_from_post

View File

@ -12,6 +12,7 @@ from shutil import copyfile
from collections import OrderedDict
from session import get_json
from session import get_json_valid
from flags import is_float
from utils import replace_strings
from utils import get_image_file
from utils import data_dir
@ -35,7 +36,6 @@ from utils import get_config_param
from utils import acct_dir
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import is_float
from utils import get_audio_extensions
from utils import get_video_extensions
from utils import get_image_extensions