mirror of https://gitlab.com/bashrc2/epicyon
				
				
				
			
		
			
				
	
	
		
			365 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			365 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
| __filename__ = "roles.py"
 | |
| __author__ = "Bob Mottram"
 | |
| __license__ = "AGPL3+"
 | |
| __version__ = "1.6.0"
 | |
| __maintainer__ = "Bob Mottram"
 | |
| __email__ = "bob@libreserver.org"
 | |
| __status__ = "Production"
 | |
| __module_group__ = "Profile Metadata"
 | |
| 
 | |
| import os
 | |
| from utils import data_dir
 | |
| from utils import load_json
 | |
| from utils import save_json
 | |
| from utils import remove_domain_port
 | |
| from utils import acct_dir
 | |
| from utils import text_in_file
 | |
| from utils import get_config_param
 | |
| from status import get_status_number
 | |
| 
 | |
| 
 | |
def _clear_role_status(base_dir: str, role: str) -> None:
    """Strips the given role from every account which holds it

    Every actor json file within the data directory gets examined, so
    this may be slow on instances with many accounts. It is only
    expected to run when roles are appointed or removed
    """
    accounts_dir = data_dir(base_dir)
    quoted_role = '"' + role + '"'
    for entry in os.scandir(os.fsencode(accounts_dir + '/')):
        entry_name = os.fsdecode(entry.name)
        # only names containing '@' and ending in .json are considered
        if '@' not in entry_name or not entry_name.endswith(".json"):
            continue
        actor_filename = os.path.join(accounts_dir + '/', entry_name)
        # skip files which never mention the quoted role name
        if not text_in_file(quoted_role, actor_filename):
            continue
        actor_json = load_json(actor_filename)
        if not actor_json:
            continue
        current_roles = get_actor_roles_list(actor_json)
        if role not in current_roles:
            continue
        # rebuild the actor's roles without this one and store the result
        current_roles.remove(role)
        actor_roles_from_list(actor_json, current_roles)
        save_json(actor_json, actor_filename)
 | |
| 
 | |
| 
 | |
def _add_role(base_dir: str, nickname: str, domain: str,
              role_filename: str) -> None:
    """Adds a role nickname to the file.
    This is a file containing the nicknames of accounts having this role,
    one per line, stored within the data directory.

    When the file already exists it is rewritten with the new nickname
    appended. During the rewrite any entry shorter than two characters,
    or without a matching nickname@domain directory in the data
    directory, is dropped.

    If the existing file cannot be read then nothing is written, so that
    the current role holders are not lost.
    """
    domain = remove_domain_port(domain)
    dir_str = data_dir(base_dir)
    role_file = dir_str + '/' + role_filename

    if not os.path.isfile(role_file):
        # no file yet, so create it with this nickname as the only entry
        try:
            with open(role_file, 'w+', encoding='utf-8') as fp_role:
                account_dir = acct_dir(base_dir, nickname, domain)
                if os.path.isdir(account_dir):
                    fp_role.write(nickname + '\n')
        except OSError:
            print('EX: _add_role, failed to write roles file2 ' + role_file)
        return

    try:
        with open(role_file, 'r', encoding='utf-8') as fp_role:
            lines = fp_role.readlines()
    except OSError:
        print('EX: _add_role, failed to read roles file ' + role_file)
        # rewriting now would replace the whole file with a single
        # nickname, discarding every existing entry
        return

    role_nicknames = [line.strip('\n').strip('\r') for line in lines]

    # is this nickname already in the file?
    if nickname in role_nicknames:
        return
    role_nicknames.append(nickname)

    try:
        with open(role_file, 'w+', encoding='utf-8') as fp_role:
            for role_nickname in role_nicknames:
                if len(role_nickname) < 2:
                    continue
                # only keep nicknames which still have an account directory
                if os.path.isdir(dir_str + '/' +
                                 role_nickname + '@' + domain):
                    fp_role.write(role_nickname + '\n')
    except OSError:
        print('EX: _add_role, failed to write roles file1 ' + role_file)
 | |
| 
 | |
| 
 | |
def _remove_role(base_dir: str, nickname: str, role_filename: str) -> None:
    """Removes a role nickname from the file.
    This is a file containing the nicknames of accounts having this role,
    one per line, stored within the data directory.

    The file is regenerated without the given nickname. Entries shorter
    than two characters are also dropped. Nothing happens if the file
    does not exist, and the file is left untouched if it cannot be read.
    """
    role_file = data_dir(base_dir) + '/' + role_filename
    if not os.path.isfile(role_file):
        return

    try:
        with open(role_file, 'r', encoding='utf-8') as fp_role:
            lines = fp_role.readlines()
    except OSError:
        print('EX: _remove_role, failed to read roles file ' + role_file)
        # without the current contents the file can't be regenerated.
        # Opening it for writing would only truncate it
        return

    try:
        with open(role_file, 'w+', encoding='utf-8') as fp_role:
            for role_nickname in lines:
                role_nickname = role_nickname.strip('\n').strip('\r')
                if len(role_nickname) > 1 and role_nickname != nickname:
                    fp_role.write(role_nickname + '\n')
    except OSError:
        print('EX: _remove_role, failed to regenerate roles file ' + role_file)
 | |
