epicyon/blocking.py

722 lines
27 KiB
Python
Raw Normal View History

2020-04-01 20:06:27 +00:00
__filename__ = "blocking.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
2021-01-26 10:07:42 +00:00
__version__ = "1.2.0"
2020-04-01 20:06:27 +00:00
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
2021-06-15 15:08:12 +00:00
__module_group__ = "ActivityPub"
2019-07-14 19:27:13 +00:00
import os
2021-03-20 21:20:41 +00:00
import json
2021-02-15 22:26:25 +00:00
from datetime import datetime
from utils import isAccountDir
2021-03-20 21:20:41 +00:00
from utils import getCachedPostFilename
from utils import loadJson
from utils import saveJson
2021-02-15 22:26:25 +00:00
from utils import fileLastModified
2021-02-15 22:06:53 +00:00
from utils import setConfigParam
2020-12-23 10:57:44 +00:00
from utils import hasUsersPath
2020-12-16 10:30:54 +00:00
from utils import getFullDomain
2020-08-23 11:13:35 +00:00
from utils import removeIdEnding
2019-09-09 15:53:23 +00:00
from utils import isEvil
2020-04-01 20:06:27 +00:00
from utils import locatePost
2020-03-28 10:33:04 +00:00
from utils import evilIncarnate
2020-04-01 20:06:27 +00:00
from utils import getDomainFromActor
from utils import getNicknameFromActor
2019-07-14 19:27:13 +00:00
2020-04-01 20:06:27 +00:00
def addGlobalBlock(baseDir: str,
                   blockNickname: str, blockDomain: str) -> bool:
    """Add a global block which applies to all accounts.

    The entry is appended to baseDir/accounts/blocking.txt. If
    blockNickname starts with '#' it is stored verbatim as a hashtag
    block and blockDomain is ignored, otherwise the stored entry is
    the handle blockNickname@blockDomain.

    Returns True if an entry was appended, or False if exactly that
    entry already exists within the file.
    """
    blockingFilename = baseDir + '/accounts/blocking.txt'
    if blockNickname.startswith('#'):
        blockEntry = blockNickname
    else:
        blockEntry = blockNickname + '@' + blockDomain

    # is the handle or hashtag already blocked?
    # Compare whole lines, so that an entry which is merely a
    # substring of an existing one is not mistaken for a duplicate
    if os.path.isfile(blockingFilename):
        with open(blockingFilename, 'r') as blockFile:
            for line in blockFile:
                if line.rstrip('\r\n') == blockEntry:
                    return False

    # block the account handle, domain or hashtag
    with open(blockingFilename, 'a+') as blockFile:
        blockFile.write(blockEntry + '\n')
    return True
2020-04-01 20:06:27 +00:00
def addBlock(baseDir: str, nickname: str, domain: str,
             blockNickname: str, blockDomain: str) -> bool:
    """Block the given account on behalf of nickname@domain.

    The handle blockNickname@blockDomain is appended to the
    blocking.txt file within the account directory
    baseDir/accounts/nickname@domain. Any port number on domain is
    removed before the directory name is constructed.

    Returns True if the handle was appended, or False if exactly that
    handle is already listed.
    """
    if ':' in domain:
        domain = domain.split(':')[0]
    blockingFilename = baseDir + '/accounts/' + \
        nickname + '@' + domain + '/blocking.txt'
    blockHandle = blockNickname + '@' + blockDomain

    # Compare whole lines, so that a handle which is merely a
    # substring of an existing entry is not mistaken for a duplicate
    if os.path.isfile(blockingFilename):
        with open(blockingFilename, 'r') as blockFile:
            for line in blockFile:
                if line.rstrip('\r\n') == blockHandle:
                    return False

    with open(blockingFilename, 'a+') as blockFile:
        blockFile.write(blockHandle + '\n')
    return True
