__filename__ = "capabilities.py" __author__ = "Bob Mottram" __license__ = "AGPL3+" __version__ = "1.1.0" __maintainer__ = "Bob Mottram" __email__ = "bob@freedombone.net" __status__ = "Production" import os from auth import createPassword from utils import getNicknameFromActor from utils import getDomainFromActor from utils import loadJson from utils import saveJson def getOcapFilename(baseDir: str, nickname: str, domain: str, actor: str, subdir: str) -> str: """Returns the filename for a particular capability accepted or granted Also creates directories as needed """ if not actor: return None if ':' in domain: domain = domain.split(':')[0] if not os.path.isdir(baseDir + '/accounts'): os.mkdir(baseDir + '/accounts') ocDir = baseDir + '/accounts/' + nickname + '@' + domain if not os.path.isdir(ocDir): os.mkdir(ocDir) ocDir = baseDir + '/accounts/' + nickname + '@' + domain + '/ocap' if not os.path.isdir(ocDir): os.mkdir(ocDir) ocDir = baseDir + '/accounts/' + \ nickname + '@' + domain + '/ocap/' + subdir if not os.path.isdir(ocDir): os.mkdir(ocDir) return baseDir + '/accounts/' + \ nickname + '@' + domain + '/ocap/' + \ subdir + '/' + actor.replace('/', '#') + '.json' def CapablePost(postJson: {}, capabilityList: [], debug: bool) -> bool: """Determines whether a post arriving in the inbox should be accepted accoring to the list of capabilities """ if postJson.get('type'): # No announces/repeats if postJson['type'] == 'Announce': if 'inbox:noannounce' in capabilityList: if debug: print('DEBUG: ' + 'inbox post rejected because inbox:noannounce') return False # No likes if postJson['type'] == 'Like': if 'inbox:nolike' in capabilityList: if debug: print('DEBUG: ' + 'inbox post rejected because inbox:nolike') return False if postJson['type'] == 'Create': if postJson.get('object'): # Does this have a reply? if postJson['object'].get('inReplyTo'): if postJson['object']['inReplyTo']: if 'inbox:noreply' in capabilityList: if debug: print('DEBUG: ' + 'inbox post rejected because ' + 'inbox:noreply') return False # are content warnings enforced? if postJson['object'].get('sensitive'): if not postJson['object']['sensitive']: if 'inbox:cw' in capabilityList: if debug: print('DEBUG: ' + 'inbox post rejected because inbox:cw') return False # content warning must have non-zero summary if postJson['object'].get('summary'): if len(postJson['object']['summary']) < 2: if 'inbox:cw' in capabilityList: if debug: print('DEBUG: ' + 'inbox post rejected because ' + 'inbox:cw, summary missing') return False if 'inbox:write' in capabilityList: return True return True def capabilitiesRequest(baseDir: str, httpPrefix: str, domain: str, requestedActor: str, requestedDomain: str, requestedCaps=["inbox:write", "objects:read"]) -> {}: # This is sent to the capabilities endpoint /caps/new # which could be instance wide or for a particular person # This could also be added to a follow activity ocapId = createPassword(32) ocapRequest = { "@context": "https://www.w3.org/ns/activitystreams", "id": httpPrefix + "://" + requestedDomain + "/caps/request/" + ocapId, "type": "Request", "capability": requestedCaps, "actor": requestedActor } return ocapRequest def capabilitiesAccept(baseDir: str, httpPrefix: str, nickname: str, domain: str, port: int, acceptedActor: str, saveToFile: bool, acceptedCaps=["inbox:write", "objects:read"]) -> {}: # This gets returned to capabilities requester # This could also be added to a follow Accept activity # reject excessively long actors if len(acceptedActor) > 256: return None fullDomain = domain if port: if port != 80 and port != 443: if ':' not in domain: fullDomain = domain + ':' + str(port) # make directories to store capabilities ocapFilename = \ getOcapFilename(baseDir, nickname, fullDomain, acceptedActor, 'accept') if not ocapFilename: return None ocapAccept = None # if the capability already exists then load it from file if os.path.isfile(ocapFilename): ocapAccept = loadJson(ocapFilename) # otherwise create a new capability if not ocapAccept: acceptedActorNickname = getNicknameFromActor(acceptedActor) if not acceptedActorNickname: print('WARN: unable to find nickname in ' + acceptedActor) return None acceptedActorDomain, acceptedActorPort = \ getDomainFromActor(acceptedActor) if acceptedActorPort: ocapId = acceptedActorNickname + '@' + acceptedActorDomain + \ ':' + str(acceptedActorPort) + '#'+createPassword(32) else: ocapId = acceptedActorNickname + '@' + acceptedActorDomain + \ '#' + createPassword(32) ocapAccept = { "@context": "https://www.w3.org/ns/activitystreams", "id": httpPrefix + "://" + fullDomain + "/caps/" + ocapId, "type": "Capability", "capability": acceptedCaps, "scope": acceptedActor, "actor": httpPrefix + "://" + fullDomain } if nickname: ocapAccept['actor'] = \ httpPrefix + "://" + fullDomain + '/users/' + nickname if saveToFile: saveJson(ocapAccept, ocapFilename) return ocapAccept def capabilitiesGrantedSave(baseDir: str, nickname: str, domain: str, ocap: {}) -> bool: """A capabilities accept is received, so stor it for reference when sending to the actor """ if not ocap.get('actor'): return False ocapFilename = \ getOcapFilename(baseDir, nickname, domain, ocap['actor'], 'granted') if not ocapFilename: return False saveJson(ocap, ocapFilename) return True def capabilitiesUpdate(baseDir: str, httpPrefix: str, nickname: str, domain: str, port: int, updateActor: str, updateCaps: []) -> {}: """Used to sends an update for a change of object capabilities Note that the capability id gets changed with a new random token so that the old capabilities can't continue to be used """ # reject excessively long actors if len(updateActor) > 256: return None fullDomain = domain if port: if port != 80 and port != 443: if ':' not in domain: fullDomain = domain + ':' + str(port) # Get the filename of the capability ocapFilename = \ getOcapFilename(baseDir, nickname, fullDomain, updateActor, 'accept') if not ocapFilename: return None # The capability should already exist for it to be updated if not os.path.isfile(ocapFilename): return None # create an update activity ocapUpdate = { "@context": "https://www.w3.org/ns/activitystreams", 'type': 'Update', 'actor': httpPrefix + '://' + fullDomain + '/users/' + nickname, 'to': [updateActor], 'cc': [], 'object': {} } # read the existing capability ocapJson = loadJson(ocapFilename) # set the new capabilities list. eg. ["inbox:write","objects:read"] ocapJson['capability'] = updateCaps # change the id, so that the old capabilities can't continue to be used updateActorNickname = getNicknameFromActor(updateActor) if not updateActorNickname: print('WARN: unable to find nickname in ' + updateActor) return None updateActorDomain, updateActorPort = getDomainFromActor(updateActor) if updateActorPort: ocapId = updateActorNickname + '@' + updateActorDomain + \ ':' + str(updateActorPort) + '#' + createPassword(32) else: ocapId = updateActorNickname + '@' + updateActorDomain + \ '#' + createPassword(32) ocapJson['id'] = httpPrefix + "://" + fullDomain + "/caps/" + ocapId ocapUpdate['object'] = ocapJson # save it again saveJson(ocapJson, ocapFilename) return ocapUpdate def capabilitiesReceiveUpdate(baseDir: str, nickname: str, domain: str, port: int, actor: str, newCapabilitiesId: str, capabilityList: [], debug: bool) -> bool: """An update for a capability or the given actor has arrived """ ocapFilename = \ getOcapFilename(baseDir, nickname, domain, actor, 'granted') if not ocapFilename: return False if not os.path.isfile(ocapFilename): if debug: print('DEBUG: capabilities file not found during update') print(ocapFilename) return False ocapJson = loadJson(ocapFilename) if ocapJson: ocapJson['id'] = newCapabilitiesId ocapJson['capability'] = capabilityList return saveJson(ocapJson, ocapFilename) return False