From 8dd64c187f737191e22a22259012a2a81afbb974 Mon Sep 17 00:00:00 2001 From: Bob Mottram Date: Wed, 1 Apr 2020 20:06:27 +0000 Subject: [PATCH] flake8 style --- blocking.py | 370 ++++++++++++++++++++++++++++------------------------ 1 file changed, 200 insertions(+), 170 deletions(-) diff --git a/blocking.py b/blocking.py index 1b37a32e7..7d90d05ce 100644 --- a/blocking.py +++ b/blocking.py @@ -1,201 +1,219 @@ -__filename__="blocking.py" -__author__="Bob Mottram" -__license__="AGPL3+" -__version__="1.1.0" -__maintainer__="Bob Mottram" -__email__="bob@freedombone.net" -__status__="Production" +__filename__ = "blocking.py" +__author__ = "Bob Mottram" +__license__ = "AGPL3+" +__version__ = "1.1.0" +__maintainer__ = "Bob Mottram" +__email__ = "bob@freedombone.net" +__status__ = "Production" import os from utils import isEvil +from utils import locatePost from utils import evilIncarnate +from utils import getDomainFromActor +from utils import getNicknameFromActor +from session import postJson +from auth import createBasicAuthHeader +from posts import getPersonBox +from webfinger import webfingerHandle -def addGlobalBlock(baseDir: str, \ - blockNickname: str,blockDomain: str) -> bool: + +def addGlobalBlock(baseDir: str, + blockNickname: str, blockDomain: str) -> bool: """Global block which applies to all accounts """ - blockingFilename=baseDir+'/accounts/blocking.txt' + blockingFilename = baseDir + '/accounts/blocking.txt' if not blockNickname.startswith('#'): - blockHandle=blockNickname+'@'+blockDomain + blockHandle = blockNickname + '@' + blockDomain if os.path.isfile(blockingFilename): if blockHandle in open(blockingFilename).read(): return False - blockFile=open(blockingFilename, "a+") - blockFile.write(blockHandle+'\n') + blockFile = open(blockingFilename, "a+") + blockFile.write(blockHandle + '\n') blockFile.close() else: - blockHashtag=blockNickname + blockHashtag = blockNickname if os.path.isfile(blockingFilename): - if blockHashtag+'\n' in open(blockingFilename).read(): + if blockHashtag + '\n' in open(blockingFilename).read(): return False - blockFile=open(blockingFilename, "a+") - blockFile.write(blockHashtag+'\n') + blockFile = open(blockingFilename, "a+") + blockFile.write(blockHashtag + '\n') blockFile.close() return True -def addBlock(baseDir: str,nickname: str,domain: str, \ - blockNickname: str,blockDomain: str) -> bool: + +def addBlock(baseDir: str, nickname: str, domain: str, + blockNickname: str, blockDomain: str) -> bool: """Block the given account """ if ':' in domain: - domain=domain.split(':')[0] - blockingFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/blocking.txt' - blockHandle=blockNickname+'@'+blockDomain + domain = domain.split(':')[0] + blockingFilename = baseDir + '/accounts/' + \ + nickname + '@' + domain + '/blocking.txt' + blockHandle = blockNickname + '@' + blockDomain if os.path.isfile(blockingFilename): if blockHandle in open(blockingFilename).read(): return False - blockFile=open(blockingFilename, "a+") - blockFile.write(blockHandle+'\n') + blockFile = open(blockingFilename, "a+") + blockFile.write(blockHandle + '\n') blockFile.close() return True -def removeGlobalBlock(baseDir: str, \ - unblockNickname: str, \ + +def removeGlobalBlock(baseDir: str, + unblockNickname: str, unblockDomain: str) -> bool: """Unblock the given global block """ - unblockingFilename=baseDir+'/accounts/blocking.txt' + unblockingFilename = baseDir + '/accounts/blocking.txt' if not unblockNickname.startswith('#'): - unblockHandle=unblockNickname+'@'+unblockDomain + unblockHandle = unblockNickname + '@' + unblockDomain if os.path.isfile(unblockingFilename): if unblockHandle in open(unblockingFilename).read(): with open(unblockingFilename, 'r') as fp: - with open(unblockingFilename+'.new', 'w') as fpnew: + with open(unblockingFilename + '.new', 'w') as fpnew: for line in fp: - handle=line.replace('\n','') + handle = line.replace('\n', '') if unblockHandle not in line: - fpnew.write(handle+'\n') - if os.path.isfile(unblockingFilename+'.new'): - os.rename(unblockingFilename+'.new',unblockingFilename) + fpnew.write(handle + '\n') + if os.path.isfile(unblockingFilename + '.new'): + os.rename(unblockingFilename + '.new', unblockingFilename) return True else: - unblockHashtag=unblockNickname + unblockHashtag = unblockNickname if os.path.isfile(unblockingFilename): - if unblockHashtag+'\n' in open(unblockingFilename).read(): + if unblockHashtag + '\n' in open(unblockingFilename).read(): with open(unblockingFilename, 'r') as fp: - with open(unblockingFilename+'.new', 'w') as fpnew: + with open(unblockingFilename + '.new', 'w') as fpnew: for line in fp: - blockLine=line.replace('\n','') + blockLine = line.replace('\n', '') if unblockHashtag not in line: - fpnew.write(blockLine+'\n') - if os.path.isfile(unblockingFilename+'.new'): - os.rename(unblockingFilename+'.new',unblockingFilename) + fpnew.write(blockLine + '\n') + if os.path.isfile(unblockingFilename + '.new'): + os.rename(unblockingFilename + '.new', unblockingFilename) return True return False -def removeBlock(baseDir: str,nickname: str,domain: str, \ - unblockNickname: str,unblockDomain: str) -> bool: + +def removeBlock(baseDir: str, nickname: str, domain: str, + unblockNickname: str, unblockDomain: str) -> bool: """Unblock the given account """ if ':' in domain: - domain=domain.split(':')[0] - unblockingFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/blocking.txt' - unblockHandle=unblockNickname+'@'+unblockDomain + domain = domain.split(':')[0] + unblockingFilename = baseDir + '/accounts/' + \ + nickname + '@' + domain + '/blocking.txt' + unblockHandle = unblockNickname + '@' + unblockDomain if os.path.isfile(unblockingFilename): if unblockHandle in open(unblockingFilename).read(): with open(unblockingFilename, 'r') as fp: - with open(unblockingFilename+'.new', 'w') as fpnew: + with open(unblockingFilename + '.new', 'w') as fpnew: for line in fp: - handle=line.replace('\n','') + handle = line.replace('\n', '') if unblockHandle not in line: - fpnew.write(handle+'\n') - if os.path.isfile(unblockingFilename+'.new'): - os.rename(unblockingFilename+'.new',unblockingFilename) + fpnew.write(handle + '\n') + if os.path.isfile(unblockingFilename + '.new'): + os.rename(unblockingFilename + '.new', unblockingFilename) return True return False -def isBlockedHashtag(baseDir: str,hashtag: str) -> bool: + +def isBlockedHashtag(baseDir: str, hashtag: str) -> bool: """Is the given hashtag blocked? """ - globalBlockingFilename=baseDir+'/accounts/blocking.txt' + globalBlockingFilename = baseDir + '/accounts/blocking.txt' if os.path.isfile(globalBlockingFilename): - hashtag=hashtag.strip('\n') - if hashtag+'\n' in open(globalBlockingFilename).read(): + hashtag = hashtag.strip('\n') + if hashtag + '\n' in open(globalBlockingFilename).read(): return True return False + def getDomainBlocklist(baseDir: str) -> str: """Returns all globally blocked domains as a string This can be used for fast matching to mitigate flooding """ - blockedStr='' + blockedStr = '' - evilDomains=evilIncarnate() + evilDomains = evilIncarnate() for evil in evilDomains: - blockedStr+=evil+'\n' + blockedStr += evil + '\n' - globalBlockingFilename=baseDir+'/accounts/blocking.txt' + globalBlockingFilename = baseDir + '/accounts/blocking.txt' if not os.path.isfile(globalBlockingFilename): return blockedStr with open(globalBlockingFilename, 'r') as file: blockedStr += file.read() return blockedStr -def isBlockedDomain(baseDir: str,domain: str) -> bool: + +def isBlockedDomain(baseDir: str, domain: str) -> bool: """Is the given domain blocked? """ if isEvil(domain): return True - globalBlockingFilename=baseDir+'/accounts/blocking.txt' + globalBlockingFilename = baseDir + '/accounts/blocking.txt' if os.path.isfile(globalBlockingFilename): - if '*@'+domain in open(globalBlockingFilename).read(): + if '*@' + domain in open(globalBlockingFilename).read(): return True return False -def isBlocked(baseDir: str,nickname: str,domain: str, \ - blockNickname: str,blockDomain: str) -> bool: + +def isBlocked(baseDir: str, nickname: str, domain: str, + blockNickname: str, blockDomain: str) -> bool: """Is the given nickname blocked? """ if isEvil(blockDomain): return True - globalBlockingFilename=baseDir+'/accounts/blocking.txt' + globalBlockingFilename = baseDir + '/accounts/blocking.txt' if os.path.isfile(globalBlockingFilename): - if '*@'+blockDomain in open(globalBlockingFilename).read(): + if '*@' + blockDomain in open(globalBlockingFilename).read(): return True if blockNickname: - blockHandle=blockNickname+'@'+blockDomain + blockHandle = blockNickname + '@' + blockDomain if blockHandle in open(globalBlockingFilename).read(): return True - allowFilename= \ - baseDir+'/accounts/'+nickname+'@'+domain+'/allowedinstances.txt' + allowFilename = baseDir + '/accounts/' + \ + nickname + '@' + domain + '/allowedinstances.txt' if os.path.isfile(allowFilename): if blockDomain not in open(allowFilename).read(): return True - blockingFilename= \ - baseDir+'/accounts/'+nickname+'@'+domain+'/blocking.txt' + blockingFilename = baseDir + '/accounts/' + \ + nickname + '@' + domain + '/blocking.txt' if os.path.isfile(blockingFilename): - if '*@'+blockDomain in open(blockingFilename).read(): + if '*@' + blockDomain in open(blockingFilename).read(): return True if blockNickname: - blockHandle=blockNickname+'@'+blockDomain + blockHandle = blockNickname + '@' + blockDomain if blockHandle in open(blockingFilename).read(): return True return False -def sendBlockViaServer(baseDir: str,session, \ - fromNickname: str,password: str, \ - fromDomain: str,fromPort: int, \ - httpPrefix: str,blockedUrl: str, \ - cachedWebfingers: {},personCache: {}, \ - debug: bool,projectVersion: str) -> {}: + +def sendBlockViaServer(baseDir: str, session, + fromNickname: str, password: str, + fromDomain: str, fromPort: int, + httpPrefix: str, blockedUrl: str, + cachedWebfingers: {}, personCache: {}, + debug: bool, projectVersion: str) -> {}: """Creates a block via c2s """ if not session: print('WARN: No session for sendBlockViaServer') return 6 - fromDomainFull=fromDomain + fromDomainFull = fromDomain if fromPort: - if fromPort!=80 and fromPort!=443: + if fromPort != 80 and fromPort != 443: if ':' not in fromDomain: - fromDomainFull=fromDomain+':'+str(fromPort) + fromDomainFull = fromDomain + ':' + str(fromPort) - toUrl= 'https://www.w3.org/ns/activitystreams#Public' - ccUrl= \ - httpPrefix+'://'+fromDomainFull+'/users/'+fromNickname+'/followers' + toUrl = 'https://www.w3.org/ns/activitystreams#Public' + ccUrl = httpPrefix + '://' + fromDomainFull + '/users/' + \ + fromNickname + '/followers' - blockActor=httpPrefix+'://'+fromDomainFull+'/users/'+fromNickname - newBlockJson={ + blockActor = httpPrefix + '://' + fromDomainFull + '/users/' + fromNickname + newBlockJson = { "@context": "https://www.w3.org/ns/activitystreams", 'type': 'Block', 'actor': blockActor, @@ -204,73 +222,79 @@ def sendBlockViaServer(baseDir: str,session, \ 'cc': [ccUrl] } - handle=httpPrefix+'://'+fromDomainFull+'/@'+fromNickname + handle = httpPrefix + '://' + fromDomainFull + '/@' + fromNickname # lookup the inbox for the To handle - wfRequest= \ - webfingerHandle(session,handle,httpPrefix,cachedWebfingers, \ - fromDomain,projectVersion) + wfRequest = webfingerHandle(session, handle, httpPrefix, + cachedWebfingers, + fromDomain, projectVersion) if not wfRequest: if debug: - print('DEBUG: announce webfinger failed for '+handle) + print('DEBUG: announce webfinger failed for ' + handle) return 1 - postToBox='outbox' + postToBox = 'outbox' # get the actor inbox for the To handle - inboxUrl,pubKeyId,pubKey,fromPersonId,sharedInbox,capabilityAcquisition,avatarUrl,displayName= \ - getPersonBox(baseDir,session,wfRequest,personCache, \ - projectVersion,httpPrefix,fromNickname, \ - fromDomain,postToBox) + (inboxUrl, pubKeyId, pubKey, + fromPersonId, sharedInbox, + capabilityAcquisition, avatarUrl, + displayName) = getPersonBox(baseDir, session, wfRequest, + personCache, + projectVersion, httpPrefix, fromNickname, + fromDomain, postToBox) if not inboxUrl: if debug: - print('DEBUG: No '+postToBox+' was found for '+handle) + print('DEBUG: No ' + postToBox + ' was found for ' + handle) return 3 if not fromPersonId: if debug: - print('DEBUG: No actor was found for '+handle) + print('DEBUG: No actor was found for ' + handle) return 4 - authHeader=createBasicAuthHeader(fromNickname,password) + authHeader = createBasicAuthHeader(fromNickname, password) - headers={ - 'host': fromDomain, \ - 'Content-type': 'application/json', \ + headers = { + 'host': fromDomain, + 'Content-type': 'application/json', 'Authorization': authHeader } - postResult= \ - postJson(session,newBlockJson,[],inboxUrl,headers,"inbox:write") + postResult = postJson(session, newBlockJson, [], inboxUrl, + headers, "inbox:write") + if not postResult: + print('WARN: Unable to post block') if debug: print('DEBUG: c2s POST block success') return newBlockJson -def sendUndoBlockViaServer(baseDir: str,session, \ - fromNickname: str,password: str, \ - fromDomain: str,fromPort: int, \ - httpPrefix: str,blockedUrl: str, \ - cachedWebfingers: {},personCache: {}, \ - debug: bool,projectVersion: str) -> {}: + +def sendUndoBlockViaServer(baseDir: str, session, + fromNickname: str, password: str, + fromDomain: str, fromPort: int, + httpPrefix: str, blockedUrl: str, + cachedWebfingers: {}, personCache: {}, + debug: bool, projectVersion: str) -> {}: """Creates a block via c2s """ if not session: print('WARN: No session for sendBlockViaServer') return 6 - fromDomainFull=fromDomain + fromDomainFull = fromDomain if fromPort: - if fromPort!=80 and fromPort!=443: + if fromPort != 80 and fromPort != 443: if ':' not in fromDomain: - fromDomainFull=fromDomain+':'+str(fromPort) + fromDomainFull = fromDomain + ':' + str(fromPort) - toUrl= 'https://www.w3.org/ns/activitystreams#Public' - ccUrl= \ - httpPrefix+'://'+fromDomainFull+'/users/'+fromNickname+'/followers' + toUrl = 'https://www.w3.org/ns/activitystreams#Public' + ccUrl = httpPrefix + '://' + fromDomainFull + '/users/' + \ + fromNickname + '/followers' - blockActor=httpPrefix+'://'+fromDomainFull+'/users/'+fromNickname - newBlockJson={ + blockActor = httpPrefix + '://' + fromDomainFull + '/users/' + fromNickname + newBlockJson = { "@context": "https://www.w3.org/ns/activitystreams", 'type': 'Undo', 'actor': blockActor, @@ -283,59 +307,64 @@ def sendUndoBlockViaServer(baseDir: str,session, \ } } - handle=httpPrefix+'://'+fromDomainFull+'/@'+fromNickname + handle = httpPrefix + '://' + fromDomainFull + '/@' + fromNickname # lookup the inbox for the To handle - wfRequest= \ - webfingerHandle(session,handle,httpPrefix,cachedWebfingers, \ - fromDomain,projectVersion) + wfRequest = webfingerHandle(session, handle, httpPrefix, + cachedWebfingers, + fromDomain, projectVersion) if not wfRequest: if debug: - print('DEBUG: announce webfinger failed for '+handle) + print('DEBUG: announce webfinger failed for ' + handle) return 1 - postToBox='outbox' + postToBox = 'outbox' # get the actor inbox for the To handle - inboxUrl,pubKeyId,pubKey,fromPersonId,sharedInbox,capabilityAcquisition,avatarUrl,displayName= \ - getPersonBox(baseDir,session,wfRequest,personCache, \ - projectVersion,httpPrefix,fromNickname, \ - fromDomain,postToBox) + (inboxUrl, pubKeyId, pubKey, + fromPersonId, sharedInbox, + capabilityAcquisition, avatarUrl, + displayName) = getPersonBox(baseDir, session, wfRequest, personCache, + projectVersion, httpPrefix, fromNickname, + fromDomain, postToBox) if not inboxUrl: if debug: - print('DEBUG: No '+postToBox+' was found for '+handle) + print('DEBUG: No ' + postToBox + ' was found for ' + handle) return 3 if not fromPersonId: if debug: - print('DEBUG: No actor was found for '+handle) + print('DEBUG: No actor was found for ' + handle) return 4 - authHeader=createBasicAuthHeader(fromNickname,password) + authHeader = createBasicAuthHeader(fromNickname, password) - headers={ - 'host': fromDomain, \ - 'Content-type': 'application/json', \ + headers = { + 'host': fromDomain, + 'Content-type': 'application/json', 'Authorization': authHeader } - postResult= \ - postJson(session,newBlockJson,[],inboxUrl,headers,"inbox:write") + postResult = postJson(session, newBlockJson, [], inboxUrl, + headers, "inbox:write") + if not postResult: + print('WARN: Unable to post block') if debug: print('DEBUG: c2s POST block success') return newBlockJson -def outboxBlock(baseDir: str,httpPrefix: str, \ - nickname: str,domain: str,port: int, \ - messageJson: {},debug: bool) -> None: + +def outboxBlock(baseDir: str, httpPrefix: str, + nickname: str, domain: str, port: int, + messageJson: {}, debug: bool) -> None: """ When a block request is received by the outbox from c2s """ if not messageJson.get('type'): if debug: print('DEBUG: block - no type') return - if not messageJson['type']=='Block': + if not messageJson['type'] == 'Block': if debug: print('DEBUG: not a block') return @@ -350,7 +379,7 @@ def outboxBlock(baseDir: str,httpPrefix: str, \ if debug: print('DEBUG: c2s block request arrived in outbox') - messageId=messageJson['object'].replace('/activity','') + messageId = messageJson['object'].replace('/activity', '') if '/statuses/' not in messageId: if debug: print('DEBUG: c2s block object is not a status') @@ -362,40 +391,41 @@ def outboxBlock(baseDir: str,httpPrefix: str, \ print('DEBUG: c2s block object has no nickname') return if ':' in domain: - domain=domain.split(':')[0] - postFilename=locatePost(baseDir,nickname,domain,messageId) + domain = domain.split(':')[0] + postFilename = locatePost(baseDir, nickname, domain, messageId) if not postFilename: if debug: print('DEBUG: c2s block post not found in inbox or outbox') print(messageId) return - nicknameBlocked=getNicknameFromActor(messageJson['object']) + nicknameBlocked = getNicknameFromActor(messageJson['object']) if not nicknameBlocked: - print('WARN: unable to find nickname in '+messageJson['object']) + print('WARN: unable to find nickname in ' + messageJson['object']) return - domainBlocked,portBlocked=getDomainFromActor(messageJson['object']) - domainBlockedFull=domainBlocked + domainBlocked, portBlocked = getDomainFromActor(messageJson['object']) + domainBlockedFull = domainBlocked if portBlocked: - if portBlocked!=80 and portBlocked!=443: + if portBlocked != 80 and portBlocked != 443: if ':' not in domainBlocked: - domainBlockedFull=domainBlocked+':'+str(portBlocked) + domainBlockedFull = domainBlocked + ':' + str(portBlocked) - addBlock(baseDir,nickname,domain, \ - nicknameBlocked,domainBlockedFull) + addBlock(baseDir, nickname, domain, + nicknameBlocked, domainBlockedFull) if debug: - print('DEBUG: post blocked via c2s - '+postFilename) + print('DEBUG: post blocked via c2s - ' + postFilename) -def outboxUndoBlock(baseDir: str,httpPrefix: str, \ - nickname: str,domain: str,port: int, \ - messageJson: {},debug: bool) -> None: + +def outboxUndoBlock(baseDir: str, httpPrefix: str, + nickname: str, domain: str, port: int, + messageJson: {}, debug: bool) -> None: """ When an undo block request is received by the outbox from c2s """ if not messageJson.get('type'): if debug: print('DEBUG: undo block - no type') return - if not messageJson['type']=='Undo': + if not messageJson['type'] == 'Undo': if debug: print('DEBUG: not an undo block') return @@ -412,7 +442,7 @@ def outboxUndoBlock(baseDir: str,httpPrefix: str, \ if debug: print('DEBUG: undo block - no type') return - if not messageJson['object']['type']=='Block': + if not messageJson['object']['type'] == 'Block': if debug: print('DEBUG: not an undo block') return @@ -427,7 +457,7 @@ def outboxUndoBlock(baseDir: str,httpPrefix: str, \ if debug: print('DEBUG: c2s undo block request arrived in outbox') - messageId=messageJson['object']['object'].replace('/activity','') + messageId = messageJson['object']['object'].replace('/activity', '') if '/statuses/' not in messageId: if debug: print('DEBUG: c2s undo block object is not a status') @@ -439,27 +469,27 @@ def outboxUndoBlock(baseDir: str,httpPrefix: str, \ print('DEBUG: c2s undo block object has no nickname') return if ':' in domain: - domain=domain.split(':')[0] - postFilename=locatePost(baseDir,nickname,domain,messageId) + domain = domain.split(':')[0] + postFilename = locatePost(baseDir, nickname, domain, messageId) if not postFilename: if debug: print('DEBUG: c2s undo block post not found in inbox or outbox') print(messageId) return - nicknameBlocked=getNicknameFromActor(messageJson['object']['object']) + nicknameBlocked = getNicknameFromActor(messageJson['object']['object']) if not nicknameBlocked: - print('WARN: unable to find nickname in '+ \ + print('WARN: unable to find nickname in ' + messageJson['object']['object']) return - domainBlocked,portBlocked= \ - getDomainFromActor(messageJson['object']['object']) - domainBlockedFull=domainBlocked + domainObject = messageJson['object']['object'] + domainBlocked, portBlocked = getDomainFromActor(domainObject) + domainBlockedFull = domainBlocked if portBlocked: - if portBlocked!=80 and portBlocked!=443: + if portBlocked != 80 and portBlocked != 443: if ':' not in domainBlocked: - domainBlockedFull=domainBlocked+':'+str(portBlocked) + domainBlockedFull = domainBlocked + ':' + str(portBlocked) - removeBlock(baseDir,nickname,domain, \ - nicknameBlocked,domainBlockedFull) + removeBlock(baseDir, nickname, domain, + nicknameBlocked, domainBlockedFull) if debug: - print('DEBUG: post undo blocked via c2s - '+postFilename) + print('DEBUG: post undo blocked via c2s - ' + postFilename)