mirror of https://gitlab.com/bashrc2/epicyon
				
				
				
			
		
			
				
	
	
		
			284 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			284 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
| __filename__ = "capabilities.py"
 | |
| __author__ = "Bob Mottram"
 | |
| __license__ = "AGPL3+"
 | |
| __version__ = "1.1.0"
 | |
| __maintainer__ = "Bob Mottram"
 | |
| __email__ = "bob@freedombone.net"
 | |
| __status__ = "Production"
 | |
| 
 | |
| import os
 | |
| from auth import createPassword
 | |
| from utils import getNicknameFromActor
 | |
| from utils import getDomainFromActor
 | |
| from utils import loadJson
 | |
| from utils import saveJson
 | |
| 
 | |
| 
 | |
def getOcapFilename(baseDir: str,
                    nickname: str, domain: str,
                    actor: str, subdir: str) -> str:
    """Returns the filename for a particular capability accepted or granted
    Also creates directories as needed

    baseDir is the directory which contains the 'accounts' directory
    nickname and domain select the account directory. Any ':port'
    suffix on the domain is dropped
    actor is used as the file name, with '/' replaced by '#'
    subdir is the directory below 'ocap', eg. 'accept' or 'granted'

    Returns None if no actor is given
    """
    if not actor:
        return None

    if ':' in domain:
        domain = domain.split(':')[0]

    ocDir = baseDir + '/accounts/' + \
        nickname + '@' + domain + '/ocap/' + subdir
    # Create the whole chain of directories in a single call.
    # exist_ok avoids failing if something else creates a directory
    # between an existence check and the mkdir
    os.makedirs(ocDir, exist_ok=True)

    return ocDir + '/' + actor.replace('/', '#') + '.json'
 | |
| 
 | |
| 
 | |
def CapablePost(postJson: {}, capabilityList: [], debug: bool) -> bool:
    """Determines whether a post arriving in the inbox
    should be accepted according to the list of capabilities

    The following entries within capabilityList restrict what is accepted:
      inbox:noannounce  rejects Announce activities
      inbox:nolike      rejects Like activities
      inbox:noreply     rejects Create activities whose object has inReplyTo
      inbox:cw          rejects Create activities whose object has a
                        'sensitive' field which is not set, or a 'summary'
                        field which is not a string of at least
                        two characters

    If debug is True then the reason for a rejection is printed.
    Returns False if the post should be rejected, otherwise True
    """
    def _reject(reason: str) -> bool:
        # Optionally reports why the post was refused
        if debug:
            print('DEBUG: inbox post rejected because ' + reason)
        return False

    postType = postJson.get('type')

    if postType == 'Announce':
        # No announces/repeats
        if 'inbox:noannounce' in capabilityList:
            return _reject('inbox:noannounce')
    elif postType == 'Like':
        # No likes
        if 'inbox:nolike' in capabilityList:
            return _reject('inbox:nolike')
    elif postType == 'Create':
        postObject = postJson.get('object')
        # The object might not be a dict (eg. a url string), in which case
        # there are no reply or content warning fields to examine
        if isinstance(postObject, dict):
            # Does this have a reply?
            if postObject.get('inReplyTo'):
                if 'inbox:noreply' in capabilityList:
                    return _reject('inbox:noreply')
            if 'inbox:cw' in capabilityList:
                # are content warnings enforced?
                # Test for the presence of the field rather than its
                # truthiness, because a false value is the case to reject
                if 'sensitive' in postObject:
                    if not postObject['sensitive']:
                        return _reject('inbox:cw')
                # content warning must have non-zero summary
                if 'summary' in postObject:
                    summary = postObject['summary']
                    if not isinstance(summary, str) or len(summary) < 2:
                        return _reject('inbox:cw, summary missing')

    # NOTE: a post is accepted whether or not 'inbox:write' is within the
    # capabilities list. Only the restrictions above cause a rejection
    return True
 | |
| 
 | |
| 
 | |
