Allow wildcards within domain blocks

main
Bob Mottram 2025-05-03 21:16:20 +01:00
parent 7b2750f061
commit d2e03e5dce
2 changed files with 62 additions and 22 deletions

View File

@ -725,42 +725,58 @@ def is_blocked_domain(base_dir: str, domain: str,
short_domain = _get_short_domain(domain)
search_str = '*@' + domain
if not broch_mode_is_active(base_dir):
block_federated_list = blocked_cache_list = []
if block_federated:
if domain in block_federated:
return True
block_federated_list = block_federated
if blocked_cache:
for blocked_str in blocked_cache:
if blocked_str == search_str:
return True
if short_domain:
if blocked_str == '*@' + short_domain:
return True
else:
if not blocked_cache:
blocked_cache = []
# instance block list
global_blocking_filename = data_dir(base_dir) + '/blocking.txt'
if os.path.isfile(global_blocking_filename):
search_str += '\n'
search_str_short = None
if short_domain:
search_str_short = '*@' + short_domain + '\n'
try:
with open(global_blocking_filename, 'r',
encoding='utf-8') as fp_blocked:
blocked_str = fp_blocked.read()
if search_str in blocked_str:
return True
if short_domain:
if search_str_short in blocked_str:
return True
blocked_cache = fp_blocked.read().split('\n')
except OSError as ex:
print('EX: is_blocked_domain unable to read ' +
global_blocking_filename + ' ' + str(ex))
if blocked_cache:
blocked_cache_list = blocked_cache
else:
blocked_cache_list = blocked_cache
search_str = '*@' + domain
for curr_block_list in (blocked_cache_list, block_federated_list):
for blocked_str in curr_block_list:
blocked_str = blocked_str.strip()
if not blocked_str:
continue
if blocked_str.startswith('#'):
# skip over commented out lines
continue
if '*@' not in blocked_str:
continue
blocked_dom = blocked_str.split('*@')[1]
if len(blocked_dom) < 3:
continue
if '?' not in blocked_dom and '*' not in blocked_dom:
if blocked_str == search_str:
return True
if short_domain:
if blocked_str == '*@' + short_domain:
return True
else:
# allow wildcards within domain blocks
if fnmatch.fnmatchcase(domain, blocked_dom):
return True
if short_domain:
if fnmatch.fnmatchcase(short_domain, blocked_dom):
return True
else:
# use temporary allowlist
allow_filename = data_dir(base_dir) + '/allowedinstances.txt'
# instance allow list
if not short_domain:
if not text_in_file(domain, allow_filename):
return True
@ -800,6 +816,8 @@ def is_blocked_nickname(base_dir: str, nickname: str,
if not blocked_str.endswith('@*'):
continue
blocked_nick = blocked_str.split('@*')[0]
if len(blocked_nick) < 3:
continue
if '?' not in blocked_nick and '*' not in blocked_nick:
if blocked_str == search_str:
return True

View File

@ -230,6 +230,7 @@ from conversation import conversation_tag_to_convthread_id
from conversation import convthread_id_to_conversation_tag
from webapp_utils import add_emoji_to_display_name
from blocking import is_blocked_nickname
from blocking import is_blocked_domain
TEST_SERVER_GROUP_RUNNING = False
@ -9230,6 +9231,26 @@ def _test_blocking_nick(base_dir: str) -> None:
assert is_blocked_nickname(base_dir, 'chud674', blocked_cache)
def _test_blocking_domain(base_dir: str) -> None:
print('blocking domain')
block_federated = ['*@hate*.domain']
blocked_cache = ['*@weasel.social', '*@chud*.city']
assert not is_blocked_domain(base_dir, 'fedi.instance', blocked_cache,
block_federated)
assert is_blocked_domain(base_dir, 'weasel.social', blocked_cache,
block_federated)
assert is_blocked_domain(base_dir, 'chud.city', blocked_cache,
block_federated)
assert is_blocked_domain(base_dir, 'chud78.city', blocked_cache,
block_federated)
assert not is_blocked_domain(base_dir, '78chud.city', blocked_cache,
block_federated)
assert is_blocked_domain(base_dir, 'hate623.domain', blocked_cache,
block_federated)
assert is_blocked_domain(base_dir, 'hate.domain', blocked_cache,
block_federated)
def run_all_tests():
base_dir = os.getcwd()
data_dir_testing(base_dir)
@ -9248,6 +9269,7 @@ def run_all_tests():
_test_checkbox_names()
_test_thread_functions()
_test_functions()
_test_blocking_domain(base_dir)
_test_blocking_nick(base_dir)
_test_conversation_to_convthread()
_test_bridgy()