epicyon/capabilities.py

284 lines
10 KiB
Python

# Module metadata: file name, authorship, licence, version and
# release status of this source file.
__filename__ = "capabilities.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
from auth import createPassword
from utils import getNicknameFromActor
from utils import getDomainFromActor
from utils import loadJson
from utils import saveJson
def getOcapFilename(baseDir: str,
                    nickname: str, domain: str,
                    actor: str, subdir: str) -> str:
    """Returns the filename for a particular capability accepted or granted
    Also creates directories as needed

    The filename has the form
    baseDir/accounts/nickname@domain/ocap/subdir/actor.json
    where any ':port' suffix is removed from the domain and every '/'
    within the actor is replaced by '#'.

    baseDir: directory beneath which the 'accounts' directory is kept
    nickname: nickname of the local account
    domain: domain of the local account, with or without ':port'
    actor: the actor which the capability relates to
    subdir: subdirectory of 'ocap' to use, eg. 'accept' or 'granted'

    Returns None, without creating any directories, if no actor is given
    """
    if not actor:
        return None

    # the account directory name never includes the port number
    if ':' in domain:
        domain = domain.split(':')[0]

    ocDir = baseDir + '/accounts/' + \
        nickname + '@' + domain + '/ocap/' + subdir
    # Create every missing level in a single call. exist_ok avoids the
    # failure which a separate isdir check followed by mkdir can hit when
    # something else creates the directory between the two calls.
    # Note that this also creates baseDir itself if it is missing.
    os.makedirs(ocDir, exist_ok=True)

    return ocDir + '/' + actor.replace('/', '#') + '.json'
def CapablePost(postJson: {}, capabilityList: [], debug: bool) -> bool:
    """Determines whether a post arriving in the inbox
    should be accepted according to the list of capabilities

    postJson: the incoming activity
    capabilityList: capability strings which apply to the sender. The
        restrictions recognised here are 'inbox:noannounce',
        'inbox:nolike', 'inbox:noreply' and 'inbox:cw'
    debug: if True then print the reason for any rejection

    Returns False if the post breaks one of the restrictions, otherwise
    True. Note that 'inbox:write' is not required in order to accept.
    """
    postType = postJson.get('type')
    if not postType:
        return True

    # No announces/repeats
    if postType == 'Announce' and 'inbox:noannounce' in capabilityList:
        if debug:
            print('DEBUG: ' +
                  'inbox post rejected because inbox:noannounce')
        return False

    # No likes
    if postType == 'Like' and 'inbox:nolike' in capabilityList:
        if debug:
            print('DEBUG: ' +
                  'inbox post rejected because inbox:nolike')
        return False

    if postType != 'Create':
        return True

    postObject = postJson.get('object')
    # The object might be a bare id string rather than a dict, in which
    # case there are no fields which can be checked
    if not isinstance(postObject, dict):
        return True

    # Does this have a reply?
    if postObject.get('inReplyTo') and 'inbox:noreply' in capabilityList:
        if debug:
            print('DEBUG: ' +
                  'inbox post rejected because ' +
                  'inbox:noreply')
        return False

    # are content warnings enforced?
    if 'inbox:cw' in capabilityList:
        # A missing or false 'sensitive' flag means that there is no
        # content warning. Testing the flag only when it was already true
        # could never reject anything.
        if not postObject.get('sensitive'):
            if debug:
                print('DEBUG: ' +
                      'inbox post rejected because inbox:cw')
            return False
        # content warning must have non-zero summary, so a missing or
        # empty summary is rejected as well as a single character one
        summary = postObject.get('summary')
        if not summary or len(summary) < 2:
            if debug:
                print('DEBUG: ' +
                      'inbox post rejected because ' +
                      'inbox:cw, summary missing')
            return False

    return True
