mirror of https://gitlab.com/bashrc2/epicyon
				
				
				
			
		
			
				
	
	
		
			199 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			199 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
| __filename__ = "devices.py"
 | |
| __author__ = "Bob Mottram"
 | |
| __license__ = "AGPL3+"
 | |
| __version__ = "1.3.0"
 | |
| __maintainer__ = "Bob Mottram"
 | |
| __email__ = "bob@libreserver.org"
 | |
| __status__ = "Production"
 | |
| __module_group__ = "Security"
 | |
| 
 | |
| # REST API overview
 | |
| #
 | |
| # To support Olm, the following APIs are required:
 | |
| #
 | |
| #  * Uploading keys for a device (current app)
 | |
| #    POST /api/v1/crypto/keys/upload
 | |
| #
 | |
| #  * Querying available devices of people you want to establish a session with
 | |
| #    POST /api/v1/crypto/keys/query
 | |
| #
 | |
| #  * Claiming a pre-key (one-time-key) for each device you want to establish
 | |
| #    a session with
 | |
| #    POST /api/v1/crypto/keys/claim
 | |
| #
 | |
| #  * Sending encrypted messages directly to specific devices of other people
 | |
| #    POST /api/v1/crypto/delivery
 | |
| #
 | |
| #  * Collect encrypted messages addressed to the current device
 | |
| #    GET /api/v1/crypto/encrypted_messages
 | |
| #
 | |
| #  * Clear all encrypted messages addressed to the current device
 | |
| #    POST /api/v1/crypto/encrypted_messages/clear
 | |
| 
 | |
| import os
 | |
| from utils import load_json
 | |
| from utils import save_json
 | |
| from utils import acct_dir
 | |
| from utils import local_actor_url
 | |
| 
 | |
| 
 | |
| def e2e_eremove_device(base_dir: str, nickname: str, domain: str,
 | |
|                        device_id: str) -> bool:
 | |
|     """Unregisters a device for e2ee
 | |
|     """
 | |
|     person_dir = acct_dir(base_dir, nickname, domain)
 | |
|     device_filename = person_dir + '/devices/' + device_id + '.json'
 | |
|     if os.path.isfile(device_filename):
 | |
|         try:
 | |
|             os.remove(device_filename)
 | |
|         except OSError:
 | |
|             print('EX: e2e_eremove_device unable to delete ' + device_filename)
 | |
|         return True
 | |
|     return False
 | |
| 
 | |
| 
 | |
| def e2e_evalid_device(deviceJson: {}) -> bool:
 | |
|     """Returns true if the given json contains valid device keys
 | |
|     """
 | |
|     if not isinstance(deviceJson, dict):
 | |
|         return False
 | |
|     if not deviceJson.get('deviceId'):
 | |
|         return False
 | |
|     if not isinstance(deviceJson['deviceId'], str):
 | |
|         return False
 | |
|     if not deviceJson.get('type'):
 | |
|         return False
 | |
|     if not isinstance(deviceJson['type'], str):
 | |
|         return False
 | |
|     if not deviceJson.get('name'):
 | |
|         return False
 | |
|     if not isinstance(deviceJson['name'], str):
 | |
|         return False
 | |
|     if deviceJson['type'] != 'Device':
 | |
|         return False
 | |
|     if not deviceJson.get('claim'):
 | |
|         return False
 | |
|     if not isinstance(deviceJson['claim'], str):
 | |
|         return False
 | |
|     if not deviceJson.get('fingerprintKey'):
 | |
|         return False
 | |
|     if not isinstance(deviceJson['fingerprintKey'], dict):
 | |
|         return False
 | |
|     if not deviceJson['fingerprintKey'].get('type'):
 | |
|         return False
 | |
|     if not isinstance(deviceJson['fingerprintKey']['type'], str):
 | |
|         return False
 | |
|     if not deviceJson['fingerprintKey'].get('publicKeyBase64'):
 | |
|         return False
 | |
|     if not isinstance(deviceJson['fingerprintKey']['publicKeyBase64'], str):
 | |
|         return False
 | |
|     if not deviceJson.get('identityKey'):
 | |
|         return False
 | |
|     if not isinstance(deviceJson['identityKey'], dict):
 | |
|         return False
 | |
|     if not deviceJson['identityKey'].get('type'):
 | |
|         return False
 | |
|     if not isinstance(deviceJson['identityKey']['type'], str):
 | |
|         return False
 | |
|     if not deviceJson['identityKey'].get('publicKeyBase64'):
 | |
