From 507e435a2d66c2a85955e7f9fa649366f11b1fda Mon Sep 17 00:00:00 2001 From: Bob Mottram Date: Thu, 30 Dec 2021 10:16:57 +0000 Subject: [PATCH] Snake case --- blocking.py | 734 +++++++++++++++++++++++++------------------------ tests.py | 8 +- webapp_post.py | 4 +- 3 files changed, 376 insertions(+), 370 deletions(-) diff --git a/blocking.py b/blocking.py index 63459c7f0..b564e70bc 100644 --- a/blocking.py +++ b/blocking.py @@ -38,192 +38,196 @@ from conversation import unmute_conversation def add_global_block(base_dir: str, - blockNickname: str, blockDomain: str) -> bool: + block_nickname: str, block_domain: str) -> bool: """Global block which applies to all accounts """ - blockingFilename = base_dir + '/accounts/blocking.txt' - if not blockNickname.startswith('#'): + blocking_filename = base_dir + '/accounts/blocking.txt' + if not block_nickname.startswith('#'): # is the handle already blocked? - blockHandle = blockNickname + '@' + blockDomain - if os.path.isfile(blockingFilename): - if blockHandle in open(blockingFilename).read(): + block_handle = block_nickname + '@' + block_domain + if os.path.isfile(blocking_filename): + if block_handle in open(blocking_filename).read(): return False # block an account handle or domain try: - with open(blockingFilename, 'a+') as blockFile: - blockFile.write(blockHandle + '\n') + with open(blocking_filename, 'a+') as block_file: + block_file.write(block_handle + '\n') except OSError: - print('EX: unable to save blocked handle ' + blockHandle) + print('EX: unable to save blocked handle ' + block_handle) return False else: - blockHashtag = blockNickname + block_hashtag = block_nickname # is the hashtag already blocked? - if os.path.isfile(blockingFilename): - if blockHashtag + '\n' in open(blockingFilename).read(): + if os.path.isfile(blocking_filename): + if block_hashtag + '\n' in open(blocking_filename).read(): return False # block a hashtag try: - with open(blockingFilename, 'a+') as blockFile: - blockFile.write(blockHashtag + '\n') + with open(blocking_filename, 'a+') as block_file: + block_file.write(block_hashtag + '\n') except OSError: - print('EX: unable to save blocked hashtag ' + blockHashtag) + print('EX: unable to save blocked hashtag ' + block_hashtag) return False return True def add_block(base_dir: str, nickname: str, domain: str, - blockNickname: str, blockDomain: str) -> bool: + block_nickname: str, block_domain: str) -> bool: """Block the given account """ - if blockDomain.startswith(domain) and nickname == blockNickname: + if block_domain.startswith(domain) and nickname == block_nickname: # don't block self return False domain = remove_domain_port(domain) - blockingFilename = acct_dir(base_dir, nickname, domain) + '/blocking.txt' - blockHandle = blockNickname + '@' + blockDomain - if os.path.isfile(blockingFilename): - if blockHandle + '\n' in open(blockingFilename).read(): + blocking_filename = acct_dir(base_dir, nickname, domain) + '/blocking.txt' + block_handle = block_nickname + '@' + block_domain + if os.path.isfile(blocking_filename): + if block_handle + '\n' in open(blocking_filename).read(): return False # if we are following then unfollow - followingFilename = acct_dir(base_dir, nickname, domain) + '/following.txt' - if os.path.isfile(followingFilename): - if blockHandle + '\n' in open(followingFilename).read(): - followingStr = '' + following_filename = \ + acct_dir(base_dir, nickname, domain) + '/following.txt' + if os.path.isfile(following_filename): + if block_handle + '\n' in open(following_filename).read(): + following_str = '' try: - with open(followingFilename, 'r') as followingFile: - followingStr = followingFile.read() + with open(following_filename, 'r') as foll_file: + following_str = foll_file.read() except OSError: - print('EX: Unable to read following ' + followingFilename) + print('EX: Unable to read following ' + following_filename) return False - if followingStr: - followingStr = followingStr.replace(blockHandle + '\n', '') + if following_str: + following_str = following_str.replace(block_handle + '\n', '') try: - with open(followingFilename, 'w+') as followingFile: - followingFile.write(followingStr) + with open(following_filename, 'w+') as foll_file: + foll_file.write(following_str) except OSError: - print('EX: Unable to write following ' + followingStr) + print('EX: Unable to write following ' + following_str) return False # if they are a follower then remove them - followersFilename = acct_dir(base_dir, nickname, domain) + '/followers.txt' - if os.path.isfile(followersFilename): - if blockHandle + '\n' in open(followersFilename).read(): - followersStr = '' + followers_filename = \ + acct_dir(base_dir, nickname, domain) + '/followers.txt' + if os.path.isfile(followers_filename): + if block_handle + '\n' in open(followers_filename).read(): + followers_str = '' try: - with open(followersFilename, 'r') as followersFile: - followersStr = followersFile.read() + with open(followers_filename, 'r') as foll_file: + followers_str = foll_file.read() except OSError: - print('EX: Unable to read followers ' + followersFilename) + print('EX: Unable to read followers ' + followers_filename) return False - if followersStr: - followersStr = followersStr.replace(blockHandle + '\n', '') + if followers_str: + followers_str = followers_str.replace(block_handle + '\n', '') try: - with open(followersFilename, 'w+') as followersFile: - followersFile.write(followersStr) + with open(followers_filename, 'w+') as foll_file: + foll_file.write(followers_str) except OSError: - print('EX: Unable to write followers ' + followersStr) + print('EX: Unable to write followers ' + followers_str) return False try: - with open(blockingFilename, 'a+') as blockFile: - blockFile.write(blockHandle + '\n') + with open(blocking_filename, 'a+') as block_file: + block_file.write(block_handle + '\n') except OSError: - print('EX: unable to append block handle ' + blockHandle) + print('EX: unable to append block handle ' + block_handle) return False return True def remove_global_block(base_dir: str, - unblockNickname: str, - unblockDomain: str) -> bool: + unblock_nickname: str, + unblock_domain: str) -> bool: """Unblock the given global block """ - unblockingFilename = base_dir + '/accounts/blocking.txt' - if not unblockNickname.startswith('#'): - unblockHandle = unblockNickname + '@' + unblockDomain - if os.path.isfile(unblockingFilename): - if unblockHandle in open(unblockingFilename).read(): + unblocking_filename = base_dir + '/accounts/blocking.txt' + if not unblock_nickname.startswith('#'): + unblock_handle = unblock_nickname + '@' + unblock_domain + if os.path.isfile(unblocking_filename): + if unblock_handle in open(unblocking_filename).read(): try: - with open(unblockingFilename, 'r') as fp: - with open(unblockingFilename + '.new', 'w+') as fpnew: - for line in fp: + with open(unblocking_filename, 'r') as fp_unblock: + with open(unblocking_filename + '.new', 'w+') as fpnew: + for line in fp_unblock: handle = \ line.replace('\n', '').replace('\r', '') - if unblockHandle not in line: + if unblock_handle not in line: fpnew.write(handle + '\n') except OSError as ex: print('EX: failed to remove global block ' + - unblockingFilename + ' ' + str(ex)) + unblocking_filename + ' ' + str(ex)) return False - if os.path.isfile(unblockingFilename + '.new'): + if os.path.isfile(unblocking_filename + '.new'): try: - os.rename(unblockingFilename + '.new', - unblockingFilename) + os.rename(unblocking_filename + '.new', + unblocking_filename) except OSError: - print('EX: unable to rename ' + unblockingFilename) + print('EX: unable to rename ' + unblocking_filename) return False return True else: - unblockHashtag = unblockNickname - if os.path.isfile(unblockingFilename): - if unblockHashtag + '\n' in open(unblockingFilename).read(): + unblock_hashtag = unblock_nickname + if os.path.isfile(unblocking_filename): + if unblock_hashtag + '\n' in open(unblocking_filename).read(): try: - with open(unblockingFilename, 'r') as fp: - with open(unblockingFilename + '.new', 'w+') as fpnew: - for line in fp: - blockLine = \ + with open(unblocking_filename, 'r') as fp_unblock: + with open(unblocking_filename + '.new', 'w+') as fpnew: + for line in fp_unblock: + block_line = \ line.replace('\n', '').replace('\r', '') - if unblockHashtag not in line: - fpnew.write(blockLine + '\n') + if unblock_hashtag not in line: + fpnew.write(block_line + '\n') except OSError as ex: print('EX: failed to remove global hashtag block ' + - unblockingFilename + ' ' + str(ex)) + unblocking_filename + ' ' + str(ex)) return False - if os.path.isfile(unblockingFilename + '.new'): + if os.path.isfile(unblocking_filename + '.new'): try: - os.rename(unblockingFilename + '.new', - unblockingFilename) + os.rename(unblocking_filename + '.new', + unblocking_filename) except OSError: - print('EX: unable to rename 2 ' + unblockingFilename) + print('EX: unable to rename 2 ' + unblocking_filename) return False return True return False def remove_block(base_dir: str, nickname: str, domain: str, - unblockNickname: str, unblockDomain: str) -> bool: + unblock_nickname: str, unblock_domain: str) -> bool: """Unblock the given account """ domain = remove_domain_port(domain) - unblockingFilename = acct_dir(base_dir, nickname, domain) + '/blocking.txt' - unblockHandle = unblockNickname + '@' + unblockDomain - if os.path.isfile(unblockingFilename): - if unblockHandle in open(unblockingFilename).read(): + unblocking_filename = \ + acct_dir(base_dir, nickname, domain) + '/blocking.txt' + unblock_handle = unblock_nickname + '@' + unblock_domain + if os.path.isfile(unblocking_filename): + if unblock_handle in open(unblocking_filename).read(): try: - with open(unblockingFilename, 'r') as fp: - with open(unblockingFilename + '.new', 'w+') as fpnew: - for line in fp: + with open(unblocking_filename, 'r') as fp_unblock: + with open(unblocking_filename + '.new', 'w+') as fpnew: + for line in fp_unblock: handle = line.replace('\n', '').replace('\r', '') - if unblockHandle not in line: + if unblock_handle not in line: fpnew.write(handle + '\n') except OSError as ex: print('EX: failed to remove block ' + - unblockingFilename + ' ' + str(ex)) + unblocking_filename + ' ' + str(ex)) return False - if os.path.isfile(unblockingFilename + '.new'): + if os.path.isfile(unblocking_filename + '.new'): try: - os.rename(unblockingFilename + '.new', unblockingFilename) + os.rename(unblocking_filename + '.new', + unblocking_filename) except OSError: - print('EX: unable to rename 3 ' + unblockingFilename) + print('EX: unable to rename 3 ' + unblocking_filename) return False return True return False @@ -235,12 +239,12 @@ def is_blocked_hashtag(base_dir: str, hashtag: str) -> bool: # avoid very long hashtags if len(hashtag) > 32: return True - globalBlockingFilename = base_dir + '/accounts/blocking.txt' - if os.path.isfile(globalBlockingFilename): + global_blocking_filename = base_dir + '/accounts/blocking.txt' + if os.path.isfile(global_blocking_filename): hashtag = hashtag.strip('\n').strip('\r') if not hashtag.startswith('#'): hashtag = '#' + hashtag - if hashtag + '\n' in open(globalBlockingFilename).read(): + if hashtag + '\n' in open(global_blocking_filename).read(): return True return False @@ -249,50 +253,50 @@ def get_domain_blocklist(base_dir: str) -> str: """Returns all globally blocked domains as a string This can be used for fast matching to mitigate flooding """ - blockedStr = '' + blocked_str = '' - evilDomains = evil_incarnate() - for evil in evilDomains: - blockedStr += evil + '\n' + evil_domains = evil_incarnate() + for evil in evil_domains: + blocked_str += evil + '\n' - globalBlockingFilename = base_dir + '/accounts/blocking.txt' - if not os.path.isfile(globalBlockingFilename): - return blockedStr + global_blocking_filename = base_dir + '/accounts/blocking.txt' + if not os.path.isfile(global_blocking_filename): + return blocked_str try: - with open(globalBlockingFilename, 'r') as fpBlocked: - blockedStr += fpBlocked.read() + with open(global_blocking_filename, 'r') as fp_blocked: + blocked_str += fp_blocked.read() except OSError: - print('EX: unable to read ' + globalBlockingFilename) - return blockedStr + print('EX: unable to read ' + global_blocking_filename) + return blocked_str def update_blocked_cache(base_dir: str, - blockedCache: [], - blockedCacheLastUpdated: int, - blockedCacheUpdateSecs: int) -> int: + blocked_cache: [], + blocked_cache_last_updated: int, + blocked_cache_update_secs: int) -> int: """Updates the cache of globally blocked domains held in memory """ curr_time = int(time.time()) - if blockedCacheLastUpdated > curr_time: + if blocked_cache_last_updated > curr_time: print('WARN: Cache updated in the future') - blockedCacheLastUpdated = 0 - secondsSinceLastUpdate = curr_time - blockedCacheLastUpdated - if secondsSinceLastUpdate < blockedCacheUpdateSecs: - return blockedCacheLastUpdated - globalBlockingFilename = base_dir + '/accounts/blocking.txt' - if not os.path.isfile(globalBlockingFilename): - return blockedCacheLastUpdated + blocked_cache_last_updated = 0 + seconds_since_last_update = curr_time - blocked_cache_last_updated + if seconds_since_last_update < blocked_cache_update_secs: + return blocked_cache_last_updated + global_blocking_filename = base_dir + '/accounts/blocking.txt' + if not os.path.isfile(global_blocking_filename): + return blocked_cache_last_updated try: - with open(globalBlockingFilename, 'r') as fpBlocked: - blockedLines = fpBlocked.readlines() + with open(global_blocking_filename, 'r') as fp_blocked: + blocked_lines = fp_blocked.readlines() # remove newlines - for index in range(len(blockedLines)): - blockedLines[index] = blockedLines[index].replace('\n', '') + for index in range(len(blocked_lines)): + blocked_lines[index] = blocked_lines[index].replace('\n', '') # update the cache - blockedCache.clear() - blockedCache += blockedLines + blocked_cache.clear() + blocked_cache += blocked_lines except OSError as ex: - print('EX: unable to read ' + globalBlockingFilename + ' ' + str(ex)) + print('EX: unable to read ' + global_blocking_filename + ' ' + str(ex)) return curr_time @@ -302,14 +306,14 @@ def _get_short_domain(domain: str) -> str: e.g. subdomain123.mydomain.com becomes mydomain.com """ sections = domain.split('.') - noOfSections = len(sections) - if noOfSections > 2: - return sections[noOfSections-2] + '.' + sections[-1] + no_of_sections = len(sections) + if no_of_sections > 2: + return sections[no_of_sections-2] + '.' + sections[-1] return None def is_blocked_domain(base_dir: str, domain: str, - blockedCache: [] = None) -> bool: + blocked_cache: [] = None) -> bool: """Is the given domain blocked? """ if '.' not in domain: @@ -318,98 +322,98 @@ def is_blocked_domain(base_dir: str, domain: str, if is_evil(domain): return True - shortDomain = _get_short_domain(domain) + short_domain = _get_short_domain(domain) if not broch_mode_is_active(base_dir): - if blockedCache: - for blockedStr in blockedCache: - if '*@' + domain in blockedStr: + if blocked_cache: + for blocked_str in blocked_cache: + if '*@' + domain in blocked_str: return True - if shortDomain: - if '*@' + shortDomain in blockedStr: + if short_domain: + if '*@' + short_domain in blocked_str: return True else: # instance block list - globalBlockingFilename = base_dir + '/accounts/blocking.txt' - if os.path.isfile(globalBlockingFilename): + global_blocking_filename = base_dir + '/accounts/blocking.txt' + if os.path.isfile(global_blocking_filename): try: - with open(globalBlockingFilename, 'r') as fpBlocked: - blockedStr = fpBlocked.read() - if '*@' + domain in blockedStr: + with open(global_blocking_filename, 'r') as fp_blocked: + blocked_str = fp_blocked.read() + if '*@' + domain in blocked_str: return True - if shortDomain: - if '*@' + shortDomain in blockedStr: + if short_domain: + if '*@' + short_domain in blocked_str: return True except OSError as ex: - print('EX: unable to read ' + globalBlockingFilename + + print('EX: unable to read ' + global_blocking_filename + ' ' + str(ex)) else: - allowFilename = base_dir + '/accounts/allowedinstances.txt' + allow_filename = base_dir + '/accounts/allowedinstances.txt' # instance allow list - if not shortDomain: - if domain not in open(allowFilename).read(): + if not short_domain: + if domain not in open(allow_filename).read(): return True else: - if shortDomain not in open(allowFilename).read(): + if short_domain not in open(allow_filename).read(): return True return False def is_blocked(base_dir: str, nickname: str, domain: str, - blockNickname: str, blockDomain: str, - blockedCache: [] = None) -> bool: + block_nickname: str, block_domain: str, + blocked_cache: [] = None) -> bool: """Is the given nickname blocked? """ - if is_evil(blockDomain): + if is_evil(block_domain): return True - blockHandle = None - if blockNickname and blockDomain: - blockHandle = blockNickname + '@' + blockDomain + block_handle = None + if block_nickname and block_domain: + block_handle = block_nickname + '@' + block_domain if not broch_mode_is_active(base_dir): # instance level block list - if blockedCache: - for blockedStr in blockedCache: - if '*@' + domain in blockedStr: + if blocked_cache: + for blocked_str in blocked_cache: + if '*@' + domain in blocked_str: return True - if blockHandle: - if blockHandle in blockedStr: + if block_handle: + if block_handle in blocked_str: return True else: - globalBlockingFilename = base_dir + '/accounts/blocking.txt' - if os.path.isfile(globalBlockingFilename): - if '*@' + blockDomain in open(globalBlockingFilename).read(): + global_blocks_filename = base_dir + '/accounts/blocking.txt' + if os.path.isfile(global_blocks_filename): + if '*@' + block_domain in open(global_blocks_filename).read(): return True - if blockHandle: - if blockHandle in open(globalBlockingFilename).read(): + if block_handle: + if block_handle in open(global_blocks_filename).read(): return True else: # instance allow list - allowFilename = base_dir + '/accounts/allowedinstances.txt' - shortDomain = _get_short_domain(blockDomain) - if not shortDomain: - if blockDomain not in open(allowFilename).read(): + allow_filename = base_dir + '/accounts/allowedinstances.txt' + short_domain = _get_short_domain(block_domain) + if not short_domain: + if block_domain not in open(allow_filename).read(): return True else: - if shortDomain not in open(allowFilename).read(): + if short_domain not in open(allow_filename).read(): return True # account level allow list - accountDir = acct_dir(base_dir, nickname, domain) - allowFilename = accountDir + '/allowedinstances.txt' - if os.path.isfile(allowFilename): - if blockDomain not in open(allowFilename).read(): + account_dir = acct_dir(base_dir, nickname, domain) + allow_filename = account_dir + '/allowedinstances.txt' + if os.path.isfile(allow_filename): + if block_domain not in open(allow_filename).read(): return True # account level block list - blockingFilename = accountDir + '/blocking.txt' - if os.path.isfile(blockingFilename): - if '*@' + blockDomain in open(blockingFilename).read(): + blocking_filename = account_dir + '/blocking.txt' + if os.path.isfile(blocking_filename): + if '*@' + block_domain in open(blocking_filename).read(): return True - if blockHandle: - if blockHandle in open(blockingFilename).read(): + if block_handle: + if block_handle in open(blocking_filename).read(): return True return False @@ -432,31 +436,32 @@ def outbox_block(base_dir: str, http_prefix: str, if debug: print('DEBUG: c2s block request arrived in outbox') - messageId = remove_id_ending(message_json['object']) - if '/statuses/' not in messageId: + message_id = remove_id_ending(message_json['object']) + if '/statuses/' not in message_id: if debug: print('DEBUG: c2s block object is not a status') return False - if not has_users_path(messageId): + if not has_users_path(message_id): if debug: print('DEBUG: c2s block object has no nickname') return False domain = remove_domain_port(domain) - post_filename = locate_post(base_dir, nickname, domain, messageId) + post_filename = locate_post(base_dir, nickname, domain, message_id) if not post_filename: if debug: print('DEBUG: c2s block post not found in inbox or outbox') - print(messageId) + print(message_id) return False - nicknameBlocked = get_nickname_from_actor(message_json['object']) - if not nicknameBlocked: + nickname_blocked = get_nickname_from_actor(message_json['object']) + if not nickname_blocked: print('WARN: unable to find nickname in ' + message_json['object']) return False - domainBlocked, portBlocked = get_domain_from_actor(message_json['object']) - domainBlockedFull = get_full_domain(domainBlocked, portBlocked) + domain_blocked, port_blocked = \ + get_domain_from_actor(message_json['object']) + domain_blocked_full = get_full_domain(domain_blocked, port_blocked) add_block(base_dir, nickname, domain, - nicknameBlocked, domainBlockedFull) + nickname_blocked, domain_blocked_full) if debug: print('DEBUG: post blocked via c2s - ' + post_filename) @@ -488,33 +493,34 @@ def outbox_undo_block(base_dir: str, http_prefix: str, if debug: print('DEBUG: c2s undo block request arrived in outbox') - messageId = remove_id_ending(message_json['object']['object']) - if '/statuses/' not in messageId: + message_id = remove_id_ending(message_json['object']['object']) + if '/statuses/' not in message_id: if debug: print('DEBUG: c2s undo block object is not a status') return - if not has_users_path(messageId): + if not has_users_path(message_id): if debug: print('DEBUG: c2s undo block object has no nickname') return domain = remove_domain_port(domain) - post_filename = locate_post(base_dir, nickname, domain, messageId) + post_filename = locate_post(base_dir, nickname, domain, message_id) if not post_filename: if debug: print('DEBUG: c2s undo block post not found in inbox or outbox') - print(messageId) + print(message_id) return - nicknameBlocked = get_nickname_from_actor(message_json['object']['object']) - if not nicknameBlocked: + nickname_blocked = \ + get_nickname_from_actor(message_json['object']['object']) + if not nickname_blocked: print('WARN: unable to find nickname in ' + message_json['object']['object']) return - domainObject = message_json['object']['object'] - domainBlocked, portBlocked = get_domain_from_actor(domainObject) - domainBlockedFull = get_full_domain(domainBlocked, portBlocked) + domain_object = message_json['object']['object'] + domain_blocked, port_blocked = get_domain_from_actor(domain_object) + domain_blocked_full = get_full_domain(domain_blocked, port_blocked) remove_block(base_dir, nickname, domain, - nicknameBlocked, domainBlockedFull) + nickname_blocked, domain_blocked_full) if debug: print('DEBUG: post undo blocked via c2s - ' + post_filename) @@ -535,26 +541,26 @@ def mute_post(base_dir: str, nickname: str, domain: str, port: int, return print('mute_post: ' + str(post_json_object)) - post_jsonObj = post_json_object - alsoUpdatePostId = None + post_json_obj = post_json_object + also_update_post_id = None if has_object_dict(post_json_object): - post_jsonObj = post_json_object['object'] + post_json_obj = post_json_object['object'] else: if has_object_string(post_json_object, debug): - alsoUpdatePostId = remove_id_ending(post_json_object['object']) + also_update_post_id = remove_id_ending(post_json_object['object']) domain_full = get_full_domain(domain, port) actor = local_actor_url(http_prefix, nickname, domain_full) - if post_jsonObj.get('conversation'): + if post_json_obj.get('conversation'): mute_conversation(base_dir, nickname, domain, - post_jsonObj['conversation']) + post_json_obj['conversation']) # does this post have ignores on it from differenent actors? - if not post_jsonObj.get('ignores'): + if not post_json_obj.get('ignores'): if debug: print('DEBUG: Adding initial mute to ' + post_id) - ignoresJson = { + ignores_json = { "@context": "https://www.w3.org/ns/activitystreams", 'id': post_id, 'type': 'Collection', @@ -564,45 +570,44 @@ def mute_post(base_dir: str, nickname: str, domain: str, port: int, 'actor': actor }] } - post_jsonObj['ignores'] = ignoresJson + post_json_obj['ignores'] = ignores_json else: - if not post_jsonObj['ignores'].get('items'): - post_jsonObj['ignores']['items'] = [] - itemsList = post_jsonObj['ignores']['items'] - for ignoresItem in itemsList: - if ignoresItem.get('actor'): - if ignoresItem['actor'] == actor: + if not post_json_obj['ignores'].get('items'): + post_json_obj['ignores']['items'] = [] + items_list = post_json_obj['ignores']['items'] + for ignores_item in items_list: + if ignores_item.get('actor'): + if ignores_item['actor'] == actor: return - newIgnore = { + new_ignore = { 'type': 'Ignore', 'actor': actor } - igIt = len(itemsList) - itemsList.append(newIgnore) - post_jsonObj['ignores']['totalItems'] = igIt - post_jsonObj['muted'] = True + ig_it = len(items_list) + items_list.append(new_ignore) + post_json_obj['ignores']['totalItems'] = ig_it + post_json_obj['muted'] = True if save_json(post_json_object, post_filename): print('mute_post: saved ' + post_filename) # remove cached post so that the muted version gets recreated # without its content text and/or image - cachedPostFilename = \ + cached_post_filename = \ get_cached_post_filename(base_dir, nickname, domain, post_json_object) - if cachedPostFilename: - if os.path.isfile(cachedPostFilename): + if cached_post_filename: + if os.path.isfile(cached_post_filename): try: - os.remove(cachedPostFilename) - print('MUTE: cached post removed ' + cachedPostFilename) + os.remove(cached_post_filename) + print('MUTE: cached post removed ' + cached_post_filename) except OSError: print('EX: MUTE cached post not removed ' + - cachedPostFilename) - pass + cached_post_filename) else: - print('MUTE: cached post not found ' + cachedPostFilename) + print('MUTE: cached post not found ' + cached_post_filename) try: - with open(post_filename + '.muted', 'w+') as muteFile: - muteFile.write('\n') + with open(post_filename + '.muted', 'w+') as mute_file: + mute_file.write('\n') except OSError: print('EX: Failed to save mute file ' + post_filename + '.muted') return @@ -623,34 +628,35 @@ def mute_post(base_dir: str, nickname: str, domain: str, port: int, del recent_posts_cache['html'][post_id] print('MUTE: ' + post_id + ' removed cached html') - if alsoUpdatePostId: + if also_update_post_id: post_filename = locate_post(base_dir, nickname, domain, - alsoUpdatePostId) + also_update_post_id) if os.path.isfile(post_filename): - post_jsonObj = load_json(post_filename) - cachedPostFilename = \ + post_json_obj = load_json(post_filename) + cached_post_filename = \ get_cached_post_filename(base_dir, nickname, domain, - post_jsonObj) - if cachedPostFilename: - if os.path.isfile(cachedPostFilename): + post_json_obj) + if cached_post_filename: + if os.path.isfile(cached_post_filename): try: - os.remove(cachedPostFilename) + os.remove(cached_post_filename) print('MUTE: cached referenced post removed ' + - cachedPostFilename) + cached_post_filename) except OSError: print('EX: ' + 'MUTE cached referenced post not removed ' + - cachedPostFilename) - pass + cached_post_filename) if recent_posts_cache.get('json'): - if recent_posts_cache['json'].get(alsoUpdatePostId): - del recent_posts_cache['json'][alsoUpdatePostId] - print('MUTE: ' + alsoUpdatePostId + ' removed referenced json') + if recent_posts_cache['json'].get(also_update_post_id): + del recent_posts_cache['json'][also_update_post_id] + print('MUTE: ' + also_update_post_id + + ' removed referenced json') if recent_posts_cache.get('html'): - if recent_posts_cache['html'].get(alsoUpdatePostId): - del recent_posts_cache['html'][alsoUpdatePostId] - print('MUTE: ' + alsoUpdatePostId + ' removed referenced html') + if recent_posts_cache['html'].get(also_update_post_id): + del recent_posts_cache['html'][also_update_post_id] + print('MUTE: ' + also_update_post_id + + ' removed referenced html') def unmute_post(base_dir: str, nickname: str, domain: str, port: int, @@ -665,64 +671,64 @@ def unmute_post(base_dir: str, nickname: str, domain: str, port: int, if not post_json_object: return - muteFilename = post_filename + '.muted' - if os.path.isfile(muteFilename): + mute_filename = post_filename + '.muted' + if os.path.isfile(mute_filename): try: - os.remove(muteFilename) + os.remove(mute_filename) except OSError: if debug: print('EX: unmute_post mute filename not deleted ' + - str(muteFilename)) - print('UNMUTE: ' + muteFilename + ' file removed') + str(mute_filename)) + print('UNMUTE: ' + mute_filename + ' file removed') - post_jsonObj = post_json_object - alsoUpdatePostId = None + post_json_obj = post_json_object + also_update_post_id = None if has_object_dict(post_json_object): - post_jsonObj = post_json_object['object'] + post_json_obj = post_json_object['object'] else: if has_object_string(post_json_object, debug): - alsoUpdatePostId = remove_id_ending(post_json_object['object']) + also_update_post_id = remove_id_ending(post_json_object['object']) - if post_jsonObj.get('conversation'): + if post_json_obj.get('conversation'): unmute_conversation(base_dir, nickname, domain, - post_jsonObj['conversation']) + post_json_obj['conversation']) - if post_jsonObj.get('ignores'): + if post_json_obj.get('ignores'): domain_full = get_full_domain(domain, port) actor = local_actor_url(http_prefix, nickname, domain_full) - totalItems = 0 - if post_jsonObj['ignores'].get('totalItems'): - totalItems = post_jsonObj['ignores']['totalItems'] - itemsList = post_jsonObj['ignores']['items'] - for ignoresItem in itemsList: - if ignoresItem.get('actor'): - if ignoresItem['actor'] == actor: + total_items = 0 + if post_json_obj['ignores'].get('totalItems'): + total_items = post_json_obj['ignores']['totalItems'] + items_list = post_json_obj['ignores']['items'] + for ignores_item in items_list: + if ignores_item.get('actor'): + if ignores_item['actor'] == actor: if debug: print('DEBUG: mute was removed for ' + actor) - itemsList.remove(ignoresItem) + items_list.remove(ignores_item) break - if totalItems == 1: + if total_items == 1: if debug: print('DEBUG: mute was removed from post') - del post_jsonObj['ignores'] + del post_json_obj['ignores'] else: - igItLen = len(post_jsonObj['ignores']['items']) - post_jsonObj['ignores']['totalItems'] = igItLen - post_jsonObj['muted'] = False + ig_it_len = len(post_json_obj['ignores']['items']) + post_json_obj['ignores']['totalItems'] = ig_it_len + post_json_obj['muted'] = False save_json(post_json_object, post_filename) # remove cached post so that the muted version gets recreated # with its content text and/or image - cachedPostFilename = \ + cached_post_filename = \ get_cached_post_filename(base_dir, nickname, domain, post_json_object) - if cachedPostFilename: - if os.path.isfile(cachedPostFilename): + if cached_post_filename: + if os.path.isfile(cached_post_filename): try: - os.remove(cachedPostFilename) + os.remove(cached_post_filename) except OSError: if debug: print('EX: unmute_post cached post not deleted ' + - str(cachedPostFilename)) + str(cached_post_filename)) # if the post is in the recent posts cache then mark it as unmuted if recent_posts_cache.get('index'): @@ -738,36 +744,36 @@ def unmute_post(base_dir: str, nickname: str, domain: str, port: int, if recent_posts_cache['html'].get(post_id): del recent_posts_cache['html'][post_id] print('UNMUTE: ' + post_id + ' removed cached html') - if alsoUpdatePostId: + if also_update_post_id: post_filename = locate_post(base_dir, nickname, domain, - alsoUpdatePostId) + also_update_post_id) if os.path.isfile(post_filename): - post_jsonObj = load_json(post_filename) - cachedPostFilename = \ + post_json_obj = load_json(post_filename) + cached_post_filename = \ get_cached_post_filename(base_dir, nickname, domain, - post_jsonObj) - if cachedPostFilename: - if os.path.isfile(cachedPostFilename): + post_json_obj) + if cached_post_filename: + if os.path.isfile(cached_post_filename): try: - os.remove(cachedPostFilename) + os.remove(cached_post_filename) print('MUTE: cached referenced post removed ' + - cachedPostFilename) + cached_post_filename) except OSError: if debug: print('EX: ' + 'unmute_post cached ref post not removed ' + - str(cachedPostFilename)) + str(cached_post_filename)) if recent_posts_cache.get('json'): - if recent_posts_cache['json'].get(alsoUpdatePostId): - del recent_posts_cache['json'][alsoUpdatePostId] + if recent_posts_cache['json'].get(also_update_post_id): + del recent_posts_cache['json'][also_update_post_id] print('UNMUTE: ' + - alsoUpdatePostId + ' removed referenced json') + also_update_post_id + ' removed referenced json') if recent_posts_cache.get('html'): - if recent_posts_cache['html'].get(alsoUpdatePostId): - del recent_posts_cache['html'][alsoUpdatePostId] + if recent_posts_cache['html'].get(also_update_post_id): + del recent_posts_cache['html'][also_update_post_id] print('UNMUTE: ' + - alsoUpdatePostId + ' removed referenced html') + also_update_post_id + ' removed referenced html') def outbox_mute(base_dir: str, http_prefix: str, @@ -790,24 +796,24 @@ def outbox_mute(base_dir: str, http_prefix: str, if debug: print('DEBUG: c2s mute request arrived in outbox') - messageId = remove_id_ending(message_json['object']) - if '/statuses/' not in messageId: + message_id = remove_id_ending(message_json['object']) + if '/statuses/' not in message_id: if debug: print('DEBUG: c2s mute object is not a status') return - if not has_users_path(messageId): + if not has_users_path(message_id): if debug: print('DEBUG: c2s mute object has no nickname') return domain = remove_domain_port(domain) - post_filename = locate_post(base_dir, nickname, domain, messageId) + post_filename = locate_post(base_dir, nickname, domain, message_id) if not post_filename: if debug: print('DEBUG: c2s mute post not found in inbox or outbox') - print(messageId) + print(message_id) return - nicknameMuted = get_nickname_from_actor(message_json['object']) - if not nicknameMuted: + nickname_muted = get_nickname_from_actor(message_json['object']) + if not nickname_muted: print('WARN: unable to find nickname in ' + message_json['object']) return @@ -845,24 +851,24 @@ def outbox_undo_mute(base_dir: str, http_prefix: str, if debug: print('DEBUG: c2s undo mute request arrived in outbox') - messageId = remove_id_ending(message_json['object']['object']) - if '/statuses/' not in messageId: + message_id = remove_id_ending(message_json['object']['object']) + if '/statuses/' not in message_id: if debug: print('DEBUG: c2s undo mute object is not a status') return - if not has_users_path(messageId): + if not has_users_path(message_id): if debug: print('DEBUG: c2s undo mute object has no nickname') return domain = remove_domain_port(domain) - post_filename = locate_post(base_dir, nickname, domain, messageId) + post_filename = locate_post(base_dir, nickname, domain, message_id) if not post_filename: if debug: print('DEBUG: c2s undo mute post not found in inbox or outbox') - print(messageId) + print(message_id) return - nicknameMuted = get_nickname_from_actor(message_json['object']['object']) - if not nicknameMuted: + nickname_muted = get_nickname_from_actor(message_json['object']['object']) + if not nickname_muted: print('WARN: unable to find nickname in ' + message_json['object']['object']) return @@ -878,8 +884,8 @@ def outbox_undo_mute(base_dir: str, http_prefix: str, def broch_mode_is_active(base_dir: str) -> bool: """Returns true if broch mode is active """ - allowFilename = base_dir + '/accounts/allowedinstances.txt' - return os.path.isfile(allowFilename) + allow_filename = base_dir + '/accounts/allowedinstances.txt' + return os.path.isfile(allow_filename) def set_broch_mode(base_dir: str, domain_full: str, enabled: bool) -> None: @@ -891,55 +897,55 @@ def set_broch_mode(base_dir: str, domain_full: str, enabled: bool) -> None: to construct an instance level allow list. Anything arriving which is then not from one of the allowed domains will be dropped """ - allowFilename = base_dir + '/accounts/allowedinstances.txt' + allow_filename = base_dir + '/accounts/allowedinstances.txt' if not enabled: # remove instance allow list - if os.path.isfile(allowFilename): + if os.path.isfile(allow_filename): try: - os.remove(allowFilename) + os.remove(allow_filename) except OSError: print('EX: set_broch_mode allow file not deleted ' + - str(allowFilename)) + str(allow_filename)) print('Broch mode turned off') else: - if os.path.isfile(allowFilename): - lastModified = file_last_modified(allowFilename) - print('Broch mode already activated ' + lastModified) + if os.path.isfile(allow_filename): + last_modified = file_last_modified(allow_filename) + print('Broch mode already activated ' + last_modified) return # generate instance allow list - allowedDomains = [domain_full] + allowed_domains = [domain_full] follow_files = ('following.txt', 'followers.txt') for subdir, dirs, files in os.walk(base_dir + '/accounts'): for acct in dirs: if not is_account_dir(acct): continue - accountDir = os.path.join(base_dir + '/accounts', acct) - for followFileType in follow_files: - followingFilename = accountDir + '/' + followFileType - if not os.path.isfile(followingFilename): + account_dir = os.path.join(base_dir + '/accounts', acct) + for follow_file_type in follow_files: + following_filename = account_dir + '/' + follow_file_type + if not os.path.isfile(following_filename): continue try: - with open(followingFilename, 'r') as f: - followList = f.readlines() - for handle in followList: + with open(following_filename, 'r') as foll_file: + follow_list = foll_file.readlines() + for handle in follow_list: if '@' not in handle: continue handle = handle.replace('\n', '') - handleDomain = handle.split('@')[1] - if handleDomain not in allowedDomains: - allowedDomains.append(handleDomain) + handle_domain = handle.split('@')[1] + if handle_domain not in allowed_domains: + allowed_domains.append(handle_domain) except OSError as ex: - print('EX: failed to read ' + followingFilename + + print('EX: failed to read ' + following_filename + ' ' + str(ex)) break # write the allow file try: - with open(allowFilename, 'w+') as allowFile: - allowFile.write(domain_full + '\n') - for d in allowedDomains: - allowFile.write(d + '\n') + with open(allow_filename, 'w+') as allow_file: + allow_file.write(domain_full + '\n') + for allowed in allowed_domains: + allow_file.write(allowed + '\n') print('Broch mode enabled') except OSError as ex: print('EX: Broch mode not enabled due to file write ' + str(ex)) @@ -952,29 +958,29 @@ def broch_modeLapses(base_dir: str, lapseDays: int) -> bool: """After broch mode is enabled it automatically elapses after a period of time """ - allowFilename = base_dir + '/accounts/allowedinstances.txt' - if not os.path.isfile(allowFilename): + allow_filename = base_dir + '/accounts/allowedinstances.txt' + if not os.path.isfile(allow_filename): return False - lastModified = file_last_modified(allowFilename) - modifiedDate = None + last_modified = file_last_modified(allow_filename) + modified_date = None try: - modifiedDate = \ - datetime.strptime(lastModified, "%Y-%m-%dT%H:%M:%SZ") + modified_date = \ + datetime.strptime(last_modified, "%Y-%m-%dT%H:%M:%SZ") except BaseException: - print('EX: broch_modeLapses date not parsed ' + str(lastModified)) + print('EX: broch_modeLapses date not parsed ' + str(last_modified)) return False - if not modifiedDate: + if not modified_date: return False curr_time = datetime.datetime.utcnow() - daysSinceBroch = (curr_time - modifiedDate).days - if daysSinceBroch >= lapseDays: + days_since_broch = (curr_time - modified_date).days + if days_since_broch >= lapseDays: removed = False try: - os.remove(allowFilename) + os.remove(allow_filename) removed = True except OSError: print('EX: broch_modeLapses allow file not deleted ' + - str(allowFilename)) + str(allow_filename)) if removed: set_config_param(base_dir, "broch_mode", False) print('Broch mode has elapsed') @@ -989,26 +995,26 @@ def load_cw_lists(base_dir: str, verbose: bool) -> {}: return {} result = {} for subdir, dirs, files in os.walk(base_dir + '/cwlists'): - for f in files: - if not f.endswith('.json'): + for fname in files: + if not fname.endswith('.json'): continue - listFilename = os.path.join(base_dir + '/cwlists', f) - print('listFilename: ' + listFilename) - listJson = load_json(listFilename, 0, 1) - if not listJson: + list_filename = os.path.join(base_dir + '/cwlists', fname) + print('list_filename: ' + list_filename) + list_json = load_json(list_filename, 0, 1) + if not list_json: continue - if not listJson.get('name'): + if not list_json.get('name'): continue - if not listJson.get('words') and not listJson.get('domains'): + if not list_json.get('words') and not list_json.get('domains'): continue - name = listJson['name'] + name = list_json['name'] if verbose: print('List: ' + name) - result[name] = listJson + result[name] = list_json return result -def add_c_wfrom_lists(post_json_object: {}, cw_lists: {}, translate: {}, +def add_cw_from_lists(post_json_object: {}, cw_lists: {}, translate: {}, lists_enabled: str) -> None: """Adds content warnings by matching the post content against domains or keywords @@ -1017,9 +1023,9 @@ def add_c_wfrom_lists(post_json_object: {}, cw_lists: {}, translate: {}, return if not post_json_object['object'].get('content'): return - cw = '' + cw_text = '' if post_json_object['object'].get('summary'): - cw = post_json_object['object']['summary'] + cw_text = post_json_object['object']['summary'] content = post_json_object['object']['content'] for name, item in cw_lists.items(): @@ -1034,7 +1040,7 @@ def add_c_wfrom_lists(post_json_object: {}, cw_lists: {}, translate: {}, warning = translate[warning] # is the warning already in the CW? - if warning in cw: + if warning in cw_text: continue matched = False @@ -1043,10 +1049,10 @@ def add_c_wfrom_lists(post_json_object: {}, cw_lists: {}, translate: {}, if item.get('domains'): for domain in item['domains']: if domain in content: - if cw: - cw = warning + ' / ' + cw + if cw_text: + cw_text = warning + ' / ' + cw_text else: - cw = warning + cw_text = warning matched = True break @@ -1055,15 +1061,15 @@ def add_c_wfrom_lists(post_json_object: {}, cw_lists: {}, translate: {}, # match words within the content if item.get('words'): - for wordStr in item['words']: - if wordStr in content: - if cw: - cw = warning + ' / ' + cw + for word_str in item['words']: + if word_str in content: + if cw_text: + cw_text = warning + ' / ' + cw_text else: - cw = warning + cw_text = warning break - if cw: - post_json_object['object']['summary'] = cw + if cw_text: + post_json_object['object']['summary'] = cw_text post_json_object['object']['sensitive'] = True diff --git a/tests.py b/tests.py index 478a0b8a4..15e3e7305 100644 --- a/tests.py +++ b/tests.py @@ -172,7 +172,7 @@ from shares import merge_shared_item_tokens from shares import send_share_via_server from shares import get_shared_items_catalog_via_server from blocking import load_cw_lists -from blocking import add_c_wfrom_lists +from blocking import add_cw_from_lists testServerGroupRunning = False testServerAliceRunning = False @@ -5922,7 +5922,7 @@ def _test_add_cw_lists(base_dir: str) -> None: "content": "" } } - add_c_wfrom_lists(post_json_object, cw_lists, translate, 'Murdoch press') + add_cw_from_lists(post_json_object, cw_lists, translate, 'Murdoch press') assert post_json_object['object']['sensitive'] is False assert post_json_object['object']['summary'] is None @@ -5933,7 +5933,7 @@ def _test_add_cw_lists(base_dir: str) -> None: "content": "Blah blah news.co.uk blah blah" } } - add_c_wfrom_lists(post_json_object, cw_lists, translate, 'Murdoch press') + add_cw_from_lists(post_json_object, cw_lists, translate, 'Murdoch press') assert post_json_object['object']['sensitive'] is True assert post_json_object['object']['summary'] == "Murdoch Press" @@ -5944,7 +5944,7 @@ def _test_add_cw_lists(base_dir: str) -> None: "content": "Blah blah news.co.uk blah blah" } } - add_c_wfrom_lists(post_json_object, cw_lists, translate, 'Murdoch press') + add_cw_from_lists(post_json_object, cw_lists, translate, 'Murdoch press') assert post_json_object['object']['sensitive'] is True assert post_json_object['object']['summary'] == \ "Murdoch Press / Existing CW" diff --git a/webapp_post.py b/webapp_post.py index ec34c9bf4..b4af4b096 100644 --- a/webapp_post.py +++ b/webapp_post.py @@ -81,7 +81,7 @@ from webfinger import webfinger_handle from speaker import update_speaker from languages import auto_translate_post from blocking import is_blocked -from blocking import add_c_wfrom_lists +from blocking import add_cw_from_lists from reaction import html_emoji_reactions @@ -1825,7 +1825,7 @@ def individual_post_as_html(signing_priv_key_pem: str, footerStr = newFooterStr # add any content warning from the cwlists directory - add_c_wfrom_lists(post_json_object, cw_lists, translate, lists_enabled) + add_cw_from_lists(post_json_object, cw_lists, translate, lists_enabled) postIsSensitive = False if post_json_object['object'].get('sensitive'):