Decrypt pgp encrypted DMs

merge-requests/30/head
Bob Mottram 2021-03-11 17:15:32 +00:00
parent 09a7b3200b
commit b7c1b02346
5 changed files with 134 additions and 20 deletions

View File

@ -65,6 +65,9 @@ def _removeQuotesWithinQuotes(content: str) -> str:
def htmlReplaceEmailQuote(content: str) -> str: def htmlReplaceEmailQuote(content: str) -> str:
"""Replaces an email style quote "> Some quote" with html blockquote """Replaces an email style quote "> Some quote" with html blockquote
""" """
if '--BEGIN PGP MESSAGE--' in content or \
'--BEGIN PGP PUBLIC KEY BLOCK--' in content:
return content
# replace quote paragraph # replace quote paragraph
if '<p>&quot;' in content: if '<p>&quot;' in content:
if '&quot;</p>' in content: if '&quot;</p>' in content:
@ -106,6 +109,9 @@ def htmlReplaceQuoteMarks(content: str) -> str:
"""Replaces quotes with html formatting """Replaces quotes with html formatting
"hello" becomes <q>hello</q> "hello" becomes <q>hello</q>
""" """
if '--BEGIN PGP MESSAGE--' in content or \
'--BEGIN PGP PUBLIC KEY BLOCK--' in content:
return content
if '"' not in content: if '"' not in content:
if '&quot;' not in content: if '&quot;' not in content:
return content return content
@ -197,6 +203,9 @@ def dangerousCSS(filename: str, allowLocalNetworkAccess: bool) -> bool:
def switchWords(baseDir: str, nickname: str, domain: str, content: str) -> str: def switchWords(baseDir: str, nickname: str, domain: str, content: str) -> str:
"""Performs word replacements. eg. Trump -> The Orange Menace """Performs word replacements. eg. Trump -> The Orange Menace
""" """
if '--BEGIN PGP MESSAGE--' in content or \
'--BEGIN PGP PUBLIC KEY BLOCK--' in content:
return content
switchWordsFilename = baseDir + '/accounts/' + \ switchWordsFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/replacewords.txt' nickname + '@' + domain + '/replacewords.txt'
if not os.path.isfile(switchWordsFilename): if not os.path.isfile(switchWordsFilename):
@ -582,6 +591,9 @@ def _addMention(wordStr: str, httpPrefix: str, following: str, petnames: str,
def replaceContentDuplicates(content: str) -> str: def replaceContentDuplicates(content: str) -> str:
"""Replaces invalid duplicates within content """Replaces invalid duplicates within content
""" """
if '--BEGIN PGP MESSAGE--' in content or \
'--BEGIN PGP PUBLIC KEY BLOCK--' in content:
return content
while '<<' in content: while '<<' in content:
content = content.replace('<<', '<') content = content.replace('<<', '<')
while '>>' in content: while '>>' in content:
@ -593,6 +605,9 @@ def replaceContentDuplicates(content: str) -> str:
def removeTextFormatting(content: str) -> str: def removeTextFormatting(content: str) -> str:
"""Removes markup for bold, italics, etc """Removes markup for bold, italics, etc
""" """
if '--BEGIN PGP MESSAGE--' in content or \
'--BEGIN PGP PUBLIC KEY BLOCK--' in content:
return content
if '<' not in content: if '<' not in content:
return content return content
removeMarkup = ('b', 'i', 'ul', 'ol', 'li', 'em', 'strong', removeMarkup = ('b', 'i', 'ul', 'ol', 'li', 'em', 'strong',
@ -610,6 +625,9 @@ def removeLongWords(content: str, maxWordLength: int,
"""Breaks up long words so that on mobile screens this doesn't """Breaks up long words so that on mobile screens this doesn't
disrupt the layout disrupt the layout
""" """
if '--BEGIN PGP MESSAGE--' in content or \
'--BEGIN PGP PUBLIC KEY BLOCK--' in content:
return content
content = replaceContentDuplicates(content) content = replaceContentDuplicates(content)
if ' ' not in content: if ' ' not in content:
# handle a single very long string with no spaces # handle a single very long string with no spaces

View File

@ -25,6 +25,7 @@ from follow import sendFollowRequestViaServer
from follow import sendUnfollowRequestViaServer from follow import sendUnfollowRequestViaServer
from posts import sendPostViaServer from posts import sendPostViaServer
from announce import sendAnnounceViaServer from announce import sendAnnounceViaServer
from pgp import pgpDecrypt
def _waitForKeypress(timeout: int, debug: bool) -> str: def _waitForKeypress(timeout: int, debug: bool) -> str:
@ -485,10 +486,11 @@ def runNotificationsClient(baseDir: str, proxyType: str, httpPrefix: str,
else: else:
messageStr = speakerJson['say'] + '. ' + \ messageStr = speakerJson['say'] + '. ' + \
speakerJson['imageDescription'] speakerJson['imageDescription']
messageStr = pgpDecrypt(messageStr)
content = messageStr content = messageStr
if speakerJson.get('content'): if speakerJson.get('content'):
content = speakerJson['content'] content = pgpDecrypt(speakerJson['content'])
# say the speaker's name # say the speaker's name
_sayCommand(nameStr, nameStr, screenreader, _sayCommand(nameStr, nameStr, screenreader,

94
pgp.py
View File

@ -6,6 +6,8 @@ __maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net" __email__ = "bob@freedombone.net"
__status__ = "Production" __status__ = "Production"
import subprocess
def getEmailAddress(actorJson: {}) -> str: def getEmailAddress(actorJson: {}) -> str:
"""Returns the email address for the given actor """Returns the email address for the given actor
@ -232,9 +234,9 @@ def extractPGPPublicKey(content: str) -> str:
""" """
startBlock = '--BEGIN PGP PUBLIC KEY BLOCK--' startBlock = '--BEGIN PGP PUBLIC KEY BLOCK--'
endBlock = '--END PGP PUBLIC KEY BLOCK--' endBlock = '--END PGP PUBLIC KEY BLOCK--'
if not startBlock in content: if startBlock not in content:
return None return None
if not endBlock in content: if endBlock not in content:
return None return None
if '\n' not in content: if '\n' not in content:
return None return None
@ -252,3 +254,91 @@ def extractPGPPublicKey(content: str) -> str:
if extracting: if extracting:
publicKey += line + '\n' publicKey += line + '\n'
return publicKey return publicKey
def _pgpImportPubKey(recipientPubKey: str) -> str:
""" Import the given public key
"""
# do a dry run
cmdImportPubKey = \
'echo "' + recipientPubKey + '" | gpg --dry-run --import 2> /dev/null'
proc = subprocess.Popen([cmdImportPubKey],
stdout=subprocess.PIPE, shell=True)
(importResult, err) = proc.communicate()
if err:
return None
# this time for real
cmdImportPubKey = \
'echo "' + recipientPubKey + '" | gpg --import 2> /dev/null'
proc = subprocess.Popen([cmdImportPubKey],
stdout=subprocess.PIPE, shell=True)
(importResult, err) = proc.communicate()
if err:
return None
# get the key id
cmdImportPubKey = \
'echo "' + recipientPubKey + '" | gpg --show-keys'
proc = subprocess.Popen([cmdImportPubKey],
stdout=subprocess.PIPE, shell=True)
(importResult, err) = proc.communicate()
if not importResult:
return None
importResult = importResult.decode('utf-8').split('\n')
keyId = ''
for line in importResult:
if line.startswith('pub'):
continue
elif line.startswith('uid'):
continue
elif line.startswith('sub'):
continue
keyId = line.strip()
break
return keyId
def pgpEncrypt(content: str, recipientPubKey: str) -> str:
""" Encrypt using your default pgp key to the given recipient
"""
keyId = _pgpImportPubKey(recipientPubKey)
if not keyId:
return None
cmdEncrypt = \
'echo "' + content + '" | gpg --encrypt --armor --recipient ' + \
keyId + ' 2> /dev/null'
proc = subprocess.Popen([cmdEncrypt],
stdout=subprocess.PIPE, shell=True)
(encryptResult, err) = proc.communicate()
if not encryptResult:
return None
encryptResult = encryptResult.decode('utf-8')
if '--BEGIN PGP MESSAGE--' not in encryptResult:
return None
return encryptResult
def pgpDecrypt(content: str) -> str:
""" Encrypt using your default pgp key to the given recipient
"""
if '--BEGIN PGP MESSAGE--' not in content:
return content
# if the public key is also included within the message then import it
startBlock = '--BEGIN PGP PUBLIC KEY BLOCK--'
if startBlock in content:
pubKey = extractPGPPublicKey(content)
if pubKey:
_pgpImportPubKey(pubKey)
cmdDecrypt = \
'echo "' + content + '" | gpg --decrypt --armor 2> /dev/null'
proc = subprocess.Popen([cmdDecrypt],
stdout=subprocess.PIPE, shell=True)
(decryptResult, err) = proc.communicate()
if not decryptResult:
return content
decryptResult = decryptResult.decode('utf-8')
return decryptResult

View File

@ -413,21 +413,24 @@ def _postToSpeakerJson(baseDir: str, httpPrefix: str,
content = urllib.parse.unquote_plus(postJsonObject['object']['content']) content = urllib.parse.unquote_plus(postJsonObject['object']['content'])
content = html.unescape(content) content = html.unescape(content)
content = content.replace('<p>', '').replace('</p>', ' ') content = content.replace('<p>', '').replace('</p>', ' ')
# replace some emoji before removing html if '--BEGIN PGP MESSAGE--' not in content:
if ' <3' in content: # replace some emoji before removing html
content = content.replace(' <3', ' ' + translate['heart']) if ' <3' in content:
content = removeHtml(htmlReplaceQuoteMarks(content)) content = content.replace(' <3', ' ' + translate['heart'])
content = speakerReplaceLinks(content, translate, detectedLinks) content = removeHtml(htmlReplaceQuoteMarks(content))
# replace all double spaces content = speakerReplaceLinks(content, translate, detectedLinks)
while ' ' in content: # replace all double spaces
content = content.replace(' ', ' ') while ' ' in content:
content = content.replace(' . ', '. ').strip() content = content.replace(' ', ' ')
sayContent = content content = content.replace(' . ', '. ').strip()
sayContent = _speakerPronounce(baseDir, content, translate) sayContent = content
# replace all double spaces sayContent = _speakerPronounce(baseDir, content, translate)
while ' ' in sayContent: # replace all double spaces
sayContent = sayContent.replace(' ', ' ') while ' ' in sayContent:
sayContent = sayContent.replace(' . ', '. ').strip() sayContent = sayContent.replace(' ', ' ')
sayContent = sayContent.replace(' . ', '. ').strip()
else:
sayContent = content
imageDescription = '' imageDescription = ''
if postJsonObject['object'].get('attachment'): if postJsonObject['object'].get('attachment'):

View File

@ -3000,7 +3000,8 @@ def testFunctions():
'E2EEremoveDevice', 'E2EEremoveDevice',
'setOrganizationScheme', 'setOrganizationScheme',
'fill_headers', 'fill_headers',
'_nothing' '_nothing',
"pgpEncrypt"
] ]
excludeImports = [ excludeImports = [
'link', 'link',
@ -3418,7 +3419,7 @@ def testEmojiImages():
def testExtractPGPPublicKey(): def testExtractPGPPublicKey():
print('testExtractPGPPublicKey') print('testExtractPGPPublicKey')
pubKey = \ pubKey = \
'-----BEGIN PGP PUBLIC KEY BLOCK-----\n' + \ '-----BEGIN PGP PUBLIC KEY BLOCK-----\n\n' + \
'mDMEWZBueBYJKwYBBAHaRw8BAQdAKx1t6wL0RTuU6/' + \ 'mDMEWZBueBYJKwYBBAHaRw8BAQdAKx1t6wL0RTuU6/' + \
'IBjngMbVJJ3Wg/3UW73/PV\n' + \ 'IBjngMbVJJ3Wg/3UW73/PV\n' + \
'I47xKTS0IUJvYiBNb3R0cmFtIDxib2JAZnJlZWRvb' + \ 'I47xKTS0IUJvYiBNb3R0cmFtIDxib2JAZnJlZWRvb' + \