# epicyon/capabilities.py
#
# 259 lines
# 9.3 KiB
# Python

__filename__ = "capabilities.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.0.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
import datetime
import time
import json
import commentjson
from auth import createPassword
from utils import getNicknameFromActor
from utils import getDomainFromActor
from utils import loadJson
from utils import saveJson
def getOcapFilename(baseDir: str, nickname: str, domain: str,
                    actor: str, subdir: str) -> str:
    """Returns the filename for a particular capability accepted or granted
    Also creates directories as needed

    baseDir is the directory which contains the 'accounts' directory,
    nickname and domain identify the local account (anything after a ':'
    in the domain, such as a port number, is dropped), actor is the actor
    which the capability applies to and subdir is the directory beneath
    'ocap' in which the capability is kept (other functions in this
    file pass 'accept' or 'granted').

    Returns None if no actor is given, in which case no directories are
    created. Otherwise returns
    <baseDir>/accounts/<nickname>@<domain>/ocap/<subdir>/<actor>.json
    with every '/' within the actor replaced by '#'
    """
    if not actor:
        return None

    if ':' in domain:
        domain = domain.split(':')[0]

    ocDir = baseDir + '/accounts/' + nickname + '@' + domain + \
        '/ocap/' + subdir

    # A single makedirs call creates every missing level and, unlike an
    # isdir() check followed by mkdir(), doesn't fail if something else
    # creates the directory between the check and the creation
    os.makedirs(ocDir, exist_ok=True)

    return ocDir + '/' + actor.replace('/', '#') + '.json'
def CapablePost(postJson: {}, capabilityList: [], debug: bool) -> bool:
    """Determines whether a post arriving in the inbox
    should be accepted according to the list of capabilities

    postJson is the arriving activity, capabilityList is the list of
    capability strings to check it against and debug enables printing
    of the reason for any rejection.

    The capabilities which can cause a rejection are:
      inbox:noannounce  Announce activities are rejected
      inbox:nolike      Like activities are rejected
      inbox:noreply     Create activities whose object has inReplyTo
                        set are rejected
      inbox:cw          Create activities are rejected unless their
                        object is marked as sensitive and has a summary
                        of at least two characters

    Returns False if the post is rejected, otherwise True
    """
    rejection = None
    postType = postJson.get('type')

    if postType == 'Announce':
        # No announces/repeats
        if 'inbox:noannounce' in capabilityList:
            rejection = 'inbox:noannounce'
    elif postType == 'Like':
        # No likes
        if 'inbox:nolike' in capabilityList:
            rejection = 'inbox:nolike'
    elif postType == 'Create':
        postObject = postJson.get('object')
        # Only a dict has fields which can be checked. Anything else
        # (eg. a string) is passed through rather than raising
        if isinstance(postObject, dict):
            if postObject.get('inReplyTo') and \
               'inbox:noreply' in capabilityList:
                # No replies
                rejection = 'inbox:noreply'
            elif 'inbox:cw' in capabilityList:
                # Content warnings are enforced. A missing or false
                # 'sensitive' and a missing or empty summary must both
                # be caught, so test the values themselves rather than
                # only looking at them when they happen to be truthy
                summary = postObject.get('summary')
                if not postObject.get('sensitive'):
                    rejection = 'inbox:cw'
                elif not summary or len(summary) < 2:
                    rejection = 'inbox:cw, summary missing'

    if rejection:
        if debug:
            print('DEBUG: inbox post rejected because ' + rejection)
        return False

    # NOTE: inbox:write is not required for acceptance. Posts were
    # previously accepted both with and without it, and that is kept
    return True
