mirror of https://gitlab.com/bashrc2/epicyon
Block based on nickname
parent
0ae7f614d9
commit
cec7114709
38
blocking.py
38
blocking.py
|
@ -574,10 +574,11 @@ def is_blocked_domain(base_dir: str, domain: str,
|
||||||
|
|
||||||
short_domain = _get_short_domain(domain)
|
short_domain = _get_short_domain(domain)
|
||||||
|
|
||||||
|
search_str = '*@' + domain
|
||||||
if not broch_mode_is_active(base_dir):
|
if not broch_mode_is_active(base_dir):
|
||||||
if blocked_cache:
|
if blocked_cache:
|
||||||
for blocked_str in blocked_cache:
|
for blocked_str in blocked_cache:
|
||||||
if blocked_str == '*@' + domain:
|
if blocked_str == search_str:
|
||||||
return True
|
return True
|
||||||
if short_domain:
|
if short_domain:
|
||||||
if blocked_str == '*@' + short_domain:
|
if blocked_str == '*@' + short_domain:
|
||||||
|
@ -586,14 +587,18 @@ def is_blocked_domain(base_dir: str, domain: str,
|
||||||
# instance block list
|
# instance block list
|
||||||
global_blocking_filename = base_dir + '/accounts/blocking.txt'
|
global_blocking_filename = base_dir + '/accounts/blocking.txt'
|
||||||
if os.path.isfile(global_blocking_filename):
|
if os.path.isfile(global_blocking_filename):
|
||||||
|
search_str += '\n'
|
||||||
|
search_str_short = None
|
||||||
|
if short_domain:
|
||||||
|
search_str_short = '*@' + short_domain + '\n'
|
||||||
try:
|
try:
|
||||||
with open(global_blocking_filename, 'r',
|
with open(global_blocking_filename, 'r',
|
||||||
encoding='utf-8') as fp_blocked:
|
encoding='utf-8') as fp_blocked:
|
||||||
blocked_str = fp_blocked.read()
|
blocked_str = fp_blocked.read()
|
||||||
if '*@' + domain + '\n' in blocked_str:
|
if search_str in blocked_str:
|
||||||
return True
|
return True
|
||||||
if short_domain:
|
if short_domain:
|
||||||
if '*@' + short_domain + '\n' in blocked_str:
|
if search_str_short in blocked_str:
|
||||||
return True
|
return True
|
||||||
except OSError as ex:
|
except OSError as ex:
|
||||||
print('EX: unable to read ' + global_blocking_filename +
|
print('EX: unable to read ' + global_blocking_filename +
|
||||||
|
@ -611,6 +616,33 @@ def is_blocked_domain(base_dir: str, domain: str,
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def is_blocked_nickname(base_dir: str, nickname: str,
|
||||||
|
blocked_cache: [] = None) -> bool:
|
||||||
|
"""Is the given nickname blocked?
|
||||||
|
"""
|
||||||
|
search_str = nickname + '@*'
|
||||||
|
if blocked_cache:
|
||||||
|
for blocked_str in blocked_cache:
|
||||||
|
if blocked_str == search_str:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
# instance-wide block list
|
||||||
|
global_blocking_filename = base_dir + '/accounts/blocking.txt'
|
||||||
|
if os.path.isfile(global_blocking_filename):
|
||||||
|
search_str += '\n'
|
||||||
|
try:
|
||||||
|
with open(global_blocking_filename, 'r',
|
||||||
|
encoding='utf-8') as fp_blocked:
|
||||||
|
blocked_str = fp_blocked.read()
|
||||||
|
if search_str in blocked_str:
|
||||||
|
return True
|
||||||
|
except OSError as ex:
|
||||||
|
print('EX: unable to read ' + global_blocking_filename +
|
||||||
|
' ' + str(ex))
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def is_blocked(base_dir: str, nickname: str, domain: str,
|
def is_blocked(base_dir: str, nickname: str, domain: str,
|
||||||
block_nickname: str, block_domain: str,
|
block_nickname: str, block_domain: str,
|
||||||
blocked_cache: [] = None) -> bool:
|
blocked_cache: [] = None) -> bool:
|
||||||
|
|
17
daemon.py
17
daemon.py
|
@ -164,6 +164,7 @@ from blocking import remove_block
|
||||||
from blocking import add_global_block
|
from blocking import add_global_block
|
||||||
from blocking import remove_global_block
|
from blocking import remove_global_block
|
||||||
from blocking import is_blocked_hashtag
|
from blocking import is_blocked_hashtag
|
||||||
|
from blocking import is_blocked_nickname
|
||||||
from blocking import is_blocked_domain
|
from blocking import is_blocked_domain
|
||||||
from blocking import get_domain_blocklist
|
from blocking import get_domain_blocklist
|
||||||
from blocking import allowed_announce_add
|
from blocking import allowed_announce_add
|
||||||
|
@ -2150,6 +2151,22 @@ class PubServer(BaseHTTPRequestHandler):
|
||||||
self.server.postreq_busy = False
|
self.server.postreq_busy = False
|
||||||
return 3
|
return 3
|
||||||
|
|
||||||
|
message_nickname, _ = \
|
||||||
|
get_nickname_from_actor(message_json['actor'])
|
||||||
|
if not message_nickname:
|
||||||
|
print('INBOX: POST from unknown nickname ' + message_json['actor'])
|
||||||
|
self._400()
|
||||||
|
self.server.postreq_busy = False
|
||||||
|
return 3
|
||||||
|
if debug:
|
||||||
|
print('INBOX: checking for blocked nickname ' + message_nickname)
|
||||||
|
if is_blocked_nickname(self.server.base_dir, message_nickname,
|
||||||
|
self.server.blocked_cache):
|
||||||
|
print('INBOX: POST from blocked nickname ' + message_nickname)
|
||||||
|
self._400()
|
||||||
|
self.server.postreq_busy = False
|
||||||
|
return 3
|
||||||
|
|
||||||
# if the inbox queue is full then return a busy code
|
# if the inbox queue is full then return a busy code
|
||||||
if debug:
|
if debug:
|
||||||
print('INBOX: checking for full queue')
|
print('INBOX: checking for full queue')
|
||||||
|
|
Loading…
Reference in New Issue