| 
 | |
| 
 | |
def _set_actor_role(actor_json: {}, role_name: str) -> bool:
    """Sets a role for an actor

    A Role entry for role_name is appended to the actor's hasOccupation
    list, unless one with the same name already exists.

    Returns True if the actor has the role afterwards. Returns False if
    hasOccupation is missing or is not a list, or if role_name does not
    contain one of the recognised role keywords.
    """
    occupations = actor_json.get('hasOccupation')
    # an empty list is acceptable: the occupation list can be empty
    # after existing Role entries have been cleared from it
    if not isinstance(occupations, list):
        return False

    # occupation category from www.onetonline.org
    # The first keyword contained within the role name wins
    role_categories = (
        ('admin', '15-1299.01'),
        ('moderator', '11-9199.02'),
        ('editor', '27-3041.00'),
        ('counselor', '23-1022.00'),
        ('artist', '27-1024.00')
    )
    category = None
    for keyword, code_value in role_categories:
        if keyword in role_name:
            category = code_value
            break
    if not category:
        return False

    # does the actor already have this role?
    for occupation_item in occupations:
        if not isinstance(occupation_item, dict):
            continue
        if occupation_item.get('@type') != 'Role':
            continue
        role_occupation = occupation_item.get('hasOccupation')
        if not isinstance(role_occupation, dict):
            # malformed Role entry without occupation details
            continue
        if role_occupation.get('name') == role_name:
            return True

    _, published = get_status_number()
    new_role = {
        "@type": "Role",
        "hasOccupation": {
            "@type": "Occupation",
            "name": role_name,
            "description": "Fediverse instance role",
            "occupationLocation": {
                "@type": "City",
                "url": "Fediverse"
            },
            "occupationalCategory": {
                "@type": "CategoryCode",
                "inCodeSet": {
                    "@type": "CategoryCodeSet",
                    "name": "O*Net-SOC",
                    "dateModified": "2019",
                    "url": "https://www.onetonline.org/"
                },
                "codeValue": category,
                "url": "https://www.onetonline.org/link/summary/" + category
            }
        },
        "startDate": published
    }
    occupations.append(new_role)
    return True
 | |
| 
 | |
| 
 | |
def actor_roles_from_list(actor_json: {}, roles_list: []) -> None:
    """Sets roles from a list

    All existing Role entries are removed from the actor's hasOccupation
    list and a new one is then created for each name within roles_list.
    Entries which are not dicts, or which have no @type, are also
    dropped from the occupation list.

    The actor is left unchanged if hasOccupation is missing or is not
    a list.
    """
    occupations = actor_json.get('hasOccupation')
    if not isinstance(occupations, list):
        return

    # clear Roles from the occupation list
    actor_json['hasOccupation'] = [
        occupation_item for occupation_item in occupations
        if isinstance(occupation_item, dict) and
        occupation_item.get('@type') and
        occupation_item['@type'] != 'Role'
    ]

    # create the new list
    for role_name in roles_list:
        _set_actor_role(actor_json, role_name)
 | |
| 
 | |
| 
 | |
def get_actor_roles_list(actor_json: {}) -> []:
    """Gets a list of role names from an actor

    The names are taken from the Role entries within the actor's
    hasOccupation list, in the order in which they appear and without
    duplicates.

    An empty list is returned if hasOccupation is missing, empty or not
    a list. Role entries lacking a nested occupation with a string name
    are skipped rather than raising an exception.
    """
    occupations = actor_json.get('hasOccupation')
    if not occupations or not isinstance(occupations, list):
        return []
    roles_list: list[str] = []
    for occupation_item in occupations:
        if not isinstance(occupation_item, dict):
            continue
        if occupation_item.get('@type') != 'Role':
            continue
        role_occupation = occupation_item.get('hasOccupation')
        if not isinstance(role_occupation, dict):
            # malformed Role entry without occupation details
            continue
        role_name = role_occupation.get('name')
        if not isinstance(role_name, str):
            continue
        if role_name not in roles_list:
            roles_list.append(role_name)
    return roles_list
 | |
| 
 | |
| 
 | |
