Merge branch 'main' of ssh://code.freedombone.net:2222/bashrc/epicyon into main

main
Bob Mottram 2021-03-11 20:38:07 +00:00
commit bf03b75802
5 changed files with 250 additions and 141 deletions

View File

@ -11,6 +11,7 @@ import shutil
import sys
import time
import argparse
from person import getActorJson
from person import createPerson
from person import createGroup
from person import setProfileImage
@ -1370,137 +1371,7 @@ if args.migrations:
sys.exit()
if args.actor:
originalActor = args.actor
if '/@' in args.actor or \
'/users/' in args.actor or \
args.actor.startswith('http') or \
args.actor.startswith('dat'):
# format: https://domain/@nick
prefixes = getProtocolPrefixes()
for prefix in prefixes:
args.actor = args.actor.replace(prefix, '')
args.actor = args.actor.replace('/@', '/users/')
if not hasUsersPath(args.actor):
print('Expected actor format: ' +
'https://domain/@nick or https://domain/users/nick')
sys.exit()
if '/users/' in args.actor:
nickname = args.actor.split('/users/')[1]
nickname = nickname.replace('\n', '').replace('\r', '')
domain = args.actor.split('/users/')[0]
elif '/profile/' in args.actor:
nickname = args.actor.split('/profile/')[1]
nickname = nickname.replace('\n', '').replace('\r', '')
domain = args.actor.split('/profile/')[0]
elif '/channel/' in args.actor:
nickname = args.actor.split('/channel/')[1]
nickname = nickname.replace('\n', '').replace('\r', '')
domain = args.actor.split('/channel/')[0]
elif '/accounts/' in args.actor:
nickname = args.actor.split('/accounts/')[1]
nickname = nickname.replace('\n', '').replace('\r', '')
domain = args.actor.split('/accounts/')[0]
elif '/u/' in args.actor:
nickname = args.actor.split('/u/')[1]
nickname = nickname.replace('\n', '').replace('\r', '')
domain = args.actor.split('/u/')[0]
else:
# format: @nick@domain
if '@' not in args.actor:
print('Syntax: --actor nickname@domain')
sys.exit()
if args.actor.startswith('@'):
args.actor = args.actor[1:]
if '@' not in args.actor:
print('Syntax: --actor nickname@domain')
sys.exit()
nickname = args.actor.split('@')[0]
domain = args.actor.split('@')[1]
domain = domain.replace('\n', '').replace('\r', '')
cachedWebfingers = {}
if args.http or domain.endswith('.onion'):
httpPrefix = 'http'
port = 80
proxyType = 'tor'
elif domain.endswith('.i2p'):
httpPrefix = 'http'
port = 80
proxyType = 'i2p'
elif args.gnunet:
httpPrefix = 'gnunet'
port = 80
proxyType = 'gnunet'
else:
httpPrefix = 'https'
port = 443
session = createSession(proxyType)
if nickname == 'inbox':
nickname = domain
handle = nickname + '@' + domain
wfRequest = webfingerHandle(session, handle,
httpPrefix, cachedWebfingers,
None, __version__)
if not wfRequest:
print('Unable to webfinger ' + handle)
sys.exit()
if not isinstance(wfRequest, dict):
print('Webfinger for ' + handle + ' did not return a dict. ' +
str(wfRequest))
sys.exit()
pprint(wfRequest)
personUrl = None
if wfRequest.get('errors'):
print('wfRequest error: ' + str(wfRequest['errors']))
if hasUsersPath(args.actor):
personUrl = originalActor
else:
sys.exit()
profileStr = 'https://www.w3.org/ns/activitystreams'
asHeader = {
'Accept': 'application/activity+json; profile="' + profileStr + '"'
}
if not personUrl:
personUrl = getUserUrl(wfRequest)
if nickname == domain:
personUrl = personUrl.replace('/users/', '/actor/')
personUrl = personUrl.replace('/accounts/', '/actor/')
personUrl = personUrl.replace('/channel/', '/actor/')
personUrl = personUrl.replace('/profile/', '/actor/')
personUrl = personUrl.replace('/u/', '/actor/')
if not personUrl:
# try single user instance
personUrl = httpPrefix + '://' + domain
profileStr = 'https://www.w3.org/ns/activitystreams'
asHeader = {
'Accept': 'application/ld+json; profile="' + profileStr + '"'
}
if '/channel/' in personUrl or '/accounts/' in personUrl:
profileStr = 'https://www.w3.org/ns/activitystreams'
asHeader = {
'Accept': 'application/ld+json; profile="' + profileStr + '"'
}
personJson = \
getJson(session, personUrl, asHeader, None, __version__,
httpPrefix, None)
if personJson:
pprint(personJson)
else:
profileStr = 'https://www.w3.org/ns/activitystreams'
asHeader = {
'Accept': 'application/jrd+json; profile="' + profileStr + '"'
}
personJson = \
getJson(session, personUrl, asHeader, None,
__version__, httpPrefix, None)
if personJson:
pprint(personJson)
else:
print('Failed to get ' + personUrl)
getActorJson(args.actor, args.http, args.gnunet, False)
sys.exit()
if args.followers:

