epicyon/roles.py

293 lines
11 KiB
Python
Raw Normal View History

2019-07-18 15:09:23 +00:00
__filename__ = "roles.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
2019-08-29 13:35:29 +00:00
__version__ = "1.0.0"
2019-07-18 15:09:23 +00:00
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import json
import commentjson
import os
from webfinger import webfingerHandle
from auth import createBasicAuthHeader
from posts import getPersonBox
from session import postJson
from utils import getNicknameFromActor
from utils import getDomainFromActor
2019-08-12 21:20:47 +00:00
def clearModeratorStatus(baseDir: str) -> None:
    """Removes moderator status from all accounts
    This could be slow if there are many users, but only happens
    rarely when moderators are appointed or removed

    Every file within baseDir/accounts whose name ends with .json and
    contains an @ is examined. If 'moderator' appears in its instance
    roles then it is removed and the file is saved. Files which do not
    need to change are not rewritten. Nothing happens if the accounts
    directory does not exist.
    """
    accountsDir = baseDir + '/accounts/'
    if not os.path.isdir(accountsDir):
        return
    for name in os.listdir(accountsDir):
        if not name.endswith('.json') or '@' not in name:
            continue
        actorFilename = os.path.join(accountsDir, name)
        if not os.path.isfile(actorFilename):
            continue
        with open(actorFilename, 'r') as fp:
            # cheap text search first, so that most files never
            # need to be parsed
            if '"moderator"' not in fp.read():
                continue
            fp.seek(0)
            actorJson = commentjson.load(fp)
        roles = actorJson.get('roles')
        if not roles:
            continue
        instanceRoles = roles.get('instance')
        if not instanceRoles or 'moderator' not in instanceRoles:
            continue
        instanceRoles.remove('moderator')
        with open(actorFilename, 'w') as fp:
            commentjson.dump(actorJson, fp, indent=4, sort_keys=False)
def addModerator(baseDir: str, nickname: str, domain: str) -> None:
    """Adds a moderator nickname to the file

    The moderators file is baseDir/accounts/moderators.txt with one
    nickname per line. Any port number on the domain is ignored.
    If the nickname is already listed then the file is left untouched.
    Otherwise the file is rewritten, keeping only nicknames longer than
    one character which have an account directory
    baseDir/accounts/nickname@domain
    """
    if ':' in domain:
        domain = domain.split(':')[0]
    moderatorsFile = baseDir + '/accounts/moderators.txt'
    if os.path.isfile(moderatorsFile):
        with open(moderatorsFile, "r") as f:
            lines = f.readlines()
        # is this nickname already in the file?
        for moderator in lines:
            if moderator.strip('\n') == nickname:
                return
        lines.append(nickname)
        with open(moderatorsFile, "w") as f:
            for moderator in lines:
                moderator = moderator.strip('\n')
                if len(moderator) <= 1:
                    continue
                # only keep moderators which have an account here
                accountDir = baseDir + '/accounts/' + moderator + '@' + domain
                if os.path.isdir(accountDir):
                    f.write(moderator + '\n')
    else:
        # the file is created even if the account does not exist
        with open(moderatorsFile, "w+") as f:
            accountDir = baseDir + '/accounts/' + nickname + '@' + domain
            if os.path.isdir(accountDir):
                f.write(nickname + '\n')
2019-08-11 11:25:27 +00:00
def removeModerator(baseDir: str, nickname: str):
    """Removes a moderator nickname from the file

    Rewrites baseDir/accounts/moderators.txt without the given nickname.
    Entries of one character or less are also dropped. Does nothing
    if the moderators file does not exist.
    """
    moderatorsFile = baseDir + '/accounts/moderators.txt'
    if os.path.isfile(moderatorsFile):
        with open(moderatorsFile, "r") as fp:
            entries = [entry.strip('\n') for entry in fp.readlines()]
        remaining = [entry for entry in entries
                     if len(entry) > 1 and entry != nickname]
        with open(moderatorsFile, "w") as fp:
            fp.write(''.join(entry + '\n' for entry in remaining))
