mirror of https://gitlab.com/bashrc2/epicyon
Snake case
parent
3dfbb2bfc8
commit
cafc946df7
206
roles.py
206
roles.py
|
@ -21,9 +21,8 @@ def _clear_role_status(base_dir: str, role: str) -> None:
|
|||
rarely when roles are appointed or removed
|
||||
"""
|
||||
directory = os.fsencode(base_dir + '/accounts/')
|
||||
for f in os.scandir(directory):
|
||||
f = f.name
|
||||
filename = os.fsdecode(f)
|
||||
for fname in os.scandir(directory):
|
||||
filename = os.fsdecode(fname.name)
|
||||
if '@' not in filename:
|
||||
continue
|
||||
if not filename.endswith(".json"):
|
||||
|
@ -34,10 +33,10 @@ def _clear_role_status(base_dir: str, role: str) -> None:
|
|||
actor_json = load_json(filename)
|
||||
if not actor_json:
|
||||
continue
|
||||
rolesList = get_actor_roles_list(actor_json)
|
||||
if role in rolesList:
|
||||
rolesList.remove(role)
|
||||
set_rolesFromList(actor_json, rolesList)
|
||||
roles_list = get_actor_roles_list(actor_json)
|
||||
if role in roles_list:
|
||||
roles_list.remove(role)
|
||||
set_rolesFromList(actor_json, roles_list)
|
||||
save_json(actor_json, filename)
|
||||
|
||||
|
||||
|
@ -74,53 +73,74 @@ def clear_moderator_status(base_dir: str) -> None:
|
|||
|
||||
|
||||
def _add_role(base_dir: str, nickname: str, domain: str,
|
||||
roleFilename: str) -> None:
|
||||
role_filename: str) -> None:
|
||||
"""Adds a role nickname to the file.
|
||||
This is a file containing the nicknames of accounts having this role
|
||||
"""
|
||||
domain = remove_domain_port(domain)
|
||||
roleFile = base_dir + '/accounts/' + roleFilename
|
||||
if os.path.isfile(roleFile):
|
||||
role_file = base_dir + '/accounts/' + role_filename
|
||||
if os.path.isfile(role_file):
|
||||
# is this nickname already in the file?
|
||||
with open(roleFile, 'r') as f:
|
||||
lines = f.readlines()
|
||||
for roleNickname in lines:
|
||||
roleNickname = roleNickname.strip('\n').strip('\r')
|
||||
if roleNickname == nickname:
|
||||
|
||||
lines = []
|
||||
try:
|
||||
with open(role_file, 'r') as fp_role:
|
||||
lines = fp_role.readlines()
|
||||
except OSError:
|
||||
print('EX: _add_role, failed to read roles file ' + role_file)
|
||||
|
||||
for role_nickname in lines:
|
||||
role_nickname = role_nickname.strip('\n').strip('\r')
|
||||
if role_nickname == nickname:
|
||||
return
|
||||
lines.append(nickname)
|
||||
with open(roleFile, 'w+') as f:
|
||||
for roleNickname in lines:
|
||||
roleNickname = roleNickname.strip('\n').strip('\r')
|
||||
if len(roleNickname) < 2:
|
||||
|
||||
try:
|
||||
with open(role_file, 'w+') as fp_role:
|
||||
for role_nickname in lines:
|
||||
role_nickname = role_nickname.strip('\n').strip('\r')
|
||||
if len(role_nickname) < 2:
|
||||
continue
|
||||
if os.path.isdir(base_dir + '/accounts/' +
|
||||
roleNickname + '@' + domain):
|
||||
f.write(roleNickname + '\n')
|
||||
role_nickname + '@' + domain):
|
||||
fp_role.write(role_nickname + '\n')
|
||||
except OSError:
|
||||
print('EX: _add_role, failed to write roles file1 ' + role_file)
|
||||
else:
|
||||
with open(roleFile, 'w+') as f:
|
||||
accountDir = acct_dir(base_dir, nickname, domain)
|
||||
if os.path.isdir(accountDir):
|
||||
f.write(nickname + '\n')
|
||||
try:
|
||||
with open(role_file, 'w+') as fp_role:
|
||||
account_dir = acct_dir(base_dir, nickname, domain)
|
||||
if os.path.isdir(account_dir):
|
||||
fp_role.write(nickname + '\n')
|
||||
except OSError:
|
||||
print('EX: _add_role, failed to write roles file2 ' + role_file)
|
||||
|
||||
|
||||
def _remove_role(base_dir: str, nickname: str, roleFilename: str) -> None:
|
||||
def _remove_role(base_dir: str, nickname: str, role_filename: str) -> None:
|
||||
"""Removes a role nickname from the file.
|
||||
This is a file containing the nicknames of accounts having this role
|
||||
"""
|
||||
roleFile = base_dir + '/accounts/' + roleFilename
|
||||
if not os.path.isfile(roleFile):
|
||||
role_file = base_dir + '/accounts/' + role_filename
|
||||
if not os.path.isfile(role_file):
|
||||
return
|
||||
with open(roleFile, 'r') as f:
|
||||
lines = f.readlines()
|
||||
with open(roleFile, 'w+') as f:
|
||||
for roleNickname in lines:
|
||||
roleNickname = roleNickname.strip('\n').strip('\r')
|
||||
if len(roleNickname) > 1 and roleNickname != nickname:
|
||||
f.write(roleNickname + '\n')
|
||||
|
||||
try:
|
||||
with open(role_file, 'r') as fp_role:
|
||||
lines = fp_role.readlines()
|
||||
except OSError:
|
||||
print('EX: _remove_role, failed to read roles file ' + role_file)
|
||||
|
||||
try:
|
||||
with open(role_file, 'w+') as fp_role:
|
||||
for role_nickname in lines:
|
||||
role_nickname = role_nickname.strip('\n').strip('\r')
|
||||
if len(role_nickname) > 1 and role_nickname != nickname:
|
||||
fp_role.write(role_nickname + '\n')
|
||||
except OSError:
|
||||
print('EX: _remove_role, failed to regenerate roles file ' + role_file)
|
||||
|
||||
|
||||
def _set_actor_role(actor_json: {}, roleName: str) -> bool:
|
||||
def _set_actor_role(actor_json: {}, role_name: str) -> bool:
|
||||
"""Sets a role for an actor
|
||||
"""
|
||||
if not actor_json.get('hasOccupation'):
|
||||
|
@ -130,35 +150,35 @@ def _set_actor_role(actor_json: {}, roleName: str) -> bool:
|
|||
|
||||
# occupation category from www.onetonline.org
|
||||
category = None
|
||||
if 'admin' in roleName:
|
||||
if 'admin' in role_name:
|
||||
category = '15-1299.01'
|
||||
elif 'moderator' in roleName:
|
||||
elif 'moderator' in role_name:
|
||||
category = '11-9199.02'
|
||||
elif 'editor' in roleName:
|
||||
elif 'editor' in role_name:
|
||||
category = '27-3041.00'
|
||||
elif 'counselor' in roleName:
|
||||
elif 'counselor' in role_name:
|
||||
category = '23-1022.00'
|
||||
elif 'artist' in roleName:
|
||||
elif 'artist' in role_name:
|
||||
category = '27-1024.00'
|
||||
if not category:
|
||||
return False
|
||||
|
||||
for index in range(len(actor_json['hasOccupation'])):
|
||||
occupationItem = actor_json['hasOccupation'][index]
|
||||
if not isinstance(occupationItem, dict):
|
||||
occupation_item = actor_json['hasOccupation'][index]
|
||||
if not isinstance(occupation_item, dict):
|
||||
continue
|
||||
if not occupationItem.get('@type'):
|
||||
if not occupation_item.get('@type'):
|
||||
continue
|
||||
if occupationItem['@type'] != 'Role':
|
||||
if occupation_item['@type'] != 'Role':
|
||||
continue
|
||||
if occupationItem['hasOccupation']['name'] == roleName:
|
||||
if occupation_item['hasOccupation']['name'] == role_name:
|
||||
return True
|
||||
statusNumber, published = get_status_number()
|
||||
newRole = {
|
||||
_, published = get_status_number()
|
||||
new_role = {
|
||||
"@type": "Role",
|
||||
"hasOccupation": {
|
||||
"@type": "Occupation",
|
||||
"name": roleName,
|
||||
"name": role_name,
|
||||
"description": "Fediverse instance role",
|
||||
"occupationLocation": {
|
||||
"@type": "City",
|
||||
|
@ -178,28 +198,28 @@ def _set_actor_role(actor_json: {}, roleName: str) -> bool:
|
|||
},
|
||||
"startDate": published
|
||||
}
|
||||
actor_json['hasOccupation'].append(newRole)
|
||||
actor_json['hasOccupation'].append(new_role)
|
||||
return True
|
||||
|
||||
|
||||
def set_rolesFromList(actor_json: {}, rolesList: []) -> None:
|
||||
def set_rolesFromList(actor_json: {}, roles_list: []) -> None:
|
||||
"""Sets roles from a list
|
||||
"""
|
||||
# clear Roles from the occupation list
|
||||
emptyRolesList = []
|
||||
for occupationItem in actor_json['hasOccupation']:
|
||||
if not isinstance(occupationItem, dict):
|
||||
empty_roles_list = []
|
||||
for occupation_item in actor_json['hasOccupation']:
|
||||
if not isinstance(occupation_item, dict):
|
||||
continue
|
||||
if not occupationItem.get('@type'):
|
||||
if not occupation_item.get('@type'):
|
||||
continue
|
||||
if occupationItem['@type'] == 'Role':
|
||||
if occupation_item['@type'] == 'Role':
|
||||
continue
|
||||
emptyRolesList.append(occupationItem)
|
||||
actor_json['hasOccupation'] = emptyRolesList
|
||||
empty_roles_list.append(occupation_item)
|
||||
actor_json['hasOccupation'] = empty_roles_list
|
||||
|
||||
# create the new list
|
||||
for roleName in rolesList:
|
||||
_set_actor_role(actor_json, roleName)
|
||||
for role_name in roles_list:
|
||||
_set_actor_role(actor_json, role_name)
|
||||
|
||||
|
||||
def get_actor_roles_list(actor_json: {}) -> []:
|
||||
|
@ -209,18 +229,18 @@ def get_actor_roles_list(actor_json: {}) -> []:
|
|||
return []
|
||||
if not isinstance(actor_json['hasOccupation'], list):
|
||||
return []
|
||||
rolesList = []
|
||||
for occupationItem in actor_json['hasOccupation']:
|
||||
if not isinstance(occupationItem, dict):
|
||||
roles_list = []
|
||||
for occupation_item in actor_json['hasOccupation']:
|
||||
if not isinstance(occupation_item, dict):
|
||||
continue
|
||||
if not occupationItem.get('@type'):
|
||||
if not occupation_item.get('@type'):
|
||||
continue
|
||||
if occupationItem['@type'] != 'Role':
|
||||
if occupation_item['@type'] != 'Role':
|
||||
continue
|
||||
roleName = occupationItem['hasOccupation']['name']
|
||||
if roleName not in rolesList:
|
||||
rolesList.append(roleName)
|
||||
return rolesList
|
||||
role_name = occupation_item['hasOccupation']['name']
|
||||
if role_name not in roles_list:
|
||||
roles_list.append(role_name)
|
||||
return roles_list
|
||||
|
||||
|
||||
def set_role(base_dir: str, nickname: str, domain: str,
|
||||
|
@ -231,47 +251,47 @@ def set_role(base_dir: str, nickname: str, domain: str,
|
|||
# avoid giant strings
|
||||
if len(role) > 128:
|
||||
return False
|
||||
actorFilename = acct_dir(base_dir, nickname, domain) + '.json'
|
||||
if not os.path.isfile(actorFilename):
|
||||
actor_filename = acct_dir(base_dir, nickname, domain) + '.json'
|
||||
if not os.path.isfile(actor_filename):
|
||||
return False
|
||||
|
||||
roleFiles = {
|
||||
role_files = {
|
||||
"moderator": "moderators.txt",
|
||||
"editor": "editors.txt",
|
||||
"counselor": "counselors.txt",
|
||||
"artist": "artists.txt"
|
||||
}
|
||||
|
||||
actor_json = load_json(actorFilename)
|
||||
actor_json = load_json(actor_filename)
|
||||
if actor_json:
|
||||
if not actor_json.get('hasOccupation'):
|
||||
return False
|
||||
rolesList = get_actor_roles_list(actor_json)
|
||||
actorChanged = False
|
||||
roles_list = get_actor_roles_list(actor_json)
|
||||
actor_changed = False
|
||||
if role:
|
||||
# add the role
|
||||
if roleFiles.get(role):
|
||||
_add_role(base_dir, nickname, domain, roleFiles[role])
|
||||
if role not in rolesList:
|
||||
rolesList.append(role)
|
||||
rolesList.sort()
|
||||
set_rolesFromList(actor_json, rolesList)
|
||||
actorChanged = True
|
||||
if role_files.get(role):
|
||||
_add_role(base_dir, nickname, domain, role_files[role])
|
||||
if role not in roles_list:
|
||||
roles_list.append(role)
|
||||
roles_list.sort()
|
||||
set_rolesFromList(actor_json, roles_list)
|
||||
actor_changed = True
|
||||
else:
|
||||
# remove the role
|
||||
if roleFiles.get(role):
|
||||
_remove_role(base_dir, nickname, roleFiles[role])
|
||||
if role in rolesList:
|
||||
rolesList.remove(role)
|
||||
set_rolesFromList(actor_json, rolesList)
|
||||
actorChanged = True
|
||||
if actorChanged:
|
||||
save_json(actor_json, actorFilename)
|
||||
if role_files.get(role):
|
||||
_remove_role(base_dir, nickname, role_files[role])
|
||||
if role in roles_list:
|
||||
roles_list.remove(role)
|
||||
set_rolesFromList(actor_json, roles_list)
|
||||
actor_changed = True
|
||||
if actor_changed:
|
||||
save_json(actor_json, actor_filename)
|
||||
return True
|
||||
|
||||
|
||||
def actor_has_role(actor_json: {}, roleName: str) -> bool:
|
||||
def actor_has_role(actor_json: {}, role_name: str) -> bool:
|
||||
"""Returns true if the given actor has the given role
|
||||
"""
|
||||
rolesList = get_actor_roles_list(actor_json)
|
||||
return roleName in rolesList
|
||||
roles_list = get_actor_roles_list(actor_json)
|
||||
return role_name in roles_list
|
||||
|
|
Loading…
Reference in New Issue