def capabilitiesRequest(baseDir: str, httpPrefix: str, domain: str,
                        requestedActor: str, requestedDomain: str,
                        requestedCaps=None) -> {}:
    """Returns a capabilities Request activity
    This is sent to the capabilities endpoint /caps/new
    which could be instance wide or for a particular person
    This could also be added to a follow activity

    baseDir: not used by this function
    httpPrefix: url scheme, eg. 'https'
    domain: not used by this function
    requestedActor: becomes the 'actor' of the request
    requestedDomain: domain used to build the id of the request
    requestedCaps: list of capability strings being requested.
        Defaults to ["inbox:write", "objects:read"]
    """
    # A list literal as the default value would be shared between calls
    # and would also be the very object stored within each returned
    # request, so the default is created here instead
    if requestedCaps is None:
        requestedCaps = ["inbox:write", "objects:read"]

    ocapId = createPassword(32)
    ocapRequest = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": httpPrefix + "://" + requestedDomain + "/caps/request/" + ocapId,
        "type": "Request",
        # copied so that later changes to the request can't alter
        # the list belonging to the caller
        "capability": list(requestedCaps),
        "actor": requestedActor
    }
    return ocapRequest
def capabilitiesAccept(baseDir: str, httpPrefix: str,
                       nickname: str, domain: str, port: int,
                       acceptedActor: str, saveToFile: bool,
                       acceptedCaps=None) -> {}:
    """Returns the capability accepted for the given actor
    This gets returned to capabilities requester
    This could also be added to a follow Accept activity

    If a capability for the actor was previously stored then it is loaded
    from file, otherwise a new one is created.

    baseDir: directory beneath which the 'accounts' directory is kept
    httpPrefix: url scheme, eg. 'https'
    nickname: nickname of the local account. If empty then the actor
        of a new capability is the domain rather than a user
    domain: domain of the local account
    port: port number of the local account
    acceptedActor: the actor which the capability is for
    saveToFile: whether to write the capability to file
    acceptedCaps: list of capability strings to accept when creating a
        new capability. Defaults to ["inbox:write", "objects:read"]

    Returns None if the actor is missing, is longer than 256 characters
    or has no nickname within it
    """
    # A list literal as the default value would be shared between calls
    # and would also be the very object stored within the capability
    if acceptedCaps is None:
        acceptedCaps = ["inbox:write", "objects:read"]

    # a missing actor can't have its length checked
    if not acceptedActor:
        return None

    # reject excessively long actors
    if len(acceptedActor) > 256:
        return None

    # only append non-standard ports, and only if not already present
    fullDomain = domain
    if port and port not in (80, 443) and ':' not in domain:
        fullDomain = domain + ':' + str(port)

    # make directories to store capabilities
    ocapFilename = \
        getOcapFilename(baseDir, nickname, fullDomain, acceptedActor, 'accept')
    if not ocapFilename:
        return None

    ocapAccept = None

    # if the capability already exists then load it from file
    if os.path.isfile(ocapFilename):
        ocapAccept = loadJson(ocapFilename)

    # otherwise create a new capability
    if not ocapAccept:
        acceptedActorNickname = getNicknameFromActor(acceptedActor)
        if not acceptedActorNickname:
            print('WARN: unable to find nickname in ' + acceptedActor)
            return None
        acceptedActorDomain, acceptedActorPort = \
            getDomainFromActor(acceptedActor)

        # the id is the handle of the accepted actor followed by
        # whatever createPassword returns
        ocapId = acceptedActorNickname + '@' + acceptedActorDomain
        if acceptedActorPort:
            ocapId += ':' + str(acceptedActorPort)
        ocapId += '#' + createPassword(32)

        ocapAccept = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": httpPrefix + "://" + fullDomain + "/caps/" + ocapId,
            "type": "Capability",
            # copied so that the capability doesn't share the
            # list belonging to the caller
            "capability": list(acceptedCaps),
            "scope": acceptedActor,
            "actor": httpPrefix + "://" + fullDomain
        }
        if nickname:
            ocapAccept['actor'] = \
                httpPrefix + "://" + fullDomain + '/users/' + nickname

    if saveToFile:
        saveJson(ocapAccept, ocapFilename)
    return ocapAccept
def capabilitiesGrantedSave(baseDir: str,
                            nickname: str, domain: str, ocap: {}) -> bool:
    """A capabilities accept is received, so store it for
    reference when sending to the actor

    baseDir: directory beneath which the 'accounts' directory is kept
    nickname: nickname of the local account
    domain: domain of the local account
    ocap: the received capability, whose 'actor' decides the filename

    Returns False if the capability has no actor or no filename could be
    obtained, otherwise the result of saving the file
    """
    if not ocap.get('actor'):
        return False
    ocapFilename = \
        getOcapFilename(baseDir, nickname, domain, ocap['actor'], 'granted')
    if not ocapFilename:
        return False
    # Report whether the save worked rather than always claiming success.
    # capabilitiesReceiveUpdate returns the saveJson result the same way.
    return saveJson(ocap, ocapFilename)