def set_role(base_dir: str, nickname: str, domain: str,
             role: str) -> bool:
    """Set a person's role
    Setting the role to an empty string or None will remove it

    Returns False if the role name is longer than 128 characters, if the
    account has no actor file, or if the actor has no hasOccupation
    entries. Otherwise returns True, including when the actor file
    exists but could not be loaded.
    """
    # avoid giant strings
    # The role may be None or empty, so only measure it when it is set
    if role and len(role) > 128:
        return False
    actor_filename = acct_dir(base_dir, nickname, domain) + '.json'
    if not os.path.isfile(actor_filename):
        return False

    # roles which also have a file listing the nicknames holding them
    role_files = {
        "moderator": "moderators.txt",
        "editor": "editors.txt",
        "counselor": "counselors.txt",
        "artist": "artists.txt"
    }

    actor_json = load_json(actor_filename)
    if not actor_json:
        return True
    if not actor_json.get('hasOccupation'):
        return False

    roles_list = get_actor_roles_list(actor_json)
    actor_changed = False
    if role:
        # add the role
        if role_files.get(role):
            _add_role(base_dir, nickname, domain, role_files[role])
        if role not in roles_list:
            roles_list.append(role)
            roles_list.sort()
            actor_roles_from_list(actor_json, roles_list)
            actor_changed = True
    else:
        # remove the role
        # NOTE(review): role is empty or None on this branch, so the
        # lookups below cannot match any named role. Confirm how the
        # role to be removed is supposed to be identified
        if role_files.get(role):
            _remove_role(base_dir, nickname, role_files[role])
        if role in roles_list:
            roles_list.remove(role)
            actor_roles_from_list(actor_json, roles_list)
            actor_changed = True
    if actor_changed:
        save_json(actor_json, actor_filename)
    return True
 | |
| 
 | |
| 
 | |
def actor_has_role(actor_json: {}, role_name: str) -> bool:
    """Does the given actor hold the role with the given name?
    """
    return role_name in get_actor_roles_list(actor_json)
 | |
| 
 | |
| 
 | |
def is_devops(base_dir: str, nickname: str) -> bool:
    """Does the given nickname have the devops role?

    The nickname has the role if it is listed within devops.txt in the
    data directory. If that file is missing, or no lines could be read
    from it, then only the nickname matching the 'admin' config
    parameter has the role
    """
    def _matches_admin() -> bool:
        # fallback used when there are no devops nicknames to consult
        admin_name = get_config_param(base_dir, 'admin')
        return bool(admin_name) and admin_name == nickname

    devops_file = data_dir(base_dir) + '/devops.txt'
    if not os.path.isfile(devops_file):
        return _matches_admin()

    devops_lines: list[str] = []
    try:
        with open(devops_file, 'r', encoding='utf-8') as fp_mod:
            devops_lines = fp_mod.readlines()
    except OSError:
        print('EX: is_devops unable to read ' + devops_file)

    if not devops_lines:
        # if there is nothing in the file
        return _matches_admin()

    return any(line.strip('\n').strip('\r') == nickname
               for line in devops_lines)
 | |
| 
 | |
| 
 | |
def set_roles_from_list(base_dir: str, domain: str, admin_nickname: str,
                        list_name: str, role_name: str, fields: [], path: str,
                        list_filename: str) -> None:
    """Sets the roles from a list returned from the edit profile screen under
    role assignments

    fields[list_name] holds the nicknames, which may be separated by
    commas, by newlines, or by a mixture of both. Nicknames without an
    account directory are ignored.

    Nothing is done unless path begins with /users/admin_nickname/.
    The role is first cleared from all accounts. If the submitted list
    is empty then the role file is removed. Otherwise the role file
    within the data directory is rewritten and the role is set on each
    of the listed accounts.
    """
    # check for admin user
    if not path.startswith('/users/' + admin_nickname + '/'):
        return
    roles_filename = data_dir(base_dir) + '/' + list_filename
    if not fields.get(list_name):
        # an empty list means that nobody has this role any more
        if os.path.isfile(roles_filename):
            _clear_role_status(base_dir, role_name)
            try:
                os.remove(roles_filename)
            except OSError:
                print('EX: failed to remove roles file ' + roles_filename)
        return
    _clear_role_status(base_dir, role_name)

    # the list can be comma separated or have nicknames on separate lines
    submitted_nicks = fields[list_name].replace(',', '\n').split('\n')

    # only keep the nicknames which have an account on this instance
    roles_list: list[str] = []
    for roles_nick in submitted_nicks:
        roles_nick = roles_nick.strip()
        if not roles_nick:
            continue
        roles_dir = acct_dir(base_dir, roles_nick, domain)
        if os.path.isdir(roles_dir):
            roles_list.append(roles_nick)

    try:
        with open(roles_filename, 'w+',
                  encoding='utf-8') as fp_roles:
            for roles_nick in roles_list:
                fp_roles.write(roles_nick + '\n')
    except OSError as ex:
        print('EX: unable to write ' + list_name + ' ' +
              roles_filename + ' ' + str(ex))

    # the actors are updated even if the role file could not be written
    for roles_nick in roles_list:
        set_role(base_dir, roles_nick, domain, role_name)
 |