diff --git a/README_commandline.md b/README_commandline.md index 0b7b34443..383b5214e 100644 --- a/README_commandline.md +++ b/README_commandline.md @@ -430,6 +430,7 @@ like Like the last post unlike Unlike the last post reply Reply to the last post post Create a new post +post to [handle] Create a new direct message announce/boost Boost the last post follow [handle] Make a follow request unfollow [handle] Stop following the give handle diff --git a/content.py b/content.py index 32858c926..eb9615af3 100644 --- a/content.py +++ b/content.py @@ -65,6 +65,9 @@ def _removeQuotesWithinQuotes(content: str) -> str: def htmlReplaceEmailQuote(content: str) -> str: """Replaces an email style quote "> Some quote" with html blockquote """ + if '--BEGIN PGP MESSAGE--' in content or \ + '--BEGIN PGP PUBLIC KEY BLOCK--' in content: + return content # replace quote paragraph if '

"' in content: if '"

' in content: @@ -106,6 +109,9 @@ def htmlReplaceQuoteMarks(content: str) -> str: """Replaces quotes with html formatting "hello" becomes hello """ + if '--BEGIN PGP MESSAGE--' in content or \ + '--BEGIN PGP PUBLIC KEY BLOCK--' in content: + return content if '"' not in content: if '"' not in content: return content @@ -197,6 +203,9 @@ def dangerousCSS(filename: str, allowLocalNetworkAccess: bool) -> bool: def switchWords(baseDir: str, nickname: str, domain: str, content: str) -> str: """Performs word replacements. eg. Trump -> The Orange Menace """ + if '--BEGIN PGP MESSAGE--' in content or \ + '--BEGIN PGP PUBLIC KEY BLOCK--' in content: + return content switchWordsFilename = baseDir + '/accounts/' + \ nickname + '@' + domain + '/replacewords.txt' if not os.path.isfile(switchWordsFilename): @@ -582,6 +591,9 @@ def _addMention(wordStr: str, httpPrefix: str, following: str, petnames: str, def replaceContentDuplicates(content: str) -> str: """Replaces invalid duplicates within content """ + if '--BEGIN PGP MESSAGE--' in content or \ + '--BEGIN PGP PUBLIC KEY BLOCK--' in content: + return content while '<<' in content: content = content.replace('<<', '<') while '>>' in content: @@ -593,6 +605,9 @@ def replaceContentDuplicates(content: str) -> str: def removeTextFormatting(content: str) -> str: """Removes markup for bold, italics, etc """ + if '--BEGIN PGP MESSAGE--' in content or \ + '--BEGIN PGP PUBLIC KEY BLOCK--' in content: + return content if '<' not in content: return content removeMarkup = ('b', 'i', 'ul', 'ol', 'li', 'em', 'strong', @@ -610,6 +625,9 @@ def removeLongWords(content: str, maxWordLength: int, """Breaks up long words so that on mobile screens this doesn't disrupt the layout """ + if '--BEGIN PGP MESSAGE--' in content or \ + '--BEGIN PGP PUBLIC KEY BLOCK--' in content: + return content content = replaceContentDuplicates(content) if ' ' not in content: # handle a single very long string with no spaces diff --git a/notifications_client.py b/notifications_client.py index 0f841d539..00b0763e7 100644 --- a/notifications_client.py +++ b/notifications_client.py @@ -25,6 +25,7 @@ from follow import sendFollowRequestViaServer from follow import sendUnfollowRequestViaServer from posts import sendPostViaServer from announce import sendAnnounceViaServer +from pgp import pgpDecrypt def _waitForKeypress(timeout: int, debug: bool) -> str: @@ -485,10 +486,11 @@ def runNotificationsClient(baseDir: str, proxyType: str, httpPrefix: str, else: messageStr = speakerJson['say'] + '. ' + \ speakerJson['imageDescription'] + messageStr = pgpDecrypt(messageStr) content = messageStr if speakerJson.get('content'): - content = speakerJson['content'] + content = pgpDecrypt(speakerJson['content']) # say the speaker's name _sayCommand(nameStr, nameStr, screenreader, @@ -535,18 +537,28 @@ def runNotificationsClient(baseDir: str, proxyType: str, httpPrefix: str, print('') elif (keyPress == 'post' or keyPress == 'p' or keyPress == 'send' or + keyPress.startswith('dm ') or + keyPress.startswith('direct message ') or keyPress.startswith('post ') or keyPress.startswith('send ')): sessionPost = createSession(proxyType) - if keyPress.startswith('post ') or \ + if keyPress.startswith('dm ') or \ + keyPress.startswith('direct message ') or \ + keyPress.startswith('post ') or \ keyPress.startswith('send '): keyPress = keyPress.replace(' to ', ' ') + keyPress = keyPress.replace(' dm ', ' ') + keyPress = keyPress.replace(' DM ', ' ') # direct message toHandle = None if keyPress.startswith('post '): toHandle = keyPress.split('post ', 1)[1] elif keyPress.startswith('send '): toHandle = keyPress.split('send ', 1)[1] + elif keyPress.startswith('dm '): + toHandle = keyPress.split('dm ', 1)[1] + elif keyPress.startswith('direct message '): + toHandle = keyPress.split('direct message ', 1)[1] if toHandle: _notificationNewDM(sessionPost, toHandle, baseDir, nickname, password, diff --git a/pgp.py b/pgp.py index 5a332a7f3..da60613a9 100644 --- a/pgp.py +++ b/pgp.py @@ -6,6 +6,8 @@ __maintainer__ = "Bob Mottram" __email__ = "bob@freedombone.net" __status__ = "Production" +import subprocess + def getEmailAddress(actorJson: {}) -> str: """Returns the email address for the given actor @@ -225,3 +227,118 @@ def setPGPfingerprint(actorJson: {}, fingerprint: str) -> None: "value": fingerprint } actorJson['attachment'].append(newPGPfingerprint) + + +def extractPGPPublicKey(content: str) -> str: + """Returns the PGP key from the given text + """ + startBlock = '--BEGIN PGP PUBLIC KEY BLOCK--' + endBlock = '--END PGP PUBLIC KEY BLOCK--' + if startBlock not in content: + return None + if endBlock not in content: + return None + if '\n' not in content: + return None + linesList = content.split('\n') + extracting = False + publicKey = '' + for line in linesList: + if not extracting: + if startBlock in line: + extracting = True + else: + if endBlock in line: + publicKey += line + break + if extracting: + publicKey += line + '\n' + return publicKey + + +def _pgpImportPubKey(recipientPubKey: str) -> str: + """ Import the given public key + """ + # do a dry run + cmdImportPubKey = \ + 'echo "' + recipientPubKey + '" | gpg --dry-run --import 2> /dev/null' + proc = subprocess.Popen([cmdImportPubKey], + stdout=subprocess.PIPE, shell=True) + (importResult, err) = proc.communicate() + if err: + return None + + # this time for real + cmdImportPubKey = \ + 'echo "' + recipientPubKey + '" | gpg --import 2> /dev/null' + proc = subprocess.Popen([cmdImportPubKey], + stdout=subprocess.PIPE, shell=True) + (importResult, err) = proc.communicate() + if err: + return None + + # get the key id + cmdImportPubKey = \ + 'echo "' + recipientPubKey + '" | gpg --show-keys' + proc = subprocess.Popen([cmdImportPubKey], + stdout=subprocess.PIPE, shell=True) + (importResult, err) = proc.communicate() + if not importResult: + return None + importResult = importResult.decode('utf-8').split('\n') + keyId = '' + for line in importResult: + if line.startswith('pub'): + continue + elif line.startswith('uid'): + continue + elif line.startswith('sub'): + continue + keyId = line.strip() + break + return keyId + + +def pgpEncrypt(content: str, recipientPubKey: str) -> str: + """ Encrypt using your default pgp key to the given recipient + """ + keyId = _pgpImportPubKey(recipientPubKey) + if not keyId: + return None + + cmdEncrypt = \ + 'echo "' + content + '" | gpg --encrypt --armor --recipient ' + \ + keyId + ' 2> /dev/null' + proc = subprocess.Popen([cmdEncrypt], + stdout=subprocess.PIPE, shell=True) + (encryptResult, err) = proc.communicate() + if not encryptResult: + return None + encryptResult = encryptResult.decode('utf-8') + if '--BEGIN PGP MESSAGE--' not in encryptResult: + return None + return encryptResult + + +def pgpDecrypt(content: str) -> str: + """ Encrypt using your default pgp key to the given recipient + """ + if '--BEGIN PGP MESSAGE--' not in content: + return content + + # if the public key is also included within the message then import it + startBlock = '--BEGIN PGP PUBLIC KEY BLOCK--' + if startBlock in content: + pubKey = extractPGPPublicKey(content) + if pubKey: + _pgpImportPubKey(pubKey) + + cmdDecrypt = \ + 'echo "' + content + '" | gpg --decrypt --armor 2> /dev/null' + proc = subprocess.Popen([cmdDecrypt], + stdout=subprocess.PIPE, shell=True) + (decryptResult, err) = proc.communicate() + if not decryptResult: + return content + decryptResult = decryptResult.decode('utf-8') + return decryptResult diff --git a/speaker.py b/speaker.py index 250ecfb8a..e28b37466 100644 --- a/speaker.py +++ b/speaker.py @@ -84,6 +84,7 @@ def _speakerPronounce(baseDir: str, sayText: str, translate: {}) -> str: ".js": " dot J-S", "PSQL": "Postgres S-Q-L", "SQL": "S-Q-L", + "gdpr": "G-D-P-R", "coop": "co-op", "KMail": "K-Mail", "gmail": "G-mail", @@ -412,21 +413,24 @@ def _postToSpeakerJson(baseDir: str, httpPrefix: str, content = urllib.parse.unquote_plus(postJsonObject['object']['content']) content = html.unescape(content) content = content.replace('

