Remove the moribund e2e code

It wasn't used, and doing crypto would likely involve javascript anyway
merge-requests/30/head
Bob Mottram 2023-05-03 19:16:47 +01:00
parent 4be67440e9
commit ef790a299b
4 changed files with 1 additions and 412 deletions

204
daemon.py
View File

@ -409,9 +409,6 @@ from followingCalendar import add_person_to_calendar
from followingCalendar import remove_person_from_calendar
from notifyOnPost import add_notify_on_post
from notifyOnPost import remove_notify_on_post
from devices import e2e_edevices_collection
from devices import e2e_evalid_device
from devices import e2e_eadd_device
from newswire import get_rs_sfrom_dict
from newswire import rss2header
from newswire import rss2footer
@ -17877,37 +17874,8 @@ class PubServer(BaseHTTPRequestHandler):
curr_session):
return
# list of registered devices for e2ee
# see https://github.com/tootsuite/mastodon/pull/13820
if authorized and users_in_path:
if self.path.endswith('/collections/devices'):
nickname = self.path.split('/users/')
if '/' in nickname:
nickname = nickname.split('/')[0]
dev_json = e2e_edevices_collection(self.server.base_dir,
nickname,
self.server.domain,
self.server.domain_full,
self.server.http_prefix)
msg_str = json.dumps(dev_json, ensure_ascii=False)
msg_str = self._convert_domains(calling_domain,
referer_domain,
msg_str)
msg = msg_str.encode('utf-8')
msglen = len(msg)
accept_str = self.headers['Accept']
protocol_str = \
get_json_content_from_accept(accept_str)
self._set_headers(protocol_str, msglen,
None, calling_domain, False)
self._write(msg)
fitness_performance(getreq_start_time, self.server.fitness,
'_GET', 'registered devices',
self.server.debug)
return
fitness_performance(getreq_start_time, self.server.fitness,
'_GET', 'registered devices done',
'_GET', '_show_blog_page',
self.server.debug)
if html_getreq and users_in_path:
@ -22211,170 +22179,6 @@ class PubServer(BaseHTTPRequestHandler):
curr_session, proxy_type)
return page_number
def _crypto_ap_iread_handle(self):
"""Reads handle
"""
message_bytes = None
max_device_id_length = 2048
length = int(self.headers['Content-length'])
if length >= max_device_id_length:
print('WARN: handle post to crypto API is too long ' +
str(length) + ' bytes')
return {}
try:
message_bytes = self.rfile.read(length)
except SocketError as ex:
if ex.errno == errno.ECONNRESET:
print('WARN: handle POST message_bytes ' +
'connection reset by peer')
else:
print('WARN: handle POST message_bytes socket error')
return {}
except ValueError as ex:
print('EX: handle POST message_bytes rfile.read failed ' +
str(ex))
return {}
len_message = len(message_bytes)
if len_message > 2048:
print('WARN: handle post to crypto API is too long ' +
str(len_message) + ' bytes')
return {}
handle = message_bytes.decode("utf-8")
if not handle:
return None
if '@' not in handle:
return None
if '[' in handle:
return json.loads(message_bytes)
if handle.startswith('@'):
handle = handle[1:]
if '@' not in handle:
return None
return handle.strip()
def _crypto_ap_iread_json(self) -> {}:
"""Obtains json from POST to the crypto API
"""
message_bytes = None
max_crypto_message_length = 10240
length = int(self.headers['Content-length'])
if length >= max_crypto_message_length:
print('WARN: post to crypto API is too long ' +
str(length) + ' bytes')
return {}
try:
message_bytes = self.rfile.read(length)
except SocketError as ex:
if ex.errno == errno.ECONNRESET:
print('WARN: POST message_bytes ' +
'connection reset by peer')
else:
print('WARN: POST message_bytes socket error')
return {}
except ValueError as ex:
print('EX: POST message_bytes rfile.read failed, ' + str(ex))
return {}
len_message = len(message_bytes)
if len_message > 10240:
print('WARN: post to crypto API is too long ' +
str(len_message) + ' bytes')
return {}
return json.loads(message_bytes)
def _crypto_api_query(self, calling_domain: str,
referer_domain: str) -> bool:
handle = self._crypto_ap_iread_handle()
if not handle:
return False
if isinstance(handle, str):
person_dir = acct_handle_dir(self.server.base_dir, handle)
if not os.path.isdir(person_dir + '/devices'):
return False
devices_list = []
for _, _, files in os.walk(person_dir + '/devices'):
for fname in files:
device_filename = \
os.path.join(person_dir + '/devices', fname)
if not os.path.isfile(device_filename):
continue
content_json = load_json(device_filename)
if content_json:
devices_list.append(content_json)
break
# return the list of devices for this handle
msg_str = \
json.dumps(devices_list, ensure_ascii=False)
msg_str = self._convert_domains(calling_domain,
referer_domain,
msg_str)
msg = msg_str.encode('utf-8')
msglen = len(msg)
accept_str = self.headers['Accept']
protocol_str = \
get_json_content_from_accept(accept_str)
self._set_headers(protocol_str, msglen,
None, calling_domain, False)
self._write(msg)
return True
return False
def _crypto_api(self, path: str, authorized: bool,
calling_domain: str, referer_domain: str) -> None:
"""POST or GET with the crypto API
"""
if authorized and path.startswith('/api/v1/crypto/keys/upload'):
# register a device to an authorized account
if not self.authorized_nickname:
self._400()
return
device_keys = self._crypto_ap_iread_json()
if not device_keys:
self._400()
return
if isinstance(device_keys, dict):
if not e2e_evalid_device(device_keys):
self._400()
return
fingerprint_key = \
device_keys['fingerprint_key']['publicKeyBase64']
e2e_eadd_device(self.server.base_dir,
self.authorized_nickname,
self.server.domain,
device_keys['deviceId'],
device_keys['name'],
device_keys['claim'],
fingerprint_key,
device_keys['identityKey']['publicKeyBase64'],
device_keys['fingerprint_key']['type'],
device_keys['identityKey']['type'])
self._200()
return
self._400()
elif path.startswith('/api/v1/crypto/keys/query'):
# given a handle (nickname@domain) return a list of the devices
# registered to that handle
if not self._crypto_api_query(calling_domain, referer_domain):
self._400()
elif path.startswith('/api/v1/crypto/keys/claim'):
# TODO
self._200()
elif authorized and path.startswith('/api/v1/crypto/delivery'):
# TODO
self._200()
elif (authorized and
path.startswith('/api/v1/crypto/encrypted_messages/clear')):
# TODO
self._200()
elif path.startswith('/api/v1/crypto/encrypted_messages'):
# TODO
self._200()
else:
self._400()
def do_POST(self):
if self.server.starting_daemon:
return
@ -22487,12 +22291,6 @@ class PubServer(BaseHTTPRequestHandler):
print('POST Not authorized')
print(str(self.headers))
if self.path.startswith('/api/v1/crypto/'):
self._crypto_api(self.path, authorized,
calling_domain, calling_domain)
self.server.postreq_busy = False
return
# if this is a POST to the outbox then check authentication
self.outbox_authenticated = False
self.post_to_nickname = None

