diff --git a/petnames.py b/petnames.py index d8d77346e..bf3b66d92 100644 --- a/petnames.py +++ b/petnames.py @@ -23,46 +23,46 @@ def set_pet_name(base_dir: str, nickname: str, domain: str, handle = handle[1:] if petname.startswith('@'): petname = petname[1:] - petnamesFilename = acct_dir(base_dir, nickname, domain) + '/petnames.txt' + petnames_filename = acct_dir(base_dir, nickname, domain) + '/petnames.txt' entry = petname + ' ' + handle + '\n' # does this entry already exist? - if os.path.isfile(petnamesFilename): - with open(petnamesFilename, 'r') as petnamesFile: - petnamesStr = petnamesFile.read() - if entry in petnamesStr: + if os.path.isfile(petnames_filename): + with open(petnames_filename, 'r') as petnames_file: + petnames_str = petnames_file.read() + if entry in petnames_str: return True - if ' ' + handle + '\n' in petnamesStr: - petnamesList = petnamesStr.split('\n') - newPetnamesStr = '' - for pet in petnamesList: + if ' ' + handle + '\n' in petnames_str: + petnames_list = petnames_str.split('\n') + new_petnames_str = '' + for pet in petnames_list: if not pet.endswith(' ' + handle): - newPetnamesStr += pet + '\n' + new_petnames_str += pet + '\n' else: - newPetnamesStr += entry + new_petnames_str += entry # save the updated petnames file try: - with open(petnamesFilename, 'w+') as petnamesFile: - petnamesFile.write(newPetnamesStr) + with open(petnames_filename, 'w+') as petnames_file: + petnames_file.write(new_petnames_str) except OSError: - print('EX: unable to save ' + petnamesFilename) + print('EX: unable to save ' + petnames_filename) return False return True # entry does not exist in the petnames file try: - with open(petnamesFilename, 'a+') as petnamesFile: - petnamesFile.write(entry) + with open(petnames_filename, 'a+') as petnames_file: + petnames_file.write(entry) except OSError: - print('EX: unable to append ' + petnamesFilename) + print('EX: unable to append ' + petnames_filename) return False return True # first entry try: - with open(petnamesFilename, 'w+') as petnamesFile: - petnamesFile.write(entry) + with open(petnames_filename, 'w+') as petnames_file: + petnames_file.write(entry) except OSError: - print('EX: unable to write ' + petnamesFilename) + print('EX: unable to write ' + petnames_filename) return False return True @@ -75,21 +75,21 @@ def get_pet_name(base_dir: str, nickname: str, domain: str, return '' if handle.startswith('@'): handle = handle[1:] - petnamesFilename = acct_dir(base_dir, nickname, domain) + '/petnames.txt' + petnames_filename = acct_dir(base_dir, nickname, domain) + '/petnames.txt' - if not os.path.isfile(petnamesFilename): + if not os.path.isfile(petnames_filename): return '' - with open(petnamesFilename, 'r') as petnamesFile: - petnamesStr = petnamesFile.read() - if ' ' + handle + '\n' in petnamesStr: - petnamesList = petnamesStr.split('\n') - for pet in petnamesList: + with open(petnames_filename, 'r') as petnames_file: + petnames_str = petnames_file.read() + if ' ' + handle + '\n' in petnames_str: + petnames_list = petnames_str.split('\n') + for pet in petnames_list: if pet.endswith(' ' + handle): return pet.replace(' ' + handle, '').strip() - elif ' ' + handle.lower() + '\n' in petnamesStr.lower(): - petnamesList = petnamesStr.split('\n') + elif ' ' + handle.lower() + '\n' in petnames_str.lower(): + petnames_list = petnames_str.split('\n') handle = handle.lower() - for pet in petnamesList: + for pet in petnames_list: if pet.lower().endswith(' ' + handle): handle2 = pet.split(' ')[-1] return pet.replace(' ' + handle2, '').strip() @@ -102,15 +102,15 @@ def _get_pet_name_handle(base_dir: str, nickname: str, domain: str, """ if petname.startswith('@'): petname = petname[1:] - petnamesFilename = acct_dir(base_dir, nickname, domain) + '/petnames.txt' + petnames_filename = acct_dir(base_dir, nickname, domain) + '/petnames.txt' - if not os.path.isfile(petnamesFilename): + if not os.path.isfile(petnames_filename): return '' - with open(petnamesFilename, 'r') as petnamesFile: - petnamesStr = petnamesFile.read() - if petname + ' ' in petnamesStr: - petnamesList = petnamesStr.split('\n') - for pet in petnamesList: + with open(petnames_filename, 'r') as petnames_file: + petnames_str = petnames_file.read() + if petname + ' ' in petnames_str: + petnames_list = petnames_str.split('\n') + for pet in petnames_list: if pet.startswith(petname + ' '): handle = pet.replace(petname + ' ', '').strip() return handle diff --git a/pgp.py b/pgp.py index fdd89bc42..6119db20e 100644 --- a/pgp.py +++ b/pgp.py @@ -94,15 +94,15 @@ def get_pgp_fingerprint(actor_json: {}) -> str: def set_email_address(actor_json: {}, email_address: str) -> None: """Sets the email address for the given actor """ - notEmailAddress = False + not_email_address = False if '@' not in email_address: - notEmailAddress = True + not_email_address = True if '.' not in email_address: - notEmailAddress = True + not_email_address = True if '<' in email_address: - notEmailAddress = True + not_email_address = True if email_address.startswith('@'): - notEmailAddress = True + not_email_address = True if not actor_json.get('attachment'): actor_json['attachment'] = [] @@ -120,7 +120,7 @@ def set_email_address(actor_json: {}, email_address: str) -> None: break if property_found: actor_json['attachment'].remove(property_found) - if notEmailAddress: + if not_email_address: return for property_value in actor_json['attachment']: @@ -135,25 +135,25 @@ def set_email_address(actor_json: {}, email_address: str) -> None: property_value['value'] = email_address return - newEmailAddress = { + new_email_address = { "name": "Email", "type": "PropertyValue", "value": email_address } - actor_json['attachment'].append(newEmailAddress) + actor_json['attachment'].append(new_email_address) def set_pgp_pub_key(actor_json: {}, pgp_pub_key: str) -> None: """Sets a PGP public key for the given actor """ - removeKey = False + remove_key = False if not pgp_pub_key: - removeKey = True + remove_key = True else: if not contains_pgp_public_key(pgp_pub_key): - removeKey = True + remove_key = True if '<' in pgp_pub_key: - removeKey = True + remove_key = True if not actor_json.get('attachment'): actor_json['attachment'] = [] @@ -171,7 +171,7 @@ def set_pgp_pub_key(actor_json: {}, pgp_pub_key: str) -> None: break if property_found: actor_json['attachment'].remove(property_value) - if removeKey: + if remove_key: return for property_value in actor_json['attachment']: @@ -197,12 +197,12 @@ def set_pgp_pub_key(actor_json: {}, pgp_pub_key: str) -> None: def set_pgp_fingerprint(actor_json: {}, fingerprint: str) -> None: """Sets a PGP fingerprint for the given actor """ - removeFingerprint = False + remove_fingerprint = False if not fingerprint: - removeFingerprint = True + remove_fingerprint = True else: if len(fingerprint) < 10: - removeFingerprint = True + remove_fingerprint = True if not actor_json.get('attachment'): actor_json['attachment'] = [] @@ -220,7 +220,7 @@ def set_pgp_fingerprint(actor_json: {}, fingerprint: str) -> None: break if property_found: actor_json['attachment'].remove(property_value) - if removeFingerprint: + if remove_fingerprint: return for property_value in actor_json['attachment']: @@ -246,92 +246,93 @@ def set_pgp_fingerprint(actor_json: {}, fingerprint: str) -> None: def extract_pgp_public_key(content: str) -> str: """Returns the PGP key from the given text """ - startBlock = '--BEGIN PGP PUBLIC KEY BLOCK--' - endBlock = '--END PGP PUBLIC KEY BLOCK--' - if startBlock not in content: + start_block = '--BEGIN PGP PUBLIC KEY BLOCK--' + end_block = '--END PGP PUBLIC KEY BLOCK--' + if start_block not in content: return None - if endBlock not in content: + if end_block not in content: return None if '\n' not in content: return None - linesList = content.split('\n') + lines_list = content.split('\n') extracting = False - publicKey = '' - for line in linesList: + public_key = '' + for line in lines_list: if not extracting: - if startBlock in line: + if start_block in line: extracting = True else: - if endBlock in line: - publicKey += line + if end_block in line: + public_key += line break if extracting: - publicKey += line + '\n' - return publicKey + public_key += line + '\n' + return public_key -def _pgp_import_pub_key(recipientPubKey: str) -> str: +def _pgp_import_pub_key(recipient_pub_key: str) -> str: """ Import the given public key """ # do a dry run - cmdImportPubKey = \ - 'echo "' + recipientPubKey + '" | gpg --dry-run --import 2> /dev/null' - proc = subprocess.Popen([cmdImportPubKey], + cmd_import_pub_key = \ + 'echo "' + recipient_pub_key + \ + '" | gpg --dry-run --import 2> /dev/null' + proc = subprocess.Popen([cmd_import_pub_key], stdout=subprocess.PIPE, shell=True) - (importResult, err) = proc.communicate() + (import_result, err) = proc.communicate() if err: return None # this time for real - cmdImportPubKey = \ - 'echo "' + recipientPubKey + '" | gpg --import 2> /dev/null' - proc = subprocess.Popen([cmdImportPubKey], + cmd_import_pub_key = \ + 'echo "' + recipient_pub_key + '" | gpg --import 2> /dev/null' + proc = subprocess.Popen([cmd_import_pub_key], stdout=subprocess.PIPE, shell=True) - (importResult, err) = proc.communicate() + (import_result, err) = proc.communicate() if err: return None # get the key id - cmdImportPubKey = \ - 'echo "' + recipientPubKey + '" | gpg --show-keys' - proc = subprocess.Popen([cmdImportPubKey], + cmd_import_pub_key = \ + 'echo "' + recipient_pub_key + '" | gpg --show-keys' + proc = subprocess.Popen([cmd_import_pub_key], stdout=subprocess.PIPE, shell=True) - (importResult, err) = proc.communicate() - if not importResult: + (import_result, err) = proc.communicate() + if not import_result: return None - importResult = importResult.decode('utf-8').split('\n') - keyId = '' - for line in importResult: + import_result = import_result.decode('utf-8').split('\n') + key_id = '' + for line in import_result: if line.startswith('pub'): continue - elif line.startswith('uid'): + if line.startswith('uid'): continue - elif line.startswith('sub'): + if line.startswith('sub'): continue - keyId = line.strip() + key_id = line.strip() break - return keyId + return key_id -def _pgp_encrypt(content: str, recipientPubKey: str) -> str: +def _pgp_encrypt(content: str, recipient_pub_key: str) -> str: """ Encrypt using your default pgp key to the given recipient """ - keyId = _pgp_import_pub_key(recipientPubKey) - if not keyId: + key_id = _pgp_import_pub_key(recipient_pub_key) + if not key_id: return None - cmdEncrypt = \ + cmd_encrypt = \ 'echo "' + content + '" | gpg --encrypt --armor --recipient ' + \ - keyId + ' 2> /dev/null' - proc = subprocess.Popen([cmdEncrypt], + key_id + ' 2> /dev/null' + proc = subprocess.Popen([cmd_encrypt], stdout=subprocess.PIPE, shell=True) - (encryptResult, err) = proc.communicate() - if not encryptResult: + (encrypt_result, _) = proc.communicate() + if not encrypt_result: return None - encryptResult = encryptResult.decode('utf-8') - if not is_pgp_encrypted(encryptResult): + encrypt_result = encrypt_result.decode('utf-8') + if not is_pgp_encrypted(encrypt_result): return None - return encryptResult + return encrypt_result def _get_pgp_public_key_from_actor(signing_priv_key_pem: str, @@ -341,7 +342,7 @@ def _get_pgp_public_key_from_actor(signing_priv_key_pem: str, public key specified """ if not actor_json: - actor_json, asHeader = \ + actor_json, _ = \ get_actor_json(domain, handle, False, False, False, True, signing_priv_key_pem, None) if not actor_json: @@ -366,11 +367,11 @@ def _get_pgp_public_key_from_actor(signing_priv_key_pem: str, def has_local_pg_pkey() -> bool: """Returns true if there is a local .gnupg directory """ - homeDir = str(Path.home()) - gpgDir = homeDir + '/.gnupg' - if os.path.isdir(gpgDir): - keyId = pgp_local_public_key() - if keyId: + home_dir = str(Path.home()) + gpg_dir = home_dir + '/.gnupg' + if os.path.isdir(gpg_dir): + key_id = pgp_local_public_key() + if key_id: return True return False @@ -380,12 +381,12 @@ def pgp_encrypt_to_actor(domain: str, content: str, toHandle: str, """PGP encrypt a message to the given actor or handle """ # get the actor and extract the pgp public key from it - recipientPubKey = \ + recipient_pub_key = \ _get_pgp_public_key_from_actor(signing_priv_key_pem, domain, toHandle) - if not recipientPubKey: + if not recipient_pub_key: return None # encrypt using the recipient public key - return _pgp_encrypt(content, recipientPubKey) + return _pgp_encrypt(content, recipient_pub_key) def pgp_decrypt(domain: str, content: str, fromHandle: str, @@ -398,32 +399,32 @@ def pgp_decrypt(domain: str, content: str, fromHandle: str, # if the public key is also included within the message then import it if contains_pgp_public_key(content): - pubKey = extract_pgp_public_key(content) + pub_key = extract_pgp_public_key(content) else: - pubKey = \ + pub_key = \ _get_pgp_public_key_from_actor(signing_priv_key_pem, domain, content, fromHandle) - if pubKey: - _pgp_import_pub_key(pubKey) + if pub_key: + _pgp_import_pub_key(pub_key) - cmdDecrypt = \ + cmd_decrypt = \ 'echo "' + content + '" | gpg --decrypt --armor 2> /dev/null' - proc = subprocess.Popen([cmdDecrypt], + proc = subprocess.Popen([cmd_decrypt], stdout=subprocess.PIPE, shell=True) - (decryptResult, err) = proc.communicate() - if not decryptResult: + (decrypt_result, _) = proc.communicate() + if not decrypt_result: return content - decryptResult = decryptResult.decode('utf-8').strip() - return decryptResult + decrypt_result = decrypt_result.decode('utf-8').strip() + return decrypt_result def _pgp_local_public_key_id() -> str: """Gets the local pgp public key ID """ - cmdStr = \ + cmd_str = \ "gpgconf --list-options gpg | " + \ "awk -F: '$1 == \"default-key\" {print $10}'" - proc = subprocess.Popen([cmdStr], + proc = subprocess.Popen([cmd_str], stdout=subprocess.PIPE, shell=True) (result, err) = proc.communicate() if err: @@ -438,11 +439,11 @@ def _pgp_local_public_key_id() -> str: def pgp_local_public_key() -> str: """Gets the local pgp public key """ - keyId = _pgp_local_public_key_id() - if not keyId: - keyId = '' - cmdStr = "gpg --armor --export " + keyId - proc = subprocess.Popen([cmdStr], + key_id = _pgp_local_public_key_id() + if not key_id: + key_id = '' + cmd_str = "gpg --armor --export " + key_id + proc = subprocess.Popen([cmd_str], stdout=subprocess.PIPE, shell=True) (result, err) = proc.communicate() if err: @@ -473,12 +474,12 @@ def pgp_public_key_upload(base_dir: str, session, pgp_pub_key = pgp_local_public_key() if not pgp_pub_key: return None - pgp_pub_keyId = _pgp_local_public_key_id() + pgp_pub_key_id = _pgp_local_public_key_id() else: if debug: print('Testing with PGP public key ' + test) pgp_pub_key = test - pgp_pub_keyId = None + pgp_pub_key_id = None domain_full = get_full_domain(domain, port) if debug: @@ -489,7 +490,7 @@ def pgp_public_key_upload(base_dir: str, session, if debug: print('Getting actor for ' + handle) - actor_json, asHeader = \ + actor_json, _ = \ get_actor_json(domain_full, handle, False, False, debug, True, signing_priv_key_pem, session) if not actor_json: @@ -531,8 +532,8 @@ def pgp_public_key_upload(base_dir: str, session, return None # set the pgp details - if pgp_pub_keyId: - set_pgp_fingerprint(actor_json, pgp_pub_keyId) + if pgp_pub_key_id: + set_pgp_fingerprint(actor_json, pgp_pub_key_id) else: if debug: print('No PGP key Id. Continuing anyway.') @@ -542,10 +543,10 @@ def pgp_public_key_upload(base_dir: str, session, set_pgp_pub_key(actor_json, pgp_pub_key) # create an actor update - statusNumber, published = get_status_number() - actorUpdate = { + status_number, _ = get_status_number() + actor_update = { '@context': 'https://www.w3.org/ns/activitystreams', - 'id': actor + '#updates/' + statusNumber, + 'id': actor + '#updates/' + status_number, 'type': 'Update', 'actor': actor, 'to': [actor], @@ -553,7 +554,7 @@ def pgp_public_key_upload(base_dir: str, session, 'object': actor_json } if debug: - print('actor update is ' + str(actorUpdate)) + print('actor update is ' + str(actor_update)) # lookup the inbox for the To handle wf_request = \ @@ -571,51 +572,51 @@ def pgp_public_key_upload(base_dir: str, session, ' did not return a dict. ' + str(wf_request)) return None - postToBox = 'outbox' + post_to_box = 'outbox' # get the actor inbox for the To handle - originDomain = domain - (inboxUrl, pubKeyId, pubKey, fromPersonId, sharedInbox, avatarUrl, - displayName, _) = get_person_box(signing_priv_key_pem, originDomain, - base_dir, session, wf_request, - person_cache, - __version__, http_prefix, nickname, - domain, postToBox, 35725) + origin_domain = domain + (inbox_url, _, _, from_person_id, _, _, + _, _) = get_person_box(signing_priv_key_pem, origin_domain, + base_dir, session, wf_request, + person_cache, + __version__, http_prefix, nickname, + domain, post_to_box, 35725) - if not inboxUrl: + if not inbox_url: if debug: - print('DEBUG: No ' + postToBox + ' was found for ' + handle) + print('DEBUG: No ' + post_to_box + ' was found for ' + handle) return None - if not fromPersonId: + if not from_person_id: if debug: print('DEBUG: No actor was found for ' + handle) return None - authHeader = create_basic_auth_header(nickname, password) + auth_header = create_basic_auth_header(nickname, password) headers = { 'host': domain, 'Content-type': 'application/json', - 'Authorization': authHeader + 'Authorization': auth_header } quiet = not debug tries = 0 while tries < 4: - postResult = \ + post_result = \ post_json(http_prefix, domain_full, - session, actorUpdate, [], inboxUrl, + session, actor_update, [], inbox_url, headers, 5, quiet) - if postResult: + if post_result: break tries += 1 - if postResult is None: + if post_result is None: if debug: print('DEBUG: POST pgp actor update failed for c2s to ' + - inboxUrl) + inbox_url) return None if debug: print('DEBUG: c2s POST pgp actor update success') - return actorUpdate + return actor_update