diff --git a/roles.py b/roles.py index 70ddeeea5..770f0c1c3 100644 --- a/roles.py +++ b/roles.py @@ -21,9 +21,8 @@ def _clear_role_status(base_dir: str, role: str) -> None: rarely when roles are appointed or removed """ directory = os.fsencode(base_dir + '/accounts/') - for f in os.scandir(directory): - f = f.name - filename = os.fsdecode(f) + for fname in os.scandir(directory): + filename = os.fsdecode(fname.name) if '@' not in filename: continue if not filename.endswith(".json"): @@ -34,10 +33,10 @@ def _clear_role_status(base_dir: str, role: str) -> None: actor_json = load_json(filename) if not actor_json: continue - rolesList = get_actor_roles_list(actor_json) - if role in rolesList: - rolesList.remove(role) - set_rolesFromList(actor_json, rolesList) + roles_list = get_actor_roles_list(actor_json) + if role in roles_list: + roles_list.remove(role) + set_rolesFromList(actor_json, roles_list) save_json(actor_json, filename) @@ -74,53 +73,74 @@ def clear_moderator_status(base_dir: str) -> None: def _add_role(base_dir: str, nickname: str, domain: str, - roleFilename: str) -> None: + role_filename: str) -> None: """Adds a role nickname to the file. This is a file containing the nicknames of accounts having this role """ domain = remove_domain_port(domain) - roleFile = base_dir + '/accounts/' + roleFilename - if os.path.isfile(roleFile): + role_file = base_dir + '/accounts/' + role_filename + if os.path.isfile(role_file): # is this nickname already in the file? - with open(roleFile, 'r') as f: - lines = f.readlines() - for roleNickname in lines: - roleNickname = roleNickname.strip('\n').strip('\r') - if roleNickname == nickname: + + lines = [] + try: + with open(role_file, 'r') as fp_role: + lines = fp_role.readlines() + except OSError: + print('EX: _add_role, failed to read roles file ' + role_file) + + for role_nickname in lines: + role_nickname = role_nickname.strip('\n').strip('\r') + if role_nickname == nickname: return lines.append(nickname) - with open(roleFile, 'w+') as f: - for roleNickname in lines: - roleNickname = roleNickname.strip('\n').strip('\r') - if len(roleNickname) < 2: - continue - if os.path.isdir(base_dir + '/accounts/' + - roleNickname + '@' + domain): - f.write(roleNickname + '\n') + + try: + with open(role_file, 'w+') as fp_role: + for role_nickname in lines: + role_nickname = role_nickname.strip('\n').strip('\r') + if len(role_nickname) < 2: + continue + if os.path.isdir(base_dir + '/accounts/' + + role_nickname + '@' + domain): + fp_role.write(role_nickname + '\n') + except OSError: + print('EX: _add_role, failed to write roles file1 ' + role_file) else: - with open(roleFile, 'w+') as f: - accountDir = acct_dir(base_dir, nickname, domain) - if os.path.isdir(accountDir): - f.write(nickname + '\n') + try: + with open(role_file, 'w+') as fp_role: + account_dir = acct_dir(base_dir, nickname, domain) + if os.path.isdir(account_dir): + fp_role.write(nickname + '\n') + except OSError: + print('EX: _add_role, failed to write roles file2 ' + role_file) -def _remove_role(base_dir: str, nickname: str, roleFilename: str) -> None: +def _remove_role(base_dir: str, nickname: str, role_filename: str) -> None: """Removes a role nickname from the file. This is a file containing the nicknames of accounts having this role """ - roleFile = base_dir + '/accounts/' + roleFilename - if not os.path.isfile(roleFile): + role_file = base_dir + '/accounts/' + role_filename + if not os.path.isfile(role_file): return - with open(roleFile, 'r') as f: - lines = f.readlines() - with open(roleFile, 'w+') as f: - for roleNickname in lines: - roleNickname = roleNickname.strip('\n').strip('\r') - if len(roleNickname) > 1 and roleNickname != nickname: - f.write(roleNickname + '\n') + + try: + with open(role_file, 'r') as fp_role: + lines = fp_role.readlines() + except OSError: + print('EX: _remove_role, failed to read roles file ' + role_file) + + try: + with open(role_file, 'w+') as fp_role: + for role_nickname in lines: + role_nickname = role_nickname.strip('\n').strip('\r') + if len(role_nickname) > 1 and role_nickname != nickname: + fp_role.write(role_nickname + '\n') + except OSError: + print('EX: _remove_role, failed to regenerate roles file ' + role_file) -def _set_actor_role(actor_json: {}, roleName: str) -> bool: +def _set_actor_role(actor_json: {}, role_name: str) -> bool: """Sets a role for an actor """ if not actor_json.get('hasOccupation'): @@ -130,35 +150,35 @@ def _set_actor_role(actor_json: {}, roleName: str) -> bool: # occupation category from www.onetonline.org category = None - if 'admin' in roleName: + if 'admin' in role_name: category = '15-1299.01' - elif 'moderator' in roleName: + elif 'moderator' in role_name: category = '11-9199.02' - elif 'editor' in roleName: + elif 'editor' in role_name: category = '27-3041.00' - elif 'counselor' in roleName: + elif 'counselor' in role_name: category = '23-1022.00' - elif 'artist' in roleName: + elif 'artist' in role_name: category = '27-1024.00' if not category: return False for index in range(len(actor_json['hasOccupation'])): - occupationItem = actor_json['hasOccupation'][index] - if not isinstance(occupationItem, dict): + occupation_item = actor_json['hasOccupation'][index] + if not isinstance(occupation_item, dict): continue - if not occupationItem.get('@type'): + if not occupation_item.get('@type'): continue - if occupationItem['@type'] != 'Role': + if occupation_item['@type'] != 'Role': continue - if occupationItem['hasOccupation']['name'] == roleName: + if occupation_item['hasOccupation']['name'] == role_name: return True - statusNumber, published = get_status_number() - newRole = { + _, published = get_status_number() + new_role = { "@type": "Role", "hasOccupation": { "@type": "Occupation", - "name": roleName, + "name": role_name, "description": "Fediverse instance role", "occupationLocation": { "@type": "City", @@ -178,28 +198,28 @@ def _set_actor_role(actor_json: {}, roleName: str) -> bool: }, "startDate": published } - actor_json['hasOccupation'].append(newRole) + actor_json['hasOccupation'].append(new_role) return True -def set_rolesFromList(actor_json: {}, rolesList: []) -> None: +def set_rolesFromList(actor_json: {}, roles_list: []) -> None: """Sets roles from a list """ # clear Roles from the occupation list - emptyRolesList = [] - for occupationItem in actor_json['hasOccupation']: - if not isinstance(occupationItem, dict): + empty_roles_list = [] + for occupation_item in actor_json['hasOccupation']: + if not isinstance(occupation_item, dict): continue - if not occupationItem.get('@type'): + if not occupation_item.get('@type'): continue - if occupationItem['@type'] == 'Role': + if occupation_item['@type'] == 'Role': continue - emptyRolesList.append(occupationItem) - actor_json['hasOccupation'] = emptyRolesList + empty_roles_list.append(occupation_item) + actor_json['hasOccupation'] = empty_roles_list # create the new list - for roleName in rolesList: - _set_actor_role(actor_json, roleName) + for role_name in roles_list: + _set_actor_role(actor_json, role_name) def get_actor_roles_list(actor_json: {}) -> []: @@ -209,18 +229,18 @@ def get_actor_roles_list(actor_json: {}) -> []: return [] if not isinstance(actor_json['hasOccupation'], list): return [] - rolesList = [] - for occupationItem in actor_json['hasOccupation']: - if not isinstance(occupationItem, dict): + roles_list = [] + for occupation_item in actor_json['hasOccupation']: + if not isinstance(occupation_item, dict): continue - if not occupationItem.get('@type'): + if not occupation_item.get('@type'): continue - if occupationItem['@type'] != 'Role': + if occupation_item['@type'] != 'Role': continue - roleName = occupationItem['hasOccupation']['name'] - if roleName not in rolesList: - rolesList.append(roleName) - return rolesList + role_name = occupation_item['hasOccupation']['name'] + if role_name not in roles_list: + roles_list.append(role_name) + return roles_list def set_role(base_dir: str, nickname: str, domain: str, @@ -231,47 +251,47 @@ def set_role(base_dir: str, nickname: str, domain: str, # avoid giant strings if len(role) > 128: return False - actorFilename = acct_dir(base_dir, nickname, domain) + '.json' - if not os.path.isfile(actorFilename): + actor_filename = acct_dir(base_dir, nickname, domain) + '.json' + if not os.path.isfile(actor_filename): return False - roleFiles = { + role_files = { "moderator": "moderators.txt", "editor": "editors.txt", "counselor": "counselors.txt", "artist": "artists.txt" } - actor_json = load_json(actorFilename) + actor_json = load_json(actor_filename) if actor_json: if not actor_json.get('hasOccupation'): return False - rolesList = get_actor_roles_list(actor_json) - actorChanged = False + roles_list = get_actor_roles_list(actor_json) + actor_changed = False if role: # add the role - if roleFiles.get(role): - _add_role(base_dir, nickname, domain, roleFiles[role]) - if role not in rolesList: - rolesList.append(role) - rolesList.sort() - set_rolesFromList(actor_json, rolesList) - actorChanged = True + if role_files.get(role): + _add_role(base_dir, nickname, domain, role_files[role]) + if role not in roles_list: + roles_list.append(role) + roles_list.sort() + set_rolesFromList(actor_json, roles_list) + actor_changed = True else: # remove the role - if roleFiles.get(role): - _remove_role(base_dir, nickname, roleFiles[role]) - if role in rolesList: - rolesList.remove(role) - set_rolesFromList(actor_json, rolesList) - actorChanged = True - if actorChanged: - save_json(actor_json, actorFilename) + if role_files.get(role): + _remove_role(base_dir, nickname, role_files[role]) + if role in roles_list: + roles_list.remove(role) + set_rolesFromList(actor_json, roles_list) + actor_changed = True + if actor_changed: + save_json(actor_json, actor_filename) return True -def actor_has_role(actor_json: {}, roleName: str) -> bool: +def actor_has_role(actor_json: {}, role_name: str) -> bool: """Returns true if the given actor has the given role """ - rolesList = get_actor_roles_list(actor_json) - return roleName in rolesList + roles_list = get_actor_roles_list(actor_json) + return role_name in roles_list