View File

@ -1,198 +0,0 @@
__filename__ = "devices.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.4.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Security"
# REST API overview
#
# To support Olm, the following APIs are required:
#
# * Uploading keys for a device (current app)
# POST /api/v1/crypto/keys/upload
#
# * Querying available devices of people you want to establish a session with
# POST /api/v1/crypto/keys/query
#
# * Claiming a pre-key (one-time-key) for each device you want to establish
# a session with
# POST /api/v1/crypto/keys/claim
#
# * Sending encrypted messages directly to specific devices of other people
# POST /api/v1/crypto/delivery
#
# * Collect encrypted messages addressed to the current device
# GET /api/v1/crypto/encrypted_messages
#
# * Clear all encrypted messages addressed to the current device
# POST /api/v1/crypto/encrypted_messages/clear
import os
from utils import load_json
from utils import save_json
from utils import acct_dir
from utils import local_actor_url
def e2e_eremove_device(base_dir: str, nickname: str, domain: str,
device_id: str) -> bool:
"""Unregisters a device for e2ee
"""
person_dir = acct_dir(base_dir, nickname, domain)
device_filename = person_dir + '/devices/' + device_id + '.json'
if os.path.isfile(device_filename):
try:
os.remove(device_filename)
except OSError:
print('EX: e2e_eremove_device unable to delete ' + device_filename)
return True
return False
def e2e_evalid_device(deviceJson: {}) -> bool:
"""Returns true if the given json contains valid device keys
"""
if not isinstance(deviceJson, dict):
return False
if not deviceJson.get('deviceId'):
return False
if not isinstance(deviceJson['deviceId'], str):
return False
if not deviceJson.get('type'):
return False
if not isinstance(deviceJson['type'], str):
return False
if not deviceJson.get('name'):
return False
if not isinstance(deviceJson['name'], str):
return False
if deviceJson['type'] != 'Device':
return False
if not deviceJson.get('claim'):
return False
if not isinstance(deviceJson['claim'], str):
return False
if not deviceJson.get('fingerprintKey'):
return False
if not isinstance(deviceJson['fingerprintKey'], dict):
return False
if not deviceJson['fingerprintKey'].get('type'):
return False
if not isinstance(deviceJson['fingerprintKey']['type'], str):
return False
if not deviceJson['fingerprintKey'].get('publicKeyBase64'):
return False
if not isinstance(deviceJson['fingerprintKey']['publicKeyBase64'], str):
return False
if not deviceJson.get('identityKey'):
return False
if not isinstance(deviceJson['identityKey'], dict):
return False
if not deviceJson['identityKey'].get('type'):
return False
if not isinstance(deviceJson['identityKey']['type'], str):
return False
if not deviceJson['identityKey'].get('publicKeyBase64'):
return False
if not isinstance(deviceJson['identityKey']['publicKeyBase64'], str):
return False
return True
def e2e_eadd_device(base_dir: str, nickname: str, domain: str,
device_id: str, name: str, claim_url: str,
fingerprint_public_key: str,
identity_public_key: str,
fingerprint_key_type="Ed25519Key",
identity_key_type="Curve25519Key") -> bool:
"""Registers a device for e2ee
claim_url could be something like:
http://localhost:3000/users/admin/claim?id=11119
"""
if ' ' in device_id or '/' in device_id or \
'?' in device_id or '#' in device_id or \
'.' in device_id:
return False
person_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(person_dir):
return False
if not os.path.isdir(person_dir + '/devices'):
os.mkdir(person_dir + '/devices')
device_dict = {
"deviceId": device_id,
"type": "Device",
"name": name,
"claim": claim_url,
"fingerprintKey": {
"type": fingerprint_key_type,
"publicKeyBase64": fingerprint_public_key
},
"identityKey": {
"type": identity_key_type,
"publicKeyBase64": identity_public_key
}
}
device_filename = person_dir + '/devices/' + device_id + '.json'
return save_json(device_dict, device_filename)
def e2e_edevices_collection(base_dir: str, nickname: str, domain: str,
domain_full: str, http_prefix: str) -> {}:
"""Returns a list of registered devices
"""
person_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(person_dir):
return {}
person_id = local_actor_url(http_prefix, nickname, domain_full)
if not os.path.isdir(person_dir + '/devices'):
os.mkdir(person_dir + '/devices')
device_list = []
for _, _, files in os.walk(person_dir + '/devices/'):
for dev in files:
if not dev.endswith('.json'):
continue
device_filename = os.path.join(person_dir + '/devices', dev)
dev_json = load_json(device_filename)
if dev_json:
device_list.append(dev_json)
break
devices_dict = {
'id': person_id + '/collections/devices',
'type': 'Collection',
'totalItems': len(device_list),
'items': device_list
}
return devices_dict
def e2e_edecrypt_message_from_device(message_json: {}) -> str:
"""Locally decrypts a message on the device.
This should probably be a link to a local script
or native app, such that what the user sees isn't
something which the server could get access to.
"""
# TODO
# {
# "type": "EncryptedMessage",
# "messageType": 0,
# "cipherText": "...",
# "digest": {
# "type": "Digest",
# "digestAlgorithm": "http://www.w3.org/2000/09/xmldsig#hmac-sha256",
# "digestValue": "5f6ad31acd64995483d75c7..."
# },
# "messageFranking": "...",
# "attributedTo": {
# "type": "Device",
# "deviceId": "11119"
# },
# "to": {
# "type": "Device",
# "deviceId": "11876"
# }
# }
return ''

