epicyon/blocking.py

330 lines
12 KiB
Python

__filename__ = "blocking.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
from utils import isEvil
from utils import locatePost
from utils import evilIncarnate
from utils import getDomainFromActor
from utils import getNicknameFromActor
from session import postJson
from auth import createBasicAuthHeader
from webfinger import webfingerHandle
def addGlobalBlock(baseDir: str,
blockNickname: str, blockDomain: str) -> bool:
"""Global block which applies to all accounts
"""
blockingFilename = baseDir + '/accounts/blocking.txt'
if not blockNickname.startswith('#'):
blockHandle = blockNickname + '@' + blockDomain
if os.path.isfile(blockingFilename):
if blockHandle in open(blockingFilename).read():
return False
blockFile = open(blockingFilename, "a+")
blockFile.write(blockHandle + '\n')
blockFile.close()
else:
blockHashtag = blockNickname
if os.path.isfile(blockingFilename):
if blockHashtag + '\n' in open(blockingFilename).read():
return False
blockFile = open(blockingFilename, "a+")
blockFile.write(blockHashtag + '\n')
blockFile.close()
return True
def addBlock(baseDir: str, nickname: str, domain: str,
blockNickname: str, blockDomain: str) -> bool:
"""Block the given account
"""
if ':' in domain:
domain = domain.split(':')[0]
blockingFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/blocking.txt'
blockHandle = blockNickname + '@' + blockDomain
if os.path.isfile(blockingFilename):
if blockHandle in open(blockingFilename).read():
return False
blockFile = open(blockingFilename, "a+")
blockFile.write(blockHandle + '\n')
blockFile.close()
return True
def removeGlobalBlock(baseDir: str,
unblockNickname: str,
unblockDomain: str) -> bool:
"""Unblock the given global block
"""
unblockingFilename = baseDir + '/accounts/blocking.txt'
if not unblockNickname.startswith('#'):
unblockHandle = unblockNickname + '@' + unblockDomain
if os.path.isfile(unblockingFilename):
if unblockHandle in open(unblockingFilename).read():
with open(unblockingFilename, 'r') as fp:
with open(unblockingFilename + '.new', 'w') as fpnew:
for line in fp:
handle = line.replace('\n', '')
if unblockHandle not in line:
fpnew.write(handle + '\n')
if os.path.isfile(unblockingFilename + '.new'):
os.rename(unblockingFilename + '.new', unblockingFilename)
return True
else:
unblockHashtag = unblockNickname
if os.path.isfile(unblockingFilename):
if unblockHashtag + '\n' in open(unblockingFilename).read():
with open(unblockingFilename, 'r') as fp:
with open(unblockingFilename + '.new', 'w') as fpnew:
for line in fp:
blockLine = line.replace('\n', '')
if unblockHashtag not in line:
fpnew.write(blockLine + '\n')
if os.path.isfile(unblockingFilename + '.new'):
os.rename(unblockingFilename + '.new', unblockingFilename)
return True
return False
def removeBlock(baseDir: str, nickname: str, domain: str,
unblockNickname: str, unblockDomain: str) -> bool:
"""Unblock the given account
"""
if ':' in domain:
domain = domain.split(':')[0]
unblockingFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/blocking.txt'
unblockHandle = unblockNickname + '@' + unblockDomain
if os.path.isfile(unblockingFilename):
if unblockHandle in open(unblockingFilename).read():
with open(unblockingFilename, 'r') as fp:
with open(unblockingFilename + '.new', 'w') as fpnew:
for line in fp:
handle = line.replace('\n', '')
if unblockHandle not in line:
fpnew.write(handle + '\n')
if os.path.isfile(unblockingFilename + '.new'):
os.rename(unblockingFilename + '.new', unblockingFilename)
return True
return False
def isBlockedHashtag(baseDir: str, hashtag: str) -> bool:
"""Is the given hashtag blocked?
"""
globalBlockingFilename = baseDir + '/accounts/blocking.txt'
if os.path.isfile(globalBlockingFilename):
hashtag = hashtag.strip('\n')
if hashtag + '\n' in open(globalBlockingFilename).read():
return True
return False
def getDomainBlocklist(baseDir: str) -> str:
"""Returns all globally blocked domains as a string
This can be used for fast matching to mitigate flooding
"""
blockedStr = ''
evilDomains = evilIncarnate()
for evil in evilDomains:
blockedStr += evil + '\n'
globalBlockingFilename = baseDir + '/accounts/blocking.txt'
if not os.path.isfile(globalBlockingFilename):
return blockedStr
with open(globalBlockingFilename, 'r') as file:
blockedStr += file.read()
return blockedStr
def isBlockedDomain(baseDir: str, domain: str) -> bool:
"""Is the given domain blocked?
"""
if isEvil(domain):
return True
globalBlockingFilename = baseDir + '/accounts/blocking.txt'
if os.path.isfile(globalBlockingFilename):
if '*@' + domain in open(globalBlockingFilename).read():
return True
return False
def isBlocked(baseDir: str, nickname: str, domain: str,
blockNickname: str, blockDomain: str) -> bool:
"""Is the given nickname blocked?
"""
if isEvil(blockDomain):
return True
globalBlockingFilename = baseDir + '/accounts/blocking.txt'
if os.path.isfile(globalBlockingFilename):
if '*@' + blockDomain in open(globalBlockingFilename).read():
return True
if blockNickname:
blockHandle = blockNickname + '@' + blockDomain
if blockHandle in open(globalBlockingFilename).read():
return True
allowFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/allowedinstances.txt'
if os.path.isfile(allowFilename):
if blockDomain not in open(allowFilename).read():
return True
blockingFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/blocking.txt'
if os.path.isfile(blockingFilename):
if '*@' + blockDomain in open(blockingFilename).read():
return True
if blockNickname:
blockHandle = blockNickname + '@' + blockDomain
if blockHandle in open(blockingFilename).read():
return True
return False
def outboxBlock(baseDir: str, httpPrefix: str,
nickname: str, domain: str, port: int,
messageJson: {}, debug: bool) -> None:
""" When a block request is received by the outbox from c2s
"""
if not messageJson.get('type'):
if debug:
print('DEBUG: block - no type')
return
if not messageJson['type'] == 'Block':
if debug:
print('DEBUG: not a block')
return
if not messageJson.get('object'):
if debug:
print('DEBUG: no object in block')
return
if not isinstance(messageJson['object'], str):
if debug:
print('DEBUG: block object is not string')
return
if debug:
print('DEBUG: c2s block request arrived in outbox')
messageId = messageJson['object'].replace('/activity', '')
if '/statuses/' not in messageId:
if debug:
print('DEBUG: c2s block object is not a status')
return
if '/users/' not in messageId and \
'/channel/' not in messageId and \
'/profile/' not in messageId:
if debug:
print('DEBUG: c2s block object has no nickname')
return
if ':' in domain:
domain = domain.split(':')[0]
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:
print('DEBUG: c2s block post not found in inbox or outbox')
print(messageId)
return
nicknameBlocked = getNicknameFromActor(messageJson['object'])
if not nicknameBlocked:
print('WARN: unable to find nickname in ' + messageJson['object'])
return
domainBlocked, portBlocked = getDomainFromActor(messageJson['object'])
domainBlockedFull = domainBlocked
if portBlocked:
if portBlocked != 80 and portBlocked != 443:
if ':' not in domainBlocked:
domainBlockedFull = domainBlocked + ':' + str(portBlocked)
addBlock(baseDir, nickname, domain,
nicknameBlocked, domainBlockedFull)
if debug:
print('DEBUG: post blocked via c2s - ' + postFilename)
def outboxUndoBlock(baseDir: str, httpPrefix: str,
nickname: str, domain: str, port: int,
messageJson: {}, debug: bool) -> None:
""" When an undo block request is received by the outbox from c2s
"""
if not messageJson.get('type'):
if debug:
print('DEBUG: undo block - no type')
return
if not messageJson['type'] == 'Undo':
if debug:
print('DEBUG: not an undo block')
return
if not messageJson.get('object'):
if debug:
print('DEBUG: no object in undo block')
return
if not isinstance(messageJson['object'], dict):
if debug:
print('DEBUG: undo block object is not string')
return
if not messageJson['object'].get('type'):
if debug:
print('DEBUG: undo block - no type')
return
if not messageJson['object']['type'] == 'Block':
if debug:
print('DEBUG: not an undo block')
return
if not messageJson['object'].get('object'):
if debug:
print('DEBUG: no object in undo block')
return
if not isinstance(messageJson['object']['object'], str):
if debug:
print('DEBUG: undo block object is not string')
return
if debug:
print('DEBUG: c2s undo block request arrived in outbox')
messageId = messageJson['object']['object'].replace('/activity', '')
if '/statuses/' not in messageId:
if debug:
print('DEBUG: c2s undo block object is not a status')
return
if '/users/' not in messageId and \
'/channel/' not in messageId and \
'/profile/' not in messageId:
if debug:
print('DEBUG: c2s undo block object has no nickname')
return
if ':' in domain:
domain = domain.split(':')[0]
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:
print('DEBUG: c2s undo block post not found in inbox or outbox')
print(messageId)
return
nicknameBlocked = getNicknameFromActor(messageJson['object']['object'])
if not nicknameBlocked:
print('WARN: unable to find nickname in ' +
messageJson['object']['object'])
return
domainObject = messageJson['object']['object']
domainBlocked, portBlocked = getDomainFromActor(domainObject)
domainBlockedFull = domainBlocked
if portBlocked:
if portBlocked != 80 and portBlocked != 443:
if ':' not in domainBlocked:
domainBlockedFull = domainBlocked + ':' + str(portBlocked)
removeBlock(baseDir, nickname, domain,
nicknameBlocked, domainBlockedFull)
if debug:
print('DEBUG: post undo blocked via c2s - ' + postFilename)