def capabilitiesRequest(baseDir: str, httpPrefix: str, domain: str,
                        requestedActor: str, requestedDomain: str,
                        requestedCaps=["inbox:write", "objects:read"]) -> {}:
    """Returns a request for capabilities, made by requestedActor
    This is sent to the capabilities endpoint /caps/new
    which could be instance wide or for a particular person
    This could also be added to a follow activity

    The id of the request is located on requestedDomain and ends with
    a token obtained from createPassword.
    requestedCaps is the list of capabilities being asked for. If it is
    None or empty then an empty list is requested.
    baseDir and domain are not used by this function
    """
    # Copy the list, so that a caller which alters the returned request
    # can't change the shared default argument
    capsList = list(requestedCaps) if requestedCaps else []

    ocapId = createPassword(32)
    return {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": httpPrefix + "://" + requestedDomain + "/caps/request/" + ocapId,
        "type": "Request",
        "capability": capsList,
        "actor": requestedActor
    }
 | |
| 
 | |
| 
 | |
def capabilitiesAccept(baseDir: str, httpPrefix: str,
                       nickname: str, domain: str, port: int,
                       acceptedActor: str, saveToFile: bool,
                       acceptedCaps=["inbox:write", "objects:read"]) -> {}:
    """Returns the capability which is granted to acceptedActor
    This gets returned to capabilities requester
    This could also be added to a follow Accept activity

    If a capability for acceptedActor already exists within the 'accept'
    ocap directory of the account then it is loaded and acceptedCaps is
    ignored. Otherwise a new capability is created, whose id ends with a
    token obtained from createPassword.
    If saveToFile is True then the capability is saved to file.

    Returns None if acceptedActor is missing, is longer than
    256 characters or contains no nickname
    """
    # reject missing or excessively long actors
    if not acceptedActor or len(acceptedActor) > 256:
        return None

    # non-standard ports are included within the domain
    fullDomain = domain
    if port and port not in (80, 443) and ':' not in domain:
        fullDomain = domain + ':' + str(port)

    # make directories to store capabilities
    ocapFilename = \
        getOcapFilename(baseDir, nickname, fullDomain, acceptedActor, 'accept')
    if not ocapFilename:
        return None
    ocapAccept = None

    # if the capability already exists then load it from file
    if os.path.isfile(ocapFilename):
        ocapAccept = loadJson(ocapFilename)
    # otherwise create a new capability
    if not ocapAccept:
        acceptedActorNickname = getNicknameFromActor(acceptedActor)
        if not acceptedActorNickname:
            print('WARN: unable to find nickname in ' + acceptedActor)
            return None
        acceptedActorDomain, acceptedActorPort = \
            getDomainFromActor(acceptedActor)

        ocapId = acceptedActorNickname + '@' + acceptedActorDomain
        if acceptedActorPort:
            ocapId += ':' + str(acceptedActorPort)
        ocapId += '#' + createPassword(32)

        # the granting actor is the account if there is a nickname,
        # otherwise the instance
        ocapActor = httpPrefix + "://" + fullDomain
        if nickname:
            ocapActor += '/users/' + nickname

        ocapAccept = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": httpPrefix + "://" + fullDomain + "/caps/" + ocapId,
            "type": "Capability",
            # Copy the list, so that altering the returned capability
            # can't change the shared default argument
            "capability": list(acceptedCaps) if acceptedCaps else [],
            "scope": acceptedActor,
            "actor": ocapActor
        }

    if saveToFile:
        saveJson(ocapAccept, ocapFilename)
    return ocapAccept
 | |
| 
 | |
| 
 | |
def capabilitiesGrantedSave(baseDir: str,
                            nickname: str, domain: str, ocap: {}) -> bool:
    """A capabilities accept is received, so store it for
    reference when sending to the actor

    The capability is saved within the 'granted' ocap directory of the
    account, using the 'actor' field of ocap for the file name.

    Returns True if the capability was saved.
    Returns False if ocap is empty, has no actor, or could not be saved
    """
    if not ocap or not ocap.get('actor'):
        return False
    ocapFilename = \
        getOcapFilename(baseDir, nickname, domain, ocap['actor'], 'granted')
    if not ocapFilename:
        return False
    # report a failed save, rather than always indicating success
    return bool(saveJson(ocap, ocapFilename))
 | |
