epicyon/pgp.py

621 lines
18 KiB
Python
Raw Normal View History

2020-04-03 18:52:18 +00:00
__filename__ = "pgp.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
2021-01-26 10:07:42 +00:00
__version__ = "1.2.0"
2020-04-03 18:52:18 +00:00
__maintainer__ = "Bob Mottram"
2021-09-10 16:14:50 +00:00
__email__ = "bob@libreserver.org"
2020-04-03 18:52:18 +00:00
__status__ = "Production"
2021-06-15 15:08:12 +00:00
__module_group__ = "Profile Metadata"
import os
2021-03-11 17:15:32 +00:00
import subprocess
from pathlib import Path
from person import getActorJson
2021-03-12 12:04:34 +00:00
from utils import containsPGPPublicKey
from utils import isPGPEncrypted
2021-03-17 20:18:00 +00:00
from utils import getFullDomain
from utils import getStatusNumber
2021-08-14 11:13:39 +00:00
from utils import localActorUrl
from utils import replaceUsersWithAt
2021-03-17 20:18:00 +00:00
from webfinger import webfingerHandle
from posts import getPersonBox
from auth import createBasicAuthHeader
from session import postJson
2021-03-11 17:15:32 +00:00
def getEmailAddress(actorJson: {}) -> str:
"""Returns the email address for the given actor
"""
if not actorJson.get('attachment'):
return ''
for propertyValue in actorJson['attachment']:
if not propertyValue.get('name'):
continue
if not propertyValue['name'].lower().startswith('email'):
continue
if not propertyValue.get('type'):
continue
if not propertyValue.get('value'):
continue
2020-04-03 18:52:18 +00:00
if propertyValue['type'] != 'PropertyValue':
continue
if '@' not in propertyValue['value']:
continue
if '.' not in propertyValue['value']:
continue
return propertyValue['value']
return ''
2020-04-03 18:52:18 +00:00
def getPGPpubKey(actorJson: {}) -> str:
"""Returns PGP public key for the given actor
"""
if not actorJson.get('attachment'):
return ''
for propertyValue in actorJson['attachment']:
if not propertyValue.get('name'):
continue
if not propertyValue['name'].lower().startswith('pgp'):
continue
if not propertyValue.get('type'):
continue
if not propertyValue.get('value'):
continue
2020-04-03 18:52:18 +00:00
if propertyValue['type'] != 'PropertyValue':
continue
if not containsPGPPublicKey(propertyValue['value']):
continue
return propertyValue['value']
return ''
2020-04-03 18:52:18 +00:00
2020-07-06 09:52:06 +00:00
def getPGPfingerprint(actorJson: {}) -> str:
"""Returns PGP fingerprint for the given actor
"""
if not actorJson.get('attachment'):
return ''
for propertyValue in actorJson['attachment']:
if not propertyValue.get('name'):
continue
if not propertyValue['name'].lower().startswith('openpgp'):
continue
if not propertyValue.get('type'):
continue
if not propertyValue.get('value'):
continue
if propertyValue['type'] != 'PropertyValue':
continue
if len(propertyValue['value']) < 10:
continue
return propertyValue['value']
return ''
2020-04-03 18:52:18 +00:00
def setEmailAddress(actorJson: {}, emailAddress: str) -> None:
"""Sets the email address for the given actor
"""
2020-07-06 10:25:18 +00:00
notEmailAddress = False
if '@' not in emailAddress:
notEmailAddress = True
if '.' not in emailAddress:
notEmailAddress = True
if '<' in emailAddress:
notEmailAddress = True
2020-07-06 10:25:18 +00:00
if emailAddress.startswith('@'):
notEmailAddress = True
if not actorJson.get('attachment'):
2020-04-03 18:52:18 +00:00
actorJson['attachment'] = []
2019-12-17 23:35:59 +00:00
# remove any existing value
2020-04-03 18:52:18 +00:00
propertyFound = None
2019-12-17 23:35:59 +00:00
for propertyValue in actorJson['attachment']:
if not propertyValue.get('name'):
continue
if not propertyValue.get('type'):
continue
if not propertyValue['name'].lower().startswith('email'):
continue
2020-04-03 18:52:18 +00:00
propertyFound = propertyValue
2019-12-17 23:35:59 +00:00
break
if propertyFound:
actorJson['attachment'].remove(propertyFound)
2020-07-06 10:25:18 +00:00
if notEmailAddress:
return
for propertyValue in actorJson['attachment']:
if not propertyValue.get('name'):
continue
if not propertyValue.get('type'):
continue
if not propertyValue['name'].lower().startswith('email'):
continue
2020-04-03 18:52:18 +00:00
if propertyValue['type'] != 'PropertyValue':
continue
2020-04-03 18:52:18 +00:00
propertyValue['value'] = emailAddress
return
2020-04-03 18:52:18 +00:00
newEmailAddress = {
"name": "Email",
"type": "PropertyValue",
"value": emailAddress
}
actorJson['attachment'].append(newEmailAddress)
2020-04-03 18:52:18 +00:00
def setPGPpubKey(actorJson: {}, PGPpubKey: str) -> None:
"""Sets a PGP public key for the given actor
"""
2020-07-06 09:52:06 +00:00
removeKey = False
if not PGPpubKey:
removeKey = True
else:
if not containsPGPPublicKey(PGPpubKey):
2020-07-06 09:52:06 +00:00
removeKey = True
if '<' in PGPpubKey:
removeKey = True
2020-07-06 09:52:06 +00:00
if not actorJson.get('attachment'):
2020-04-03 18:52:18 +00:00
actorJson['attachment'] = []
2019-12-17 23:35:59 +00:00
# remove any existing value
2020-04-03 18:52:18 +00:00
propertyFound = None
2019-12-17 23:35:59 +00:00
for propertyValue in actorJson['attachment']:
if not propertyValue.get('name'):
continue
if not propertyValue.get('type'):
continue
if not propertyValue['name'].lower().startswith('pgp'):
continue
2020-04-03 18:52:18 +00:00
propertyFound = propertyValue
2019-12-17 23:35:59 +00:00
break
if propertyFound:
actorJson['attachment'].remove(propertyValue)
2020-07-06 09:52:06 +00:00
if removeKey:
return
for propertyValue in actorJson['attachment']:
if not propertyValue.get('name'):
continue
if not propertyValue.get('type'):
continue
if not propertyValue['name'].lower().startswith('pgp'):
continue
2020-04-03 18:52:18 +00:00
if propertyValue['type'] != 'PropertyValue':
continue
2020-04-03 18:52:18 +00:00
propertyValue['value'] = PGPpubKey
return
2020-04-03 18:52:18 +00:00
newPGPpubKey = {
"name": "PGP",
"type": "PropertyValue",
"value": PGPpubKey
}
actorJson['attachment'].append(newPGPpubKey)
2020-07-06 09:52:06 +00:00
def setPGPfingerprint(actorJson: {}, fingerprint: str) -> None:
"""Sets a PGP fingerprint for the given actor
"""
removeFingerprint = False
if not fingerprint:
removeFingerprint = True
else:
if len(fingerprint) < 10:
removeFingerprint = True
if not actorJson.get('attachment'):
actorJson['attachment'] = []
# remove any existing value
propertyFound = None
for propertyValue in actorJson['attachment']:
if not propertyValue.get('name'):
continue
if not propertyValue.get('type'):
continue
if not propertyValue['name'].lower().startswith('openpgp'):
continue
propertyFound = propertyValue
break
if propertyFound:
actorJson['attachment'].remove(propertyValue)
if removeFingerprint:
return
for propertyValue in actorJson['attachment']:
if not propertyValue.get('name'):
continue
if not propertyValue.get('type'):
continue
if not propertyValue['name'].lower().startswith('openpgp'):
continue
if propertyValue['type'] != 'PropertyValue':
continue
propertyValue['value'] = fingerprint.strip()
return
newPGPfingerprint = {
"name": "OpenPGP",
"type": "PropertyValue",
"value": fingerprint
}
actorJson['attachment'].append(newPGPfingerprint)
def extractPGPPublicKey(content: str) -> str:
"""Returns the PGP key from the given text
"""
startBlock = '--BEGIN PGP PUBLIC KEY BLOCK--'
endBlock = '--END PGP PUBLIC KEY BLOCK--'
2021-03-11 17:15:32 +00:00
if startBlock not in content:
return None
2021-03-11 17:15:32 +00:00
if endBlock not in content:
return None
if '\n' not in content:
return None
linesList = content.split('\n')
extracting = False
publicKey = ''
for line in linesList:
if not extracting:
if startBlock in line:
extracting = True
else:
if endBlock in line:
publicKey += line
break
if extracting:
publicKey += line + '\n'
return publicKey
2021-03-11 17:15:32 +00:00
def _pgpImportPubKey(recipientPubKey: str) -> str:
""" Import the given public key
"""
# do a dry run
cmdImportPubKey = \
'echo "' + recipientPubKey + '" | gpg --dry-run --import 2> /dev/null'
proc = subprocess.Popen([cmdImportPubKey],
stdout=subprocess.PIPE, shell=True)
(importResult, err) = proc.communicate()
if err:
return None
# this time for real
cmdImportPubKey = \
'echo "' + recipientPubKey + '" | gpg --import 2> /dev/null'
proc = subprocess.Popen([cmdImportPubKey],
stdout=subprocess.PIPE, shell=True)
(importResult, err) = proc.communicate()
if err:
return None
# get the key id
cmdImportPubKey = \
'echo "' + recipientPubKey + '" | gpg --show-keys'
proc = subprocess.Popen([cmdImportPubKey],
stdout=subprocess.PIPE, shell=True)
(importResult, err) = proc.communicate()
if not importResult:
return None
importResult = importResult.decode('utf-8').split('\n')
keyId = ''
for line in importResult:
if line.startswith('pub'):
continue
elif line.startswith('uid'):
continue
elif line.startswith('sub'):
continue
keyId = line.strip()
break
return keyId
def _pgpEncrypt(content: str, recipientPubKey: str) -> str:
2021-03-11 17:15:32 +00:00
""" Encrypt using your default pgp key to the given recipient
"""
keyId = _pgpImportPubKey(recipientPubKey)
if not keyId:
return None
cmdEncrypt = \
'echo "' + content + '" | gpg --encrypt --armor --recipient ' + \
keyId + ' 2> /dev/null'
proc = subprocess.Popen([cmdEncrypt],
stdout=subprocess.PIPE, shell=True)
(encryptResult, err) = proc.communicate()
if not encryptResult:
return None
encryptResult = encryptResult.decode('utf-8')
if not isPGPEncrypted(encryptResult):
2021-03-11 17:15:32 +00:00
return None
return encryptResult
def _getPGPPublicKeyFromActor(signingPrivateKeyPem: str,
domain: str, handle: str,
2021-06-20 11:28:35 +00:00
actorJson: {} = None) -> str:
"""Searches tags on the actor to see if there is any PGP
public key specified
"""
if not actorJson:
2021-06-18 09:22:16 +00:00
actorJson, asHeader = \
getActorJson(domain, handle, False, False, False, True,
signingPrivateKeyPem, None)
if not actorJson:
return None
if not actorJson.get('attachment'):
return None
if not isinstance(actorJson['attachment'], list):
return None
# search through the tags on the actor
for tag in actorJson['attachment']:
if not isinstance(tag, dict):
continue
if not tag.get('value'):
continue
2021-03-11 20:35:48 +00:00
if not isinstance(tag['value'], str):
continue
if containsPGPPublicKey(tag['value']):
return tag['value']
return None
def hasLocalPGPkey() -> bool:
"""Returns true if there is a local .gnupg directory
"""
homeDir = str(Path.home())
gpgDir = homeDir + '/.gnupg'
2021-03-11 20:48:02 +00:00
if os.path.isdir(gpgDir):
keyId = pgpLocalPublicKey()
2021-05-05 09:31:35 +00:00
if keyId:
return True
return False
def pgpEncryptToActor(domain: str, content: str, toHandle: str,
signingPrivateKeyPem: str) -> str:
"""PGP encrypt a message to the given actor or handle
"""
# get the actor and extract the pgp public key from it
recipientPubKey = \
_getPGPPublicKeyFromActor(signingPrivateKeyPem, domain, toHandle)
if not recipientPubKey:
return None
# encrypt using the recipient public key
return _pgpEncrypt(content, recipientPubKey)
def pgpDecrypt(domain: str, content: str, fromHandle: str,
signingPrivateKeyPem: str) -> str:
2021-03-11 17:15:32 +00:00
""" Encrypt using your default pgp key to the given recipient
fromHandle can be a handle or actor url
2021-03-11 17:15:32 +00:00
"""
if not isPGPEncrypted(content):
2021-03-11 17:15:32 +00:00
return content
# if the public key is also included within the message then import it
if containsPGPPublicKey(content):
2021-03-11 17:15:32 +00:00
pubKey = extractPGPPublicKey(content)
else:
pubKey = \
_getPGPPublicKeyFromActor(signingPrivateKeyPem,
domain, content, fromHandle)
if pubKey:
_pgpImportPubKey(pubKey)
2021-03-11 17:15:32 +00:00
cmdDecrypt = \
'echo "' + content + '" | gpg --decrypt --armor 2> /dev/null'
proc = subprocess.Popen([cmdDecrypt],
stdout=subprocess.PIPE, shell=True)
(decryptResult, err) = proc.communicate()
if not decryptResult:
return content
decryptResult = decryptResult.decode('utf-8').strip()
2021-03-11 17:15:32 +00:00
return decryptResult
2021-03-17 20:18:00 +00:00
def _pgpLocalPublicKeyId() -> str:
"""Gets the local pgp public key ID
"""
cmdStr = \
"gpgconf --list-options gpg | " + \
"awk -F: '$1 == \"default-key\" {print $10}'"
proc = subprocess.Popen([cmdStr],
stdout=subprocess.PIPE, shell=True)
(result, err) = proc.communicate()
if err:
return None
if not result:
return None
if len(result) < 5:
return None
2021-03-17 20:23:44 +00:00
return result.decode('utf-8').replace('"', '').strip()
2021-03-17 20:18:00 +00:00
2021-05-05 09:53:00 +00:00
def pgpLocalPublicKey() -> str:
2021-03-17 20:18:00 +00:00
"""Gets the local pgp public key
"""
2021-03-17 20:22:27 +00:00
keyId = _pgpLocalPublicKeyId()
2021-03-17 20:18:00 +00:00
if not keyId:
keyId = ''
2021-03-17 20:18:00 +00:00
cmdStr = "gpg --armor --export " + keyId
proc = subprocess.Popen([cmdStr],
stdout=subprocess.PIPE, shell=True)
(result, err) = proc.communicate()
if err:
return None
if not result:
return None
2021-03-17 20:24:42 +00:00
return extractPGPPublicKey(result.decode('utf-8'))
2021-03-17 20:18:00 +00:00
def pgpPublicKeyUpload(baseDir: str, session,
nickname: str, password: str,
domain: str, port: int,
httpPrefix: str,
cachedWebfingers: {}, personCache: {},
debug: bool, test: str,
signingPrivateKeyPem: str) -> {}:
2021-03-17 20:18:00 +00:00
if debug:
print('pgpPublicKeyUpload')
if not session:
if debug:
print('WARN: No session for pgpPublicKeyUpload')
return None
if not test:
if debug:
print('Getting PGP public key')
2021-05-05 09:53:00 +00:00
PGPpubKey = pgpLocalPublicKey()
2021-03-17 20:18:00 +00:00
if not PGPpubKey:
return None
PGPpubKeyId = _pgpLocalPublicKeyId()
else:
if debug:
print('Testing with PGP public key ' + test)
PGPpubKey = test
PGPpubKeyId = None
domainFull = getFullDomain(domain, port)
if debug:
print('PGP test domain: ' + domainFull)
handle = nickname + '@' + domainFull
if debug:
print('Getting actor for ' + handle)
actorJson, asHeader = \
getActorJson(domainFull, handle, False, False, debug, True,
signingPrivateKeyPem, session)
2021-03-17 20:18:00 +00:00
if not actorJson:
if debug:
print('No actor returned for ' + handle)
return None
if debug:
print('Actor for ' + handle + ' obtained')
2021-08-14 11:13:39 +00:00
actor = localActorUrl(httpPrefix, nickname, domainFull)
handle = replaceUsersWithAt(actor)
2021-03-17 20:18:00 +00:00
# check that this looks like the correct actor
if not actorJson.get('id'):
if debug:
print('Actor has no id')
return None
if not actorJson.get('url'):
if debug:
print('Actor has no url')
return None
if not actorJson.get('type'):
if debug:
print('Actor has no type')
return None
if actorJson['id'] != actor:
if debug:
print('Actor id is not ' + actor +
' instead is ' + actorJson['id'])
return None
if actorJson['url'] != handle:
if debug:
print('Actor url is not ' + handle)
return None
if actorJson['type'] != 'Person':
if debug:
print('Actor type is not Person')
return None
# set the pgp details
if PGPpubKeyId:
setPGPfingerprint(actorJson, PGPpubKeyId)
else:
if debug:
print('No PGP key Id. Continuing anyway.')
if debug:
print('Setting PGP key within ' + actor)
setPGPpubKey(actorJson, PGPpubKey)
# create an actor update
statusNumber, published = getStatusNumber()
actorUpdate = {
'@context': 'https://www.w3.org/ns/activitystreams',
'id': actor + '#updates/' + statusNumber,
'type': 'Update',
'actor': actor,
'to': [actor],
'cc': [],
'object': actorJson
}
if debug:
print('actor update is ' + str(actorUpdate))
# lookup the inbox for the To handle
wfRequest = \
webfingerHandle(session, handle, httpPrefix, cachedWebfingers,
domain, __version__, debug, False,
signingPrivateKeyPem)
2021-03-17 20:18:00 +00:00
if not wfRequest:
if debug:
print('DEBUG: pgp actor update webfinger failed for ' +
handle)
return None
if not isinstance(wfRequest, dict):
if debug:
print('WARN: Webfinger for ' + handle +
' did not return a dict. ' + str(wfRequest))
return None
postToBox = 'outbox'
# get the actor inbox for the To handle
2021-09-15 14:05:08 +00:00
originDomain = domain
(inboxUrl, pubKeyId, pubKey, fromPersonId, sharedInbox, avatarUrl,
displayName, _) = getPersonBox(signingPrivateKeyPem, originDomain,
baseDir, session, wfRequest, personCache,
__version__, httpPrefix, nickname,
domain, postToBox, 35725)
2021-03-17 20:18:00 +00:00
if not inboxUrl:
if debug:
print('DEBUG: No ' + postToBox + ' was found for ' + handle)
return None
if not fromPersonId:
if debug:
print('DEBUG: No actor was found for ' + handle)
return None
authHeader = createBasicAuthHeader(nickname, password)
headers = {
'host': domain,
'Content-type': 'application/json',
'Authorization': authHeader
}
quiet = not debug
2021-03-17 21:23:52 +00:00
tries = 0
2021-03-17 20:18:00 +00:00
while tries < 4:
postResult = \
2021-06-20 13:39:53 +00:00
postJson(httpPrefix, domainFull,
session, actorUpdate, [], inboxUrl,
2021-03-17 21:26:58 +00:00
headers, 5, quiet)
2021-03-17 20:18:00 +00:00
if postResult:
break
tries += 1
if postResult is None:
if debug:
print('DEBUG: POST pgp actor update failed for c2s to ' +
inboxUrl)
return None
if debug:
print('DEBUG: c2s POST pgp actor update success')
return actorUpdate