2019-07-14 19:27:13 +00:00
2020-04-01 20:06:27 +00:00
def removeGlobalBlock(baseDir: str,
                      unblockNickname: str,
                      unblockDomain: str) -> bool:
    """Unblock the given global block.

    If unblockNickname starts with '#' then it is treated as a hashtag
    and unblockDomain is ignored, otherwise the entry removed is the
    handle unblockNickname@unblockDomain.

    Returns True if the entry was found within
    baseDir/accounts/blocking.txt and removed, otherwise False.
    """
    unblockingFilename = baseDir + '/accounts/blocking.txt'
    if unblockNickname.startswith('#'):
        unblockEntry = unblockNickname
    else:
        unblockEntry = unblockNickname + '@' + unblockDomain

    if not os.path.isfile(unblockingFilename):
        return False

    with open(unblockingFilename, 'r') as fp:
        entries = [line.replace('\n', '').replace('\r', '') for line in fp]

    # Only an exact match is removed. Substring matching would also
    # delete unrelated entries, such as '#cats' when removing '#cat'
    if unblockEntry not in entries:
        return False

    # write the remaining entries to a temporary file and then swap
    # it into place, so that a partially written list is never used
    newFilename = unblockingFilename + '.new'
    with open(newFilename, 'w+') as fpnew:
        for entry in entries:
            if entry != unblockEntry:
                fpnew.write(entry + '\n')
    os.replace(newFilename, unblockingFilename)
    return True
2020-04-01 20:06:27 +00:00
def removeBlock(baseDir: str, nickname: str, domain: str,
                unblockNickname: str, unblockDomain: str) -> bool:
    """Unblock the given account on behalf of nickname@domain.

    The handle unblockNickname@unblockDomain is removed from the
    blocking.txt file within the account directory
    baseDir/accounts/nickname@domain. Any port number on domain is
    removed before the directory name is constructed.

    Returns True if the handle was listed and has been removed,
    otherwise False.
    """
    if ':' in domain:
        domain = domain.split(':')[0]
    unblockingFilename = baseDir + '/accounts/' + \
        nickname + '@' + domain + '/blocking.txt'
    unblockHandle = unblockNickname + '@' + unblockDomain

    if not os.path.isfile(unblockingFilename):
        return False

    with open(unblockingFilename, 'r') as fp:
        entries = [line.replace('\n', '').replace('\r', '') for line in fp]

    # Only an exact match is removed. Substring matching would also
    # delete other handles which merely contain the unblocked one
    if unblockHandle not in entries:
        return False

    # write the remaining entries to a temporary file and then swap
    # it into place, so that a partially written list is never used
    newFilename = unblockingFilename + '.new'
    with open(newFilename, 'w+') as fpnew:
        for entry in entries:
            if entry != unblockHandle:
                fpnew.write(entry + '\n')
    os.replace(newFilename, unblockingFilename)
    return True
2019-08-14 10:32:15 +00:00
2020-04-01 20:06:27 +00:00
def isBlockedHashtag(baseDir: str, hashtag: str) -> bool:
    """Is the given hashtag blocked?

    Hashtags longer than 32 characters are always treated as blocked.
    Otherwise the hashtag, with a leading '#' added if it is missing,
    is looked up as a whole line within baseDir/accounts/blocking.txt.
    Returns False if that file does not exist.
    """
    # avoid very long hashtags
    if len(hashtag) > 32:
        return True

    globalBlockingFilename = baseDir + '/accounts/blocking.txt'
    if not os.path.isfile(globalBlockingFilename):
        return False

    hashtag = hashtag.strip('\n').strip('\r')
    if not hashtag.startswith('#'):
        hashtag = '#' + hashtag

    # Compare line by line, so that the final entry is still matched
    # when the file does not end with a newline
    with open(globalBlockingFilename, 'r') as fpBlocked:
        for line in fpBlocked:
            if line.rstrip('\r\n') == hashtag:
                return True
    return False