def capabilitiesRequest(baseDir: str, httpPrefix: str, domain: str,
                        requestedActor: str,
                        requestedCaps=["inbox:write", "objects:read"]) -> {}:
    """Returns a request for capabilities
    This is sent to the capabilities endpoint /caps/new
    which could be instance wide or for a particular person
    This could also be added to a follow activity

    httpPrefix and domain are used to make the id of the request, which
    ends with a newly created random token. requestedActor becomes the
    actor of the request and requestedCaps is the list of capabilities
    being asked for. baseDir is not currently used.
    """
    ocapId = createPassword(32)
    return {
        "@context": "https://www.w3.org/ns/activitystreams",
        # the id is built from the domain parameter. This previously
        # referred to a name which doesn't exist, causing a NameError
        "id": httpPrefix + "://" + domain + "/caps/request/" + ocapId,
        "type": "Request",
        # copied, so that altering the returned request can never
        # alter the default list shared between calls
        "capability": list(requestedCaps),
        "actor": requestedActor
    }
def capabilitiesAccept(baseDir: str, httpPrefix: str,
                       nickname: str, domain: str, port: int,
                       acceptedActor: str, saveToFile: bool,
                       acceptedCaps=["inbox:write", "objects:read"]) -> {}:
    """Returns the capability accepted for the given actor
    This gets returned to capabilities requester
    This could also be added to a follow Accept activity

    If a capability for acceptedActor has already been stored for this
    account then it is loaded and returned as it is. Otherwise a new
    one is created containing acceptedCaps, with an id ending in a newly
    created random token. If saveToFile is set then the capability is
    written to its file before being returned.

    Returns None if acceptedActor is longer than 256 characters, if no
    filename can be obtained for it or if no nickname can be found
    within it
    """
    # reject excessively long actors
    if len(acceptedActor) > 256:
        return None

    # the port is only appended when it isn't a standard one and
    # the domain doesn't already contain one
    fullDomain = domain
    if port:
        if port != 80 and port != 443:
            if ':' not in domain:
                fullDomain = domain + ':' + str(port)

    # make directories to store capabilities
    ocapFilename = \
        getOcapFilename(baseDir, nickname, fullDomain,
                        acceptedActor, 'accept')
    if not ocapFilename:
        return None

    # if the capability already exists then load it from file
    ocapAccept = None
    if os.path.isfile(ocapFilename):
        ocapAccept = loadJson(ocapFilename)

    # otherwise create a new capability
    if not ocapAccept:
        acceptedActorNickname = getNicknameFromActor(acceptedActor)
        if not acceptedActorNickname:
            print('WARN: unable to find nickname in ' + acceptedActor)
            return None
        acceptedActorDomain, acceptedActorPort = \
            getDomainFromActor(acceptedActor)

        ocapId = acceptedActorNickname + '@' + acceptedActorDomain
        if acceptedActorPort:
            ocapId += ':' + str(acceptedActorPort)
        ocapId += '#' + createPassword(32)

        ocapAccept = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": httpPrefix + "://" + fullDomain + "/caps/" + ocapId,
            "type": "Capability",
            # copied, so that altering the returned capability can
            # never alter the default list shared between calls
            "capability": list(acceptedCaps),
            "scope": acceptedActor,
            "actor": httpPrefix + "://" + fullDomain
        }
        # without a nickname the actor remains the domain itself
        if nickname:
            ocapAccept['actor'] = \
                httpPrefix + "://" + fullDomain + '/users/' + nickname

    if saveToFile:
        saveJson(ocapAccept, ocapFilename)
    return ocapAccept
def capabilitiesGrantedSave(baseDir: str, nickname: str, domain: str,
                            ocap: {}) -> bool:
    """A capabilities accept has been received, so store it for
    reference when sending to the actor

    The capability is saved within the 'granted' directory of the
    given account, in a file named after the actor of the capability.
    Returns False if the capability has no actor or if no filename
    could be obtained for it, otherwise True
    """
    grantingActor = ocap.get('actor')
    if not grantingActor:
        return False

    grantedFilename = \
        getOcapFilename(baseDir, nickname, domain, grantingActor, 'granted')
    if grantedFilename:
        saveJson(ocap, grantedFilename)
        return True
    return False