View File

@ -5579,7 +5579,6 @@ def _test_functions():
'create_server_alice',
'create_server_bob',
'create_server_eve',
'e2e_eremove_device',
'setOrganizationScheme',
'fill_headers',
'_nothing',

View File

@ -97,7 +97,6 @@ from webapp_utils import html_footer
from webapp_utils import get_broken_link_substitute
from webapp_media import add_embedded_elements
from webapp_question import insert_question
from devices import e2e_edecrypt_message_from_device
from webfinger import webfinger_handle
from speaker import update_speaker
from languages import auto_translate_post
@ -2528,14 +2527,6 @@ def individual_post_as_html(signing_priv_key_pem: str,
system_language: ''
}
displaying_ciphertext = False
if post_json_object['object'].get('cipherText'):
displaying_ciphertext = True
post_json_object['object']['content'] = \
e2e_edecrypt_message_from_device(post_json_object['object'])
post_json_object['object']['contentMap'][system_language] = \
post_json_object['object']['content']
domain_full = get_full_domain(domain, port)
if not content_str:
content_str = get_content_from_post(post_json_object, system_language,
@ -2623,7 +2614,6 @@ def individual_post_as_html(signing_priv_key_pem: str,
content_str.replace('\t', '').replace('\r', '')
# Add bold text
if bold_reading and \
not displaying_ciphertext and \
not post_is_blog:
content_str = bold_reading_string(content_str)