Merge branch 'main' of ssh://code.freedombone.net:2222/bashrc/epicyon into main

main
Bob Mottram 2021-03-11 17:18:52 +00:00
commit f082514c19
6 changed files with 205 additions and 18 deletions

View File

@ -430,6 +430,7 @@ like Like the last post
unlike Unlike the last post unlike Unlike the last post
reply Reply to the last post reply Reply to the last post
post Create a new post post Create a new post
post to [handle] Create a new direct message
announce/boost Boost the last post announce/boost Boost the last post
follow [handle] Make a follow request follow [handle] Make a follow request
unfollow [handle] Stop following the give handle unfollow [handle] Stop following the give handle

View File

@ -65,6 +65,9 @@ def _removeQuotesWithinQuotes(content: str) -> str:
def htmlReplaceEmailQuote(content: str) -> str: def htmlReplaceEmailQuote(content: str) -> str:
"""Replaces an email style quote "> Some quote" with html blockquote """Replaces an email style quote "> Some quote" with html blockquote
""" """
if '--BEGIN PGP MESSAGE--' in content or \
'--BEGIN PGP PUBLIC KEY BLOCK--' in content:
return content
# replace quote paragraph # replace quote paragraph
if '<p>&quot;' in content: if '<p>&quot;' in content:
if '&quot;</p>' in content: if '&quot;</p>' in content:
@ -106,6 +109,9 @@ def htmlReplaceQuoteMarks(content: str) -> str:
"""Replaces quotes with html formatting """Replaces quotes with html formatting
"hello" becomes <q>hello</q> "hello" becomes <q>hello</q>
""" """
if '--BEGIN PGP MESSAGE--' in content or \
'--BEGIN PGP PUBLIC KEY BLOCK--' in content:
return content
if '"' not in content: if '"' not in content:
if '&quot;' not in content: if '&quot;' not in content:
return content return content
@ -197,6 +203,9 @@ def dangerousCSS(filename: str, allowLocalNetworkAccess: bool) -> bool:
def switchWords(baseDir: str, nickname: str, domain: str, content: str) -> str: def switchWords(baseDir: str, nickname: str, domain: str, content: str) -> str:
"""Performs word replacements. eg. Trump -> The Orange Menace """Performs word replacements. eg. Trump -> The Orange Menace
""" """
if '--BEGIN PGP MESSAGE--' in content or \
'--BEGIN PGP PUBLIC KEY BLOCK--' in content:
return content
switchWordsFilename = baseDir + '/accounts/' + \ switchWordsFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/replacewords.txt' nickname + '@' + domain + '/replacewords.txt'
if not os.path.isfile(switchWordsFilename): if not os.path.isfile(switchWordsFilename):
@ -582,6 +591,9 @@ def _addMention(wordStr: str, httpPrefix: str, following: str, petnames: str,
def replaceContentDuplicates(content: str) -> str: def replaceContentDuplicates(content: str) -> str:
"""Replaces invalid duplicates within content """Replaces invalid duplicates within content
""" """
if '--BEGIN PGP MESSAGE--' in content or \
'--BEGIN PGP PUBLIC KEY BLOCK--' in content:
return content
while '<<' in content: while '<<' in content:
content = content.replace('<<', '<') content = content.replace('<<', '<')
while '>>' in content: while '>>' in content:
@ -593,6 +605,9 @@ def replaceContentDuplicates(content: str) -> str:
def removeTextFormatting(content: str) -> str: def removeTextFormatting(content: str) -> str:
"""Removes markup for bold, italics, etc """Removes markup for bold, italics, etc
""" """
if '--BEGIN PGP MESSAGE--' in content or \
'--BEGIN PGP PUBLIC KEY BLOCK--' in content:
return content
if '<' not in content: if '<' not in content:
return content return content
removeMarkup = ('b', 'i', 'ul', 'ol', 'li', 'em', 'strong', removeMarkup = ('b', 'i', 'ul', 'ol', 'li', 'em', 'strong',
@ -610,6 +625,9 @@ def removeLongWords(content: str, maxWordLength: int,
"""Breaks up long words so that on mobile screens this doesn't """Breaks up long words so that on mobile screens this doesn't
disrupt the layout disrupt the layout
""" """
if '--BEGIN PGP MESSAGE--' in content or \
'--BEGIN PGP PUBLIC KEY BLOCK--' in content:
return content
content = replaceContentDuplicates(content) content = replaceContentDuplicates(content)
if ' ' not in content: if ' ' not in content:
# handle a single very long string with no spaces # handle a single very long string with no spaces

View File

@ -25,6 +25,7 @@ from follow import sendFollowRequestViaServer
from follow import sendUnfollowRequestViaServer from follow import sendUnfollowRequestViaServer
from posts import sendPostViaServer from posts import sendPostViaServer
from announce import sendAnnounceViaServer from announce import sendAnnounceViaServer
from pgp import pgpDecrypt
def _waitForKeypress(timeout: int, debug: bool) -> str: def _waitForKeypress(timeout: int, debug: bool) -> str:
@ -485,10 +486,11 @@ def runNotificationsClient(baseDir: str, proxyType: str, httpPrefix: str,
else: else:
messageStr = speakerJson['say'] + '. ' + \ messageStr = speakerJson['say'] + '. ' + \
speakerJson['imageDescription'] speakerJson['imageDescription']
messageStr = pgpDecrypt(messageStr)
content = messageStr content = messageStr
if speakerJson.get('content'): if speakerJson.get('content'):
content = speakerJson['content'] content = pgpDecrypt(speakerJson['content'])
# say the speaker's name # say the speaker's name
_sayCommand(nameStr, nameStr, screenreader, _sayCommand(nameStr, nameStr, screenreader,
@ -535,18 +537,28 @@ def runNotificationsClient(baseDir: str, proxyType: str, httpPrefix: str,
print('') print('')
elif (keyPress == 'post' or keyPress == 'p' or elif (keyPress == 'post' or keyPress == 'p' or
keyPress == 'send' or keyPress == 'send' or
keyPress.startswith('dm ') or
keyPress.startswith('direct message ') or
keyPress.startswith('post ') or keyPress.startswith('post ') or
keyPress.startswith('send ')): keyPress.startswith('send ')):
sessionPost = createSession(proxyType) sessionPost = createSession(proxyType)
if keyPress.startswith('post ') or \ if keyPress.startswith('dm ') or \
keyPress.startswith('direct message ') or \
keyPress.startswith('post ') or \
keyPress.startswith('send '): keyPress.startswith('send '):
keyPress = keyPress.replace(' to ', ' ') keyPress = keyPress.replace(' to ', ' ')
keyPress = keyPress.replace(' dm ', ' ')
keyPress = keyPress.replace(' DM ', ' ')
# direct message # direct message
toHandle = None toHandle = None
if keyPress.startswith('post '): if keyPress.startswith('post '):
toHandle = keyPress.split('post ', 1)[1] toHandle = keyPress.split('post ', 1)[1]
elif keyPress.startswith('send '): elif keyPress.startswith('send '):
toHandle = keyPress.split('send ', 1)[1] toHandle = keyPress.split('send ', 1)[1]
elif keyPress.startswith('dm '):
toHandle = keyPress.split('dm ', 1)[1]
elif keyPress.startswith('direct message '):
toHandle = keyPress.split('direct message ', 1)[1]
if toHandle: if toHandle:
_notificationNewDM(sessionPost, toHandle, _notificationNewDM(sessionPost, toHandle,
baseDir, nickname, password, baseDir, nickname, password,

117
pgp.py
View File

@ -6,6 +6,8 @@ __maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net" __email__ = "bob@freedombone.net"
__status__ = "Production" __status__ = "Production"
import subprocess
def getEmailAddress(actorJson: {}) -> str: def getEmailAddress(actorJson: {}) -> str:
"""Returns the email address for the given actor """Returns the email address for the given actor
@ -225,3 +227,118 @@ def setPGPfingerprint(actorJson: {}, fingerprint: str) -> None:
"value": fingerprint "value": fingerprint
} }
actorJson['attachment'].append(newPGPfingerprint) actorJson['attachment'].append(newPGPfingerprint)
def extractPGPPublicKey(content: str) -> str:
"""Returns the PGP key from the given text
"""
startBlock = '--BEGIN PGP PUBLIC KEY BLOCK--'
endBlock = '--END PGP PUBLIC KEY BLOCK--'
if startBlock not in content:
return None
if endBlock not in content:
return None
if '\n' not in content:
return None
linesList = content.split('\n')
extracting = False
publicKey = ''
for line in linesList:
if not extracting:
if startBlock in line:
extracting = True
else:
if endBlock in line:
publicKey += line
break
if extracting:
publicKey += line + '\n'
return publicKey
def _pgpImportPubKey(recipientPubKey: str) -> str:
""" Import the given public key
"""
# do a dry run
cmdImportPubKey = \
'echo "' + recipientPubKey + '" | gpg --dry-run --import 2> /dev/null'
proc = subprocess.Popen([cmdImportPubKey],
stdout=subprocess.PIPE, shell=True)
(importResult, err) = proc.communicate()
if err:
return None
# this time for real
cmdImportPubKey = \
'echo "' + recipientPubKey + '" | gpg --import 2> /dev/null'
proc = subprocess.Popen([cmdImportPubKey],
stdout=subprocess.PIPE, shell=True)
(importResult, err) = proc.communicate()
if err:
return None
# get the key id
cmdImportPubKey = \
'echo "' + recipientPubKey + '" | gpg --show-keys'
proc = subprocess.Popen([cmdImportPubKey],
stdout=subprocess.PIPE, shell=True)
(importResult, err) = proc.communicate()
if not importResult:
return None
importResult = importResult.decode('utf-8').split('\n')
keyId = ''
for line in importResult:
if line.startswith('pub'):
continue
elif line.startswith('uid'):
continue
elif line.startswith('sub'):
continue
keyId = line.strip()
break
return keyId
def pgpEncrypt(content: str, recipientPubKey: str) -> str:
""" Encrypt using your default pgp key to the given recipient
"""
keyId = _pgpImportPubKey(recipientPubKey)
if not keyId:
return None
cmdEncrypt = \
'echo "' + content + '" | gpg --encrypt --armor --recipient ' + \
keyId + ' 2> /dev/null'
proc = subprocess.Popen([cmdEncrypt],
stdout=subprocess.PIPE, shell=True)
(encryptResult, err) = proc.communicate()
if not encryptResult:
return None
encryptResult = encryptResult.decode('utf-8')
if '--BEGIN PGP MESSAGE--' not in encryptResult:
return None
return encryptResult
def pgpDecrypt(content: str) -> str:
""" Encrypt using your default pgp key to the given recipient
"""
if '--BEGIN PGP MESSAGE--' not in content:
return content
# if the public key is also included within the message then import it
startBlock = '--BEGIN PGP PUBLIC KEY BLOCK--'
if startBlock in content:
pubKey = extractPGPPublicKey(content)
if pubKey:
_pgpImportPubKey(pubKey)
cmdDecrypt = \
'echo "' + content + '" | gpg --decrypt --armor 2> /dev/null'
proc = subprocess.Popen([cmdDecrypt],
stdout=subprocess.PIPE, shell=True)
(decryptResult, err) = proc.communicate()
if not decryptResult:
return content
decryptResult = decryptResult.decode('utf-8')
return decryptResult

View File

@ -84,6 +84,7 @@ def _speakerPronounce(baseDir: str, sayText: str, translate: {}) -> str:
".js": " dot J-S", ".js": " dot J-S",
"PSQL": "Postgres S-Q-L", "PSQL": "Postgres S-Q-L",
"SQL": "S-Q-L", "SQL": "S-Q-L",
"gdpr": "G-D-P-R",
"coop": "co-op", "coop": "co-op",
"KMail": "K-Mail", "KMail": "K-Mail",
"gmail": "G-mail", "gmail": "G-mail",
@ -412,21 +413,24 @@ def _postToSpeakerJson(baseDir: str, httpPrefix: str,
content = urllib.parse.unquote_plus(postJsonObject['object']['content']) content = urllib.parse.unquote_plus(postJsonObject['object']['content'])
content = html.unescape(content) content = html.unescape(content)
content = content.replace('<p>', '').replace('</p>', ' ') content = content.replace('<p>', '').replace('</p>', ' ')
# replace some emoji before removing html if '--BEGIN PGP MESSAGE--' not in content:
if ' <3' in content: # replace some emoji before removing html
content = content.replace(' <3', ' ' + translate['heart']) if ' <3' in content:
content = removeHtml(htmlReplaceQuoteMarks(content)) content = content.replace(' <3', ' ' + translate['heart'])
content = speakerReplaceLinks(content, translate, detectedLinks) content = removeHtml(htmlReplaceQuoteMarks(content))
# replace all double spaces content = speakerReplaceLinks(content, translate, detectedLinks)
while ' ' in content: # replace all double spaces
content = content.replace(' ', ' ') while ' ' in content:
content = content.replace(' . ', '. ').strip() content = content.replace(' ', ' ')
sayContent = content content = content.replace(' . ', '. ').strip()
sayContent = _speakerPronounce(baseDir, content, translate) sayContent = content
# replace all double spaces sayContent = _speakerPronounce(baseDir, content, translate)
while ' ' in sayContent: # replace all double spaces
sayContent = sayContent.replace(' ', ' ') while ' ' in sayContent:
sayContent = sayContent.replace(' . ', '. ').strip() sayContent = sayContent.replace(' ', ' ')
sayContent = sayContent.replace(' . ', '. ').strip()
else:
sayContent = content
imageDescription = '' imageDescription = ''
if postJsonObject['object'].get('attachment'): if postJsonObject['object'].get('attachment'):
@ -468,6 +472,9 @@ def _postToSpeakerJson(baseDir: str, httpPrefix: str,
sayContent = \ sayContent = \
translate['announces'] + ' ' + \ translate['announces'] + ' ' + \
announcedHandle + '. ' + sayContent announcedHandle + '. ' + sayContent
content = \
translate['announces'] + ' ' + \
announcedHandle + '. ' + content
postId = None postId = None
if postJsonObject['object'].get('id'): if postJsonObject['object'].get('id'):
postId = postJsonObject['object']['id'] postId = postJsonObject['object']['id']

View File

@ -102,6 +102,7 @@ from mastoapiv1 import getNicknameFromMastoApiV1Id
from webapp_post import prepareHtmlPostNickname from webapp_post import prepareHtmlPostNickname
from webapp_utils import markdownToHtml from webapp_utils import markdownToHtml
from speaker import speakerReplaceLinks from speaker import speakerReplaceLinks
from pgp import extractPGPPublicKey
testServerAliceRunning = False testServerAliceRunning = False
testServerBobRunning = False testServerBobRunning = False
@ -2999,7 +3000,8 @@ def testFunctions():
'E2EEremoveDevice', 'E2EEremoveDevice',
'setOrganizationScheme', 'setOrganizationScheme',
'fill_headers', 'fill_headers',
'_nothing' '_nothing',
"pgpEncrypt"
] ]
excludeImports = [ excludeImports = [
'link', 'link',
@ -3414,9 +3416,39 @@ def testEmojiImages():
assert os.path.isfile(emojiImageFilename) assert os.path.isfile(emojiImageFilename)
def testExtractPGPPublicKey():
print('testExtractPGPPublicKey')
pubKey = \
'-----BEGIN PGP PUBLIC KEY BLOCK-----\n\n' + \
'mDMEWZBueBYJKwYBBAHaRw8BAQdAKx1t6wL0RTuU6/' + \
'IBjngMbVJJ3Wg/3UW73/PV\n' + \
'I47xKTS0IUJvYiBNb3R0cmFtIDxib2JAZnJlZWRvb' + \
'WJvbmUubmV0PoiQBBMWCAA4\n' + \
'FiEEmruCwAq/OfgmgEh9zCU2GR+nwz8FAlmQbngCG' + \
'wMFCwkIBwMFFQoJCAsFFgID\n' + \
'AQACHgECF4AACgkQzCU2GR+nwz/9sAD/YgsHnVszH' + \
'Nz1zlVc5EgY1ByDupiJpHj0\n' + \
'XsLYk3AbNRgBALn45RqgD4eWHpmOriH09H5Rc5V9i' + \
'N4+OiGUn2AzJ6oHuDgEWZBu\n' + \
'eBIKKwYBBAGXVQEFAQEHQPRBG2ZQJce475S3e0Dxe' + \
'b0Fz5WdEu2q3GYLo4QG+4Ry\n' + \
'AwEIB4h4BBgWCAAgFiEEmruCwAq/OfgmgEh9zCU2G' + \
'R+nwz8FAlmQbngCGwwACgkQ\n' + \
'zCU2GR+nwz+OswD+JOoyBku9FzuWoVoOevU2HH+bP' + \
'OMDgY2OLnST9ZSyHkMBAMcK\n' + \
'fnaZ2Wi050483Sj2RmQRpb99Dod7rVZTDtCqXk0J\n' + \
'=gv5G\n' + \
'-----END PGP PUBLIC KEY BLOCK-----'
testStr = "Some introduction\n\n" + pubKey + "\n\nSome message."
result = extractPGPPublicKey(testStr)
assert result
assert result == pubKey
def runAllTests(): def runAllTests():
print('Running tests...') print('Running tests...')
testFunctions() testFunctions()
testExtractPGPPublicKey()
testEmojiImages() testEmojiImages()
testCamelCaseSplit() testCamelCaseSplit()
testSpeakerReplaceLinks() testSpeakerReplaceLinks()