flake8 style

main
Bob Mottram 2020-04-01 20:06:27 +00:00
parent 2c6169fa64
commit 8dd64c187f
1 changed files with 200 additions and 170 deletions

View File

@ -1,201 +1,219 @@
__filename__="blocking.py"
__author__="Bob Mottram"
__license__="AGPL3+"
__version__="1.1.0"
__maintainer__="Bob Mottram"
__email__="bob@freedombone.net"
__status__="Production"
__filename__ = "blocking.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
from utils import isEvil
from utils import locatePost
from utils import evilIncarnate
from utils import getDomainFromActor
from utils import getNicknameFromActor
from session import postJson
from auth import createBasicAuthHeader
from posts import getPersonBox
from webfinger import webfingerHandle
def addGlobalBlock(baseDir: str, \
blockNickname: str,blockDomain: str) -> bool:
def addGlobalBlock(baseDir: str,
blockNickname: str, blockDomain: str) -> bool:
"""Global block which applies to all accounts
"""
blockingFilename=baseDir+'/accounts/blocking.txt'
blockingFilename = baseDir + '/accounts/blocking.txt'
if not blockNickname.startswith('#'):
blockHandle=blockNickname+'@'+blockDomain
blockHandle = blockNickname + '@' + blockDomain
if os.path.isfile(blockingFilename):
if blockHandle in open(blockingFilename).read():
return False
blockFile=open(blockingFilename, "a+")
blockFile.write(blockHandle+'\n')
blockFile = open(blockingFilename, "a+")
blockFile.write(blockHandle + '\n')
blockFile.close()
else:
blockHashtag=blockNickname
blockHashtag = blockNickname
if os.path.isfile(blockingFilename):
if blockHashtag+'\n' in open(blockingFilename).read():
if blockHashtag + '\n' in open(blockingFilename).read():
return False
blockFile=open(blockingFilename, "a+")
blockFile.write(blockHashtag+'\n')
blockFile = open(blockingFilename, "a+")
blockFile.write(blockHashtag + '\n')
blockFile.close()
return True
def addBlock(baseDir: str,nickname: str,domain: str, \
blockNickname: str,blockDomain: str) -> bool:
def addBlock(baseDir: str, nickname: str, domain: str,
blockNickname: str, blockDomain: str) -> bool:
"""Block the given account
"""
if ':' in domain:
domain=domain.split(':')[0]
blockingFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/blocking.txt'
blockHandle=blockNickname+'@'+blockDomain
domain = domain.split(':')[0]
blockingFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/blocking.txt'
blockHandle = blockNickname + '@' + blockDomain
if os.path.isfile(blockingFilename):
if blockHandle in open(blockingFilename).read():
return False
blockFile=open(blockingFilename, "a+")
blockFile.write(blockHandle+'\n')
blockFile = open(blockingFilename, "a+")
blockFile.write(blockHandle + '\n')
blockFile.close()
return True
def removeGlobalBlock(baseDir: str, \
unblockNickname: str, \
def removeGlobalBlock(baseDir: str,
unblockNickname: str,
unblockDomain: str) -> bool:
"""Unblock the given global block
"""
unblockingFilename=baseDir+'/accounts/blocking.txt'
unblockingFilename = baseDir + '/accounts/blocking.txt'
if not unblockNickname.startswith('#'):
unblockHandle=unblockNickname+'@'+unblockDomain
unblockHandle = unblockNickname + '@' + unblockDomain
if os.path.isfile(unblockingFilename):
if unblockHandle in open(unblockingFilename).read():
with open(unblockingFilename, 'r') as fp:
with open(unblockingFilename+'.new', 'w') as fpnew:
with open(unblockingFilename + '.new', 'w') as fpnew:
for line in fp:
handle=line.replace('\n','')
handle = line.replace('\n', '')
if unblockHandle not in line:
fpnew.write(handle+'\n')
if os.path.isfile(unblockingFilename+'.new'):
os.rename(unblockingFilename+'.new',unblockingFilename)
fpnew.write(handle + '\n')
if os.path.isfile(unblockingFilename + '.new'):
os.rename(unblockingFilename + '.new', unblockingFilename)
return True
else:
unblockHashtag=unblockNickname
unblockHashtag = unblockNickname
if os.path.isfile(unblockingFilename):
if unblockHashtag+'\n' in open(unblockingFilename).read():
if unblockHashtag + '\n' in open(unblockingFilename).read():
with open(unblockingFilename, 'r') as fp:
with open(unblockingFilename+'.new', 'w') as fpnew:
with open(unblockingFilename + '.new', 'w') as fpnew:
for line in fp:
blockLine=line.replace('\n','')
blockLine = line.replace('\n', '')
if unblockHashtag not in line:
fpnew.write(blockLine+'\n')
if os.path.isfile(unblockingFilename+'.new'):
os.rename(unblockingFilename+'.new',unblockingFilename)
fpnew.write(blockLine + '\n')
if os.path.isfile(unblockingFilename + '.new'):
os.rename(unblockingFilename + '.new', unblockingFilename)
return True
return False
def removeBlock(baseDir: str,nickname: str,domain: str, \
unblockNickname: str,unblockDomain: str) -> bool:
def removeBlock(baseDir: str, nickname: str, domain: str,
unblockNickname: str, unblockDomain: str) -> bool:
"""Unblock the given account
"""
if ':' in domain:
domain=domain.split(':')[0]
unblockingFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/blocking.txt'
unblockHandle=unblockNickname+'@'+unblockDomain
domain = domain.split(':')[0]
unblockingFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/blocking.txt'
unblockHandle = unblockNickname + '@' + unblockDomain
if os.path.isfile(unblockingFilename):
if unblockHandle in open(unblockingFilename).read():
with open(unblockingFilename, 'r') as fp:
with open(unblockingFilename+'.new', 'w') as fpnew:
with open(unblockingFilename + '.new', 'w') as fpnew:
for line in fp:
handle=line.replace('\n','')
handle = line.replace('\n', '')
if unblockHandle not in line:
fpnew.write(handle+'\n')
if os.path.isfile(unblockingFilename+'.new'):
os.rename(unblockingFilename+'.new',unblockingFilename)
fpnew.write(handle + '\n')
if os.path.isfile(unblockingFilename + '.new'):
os.rename(unblockingFilename + '.new', unblockingFilename)
return True
return False
def isBlockedHashtag(baseDir: str,hashtag: str) -> bool:
def isBlockedHashtag(baseDir: str, hashtag: str) -> bool:
"""Is the given hashtag blocked?
"""
globalBlockingFilename=baseDir+'/accounts/blocking.txt'
globalBlockingFilename = baseDir + '/accounts/blocking.txt'
if os.path.isfile(globalBlockingFilename):
hashtag=hashtag.strip('\n')
if hashtag+'\n' in open(globalBlockingFilename).read():
hashtag = hashtag.strip('\n')
if hashtag + '\n' in open(globalBlockingFilename).read():
return True
return False
def getDomainBlocklist(baseDir: str) -> str:
"""Returns all globally blocked domains as a string
This can be used for fast matching to mitigate flooding
"""
blockedStr=''
blockedStr = ''
evilDomains=evilIncarnate()
evilDomains = evilIncarnate()
for evil in evilDomains:
blockedStr+=evil+'\n'
blockedStr += evil + '\n'
globalBlockingFilename=baseDir+'/accounts/blocking.txt'
globalBlockingFilename = baseDir + '/accounts/blocking.txt'
if not os.path.isfile(globalBlockingFilename):
return blockedStr
with open(globalBlockingFilename, 'r') as file:
blockedStr += file.read()
return blockedStr
def isBlockedDomain(baseDir: str,domain: str) -> bool:
def isBlockedDomain(baseDir: str, domain: str) -> bool:
"""Is the given domain blocked?
"""
if isEvil(domain):
return True
globalBlockingFilename=baseDir+'/accounts/blocking.txt'
globalBlockingFilename = baseDir + '/accounts/blocking.txt'
if os.path.isfile(globalBlockingFilename):
if '*@'+domain in open(globalBlockingFilename).read():
if '*@' + domain in open(globalBlockingFilename).read():
return True
return False
def isBlocked(baseDir: str,nickname: str,domain: str, \
blockNickname: str,blockDomain: str) -> bool:
def isBlocked(baseDir: str, nickname: str, domain: str,
blockNickname: str, blockDomain: str) -> bool:
"""Is the given nickname blocked?
"""
if isEvil(blockDomain):
return True
globalBlockingFilename=baseDir+'/accounts/blocking.txt'
globalBlockingFilename = baseDir + '/accounts/blocking.txt'
if os.path.isfile(globalBlockingFilename):
if '*@'+blockDomain in open(globalBlockingFilename).read():
if '*@' + blockDomain in open(globalBlockingFilename).read():
return True
if blockNickname:
blockHandle=blockNickname+'@'+blockDomain
blockHandle = blockNickname + '@' + blockDomain
if blockHandle in open(globalBlockingFilename).read():
return True
allowFilename= \
baseDir+'/accounts/'+nickname+'@'+domain+'/allowedinstances.txt'
allowFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/allowedinstances.txt'
if os.path.isfile(allowFilename):
if blockDomain not in open(allowFilename).read():
return True
blockingFilename= \
baseDir+'/accounts/'+nickname+'@'+domain+'/blocking.txt'
blockingFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/blocking.txt'
if os.path.isfile(blockingFilename):
if '*@'+blockDomain in open(blockingFilename).read():
if '*@' + blockDomain in open(blockingFilename).read():
return True
if blockNickname:
blockHandle=blockNickname+'@'+blockDomain
blockHandle = blockNickname + '@' + blockDomain
if blockHandle in open(blockingFilename).read():
return True
return False
def sendBlockViaServer(baseDir: str,session, \
fromNickname: str,password: str, \
fromDomain: str,fromPort: int, \
httpPrefix: str,blockedUrl: str, \
cachedWebfingers: {},personCache: {}, \
debug: bool,projectVersion: str) -> {}:
def sendBlockViaServer(baseDir: str, session,
fromNickname: str, password: str,
fromDomain: str, fromPort: int,
httpPrefix: str, blockedUrl: str,
cachedWebfingers: {}, personCache: {},
debug: bool, projectVersion: str) -> {}:
"""Creates a block via c2s
"""
if not session:
print('WARN: No session for sendBlockViaServer')
return 6
fromDomainFull=fromDomain
fromDomainFull = fromDomain
if fromPort:
if fromPort!=80 and fromPort!=443:
if fromPort != 80 and fromPort != 443:
if ':' not in fromDomain:
fromDomainFull=fromDomain+':'+str(fromPort)
fromDomainFull = fromDomain + ':' + str(fromPort)
toUrl= 'https://www.w3.org/ns/activitystreams#Public'
ccUrl= \
httpPrefix+'://'+fromDomainFull+'/users/'+fromNickname+'/followers'
toUrl = 'https://www.w3.org/ns/activitystreams#Public'
ccUrl = httpPrefix + '://' + fromDomainFull + '/users/' + \
fromNickname + '/followers'
blockActor=httpPrefix+'://'+fromDomainFull+'/users/'+fromNickname
newBlockJson={
blockActor = httpPrefix + '://' + fromDomainFull + '/users/' + fromNickname
newBlockJson = {
"@context": "https://www.w3.org/ns/activitystreams",
'type': 'Block',
'actor': blockActor,
@ -204,73 +222,79 @@ def sendBlockViaServer(baseDir: str,session, \
'cc': [ccUrl]
}
handle=httpPrefix+'://'+fromDomainFull+'/@'+fromNickname
handle = httpPrefix + '://' + fromDomainFull + '/@' + fromNickname
# lookup the inbox for the To handle
wfRequest= \
webfingerHandle(session,handle,httpPrefix,cachedWebfingers, \
fromDomain,projectVersion)
wfRequest = webfingerHandle(session, handle, httpPrefix,
cachedWebfingers,
fromDomain, projectVersion)
if not wfRequest:
if debug:
print('DEBUG: announce webfinger failed for '+handle)
print('DEBUG: announce webfinger failed for ' + handle)
return 1
postToBox='outbox'
postToBox = 'outbox'
# get the actor inbox for the To handle
inboxUrl,pubKeyId,pubKey,fromPersonId,sharedInbox,capabilityAcquisition,avatarUrl,displayName= \
getPersonBox(baseDir,session,wfRequest,personCache, \
projectVersion,httpPrefix,fromNickname, \
fromDomain,postToBox)
(inboxUrl, pubKeyId, pubKey,
fromPersonId, sharedInbox,
capabilityAcquisition, avatarUrl,
displayName) = getPersonBox(baseDir, session, wfRequest,
personCache,
projectVersion, httpPrefix, fromNickname,
fromDomain, postToBox)
if not inboxUrl:
if debug:
print('DEBUG: No '+postToBox+' was found for '+handle)
print('DEBUG: No ' + postToBox + ' was found for ' + handle)
return 3
if not fromPersonId:
if debug:
print('DEBUG: No actor was found for '+handle)
print('DEBUG: No actor was found for ' + handle)
return 4
authHeader=createBasicAuthHeader(fromNickname,password)
authHeader = createBasicAuthHeader(fromNickname, password)
headers={
'host': fromDomain, \
'Content-type': 'application/json', \
headers = {
'host': fromDomain,
'Content-type': 'application/json',
'Authorization': authHeader
}
postResult= \
postJson(session,newBlockJson,[],inboxUrl,headers,"inbox:write")
postResult = postJson(session, newBlockJson, [], inboxUrl,
headers, "inbox:write")
if not postResult:
print('WARN: Unable to post block')
if debug:
print('DEBUG: c2s POST block success')
return newBlockJson
def sendUndoBlockViaServer(baseDir: str,session, \
fromNickname: str,password: str, \
fromDomain: str,fromPort: int, \
httpPrefix: str,blockedUrl: str, \
cachedWebfingers: {},personCache: {}, \
debug: bool,projectVersion: str) -> {}:
def sendUndoBlockViaServer(baseDir: str, session,
fromNickname: str, password: str,
fromDomain: str, fromPort: int,
httpPrefix: str, blockedUrl: str,
cachedWebfingers: {}, personCache: {},
debug: bool, projectVersion: str) -> {}:
"""Creates a block via c2s
"""
if not session:
print('WARN: No session for sendBlockViaServer')
return 6
fromDomainFull=fromDomain
fromDomainFull = fromDomain
if fromPort:
if fromPort!=80 and fromPort!=443:
if fromPort != 80 and fromPort != 443:
if ':' not in fromDomain:
fromDomainFull=fromDomain+':'+str(fromPort)
fromDomainFull = fromDomain + ':' + str(fromPort)
toUrl= 'https://www.w3.org/ns/activitystreams#Public'
ccUrl= \
httpPrefix+'://'+fromDomainFull+'/users/'+fromNickname+'/followers'
toUrl = 'https://www.w3.org/ns/activitystreams#Public'
ccUrl = httpPrefix + '://' + fromDomainFull + '/users/' + \
fromNickname + '/followers'
blockActor=httpPrefix+'://'+fromDomainFull+'/users/'+fromNickname
newBlockJson={
blockActor = httpPrefix + '://' + fromDomainFull + '/users/' + fromNickname
newBlockJson = {
"@context": "https://www.w3.org/ns/activitystreams",
'type': 'Undo',
'actor': blockActor,
@ -283,59 +307,64 @@ def sendUndoBlockViaServer(baseDir: str,session, \
}
}
handle=httpPrefix+'://'+fromDomainFull+'/@'+fromNickname
handle = httpPrefix + '://' + fromDomainFull + '/@' + fromNickname
# lookup the inbox for the To handle
wfRequest= \
webfingerHandle(session,handle,httpPrefix,cachedWebfingers, \
fromDomain,projectVersion)
wfRequest = webfingerHandle(session, handle, httpPrefix,
cachedWebfingers,
fromDomain, projectVersion)
if not wfRequest:
if debug:
print('DEBUG: announce webfinger failed for '+handle)
print('DEBUG: announce webfinger failed for ' + handle)
return 1
postToBox='outbox'
postToBox = 'outbox'
# get the actor inbox for the To handle
inboxUrl,pubKeyId,pubKey,fromPersonId,sharedInbox,capabilityAcquisition,avatarUrl,displayName= \
getPersonBox(baseDir,session,wfRequest,personCache, \
projectVersion,httpPrefix,fromNickname, \
fromDomain,postToBox)
(inboxUrl, pubKeyId, pubKey,
fromPersonId, sharedInbox,
capabilityAcquisition, avatarUrl,
displayName) = getPersonBox(baseDir, session, wfRequest, personCache,
projectVersion, httpPrefix, fromNickname,
fromDomain, postToBox)
if not inboxUrl:
if debug:
print('DEBUG: No '+postToBox+' was found for '+handle)
print('DEBUG: No ' + postToBox + ' was found for ' + handle)
return 3
if not fromPersonId:
if debug:
print('DEBUG: No actor was found for '+handle)
print('DEBUG: No actor was found for ' + handle)
return 4
authHeader=createBasicAuthHeader(fromNickname,password)
authHeader = createBasicAuthHeader(fromNickname, password)
headers={
'host': fromDomain, \
'Content-type': 'application/json', \
headers = {
'host': fromDomain,
'Content-type': 'application/json',
'Authorization': authHeader
}
postResult= \
postJson(session,newBlockJson,[],inboxUrl,headers,"inbox:write")
postResult = postJson(session, newBlockJson, [], inboxUrl,
headers, "inbox:write")
if not postResult:
print('WARN: Unable to post block')
if debug:
print('DEBUG: c2s POST block success')
return newBlockJson
def outboxBlock(baseDir: str,httpPrefix: str, \
nickname: str,domain: str,port: int, \
messageJson: {},debug: bool) -> None:
def outboxBlock(baseDir: str, httpPrefix: str,
nickname: str, domain: str, port: int,
messageJson: {}, debug: bool) -> None:
""" When a block request is received by the outbox from c2s
"""
if not messageJson.get('type'):
if debug:
print('DEBUG: block - no type')
return
if not messageJson['type']=='Block':
if not messageJson['type'] == 'Block':
if debug:
print('DEBUG: not a block')
return
@ -350,7 +379,7 @@ def outboxBlock(baseDir: str,httpPrefix: str, \
if debug:
print('DEBUG: c2s block request arrived in outbox')
messageId=messageJson['object'].replace('/activity','')
messageId = messageJson['object'].replace('/activity', '')
if '/statuses/' not in messageId:
if debug:
print('DEBUG: c2s block object is not a status')
@ -362,40 +391,41 @@ def outboxBlock(baseDir: str,httpPrefix: str, \
print('DEBUG: c2s block object has no nickname')
return
if ':' in domain:
domain=domain.split(':')[0]
postFilename=locatePost(baseDir,nickname,domain,messageId)
domain = domain.split(':')[0]
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:
print('DEBUG: c2s block post not found in inbox or outbox')
print(messageId)
return
nicknameBlocked=getNicknameFromActor(messageJson['object'])
nicknameBlocked = getNicknameFromActor(messageJson['object'])
if not nicknameBlocked:
print('WARN: unable to find nickname in '+messageJson['object'])
print('WARN: unable to find nickname in ' + messageJson['object'])
return
domainBlocked,portBlocked=getDomainFromActor(messageJson['object'])
domainBlockedFull=domainBlocked
domainBlocked, portBlocked = getDomainFromActor(messageJson['object'])
domainBlockedFull = domainBlocked
if portBlocked:
if portBlocked!=80 and portBlocked!=443:
if portBlocked != 80 and portBlocked != 443:
if ':' not in domainBlocked:
domainBlockedFull=domainBlocked+':'+str(portBlocked)
domainBlockedFull = domainBlocked + ':' + str(portBlocked)
addBlock(baseDir,nickname,domain, \
nicknameBlocked,domainBlockedFull)
addBlock(baseDir, nickname, domain,
nicknameBlocked, domainBlockedFull)
if debug:
print('DEBUG: post blocked via c2s - '+postFilename)
print('DEBUG: post blocked via c2s - ' + postFilename)
def outboxUndoBlock(baseDir: str,httpPrefix: str, \
nickname: str,domain: str,port: int, \
messageJson: {},debug: bool) -> None:
def outboxUndoBlock(baseDir: str, httpPrefix: str,
nickname: str, domain: str, port: int,
messageJson: {}, debug: bool) -> None:
""" When an undo block request is received by the outbox from c2s
"""
if not messageJson.get('type'):
if debug:
print('DEBUG: undo block - no type')
return
if not messageJson['type']=='Undo':
if not messageJson['type'] == 'Undo':
if debug:
print('DEBUG: not an undo block')
return
@ -412,7 +442,7 @@ def outboxUndoBlock(baseDir: str,httpPrefix: str, \
if debug:
print('DEBUG: undo block - no type')
return
if not messageJson['object']['type']=='Block':
if not messageJson['object']['type'] == 'Block':
if debug:
print('DEBUG: not an undo block')
return
@ -427,7 +457,7 @@ def outboxUndoBlock(baseDir: str,httpPrefix: str, \
if debug:
print('DEBUG: c2s undo block request arrived in outbox')
messageId=messageJson['object']['object'].replace('/activity','')
messageId = messageJson['object']['object'].replace('/activity', '')
if '/statuses/' not in messageId:
if debug:
print('DEBUG: c2s undo block object is not a status')
@ -439,27 +469,27 @@ def outboxUndoBlock(baseDir: str,httpPrefix: str, \
print('DEBUG: c2s undo block object has no nickname')
return
if ':' in domain:
domain=domain.split(':')[0]
postFilename=locatePost(baseDir,nickname,domain,messageId)
domain = domain.split(':')[0]
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:
print('DEBUG: c2s undo block post not found in inbox or outbox')
print(messageId)
return
nicknameBlocked=getNicknameFromActor(messageJson['object']['object'])
nicknameBlocked = getNicknameFromActor(messageJson['object']['object'])
if not nicknameBlocked:
print('WARN: unable to find nickname in '+ \
print('WARN: unable to find nickname in ' +
messageJson['object']['object'])
return
domainBlocked,portBlocked= \
getDomainFromActor(messageJson['object']['object'])
domainBlockedFull=domainBlocked
domainObject = messageJson['object']['object']
domainBlocked, portBlocked = getDomainFromActor(domainObject)
domainBlockedFull = domainBlocked
if portBlocked:
if portBlocked!=80 and portBlocked!=443:
if portBlocked != 80 and portBlocked != 443:
if ':' not in domainBlocked:
domainBlockedFull=domainBlocked+':'+str(portBlocked)
domainBlockedFull = domainBlocked + ':' + str(portBlocked)
removeBlock(baseDir,nickname,domain, \
nicknameBlocked,domainBlockedFull)
removeBlock(baseDir, nickname, domain,
nicknameBlocked, domainBlockedFull)
if debug:
print('DEBUG: post undo blocked via c2s - '+postFilename)
print('DEBUG: post undo blocked via c2s - ' + postFilename)