mirror of https://gitlab.com/bashrc2/epicyon
2030 lines
75 KiB
Python
2030 lines
75 KiB
Python
__filename__ = "blocking.py"
|
|
__author__ = "Bob Mottram"
|
|
__license__ = "AGPL3+"
|
|
__version__ = "1.5.0"
|
|
__maintainer__ = "Bob Mottram"
|
|
__email__ = "bob@libreserver.org"
|
|
__status__ = "Production"
|
|
__module_group__ = "Core"
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
from session import get_json_valid
|
|
from session import create_session
|
|
from utils import date_from_string_format
|
|
from utils import date_utcnow
|
|
from utils import remove_eol
|
|
from utils import has_object_string
|
|
from utils import has_object_string_object
|
|
from utils import has_object_string_type
|
|
from utils import remove_domain_port
|
|
from utils import has_object_dict
|
|
from utils import is_account_dir
|
|
from utils import get_cached_post_filename
|
|
from utils import load_json
|
|
from utils import save_json
|
|
from utils import file_last_modified
|
|
from utils import set_config_param
|
|
from utils import has_users_path
|
|
from utils import get_full_domain
|
|
from utils import remove_id_ending
|
|
from utils import is_evil
|
|
from utils import locate_post
|
|
from utils import evil_incarnate
|
|
from utils import get_domain_from_actor
|
|
from utils import get_nickname_from_actor
|
|
from utils import acct_dir
|
|
from utils import local_actor_url
|
|
from utils import has_actor
|
|
from utils import text_in_file
|
|
from utils import get_actor_from_post
|
|
from conversation import mute_conversation
|
|
from conversation import unmute_conversation
|
|
from auth import create_basic_auth_header
|
|
from session import get_json
|
|
|
|
|
|
def get_global_block_reason(search_text: str,
|
|
blocking_reasons_filename: str) -> str:
|
|
"""Returns the reason why a domain was globally blocked
|
|
"""
|
|
if not text_in_file(search_text, blocking_reasons_filename):
|
|
return ''
|
|
|
|
reasons_str = ''
|
|
try:
|
|
with open(blocking_reasons_filename, 'r',
|
|
encoding='utf-8') as fp_reas:
|
|
reasons_str = fp_reas.read()
|
|
except OSError:
|
|
print('WARN: Failed to raed blocking reasons ' +
|
|
blocking_reasons_filename)
|
|
if not reasons_str:
|
|
return ''
|
|
|
|
reasons_lines = reasons_str.split('\n')
|
|
for line in reasons_lines:
|
|
if line.startswith(search_text):
|
|
if ' ' in line:
|
|
return line.split(' ', 1)[1]
|
|
return ''
|
|
|
|
|
|
def get_account_blocks(base_dir: str,
|
|
nickname: str, domain: str) -> str:
|
|
"""Returne the text for the textarea for "blocked accounts"
|
|
when editing profile
|
|
"""
|
|
account_directory = acct_dir(base_dir, nickname, domain)
|
|
blocking_filename = \
|
|
account_directory + '/blocking.txt'
|
|
blocking_reasons_filename = \
|
|
account_directory + '/blocking_reasons.txt'
|
|
|
|
if not os.path.isfile(blocking_filename):
|
|
return ''
|
|
|
|
blocked_accounts_textarea = ''
|
|
blocking_file_text = ''
|
|
try:
|
|
with open(blocking_filename, 'r', encoding='utf-8') as fp_block:
|
|
blocking_file_text = fp_block.read()
|
|
except OSError:
|
|
print('EX: Failed to read ' + blocking_filename)
|
|
return ''
|
|
|
|
blocklist = blocking_file_text.split('\n')
|
|
for handle in blocklist:
|
|
handle = handle.strip()
|
|
if not handle:
|
|
continue
|
|
reason = \
|
|
get_global_block_reason(handle,
|
|
blocking_reasons_filename)
|
|
if reason:
|
|
blocked_accounts_textarea += \
|
|
handle + ' - ' + reason + '\n'
|
|
continue
|
|
blocked_accounts_textarea += handle + '\n'
|
|
|
|
return blocked_accounts_textarea
|
|
|
|
|
|
def blocked_timeline_json(actor: str, page_number: int, items_per_page: int,
|
|
base_dir: str,
|
|
nickname: str, domain: str) -> {}:
|
|
"""Returns blocked collection for an account
|
|
https://codeberg.org/fediverse/fep/src/branch/main/fep/c648/fep-c648.md
|
|
"""
|
|
blocked_accounts_textarea = \
|
|
get_account_blocks(base_dir, nickname, domain)
|
|
blocked_list = []
|
|
if blocked_accounts_textarea:
|
|
blocked_list = blocked_accounts_textarea.split('\n')
|
|
start_index = (page_number - 1) * items_per_page
|
|
if start_index >= len(blocked_list):
|
|
start_index = 0
|
|
last_page_number = (len(blocked_list) / items_per_page) + 1
|
|
|
|
result_json = {
|
|
"@context": [
|
|
"https://www.w3.org/ns/activitystreams",
|
|
"https://purl.archive.org/socialweb/blocked"
|
|
],
|
|
"id": actor + '?page=' + str(page_number),
|
|
"first": actor + '?page=1',
|
|
"last": actor + '?page=' + str(last_page_number),
|
|
"type": "OrderedCollection",
|
|
"name": nickname + "'s Blocked Collection",
|
|
"orderedItems": []
|
|
}
|
|
|
|
index = start_index
|
|
for _ in range(items_per_page):
|
|
if index >= len(blocked_list):
|
|
break
|
|
block_handle = blocked_list[index]
|
|
block_reason = ''
|
|
if ' - ' in block_handle:
|
|
block_reason = block_handle.split(' - ')[1]
|
|
block_handle = block_handle.split(' - ')[0]
|
|
block_type = "Person"
|
|
if block_handle.startswith('*@'):
|
|
block_type = "Application"
|
|
block_handle = block_handle.split('*@', 1)[1]
|
|
block_json = {
|
|
"type": "Block",
|
|
"id": actor + '/' + str(index),
|
|
"object": {
|
|
"type": block_type,
|
|
"id": block_handle
|
|
}
|
|
}
|
|
if block_reason:
|
|
block_json["object"]["name"] = block_reason
|
|
result_json["orderedItems"].append(block_json)
|
|
index += 1
|
|
return result_json
|
|
|
|
|
|
def add_account_blocks(base_dir: str,
|
|
nickname: str, domain: str,
|
|
blocked_accounts_textarea: str) -> bool:
|
|
"""Update the blockfile for an account after editing their
|
|
profile and changing "blocked accounts"
|
|
"""
|
|
if blocked_accounts_textarea is None:
|
|
return False
|
|
blocklist = blocked_accounts_textarea.split('\n')
|
|
blocking_file_text = ''
|
|
blocking_reasons_file_text = ''
|
|
for line in blocklist:
|
|
line = line.strip()
|
|
reason = None
|
|
if ' - ' in line:
|
|
block_id = line.split(' - ', 1)[0]
|
|
reason = line.split(' - ', 1)[1]
|
|
blocking_reasons_file_text += block_id + ' ' + reason + '\n'
|
|
elif ' ' in line:
|
|
block_id = line.split(' ', 1)[0]
|
|
reason = line.split(' ', 1)[1]
|
|
blocking_reasons_file_text += block_id + ' ' + reason + '\n'
|
|
else:
|
|
block_id = line
|
|
blocking_file_text += block_id + '\n'
|
|
|
|
account_directory = acct_dir(base_dir, nickname, domain)
|
|
blocking_filename = \
|
|
account_directory + '/blocking.txt'
|
|
blocking_reasons_filename = \
|
|
account_directory + '/blocking_reasons.txt'
|
|
|
|
if not blocking_file_text:
|
|
if os.path.isfile(blocking_filename):
|
|
try:
|
|
os.remove(blocking_filename)
|
|
except OSError:
|
|
print('EX: _profile_edit unable to delete blocking ' +
|
|
blocking_filename)
|
|
if os.path.isfile(blocking_reasons_filename):
|
|
try:
|
|
os.remove(blocking_reasons_filename)
|
|
except OSError:
|
|
print('EX: _profile_edit unable to delete blocking reasons' +
|
|
blocking_reasons_filename)
|
|
return True
|
|
|
|
try:
|
|
with open(blocking_filename, 'w+', encoding='utf-8') as fp_block:
|
|
fp_block.write(blocking_file_text)
|
|
except OSError:
|
|
print('EX: Failed to write ' + blocking_filename)
|
|
|
|
try:
|
|
with open(blocking_reasons_filename, 'w+',
|
|
encoding='utf-8') as fp_block:
|
|
fp_block.write(blocking_reasons_file_text)
|
|
except OSError:
|
|
print('EX: Failed to write ' + blocking_reasons_filename)
|
|
return True
|
|
|
|
|
|
def _add_global_block_reason(base_dir: str,
|
|
block_nickname: str, block_domain: str,
|
|
reason: str) -> bool:
|
|
"""Store a global block reason
|
|
"""
|
|
if not reason:
|
|
return False
|
|
|
|
blocking_reasons_filename = \
|
|
base_dir + '/accounts/blocking_reasons.txt'
|
|
|
|
if not block_nickname.startswith('#'):
|
|
# is the handle already blocked?
|
|
block_id = block_nickname + '@' + block_domain
|
|
else:
|
|
block_id = block_nickname
|
|
|
|
reason = reason.replace('\n', '').strip()
|
|
reason_line = block_id + ' ' + reason + '\n'
|
|
|
|
if os.path.isfile(blocking_reasons_filename):
|
|
if not text_in_file(block_id,
|
|
blocking_reasons_filename):
|
|
try:
|
|
with open(blocking_reasons_filename, 'a+',
|
|
encoding='utf-8') as reas_file:
|
|
reas_file.write(reason_line)
|
|
except OSError:
|
|
print('EX: unable to add blocking reason ' +
|
|
block_id)
|
|
else:
|
|
reasons_str = ''
|
|
try:
|
|
with open(blocking_reasons_filename, 'r',
|
|
encoding='utf-8') as reas_file:
|
|
reasons_str = reas_file.read()
|
|
except OSError:
|
|
print('EX: unable to read blocking reasons')
|
|
reasons_lines = reasons_str.split('\n')
|
|
new_reasons_str = ''
|
|
for line in reasons_lines:
|
|
if not line.startswith(block_id + ' '):
|
|
new_reasons_str += line + '\n'
|
|
continue
|
|
new_reasons_str += reason_line
|
|
try:
|
|
with open(blocking_reasons_filename, 'w+',
|
|
encoding='utf-8') as reas_file:
|
|
reas_file.write(new_reasons_str)
|
|
except OSError:
|
|
print('EX: unable to save blocking reasons' +
|
|
blocking_reasons_filename)
|
|
else:
|
|
try:
|
|
with open(blocking_reasons_filename, 'w+',
|
|
encoding='utf-8') as reas_file:
|
|
reas_file.write(reason_line)
|
|
except OSError:
|
|
print('EX: unable to save blocking reason ' +
|
|
block_id + ' ' + blocking_reasons_filename)
|
|
|
|
|
|
def add_global_block(base_dir: str,
|
|
block_nickname: str, block_domain: str,
|
|
reason: str) -> bool:
|
|
"""Global block which applies to all accounts
|
|
"""
|
|
_add_global_block_reason(base_dir,
|
|
block_nickname, block_domain,
|
|
reason)
|
|
|
|
blocking_filename = base_dir + '/accounts/blocking.txt'
|
|
if not block_nickname.startswith('#'):
|
|
# is the handle already blocked?
|
|
block_handle = block_nickname + '@' + block_domain
|
|
if os.path.isfile(blocking_filename):
|
|
if text_in_file(block_handle, blocking_filename):
|
|
return False
|
|
# block an account handle or domain
|
|
try:
|
|
with open(blocking_filename, 'a+', encoding='utf-8') as block_file:
|
|
block_file.write(block_handle + '\n')
|
|
except OSError:
|
|
print('EX: unable to save blocked handle ' + block_handle)
|
|
return False
|
|
else:
|
|
block_hashtag = block_nickname
|
|
# is the hashtag already blocked?
|
|
if os.path.isfile(blocking_filename):
|
|
if text_in_file(block_hashtag + '\n', blocking_filename):
|
|
return False
|
|
# block a hashtag
|
|
try:
|
|
with open(blocking_filename, 'a+', encoding='utf-8') as block_file:
|
|
block_file.write(block_hashtag + '\n')
|
|
except OSError:
|
|
print('EX: unable to save blocked hashtag ' + block_hashtag)
|
|
return False
|
|
return True
|
|
|
|
|
|
def _add_block_reason(base_dir: str,
|
|
nickname: str, domain: str,
|
|
block_nickname: str, block_domain: str,
|
|
reason: str) -> bool:
|
|
"""Store an account level block reason
|
|
"""
|
|
if not reason:
|
|
return False
|
|
|
|
domain = remove_domain_port(domain)
|
|
blocking_reasons_filename = \
|
|
acct_dir(base_dir, nickname, domain) + '/blocking_reasons.txt'
|
|
|
|
if not block_nickname.startswith('#'):
|
|
# is the handle already blocked?
|
|
block_id = block_nickname + '@' + block_domain
|
|
else:
|
|
block_id = block_nickname
|
|
|
|
reason = reason.replace('\n', '').strip()
|
|
reason_line = block_id + ' ' + reason + '\n'
|
|
|
|
if os.path.isfile(blocking_reasons_filename):
|
|
if not text_in_file(block_id,
|
|
blocking_reasons_filename):
|
|
try:
|
|
with open(blocking_reasons_filename, 'a+',
|
|
encoding='utf-8') as reas_file:
|
|
reas_file.write(reason_line)
|
|
except OSError:
|
|
print('EX: unable to add blocking reason 2 ' +
|
|
block_id)
|
|
else:
|
|
reasons_str = ''
|
|
try:
|
|
with open(blocking_reasons_filename, 'r',
|
|
encoding='utf-8') as reas_file:
|
|
reasons_str = reas_file.read()
|
|
except OSError:
|
|
print('EX: unable to read blocking reasons 2')
|
|
reasons_lines = reasons_str.split('\n')
|
|
new_reasons_str = ''
|
|
for line in reasons_lines:
|
|
if not line.startswith(block_id + ' '):
|
|
new_reasons_str += line + '\n'
|
|
continue
|
|
new_reasons_str += reason_line
|
|
try:
|
|
with open(blocking_reasons_filename, 'w+',
|
|
encoding='utf-8') as reas_file:
|
|
reas_file.write(new_reasons_str)
|
|
except OSError:
|
|
print('EX: unable to save blocking reasons 2' +
|
|
blocking_reasons_filename)
|
|
else:
|
|
try:
|
|
with open(blocking_reasons_filename, 'w+',
|
|
encoding='utf-8') as reas_file:
|
|
reas_file.write(reason_line)
|
|
except OSError:
|
|
print('EX: unable to save blocking reason 2 ' +
|
|
block_id + ' ' + blocking_reasons_filename)
|
|
|
|
|
|
def add_block(base_dir: str, nickname: str, domain: str,
|
|
block_nickname: str, block_domain: str,
|
|
reason: str) -> bool:
|
|
"""Block the given account
|
|
"""
|
|
if block_domain.startswith(domain) and nickname == block_nickname:
|
|
# don't block self
|
|
return False
|
|
|
|
domain = remove_domain_port(domain)
|
|
blocking_filename = acct_dir(base_dir, nickname, domain) + '/blocking.txt'
|
|
block_handle = block_nickname + '@' + block_domain
|
|
if os.path.isfile(blocking_filename):
|
|
if text_in_file(block_handle + '\n', blocking_filename):
|
|
return False
|
|
|
|
# if we are following then unfollow
|
|
following_filename = \
|
|
acct_dir(base_dir, nickname, domain) + '/following.txt'
|
|
if os.path.isfile(following_filename):
|
|
if text_in_file(block_handle + '\n', following_filename):
|
|
following_str = ''
|
|
try:
|
|
with open(following_filename, 'r',
|
|
encoding='utf-8') as foll_file:
|
|
following_str = foll_file.read()
|
|
except OSError:
|
|
print('EX: Unable to read following ' + following_filename)
|
|
return False
|
|
|
|
if following_str:
|
|
following_str = following_str.replace(block_handle + '\n', '')
|
|
|
|
try:
|
|
with open(following_filename, 'w+',
|
|
encoding='utf-8') as foll_file:
|
|
foll_file.write(following_str)
|
|
except OSError:
|
|
print('EX: Unable to write following ' + following_str)
|
|
return False
|
|
|
|
# if they are a follower then remove them
|
|
followers_filename = \
|
|
acct_dir(base_dir, nickname, domain) + '/followers.txt'
|
|
if os.path.isfile(followers_filename):
|
|
if text_in_file(block_handle + '\n', followers_filename):
|
|
followers_str = ''
|
|
try:
|
|
with open(followers_filename, 'r',
|
|
encoding='utf-8') as foll_file:
|
|
followers_str = foll_file.read()
|
|
except OSError:
|
|
print('EX: Unable to read followers ' + followers_filename)
|
|
return False
|
|
|
|
if followers_str:
|
|
followers_str = followers_str.replace(block_handle + '\n', '')
|
|
|
|
try:
|
|
with open(followers_filename, 'w+',
|
|
encoding='utf-8') as foll_file:
|
|
foll_file.write(followers_str)
|
|
except OSError:
|
|
print('EX: Unable to write followers ' + followers_str)
|
|
return False
|
|
|
|
try:
|
|
with open(blocking_filename, 'a+', encoding='utf-8') as block_file:
|
|
block_file.write(block_handle + '\n')
|
|
except OSError:
|
|
print('EX: unable to append block handle ' + block_handle)
|
|
return False
|
|
|
|
if reason:
|
|
_add_block_reason(base_dir, nickname, domain,
|
|
block_nickname, block_domain, reason)
|
|
|
|
return True
|
|
|
|
|
|
def _remove_global_block_reason(base_dir: str,
|
|
unblock_nickname: str,
|
|
unblock_domain: str) -> bool:
|
|
"""Remove a globla block reason
|
|
"""
|
|
unblocking_filename = base_dir + '/accounts/blocking_reasons.txt'
|
|
if not os.path.isfile(unblocking_filename):
|
|
return False
|
|
|
|
if not unblock_nickname.startswith('#'):
|
|
unblock_id = unblock_nickname + '@' + unblock_domain
|
|
else:
|
|
unblock_id = unblock_nickname
|
|
|
|
if not text_in_file(unblock_id + ' ', unblocking_filename):
|
|
return False
|
|
|
|
reasons_str = ''
|
|
try:
|
|
with open(unblocking_filename, 'r',
|
|
encoding='utf-8') as reas_file:
|
|
reasons_str = reas_file.read()
|
|
except OSError:
|
|
print('EX: unable to read blocking reasons 2')
|
|
reasons_lines = reasons_str.split('\n')
|
|
new_reasons_str = ''
|
|
for line in reasons_lines:
|
|
if line.startswith(unblock_id + ' '):
|
|
continue
|
|
new_reasons_str += line + '\n'
|
|
try:
|
|
with open(unblocking_filename, 'w+',
|
|
encoding='utf-8') as reas_file:
|
|
reas_file.write(new_reasons_str)
|
|
except OSError:
|
|
print('EX: unable to save blocking reasons 2' +
|
|
unblocking_filename)
|
|
|
|
|
|
def remove_global_block(base_dir: str,
|
|
unblock_nickname: str,
|
|
unblock_domain: str) -> bool:
|
|
"""Unblock the given global block
|
|
"""
|
|
_remove_global_block_reason(base_dir,
|
|
unblock_nickname,
|
|
unblock_domain)
|
|
|
|
unblocking_filename = base_dir + '/accounts/blocking.txt'
|
|
if not unblock_nickname.startswith('#'):
|
|
unblock_handle = unblock_nickname + '@' + unblock_domain
|
|
if os.path.isfile(unblocking_filename):
|
|
if text_in_file(unblock_handle, unblocking_filename):
|
|
try:
|
|
with open(unblocking_filename, 'r',
|
|
encoding='utf-8') as fp_unblock:
|
|
with open(unblocking_filename + '.new', 'w+',
|
|
encoding='utf-8') as fpnew:
|
|
for line in fp_unblock:
|
|
handle = remove_eol(line)
|
|
if unblock_handle not in line:
|
|
fpnew.write(handle + '\n')
|
|
except OSError as ex:
|
|
print('EX: failed to remove global block ' +
|
|
unblocking_filename + ' ' + str(ex))
|
|
return False
|
|
|
|
if os.path.isfile(unblocking_filename + '.new'):
|
|
try:
|
|
os.rename(unblocking_filename + '.new',
|
|
unblocking_filename)
|
|
except OSError:
|
|
print('EX: unable to rename ' + unblocking_filename)
|
|
return False
|
|
return True
|
|
else:
|
|
unblock_hashtag = unblock_nickname
|
|
if os.path.isfile(unblocking_filename):
|
|
if text_in_file(unblock_hashtag + '\n', unblocking_filename):
|
|
try:
|
|
with open(unblocking_filename, 'r',
|
|
encoding='utf-8') as fp_unblock:
|
|
with open(unblocking_filename + '.new', 'w+',
|
|
encoding='utf-8') as fpnew:
|
|
for line in fp_unblock:
|
|
block_line = remove_eol(line)
|
|
if unblock_hashtag not in line:
|
|
fpnew.write(block_line + '\n')
|
|
except OSError as ex:
|
|
print('EX: failed to remove global hashtag block ' +
|
|
unblocking_filename + ' ' + str(ex))
|
|
return False
|
|
|
|
if os.path.isfile(unblocking_filename + '.new'):
|
|
try:
|
|
os.rename(unblocking_filename + '.new',
|
|
unblocking_filename)
|
|
except OSError:
|
|
print('EX: unable to rename 2 ' + unblocking_filename)
|
|
return False
|
|
return True
|
|
return False
|
|
|
|
|
|
def remove_block(base_dir: str, nickname: str, domain: str,
|
|
unblock_nickname: str, unblock_domain: str) -> bool:
|
|
"""Unblock the given account
|
|
"""
|
|
domain = remove_domain_port(domain)
|
|
unblocking_filename = \
|
|
acct_dir(base_dir, nickname, domain) + '/blocking.txt'
|
|
unblock_handle = unblock_nickname + '@' + unblock_domain
|
|
if os.path.isfile(unblocking_filename):
|
|
if text_in_file(unblock_handle, unblocking_filename):
|
|
try:
|
|
with open(unblocking_filename, 'r',
|
|
encoding='utf-8') as fp_unblock:
|
|
with open(unblocking_filename + '.new', 'w+',
|
|
encoding='utf-8') as fpnew:
|
|
for line in fp_unblock:
|
|
handle = remove_eol(line)
|
|
if unblock_handle not in line:
|
|
fpnew.write(handle + '\n')
|
|
except OSError as ex:
|
|
print('EX: failed to remove block ' +
|
|
unblocking_filename + ' ' + str(ex))
|
|
return False
|
|
|
|
if os.path.isfile(unblocking_filename + '.new'):
|
|
try:
|
|
os.rename(unblocking_filename + '.new',
|
|
unblocking_filename)
|
|
except OSError:
|
|
print('EX: unable to rename 3 ' + unblocking_filename)
|
|
return False
|
|
return True
|
|
return False
|
|
|
|
|
|
def is_blocked_hashtag(base_dir: str, hashtag: str) -> bool:
|
|
"""Is the given hashtag blocked?
|
|
"""
|
|
# avoid very long hashtags
|
|
if len(hashtag) > 32:
|
|
return True
|
|
global_blocking_filename = base_dir + '/accounts/blocking.txt'
|
|
if os.path.isfile(global_blocking_filename):
|
|
hashtag = hashtag.strip('\n').strip('\r')
|
|
if not hashtag.startswith('#'):
|
|
hashtag = '#' + hashtag
|
|
if text_in_file(hashtag + '\n', global_blocking_filename):
|
|
return True
|
|
return False
|
|
|
|
|
|
def get_domain_blocklist(base_dir: str) -> str:
|
|
"""Returns all globally blocked domains as a string
|
|
This can be used for fast matching to mitigate flooding
|
|
"""
|
|
blocked_str = ''
|
|
|
|
evil_domains = evil_incarnate()
|
|
for evil in evil_domains:
|
|
blocked_str += evil + '\n'
|
|
|
|
global_blocking_filename = base_dir + '/accounts/blocking.txt'
|
|
if not os.path.isfile(global_blocking_filename):
|
|
return blocked_str
|
|
try:
|
|
with open(global_blocking_filename, 'r',
|
|
encoding='utf-8') as fp_blocked:
|
|
blocked_str += fp_blocked.read()
|
|
except OSError:
|
|
print('EX: unable to read ' + global_blocking_filename)
|
|
return blocked_str
|
|
|
|
|
|
def update_blocked_cache(base_dir: str,
|
|
blocked_cache: [],
|
|
blocked_cache_last_updated: int,
|
|
blocked_cache_update_secs: int) -> int:
|
|
"""Updates the cache of globally blocked domains held in memory
|
|
"""
|
|
curr_time = int(time.time())
|
|
if blocked_cache_last_updated > curr_time:
|
|
print('WARN: Cache updated in the future')
|
|
blocked_cache_last_updated = 0
|
|
seconds_since_last_update = curr_time - blocked_cache_last_updated
|
|
if seconds_since_last_update < blocked_cache_update_secs:
|
|
return blocked_cache_last_updated
|
|
global_blocking_filename = base_dir + '/accounts/blocking.txt'
|
|
if not os.path.isfile(global_blocking_filename):
|
|
return blocked_cache_last_updated
|
|
try:
|
|
with open(global_blocking_filename, 'r',
|
|
encoding='utf-8') as fp_blocked:
|
|
blocked_lines = fp_blocked.readlines()
|
|
# remove newlines
|
|
for index, _ in enumerate(blocked_lines):
|
|
blocked_lines[index] = remove_eol(blocked_lines[index])
|
|
# update the cache
|
|
blocked_cache.clear()
|
|
blocked_cache += blocked_lines
|
|
except OSError as ex:
|
|
print('EX: unable to read ' + global_blocking_filename + ' ' + str(ex))
|
|
return curr_time
|
|
|
|
|
|
def _get_short_domain(domain: str) -> str:
|
|
""" by checking a shorter version we can thwart adversaries
|
|
who constantly change their subdomain
|
|
e.g. subdomain123.mydomain.com becomes mydomain.com
|
|
"""
|
|
sections = domain.split('.')
|
|
no_of_sections = len(sections)
|
|
if no_of_sections > 2:
|
|
return sections[no_of_sections-2] + '.' + sections[-1]
|
|
return None
|
|
|
|
|
|
def is_blocked_domain(base_dir: str, domain: str,
|
|
blocked_cache: [] = None,
|
|
block_federated: [] = None) -> bool:
|
|
"""Is the given domain blocked?
|
|
"""
|
|
if '.' not in domain:
|
|
return False
|
|
|
|
if is_evil(domain):
|
|
return True
|
|
|
|
short_domain = _get_short_domain(domain)
|
|
|
|
search_str = '*@' + domain
|
|
if not broch_mode_is_active(base_dir):
|
|
if block_federated:
|
|
if domain in block_federated:
|
|
return True
|
|
|
|
if blocked_cache:
|
|
for blocked_str in blocked_cache:
|
|
if blocked_str == search_str:
|
|
return True
|
|
if short_domain:
|
|
if blocked_str == '*@' + short_domain:
|
|
return True
|
|
else:
|
|
# instance block list
|
|
global_blocking_filename = base_dir + '/accounts/blocking.txt'
|
|
if os.path.isfile(global_blocking_filename):
|
|
search_str += '\n'
|
|
search_str_short = None
|
|
if short_domain:
|
|
search_str_short = '*@' + short_domain + '\n'
|
|
try:
|
|
with open(global_blocking_filename, 'r',
|
|
encoding='utf-8') as fp_blocked:
|
|
blocked_str = fp_blocked.read()
|
|
if search_str in blocked_str:
|
|
return True
|
|
if short_domain:
|
|
if search_str_short in blocked_str:
|
|
return True
|
|
except OSError as ex:
|
|
print('EX: unable to read ' + global_blocking_filename +
|
|
' ' + str(ex))
|
|
else:
|
|
allow_filename = base_dir + '/accounts/allowedinstances.txt'
|
|
# instance allow list
|
|
if not short_domain:
|
|
if not text_in_file(domain, allow_filename):
|
|
return True
|
|
else:
|
|
if not text_in_file(short_domain, allow_filename):
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def is_blocked_nickname(base_dir: str, nickname: str,
|
|
blocked_cache: [] = None) -> bool:
|
|
"""Is the given nickname blocked?
|
|
"""
|
|
search_str = nickname + '@*'
|
|
if blocked_cache:
|
|
for blocked_str in blocked_cache:
|
|
if blocked_str == search_str:
|
|
return True
|
|
else:
|
|
# instance-wide block list
|
|
global_blocking_filename = base_dir + '/accounts/blocking.txt'
|
|
if os.path.isfile(global_blocking_filename):
|
|
search_str += '\n'
|
|
try:
|
|
with open(global_blocking_filename, 'r',
|
|
encoding='utf-8') as fp_blocked:
|
|
blocked_str = fp_blocked.read()
|
|
if search_str in blocked_str:
|
|
return True
|
|
except OSError as ex:
|
|
print('EX: unable to read ' + global_blocking_filename +
|
|
' ' + str(ex))
|
|
|
|
return False
|
|
|
|
|
|
def is_blocked(base_dir: str, nickname: str, domain: str,
|
|
block_nickname: str, block_domain: str,
|
|
blocked_cache: [] = None,
|
|
block_federated: [] = None) -> bool:
|
|
"""Is the given account blocked?
|
|
"""
|
|
if is_evil(block_domain):
|
|
return True
|
|
|
|
block_handle = None
|
|
if block_nickname and block_domain:
|
|
block_handle = block_nickname + '@' + block_domain
|
|
|
|
if not broch_mode_is_active(base_dir):
|
|
# instance level block list
|
|
if block_federated:
|
|
for blocked_str in block_federated:
|
|
if '@' in blocked_str:
|
|
if block_handle:
|
|
if blocked_str == block_handle:
|
|
return True
|
|
elif blocked_str == block_domain:
|
|
return True
|
|
|
|
if blocked_cache:
|
|
for blocked_str in blocked_cache:
|
|
if block_nickname:
|
|
if block_nickname + '@*' in blocked_str:
|
|
return True
|
|
if block_domain:
|
|
if '*@' + block_domain in blocked_str:
|
|
return True
|
|
if block_handle:
|
|
if blocked_str == block_handle:
|
|
return True
|
|
else:
|
|
global_blocks_filename = base_dir + '/accounts/blocking.txt'
|
|
if os.path.isfile(global_blocks_filename):
|
|
if block_nickname:
|
|
if text_in_file(block_nickname + '@*\n',
|
|
global_blocks_filename):
|
|
return True
|
|
if text_in_file('*@' + block_domain, global_blocks_filename):
|
|
return True
|
|
if block_handle:
|
|
block_str = block_handle + '\n'
|
|
if text_in_file(block_str, global_blocks_filename):
|
|
return True
|
|
if not block_federated:
|
|
federated_blocks_filename = \
|
|
base_dir + '/accounts/block_api.txt'
|
|
if os.path.isfile(federated_blocks_filename):
|
|
block_federated = []
|
|
try:
|
|
with open(federated_blocks_filename, 'r',
|
|
encoding='utf-8') as fp_fed:
|
|
block_federated = fp_fed.read().split('\n')
|
|
except OSError:
|
|
print('EX: is_blocked unable to load ' +
|
|
federated_blocks_filename)
|
|
if block_domain in block_federated:
|
|
return True
|
|
if block_handle:
|
|
if block_handle in block_federated:
|
|
return True
|
|
else:
|
|
# instance allow list
|
|
allow_filename = base_dir + '/accounts/allowedinstances.txt'
|
|
short_domain = _get_short_domain(block_domain)
|
|
if not short_domain and block_domain:
|
|
if not text_in_file(block_domain + '\n', allow_filename):
|
|
return True
|
|
else:
|
|
if not text_in_file(short_domain + '\n', allow_filename):
|
|
return True
|
|
|
|
# account level allow list
|
|
account_dir = acct_dir(base_dir, nickname, domain)
|
|
allow_filename = account_dir + '/allowedinstances.txt'
|
|
if block_domain and os.path.isfile(allow_filename):
|
|
if not text_in_file(block_domain + '\n', allow_filename):
|
|
return True
|
|
|
|
# account level block list
|
|
blocking_filename = account_dir + '/blocking.txt'
|
|
if os.path.isfile(blocking_filename):
|
|
if block_nickname:
|
|
if text_in_file(block_nickname + '@*\n', blocking_filename):
|
|
return True
|
|
if block_domain:
|
|
if text_in_file('*@' + block_domain + '\n', blocking_filename):
|
|
return True
|
|
if block_handle:
|
|
if text_in_file(block_handle + '\n', blocking_filename):
|
|
return True
|
|
return False
|
|
|
|
|
|
def allowed_announce(base_dir: str, nickname: str, domain: str,
|
|
block_nickname: str, block_domain: str,
|
|
announce_blocked_cache: [] = None) -> bool:
|
|
"""Is the given nickname allowed to send announces?
|
|
"""
|
|
block_handle = None
|
|
if block_nickname and block_domain:
|
|
block_handle = block_nickname + '@' + block_domain
|
|
|
|
# cached announce blocks
|
|
if announce_blocked_cache:
|
|
for blocked_str in announce_blocked_cache:
|
|
if block_nickname:
|
|
if block_nickname + '@*' in blocked_str:
|
|
return False
|
|
if block_domain:
|
|
if '*@' + block_domain in blocked_str:
|
|
return False
|
|
if block_handle:
|
|
if blocked_str == block_handle:
|
|
return False
|
|
|
|
# non-cached instance level announce blocks
|
|
global_announce_blocks_filename = \
|
|
base_dir + '/accounts/noannounce.txt'
|
|
if os.path.isfile(global_announce_blocks_filename):
|
|
if block_nickname:
|
|
if text_in_file(block_nickname + '@*',
|
|
global_announce_blocks_filename, False):
|
|
return False
|
|
if block_domain:
|
|
if text_in_file('*@' + block_domain,
|
|
global_announce_blocks_filename, False):
|
|
return False
|
|
if block_handle:
|
|
block_str = block_handle + '\n'
|
|
if text_in_file(block_str,
|
|
global_announce_blocks_filename, False):
|
|
return False
|
|
|
|
# non-cached account level announce blocks
|
|
account_dir = acct_dir(base_dir, nickname, domain)
|
|
blocking_filename = account_dir + '/noannounce.txt'
|
|
if os.path.isfile(blocking_filename):
|
|
if block_nickname:
|
|
if text_in_file(block_nickname + '@*\n',
|
|
blocking_filename, False):
|
|
return False
|
|
if block_domain:
|
|
if text_in_file('*@' + block_domain + '\n',
|
|
blocking_filename, False):
|
|
return False
|
|
if block_handle:
|
|
if text_in_file(block_handle + '\n', blocking_filename, False):
|
|
return False
|
|
return True
|
|
|
|
|
|
def allowed_announce_add(base_dir: str, nickname: str, domain: str,
|
|
following_nickname: str,
|
|
following_domain: str) -> None:
|
|
"""Allow announces for a handle
|
|
"""
|
|
account_dir = acct_dir(base_dir, nickname, domain)
|
|
blocking_filename = account_dir + '/noannounce.txt'
|
|
|
|
# if the noannounce.txt file doesn't yet exist
|
|
if not os.path.isfile(blocking_filename):
|
|
return
|
|
|
|
handle = following_nickname + '@' + following_domain
|
|
if text_in_file(handle + '\n', blocking_filename, False):
|
|
file_text = ''
|
|
try:
|
|
with open(blocking_filename, 'r',
|
|
encoding='utf-8') as fp_noannounce:
|
|
file_text = fp_noannounce.read()
|
|
except OSError:
|
|
print('EX: unable to read noannounce: ' +
|
|
blocking_filename + ' ' + handle)
|
|
|
|
new_file_text = ''
|
|
file_text_list = file_text.split('\n')
|
|
handle_lower = handle.lower()
|
|
for allowed in file_text_list:
|
|
if allowed.lower() != handle_lower:
|
|
new_file_text += allowed + '\n'
|
|
file_text = new_file_text
|
|
|
|
try:
|
|
with open(blocking_filename, 'w+',
|
|
encoding='utf-8') as fp_noannounce:
|
|
fp_noannounce.write(file_text)
|
|
except OSError:
|
|
print('EX: unable to write noannounce: ' +
|
|
blocking_filename + ' ' + handle)
|
|
|
|
|
|
def allowed_announce_remove(base_dir: str, nickname: str, domain: str,
|
|
following_nickname: str,
|
|
following_domain: str) -> None:
|
|
"""Don't allow announces from a handle
|
|
"""
|
|
account_dir = acct_dir(base_dir, nickname, domain)
|
|
blocking_filename = account_dir + '/noannounce.txt'
|
|
handle = following_nickname + '@' + following_domain
|
|
|
|
# if the noannounce.txt file doesn't yet exist
|
|
if not os.path.isfile(blocking_filename):
|
|
file_text = handle + '\n'
|
|
try:
|
|
with open(blocking_filename, 'w+',
|
|
encoding='utf-8') as fp_noannounce:
|
|
fp_noannounce.write(file_text)
|
|
except OSError:
|
|
print('EX: unable to write initial noannounce: ' +
|
|
blocking_filename + ' ' + handle)
|
|
return
|
|
|
|
file_text = ''
|
|
if not text_in_file(handle + '\n', blocking_filename, False):
|
|
try:
|
|
with open(blocking_filename, 'r',
|
|
encoding='utf-8') as fp_noannounce:
|
|
file_text = fp_noannounce.read()
|
|
except OSError:
|
|
print('EX: unable to read noannounce: ' +
|
|
blocking_filename + ' ' + handle)
|
|
file_text += handle + '\n'
|
|
try:
|
|
with open(blocking_filename, 'w+',
|
|
encoding='utf-8') as fp_noannounce:
|
|
fp_noannounce.write(file_text)
|
|
except OSError:
|
|
print('EX: unable to write noannounce: ' +
|
|
blocking_filename + ' ' + handle)
|
|
|
|
|
|
def outbox_block(base_dir: str, nickname: str, domain: str,
|
|
message_json: {}, debug: bool) -> bool:
|
|
""" When a block request is received by the outbox from c2s
|
|
"""
|
|
if not message_json.get('type'):
|
|
if debug:
|
|
print('DEBUG: block - no type')
|
|
return False
|
|
if not message_json['type'] == 'Block':
|
|
if debug:
|
|
print('DEBUG: not a block')
|
|
return False
|
|
if not has_object_string(message_json, debug):
|
|
return False
|
|
if debug:
|
|
print('DEBUG: c2s block request arrived in outbox')
|
|
|
|
message_id = remove_id_ending(message_json['object'])
|
|
if '/statuses/' not in message_id:
|
|
if debug:
|
|
print('DEBUG: c2s block object is not a status')
|
|
return False
|
|
if not has_users_path(message_id):
|
|
if debug:
|
|
print('DEBUG: c2s block object has no nickname')
|
|
return False
|
|
domain = remove_domain_port(domain)
|
|
post_filename = locate_post(base_dir, nickname, domain, message_id)
|
|
if not post_filename:
|
|
if debug:
|
|
print('DEBUG: c2s block post not found in inbox or outbox')
|
|
print(message_id)
|
|
return False
|
|
nickname_blocked = get_nickname_from_actor(message_json['object'])
|
|
if not nickname_blocked:
|
|
print('WARN: unable to find nickname in ' + message_json['object'])
|
|
return False
|
|
domain_blocked, port_blocked = \
|
|
get_domain_from_actor(message_json['object'])
|
|
if not domain_blocked:
|
|
print('WARN: unable to find domain in ' + message_json['object'])
|
|
return False
|
|
domain_blocked_full = get_full_domain(domain_blocked, port_blocked)
|
|
|
|
add_block(base_dir, nickname, domain,
|
|
nickname_blocked, domain_blocked_full, '')
|
|
|
|
if debug:
|
|
print('DEBUG: post blocked via c2s - ' + post_filename)
|
|
return True
|
|
|
|
|
|
def outbox_undo_block(base_dir: str, nickname: str, domain: str,
|
|
message_json: {}, debug: bool) -> None:
|
|
""" When an undo block request is received by the outbox from c2s
|
|
"""
|
|
if not message_json.get('type'):
|
|
if debug:
|
|
print('DEBUG: undo block - no type')
|
|
return
|
|
if not message_json['type'] == 'Undo':
|
|
if debug:
|
|
print('DEBUG: not an undo block')
|
|
return
|
|
|
|
if not has_object_string_type(message_json, debug):
|
|
return
|
|
if not message_json['object']['type'] == 'Block':
|
|
if debug:
|
|
print('DEBUG: not an undo block')
|
|
return
|
|
if not has_object_string_object(message_json, debug):
|
|
return
|
|
if debug:
|
|
print('DEBUG: c2s undo block request arrived in outbox')
|
|
|
|
message_id = remove_id_ending(message_json['object']['object'])
|
|
if '/statuses/' not in message_id:
|
|
if debug:
|
|
print('DEBUG: c2s undo block object is not a status')
|
|
return
|
|
if not has_users_path(message_id):
|
|
if debug:
|
|
print('DEBUG: c2s undo block object has no nickname')
|
|
return
|
|
domain = remove_domain_port(domain)
|
|
post_filename = locate_post(base_dir, nickname, domain, message_id)
|
|
if not post_filename:
|
|
if debug:
|
|
print('DEBUG: c2s undo block post not found in inbox or outbox')
|
|
print(message_id)
|
|
return
|
|
nickname_blocked = \
|
|
get_nickname_from_actor(message_json['object']['object'])
|
|
if not nickname_blocked:
|
|
print('WARN: unable to find nickname in ' +
|
|
message_json['object']['object'])
|
|
return
|
|
domain_object = message_json['object']['object']
|
|
domain_blocked, port_blocked = get_domain_from_actor(domain_object)
|
|
if not domain_blocked:
|
|
print('WARN: unable to find domain in ' +
|
|
message_json['object']['object'])
|
|
return
|
|
domain_blocked_full = get_full_domain(domain_blocked, port_blocked)
|
|
|
|
remove_block(base_dir, nickname, domain,
|
|
nickname_blocked, domain_blocked_full)
|
|
if debug:
|
|
print('DEBUG: post undo blocked via c2s - ' + post_filename)
|
|
|
|
|
|
def mute_post(base_dir: str, nickname: str, domain: str, port: int,
|
|
http_prefix: str, post_id: str, recent_posts_cache: {},
|
|
debug: bool) -> None:
|
|
""" Mutes the given post
|
|
"""
|
|
print('mute_post: post_id ' + post_id)
|
|
post_filename = locate_post(base_dir, nickname, domain, post_id)
|
|
if not post_filename:
|
|
print('mute_post: file not found ' + post_id)
|
|
return
|
|
post_json_object = load_json(post_filename)
|
|
if not post_json_object:
|
|
print('mute_post: object not loaded ' + post_id)
|
|
return
|
|
print('mute_post: ' + str(post_json_object))
|
|
|
|
post_json_obj = post_json_object
|
|
also_update_post_id = None
|
|
if has_object_dict(post_json_object):
|
|
post_json_obj = post_json_object['object']
|
|
else:
|
|
if has_object_string(post_json_object, debug):
|
|
also_update_post_id = remove_id_ending(post_json_object['object'])
|
|
|
|
domain_full = get_full_domain(domain, port)
|
|
actor = local_actor_url(http_prefix, nickname, domain_full)
|
|
|
|
if post_json_obj.get('conversation'):
|
|
mute_conversation(base_dir, nickname, domain,
|
|
post_json_obj['conversation'])
|
|
elif post_json_obj.get('context'):
|
|
mute_conversation(base_dir, nickname, domain,
|
|
post_json_obj['context'])
|
|
|
|
# does this post have ignores on it from differenent actors?
|
|
if not post_json_obj.get('ignores'):
|
|
if debug:
|
|
print('DEBUG: Adding initial mute to ' + post_id)
|
|
ignores_json = {
|
|
"@context": "https://www.w3.org/ns/activitystreams",
|
|
'id': post_id,
|
|
'type': 'Collection',
|
|
"totalItems": 1,
|
|
'items': [{
|
|
'type': 'Ignore',
|
|
'actor': actor
|
|
}]
|
|
}
|
|
post_json_obj['ignores'] = ignores_json
|
|
else:
|
|
if not post_json_obj['ignores'].get('items'):
|
|
post_json_obj['ignores']['items'] = []
|
|
items_list = post_json_obj['ignores']['items']
|
|
for ignores_item in items_list:
|
|
if ignores_item.get('actor'):
|
|
if ignores_item['actor'] == actor:
|
|
return
|
|
new_ignore = {
|
|
'type': 'Ignore',
|
|
'actor': actor
|
|
}
|
|
ig_it = len(items_list)
|
|
items_list.append(new_ignore)
|
|
post_json_obj['ignores']['totalItems'] = ig_it
|
|
post_json_obj['muted'] = True
|
|
if save_json(post_json_object, post_filename):
|
|
print('mute_post: saved ' + post_filename)
|
|
|
|
# remove cached post so that the muted version gets recreated
|
|
# without its content text and/or image
|
|
cached_post_filename = \
|
|
get_cached_post_filename(base_dir, nickname, domain, post_json_object)
|
|
if cached_post_filename:
|
|
if os.path.isfile(cached_post_filename):
|
|
try:
|
|
os.remove(cached_post_filename)
|
|
print('MUTE: cached post removed ' + cached_post_filename)
|
|
except OSError:
|
|
print('EX: MUTE cached post not removed ' +
|
|
cached_post_filename)
|
|
else:
|
|
print('MUTE: cached post not found ' + cached_post_filename)
|
|
|
|
try:
|
|
with open(post_filename + '.muted', 'w+',
|
|
encoding='utf-8') as mute_file:
|
|
mute_file.write('\n')
|
|
except OSError:
|
|
print('EX: Failed to save mute file ' + post_filename + '.muted')
|
|
return
|
|
print('MUTE: ' + post_filename + '.muted file added')
|
|
|
|
# if the post is in the recent posts cache then mark it as muted
|
|
if recent_posts_cache.get('index'):
|
|
post_id = \
|
|
remove_id_ending(post_json_object['id']).replace('/', '#')
|
|
if post_id in recent_posts_cache['index']:
|
|
print('MUTE: ' + post_id + ' is in recent posts cache')
|
|
if recent_posts_cache.get('json'):
|
|
recent_posts_cache['json'][post_id] = json.dumps(post_json_object)
|
|
print('MUTE: ' + post_id +
|
|
' marked as muted in recent posts memory cache')
|
|
if recent_posts_cache.get('html'):
|
|
if recent_posts_cache['html'].get(post_id):
|
|
del recent_posts_cache['html'][post_id]
|
|
print('MUTE: ' + post_id + ' removed cached html')
|
|
|
|
if also_update_post_id:
|
|
post_filename = locate_post(base_dir, nickname, domain,
|
|
also_update_post_id)
|
|
if os.path.isfile(post_filename):
|
|
post_json_obj = load_json(post_filename)
|
|
cached_post_filename = \
|
|
get_cached_post_filename(base_dir, nickname, domain,
|
|
post_json_obj)
|
|
if cached_post_filename:
|
|
if os.path.isfile(cached_post_filename):
|
|
try:
|
|
os.remove(cached_post_filename)
|
|
print('MUTE: cached referenced post removed ' +
|
|
cached_post_filename)
|
|
except OSError:
|
|
print('EX: ' +
|
|
'MUTE cached referenced post not removed ' +
|
|
cached_post_filename)
|
|
|
|
if recent_posts_cache.get('json'):
|
|
if recent_posts_cache['json'].get(also_update_post_id):
|
|
del recent_posts_cache['json'][also_update_post_id]
|
|
print('MUTE: ' + also_update_post_id +
|
|
' removed referenced json')
|
|
if recent_posts_cache.get('html'):
|
|
if recent_posts_cache['html'].get(also_update_post_id):
|
|
del recent_posts_cache['html'][also_update_post_id]
|
|
print('MUTE: ' + also_update_post_id +
|
|
' removed referenced html')
|
|
|
|
|
|
def unmute_post(base_dir: str, nickname: str, domain: str, port: int,
|
|
http_prefix: str, post_id: str, recent_posts_cache: {},
|
|
debug: bool) -> None:
|
|
""" Unmutes the given post
|
|
"""
|
|
post_filename = locate_post(base_dir, nickname, domain, post_id)
|
|
if not post_filename:
|
|
return
|
|
post_json_object = load_json(post_filename)
|
|
if not post_json_object:
|
|
return
|
|
|
|
mute_filename = post_filename + '.muted'
|
|
if os.path.isfile(mute_filename):
|
|
try:
|
|
os.remove(mute_filename)
|
|
except OSError:
|
|
if debug:
|
|
print('EX: unmute_post mute filename not deleted ' +
|
|
str(mute_filename))
|
|
print('UNMUTE: ' + mute_filename + ' file removed')
|
|
|
|
post_json_obj = post_json_object
|
|
also_update_post_id = None
|
|
if has_object_dict(post_json_object):
|
|
post_json_obj = post_json_object['object']
|
|
else:
|
|
if has_object_string(post_json_object, debug):
|
|
also_update_post_id = remove_id_ending(post_json_object['object'])
|
|
|
|
if post_json_obj.get('conversation'):
|
|
unmute_conversation(base_dir, nickname, domain,
|
|
post_json_obj['conversation'])
|
|
elif post_json_obj.get('context'):
|
|
unmute_conversation(base_dir, nickname, domain,
|
|
post_json_obj['context'])
|
|
|
|
if post_json_obj.get('ignores'):
|
|
domain_full = get_full_domain(domain, port)
|
|
actor = local_actor_url(http_prefix, nickname, domain_full)
|
|
total_items = 0
|
|
if post_json_obj['ignores'].get('totalItems'):
|
|
total_items = post_json_obj['ignores']['totalItems']
|
|
items_list = post_json_obj['ignores']['items']
|
|
for ignores_item in items_list:
|
|
if ignores_item.get('actor'):
|
|
if ignores_item['actor'] == actor:
|
|
if debug:
|
|
print('DEBUG: mute was removed for ' + actor)
|
|
items_list.remove(ignores_item)
|
|
break
|
|
if total_items == 1:
|
|
if debug:
|
|
print('DEBUG: mute was removed from post')
|
|
del post_json_obj['ignores']
|
|
else:
|
|
ig_it_len = len(post_json_obj['ignores']['items'])
|
|
post_json_obj['ignores']['totalItems'] = ig_it_len
|
|
post_json_obj['muted'] = False
|
|
save_json(post_json_object, post_filename)
|
|
|
|
# remove cached post so that the muted version gets recreated
|
|
# with its content text and/or image
|
|
cached_post_filename = \
|
|
get_cached_post_filename(base_dir, nickname, domain, post_json_object)
|
|
if cached_post_filename:
|
|
if os.path.isfile(cached_post_filename):
|
|
try:
|
|
os.remove(cached_post_filename)
|
|
except OSError:
|
|
if debug:
|
|
print('EX: unmute_post cached post not deleted ' +
|
|
str(cached_post_filename))
|
|
|
|
# if the post is in the recent posts cache then mark it as unmuted
|
|
if recent_posts_cache.get('index'):
|
|
post_id = \
|
|
remove_id_ending(post_json_object['id']).replace('/', '#')
|
|
if post_id in recent_posts_cache['index']:
|
|
print('UNMUTE: ' + post_id + ' is in recent posts cache')
|
|
if recent_posts_cache.get('json'):
|
|
recent_posts_cache['json'][post_id] = json.dumps(post_json_object)
|
|
print('UNMUTE: ' + post_id +
|
|
' marked as unmuted in recent posts cache')
|
|
if recent_posts_cache.get('html'):
|
|
if recent_posts_cache['html'].get(post_id):
|
|
del recent_posts_cache['html'][post_id]
|
|
print('UNMUTE: ' + post_id + ' removed cached html')
|
|
if also_update_post_id:
|
|
post_filename = locate_post(base_dir, nickname, domain,
|
|
also_update_post_id)
|
|
if os.path.isfile(post_filename):
|
|
post_json_obj = load_json(post_filename)
|
|
cached_post_filename = \
|
|
get_cached_post_filename(base_dir, nickname, domain,
|
|
post_json_obj)
|
|
if cached_post_filename:
|
|
if os.path.isfile(cached_post_filename):
|
|
try:
|
|
os.remove(cached_post_filename)
|
|
print('MUTE: cached referenced post removed ' +
|
|
cached_post_filename)
|
|
except OSError:
|
|
if debug:
|
|
print('EX: ' +
|
|
'unmute_post cached ref post not removed ' +
|
|
str(cached_post_filename))
|
|
|
|
if recent_posts_cache.get('json'):
|
|
if recent_posts_cache['json'].get(also_update_post_id):
|
|
del recent_posts_cache['json'][also_update_post_id]
|
|
print('UNMUTE: ' +
|
|
also_update_post_id + ' removed referenced json')
|
|
if recent_posts_cache.get('html'):
|
|
if recent_posts_cache['html'].get(also_update_post_id):
|
|
del recent_posts_cache['html'][also_update_post_id]
|
|
print('UNMUTE: ' +
|
|
also_update_post_id + ' removed referenced html')
|
|
|
|
|
|
def outbox_mute(base_dir: str, http_prefix: str,
|
|
nickname: str, domain: str, port: int,
|
|
message_json: {}, debug: bool,
|
|
recent_posts_cache: {}) -> None:
|
|
"""When a mute is received by the outbox from c2s
|
|
"""
|
|
if not message_json.get('type'):
|
|
return
|
|
if not has_actor(message_json, debug):
|
|
return
|
|
domain_full = get_full_domain(domain, port)
|
|
actor_url = get_actor_from_post(message_json)
|
|
if not actor_url.endswith(domain_full + '/users/' + nickname):
|
|
return
|
|
if not message_json['type'] == 'Ignore':
|
|
return
|
|
if not has_object_string(message_json, debug):
|
|
return
|
|
if debug:
|
|
print('DEBUG: c2s mute request arrived in outbox')
|
|
|
|
message_id = remove_id_ending(message_json['object'])
|
|
if '/statuses/' not in message_id:
|
|
if debug:
|
|
print('DEBUG: c2s mute object is not a status')
|
|
return
|
|
if not has_users_path(message_id):
|
|
if debug:
|
|
print('DEBUG: c2s mute object has no nickname')
|
|
return
|
|
domain = remove_domain_port(domain)
|
|
post_filename = locate_post(base_dir, nickname, domain, message_id)
|
|
if not post_filename:
|
|
if debug:
|
|
print('DEBUG: c2s mute post not found in inbox or outbox')
|
|
print(message_id)
|
|
return
|
|
nickname_muted = get_nickname_from_actor(message_json['object'])
|
|
if not nickname_muted:
|
|
print('WARN: unable to find nickname in ' + message_json['object'])
|
|
return
|
|
|
|
mute_post(base_dir, nickname, domain, port,
|
|
http_prefix, message_json['object'], recent_posts_cache,
|
|
debug)
|
|
|
|
if debug:
|
|
print('DEBUG: post muted via c2s - ' + post_filename)
|
|
|
|
|
|
def outbox_undo_mute(base_dir: str, http_prefix: str,
|
|
nickname: str, domain: str, port: int,
|
|
message_json: {}, debug: bool,
|
|
recent_posts_cache: {}) -> None:
|
|
"""When an undo mute is received by the outbox from c2s
|
|
"""
|
|
if not message_json.get('type'):
|
|
return
|
|
if not has_actor(message_json, debug):
|
|
return
|
|
domain_full = get_full_domain(domain, port)
|
|
actor_url = get_actor_from_post(message_json)
|
|
if not actor_url.endswith(domain_full + '/users/' + nickname):
|
|
return
|
|
if not message_json['type'] == 'Undo':
|
|
return
|
|
if not has_object_string_type(message_json, debug):
|
|
return
|
|
if message_json['object']['type'] != 'Ignore':
|
|
return
|
|
if not isinstance(message_json['object']['object'], str):
|
|
if debug:
|
|
print('DEBUG: undo mute object is not a string')
|
|
return
|
|
if debug:
|
|
print('DEBUG: c2s undo mute request arrived in outbox')
|
|
|
|
message_id = remove_id_ending(message_json['object']['object'])
|
|
if '/statuses/' not in message_id:
|
|
if debug:
|
|
print('DEBUG: c2s undo mute object is not a status')
|
|
return
|
|
if not has_users_path(message_id):
|
|
if debug:
|
|
print('DEBUG: c2s undo mute object has no nickname')
|
|
return
|
|
domain = remove_domain_port(domain)
|
|
post_filename = locate_post(base_dir, nickname, domain, message_id)
|
|
if not post_filename:
|
|
if debug:
|
|
print('DEBUG: c2s undo mute post not found in inbox or outbox')
|
|
print(message_id)
|
|
return
|
|
nickname_muted = get_nickname_from_actor(message_json['object']['object'])
|
|
if not nickname_muted:
|
|
print('WARN: unable to find nickname in ' +
|
|
message_json['object']['object'])
|
|
return
|
|
|
|
unmute_post(base_dir, nickname, domain, port,
|
|
http_prefix, message_json['object']['object'],
|
|
recent_posts_cache, debug)
|
|
|
|
if debug:
|
|
print('DEBUG: post undo mute via c2s - ' + post_filename)
|
|
|
|
|
|
def broch_mode_is_active(base_dir: str) -> bool:
|
|
"""Returns true if broch mode is active
|
|
"""
|
|
allow_filename = base_dir + '/accounts/allowedinstances.txt'
|
|
return os.path.isfile(allow_filename)
|
|
|
|
|
|
def set_broch_mode(base_dir: str, domain_full: str, enabled: bool) -> None:
|
|
"""Broch mode can be used to lock down the instance during
|
|
a period of time when it is temporarily under attack.
|
|
For example, where an adversary is constantly spinning up new
|
|
instances.
|
|
It surveys the following lists of all accounts and uses that
|
|
to construct an instance level allow list. Anything arriving
|
|
which is then not from one of the allowed domains will be dropped
|
|
"""
|
|
allow_filename = base_dir + '/accounts/allowedinstances.txt'
|
|
|
|
if not enabled:
|
|
# remove instance allow list
|
|
if os.path.isfile(allow_filename):
|
|
try:
|
|
os.remove(allow_filename)
|
|
except OSError:
|
|
print('EX: set_broch_mode allow file not deleted ' +
|
|
str(allow_filename))
|
|
print('Broch mode turned off')
|
|
else:
|
|
if os.path.isfile(allow_filename):
|
|
last_modified = file_last_modified(allow_filename)
|
|
print('Broch mode already activated ' + last_modified)
|
|
return
|
|
# generate instance allow list
|
|
allowed_domains = [domain_full]
|
|
follow_files = ('following.txt', 'followers.txt')
|
|
for _, dirs, _ in os.walk(base_dir + '/accounts'):
|
|
for acct in dirs:
|
|
if not is_account_dir(acct):
|
|
continue
|
|
account_dir = os.path.join(base_dir + '/accounts', acct)
|
|
for follow_file_type in follow_files:
|
|
following_filename = account_dir + '/' + follow_file_type
|
|
if not os.path.isfile(following_filename):
|
|
continue
|
|
try:
|
|
with open(following_filename, 'r',
|
|
encoding='utf-8') as foll_file:
|
|
follow_list = foll_file.readlines()
|
|
for handle in follow_list:
|
|
if '@' not in handle:
|
|
continue
|
|
handle = remove_eol(handle)
|
|
handle_domain = handle.split('@')[1]
|
|
if handle_domain not in allowed_domains:
|
|
allowed_domains.append(handle_domain)
|
|
except OSError as ex:
|
|
print('EX: failed to read ' + following_filename +
|
|
' ' + str(ex))
|
|
break
|
|
|
|
# write the allow file
|
|
try:
|
|
with open(allow_filename, 'w+',
|
|
encoding='utf-8') as allow_file:
|
|
allow_file.write(domain_full + '\n')
|
|
for allowed in allowed_domains:
|
|
allow_file.write(allowed + '\n')
|
|
print('Broch mode enabled')
|
|
except OSError as ex:
|
|
print('EX: Broch mode not enabled due to file write ' + str(ex))
|
|
return
|
|
|
|
set_config_param(base_dir, "brochMode", enabled)
|
|
|
|
|
|
def broch_modeLapses(base_dir: str, lapseDays: int) -> bool:
|
|
"""After broch mode is enabled it automatically
|
|
elapses after a period of time
|
|
"""
|
|
allow_filename = base_dir + '/accounts/allowedinstances.txt'
|
|
if not os.path.isfile(allow_filename):
|
|
return False
|
|
last_modified = file_last_modified(allow_filename)
|
|
modified_date = \
|
|
date_from_string_format(last_modified, ["%Y-%m-%dT%H:%M:%S%z"])
|
|
if not modified_date:
|
|
print('EX: broch_modeLapses date not parsed ' + str(last_modified))
|
|
return False
|
|
curr_time = date_utcnow()
|
|
days_since_broch = (curr_time - modified_date).days
|
|
if days_since_broch >= lapseDays:
|
|
removed = False
|
|
try:
|
|
os.remove(allow_filename)
|
|
removed = True
|
|
except OSError:
|
|
print('EX: broch_modeLapses allow file not deleted ' +
|
|
str(allow_filename))
|
|
if removed:
|
|
set_config_param(base_dir, "brochMode", False)
|
|
print('Broch mode has elapsed')
|
|
return True
|
|
return False
|
|
|
|
|
|
def import_blocking_file(base_dir: str, nickname: str, domain: str,
|
|
lines: []) -> bool:
|
|
"""Imports blocked domains for a given account
|
|
"""
|
|
if not lines:
|
|
return False
|
|
if len(lines) < 2:
|
|
return False
|
|
if not lines[0].startswith('#domain,#') or \
|
|
'comment' not in lines[0]:
|
|
return False
|
|
fieldnames = lines[0].split(',')
|
|
comment_field_index = 0
|
|
for field_str in fieldnames:
|
|
if 'comment' in field_str:
|
|
break
|
|
comment_field_index += 1
|
|
if comment_field_index >= len(fieldnames):
|
|
return False
|
|
|
|
account_directory = acct_dir(base_dir, nickname, domain)
|
|
blocking_filename = \
|
|
account_directory + '/blocking.txt'
|
|
blocking_reasons_filename = \
|
|
account_directory + '/blocking_reasons.txt'
|
|
|
|
existing_lines = []
|
|
if os.path.isfile(blocking_filename):
|
|
try:
|
|
with open(blocking_filename, 'r', encoding='utf-8') as fp_blocks:
|
|
existing_lines = fp_blocks.read().splitlines()
|
|
except OSError:
|
|
print('EX: ' +
|
|
'unable to import existing blocked instances from file ' +
|
|
blocking_filename)
|
|
existing_reasons = []
|
|
if os.path.isfile(blocking_reasons_filename):
|
|
try:
|
|
with open(blocking_reasons_filename,
|
|
'r', encoding='utf-8') as fp_blocks:
|
|
existing_reasons = fp_blocks.read().splitlines()
|
|
except OSError:
|
|
print('EX: ' +
|
|
'unable to import existing ' +
|
|
'blocked instance reasons from file ' +
|
|
blocking_reasons_filename)
|
|
|
|
append_blocks = []
|
|
append_reasons = []
|
|
for line_str in lines:
|
|
if line_str.startswith('#'):
|
|
continue
|
|
block_fields = line_str.split(',')
|
|
blocked_domain_name = block_fields[0].strip()
|
|
if ' ' in blocked_domain_name or \
|
|
'.' not in blocked_domain_name:
|
|
continue
|
|
if blocked_domain_name in existing_lines:
|
|
# already blocked
|
|
continue
|
|
append_blocks.append(blocked_domain_name)
|
|
blocked_comment = ''
|
|
if '"' in line_str:
|
|
quote_section = line_str.split('"')
|
|
if len(quote_section) > 1:
|
|
blocked_comment = quote_section[1]
|
|
append_reasons.append(blocked_domain_name + ' ' +
|
|
blocked_comment)
|
|
if not blocked_comment:
|
|
if len(block_fields) > comment_field_index:
|
|
blocked_comment = block_fields[comment_field_index].strip()
|
|
if blocked_comment:
|
|
if blocked_comment.startswith('"'):
|
|
blocked_comment = blocked_comment.replace('"', '')
|
|
if blocked_comment not in existing_reasons:
|
|
append_reasons.append(blocked_domain_name + ' ' +
|
|
blocked_comment)
|
|
if not append_blocks:
|
|
return True
|
|
|
|
try:
|
|
with open(blocking_filename, 'a+', encoding='utf-8') as fp_blocks:
|
|
for new_block in append_blocks:
|
|
fp_blocks.write(new_block + '\n')
|
|
except OSError:
|
|
print('EX: ' +
|
|
'unable to append imported blocks to ' +
|
|
blocking_filename)
|
|
|
|
try:
|
|
with open(blocking_reasons_filename, 'a+',
|
|
encoding='utf-8') as fp_blocks:
|
|
for new_reason in append_reasons:
|
|
fp_blocks.write(new_reason + '\n')
|
|
except OSError:
|
|
print('EX: ' +
|
|
'unable to append imported block reasons to ' +
|
|
blocking_reasons_filename)
|
|
|
|
return True
|
|
|
|
|
|
def export_blocking_file(base_dir: str, nickname: str, domain: str) -> str:
|
|
"""exports account level blocks in a csv format
|
|
"""
|
|
account_directory = acct_dir(base_dir, nickname, domain)
|
|
blocking_filename = \
|
|
account_directory + '/blocking.txt'
|
|
blocking_reasons_filename = \
|
|
account_directory + '/blocking_reasons.txt'
|
|
|
|
blocks_header = \
|
|
'#domain,#severity,#reject_media,#reject_reports,' + \
|
|
'#public_comment,#obfuscate\n'
|
|
|
|
if not os.path.isfile(blocking_filename):
|
|
return blocks_header
|
|
|
|
blocking_lines = []
|
|
if os.path.isfile(blocking_filename):
|
|
try:
|
|
with open(blocking_filename, 'r', encoding='utf-8') as fp_block:
|
|
blocking_lines = fp_block.read().splitlines()
|
|
except OSError:
|
|
print('EX: export_blocks failed to read ' + blocking_filename)
|
|
|
|
blocking_reasons = []
|
|
if os.path.isfile(blocking_reasons_filename):
|
|
try:
|
|
with open(blocking_reasons_filename, 'r',
|
|
encoding='utf-8') as fp_block:
|
|
blocking_reasons = fp_block.read().splitlines()
|
|
except OSError:
|
|
print('EX: export_blocks failed to read ' +
|
|
blocking_reasons_filename)
|
|
|
|
blocks_str = blocks_header
|
|
for blocked_domain in blocking_lines:
|
|
blocked_domain = blocked_domain.strip()
|
|
if blocked_domain.startswith('#'):
|
|
continue
|
|
reason_str = ''
|
|
for reason_line in blocking_reasons:
|
|
if reason_line.startswith(blocked_domain + ' '):
|
|
reason_str = reason_line.split(' ', 1)[1]
|
|
break
|
|
blocks_str += \
|
|
blocked_domain + ',suspend,false,false,"' + \
|
|
reason_str + '",false\n'
|
|
return blocks_str
|
|
|
|
|
|
def get_blocks_via_server(session, nickname: str, password: str,
|
|
domain: str, port: int,
|
|
http_prefix: str, page_number: int, debug: bool,
|
|
version: str,
|
|
signing_priv_key_pem: str) -> {}:
|
|
"""Returns the blocked collection for shared items via c2s
|
|
https://codeberg.org/fediverse/fep/src/branch/main/fep/c648/fep-c648.md
|
|
"""
|
|
if not session:
|
|
print('WARN: No session for get_blocks_via_server')
|
|
return 6
|
|
|
|
auth_header = create_basic_auth_header(nickname, password)
|
|
|
|
headers = {
|
|
'host': domain,
|
|
'Content-type': 'application/json',
|
|
'Authorization': auth_header,
|
|
'Accept': 'application/json'
|
|
}
|
|
domain_full = get_full_domain(domain, port)
|
|
url = local_actor_url(http_prefix, nickname, domain_full) + \
|
|
'/blocked?page=' + str(page_number)
|
|
if debug:
|
|
print('Blocked collection request to: ' + url)
|
|
blocked_json = get_json(signing_priv_key_pem, session, url, headers, None,
|
|
debug, version, http_prefix, None)
|
|
if not get_json_valid(blocked_json):
|
|
if debug:
|
|
print('DEBUG: GET blocked collection failed for c2s to ' + url)
|
|
# return 5
|
|
|
|
if debug:
|
|
print('DEBUG: c2s GET blocked collection success')
|
|
|
|
return blocked_json
|
|
|
|
|
|
def load_blocked_military(base_dir: str) -> {}:
|
|
"""Loads a list of nicknames for accounts which block military instances
|
|
"""
|
|
block_military_filename = base_dir + '/accounts/block_military.txt'
|
|
nicknames_list = []
|
|
if os.path.isfile(block_military_filename):
|
|
try:
|
|
with open(block_military_filename, 'r',
|
|
encoding='utf-8') as fp_mil:
|
|
nicknames_list = fp_mil.read()
|
|
except OSError:
|
|
print('EX: error while reading block military file')
|
|
if not nicknames_list:
|
|
return {}
|
|
nicknames_list = nicknames_list.split('\n')
|
|
nicknames_dict = {}
|
|
for nickname in nicknames_list:
|
|
nicknames_dict[nickname] = True
|
|
return nicknames_dict
|
|
|
|
|
|
def save_blocked_military(base_dir: str, block_military: {}) -> None:
|
|
"""Saves a list of nicknames for accounts which block military instances
|
|
"""
|
|
nicknames_str = ''
|
|
for nickname, _ in block_military.items():
|
|
nicknames_str += nickname + '\n'
|
|
|
|
block_military_filename = base_dir + '/accounts/block_military.txt'
|
|
try:
|
|
with open(block_military_filename, 'w+',
|
|
encoding='utf-8') as fp_mil:
|
|
fp_mil.write(nicknames_str)
|
|
except OSError:
|
|
print('EX: error while saving block military file')
|
|
|
|
|
|
def get_mil_domains_list() -> []:
|
|
"""returns a list of military domains
|
|
"""
|
|
return ('army', 'navy', 'airforce', 'mil',
|
|
'sncorp.com', 'sierranevadacorp.us', 'ncontext.com')
|
|
|
|
|
|
def contains_military_domain(message_str: str) -> bool:
|
|
"""Returns true if the given string contains a military domain
|
|
"""
|
|
mil_domains = get_mil_domains_list()
|
|
for domain_str in mil_domains:
|
|
if '.' not in domain_str:
|
|
tld = domain_str
|
|
if '.' + tld + '"' in message_str or \
|
|
'.' + tld + '/' in message_str:
|
|
return True
|
|
else:
|
|
if domain_str + '"' in message_str or \
|
|
domain_str + '/' in message_str:
|
|
return True
|
|
return False
|
|
|
|
|
|
def load_federated_blocks_endpoints(base_dir: str) -> []:
|
|
"""Loads endpoint urls for federated blocklists
|
|
"""
|
|
block_federated_endpoints = []
|
|
block_api_endpoints_filename = \
|
|
base_dir + '/accounts/block_api_endpoints.txt'
|
|
if os.path.isfile(block_api_endpoints_filename):
|
|
new_block_federated_endpoints = []
|
|
try:
|
|
with open(block_api_endpoints_filename, 'r',
|
|
encoding='utf-8') as fp_ep:
|
|
new_block_federated_endpoints = fp_ep.read().split('\n')
|
|
except OSError:
|
|
print('EX: unable to load block_api_endpoints.txt')
|
|
for endpoint in new_block_federated_endpoints:
|
|
if endpoint:
|
|
if '#' not in endpoint:
|
|
block_federated_endpoints.append(endpoint)
|
|
return block_federated_endpoints
|
|
|
|
|
|
def _update_federated_blocks(session, base_dir: str,
|
|
http_prefix: str,
|
|
domain: str,
|
|
debug: bool, version: str,
|
|
signing_priv_key_pem: str,
|
|
max_api_blocks: int) -> []:
|
|
"""Creates block_api.txt
|
|
"""
|
|
block_federated = []
|
|
debug = True
|
|
|
|
if not session:
|
|
print('WARN: federated blocklist ' +
|
|
'no session for update_federated_blocks')
|
|
return block_federated
|
|
|
|
headers = {
|
|
'Accept': 'application/json'
|
|
}
|
|
|
|
block_federated_endpoints = load_federated_blocks_endpoints(base_dir)
|
|
if debug:
|
|
print('DEBUG: federated blocklist endpoints: ' +
|
|
str(block_federated_endpoints))
|
|
|
|
new_block_api_str = ''
|
|
for endpoint in block_federated_endpoints:
|
|
if not endpoint:
|
|
continue
|
|
url = endpoint.strip()
|
|
|
|
if debug:
|
|
print('federated blocklist Block API endpoint: ' + url)
|
|
blocked_json = get_json(signing_priv_key_pem, session, url, headers,
|
|
None, debug, version, http_prefix, domain)
|
|
if not get_json_valid(blocked_json):
|
|
print('DEBUG: federated blocklist ' +
|
|
'GET blocked json failed ' + url)
|
|
continue
|
|
if debug:
|
|
print('DEBUG: federated blocklist: ' + str(blocked_json))
|
|
if isinstance(blocked_json, list):
|
|
# ensure that the size of the list does not become a form of denial
|
|
# of service
|
|
if len(blocked_json) < max_api_blocks:
|
|
for block_dict in blocked_json:
|
|
if not isinstance(block_dict, dict):
|
|
continue
|
|
if not block_dict.get('username'):
|
|
continue
|
|
if not isinstance(block_dict['username'], str):
|
|
continue
|
|
handle = block_dict['username']
|
|
if handle.startswith('@'):
|
|
handle = handle[1:]
|
|
if ' ' in handle or \
|
|
',' in handle or \
|
|
';' in handle or \
|
|
'.' not in handle or \
|
|
'<' in handle:
|
|
continue
|
|
new_block_api_str += handle + '\n'
|
|
if handle not in block_federated:
|
|
block_federated.append(handle)
|
|
|
|
block_api_filename = \
|
|
base_dir + '/accounts/block_api.txt'
|
|
if not new_block_api_str:
|
|
print('DEBUG: federated blocklist not loaded: ' + block_api_filename)
|
|
if os.path.isfile(block_api_filename):
|
|
try:
|
|
os.remove(block_api_filename)
|
|
except OSError:
|
|
print('EX: unable to remove block api: ' + block_api_filename)
|
|
else:
|
|
print('DEBUG: federated blocklist loaded: ' + str(block_federated))
|
|
try:
|
|
with open(block_api_filename, 'w+', encoding='utf-8') as fp_api:
|
|
fp_api.write(new_block_api_str)
|
|
except OSError:
|
|
print('EX: unable to write block_api.txt')
|
|
|
|
return block_federated
|
|
|
|
|
|
def save_block_federated_endpoints(base_dir: str,
|
|
block_federated_endpoints: []) -> []:
|
|
"""Saves a list of blocking API endpoints
|
|
"""
|
|
block_api_endpoints_filename = \
|
|
base_dir + '/accounts/block_api_endpoints.txt'
|
|
result = []
|
|
block_federated_endpoints_str = ''
|
|
for endpoint in block_federated_endpoints:
|
|
if not endpoint:
|
|
continue
|
|
if '.' not in endpoint or \
|
|
' ' in endpoint or \
|
|
'<' in endpoint or \
|
|
',' in endpoint or \
|
|
';' in endpoint:
|
|
continue
|
|
if endpoint.startswith('@'):
|
|
endpoint = endpoint[1:]
|
|
if not endpoint:
|
|
continue
|
|
block_federated_endpoints_str += endpoint.strip() + '\n'
|
|
result.append(endpoint)
|
|
if not block_federated_endpoints_str:
|
|
if os.path.isfile(block_api_endpoints_filename):
|
|
try:
|
|
os.remove(block_api_endpoints_filename)
|
|
except OSError:
|
|
print('EX: unable to delete block_api_endpoints.txt')
|
|
block_api_filename = \
|
|
base_dir + '/accounts/block_api.txt'
|
|
if os.path.isfile(block_api_filename):
|
|
try:
|
|
os.remove(block_api_filename)
|
|
except OSError:
|
|
print('EX: unable to delete block_api.txt')
|
|
else:
|
|
try:
|
|
with open(block_api_endpoints_filename, 'w+',
|
|
encoding='utf-8') as fp_api:
|
|
fp_api.write(block_federated_endpoints_str)
|
|
except OSError:
|
|
print('EX: unable to write block_api_endpoints.txt')
|
|
return result
|
|
|
|
|
|
def run_federated_blocks_daemon(base_dir: str, httpd, debug: bool) -> None:
|
|
"""Runs the daemon used to update federated blocks
|
|
"""
|
|
print('DEBUG: federated blocklist 0')
|
|
seconds_per_hour = 60 * 60
|
|
time.sleep(60)
|
|
|
|
session = None
|
|
while True:
|
|
print('DEBUG: federated blocklist 1')
|
|
if httpd.session:
|
|
session = httpd.session
|
|
else:
|
|
session = create_session(httpd.proxy_type)
|
|
|
|
if session:
|
|
print('DEBUG: federated blocklist 2')
|
|
httpd.block_federated = \
|
|
_update_federated_blocks(httpd.session, base_dir,
|
|
httpd.http_prefix,
|
|
httpd.domain,
|
|
debug, httpd.project_version,
|
|
httpd.signing_priv_key_pem,
|
|
httpd.max_api_blocks)
|
|
time.sleep(seconds_per_hour * 6)
|