2019-07-18 15:09:23 +00:00
def setRole(baseDir: str, nickname: str, domain: str,
            project: str, role: str) -> bool:
    """Set a person's role within a project
    Setting the role to an empty string or None will remove it

    The roles are stored within the actor file
    baseDir/accounts/nickname@domain.json as a dictionary mapping
    each project name to a list of roles.
    With an empty role all of the person's roles within the project
    are removed. Changes to the 'instance' project are also applied to
    the moderators file.
    Returns False if the role or project is longer than 128 characters
    or if the actor file does not exist, otherwise True.
    """
    # avoid giant strings
    # (role may be None, which has no length)
    if role and len(role) > 128:
        return False
    if len(project) > 128:
        return False
    actorFilename = baseDir + '/accounts/' + nickname + '@' + domain + '.json'
    if not os.path.isfile(actorFilename):
        return False
    with open(actorFilename, 'r') as fp:
        actorJson = commentjson.load(fp)
    # tolerate actor files which have no roles yet
    roles = actorJson.get('roles')
    if roles is None:
        roles = {}
        actorJson['roles'] = roles
    if role:
        # add the role
        if project == 'instance' and role == 'moderator':
            addModerator(baseDir, nickname, domain)
        if roles.get(project):
            if role not in roles[project]:
                roles[project].append(role)
        else:
            roles[project] = [role]
    else:
        # remove the role
        if project == 'instance':
            removeModerator(baseDir, nickname)
        # no specific role was given, so the whole project entry goes
        if project in roles:
            del roles[project]
    with open(actorFilename, 'w') as fp:
        commentjson.dump(actorJson, fp, indent=4, sort_keys=False)
    return True
def getRoles(baseDir: str, nickname: str, domain: str,
             project: str) -> []:
    """Returns the roles for a given person on a given project

    The roles are read from baseDir/accounts/nickname@domain.json
    Returns False if that file does not exist, None if the actor has
    no roles within the project, otherwise the list of roles.
    """
    actorFilename = baseDir + '/accounts/' + nickname + '@' + domain + '.json'
    if not os.path.isfile(actorFilename):
        return False
    with open(actorFilename, 'r') as fp:
        actorJson = commentjson.load(fp)
    if actorJson.get('roles') and actorJson['roles'].get(project):
        return actorJson['roles'][project]
    return None
2019-07-19 10:01:24 +00:00
def outboxDelegate(baseDir: str, authenticatedNickname: str,
                   messageJson: {}, debug: bool) -> bool:
    """Handles receiving a delegation request

    The message must be of type Delegate, containing an object of type
    Role. That object has an 'actor' (the person being given the role)
    and an 'object' string in the form "project;role".
    The actor sending the message must be the authenticated account,
    and must have the 'delegator' role either on the instance or
    within the project.
    An empty role removes the person's role within the project.
    Returns True if a role was set or removed, otherwise False.
    """
    if not messageJson.get('type'):
        return False
    if messageJson['type'] != 'Delegate':
        return False
    delegation = messageJson.get('object')
    if not delegation:
        return False
    if not isinstance(delegation, dict):
        return False
    if not delegation.get('type'):
        return False
    if delegation['type'] != 'Role':
        return False
    if not delegation.get('object'):
        return False
    if not delegation.get('actor'):
        return False
    if not isinstance(delegation['object'], str):
        return False
    if ';' not in delegation['object']:
        print('WARN: No ; separator between project and role')
        return False
    # the delegator must be known and be the authenticated account
    if not messageJson.get('actor'):
        return False
    delegatorNickname = getNicknameFromActor(messageJson['actor'])
    if not delegatorNickname:
        return False
    if delegatorNickname != authenticatedNickname:
        return False
    domain, _ = getDomainFromActor(messageJson['actor'])
    projectAndRole = delegation['object'].split(';')
    project = projectAndRole[0].strip()
    role = projectAndRole[1].strip().lower()

    # instance delegators can delegate to other projects
    # than their own
    delegatorRoles = getRoles(baseDir, delegatorNickname,
                              domain, 'instance')
    if not delegatorRoles or 'delegator' not in delegatorRoles:
        # non-instance delegators can only delegate within their project
        delegatorRoles = getRoles(baseDir, delegatorNickname,
                                  domain, project)
        if not delegatorRoles or 'delegator' not in delegatorRoles:
            return False

    nickname = getNicknameFromActor(delegation['actor'])
    if not nickname:
        print('WARN: unable to find nickname in ' + delegation['actor'])
        return False
    if not role:
        # an empty role means removal
        setRole(baseDir, nickname, domain, project, None)
        return True

    # what roles is this person already assigned to?
    existingRoles = getRoles(baseDir, nickname, domain, project)
    if existingRoles:
        if role in existingRoles:
            if debug:
                print(nickname + '@' + domain +
                      ' is already assigned to the role ' + role +
                      ' within the project ' + project)
            return False
    setRole(baseDir, nickname, domain, project, role)
    if debug:
        print(nickname + '@' + domain + ' assigned to the role ' + role +
              ' within the project ' + project)
    return True
