Snake case

merge-requests/26/head
Bob Mottram 2022-01-03 15:06:00 +00:00
parent 0a74de68f6
commit c55025c9fa
2 changed files with 153 additions and 152 deletions

View File

@ -23,46 +23,46 @@ def set_pet_name(base_dir: str, nickname: str, domain: str,
handle = handle[1:] handle = handle[1:]
if petname.startswith('@'): if petname.startswith('@'):
petname = petname[1:] petname = petname[1:]
petnamesFilename = acct_dir(base_dir, nickname, domain) + '/petnames.txt' petnames_filename = acct_dir(base_dir, nickname, domain) + '/petnames.txt'
entry = petname + ' ' + handle + '\n' entry = petname + ' ' + handle + '\n'
# does this entry already exist? # does this entry already exist?
if os.path.isfile(petnamesFilename): if os.path.isfile(petnames_filename):
with open(petnamesFilename, 'r') as petnamesFile: with open(petnames_filename, 'r') as petnames_file:
petnamesStr = petnamesFile.read() petnames_str = petnames_file.read()
if entry in petnamesStr: if entry in petnames_str:
return True return True
if ' ' + handle + '\n' in petnamesStr: if ' ' + handle + '\n' in petnames_str:
petnamesList = petnamesStr.split('\n') petnames_list = petnames_str.split('\n')
newPetnamesStr = '' new_petnames_str = ''
for pet in petnamesList: for pet in petnames_list:
if not pet.endswith(' ' + handle): if not pet.endswith(' ' + handle):
newPetnamesStr += pet + '\n' new_petnames_str += pet + '\n'
else: else:
newPetnamesStr += entry new_petnames_str += entry
# save the updated petnames file # save the updated petnames file
try: try:
with open(petnamesFilename, 'w+') as petnamesFile: with open(petnames_filename, 'w+') as petnames_file:
petnamesFile.write(newPetnamesStr) petnames_file.write(new_petnames_str)
except OSError: except OSError:
print('EX: unable to save ' + petnamesFilename) print('EX: unable to save ' + petnames_filename)
return False return False
return True return True
# entry does not exist in the petnames file # entry does not exist in the petnames file
try: try:
with open(petnamesFilename, 'a+') as petnamesFile: with open(petnames_filename, 'a+') as petnames_file:
petnamesFile.write(entry) petnames_file.write(entry)
except OSError: except OSError:
print('EX: unable to append ' + petnamesFilename) print('EX: unable to append ' + petnames_filename)
return False return False
return True return True
# first entry # first entry
try: try:
with open(petnamesFilename, 'w+') as petnamesFile: with open(petnames_filename, 'w+') as petnames_file:
petnamesFile.write(entry) petnames_file.write(entry)
except OSError: except OSError:
print('EX: unable to write ' + petnamesFilename) print('EX: unable to write ' + petnames_filename)
return False return False
return True return True
@ -75,21 +75,21 @@ def get_pet_name(base_dir: str, nickname: str, domain: str,
return '' return ''
if handle.startswith('@'): if handle.startswith('@'):
handle = handle[1:] handle = handle[1:]
petnamesFilename = acct_dir(base_dir, nickname, domain) + '/petnames.txt' petnames_filename = acct_dir(base_dir, nickname, domain) + '/petnames.txt'
if not os.path.isfile(petnamesFilename): if not os.path.isfile(petnames_filename):
return '' return ''
with open(petnamesFilename, 'r') as petnamesFile: with open(petnames_filename, 'r') as petnames_file:
petnamesStr = petnamesFile.read() petnames_str = petnames_file.read()
if ' ' + handle + '\n' in petnamesStr: if ' ' + handle + '\n' in petnames_str:
petnamesList = petnamesStr.split('\n') petnames_list = petnames_str.split('\n')
for pet in petnamesList: for pet in petnames_list:
if pet.endswith(' ' + handle): if pet.endswith(' ' + handle):
return pet.replace(' ' + handle, '').strip() return pet.replace(' ' + handle, '').strip()
elif ' ' + handle.lower() + '\n' in petnamesStr.lower(): elif ' ' + handle.lower() + '\n' in petnames_str.lower():
petnamesList = petnamesStr.split('\n') petnames_list = petnames_str.split('\n')
handle = handle.lower() handle = handle.lower()
for pet in petnamesList: for pet in petnames_list:
if pet.lower().endswith(' ' + handle): if pet.lower().endswith(' ' + handle):
handle2 = pet.split(' ')[-1] handle2 = pet.split(' ')[-1]
return pet.replace(' ' + handle2, '').strip() return pet.replace(' ' + handle2, '').strip()
@ -102,15 +102,15 @@ def _get_pet_name_handle(base_dir: str, nickname: str, domain: str,
""" """
if petname.startswith('@'): if petname.startswith('@'):
petname = petname[1:] petname = petname[1:]
petnamesFilename = acct_dir(base_dir, nickname, domain) + '/petnames.txt' petnames_filename = acct_dir(base_dir, nickname, domain) + '/petnames.txt'
if not os.path.isfile(petnamesFilename): if not os.path.isfile(petnames_filename):
return '' return ''
with open(petnamesFilename, 'r') as petnamesFile: with open(petnames_filename, 'r') as petnames_file:
petnamesStr = petnamesFile.read() petnames_str = petnames_file.read()
if petname + ' ' in petnamesStr: if petname + ' ' in petnames_str:
petnamesList = petnamesStr.split('\n') petnames_list = petnames_str.split('\n')
for pet in petnamesList: for pet in petnames_list:
if pet.startswith(petname + ' '): if pet.startswith(petname + ' '):
handle = pet.replace(petname + ' ', '').strip() handle = pet.replace(petname + ' ', '').strip()
return handle return handle

231
pgp.py
View File

@ -94,15 +94,15 @@ def get_pgp_fingerprint(actor_json: {}) -> str:
def set_email_address(actor_json: {}, email_address: str) -> None: def set_email_address(actor_json: {}, email_address: str) -> None:
"""Sets the email address for the given actor """Sets the email address for the given actor
""" """
notEmailAddress = False not_email_address = False
if '@' not in email_address: if '@' not in email_address:
notEmailAddress = True not_email_address = True
if '.' not in email_address: if '.' not in email_address:
notEmailAddress = True not_email_address = True
if '<' in email_address: if '<' in email_address:
notEmailAddress = True not_email_address = True
if email_address.startswith('@'): if email_address.startswith('@'):
notEmailAddress = True not_email_address = True
if not actor_json.get('attachment'): if not actor_json.get('attachment'):
actor_json['attachment'] = [] actor_json['attachment'] = []
@ -120,7 +120,7 @@ def set_email_address(actor_json: {}, email_address: str) -> None:
break break
if property_found: if property_found:
actor_json['attachment'].remove(property_found) actor_json['attachment'].remove(property_found)
if notEmailAddress: if not_email_address:
return return
for property_value in actor_json['attachment']: for property_value in actor_json['attachment']:
@ -135,25 +135,25 @@ def set_email_address(actor_json: {}, email_address: str) -> None:
property_value['value'] = email_address property_value['value'] = email_address
return return
newEmailAddress = { new_email_address = {
"name": "Email", "name": "Email",
"type": "PropertyValue", "type": "PropertyValue",
"value": email_address "value": email_address
} }
actor_json['attachment'].append(newEmailAddress) actor_json['attachment'].append(new_email_address)
def set_pgp_pub_key(actor_json: {}, pgp_pub_key: str) -> None: def set_pgp_pub_key(actor_json: {}, pgp_pub_key: str) -> None:
"""Sets a PGP public key for the given actor """Sets a PGP public key for the given actor
""" """
removeKey = False remove_key = False
if not pgp_pub_key: if not pgp_pub_key:
removeKey = True remove_key = True
else: else:
if not contains_pgp_public_key(pgp_pub_key): if not contains_pgp_public_key(pgp_pub_key):
removeKey = True remove_key = True
if '<' in pgp_pub_key: if '<' in pgp_pub_key:
removeKey = True remove_key = True
if not actor_json.get('attachment'): if not actor_json.get('attachment'):
actor_json['attachment'] = [] actor_json['attachment'] = []
@ -171,7 +171,7 @@ def set_pgp_pub_key(actor_json: {}, pgp_pub_key: str) -> None:
break break
if property_found: if property_found:
actor_json['attachment'].remove(property_value) actor_json['attachment'].remove(property_value)
if removeKey: if remove_key:
return return
for property_value in actor_json['attachment']: for property_value in actor_json['attachment']:
@ -197,12 +197,12 @@ def set_pgp_pub_key(actor_json: {}, pgp_pub_key: str) -> None:
def set_pgp_fingerprint(actor_json: {}, fingerprint: str) -> None: def set_pgp_fingerprint(actor_json: {}, fingerprint: str) -> None:
"""Sets a PGP fingerprint for the given actor """Sets a PGP fingerprint for the given actor
""" """
removeFingerprint = False remove_fingerprint = False
if not fingerprint: if not fingerprint:
removeFingerprint = True remove_fingerprint = True
else: else:
if len(fingerprint) < 10: if len(fingerprint) < 10:
removeFingerprint = True remove_fingerprint = True
if not actor_json.get('attachment'): if not actor_json.get('attachment'):
actor_json['attachment'] = [] actor_json['attachment'] = []
@ -220,7 +220,7 @@ def set_pgp_fingerprint(actor_json: {}, fingerprint: str) -> None:
break break
if property_found: if property_found:
actor_json['attachment'].remove(property_value) actor_json['attachment'].remove(property_value)
if removeFingerprint: if remove_fingerprint:
return return
for property_value in actor_json['attachment']: for property_value in actor_json['attachment']:
@ -246,92 +246,93 @@ def set_pgp_fingerprint(actor_json: {}, fingerprint: str) -> None:
def extract_pgp_public_key(content: str) -> str: def extract_pgp_public_key(content: str) -> str:
"""Returns the PGP key from the given text """Returns the PGP key from the given text
""" """
startBlock = '--BEGIN PGP PUBLIC KEY BLOCK--' start_block = '--BEGIN PGP PUBLIC KEY BLOCK--'
endBlock = '--END PGP PUBLIC KEY BLOCK--' end_block = '--END PGP PUBLIC KEY BLOCK--'
if startBlock not in content: if start_block not in content:
return None return None
if endBlock not in content: if end_block not in content:
return None return None
if '\n' not in content: if '\n' not in content:
return None return None
linesList = content.split('\n') lines_list = content.split('\n')
extracting = False extracting = False
publicKey = '' public_key = ''
for line in linesList: for line in lines_list:
if not extracting: if not extracting:
if startBlock in line: if start_block in line:
extracting = True extracting = True
else: else:
if endBlock in line: if end_block in line:
publicKey += line public_key += line
break break
if extracting: if extracting:
publicKey += line + '\n' public_key += line + '\n'
return publicKey return public_key
def _pgp_import_pub_key(recipientPubKey: str) -> str: def _pgp_import_pub_key(recipient_pub_key: str) -> str:
""" Import the given public key """ Import the given public key
""" """
# do a dry run # do a dry run
cmdImportPubKey = \ cmd_import_pub_key = \
'echo "' + recipientPubKey + '" | gpg --dry-run --import 2> /dev/null' 'echo "' + recipient_pub_key + \
proc = subprocess.Popen([cmdImportPubKey], '" | gpg --dry-run --import 2> /dev/null'
proc = subprocess.Popen([cmd_import_pub_key],
stdout=subprocess.PIPE, shell=True) stdout=subprocess.PIPE, shell=True)
(importResult, err) = proc.communicate() (import_result, err) = proc.communicate()
if err: if err:
return None return None
# this time for real # this time for real
cmdImportPubKey = \ cmd_import_pub_key = \
'echo "' + recipientPubKey + '" | gpg --import 2> /dev/null' 'echo "' + recipient_pub_key + '" | gpg --import 2> /dev/null'
proc = subprocess.Popen([cmdImportPubKey], proc = subprocess.Popen([cmd_import_pub_key],
stdout=subprocess.PIPE, shell=True) stdout=subprocess.PIPE, shell=True)
(importResult, err) = proc.communicate() (import_result, err) = proc.communicate()
if err: if err:
return None return None
# get the key id # get the key id
cmdImportPubKey = \ cmd_import_pub_key = \
'echo "' + recipientPubKey + '" | gpg --show-keys' 'echo "' + recipient_pub_key + '" | gpg --show-keys'
proc = subprocess.Popen([cmdImportPubKey], proc = subprocess.Popen([cmd_import_pub_key],
stdout=subprocess.PIPE, shell=True) stdout=subprocess.PIPE, shell=True)
(importResult, err) = proc.communicate() (import_result, err) = proc.communicate()
if not importResult: if not import_result:
return None return None
importResult = importResult.decode('utf-8').split('\n') import_result = import_result.decode('utf-8').split('\n')
keyId = '' key_id = ''
for line in importResult: for line in import_result:
if line.startswith('pub'): if line.startswith('pub'):
continue continue
elif line.startswith('uid'): if line.startswith('uid'):
continue continue
elif line.startswith('sub'): if line.startswith('sub'):
continue continue
keyId = line.strip() key_id = line.strip()
break break
return keyId return key_id
def _pgp_encrypt(content: str, recipientPubKey: str) -> str: def _pgp_encrypt(content: str, recipient_pub_key: str) -> str:
""" Encrypt using your default pgp key to the given recipient """ Encrypt using your default pgp key to the given recipient
""" """
keyId = _pgp_import_pub_key(recipientPubKey) key_id = _pgp_import_pub_key(recipient_pub_key)
if not keyId: if not key_id:
return None return None
cmdEncrypt = \ cmd_encrypt = \
'echo "' + content + '" | gpg --encrypt --armor --recipient ' + \ 'echo "' + content + '" | gpg --encrypt --armor --recipient ' + \
keyId + ' 2> /dev/null' key_id + ' 2> /dev/null'
proc = subprocess.Popen([cmdEncrypt], proc = subprocess.Popen([cmd_encrypt],
stdout=subprocess.PIPE, shell=True) stdout=subprocess.PIPE, shell=True)
(encryptResult, err) = proc.communicate() (encrypt_result, _) = proc.communicate()
if not encryptResult: if not encrypt_result:
return None return None
encryptResult = encryptResult.decode('utf-8') encrypt_result = encrypt_result.decode('utf-8')
if not is_pgp_encrypted(encryptResult): if not is_pgp_encrypted(encrypt_result):
return None return None
return encryptResult return encrypt_result
def _get_pgp_public_key_from_actor(signing_priv_key_pem: str, def _get_pgp_public_key_from_actor(signing_priv_key_pem: str,
@ -341,7 +342,7 @@ def _get_pgp_public_key_from_actor(signing_priv_key_pem: str,
public key specified public key specified
""" """
if not actor_json: if not actor_json:
actor_json, asHeader = \ actor_json, _ = \
get_actor_json(domain, handle, False, False, False, True, get_actor_json(domain, handle, False, False, False, True,
signing_priv_key_pem, None) signing_priv_key_pem, None)
if not actor_json: if not actor_json:
@ -366,11 +367,11 @@ def _get_pgp_public_key_from_actor(signing_priv_key_pem: str,
def has_local_pg_pkey() -> bool: def has_local_pg_pkey() -> bool:
"""Returns true if there is a local .gnupg directory """Returns true if there is a local .gnupg directory
""" """
homeDir = str(Path.home()) home_dir = str(Path.home())
gpgDir = homeDir + '/.gnupg' gpg_dir = home_dir + '/.gnupg'
if os.path.isdir(gpgDir): if os.path.isdir(gpg_dir):
keyId = pgp_local_public_key() key_id = pgp_local_public_key()
if keyId: if key_id:
return True return True
return False return False
@ -380,12 +381,12 @@ def pgp_encrypt_to_actor(domain: str, content: str, toHandle: str,
"""PGP encrypt a message to the given actor or handle """PGP encrypt a message to the given actor or handle
""" """
# get the actor and extract the pgp public key from it # get the actor and extract the pgp public key from it
recipientPubKey = \ recipient_pub_key = \
_get_pgp_public_key_from_actor(signing_priv_key_pem, domain, toHandle) _get_pgp_public_key_from_actor(signing_priv_key_pem, domain, toHandle)
if not recipientPubKey: if not recipient_pub_key:
return None return None
# encrypt using the recipient public key # encrypt using the recipient public key
return _pgp_encrypt(content, recipientPubKey) return _pgp_encrypt(content, recipient_pub_key)
def pgp_decrypt(domain: str, content: str, fromHandle: str, def pgp_decrypt(domain: str, content: str, fromHandle: str,
@ -398,32 +399,32 @@ def pgp_decrypt(domain: str, content: str, fromHandle: str,
# if the public key is also included within the message then import it # if the public key is also included within the message then import it
if contains_pgp_public_key(content): if contains_pgp_public_key(content):
pubKey = extract_pgp_public_key(content) pub_key = extract_pgp_public_key(content)
else: else:
pubKey = \ pub_key = \
_get_pgp_public_key_from_actor(signing_priv_key_pem, _get_pgp_public_key_from_actor(signing_priv_key_pem,
domain, content, fromHandle) domain, content, fromHandle)
if pubKey: if pub_key:
_pgp_import_pub_key(pubKey) _pgp_import_pub_key(pub_key)
cmdDecrypt = \ cmd_decrypt = \
'echo "' + content + '" | gpg --decrypt --armor 2> /dev/null' 'echo "' + content + '" | gpg --decrypt --armor 2> /dev/null'
proc = subprocess.Popen([cmdDecrypt], proc = subprocess.Popen([cmd_decrypt],
stdout=subprocess.PIPE, shell=True) stdout=subprocess.PIPE, shell=True)
(decryptResult, err) = proc.communicate() (decrypt_result, _) = proc.communicate()
if not decryptResult: if not decrypt_result:
return content return content
decryptResult = decryptResult.decode('utf-8').strip() decrypt_result = decrypt_result.decode('utf-8').strip()
return decryptResult return decrypt_result
def _pgp_local_public_key_id() -> str: def _pgp_local_public_key_id() -> str:
"""Gets the local pgp public key ID """Gets the local pgp public key ID
""" """
cmdStr = \ cmd_str = \
"gpgconf --list-options gpg | " + \ "gpgconf --list-options gpg | " + \
"awk -F: '$1 == \"default-key\" {print $10}'" "awk -F: '$1 == \"default-key\" {print $10}'"
proc = subprocess.Popen([cmdStr], proc = subprocess.Popen([cmd_str],
stdout=subprocess.PIPE, shell=True) stdout=subprocess.PIPE, shell=True)
(result, err) = proc.communicate() (result, err) = proc.communicate()
if err: if err:
@ -438,11 +439,11 @@ def _pgp_local_public_key_id() -> str:
def pgp_local_public_key() -> str: def pgp_local_public_key() -> str:
"""Gets the local pgp public key """Gets the local pgp public key
""" """
keyId = _pgp_local_public_key_id() key_id = _pgp_local_public_key_id()
if not keyId: if not key_id:
keyId = '' key_id = ''
cmdStr = "gpg --armor --export " + keyId cmd_str = "gpg --armor --export " + key_id
proc = subprocess.Popen([cmdStr], proc = subprocess.Popen([cmd_str],
stdout=subprocess.PIPE, shell=True) stdout=subprocess.PIPE, shell=True)
(result, err) = proc.communicate() (result, err) = proc.communicate()
if err: if err:
@ -473,12 +474,12 @@ def pgp_public_key_upload(base_dir: str, session,
pgp_pub_key = pgp_local_public_key() pgp_pub_key = pgp_local_public_key()
if not pgp_pub_key: if not pgp_pub_key:
return None return None
pgp_pub_keyId = _pgp_local_public_key_id() pgp_pub_key_id = _pgp_local_public_key_id()
else: else:
if debug: if debug:
print('Testing with PGP public key ' + test) print('Testing with PGP public key ' + test)
pgp_pub_key = test pgp_pub_key = test
pgp_pub_keyId = None pgp_pub_key_id = None
domain_full = get_full_domain(domain, port) domain_full = get_full_domain(domain, port)
if debug: if debug:
@ -489,7 +490,7 @@ def pgp_public_key_upload(base_dir: str, session,
if debug: if debug:
print('Getting actor for ' + handle) print('Getting actor for ' + handle)
actor_json, asHeader = \ actor_json, _ = \
get_actor_json(domain_full, handle, False, False, debug, True, get_actor_json(domain_full, handle, False, False, debug, True,
signing_priv_key_pem, session) signing_priv_key_pem, session)
if not actor_json: if not actor_json:
@ -531,8 +532,8 @@ def pgp_public_key_upload(base_dir: str, session,
return None return None
# set the pgp details # set the pgp details
if pgp_pub_keyId: if pgp_pub_key_id:
set_pgp_fingerprint(actor_json, pgp_pub_keyId) set_pgp_fingerprint(actor_json, pgp_pub_key_id)
else: else:
if debug: if debug:
print('No PGP key Id. Continuing anyway.') print('No PGP key Id. Continuing anyway.')
@ -542,10 +543,10 @@ def pgp_public_key_upload(base_dir: str, session,
set_pgp_pub_key(actor_json, pgp_pub_key) set_pgp_pub_key(actor_json, pgp_pub_key)
# create an actor update # create an actor update
statusNumber, published = get_status_number() status_number, _ = get_status_number()
actorUpdate = { actor_update = {
'@context': 'https://www.w3.org/ns/activitystreams', '@context': 'https://www.w3.org/ns/activitystreams',
'id': actor + '#updates/' + statusNumber, 'id': actor + '#updates/' + status_number,
'type': 'Update', 'type': 'Update',
'actor': actor, 'actor': actor,
'to': [actor], 'to': [actor],
@ -553,7 +554,7 @@ def pgp_public_key_upload(base_dir: str, session,
'object': actor_json 'object': actor_json
} }
if debug: if debug:
print('actor update is ' + str(actorUpdate)) print('actor update is ' + str(actor_update))
# lookup the inbox for the To handle # lookup the inbox for the To handle
wf_request = \ wf_request = \
@ -571,51 +572,51 @@ def pgp_public_key_upload(base_dir: str, session,
' did not return a dict. ' + str(wf_request)) ' did not return a dict. ' + str(wf_request))
return None return None
postToBox = 'outbox' post_to_box = 'outbox'
# get the actor inbox for the To handle # get the actor inbox for the To handle
originDomain = domain origin_domain = domain
(inboxUrl, pubKeyId, pubKey, fromPersonId, sharedInbox, avatarUrl, (inbox_url, _, _, from_person_id, _, _,
displayName, _) = get_person_box(signing_priv_key_pem, originDomain, _, _) = get_person_box(signing_priv_key_pem, origin_domain,
base_dir, session, wf_request, base_dir, session, wf_request,
person_cache, person_cache,
__version__, http_prefix, nickname, __version__, http_prefix, nickname,
domain, postToBox, 35725) domain, post_to_box, 35725)
if not inboxUrl: if not inbox_url:
if debug: if debug:
print('DEBUG: No ' + postToBox + ' was found for ' + handle) print('DEBUG: No ' + post_to_box + ' was found for ' + handle)
return None return None
if not fromPersonId: if not from_person_id:
if debug: if debug:
print('DEBUG: No actor was found for ' + handle) print('DEBUG: No actor was found for ' + handle)
return None return None
authHeader = create_basic_auth_header(nickname, password) auth_header = create_basic_auth_header(nickname, password)
headers = { headers = {
'host': domain, 'host': domain,
'Content-type': 'application/json', 'Content-type': 'application/json',
'Authorization': authHeader 'Authorization': auth_header
} }
quiet = not debug quiet = not debug
tries = 0 tries = 0
while tries < 4: while tries < 4:
postResult = \ post_result = \
post_json(http_prefix, domain_full, post_json(http_prefix, domain_full,
session, actorUpdate, [], inboxUrl, session, actor_update, [], inbox_url,
headers, 5, quiet) headers, 5, quiet)
if postResult: if post_result:
break break
tries += 1 tries += 1
if postResult is None: if post_result is None:
if debug: if debug:
print('DEBUG: POST pgp actor update failed for c2s to ' + print('DEBUG: POST pgp actor update failed for c2s to ' +
inboxUrl) inbox_url)
return None return None
if debug: if debug:
print('DEBUG: c2s POST pgp actor update success') print('DEBUG: c2s POST pgp actor update success')
return actorUpdate return actor_update