2020-04-01 20:06:27 +00:00
2020-03-28 10:33:04 +00:00
def getDomainBlocklist(baseDir: str) -> str:
    """Returns a single string containing every globally blocked domain

    The string starts with each entry returned by evilIncarnate(), one
    per line, followed by the unmodified contents of
    baseDir/accounts/blocking.txt if that file exists.
    Having it all as one string permits fast matching, which helps
    to mitigate flooding
    """
    # built-in entries first, each terminated by a newline
    blockedStr = ''.join(evil + '\n' for evil in evilIncarnate())

    globalBlockingFilename = baseDir + '/accounts/blocking.txt'
    if os.path.isfile(globalBlockingFilename):
        with open(globalBlockingFilename, 'r') as fpBlocked:
            blockedStr += fpBlocked.read()
    return blockedStr
2020-04-01 20:06:27 +00:00
def isBlockedDomain(baseDir: str, domain: str) -> bool:
    """Is the given domain blocked?

    Returns False for a domain containing no '.', and True for any
    domain which isEvil() reports. Otherwise, if the instance allow
    list baseDir/accounts/allowedinstances.txt exists then the domain
    is blocked unless it appears within that list, and if it does not
    exist then the domain is blocked when a '*@domain' entry appears
    within baseDir/accounts/blocking.txt.
    """
    if '.' not in domain:
        return False

    if isEvil(domain):
        return True

    # by checking a shorter version we can thwart adversaries
    # who constantly change their subdomain
    sections = domain.split('.')
    shortDomain = None
    if len(sections) > 2:
        # the last two sections of the domain name, taken from the
        # split sections rather than from characters of the string
        shortDomain = sections[-2] + '.' + sections[-1]

    allowFilename = baseDir + '/accounts/allowedinstances.txt'
    if not os.path.isfile(allowFilename):
        # instance block list
        globalBlockingFilename = baseDir + '/accounts/blocking.txt'
        if os.path.isfile(globalBlockingFilename):
            with open(globalBlockingFilename, 'r') as fpBlocked:
                blockedStr = fpBlocked.read()
            if '*@' + domain in blockedStr:
                return True
            if shortDomain:
                if '*@' + shortDomain in blockedStr:
                    return True
    else:
        # instance allow list
        with open(allowFilename, 'r') as fpAllowed:
            allowedStr = fpAllowed.read()
        if not shortDomain:
            if domain not in allowedStr:
                return True
        else:
            if shortDomain not in allowedStr:
                return True

    return False
2020-04-01 20:06:27 +00:00
def isBlocked(baseDir: str, nickname: str, domain: str,
              blockNickname: str, blockDomain: str) -> bool:
    """Is the given nickname blocked?

    Checks are made in this order, returning True on the first hit:
    isEvil(blockDomain), the global baseDir/accounts/blocking.txt,
    the allowedinstances.txt of the account nickname@domain (blocked
    if blockDomain is absent from it), then the blocking.txt of that
    account. Within each blocking file both the '*@blockDomain' entry
    and, if blockNickname is given, the blockNickname@blockDomain
    handle are searched for.
    """
    if isEvil(blockDomain):
        return True

    blockHandle = None
    if blockNickname:
        blockHandle = blockNickname + '@' + blockDomain

    # instance level blocks
    globalBlockingFilename = baseDir + '/accounts/blocking.txt'
    if os.path.isfile(globalBlockingFilename):
        # read the file once and close it before searching
        with open(globalBlockingFilename, 'r') as fpBlocked:
            blockedStr = fpBlocked.read()
        if '*@' + blockDomain in blockedStr:
            return True
        if blockHandle:
            if blockHandle in blockedStr:
                return True

    accountDir = baseDir + '/accounts/' + nickname + '@' + domain

    # account level allow list
    allowFilename = accountDir + '/allowedinstances.txt'
    if os.path.isfile(allowFilename):
        with open(allowFilename, 'r') as fpAllowed:
            allowedStr = fpAllowed.read()
        if blockDomain not in allowedStr:
            return True

    # account level blocks
    blockingFilename = accountDir + '/blocking.txt'
    if os.path.isfile(blockingFilename):
        with open(blockingFilename, 'r') as fpBlocked:
            blockedStr = fpBlocked.read()
        if '*@' + blockDomain in blockedStr:
            return True
        if blockHandle:
            if blockHandle in blockedStr:
                return True
    return False