|         return False
 | |
|     if not isinstance(deviceJson['identityKey']['publicKeyBase64'], str):
 | |
|         return False
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def e2e_eadd_device(base_dir: str, nickname: str, domain: str,
 | |
|                     device_id: str, name: str, claim_url: str,
 | |
|                     fingerprint_public_key: str,
 | |
|                     identity_public_key: str,
 | |
|                     fingerprint_key_type="Ed25519Key",
 | |
|                     identity_key_type="Curve25519Key") -> bool:
 | |
|     """Registers a device for e2ee
 | |
|     claim_url could be something like:
 | |
|         http://localhost:3000/users/admin/claim?id=11119
 | |
|     """
 | |
|     if ' ' in device_id or '/' in device_id or \
 | |
|        '?' in device_id or '#' in device_id or \
 | |
|        '.' in device_id:
 | |
|         return False
 | |
|     person_dir = acct_dir(base_dir, nickname, domain)
 | |
|     if not os.path.isdir(person_dir):
 | |
|         return False
 | |
|     if not os.path.isdir(person_dir + '/devices'):
 | |
|         os.mkdir(person_dir + '/devices')
 | |
|     device_dict = {
 | |
|         "deviceId": device_id,
 | |
|         "type": "Device",
 | |
|         "name": name,
 | |
|         "claim": claim_url,
 | |
|         "fingerprintKey": {
 | |
|             "type": fingerprint_key_type,
 | |
|             "publicKeyBase64": fingerprint_public_key
 | |
|         },
 | |
|         "identityKey": {
 | |
|             "type": identity_key_type,
 | |
|             "publicKeyBase64": identity_public_key
 | |
|         }
 | |
|     }
 | |
|     device_filename = person_dir + '/devices/' + device_id + '.json'
 | |
|     return save_json(device_dict, device_filename)
 | |
| 
 | |
| 
 | |
| def e2e_edevices_collection(base_dir: str, nickname: str, domain: str,
 | |
|                             domain_full: str, http_prefix: str) -> {}:
 | |
|     """Returns a list of registered devices
 | |
|     """
 | |
|     person_dir = acct_dir(base_dir, nickname, domain)
 | |
|     if not os.path.isdir(person_dir):
 | |
|         return {}
 | |
|     person_id = local_actor_url(http_prefix, nickname, domain_full)
 | |
|     if not os.path.isdir(person_dir + '/devices'):
 | |
|         os.mkdir(person_dir + '/devices')
 | |
|     device_list = []
 | |
|     for _, _, files in os.walk(person_dir + '/devices/'):
 | |
|         for dev in files:
 | |
|             if not dev.endswith('.json'):
 | |
|                 continue
 | |
|             device_filename = os.path.join(person_dir + '/devices', dev)
 | |
|             dev_json = load_json(device_filename)
 | |
|             if dev_json:
 | |
|                 device_list.append(dev_json)
 | |
|         break
 | |
| 
 | |
|     devices_dict = {
 | |
|         'id': person_id + '/collections/devices',
 | |
|         'type': 'Collection',
 | |
|         'totalItems': len(device_list),
 | |
|         'items': device_list
 | |
|     }
 | |
|     return devices_dict
 | |
| 
 | |
| 
 | |
| def e2e_edecrypt_message_from_device(message_json: {}) -> str:
 | |
|     """Locally decrypts a message on the device.
 | |
|     This should probably be a link to a local script
 | |
|     or native app, such that what the user sees isn't
 | |
|     something which the server could get access to.
 | |
|     """
 | |
|     # TODO
 | |
|     #  {
 | |
|     #    "type": "EncryptedMessage",
 | |
|     #    "messageType": 0,
 | |
|     #    "cipherText": "...",
 | |
|     #    "digest": {
 | |
|     #      "type": "Digest",
 | |
|     #      "digestAlgorithm": "http://www.w3.org/2000/09/xmldsig#hmac-sha256",
 | |
|     #      "digestValue": "5f6ad31acd64995483d75c7..."
 | |
|     #    },
 | |
|     #    "messageFranking": "...",
 | |
|     #    "attributedTo": {
 | |
|     #      "type": "Device",
 | |
|     #      "deviceId": "11119"
 | |
|     #    },
 | |
|     #    "to": {
 | |
|     #      "type": "Device",
 | |
|     #      "deviceId": "11876"
 | |
|     #    }
 | |
|     #  }
 | |
|     return ''
 |