View File

@ -11,6 +11,7 @@ import html
import time
import sys
import select
from random import randint
from utils import getNicknameFromActor
from utils import getDomainFromActor
from utils import getFullDomain
@ -26,6 +27,8 @@ from follow import sendUnfollowRequestViaServer
from posts import sendPostViaServer
from announce import sendAnnounceViaServer
from pgp import pgpDecrypt
from pgp import hasLocalPGPkey
from pgp import pgpEncryptToActor
def _waitForKeypress(timeout: int, debug: bool) -> str:
@ -320,6 +323,32 @@ def _notificationNewDM(session, toHandle: str,
subject = None
commentsEnabled = True
subject = None
# if there is a local PGP key then attempt to encrypt the DM
# using the PGP public key of the recipient
if hasLocalPGPkey():
sayStr = \
'Local PGP key detected...' + \
'Fetching PGP public key for ' + toHandle
_sayCommand(sayStr, sayStr, screenreader, systemLanguage, espeak)
paddedMessage = newMessage
if len(paddedMessage) < 32:
# add some padding before and after
# This is to guard against cribs based on small messages, like "Hi"
for before in range(randint(1, 16)):
paddedMessage = ' ' + paddedMessage
for after in range(randint(1, 16)):
paddedMessage += ' '
cipherText = \
pgpEncryptToActor(paddedMessage, toHandle)
if not cipherText:
sayStr = toHandle + ' has no PGP public key'
_sayCommand(sayStr, sayStr, screenreader, systemLanguage, espeak)
else:
newMessage = cipherText
sayStr = 'Message encrypted'
_sayCommand(sayStr, sayStr, screenreader, systemLanguage, espeak)
sayStr = 'Sending'
_sayCommand(sayStr, sayStr, screenreader, systemLanguage, espeak)
if sendPostViaServer(__version__,
@ -486,11 +515,17 @@ def runNotificationsClient(baseDir: str, proxyType: str, httpPrefix: str,
else:
messageStr = speakerJson['say'] + '. ' + \
speakerJson['imageDescription']
messageStr = pgpDecrypt(messageStr)
if speakerJson.get('id'):
messageStr = pgpDecrypt(messageStr,
speakerJson['id'])
content = messageStr
if speakerJson.get('content'):
content = pgpDecrypt(speakerJson['content'])
if speakerJson.get('id'):
content = pgpDecrypt(speakerJson['content'],
speakerJson['id'])
else:
content = speakerJson['content']
# say the speaker's name
_sayCommand(nameStr, nameStr, screenreader,

150
person.py
View File

@ -19,6 +19,7 @@ from cryptography.hazmat.primitives import serialization
from shutil import copyfile
from webfinger import createWebfingerEndpoint
from webfinger import storeWebfingerEndpoint
from posts import getUserUrl
from posts import createDMTimeline
from posts import createRepliesTimeline
from posts import createMediaTimeline
@ -41,6 +42,12 @@ from utils import saveJson
from utils import setConfigParam
from utils import getConfigParam
from utils import refreshNewswire
from utils import getProtocolPrefixes
from utils import hasUsersPath
from session import createSession
from session import getJson
from webfinger import webfingerHandle
from pprint import pprint
def generateRSAKey() -> (str, str):
@ -1093,3 +1100,146 @@ def setPersonNotes(baseDir: str, nickname: str, domain: str,
with open(notesFilename, 'w+') as notesFile:
notesFile.write(notes)
return True
def getActorJson(handle: str, http: bool, gnunet: bool, quiet=False) -> {}:
"""Returns the actor json
"""
originalActor = handle
if '/@' in handle or \
'/users/' in handle or \
handle.startswith('http') or \
handle.startswith('dat'):
# format: https://domain/@nick
prefixes = getProtocolPrefixes()
for prefix in prefixes:
handle = handle.replace(prefix, '')
handle = handle.replace('/@', '/users/')
if not hasUsersPath(handle):
if not quiet:
print('Expected actor format: ' +
'https://domain/@nick or https://domain/users/nick')
return None
if '/users/' in handle:
nickname = handle.split('/users/')[1]
nickname = nickname.replace('\n', '').replace('\r', '')
domain = handle.split('/users/')[0]
elif '/profile/' in handle:
nickname = handle.split('/profile/')[1]
nickname = nickname.replace('\n', '').replace('\r', '')
domain = handle.split('/profile/')[0]
elif '/channel/' in handle:
nickname = handle.split('/channel/')[1]
nickname = nickname.replace('\n', '').replace('\r', '')
domain = handle.split('/channel/')[0]
elif '/accounts/' in handle:
nickname = handle.split('/accounts/')[1]
nickname = nickname.replace('\n', '').replace('\r', '')
domain = handle.split('/accounts/')[0]
elif '/u/' in handle:
nickname = handle.split('/u/')[1]
nickname = nickname.replace('\n', '').replace('\r', '')
domain = handle.split('/u/')[0]
else:
# format: @nick@domain
if '@' not in handle:
if not quiet:
print('Syntax: --actor nickname@domain')
return None
if handle.startswith('@'):
handle = handle[1:]
if '@' not in handle:
if not quiet:
print('Syntax: --actor nickname@domain')
return None
nickname = handle.split('@')[0]
domain = handle.split('@')[1]
domain = domain.replace('\n', '').replace('\r', '')
cachedWebfingers = {}
proxyType = None
if http or domain.endswith('.onion'):
httpPrefix = 'http'
proxyType = 'tor'
elif domain.endswith('.i2p'):
httpPrefix = 'http'
proxyType = 'i2p'
elif gnunet:
httpPrefix = 'gnunet'
proxyType = 'gnunet'
else:
httpPrefix = 'https'
session = createSession(proxyType)
if nickname == 'inbox':
nickname = domain
handle = nickname + '@' + domain
wfRequest = webfingerHandle(session, handle,
httpPrefix, cachedWebfingers,
None, __version__)
if not wfRequest:
if not quiet:
print('Unable to webfinger ' + handle)
return None
if not isinstance(wfRequest, dict):
if not quiet:
print('Webfinger for ' + handle + ' did not return a dict. ' +
str(wfRequest))
return None
if not quiet:
pprint(wfRequest)
personUrl = None
if wfRequest.get('errors'):
if not quiet:
print('wfRequest error: ' + str(wfRequest['errors']))
if hasUsersPath(handle):
personUrl = originalActor
else:
return None
profileStr = 'https://www.w3.org/ns/activitystreams'
asHeader = {
'Accept': 'application/activity+json; profile="' + profileStr + '"'
}
if not personUrl:
personUrl = getUserUrl(wfRequest)
if nickname == domain:
personUrl = personUrl.replace('/users/', '/actor/')
personUrl = personUrl.replace('/accounts/', '/actor/')
personUrl = personUrl.replace('/channel/', '/actor/')
personUrl = personUrl.replace('/profile/', '/actor/')
personUrl = personUrl.replace('/u/', '/actor/')
if not personUrl:
# try single user instance
personUrl = httpPrefix + '://' + domain
profileStr = 'https://www.w3.org/ns/activitystreams'
asHeader = {
'Accept': 'application/ld+json; profile="' + profileStr + '"'
}
if '/channel/' in personUrl or '/accounts/' in personUrl:
profileStr = 'https://www.w3.org/ns/activitystreams'
asHeader = {
'Accept': 'application/ld+json; profile="' + profileStr + '"'
}
personJson = \
getJson(session, personUrl, asHeader, None, __version__,
httpPrefix, None, 20, quiet)
if personJson:
if not quiet:
pprint(personJson)
else:
profileStr = 'https://www.w3.org/ns/activitystreams'
asHeader = {
'Accept': 'application/jrd+json; profile="' + profileStr + '"'
}
personJson = \
getJson(session, personUrl, asHeader, None,
__version__, httpPrefix, None)
if not quiet:
if personJson:
pprint(personJson)
else:
print('Failed to get ' + personUrl)
return personJson

62
pgp.py
View File

@ -6,7 +6,10 @@ __maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
import subprocess
from pathlib import Path
from person import getActorJson
def getEmailAddress(actorJson: {}) -> str:
@ -299,7 +302,7 @@ def _pgpImportPubKey(recipientPubKey: str) -> str:
return keyId
def pgpEncrypt(content: str, recipientPubKey: str) -> str:
def _pgpEncrypt(content: str, recipientPubKey: str) -> str:
""" Encrypt using your default pgp key to the given recipient
"""
keyId = _pgpImportPubKey(recipientPubKey)
@ -320,16 +323,67 @@ def pgpEncrypt(content: str, recipientPubKey: str) -> str:
return encryptResult
def pgpDecrypt(content: str) -> str:
def _getPGPPublicKeyFromActor(handle: str, actorJson=None) -> str:
"""Searches tags on the actor to see if there is any PGP
public key specified
"""
if not actorJson:
actorJson = getActorJson(handle, False, False, True)
if not actorJson:
return None
if not actorJson.get('attachment'):
return None
if not isinstance(actorJson['attachment'], list):
return None
# search through the tags on the actor
for tag in actorJson['attachment']:
if not isinstance(tag, dict):
continue
if not tag.get('value'):
continue
if not isinstance(tag['value'], str):
continue
if '--BEGIN PGP PUBLIC KEY BLOCK--' in tag['value']:
if '--END PGP PUBLIC KEY BLOCK--' in tag['value']:
return tag['value']
return None
def hasLocalPGPkey() -> bool:
"""Returns true if there is a local .gnupg directory
"""
homeDir = str(Path.home())
gpgDir = homeDir + '/.gnupg'
if os.path.isfile(gpgDir):
return True
return False
def pgpEncryptToActor(content: str, toHandle: str) -> str:
"""PGP encrypt a message to the given actor or handle
"""
# get the actor and extract the pgp public key from it
recipientPubKey = _getPGPPublicKeyFromActor(toHandle)
if not recipientPubKey:
return None
# encrypt using the recipient public key
return _pgpEncrypt(content, recipientPubKey)
def pgpDecrypt(content: str, fromHandle: str) -> str:
""" Encrypt using your default pgp key to the given recipient
fromHandle can be a handle or actor url
"""
if '--BEGIN PGP MESSAGE--' not in content:
return content
# if the public key is also included within the message then import it
startBlock = '--BEGIN PGP PUBLIC KEY BLOCK--'
if startBlock in content:
endBlock = '--END PGP PUBLIC KEY BLOCK--'
if startBlock in content and endBlock in content:
pubKey = extractPGPPublicKey(content)
else:
pubKey = _getPGPPublicKeyFromActor(content, fromHandle)
if pubKey:
_pgpImportPubKey(pubKey)
@ -340,5 +394,5 @@ def pgpDecrypt(content: str) -> str:
(decryptResult, err) = proc.communicate()
if not decryptResult:
return content
decryptResult = decryptResult.decode('utf-8')
decryptResult = decryptResult.decode('utf-8').strip()
return decryptResult

View File

@ -3000,8 +3000,7 @@ def testFunctions():
'E2EEremoveDevice',
'setOrganizationScheme',
'fill_headers',
'_nothing',
"pgpEncrypt"
'_nothing'
]
excludeImports = [
'link',