2020-04-01 20:06:27 +00:00
def outboxBlock(baseDir: str, httpPrefix: str,
                nickname: str, domain: str, port: int,
                messageJson: {}, debug: bool) -> None:
    """ Handles a block request arriving at the outbox via c2s

    The activity must have type 'Block' and a string object which
    refers to a status of some account, and that post must be
    locatable for nickname@domain. The author of the post is then
    added to the blocking list of nickname@domain.
    """
    def _debugPrint(text: str) -> None:
        # only emit diagnostics when debugging is switched on
        if debug:
            print(text)

    if not messageJson.get('type'):
        _debugPrint('DEBUG: block - no type')
        return
    if messageJson['type'] != 'Block':
        _debugPrint('DEBUG: not a block')
        return
    if not messageJson.get('object'):
        _debugPrint('DEBUG: no object in block')
        return
    blockedObject = messageJson['object']
    if not isinstance(blockedObject, str):
        _debugPrint('DEBUG: block object is not string')
        return
    _debugPrint('DEBUG: c2s block request arrived in outbox')

    messageId = removeIdEnding(blockedObject)
    if '/statuses/' not in messageId:
        _debugPrint('DEBUG: c2s block object is not a status')
        return
    if not hasUsersPath(messageId):
        _debugPrint('DEBUG: c2s block object has no nickname')
        return

    # the account directory name does not include any port number
    if ':' in domain:
        domain = domain.split(':')[0]
    postFilename = locatePost(baseDir, nickname, domain, messageId)
    if not postFilename:
        _debugPrint('DEBUG: c2s block post not found in inbox or outbox')
        _debugPrint(messageId)
        return

    nicknameBlocked = getNicknameFromActor(blockedObject)
    if not nicknameBlocked:
        print('WARN: unable to find nickname in ' + blockedObject)
        return
    domainBlocked, portBlocked = getDomainFromActor(blockedObject)
    domainBlockedFull = getFullDomain(domainBlocked, portBlocked)

    addBlock(baseDir, nickname, domain,
             nicknameBlocked, domainBlockedFull)

    _debugPrint('DEBUG: post blocked via c2s - ' + postFilename)
2019-07-17 21:40:56 +00:00
2020-04-01 20:06:27 +00:00
def outboxUndoBlock(baseDir: str, httpPrefix: str,
                    nickname: str, domain: str, port: int,
                    messageJson: {}, debug: bool) -> None:
    """ When an undo block request is received by the outbox from c2s

    The activity must have type 'Undo' and contain an object
    dictionary of type 'Block', whose own object is a string which
    refers to a status of some account. That post must be locatable
    for nickname@domain. The author of the post is then removed from
    the blocking list of nickname@domain.
    """
    if not messageJson.get('type'):
        if debug:
            print('DEBUG: undo block - no type')
        return
    if not messageJson['type'] == 'Undo':
        if debug:
            print('DEBUG: not an undo block')
        return
    if not messageJson.get('object'):
        if debug:
            print('DEBUG: no object in undo block')
        return
    if not isinstance(messageJson['object'], dict):
        if debug:
            # this check is for a dictionary, so say so rather than
            # repeating the message used for the string check below
            print('DEBUG: undo block object is not a dict')
        return
    if not messageJson['object'].get('type'):
        if debug:
            print('DEBUG: undo block - no type')
        return
    if not messageJson['object']['type'] == 'Block':
        if debug:
            print('DEBUG: not an undo block')
        return
    if not messageJson['object'].get('object'):
        if debug:
            print('DEBUG: no object in undo block')
        return
    if not isinstance(messageJson['object']['object'], str):
        if debug:
            print('DEBUG: undo block object is not string')
        return
    if debug:
        print('DEBUG: c2s undo block request arrived in outbox')

    messageId = removeIdEnding(messageJson['object']['object'])
    if '/statuses/' not in messageId:
        if debug:
            print('DEBUG: c2s undo block object is not a status')
        return
    if not hasUsersPath(messageId):
        if debug:
            print('DEBUG: c2s undo block object has no nickname')
        return
    # the account directory name does not include any port number
    if ':' in domain:
        domain = domain.split(':')[0]
    postFilename = locatePost(baseDir, nickname, domain, messageId)
    if not postFilename:
        if debug:
            print('DEBUG: c2s undo block post not found in inbox or outbox')
            print(messageId)
        return
    nicknameBlocked = getNicknameFromActor(messageJson['object']['object'])
    if not nicknameBlocked:
        print('WARN: unable to find nickname in ' +
              messageJson['object']['object'])
        return
    domainObject = messageJson['object']['object']
    domainBlocked, portBlocked = getDomainFromActor(domainObject)
    domainBlockedFull = getFullDomain(domainBlocked, portBlocked)

    removeBlock(baseDir, nickname, domain,
                nicknameBlocked, domainBlockedFull)
    if debug:
        print('DEBUG: post undo blocked via c2s - ' + postFilename)