| 
 | |
| 
 | |
def capabilitiesUpdate(baseDir: str, httpPrefix: str,
                       nickname: str, domain: str, port: int,
                       updateActor: str,
                       updateCaps: []) -> {}:
    """Used to send an update for a change of object capabilities
    Note that the capability id gets changed with a new random token
    so that the old capabilities can't continue to be used

    The capability previously stored for updateActor within the 'accept'
    ocap directory is given the updateCaps list and a new id, is saved
    again, and is returned as the object of an Update activity
    addressed to updateActor.

    Returns None if updateActor is missing, is longer than 256 characters
    or contains no nickname, or if there is no stored capability which
    can be loaded and saved again
    """
    # reject missing or excessively long actors
    if not updateActor or len(updateActor) > 256:
        return None

    # non-standard ports are included within the domain
    fullDomain = domain
    if port and port not in (80, 443) and ':' not in domain:
        fullDomain = domain + ':' + str(port)

    # Get the filename of the capability
    ocapFilename = \
        getOcapFilename(baseDir, nickname, fullDomain, updateActor, 'accept')
    if not ocapFilename:
        return None

    # The capability should already exist for it to be updated
    if not os.path.isfile(ocapFilename):
        return None

    # read the existing capability
    ocapJson = loadJson(ocapFilename)
    if not ocapJson:
        # the file could not be loaded, so there is nothing to update
        return None

    # set the new capabilities list. eg. ["inbox:write","objects:read"]
    ocapJson['capability'] = updateCaps

    # change the id, so that the old capabilities can't continue to be used
    updateActorNickname = getNicknameFromActor(updateActor)
    if not updateActorNickname:
        print('WARN: unable to find nickname in ' + updateActor)
        return None
    updateActorDomain, updateActorPort = getDomainFromActor(updateActor)
    ocapId = updateActorNickname + '@' + updateActorDomain
    if updateActorPort:
        ocapId += ':' + str(updateActorPort)
    ocapId += '#' + createPassword(32)
    ocapJson['id'] = httpPrefix + "://" + fullDomain + "/caps/" + ocapId

    # Save it again. If this fails then the stored capability still has
    # the old id, so don't announce a new id which was not recorded
    if not saveJson(ocapJson, ocapFilename):
        return None

    # create an update activity
    return {
        "@context": "https://www.w3.org/ns/activitystreams",
        'type': 'Update',
        'actor': httpPrefix + '://' + fullDomain + '/users/' + nickname,
        'to': [updateActor],
        'cc': [],
        'object': ocapJson
    }
 | |
| 
 | |
| 
 | |
def capabilitiesReceiveUpdate(baseDir: str,
                              nickname: str, domain: str, port: int,
                              actor: str,
                              newCapabilitiesId: str,
                              capabilityList: [], debug: bool) -> bool:
    """An update to the capability granted by the given actor has arrived

    The capability stored for the actor within the 'granted' ocap
    directory of the account has its id and capability list replaced
    and is then saved again.

    Returns the result of saving, or False if there is no stored
    capability for the actor which can be loaded
    """
    grantedFilename = \
        getOcapFilename(baseDir, nickname, domain, actor, 'granted')
    if not grantedFilename:
        return False

    # only a capability which was previously granted can be updated
    if not os.path.isfile(grantedFilename):
        if debug:
            print('DEBUG: capabilities file not found during update')
            print(grantedFilename)
        return False

    grantedJson = loadJson(grantedFilename)
    if not grantedJson:
        return False

    grantedJson['id'] = newCapabilitiesId
    grantedJson['capability'] = capabilityList
    return saveJson(grantedJson, grantedFilename)
 |