2019-07-18 15:09:23 +00:00
2019-08-20 09:16:03 +00:00
def sendRoleViaServer(baseDir: str, session,
                      delegatorNickname: str, password: str,
                      delegatorDomain: str, delegatorPort: int,
                      httpPrefix: str, nickname: str,
                      project: str, role: str,
                      cachedWebfingers: {}, personCache: {},
                      debug: bool, projectVersion: str) -> {}:
    """A delegator creates a role for a person via c2s
    Setting role to an empty string or None removes the role

    A Delegate activity containing a Role object is built and then
    posted to the delegator's outbox using basic auth.
    Returns the activity which was sent, or else an integer code:
    6 if there is no session, 1 if the webfinger lookup failed,
    3 if no outbox was found, 4 if no actor was found.
    """
    if not session:
        print('WARN: No session for sendRoleViaServer')
        return 6

    # only show the port within the domain if it is non-standard
    delegatorDomainFull = delegatorDomain
    if delegatorPort:
        if delegatorPort != 80 and delegatorPort != 443:
            if ':' not in delegatorDomain:
                delegatorDomainFull = \
                    delegatorDomain + ':' + str(delegatorPort)

    usersUrl = httpPrefix + '://' + delegatorDomainFull + '/users/'
    toUrl = usersUrl + nickname
    ccUrl = usersUrl + delegatorNickname + '/followers'

    # the role string has the form "project;role"
    # and an empty role part means removal
    if role:
        roleStr = project.lower() + ';' + role.lower()
    else:
        roleStr = project.lower() + ';'
    newRoleJson = {
        'type': 'Delegate',
        'actor': usersUrl + delegatorNickname,
        'object': {
            'type': 'Role',
            'actor': usersUrl + nickname,
            'object': roleStr,
            'to': [toUrl],
            'cc': [ccUrl]
        },
        'to': [toUrl],
        'cc': [ccUrl]
    }

    handle = httpPrefix + '://' + delegatorDomainFull + \
        '/@' + delegatorNickname

    # lookup the inbox for the To handle
    wfRequest = webfingerHandle(session, handle, httpPrefix,
                                cachedWebfingers,
                                delegatorDomain, projectVersion)
    if not wfRequest:
        if debug:
            print('DEBUG: announce webfinger failed for ' + handle)
        return 1

    postToBox = 'outbox'

    # get the actor inbox for the To handle
    inboxUrl, pubKeyId, pubKey, fromPersonId, sharedInbox, \
        capabilityAcquisition, avatarUrl, displayName = \
        getPersonBox(baseDir, session, wfRequest, personCache,
                     projectVersion, httpPrefix, delegatorDomain,
                     postToBox)

    if not inboxUrl:
        if debug:
            print('DEBUG: No ' + postToBox + ' was found for ' + handle)
        return 3
    if not fromPersonId:
        if debug:
            print('DEBUG: No actor was found for ' + handle)
        return 4

    authHeader = createBasicAuthHeader(delegatorNickname, password)

    headers = {'host': delegatorDomain,
               'Content-type': 'application/json',
               'Authorization': authHeader}
    # the result of the post is currently not checked
    postResult = \
        postJson(session, newRoleJson, [], inboxUrl, headers, "inbox:write")
    # if not postResult:
    #     if debug:
    #         print('DEBUG: POST announce failed for c2s to '+inboxUrl)
    #     return 5

    if debug:
        print('DEBUG: c2s POST role success')

    return newRoleJson