2021-02-15 22:06:53 +00:00
def mutePost(baseDir: str, nickname: str, domain: str, port: int,
             httpPrefix: str, postId: str, recentPostsCache: {},
             debug: bool) -> None:
    """ Mutes the given post

    An 'Ignore' item for the actor nickname@domain is added to the
    'ignores' collection of the post object and the post is saved.
    Any cached version of the post is removed, a '.muted' marker file
    is created next to the post, and if the post is within
    recentPostsCache then it is marked as muted there too.
    Nothing happens if the post can't be located or loaded, or if
    this actor already has an ignore on the post.
    """
    postFilename = locatePost(baseDir, nickname, domain, postId)
    if not postFilename:
        return
    postJsonObject = loadJson(postFilename)
    if not postJsonObject:
        return

    if postJsonObject.get('object'):
        if isinstance(postJsonObject['object'], dict):
            domainFull = getFullDomain(domain, port)
            actor = httpPrefix + '://' + domainFull + '/users/' + nickname
            # does this post have ignores on it from different actors?
            if not postJsonObject['object'].get('ignores'):
                if debug:
                    print('DEBUG: Adding initial mute to ' + postId)
                ignoresJson = {
                    "@context": "https://www.w3.org/ns/activitystreams",
                    'id': postId,
                    'type': 'Collection',
                    "totalItems": 1,
                    'items': [{
                        'type': 'Ignore',
                        'actor': actor
                    }]
                }
                postJsonObject['object']['ignores'] = ignoresJson
            else:
                if not postJsonObject['object']['ignores'].get('items'):
                    postJsonObject['object']['ignores']['items'] = []
                itemsList = postJsonObject['object']['ignores']['items']
                for ignoresItem in itemsList:
                    if ignoresItem.get('actor'):
                        if ignoresItem['actor'] == actor:
                            # this actor has already muted the post
                            return
                newIgnore = {
                    'type': 'Ignore',
                    'actor': actor
                }
                itemsList.append(newIgnore)
                # count the items after appending, so that the total
                # includes the ignore which was just added
                postJsonObject['object']['ignores']['totalItems'] = \
                    len(itemsList)
            saveJson(postJsonObject, postFilename)

    # remove cached post so that the muted version gets recreated
    # without its content text and/or image
    cachedPostFilename = \
        getCachedPostFilename(baseDir, nickname, domain, postJsonObject)
    if cachedPostFilename:
        if os.path.isfile(cachedPostFilename):
            os.remove(cachedPostFilename)

    # create the marker file which indicates that the post is muted
    with open(postFilename + '.muted', 'w+') as muteFile:
        muteFile.write('\n')
    print('MUTE: ' + postFilename + '.muted file added')

    # if the post is in the recent posts cache then mark it as muted
    if recentPostsCache.get('index'):
        postId = \
            removeIdEnding(postJsonObject['id']).replace('/', '#')
        if postId in recentPostsCache['index']:
            print('MUTE: ' + postId + ' is in recent posts cache')
            if recentPostsCache['json'].get(postId):
                postJsonObject['muted'] = True
                recentPostsCache['json'][postId] = json.dumps(postJsonObject)
                # remove the cached html so that it gets regenerated
                if recentPostsCache.get('html'):
                    if recentPostsCache['html'].get(postId):
                        del recentPostsCache['html'][postId]
                print('MUTE: ' + postId +
                      ' marked as muted in recent posts memory cache')
