epicyon/blocking.py

357 lines
13 KiB
Python

__filename__ = "blocking.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
from utils import removeIdEnding
from utils import isEvil
from utils import locatePost
from utils import evilIncarnate
from utils import getDomainFromActor
from utils import getNicknameFromActor
def addGlobalBlock(baseDir: str,
blockNickname: str, blockDomain: str) -> bool:
"""Global block which applies to all accounts
"""
blockingFilename = baseDir + '/accounts/blocking.txt'
if not blockNickname.startswith('#'):
# is the handle already blocked?
blockHandle = blockNickname + '@' + blockDomain
if os.path.isfile(blockingFilename):
if blockHandle in open(blockingFilename).read():
return False
# block an account handle or domain
blockFile = open(blockingFilename, "a+")
if blockFile:
blockFile.write(blockHandle + '\n')
blockFile.close()
else:
blockHashtag = blockNickname
# is the hashtag already blocked?
if os.path.isfile(blockingFilename):
if blockHashtag + '\n' in open(blockingFilename).read():
return False
# block a hashtag
blockFile = open(blockingFilename, "a+")
if blockFile:
blockFile.write(blockHashtag + '\n')
blockFile.close()
return True
def addBlock(baseDir: str, nickname: str, domain: str,
blockNickname: str, blockDomain: str) -> bool:
"""Block the given account
"""
if ':' in domain:
domain = domain.split(':')[0]
blockingFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/blocking.txt'
blockHandle = blockNickname + '@' + blockDomain
if os.path.isfile(blockingFilename):
if blockHandle in open(blockingFilename).read():
return False
blockFile = open(blockingFilename, "a+")
blockFile.write(blockHandle + '\n')
blockFile.close()
return True
def removeGlobalBlock(baseDir: str,
unblockNickname: str,
unblockDomain: str) -> bool:
"""Unblock the given global block
"""
unblockingFilename = baseDir + '/accounts/blocking.txt'
if not unblockNickname.startswith('#'):
unblockHandle = unblockNickname + '@' + unblockDomain
if os.path.isfile(unblockingFilename):
if unblockHandle in open(unblockingFilename).read():
with open(unblockingFilename, 'r') as fp:
with open(unblockingFilename + '.new', 'w+') as fpnew:
for line in fp:
handle = line.replace('\n', '').replace('\r', '')
if unblockHandle not in line:
fpnew.write(handle + '\n')
if os.path.isfile(unblockingFilename + '.new'):
os.rename(unblockingFilename + '.new', unblockingFilename)
return True
else:
unblockHashtag = unblockNickname
if os.path.isfile(unblockingFilename):
if unblockHashtag + '\n' in open(unblockingFilename).read():
with open(unblockingFilename, 'r') as fp:
with open(unblockingFilename + '.new', 'w+') as fpnew:
for line in fp:
blockLine = \
line.replace('\n', '').replace('\r', '')
if unblockHashtag not in line:
fpnew.write(blockLine + '\n')
if os.path.isfile(unblockingFilename + '.new'):
os.rename(unblockingFilename + '.new', unblockingFilename)
return True
return False
def removeBlock(baseDir: str, nickname: str, domain: str,
unblockNickname: str, unblockDomain: str) -> bool:
"""Unblock the given account
"""
if ':' in domain:
domain = domain.split(':')[0]
unblockingFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/blocking.txt'
unblockHandle = unblockNickname + '@' + unblockDomain
if os.path.isfile(unblockingFilename):
if unblockHandle in open(unblockingFilename).read():
with open(unblockingFilename, 'r') as fp:
with open(unblockingFilename + '.new', 'w+') as fpnew:
for line in fp:
handle = line.replace('\n', '').replace('\r', '')
if unblockHandle not in line:
fpnew.write(handle + '\n')
if os.path.isfile(unblockingFilename + '.new'):
os.rename(unblockingFilename + '.new', unblockingFilename)
return True
return False
def isBlockedHashtag(baseDir: str, hashtag: str) -> bool:
"""Is the given hashtag blocked?
"""
# avoid very long hashtags
if len(hashtag) > 32:
return True
globalBlockingFilename = baseDir + '/accounts/blocking.txt'
if os.path.isfile(globalBlockingFilename):
hashtag = hashtag.strip('\n').strip('\r')
if hashtag + '\n' in open(globalBlockingFilename).read():
return True
return False
def getDomainBlocklist(baseDir: str) -> str:
"""Returns all globally blocked domains as a string
This can be used for fast matching to mitigate flooding
"""
blockedStr = ''
evilDomains = evilIncarnate()
for evil in evilDomains:
blockedStr += evil + '\n'
globalBlockingFilename = baseDir + '/accounts/blocking.txt'
if not os.path.isfile(globalBlockingFilename):
return blockedStr
with open(globalBlockingFilename, 'r') as fpBlocked:
blockedStr += fpBlocked.read()
return blockedStr
def isBlockedDomain(baseDir: str, domain: str) -> bool:
"""Is the given domain blocked?
"""
if '.' not in domain:
return False
if isEvil(domain):
return True
# by checking a shorter version we can thwart adversaries
# who constantly change their subdomain
sections = domain.split('.')
noOfSections = len(sections)
shortDomain = None
if noOfSections > 2:
shortDomain = domain[noOfSections-2] + '.' + domain[noOfSections-1]
globalBlockingFilename = baseDir + '/accounts/blocking.txt'
if os.path.isfile(globalBlockingFilename):
with open(globalBlockingFilename, 'r') as fpBlocked:
blockedStr = fpBlocked.read()
if '*@' + domain in blockedStr:
return True
if shortDomain:
if '*@' + shortDomain in blockedStr:
return True
return False
def isBlocked(baseDir: str, nickname: str, domain: str,
blockNickname: str, blockDomain: str) -> bool:
"""Is the given nickname blocked?
"""
if isEvil(blockDomain):
return True
globalBlockingFilename = baseDir + '/accounts/blocking.txt'
if os.path.isfile(globalBlockingFilename):
if '*@' + blockDomain in open(globalBlockingFilename).read():
return True
if blockNickname:
blockHandle = blockNickname + '@' + blockDomain
if blockHandle in open(globalBlockingFilename).read():
return True
allowFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/allowedinstances.txt'
if os.path.isfile(allowFilename):
if blockDomain not in open(allowFilename).read():
return True
blockingFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/blocking.txt'
if os.path.isfile(blockingFilename):
if '*@' + blockDomain in open(blockingFilename).read():
return True
if blockNickname:
blockHandle = blockNickname + '@' + blockDomain
if blockHandle in open(blockingFilename).read():
return True
return False
def outboxBlock(baseDir: str, httpPrefix: str,
nickname: str, domain: str, port: int,
messageJson: {}, debug: bool) -> None:
""" When a block request is received by the outbox from c2s
"""
if not messageJson.get('type'):
if debug:
print('DEBUG: block - no type')
return
if not messageJson['type'] == 'Block':
if debug:
print('DEBUG: not a block')
return
if not messageJson.get('object'):
if debug:
print('DEBUG: no object in block')
return
if not isinstance(messageJson['object'], str):
if debug:
print('DEBUG: block object is not string')
return
if debug:
print('DEBUG: c2s block request arrived in outbox')
messageId = removeIdEnding(messageJson['object'])
if '/statuses/' not in messageId:
if debug:
print('DEBUG: c2s block object is not a status')
return
if '/users/' not in messageId and \
'/accounts/' not in messageId and \
'/channel/' not in messageId and \
'/profile/' not in messageId:
if debug:
print('DEBUG: c2s block object has no nickname')
return
if ':' in domain:
domain = domain.split(':')[0]
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:
print('DEBUG: c2s block post not found in inbox or outbox')
print(messageId)
return
nicknameBlocked = getNicknameFromActor(messageJson['object'])
if not nicknameBlocked:
print('WARN: unable to find nickname in ' + messageJson['object'])
return
domainBlocked, portBlocked = getDomainFromActor(messageJson['object'])
domainBlockedFull = domainBlocked
if portBlocked:
if portBlocked != 80 and portBlocked != 443:
if ':' not in domainBlocked:
domainBlockedFull = domainBlocked + ':' + str(portBlocked)
addBlock(baseDir, nickname, domain,
nicknameBlocked, domainBlockedFull)
if debug:
print('DEBUG: post blocked via c2s - ' + postFilename)
def outboxUndoBlock(baseDir: str, httpPrefix: str,
nickname: str, domain: str, port: int,
messageJson: {}, debug: bool) -> None:
""" When an undo block request is received by the outbox from c2s
"""
if not messageJson.get('type'):
if debug:
print('DEBUG: undo block - no type')
return
if not messageJson['type'] == 'Undo':
if debug:
print('DEBUG: not an undo block')
return
if not messageJson.get('object'):
if debug:
print('DEBUG: no object in undo block')
return
if not isinstance(messageJson['object'], dict):
if debug:
print('DEBUG: undo block object is not string')
return
if not messageJson['object'].get('type'):
if debug:
print('DEBUG: undo block - no type')
return
if not messageJson['object']['type'] == 'Block':
if debug:
print('DEBUG: not an undo block')
return
if not messageJson['object'].get('object'):
if debug:
print('DEBUG: no object in undo block')
return
if not isinstance(messageJson['object']['object'], str):
if debug:
print('DEBUG: undo block object is not string')
return
if debug:
print('DEBUG: c2s undo block request arrived in outbox')
messageId = removeIdEnding(messageJson['object']['object'])
if '/statuses/' not in messageId:
if debug:
print('DEBUG: c2s undo block object is not a status')
return
if '/users/' not in messageId and \
'/accounts/' not in messageId and \
'/channel/' not in messageId and \
'/profile/' not in messageId:
if debug:
print('DEBUG: c2s undo block object has no nickname')
return
if ':' in domain:
domain = domain.split(':')[0]
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:
print('DEBUG: c2s undo block post not found in inbox or outbox')
print(messageId)
return
nicknameBlocked = getNicknameFromActor(messageJson['object']['object'])
if not nicknameBlocked:
print('WARN: unable to find nickname in ' +
messageJson['object']['object'])
return
domainObject = messageJson['object']['object']
domainBlocked, portBlocked = getDomainFromActor(domainObject)
domainBlockedFull = domainBlocked
if portBlocked:
if portBlocked != 80 and portBlocked != 443:
if ':' not in domainBlocked:
domainBlockedFull = domainBlocked + ':' + str(portBlocked)
removeBlock(baseDir, nickname, domain,
nicknameBlocked, domainBlockedFull)
if debug:
print('DEBUG: post undo blocked via c2s - ' + postFilename)