def capabilitiesUpdate(baseDir: str, httpPrefix: str,
                       nickname: str, domain: str, port: int,
                       updateActor: str,
                       updateCaps: []) -> {}:
    """Used to send an update for a change of object capabilities
    Note that the capability id gets changed with a new random token
    so that the old capabilities can't continue to be used

    The capability previously accepted for updateActor is loaded, its
    list of capabilities is replaced by updateCaps, it is given a new
    id and is saved back to the same file. The returned Update activity
    is addressed to updateActor and has the altered capability as its
    object.

    Returns None if updateActor is longer than 256 characters, if there
    is no existing capability file for it, if that file can't be loaded
    or if no nickname can be found within updateActor
    """
    # reject excessively long actors
    if len(updateActor) > 256:
        return None

    # the port is only appended when it isn't a standard one and
    # the domain doesn't already contain one
    fullDomain = domain
    if port:
        if port != 80 and port != 443:
            if ':' not in domain:
                fullDomain = domain + ':' + str(port)

    # Get the filename of the capability
    ocapFilename = \
        getOcapFilename(baseDir, nickname, fullDomain, updateActor, 'accept')
    if not ocapFilename:
        return None

    # The capability should already exist for it to be updated
    if not os.path.isfile(ocapFilename):
        return None

    # read the existing capability. If it can't be loaded then there
    # is nothing to update, and trying to alter it would raise
    ocapJson = loadJson(ocapFilename)
    if not ocapJson:
        return None

    # the nickname is needed to make the new id, so check for it
    # before anything gets changed
    updateActorNickname = getNicknameFromActor(updateActor)
    if not updateActorNickname:
        print('WARN: unable to find nickname in ' + updateActor)
        return None
    updateActorDomain, updateActorPort = getDomainFromActor(updateActor)

    # set the new capabilities list. eg. ["inbox:write","objects:read"]
    ocapJson['capability'] = updateCaps

    # change the id, so that the old capabilities can't continue to be used
    ocapId = updateActorNickname + '@' + updateActorDomain
    if updateActorPort:
        ocapId += ':' + str(updateActorPort)
    ocapId += '#' + createPassword(32)
    ocapJson['id'] = httpPrefix + "://" + fullDomain + "/caps/" + ocapId

    # save it again
    saveJson(ocapJson, ocapFilename)

    # create an update activity containing the altered capability
    return {
        "@context": "https://www.w3.org/ns/activitystreams",
        'type': 'Update',
        'actor': httpPrefix + '://' + fullDomain + '/users/' + nickname,
        'to': [updateActor],
        'cc': [],
        'object': ocapJson
    }
def capabilitiesReceiveUpdate(baseDir: str,
                              nickname: str, domain: str, port: int,
                              actor: str,
                              newCapabilitiesId: str,
                              capabilityList: [], debug: bool) -> bool:
    """An update to the capabilities granted by the given actor has arrived

    The capability stored for the actor within the 'granted' directory
    of the given account has its id replaced by newCapabilitiesId and
    its list of capabilities replaced by capabilityList, then it is
    saved again. The port parameter is not currently used.

    Returns False if no filename could be obtained, if the file doesn't
    exist or if it couldn't be loaded. Otherwise returns whatever
    saveJson returns
    """
    grantedFilename = \
        getOcapFilename(baseDir, nickname, domain, actor, 'granted')
    if not grantedFilename:
        return False

    # there must be an existing capability to be updated
    if not os.path.isfile(grantedFilename):
        if debug:
            print('DEBUG: capabilities file not found during update')
            print(grantedFilename)
        return False

    grantedJson = loadJson(grantedFilename)
    if not grantedJson:
        return False

    grantedJson['id'] = newCapabilitiesId
    grantedJson['capability'] = capabilityList
    return saveJson(grantedJson, grantedFilename)