def unmutePost(baseDir: str, nickname: str, domain: str, port: int,
               httpPrefix: str, postId: str, recentPostsCache: {},
               debug: bool) -> None:
    """ Unmutes the given post

    The '.muted' marker file next to the post is removed, and any
    'Ignore' item belonging to the actor nickname@domain is removed
    from the 'ignores' collection of the post object. The collection
    itself is only deleted once no ignores remain within it.
    The post is then saved, any cached version of it is removed, and
    if the post is within recentPostsCache it is marked as unmuted.
    Nothing happens if the post can't be located or loaded.
    """
    postFilename = locatePost(baseDir, nickname, domain, postId)
    if not postFilename:
        return
    postJsonObject = loadJson(postFilename)
    if not postJsonObject:
        return

    muteFilename = postFilename + '.muted'
    if os.path.isfile(muteFilename):
        os.remove(muteFilename)
        print('UNMUTE: ' + muteFilename + ' file removed')

    if postJsonObject.get('object'):
        if isinstance(postJsonObject['object'], dict):
            if postJsonObject['object'].get('ignores'):
                domainFull = getFullDomain(domain, port)
                actor = httpPrefix + '://' + domainFull + '/users/' + nickname
                ignores = postJsonObject['object']['ignores']
                # the items list may be missing from the collection
                itemsList = ignores.get('items')
                if not isinstance(itemsList, list):
                    itemsList = []
                for ignoresItem in itemsList:
                    if ignoresItem.get('actor'):
                        if ignoresItem['actor'] == actor:
                            if debug:
                                print('DEBUG: mute was removed for ' + actor)
                            itemsList.remove(ignoresItem)
                            break
                # Decide using the items which actually remain, rather
                # than the stored total, so that ignores belonging to
                # other actors are never discarded
                if not itemsList:
                    if debug:
                        print('DEBUG: mute was removed from post')
                    del postJsonObject['object']['ignores']
                else:
                    ignores['items'] = itemsList
                    ignores['totalItems'] = len(itemsList)
            saveJson(postJsonObject, postFilename)

    # remove cached post so that the muted version gets recreated
    # with its content text and/or image
    cachedPostFilename = \
        getCachedPostFilename(baseDir, nickname, domain, postJsonObject)
    if cachedPostFilename:
        if os.path.isfile(cachedPostFilename):
            os.remove(cachedPostFilename)

    # if the post is in the recent posts cache then mark it as unmuted
    if recentPostsCache.get('index'):
        postId = \
            removeIdEnding(postJsonObject['id']).replace('/', '#')
        if postId in recentPostsCache['index']:
            print('UNMUTE: ' + postId + ' is in recent posts cache')
            if recentPostsCache['json'].get(postId):
                postJsonObject['muted'] = False
                recentPostsCache['json'][postId] = json.dumps(postJsonObject)
                # remove the cached html so that it gets regenerated
                if recentPostsCache.get('html'):
                    if recentPostsCache['html'].get(postId):
                        del recentPostsCache['html'][postId]
                print('UNMUTE: ' + postId +
                      ' marked as unmuted in recent posts cache')
