flake8 format

main
Bob Mottram 2020-04-05 10:47:42 +01:00
parent 0a5a9d439b
commit ba493f9e20
2 changed files with 116 additions and 90 deletions

View File

@ -1,22 +1,22 @@
__filename__="capabilities.py" __filename__ = "capabilities.py"
__author__="Bob Mottram" __author__ = "Bob Mottram"
__license__="AGPL3+" __license__ = "AGPL3+"
__version__="1.1.0" __version__ = "1.1.0"
__maintainer__="Bob Mottram" __maintainer__ = "Bob Mottram"
__email__="bob@freedombone.net" __email__ = "bob@freedombone.net"
__status__="Production" __status__ = "Production"
import os import os
import datetime
import time
import json
from auth import createPassword from auth import createPassword
from utils import getNicknameFromActor from utils import getNicknameFromActor
from utils import getDomainFromActor from utils import getDomainFromActor
from utils import loadJson from utils import loadJson
from utils import saveJson from utils import saveJson
def getOcapFilename(baseDir :str,nickname: str,domain: str,actor :str,subdir: str) -> str:
def getOcapFilename(baseDir: str,
nickname: str, domain: str,
actor: str, subdir: str) -> str:
"""Returns the filename for a particular capability accepted or granted """Returns the filename for a particular capability accepted or granted
Also creates directories as needed Also creates directories as needed
""" """
@ -24,152 +24,174 @@ def getOcapFilename(baseDir :str,nickname: str,domain: str,actor :str,subdir: st
return None return None
if ':' in domain: if ':' in domain:
domain=domain.split(':')[0] domain = domain.split(':')[0]
if not os.path.isdir(baseDir+'/accounts'): if not os.path.isdir(baseDir + '/accounts'):
os.mkdir(baseDir+'/accounts') os.mkdir(baseDir + '/accounts')
ocDir=baseDir+'/accounts/'+nickname+'@'+domain ocDir = baseDir + '/accounts/' + nickname + '@' + domain
if not os.path.isdir(ocDir): if not os.path.isdir(ocDir):
os.mkdir(ocDir) os.mkdir(ocDir)
ocDir=baseDir+'/accounts/'+nickname+'@'+domain+'/ocap' ocDir = baseDir + '/accounts/' + nickname + '@' + domain + '/ocap'
if not os.path.isdir(ocDir): if not os.path.isdir(ocDir):
os.mkdir(ocDir) os.mkdir(ocDir)
ocDir=baseDir+'/accounts/'+nickname+'@'+domain+'/ocap/'+subdir ocDir = baseDir + '/accounts/' + \
nickname + '@' + domain + '/ocap/' + subdir
if not os.path.isdir(ocDir): if not os.path.isdir(ocDir):
os.mkdir(ocDir) os.mkdir(ocDir)
return baseDir+'/accounts/'+nickname+'@'+domain+'/ocap/'+subdir+'/'+actor.replace('/','#')+'.json' return baseDir + '/accounts/' + \
nickname + '@' + domain + '/ocap/' + \
subdir + '/' + actor.replace('/', '#') + '.json'
def CapablePost(postJson: {}, capabilityList: [], debug :bool) -> bool:
def CapablePost(postJson: {}, capabilityList: [], debug: bool) -> bool:
"""Determines whether a post arriving in the inbox """Determines whether a post arriving in the inbox
should be accepted accoring to the list of capabilities should be accepted accoring to the list of capabilities
""" """
if postJson.get('type'): if postJson.get('type'):
# No announces/repeats # No announces/repeats
if postJson['type']=='Announce': if postJson['type'] == 'Announce':
if 'inbox:noannounce' in capabilityList: if 'inbox:noannounce' in capabilityList:
if debug: if debug:
print('DEBUG: inbox post rejected because inbox:noannounce') print('DEBUG: ' +
'inbox post rejected because inbox:noannounce')
return False return False
# No likes # No likes
if postJson['type']=='Like': if postJson['type'] == 'Like':
if 'inbox:nolike' in capabilityList: if 'inbox:nolike' in capabilityList:
if debug: if debug:
print('DEBUG: inbox post rejected because inbox:nolike') print('DEBUG: ' +
'inbox post rejected because inbox:nolike')
return False return False
if postJson['type']=='Create': if postJson['type'] == 'Create':
if postJson.get('object'): if postJson.get('object'):
# Does this have a reply? # Does this have a reply?
if postJson['object'].get('inReplyTo'): if postJson['object'].get('inReplyTo'):
if postJson['object']['inReplyTo']: if postJson['object']['inReplyTo']:
if 'inbox:noreply' in capabilityList: if 'inbox:noreply' in capabilityList:
if debug: if debug:
print('DEBUG: inbox post rejected because inbox:noreply') print('DEBUG: ' +
'inbox post rejected because ' +
'inbox:noreply')
return False return False
# are content warnings enforced? # are content warnings enforced?
if postJson['object'].get('sensitive'): if postJson['object'].get('sensitive'):
if not postJson['object']['sensitive']: if not postJson['object']['sensitive']:
if 'inbox:cw' in capabilityList: if 'inbox:cw' in capabilityList:
if debug: if debug:
print('DEBUG: inbox post rejected because inbox:cw') print('DEBUG: ' +
'inbox post rejected because inbox:cw')
return False return False
# content warning must have non-zero summary # content warning must have non-zero summary
if postJson['object'].get('summary'): if postJson['object'].get('summary'):
if len(postJson['object']['summary'])<2: if len(postJson['object']['summary']) < 2:
if 'inbox:cw' in capabilityList: if 'inbox:cw' in capabilityList:
if debug: if debug:
print('DEBUG: inbox post rejected because inbox:cw, summary missing') print('DEBUG: ' +
'inbox post rejected because ' +
'inbox:cw, summary missing')
return False return False
if 'inbox:write' in capabilityList: if 'inbox:write' in capabilityList:
return True return True
return True return True
def capabilitiesRequest(baseDir: str,httpPrefix: str,domain: str, \
requestedActor: str, \ def capabilitiesRequest(baseDir: str, httpPrefix: str, domain: str,
requestedCaps=["inbox:write","objects:read"]) -> {}: requestedActor: str, requestedDomain: str,
requestedCaps=["inbox:write", "objects:read"]) -> {}:
# This is sent to the capabilities endpoint /caps/new # This is sent to the capabilities endpoint /caps/new
# which could be instance wide or for a particular person # which could be instance wide or for a particular person
# This could also be added to a follow activity # This could also be added to a follow activity
ocapId=createPassword(32) ocapId = createPassword(32)
ocapRequest={ ocapRequest = {
"@context": "https://www.w3.org/ns/activitystreams", "@context": "https://www.w3.org/ns/activitystreams",
"id": httpPrefix+"://"+requestedDomain+"/caps/request/"+ocapId, "id": httpPrefix + "://" + requestedDomain + "/caps/request/" + ocapId,
"type": "Request", "type": "Request",
"capability": requestedCaps, "capability": requestedCaps,
"actor": requestedActor "actor": requestedActor
} }
return ocapRequest return ocapRequest
def capabilitiesAccept(baseDir: str,httpPrefix: str, \
nickname: str,domain: str, port: int, \ def capabilitiesAccept(baseDir: str, httpPrefix: str,
acceptedActor: str, saveToFile: bool, \ nickname: str, domain: str, port: int,
acceptedCaps=["inbox:write","objects:read"]) -> {}: acceptedActor: str, saveToFile: bool,
acceptedCaps=["inbox:write", "objects:read"]) -> {}:
# This gets returned to capabilities requester # This gets returned to capabilities requester
# This could also be added to a follow Accept activity # This could also be added to a follow Accept activity
# reject excessively long actors # reject excessively long actors
if len(acceptedActor)>256: if len(acceptedActor) > 256:
return None return None
fullDomain=domain fullDomain = domain
if port: if port:
if port!=80 and port !=443: if port != 80 and port != 443:
if ':' not in domain: if ':' not in domain:
fullDomain=domain+':'+str(port) fullDomain = domain + ':' + str(port)
# make directories to store capabilities # make directories to store capabilities
ocapFilename=getOcapFilename(baseDir,nickname,fullDomain,acceptedActor,'accept') ocapFilename = \
getOcapFilename(baseDir, nickname, fullDomain, acceptedActor, 'accept')
if not ocapFilename: if not ocapFilename:
return None return None
ocapAccept=None ocapAccept = None
# if the capability already exists then load it from file # if the capability already exists then load it from file
if os.path.isfile(ocapFilename): if os.path.isfile(ocapFilename):
ocapAccept=loadJson(ocapFilename) ocapAccept = loadJson(ocapFilename)
# otherwise create a new capability # otherwise create a new capability
if not ocapAccept: if not ocapAccept:
acceptedActorNickname=getNicknameFromActor(acceptedActor) acceptedActorNickname = getNicknameFromActor(acceptedActor)
if not acceptedActorNickname: if not acceptedActorNickname:
print('WARN: unable to find nickname in '+acceptedActor) print('WARN: unable to find nickname in ' + acceptedActor)
return None return None
acceptedActorDomain,acceptedActorPort=getDomainFromActor(acceptedActor) acceptedActorDomain, acceptedActorPort = \
getDomainFromActor(acceptedActor)
if acceptedActorPort: if acceptedActorPort:
ocapId=acceptedActorNickname+'@'+acceptedActorDomain+':'+str(acceptedActorPort)+'#'+createPassword(32) ocapId = acceptedActorNickname + '@' + acceptedActorDomain + \
':' + str(acceptedActorPort) + '#'+createPassword(32)
else: else:
ocapId=acceptedActorNickname+'@'+acceptedActorDomain+'#'+createPassword(32) ocapId = acceptedActorNickname + '@' + acceptedActorDomain + \
ocapAccept={ '#' + createPassword(32)
ocapAccept = {
"@context": "https://www.w3.org/ns/activitystreams", "@context": "https://www.w3.org/ns/activitystreams",
"id": httpPrefix+"://"+fullDomain+"/caps/"+ocapId, "id": httpPrefix + "://" + fullDomain + "/caps/" + ocapId,
"type": "Capability", "type": "Capability",
"capability": acceptedCaps, "capability": acceptedCaps,
"scope": acceptedActor, "scope": acceptedActor,
"actor": httpPrefix+"://"+fullDomain "actor": httpPrefix + "://" + fullDomain
} }
if nickname: if nickname:
ocapAccept['actor']=httpPrefix+"://"+fullDomain+'/users/'+nickname ocapAccept['actor'] = \
httpPrefix + "://" + fullDomain + '/users/' + nickname
if saveToFile: if saveToFile:
saveJson(ocapAccept,ocapFilename) saveJson(ocapAccept, ocapFilename)
return ocapAccept return ocapAccept
def capabilitiesGrantedSave(baseDir :str,nickname :str,domain :str,ocap: {}) -> bool:
def capabilitiesGrantedSave(baseDir: str,
nickname: str, domain: str, ocap: {}) -> bool:
"""A capabilities accept is received, so stor it for """A capabilities accept is received, so stor it for
reference when sending to the actor reference when sending to the actor
""" """
if not ocap.get('actor'): if not ocap.get('actor'):
return False return False
ocapFilename=getOcapFilename(baseDir,nickname,domain,ocap['actor'],'granted') ocapFilename = \
getOcapFilename(baseDir, nickname, domain, ocap['actor'], 'granted')
if not ocapFilename: if not ocapFilename:
return False return False
saveJson(ocap,ocapFilename) saveJson(ocap, ocapFilename)
return True return True
def capabilitiesUpdate(baseDir: str,httpPrefix: str, \
nickname: str,domain: str, port: int, \ def capabilitiesUpdate(baseDir: str, httpPrefix: str,
updateActor: str, \ nickname: str, domain: str, port: int,
updateActor: str,
updateCaps: []) -> {}: updateCaps: []) -> {}:
"""Used to sends an update for a change of object capabilities """Used to sends an update for a change of object capabilities
Note that the capability id gets changed with a new random token Note that the capability id gets changed with a new random token
@ -177,17 +199,18 @@ def capabilitiesUpdate(baseDir: str,httpPrefix: str, \
""" """
# reject excessively long actors # reject excessively long actors
if len(updateActor)>256: if len(updateActor) > 256:
return None return None
fullDomain=domain fullDomain = domain
if port: if port:
if port!=80 and port !=443: if port != 80 and port != 443:
if ':' not in domain: if ':' not in domain:
fullDomain=domain+':'+str(port) fullDomain = domain + ':' + str(port)
# Get the filename of the capability # Get the filename of the capability
ocapFilename=getOcapFilename(baseDir,nickname,fullDomain,updateActor,'accept') ocapFilename = \
getOcapFilename(baseDir, nickname, fullDomain, updateActor, 'accept')
if not ocapFilename: if not ocapFilename:
return None return None
@ -196,48 +219,51 @@ def capabilitiesUpdate(baseDir: str,httpPrefix: str, \
return None return None
# create an update activity # create an update activity
ocapUpdate={ ocapUpdate = {
"@context": "https://www.w3.org/ns/activitystreams", "@context": "https://www.w3.org/ns/activitystreams",
'type': 'Update', 'type': 'Update',
'actor': httpPrefix+'://'+fullDomain+'/users/'+nickname, 'actor': httpPrefix + '://' + fullDomain + '/users/' + nickname,
'to': [updateActor], 'to': [updateActor],
'cc': [], 'cc': [],
'object': {} 'object': {}
} }
# read the existing capability # read the existing capability
ocapJson=loadJson(ocapFilename) ocapJson = loadJson(ocapFilename)
# set the new capabilities list. eg. ["inbox:write","objects:read"] # set the new capabilities list. eg. ["inbox:write","objects:read"]
ocapJson['capability']=updateCaps ocapJson['capability'] = updateCaps
# change the id, so that the old capabilities can't continue to be used # change the id, so that the old capabilities can't continue to be used
updateActorNickname=getNicknameFromActor(updateActor) updateActorNickname = getNicknameFromActor(updateActor)
if not updateActorNickname: if not updateActorNickname:
print('WARN: unable to find nickname in '+updateActor) print('WARN: unable to find nickname in ' + updateActor)
return None return None
updateActorDomain,updateActorPort=getDomainFromActor(updateActor) updateActorDomain, updateActorPort = getDomainFromActor(updateActor)
if updateActorPort: if updateActorPort:
ocapId=updateActorNickname+'@'+updateActorDomain+':'+str(updateActorPort)+'#'+createPassword(32) ocapId = updateActorNickname + '@' + updateActorDomain + \
':' + str(updateActorPort) + '#' + createPassword(32)
else: else:
ocapId=updateActorNickname+'@'+updateActorDomain+'#'+createPassword(32) ocapId = updateActorNickname + '@' + updateActorDomain + \
ocapJson['id']=httpPrefix+"://"+fullDomain+"/caps/"+ocapId '#' + createPassword(32)
ocapUpdate['object']=ocapJson ocapJson['id'] = httpPrefix + "://" + fullDomain + "/caps/" + ocapId
ocapUpdate['object'] = ocapJson
# save it again # save it again
saveJson(ocapJson,ocapFilename) saveJson(ocapJson, ocapFilename)
return ocapUpdate return ocapUpdate
def capabilitiesReceiveUpdate(baseDir :str, \
nickname :str,domain :str,port :int, \ def capabilitiesReceiveUpdate(baseDir: str,
actor :str, \ nickname: str, domain: str, port: int,
newCapabilitiesId :str, \ actor: str,
capabilityList :[], debug :bool) -> bool: newCapabilitiesId: str,
capabilityList: [], debug: bool) -> bool:
"""An update for a capability or the given actor has arrived """An update for a capability or the given actor has arrived
""" """
ocapFilename= \ ocapFilename = \
getOcapFilename(baseDir,nickname,domain,actor,'granted') getOcapFilename(baseDir, nickname, domain, actor, 'granted')
if not ocapFilename: if not ocapFilename:
return False return False
@ -247,11 +273,11 @@ def capabilitiesReceiveUpdate(baseDir :str, \
print(ocapFilename) print(ocapFilename)
return False return False
ocapJson=loadJson(ocapFilename) ocapJson = loadJson(ocapFilename)
if ocapJson: if ocapJson:
ocapJson['id']=newCapabilitiesId ocapJson['id'] = newCapabilitiesId
ocapJson['capability']=capabilityList ocapJson['capability'] = capabilityList
return saveJson(ocapJson,ocapFilename) return saveJson(ocapJson, ocapFilename)
return False return False

View File

@ -10,7 +10,7 @@ echo "Starting static analysis"
for sourceFile in *.py for sourceFile in *.py
do do
if [[ "$sourceFile" == 'tests.py' || "$sourceFile" == 'blurhash.py' || "$sourceFile" == 'capabilities.py' ]]; then if [[ "$sourceFile" == 'tests.py' || "$sourceFile" == 'blurhash.py' ]]; then
continue continue
fi fi
result=$($cmd "$sourceFile") result=$($cmd "$sourceFile")