', '').replace('

', ' ') - # replace some emoji before removing html - if ' <3' in content: - content = content.replace(' <3', ' ' + translate['heart']) - content = removeHtml(htmlReplaceQuoteMarks(content)) - content = speakerReplaceLinks(content, translate, detectedLinks) - # replace all double spaces - while ' ' in content: - content = content.replace(' ', ' ') - content = content.replace(' . ', '. ').strip() - sayContent = content - sayContent = _speakerPronounce(baseDir, content, translate) - # replace all double spaces - while ' ' in sayContent: - sayContent = sayContent.replace(' ', ' ') - sayContent = sayContent.replace(' . ', '. ').strip() + if '--BEGIN PGP MESSAGE--' not in content: + # replace some emoji before removing html + if ' <3' in content: + content = content.replace(' <3', ' ' + translate['heart']) + content = removeHtml(htmlReplaceQuoteMarks(content)) + content = speakerReplaceLinks(content, translate, detectedLinks) + # replace all double spaces + while ' ' in content: + content = content.replace(' ', ' ') + content = content.replace(' . ', '. ').strip() + sayContent = content + sayContent = _speakerPronounce(baseDir, content, translate) + # replace all double spaces + while ' ' in sayContent: + sayContent = sayContent.replace(' ', ' ') + sayContent = sayContent.replace(' . ', '. ').strip() + else: + sayContent = content imageDescription = '' if postJsonObject['object'].get('attachment'): @@ -468,6 +472,9 @@ def _postToSpeakerJson(baseDir: str, httpPrefix: str, sayContent = \ translate['announces'] + ' ' + \ announcedHandle + '. ' + sayContent + content = \ + translate['announces'] + ' ' + \ + announcedHandle + '. ' + content postId = None if postJsonObject['object'].get('id'): postId = postJsonObject['object']['id'] diff --git a/tests.py b/tests.py index baf92a34c..b71a5ec47 100644 --- a/tests.py +++ b/tests.py @@ -102,6 +102,7 @@ from mastoapiv1 import getNicknameFromMastoApiV1Id from webapp_post import prepareHtmlPostNickname from webapp_utils import markdownToHtml from speaker import speakerReplaceLinks +from pgp import extractPGPPublicKey testServerAliceRunning = False testServerBobRunning = False @@ -2999,7 +3000,8 @@ def testFunctions(): 'E2EEremoveDevice', 'setOrganizationScheme', 'fill_headers', - '_nothing' + '_nothing', + "pgpEncrypt" ] excludeImports = [ 'link', @@ -3414,9 +3416,39 @@ def testEmojiImages(): assert os.path.isfile(emojiImageFilename) +def testExtractPGPPublicKey(): + print('testExtractPGPPublicKey') + pubKey = \ + '-----BEGIN PGP PUBLIC KEY BLOCK-----\n\n' + \ + 'mDMEWZBueBYJKwYBBAHaRw8BAQdAKx1t6wL0RTuU6/' + \ + 'IBjngMbVJJ3Wg/3UW73/PV\n' + \ + 'I47xKTS0IUJvYiBNb3R0cmFtIDxib2JAZnJlZWRvb' + \ + 'WJvbmUubmV0PoiQBBMWCAA4\n' + \ + 'FiEEmruCwAq/OfgmgEh9zCU2GR+nwz8FAlmQbngCG' + \ + 'wMFCwkIBwMFFQoJCAsFFgID\n' + \ + 'AQACHgECF4AACgkQzCU2GR+nwz/9sAD/YgsHnVszH' + \ + 'Nz1zlVc5EgY1ByDupiJpHj0\n' + \ + 'XsLYk3AbNRgBALn45RqgD4eWHpmOriH09H5Rc5V9i' + \ + 'N4+OiGUn2AzJ6oHuDgEWZBu\n' + \ + 'eBIKKwYBBAGXVQEFAQEHQPRBG2ZQJce475S3e0Dxe' + \ + 'b0Fz5WdEu2q3GYLo4QG+4Ry\n' + \ + 'AwEIB4h4BBgWCAAgFiEEmruCwAq/OfgmgEh9zCU2G' + \ + 'R+nwz8FAlmQbngCGwwACgkQ\n' + \ + 'zCU2GR+nwz+OswD+JOoyBku9FzuWoVoOevU2HH+bP' + \ + 'OMDgY2OLnST9ZSyHkMBAMcK\n' + \ + 'fnaZ2Wi050483Sj2RmQRpb99Dod7rVZTDtCqXk0J\n' + \ + '=gv5G\n' + \ + '-----END PGP PUBLIC KEY BLOCK-----' + testStr = "Some introduction\n\n" + pubKey + "\n\nSome message." + result = extractPGPPublicKey(testStr) + assert result + assert result == pubKey + + def runAllTests(): print('Running tests...') testFunctions() + testExtractPGPPublicKey() testEmojiImages() testCamelCaseSplit() testSpeakerReplaceLinks()