def outboxMute(baseDir: str, httpPrefix: str,
               nickname: str, domain: str, port: int,
               messageJson: {}, debug: bool,
               recentPostsCache: {}) -> None:
    """Handles a mute arriving at the outbox via c2s

    The activity must have type 'Ignore', an actor which ends with
    the full domain followed by /users/nickname, and a string object
    which refers to a status of some account. The post must be
    locatable for nickname@domain, and it is then muted.
    """
    def _debugPrint(text: str) -> None:
        # only emit diagnostics when debugging is switched on
        if debug:
            print(text)

    if not messageJson.get('type') or not messageJson.get('actor'):
        return
    domainFull = getFullDomain(domain, port)
    expectedActorEnding = domainFull + '/users/' + nickname
    if not messageJson['actor'].endswith(expectedActorEnding):
        return
    if messageJson['type'] != 'Ignore':
        return
    if not messageJson.get('object'):
        _debugPrint('DEBUG: no object in mute')
        return
    mutedObject = messageJson['object']
    if not isinstance(mutedObject, str):
        _debugPrint('DEBUG: mute object is not string')
        return
    _debugPrint('DEBUG: c2s mute request arrived in outbox')

    messageId = removeIdEnding(mutedObject)
    if '/statuses/' not in messageId:
        _debugPrint('DEBUG: c2s mute object is not a status')
        return
    if not hasUsersPath(messageId):
        _debugPrint('DEBUG: c2s mute object has no nickname')
        return

    # the account directory name does not include any port number
    if ':' in domain:
        domain = domain.split(':')[0]
    postFilename = locatePost(baseDir, nickname, domain, messageId)
    if not postFilename:
        _debugPrint('DEBUG: c2s mute post not found in inbox or outbox')
        _debugPrint(messageId)
        return

    nicknameMuted = getNicknameFromActor(mutedObject)
    if not nicknameMuted:
        print('WARN: unable to find nickname in ' + mutedObject)
        return

    mutePost(baseDir, nickname, domain, port,
             httpPrefix, mutedObject, recentPostsCache,
             debug)

    _debugPrint('DEBUG: post muted via c2s - ' + postFilename)
def outboxUndoMute(baseDir: str, httpPrefix: str,
                   nickname: str, domain: str, port: int,
                   messageJson: {}, debug: bool,
                   recentPostsCache: {}) -> None:
    """When an undo mute is received by the outbox from c2s

    The activity must have type 'Undo', an actor which ends with the
    full domain followed by /users/nickname, and an object dictionary
    of type 'Ignore' whose own object is a string which refers to a
    status of some account. The post must be locatable for
    nickname@domain, and it is then unmuted.
    """
    if not messageJson.get('type'):
        return
    if not messageJson.get('actor'):
        return
    domainFull = getFullDomain(domain, port)
    if not messageJson['actor'].endswith(domainFull + '/users/' + nickname):
        return
    if not messageJson['type'] == 'Undo':
        return
    if not messageJson.get('object'):
        return
    if not isinstance(messageJson['object'], dict):
        return
    if not messageJson['object'].get('type'):
        return
    if messageJson['object']['type'] != 'Ignore':
        return
    # use get() so that an Ignore which lacks an object is rejected
    # here, rather than raising a KeyError
    mutedObject = messageJson['object'].get('object')
    if not isinstance(mutedObject, str):
        if debug:
            print('DEBUG: undo mute object is not a string')
        return
    if debug:
        print('DEBUG: c2s undo mute request arrived in outbox')

    messageId = removeIdEnding(mutedObject)
    if '/statuses/' not in messageId:
        if debug:
            print('DEBUG: c2s undo mute object is not a status')
        return
    if not hasUsersPath(messageId):
        if debug:
            print('DEBUG: c2s undo mute object has no nickname')
        return
    # the account directory name does not include any port number
    if ':' in domain:
        domain = domain.split(':')[0]
    postFilename = locatePost(baseDir, nickname, domain, messageId)
    if not postFilename:
        if debug:
            print('DEBUG: c2s undo mute post not found in inbox or outbox')
            print(messageId)
        return
    nicknameMuted = getNicknameFromActor(mutedObject)
    if not nicknameMuted:
        print('WARN: unable to find nickname in ' +
              mutedObject)
        return

    unmutePost(baseDir, nickname, domain, port,
               httpPrefix, mutedObject,
               recentPostsCache, debug)

    if debug:
        print('DEBUG: post undo mute via c2s - ' + postFilename)