def capabilitiesUpdate(baseDir: str, httpPrefix: str,
                       nickname: str, domain: str, port: int,
                       updateActor: str,
                       updateCaps: []) -> {}:
    """Used to send an update for a change of object capabilities
    Note that the capability id gets changed with a new random token
    so that the old capabilities can't continue to be used

    baseDir: directory beneath which the 'accounts' directory is kept
    httpPrefix: url scheme, eg. 'https'
    nickname: nickname of the local account
    domain: domain of the local account
    port: port number of the local account
    updateActor: the actor whose capabilities are being changed
    updateCaps: the new capabilities list.
        eg. ["inbox:write","objects:read"]

    Returns the Update activity containing the changed capability, or
    None if the actor is missing, is longer than 256 characters or has
    no nickname within it, or if there is no stored capability which
    can be loaded for that actor
    """
    # a missing actor can't have its length checked
    if not updateActor:
        return None

    # reject excessively long actors
    if len(updateActor) > 256:
        return None

    # only append non-standard ports, and only if not already present
    fullDomain = domain
    if port and port not in (80, 443) and ':' not in domain:
        fullDomain = domain + ':' + str(port)

    # Get the filename of the capability
    ocapFilename = \
        getOcapFilename(baseDir, nickname, fullDomain, updateActor, 'accept')
    if not ocapFilename:
        return None

    # The capability should already exist for it to be updated
    if not os.path.isfile(ocapFilename):
        return None

    # read the existing capability
    ocapJson = loadJson(ocapFilename)
    # the file exists but nothing could be loaded from it,
    # so there is nothing to update
    if not ocapJson:
        return None

    updateActorNickname = getNicknameFromActor(updateActor)
    if not updateActorNickname:
        print('WARN: unable to find nickname in ' + updateActor)
        return None
    updateActorDomain, updateActorPort = getDomainFromActor(updateActor)

    # set the new capabilities list. eg. ["inbox:write","objects:read"]
    ocapJson['capability'] = updateCaps

    # change the id, so that the old capabilities can't continue to be used
    ocapId = updateActorNickname + '@' + updateActorDomain
    if updateActorPort:
        ocapId += ':' + str(updateActorPort)
    ocapId += '#' + createPassword(32)
    ocapJson['id'] = httpPrefix + "://" + fullDomain + "/caps/" + ocapId

    # create an update activity
    ocapUpdate = {
        "@context": "https://www.w3.org/ns/activitystreams",
        'type': 'Update',
        'actor': httpPrefix + '://' + fullDomain + '/users/' + nickname,
        'to': [updateActor],
        'cc': [],
        'object': ocapJson
    }

    # save it again
    saveJson(ocapJson, ocapFilename)
    return ocapUpdate
def capabilitiesReceiveUpdate(baseDir: str,
                              nickname: str, domain: str, port: int,
                              actor: str,
                              newCapabilitiesId: str,
                              capabilityList: [], debug: bool) -> bool:
    """An update for a capability of the given actor has arrived

    The capability previously granted by the actor is loaded, given the
    new id and capabilities list, and then saved again.

    baseDir: directory beneath which the 'accounts' directory is kept
    nickname: nickname of the local account
    domain: domain of the local account
    port: not used by this function
    actor: the actor which granted the capability
    newCapabilitiesId: replacement for the 'id' of the capability
    capabilityList: replacement for the 'capability' list
    debug: if True then report when the stored capability is missing

    Returns False if there is no stored capability which can be loaded,
    otherwise the result of saving it
    """
    grantedFilename = \
        getOcapFilename(baseDir, nickname, domain, actor, 'granted')
    if not grantedFilename:
        return False

    # only a capability which was previously stored can be updated
    if not os.path.isfile(grantedFilename):
        if debug:
            print('DEBUG: capabilities file not found during update')
            print(grantedFilename)
        return False

    grantedJson = loadJson(grantedFilename)
    if not grantedJson:
        return False

    grantedJson['id'] = newCapabilitiesId
    grantedJson['capability'] = capabilityList
    return saveJson(grantedJson, grantedFilename)