diff --git a/capabilities.py b/capabilities.py index 75b45456..f8ef2bb6 100644 --- a/capabilities.py +++ b/capabilities.py @@ -1,22 +1,22 @@ -__filename__="capabilities.py" -__author__="Bob Mottram" -__license__="AGPL3+" -__version__="1.1.0" -__maintainer__="Bob Mottram" -__email__="bob@freedombone.net" -__status__="Production" +__filename__ = "capabilities.py" +__author__ = "Bob Mottram" +__license__ = "AGPL3+" +__version__ = "1.1.0" +__maintainer__ = "Bob Mottram" +__email__ = "bob@freedombone.net" +__status__ = "Production" import os -import datetime -import time -import json from auth import createPassword from utils import getNicknameFromActor from utils import getDomainFromActor from utils import loadJson from utils import saveJson -def getOcapFilename(baseDir :str,nickname: str,domain: str,actor :str,subdir: str) -> str: + +def getOcapFilename(baseDir: str, + nickname: str, domain: str, + actor: str, subdir: str) -> str: """Returns the filename for a particular capability accepted or granted Also creates directories as needed """ @@ -24,152 +24,174 @@ def getOcapFilename(baseDir :str,nickname: str,domain: str,actor :str,subdir: st return None if ':' in domain: - domain=domain.split(':')[0] + domain = domain.split(':')[0] - if not os.path.isdir(baseDir+'/accounts'): - os.mkdir(baseDir+'/accounts') + if not os.path.isdir(baseDir + '/accounts'): + os.mkdir(baseDir + '/accounts') - ocDir=baseDir+'/accounts/'+nickname+'@'+domain + ocDir = baseDir + '/accounts/' + nickname + '@' + domain if not os.path.isdir(ocDir): os.mkdir(ocDir) - ocDir=baseDir+'/accounts/'+nickname+'@'+domain+'/ocap' + ocDir = baseDir + '/accounts/' + nickname + '@' + domain + '/ocap' if not os.path.isdir(ocDir): os.mkdir(ocDir) - ocDir=baseDir+'/accounts/'+nickname+'@'+domain+'/ocap/'+subdir + ocDir = baseDir + '/accounts/' + \ + nickname + '@' + domain + '/ocap/' + subdir if not os.path.isdir(ocDir): os.mkdir(ocDir) - return baseDir+'/accounts/'+nickname+'@'+domain+'/ocap/'+subdir+'/'+actor.replace('/','#')+'.json' + return baseDir + '/accounts/' + \ + nickname + '@' + domain + '/ocap/' + \ + subdir + '/' + actor.replace('/', '#') + '.json' -def CapablePost(postJson: {}, capabilityList: [], debug :bool) -> bool: + +def CapablePost(postJson: {}, capabilityList: [], debug: bool) -> bool: """Determines whether a post arriving in the inbox should be accepted accoring to the list of capabilities """ if postJson.get('type'): # No announces/repeats - if postJson['type']=='Announce': + if postJson['type'] == 'Announce': if 'inbox:noannounce' in capabilityList: if debug: - print('DEBUG: inbox post rejected because inbox:noannounce') + print('DEBUG: ' + + 'inbox post rejected because inbox:noannounce') return False # No likes - if postJson['type']=='Like': + if postJson['type'] == 'Like': if 'inbox:nolike' in capabilityList: if debug: - print('DEBUG: inbox post rejected because inbox:nolike') + print('DEBUG: ' + + 'inbox post rejected because inbox:nolike') return False - if postJson['type']=='Create': + if postJson['type'] == 'Create': if postJson.get('object'): # Does this have a reply? if postJson['object'].get('inReplyTo'): if postJson['object']['inReplyTo']: if 'inbox:noreply' in capabilityList: if debug: - print('DEBUG: inbox post rejected because inbox:noreply') + print('DEBUG: ' + + 'inbox post rejected because ' + + 'inbox:noreply') return False # are content warnings enforced? if postJson['object'].get('sensitive'): if not postJson['object']['sensitive']: if 'inbox:cw' in capabilityList: if debug: - print('DEBUG: inbox post rejected because inbox:cw') + print('DEBUG: ' + + 'inbox post rejected because inbox:cw') return False # content warning must have non-zero summary if postJson['object'].get('summary'): - if len(postJson['object']['summary'])<2: + if len(postJson['object']['summary']) < 2: if 'inbox:cw' in capabilityList: if debug: - print('DEBUG: inbox post rejected because inbox:cw, summary missing') + print('DEBUG: ' + + 'inbox post rejected because ' + + 'inbox:cw, summary missing') return False if 'inbox:write' in capabilityList: return True return True -def capabilitiesRequest(baseDir: str,httpPrefix: str,domain: str, \ - requestedActor: str, \ - requestedCaps=["inbox:write","objects:read"]) -> {}: + +def capabilitiesRequest(baseDir: str, httpPrefix: str, domain: str, + requestedActor: str, requestedDomain: str, + requestedCaps=["inbox:write", "objects:read"]) -> {}: # This is sent to the capabilities endpoint /caps/new # which could be instance wide or for a particular person # This could also be added to a follow activity - ocapId=createPassword(32) - ocapRequest={ + ocapId = createPassword(32) + ocapRequest = { "@context": "https://www.w3.org/ns/activitystreams", - "id": httpPrefix+"://"+requestedDomain+"/caps/request/"+ocapId, + "id": httpPrefix + "://" + requestedDomain + "/caps/request/" + ocapId, "type": "Request", "capability": requestedCaps, "actor": requestedActor } return ocapRequest -def capabilitiesAccept(baseDir: str,httpPrefix: str, \ - nickname: str,domain: str, port: int, \ - acceptedActor: str, saveToFile: bool, \ - acceptedCaps=["inbox:write","objects:read"]) -> {}: + +def capabilitiesAccept(baseDir: str, httpPrefix: str, + nickname: str, domain: str, port: int, + acceptedActor: str, saveToFile: bool, + acceptedCaps=["inbox:write", "objects:read"]) -> {}: # This gets returned to capabilities requester # This could also be added to a follow Accept activity # reject excessively long actors - if len(acceptedActor)>256: + if len(acceptedActor) > 256: return None - fullDomain=domain + fullDomain = domain if port: - if port!=80 and port !=443: + if port != 80 and port != 443: if ':' not in domain: - fullDomain=domain+':'+str(port) + fullDomain = domain + ':' + str(port) # make directories to store capabilities - ocapFilename=getOcapFilename(baseDir,nickname,fullDomain,acceptedActor,'accept') + ocapFilename = \ + getOcapFilename(baseDir, nickname, fullDomain, acceptedActor, 'accept') if not ocapFilename: return None - ocapAccept=None + ocapAccept = None # if the capability already exists then load it from file if os.path.isfile(ocapFilename): - ocapAccept=loadJson(ocapFilename) + ocapAccept = loadJson(ocapFilename) # otherwise create a new capability if not ocapAccept: - acceptedActorNickname=getNicknameFromActor(acceptedActor) + acceptedActorNickname = getNicknameFromActor(acceptedActor) if not acceptedActorNickname: - print('WARN: unable to find nickname in '+acceptedActor) + print('WARN: unable to find nickname in ' + acceptedActor) return None - acceptedActorDomain,acceptedActorPort=getDomainFromActor(acceptedActor) + acceptedActorDomain, acceptedActorPort = \ + getDomainFromActor(acceptedActor) if acceptedActorPort: - ocapId=acceptedActorNickname+'@'+acceptedActorDomain+':'+str(acceptedActorPort)+'#'+createPassword(32) + ocapId = acceptedActorNickname + '@' + acceptedActorDomain + \ + ':' + str(acceptedActorPort) + '#'+createPassword(32) else: - ocapId=acceptedActorNickname+'@'+acceptedActorDomain+'#'+createPassword(32) - ocapAccept={ + ocapId = acceptedActorNickname + '@' + acceptedActorDomain + \ + '#' + createPassword(32) + ocapAccept = { "@context": "https://www.w3.org/ns/activitystreams", - "id": httpPrefix+"://"+fullDomain+"/caps/"+ocapId, + "id": httpPrefix + "://" + fullDomain + "/caps/" + ocapId, "type": "Capability", "capability": acceptedCaps, "scope": acceptedActor, - "actor": httpPrefix+"://"+fullDomain + "actor": httpPrefix + "://" + fullDomain } if nickname: - ocapAccept['actor']=httpPrefix+"://"+fullDomain+'/users/'+nickname + ocapAccept['actor'] = \ + httpPrefix + "://" + fullDomain + '/users/' + nickname if saveToFile: - saveJson(ocapAccept,ocapFilename) + saveJson(ocapAccept, ocapFilename) return ocapAccept -def capabilitiesGrantedSave(baseDir :str,nickname :str,domain :str,ocap: {}) -> bool: + +def capabilitiesGrantedSave(baseDir: str, + nickname: str, domain: str, ocap: {}) -> bool: """A capabilities accept is received, so stor it for reference when sending to the actor """ if not ocap.get('actor'): return False - ocapFilename=getOcapFilename(baseDir,nickname,domain,ocap['actor'],'granted') + ocapFilename = \ + getOcapFilename(baseDir, nickname, domain, ocap['actor'], 'granted') if not ocapFilename: return False - saveJson(ocap,ocapFilename) + saveJson(ocap, ocapFilename) return True -def capabilitiesUpdate(baseDir: str,httpPrefix: str, \ - nickname: str,domain: str, port: int, \ - updateActor: str, \ + +def capabilitiesUpdate(baseDir: str, httpPrefix: str, + nickname: str, domain: str, port: int, + updateActor: str, updateCaps: []) -> {}: """Used to sends an update for a change of object capabilities Note that the capability id gets changed with a new random token @@ -177,17 +199,18 @@ def capabilitiesUpdate(baseDir: str,httpPrefix: str, \ """ # reject excessively long actors - if len(updateActor)>256: + if len(updateActor) > 256: return None - fullDomain=domain + fullDomain = domain if port: - if port!=80 and port !=443: + if port != 80 and port != 443: if ':' not in domain: - fullDomain=domain+':'+str(port) + fullDomain = domain + ':' + str(port) # Get the filename of the capability - ocapFilename=getOcapFilename(baseDir,nickname,fullDomain,updateActor,'accept') + ocapFilename = \ + getOcapFilename(baseDir, nickname, fullDomain, updateActor, 'accept') if not ocapFilename: return None @@ -196,48 +219,51 @@ def capabilitiesUpdate(baseDir: str,httpPrefix: str, \ return None # create an update activity - ocapUpdate={ + ocapUpdate = { "@context": "https://www.w3.org/ns/activitystreams", 'type': 'Update', - 'actor': httpPrefix+'://'+fullDomain+'/users/'+nickname, + 'actor': httpPrefix + '://' + fullDomain + '/users/' + nickname, 'to': [updateActor], 'cc': [], 'object': {} } # read the existing capability - ocapJson=loadJson(ocapFilename) + ocapJson = loadJson(ocapFilename) # set the new capabilities list. eg. ["inbox:write","objects:read"] - ocapJson['capability']=updateCaps + ocapJson['capability'] = updateCaps # change the id, so that the old capabilities can't continue to be used - updateActorNickname=getNicknameFromActor(updateActor) + updateActorNickname = getNicknameFromActor(updateActor) if not updateActorNickname: - print('WARN: unable to find nickname in '+updateActor) + print('WARN: unable to find nickname in ' + updateActor) return None - updateActorDomain,updateActorPort=getDomainFromActor(updateActor) + updateActorDomain, updateActorPort = getDomainFromActor(updateActor) if updateActorPort: - ocapId=updateActorNickname+'@'+updateActorDomain+':'+str(updateActorPort)+'#'+createPassword(32) + ocapId = updateActorNickname + '@' + updateActorDomain + \ + ':' + str(updateActorPort) + '#' + createPassword(32) else: - ocapId=updateActorNickname+'@'+updateActorDomain+'#'+createPassword(32) - ocapJson['id']=httpPrefix+"://"+fullDomain+"/caps/"+ocapId - ocapUpdate['object']=ocapJson + ocapId = updateActorNickname + '@' + updateActorDomain + \ + '#' + createPassword(32) + ocapJson['id'] = httpPrefix + "://" + fullDomain + "/caps/" + ocapId + ocapUpdate['object'] = ocapJson # save it again - saveJson(ocapJson,ocapFilename) + saveJson(ocapJson, ocapFilename) return ocapUpdate -def capabilitiesReceiveUpdate(baseDir :str, \ - nickname :str,domain :str,port :int, \ - actor :str, \ - newCapabilitiesId :str, \ - capabilityList :[], debug :bool) -> bool: + +def capabilitiesReceiveUpdate(baseDir: str, + nickname: str, domain: str, port: int, + actor: str, + newCapabilitiesId: str, + capabilityList: [], debug: bool) -> bool: """An update for a capability or the given actor has arrived """ - ocapFilename= \ - getOcapFilename(baseDir,nickname,domain,actor,'granted') + ocapFilename = \ + getOcapFilename(baseDir, nickname, domain, actor, 'granted') if not ocapFilename: return False @@ -247,11 +273,11 @@ def capabilitiesReceiveUpdate(baseDir :str, \ print(ocapFilename) return False - ocapJson=loadJson(ocapFilename) + ocapJson = loadJson(ocapFilename) if ocapJson: - ocapJson['id']=newCapabilitiesId - ocapJson['capability']=capabilityList + ocapJson['id'] = newCapabilitiesId + ocapJson['capability'] = capabilityList - return saveJson(ocapJson,ocapFilename) + return saveJson(ocapJson, ocapFilename) return False diff --git a/static_analysis b/static_analysis index 71dbe039..01ce1123 100755 --- a/static_analysis +++ b/static_analysis @@ -10,7 +10,7 @@ echo "Starting static analysis" for sourceFile in *.py do - if [[ "$sourceFile" == 'tests.py' || "$sourceFile" == 'blurhash.py' || "$sourceFile" == 'capabilities.py' ]]; then + if [[ "$sourceFile" == 'tests.py' || "$sourceFile" == 'blurhash.py' ]]; then continue fi result=$($cmd "$sourceFile")