epicyon/capabilities.py

119 lines
4.1 KiB
Python
Raw Normal View History

# Module metadata: file name, author, license, version, maintainer,
# contact email and release status for this capabilities module.
__filename__ = "capabilities.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "0.0.1"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
2019-07-06 09:07:24 +00:00
import os
2019-07-07 11:53:32 +00:00
import datetime
import time
import json
import commentjson
from auth import createPassword
2019-07-07 15:51:04 +00:00
def getOcapFilename(baseDir: str, nickname: str, domain: str, actor: str, subdir: str) -> str:
    """Return the path of the JSON file that stores a capability for an actor.

    The returned path has the form
    ``<baseDir>/accounts/<nickname>@<domain>/ocap/<subdir>/<actor>.json``
    where any ``:port`` suffix is removed from ``domain`` and every '/' in
    ``actor`` is replaced by '#' so that the actor fits in one file name.

    As a side effect the containing directory (and any missing parents,
    including ``baseDir`` itself) is created. Only the directory is
    created; the JSON file is not touched.
    """
    # account directories are named without the port number
    if ':' in domain:
        domain = domain.split(':')[0]

    ocDir = baseDir + '/accounts/' + nickname + '@' + domain + '/ocap/' + subdir
    # A single makedirs call replaces the chain of isdir()/mkdir() checks:
    # exist_ok avoids failing when another process creates a directory
    # between the check and the mkdir.
    os.makedirs(ocDir, exist_ok=True)
    return ocDir + '/' + actor.replace('/', '#') + '.json'
2019-07-07 11:53:32 +00:00
def capabilitiesRequest(baseDir: str, httpPrefix: str, domain: str,
                        requestedActor: str,
                        requestedCaps=["inbox:write", "objects:read"]) -> {}:
    """Build a capabilities request object.

    This is sent to the capabilities endpoint /caps/new, which could be
    instance wide or for a particular person. It could also be added to
    a follow activity.

    baseDir is accepted for signature compatibility but is not used here.
    httpPrefix and domain form the base of the request id,
    requestedActor is stored as the "actor" field and requestedCaps is
    the list of capabilities being asked for.

    Returns a dict with the keys "id", "type", "capability" and "actor".
    """
    # presumably a random token of the given length - see auth.createPassword
    ocapId = createPassword(32)
    ocapRequest = {
        # the id is built from the domain parameter; the previous code
        # referenced an undefined name here and always raised NameError
        "id": httpPrefix + "://" + domain + "/caps/request/" + ocapId,
        "type": "Request",
        # copy so the returned object never aliases the shared default list
        "capability": list(requestedCaps),
        "actor": requestedActor
    }
    return ocapRequest
2019-07-07 15:51:04 +00:00
2019-07-07 13:53:12 +00:00
def capabilitiesAccept(baseDir: str, httpPrefix: str,
                       nickname: str, domain: str, port: int,
                       acceptedActor: str, saveToFile: bool,
                       acceptedCaps=["inbox:write", "objects:read"]) -> {}:
    """Return the capability granted to acceptedActor, creating it if needed.

    This gets returned to the capabilities requester and could also be
    added to a follow Accept activity.

    If a capability file for acceptedActor already exists under the
    account's ocap/accept directory it is loaded and returned unchanged,
    so acceptedCaps is only used when a new capability is created.
    When saveToFile is true the resulting capability is written to that
    file.

    Returns the capability dict, or None when acceptedActor is longer
    than 256 characters.
    """
    # reject excessively long actors
    if len(acceptedActor) > 256:
        return None

    fullDomain = domain
    # Append the port only when one was supplied, it is not a standard
    # http/https port and the domain does not already include a port.
    # This avoids ids such as "example.com:None" or "example.com:8080:8080".
    if port and port != 80 and port != 443 and ':' not in domain:
        fullDomain = domain + ':' + str(port)

    # resolves the storage path and makes the directories for it
    ocapFilename = getOcapFilename(baseDir, nickname, fullDomain, acceptedActor, 'accept')

    ocapAccept = None
    # if the capability already exists then load it from file
    if os.path.isfile(ocapFilename):
        with open(ocapFilename, 'r') as fp:
            ocapAccept = commentjson.load(fp)

    # otherwise create a new capability
    if not ocapAccept:
        ocapId = createPassword(32)
        ocapAccept = {
            "id": httpPrefix + "://" + fullDomain + "/caps/" + ocapId,
            "type": "Capability",
            # copy so the returned object never aliases the shared default list
            "capability": list(acceptedCaps),
            "scope": acceptedActor,
            "actor": httpPrefix + "://" + fullDomain
        }
        # with a nickname the granting actor is the person rather than
        # the instance as a whole
        if nickname:
            ocapAccept['actor'] = httpPrefix + "://" + fullDomain + '/users/' + nickname

    if saveToFile:
        with open(ocapFilename, 'w') as fp:
            commentjson.dump(ocapAccept, fp, indent=4, sort_keys=False)
    return ocapAccept
2019-07-07 15:51:04 +00:00
def capabilitiesGrantedSave(baseDir: str, nickname: str, domain: str, ocap: {}) -> bool:
    """A capabilities accept is received, so store it for
    reference when sending to the actor.

    The capability is written as JSON to the account's ocap/granted
    directory, in a file named after ocap['actor'].

    Returns False without writing anything when ocap is empty/None or
    has no 'actor' value, otherwise True after the file is written.
    """
    # a missing object or missing actor cannot be filed under any name
    if not ocap or not ocap.get('actor'):
        return False
    ocapFilename = getOcapFilename(baseDir, nickname, domain, ocap['actor'], 'granted')
    with open(ocapFilename, 'w') as fp:
        commentjson.dump(ocap, fp, indent=4, sort_keys=False)
    return True
def isCapable(actor: str,ocapGranted: {},capability: str) -> bool:
    """Is the given actor capable of using the current resource?

    ocapGranted maps capability ids to capability objects. The actor is
    capable when at least one of those objects has a 'scope' that is
    contained in actor and a 'capability' entry containing the
    requested capability.
    """
    return any(
        grant['scope'] in actor and capability in grant['capability']
        for grant in ocapGranted.values()
    )
def isCapableId(id: str, ocapGranted: {}, capability: str, actor: str = None) -> bool:
    """Is the given capability id capable of using the current resource?

    ocapGranted maps capability ids to capability objects. The lookup
    uses the id argument as the key. When actor is supplied, the stored
    'scope' must also be contained in actor; when it is omitted the
    scope is not checked.

    Returns True only if the id is known, any scope check passes and
    the requested capability is listed in the stored 'capability'.
    """
    # look up by the value of id, not by the literal key 'id'
    ocap = ocapGranted.get(id)
    if not ocap:
        return False
    if actor is not None:
        scope = ocap.get('scope')
        # a capability without a scope cannot be matched to an actor
        if not scope or scope not in actor:
            return False
    return capability in ocap.get('capability', ())