def brochModeIsActive(baseDir: str) -> bool:
    """Broch mode is active whenever the instance allow list
    accounts/allowedinstances.txt exists within baseDir
    """
    return os.path.isfile(baseDir + '/accounts/allowedinstances.txt')
2021-02-15 22:06:53 +00:00
def setBrochMode(baseDir: str, domainFull: str, enabled: bool) -> None:
    """Broch mode can be used to lock down the instance during
    a period of time when it is temporarily under attack.
    For example, where an adversary is constantly spinning up new
    instances.
    It surveys the following and followers lists of all accounts and
    uses that to construct an instance level allow list. Anything
    arriving which is then not from one of the allowed domains will
    be dropped

    When enabled is False the allow list file is removed. When it is
    True and the allow list already exists then nothing is changed.
    Unless broch mode was already activated, the brochMode config
    parameter is set to the value of enabled.
    """
    allowFilename = baseDir + '/accounts/allowedinstances.txt'

    if not enabled:
        # remove instance allow list
        if os.path.isfile(allowFilename):
            os.remove(allowFilename)
            print('Broch mode turned off')
    else:
        if os.path.isfile(allowFilename):
            lastModified = fileLastModified(allowFilename)
            print('Broch mode already activated ' + lastModified)
            return
        # generate instance allow list, beginning with this instance
        allowedDomains = [domainFull]
        followFiles = ('following.txt', 'followers.txt')
        for _, dirs, _ in os.walk(baseDir + '/accounts'):
            for acct in dirs:
                if not isAccountDir(acct):
                    continue
                accountDir = os.path.join(baseDir + '/accounts', acct)
                for followFileType in followFiles:
                    followingFilename = accountDir + '/' + followFileType
                    if not os.path.isfile(followingFilename):
                        continue
                    with open(followingFilename, "r") as f:
                        followList = f.readlines()
                    for handle in followList:
                        if '@' not in handle:
                            continue
                        # remove line endings, including any '\r'
                        handle = handle.strip()
                        handleDomain = handle.split('@')[1]
                        if not handleDomain:
                            continue
                        if handleDomain not in allowedDomains:
                            allowedDomains.append(handleDomain)
            # only the top level of the accounts directory is needed
            break

        # write the allow file. The domain of this instance is the
        # first item within the list, so it is not written separately
        with open(allowFilename, "w+") as allowFile:
            for d in allowedDomains:
                allowFile.write(d + '\n')
        print('Broch mode enabled')

    setConfigParam(baseDir, "brochMode", enabled)
2021-02-15 22:26:25 +00:00
2021-02-15 23:01:07 +00:00
def brochModeLapses(baseDir: str, lapseDays=7) -> bool:
    """After broch mode is enabled it automatically
    elapses after a period of time

    If the instance allow list baseDir/accounts/allowedinstances.txt
    was last modified lapseDays or more days ago then it is removed
    and the brochMode config parameter is set to False.

    Returns True if broch mode was ended by this call, otherwise
    False.
    """
    allowFilename = baseDir + '/accounts/allowedinstances.txt'
    if not os.path.isfile(allowFilename):
        return False

    lastModified = fileLastModified(allowFilename)
    try:
        modifiedDate = \
            datetime.strptime(lastModified, "%Y-%m-%dT%H:%M:%SZ")
    except (TypeError, ValueError):
        # the modification time was not in the expected format
        return False

    # datetime is the class imported from the datetime module, so
    # utcnow is called upon it directly
    currTime = datetime.utcnow()
    daysSinceBroch = (currTime - modifiedDate).days
    if daysSinceBroch < lapseDays:
        return False

    try:
        os.remove(allowFilename)
        setConfigParam(baseDir, "brochMode", False)
    except Exception as ex:
        # best effort: report the failure rather than raising
        print('WARN: unable to end broch mode ' + str(ex))
        return False
    print('Broch mode has elapsed')
    return True