diff --git a/roles.py b/roles.py index 241c1611..6be78ec3 100644 --- a/roles.py +++ b/roles.py @@ -1,14 +1,12 @@ -__filename__="roles.py" -__author__="Bob Mottram" -__license__="AGPL3+" -__version__="1.1.0" -__maintainer__="Bob Mottram" -__email__="bob@freedombone.net" -__status__="Production" +__filename__ = "roles.py" +__author__ = "Bob Mottram" +__license__ = "AGPL3+" +__version__ = "1.1.0" +__maintainer__ = "Bob Mottram" +__email__ = "bob@freedombone.net" +__status__ = "Production" -import json import os -import time from webfinger import webfingerHandle from auth import createBasicAuthHeader from posts import getPersonBox @@ -18,109 +16,118 @@ from utils import getDomainFromActor from utils import loadJson from utils import saveJson + def clearModeratorStatus(baseDir: str) -> None: """Removes moderator status from all accounts This could be slow if there are many users, but only happens rarely when moderators are appointed or removed """ - directory=os.fsencode(baseDir+'/accounts/') + directory = os.fsencode(baseDir + '/accounts/') for f in os.scandir(directory): - f=f.name - filename=os.fsdecode(f) + f = f.name + filename = os.fsdecode(f) if filename.endswith(".json") and '@' in filename: - filename=os.path.join(baseDir+'/accounts/', filename) + filename = os.path.join(baseDir + '/accounts/', filename) if '"moderator"' in open(filename).read(): - actorJson=loadJson(filename) + actorJson = loadJson(filename) if actorJson: if actorJson['roles'].get('instance'): if 'moderator' in actorJson['roles']['instance']: actorJson['roles']['instance'].remove('moderator') - saveJson(actorJson,filename) + saveJson(actorJson, filename) -def addModerator(baseDir: str,nickname: str,domain: str) -> None: + +def addModerator(baseDir: str, nickname: str, domain: str) -> None: """Adds a moderator nickname to the file """ if ':' in domain: - domain=domain.split(':')[0] - moderatorsFile=baseDir+'/accounts/moderators.txt' + domain = domain.split(':')[0] + moderatorsFile = baseDir + '/accounts/moderators.txt' if os.path.isfile(moderatorsFile): # is this nickname already in the file? with open(moderatorsFile, "r") as f: - lines=f.readlines() + lines = f.readlines() for moderator in lines: - moderator=moderator.strip('\n') - if line==nickname: + moderator = moderator.strip('\n') + if moderator == nickname: return lines.append(nickname) with open(moderatorsFile, "w") as f: for moderator in lines: - moderator=moderator.strip('\n') - if len(moderator)>1: - if os.path.isdir(baseDir+'/accounts/'+moderator+'@'+domain): - f.write(moderator+'\n') + moderator = moderator.strip('\n') + if len(moderator) > 1: + if os.path.isdir(baseDir + '/accounts/' + + moderator + '@' + domain): + f.write(moderator + '\n') else: with open(moderatorsFile, "w+") as f: - if os.path.isdir(baseDir+'/accounts/'+nickname+'@'+domain): - f.write(nickname+'\n') + if os.path.isdir(baseDir + '/accounts/' + + nickname + '@' + domain): + f.write(nickname + '\n') -def removeModerator(baseDir: str,nickname: str): + +def removeModerator(baseDir: str, nickname: str): """Removes a moderator nickname from the file """ - moderatorsFile=baseDir+'/accounts/moderators.txt' + moderatorsFile = baseDir + '/accounts/moderators.txt' if not os.path.isfile(moderatorsFile): return with open(moderatorsFile, "r") as f: - lines=f.readlines() + lines = f.readlines() with open(moderatorsFile, "w") as f: for moderator in lines: - moderator=moderator.strip('\n') - if len(moderator)>1 and moderator!=nickname: - f.write(moderator+'\n') + moderator = moderator.strip('\n') + if len(moderator) > 1 and moderator != nickname: + f.write(moderator + '\n') -def setRole(baseDir: str,nickname: str,domain: str, \ - project: str,role: str) -> bool: + +def setRole(baseDir: str, nickname: str, domain: str, + project: str, role: str) -> bool: """Set a person's role within a project Setting the role to an empty string or None will remove it """ # avoid giant strings - if len(role)>128 or len(project)>128: + if len(role) > 128 or len(project) > 128: return False - actorFilename=baseDir+'/accounts/'+nickname+'@'+domain+'.json' + actorFilename = baseDir + '/accounts/' + \ + nickname + '@' + domain + '.json' if not os.path.isfile(actorFilename): return False - actorJson=loadJson(actorFilename) + actorJson = loadJson(actorFilename) if actorJson: if role: # add the role - if project=='instance' and 'role'=='moderator': - addModerator(baseDir,nickname,domain) + if project == 'instance' and 'role' == 'moderator': + addModerator(baseDir, nickname, domain) if actorJson['roles'].get(project): if role not in actorJson['roles'][project]: actorJson['roles'][project].append(role) else: - actorJson['roles'][project]=[role] + actorJson['roles'][project] = [role] else: # remove the role - if project=='instance': - removeModerator(baseDir,nickname) + if project == 'instance': + removeModerator(baseDir, nickname) if actorJson['roles'].get(project): actorJson['roles'][project].remove(role) # if the project contains no roles then remove it - if len(actorJson['roles'][project])==0: + if len(actorJson['roles'][project]) == 0: del actorJson['roles'][project] - saveJson(actorJson,actorFilename) + saveJson(actorJson, actorFilename) return True -def getRoles(baseDir: str,nickname: str,domain: str, \ + +def getRoles(baseDir: str, nickname: str, domain: str, project: str) -> []: """Returns the roles for a given person on a given project """ - actorFilename=baseDir+'/accounts/'+nickname+'@'+domain+'.json' + actorFilename = baseDir + '/accounts/' + \ + nickname + '@' + domain + '.json' if not os.path.isfile(actorFilename): return False - actorJson=loadJson(actorFilename) + actorJson = loadJson(actorFilename) if actorJson: if not actorJson.get('roles'): return None @@ -129,12 +136,14 @@ def getRoles(baseDir: str,nickname: str,domain: str, \ return actorJson['roles'][project] return None -def outboxDelegate(baseDir: str,authenticatedNickname: str,messageJson: {},debug: bool) -> bool: + +def outboxDelegate(baseDir: str, authenticatedNickname: str, + messageJson: {}, debug: bool) -> bool: """Handles receiving a delegation request """ if not messageJson.get('type'): return False - if not messageJson['type']=='Delegate': + if not messageJson['type'] == 'Delegate': return False if not messageJson.get('object'): return False @@ -142,7 +151,7 @@ def outboxDelegate(baseDir: str,authenticatedNickname: str,messageJson: {},debug return False if not messageJson['object'].get('type'): return False - if not messageJson['object']['type']=='Role': + if not messageJson['object']['type'] == 'Role': return False if not messageJson['object'].get('object'): return False @@ -154,68 +163,70 @@ def outboxDelegate(baseDir: str,authenticatedNickname: str,messageJson: {},debug print('WARN: No ; separator between project and role') return False - delegatorNickname=getNicknameFromActor(messageJson['actor']) - if delegatorNickname!=authenticatedNickname: + delegatorNickname = getNicknameFromActor(messageJson['actor']) + if delegatorNickname != authenticatedNickname: return - domain,port=getDomainFromActor(messageJson['actor']) - project=messageJson['object']['object'].split(';')[0].strip() + domain, port = getDomainFromActor(messageJson['actor']) + project = messageJson['object']['object'].split(';')[0].strip() # instance delegators can delagate to other projects # than their own - canDelegate=False - delegatorRoles=getRoles(baseDir,delegatorNickname, \ - domain,'instance') + canDelegate = False + delegatorRoles = getRoles(baseDir, delegatorNickname, + domain, 'instance') if delegatorRoles: if 'delegator' in delegatorRoles: - canDelegate=True + canDelegate = True - if canDelegate==False: - canDelegate=True + if not canDelegate: + canDelegate = True # non-instance delegators can only delegate within their project - delegatorRoles=getRoles(baseDir,delegatorNickname, \ - domain,project) + delegatorRoles = getRoles(baseDir, delegatorNickname, + domain, project) if delegatorRoles: if 'delegator' not in delegatorRoles: return False else: return False - if canDelegate==False: + if not canDelegate: return False - nickname=getNicknameFromActor(messageJson['object']['actor']) + nickname = getNicknameFromActor(messageJson['object']['actor']) if not nickname: - print('WARN: unable to find nickname in '+messageJson['object']['actor']) + print('WARN: unable to find nickname in ' + + messageJson['object']['actor']) return False - domainFull=domain - if port: - if port!=80 and port!=443: - if ':' not in domain: - domainFull=domain+':'+str(port) - role=messageJson['object']['object'].split(';')[1].strip().lower() + role = \ + messageJson['object']['object'].split(';')[1].strip().lower() if not role: - setRole(baseDir,nickname,domain,project,None) + setRole(baseDir, nickname, domain, project, None) return True # what roles is this person already assigned to? - existingRoles=getRoles(baseDir,nickname,domain,project) + existingRoles = getRoles(baseDir, nickname, domain, project) if existingRoles: if role in existingRoles: if debug: - print(nickname+'@'+domain+' is already assigned to the role '+role+' within the project '+project) + print(nickname + '@' + domain + + ' is already assigned to the role ' + + role + ' within the project ' + project) return False - setRole(baseDir,nickname,domain,project,role) + setRole(baseDir, nickname, domain, project, role) if debug: - print(nickname+'@'+domain+' assigned to the role '+role+' within the project '+project) + print(nickname + '@' + domain + + ' assigned to the role ' + role + + ' within the project ' + project) return True -def sendRoleViaServer(baseDir: str,session, \ - delegatorNickname: str,password: str, \ - delegatorDomain: str,delegatorPort: int, \ - httpPrefix: str,nickname: str, \ - project: str,role: str, \ - cachedWebfingers: {},personCache: {}, \ - debug: bool,projectVersion: str) -> {}: + +def sendRoleViaServer(baseDir: str, session, + delegatorNickname: str, password: str, + delegatorDomain: str, delegatorPort: int, + httpPrefix: str, nickname: str, + project: str, role: str, + cachedWebfingers: {}, personCache: {}, + debug: bool, projectVersion: str) -> {}: """A delegator creates a role for a person via c2s Setting role to an empty string or None removes the role """ @@ -223,28 +234,34 @@ def sendRoleViaServer(baseDir: str,session, \ print('WARN: No session for sendRoleViaServer') return 6 - delegatorDomainFull=delegatorDomain - if fromPort: - if fromPort!=80 and fromPort!=443: + delegatorDomainFull = delegatorDomain + if delegatorPort: + if delegatorPort != 80 and delegatorPort != 443: if ':' not in delegatorDomain: - delegatorDomainFull=delegatorDomain+':'+str(fromPort) + delegatorDomainFull = \ + delegatorDomain + ':' + str(delegatorPort) - toUrl= \ - httpPrefix+'://'+delegatorDomainFull+'/users/'+nickname - ccUrl= \ - httpPrefix+'://'+delegatorDomainFull+'/users/'+ \ - delegatorNickname+'/followers' + toUrl = \ + httpPrefix + '://' + delegatorDomainFull + '/users/' + nickname + ccUrl = \ + httpPrefix + '://' + delegatorDomainFull + '/users/' + \ + delegatorNickname + '/followers' if role: - roleStr=project.lower()+';'+role.lower() + roleStr = project.lower() + ';' + role.lower() else: - roleStr=project.lower()+';' - newRoleJson={ + roleStr = project.lower() + ';' + actor = \ + httpPrefix + '://' + delegatorDomainFull + \ + '/users/' + delegatorNickname + delegateActor = \ + httpPrefix + '://' + delegatorDomainFull + '/users/' + nickname + newRoleJson = { 'type': 'Delegate', - 'actor': httpPrefix+'://'+delegatorDomainFull+'/users/'+delegatorNickname, + 'actor': actor, 'object': { 'type': 'Role', - 'actor': httpPrefix+'://'+delegatorDomainFull+'/users/'+nickname, + 'actor': delegateActor, 'object': roleStr, 'to': [toUrl], 'cc': [ccUrl] @@ -253,46 +270,52 @@ def sendRoleViaServer(baseDir: str,session, \ 'cc': [ccUrl] } - handle=httpPrefix+'://'+delegatorDomainFull+'/@'+delegatorNickname + handle = \ + httpPrefix + '://' + delegatorDomainFull + '/@' + delegatorNickname # lookup the inbox for the To handle - wfRequest=webfingerHandle(session,handle,httpPrefix,cachedWebfingers, \ - delegatorDomain,projectVersion) + wfRequest = webfingerHandle(session, handle, httpPrefix, + cachedWebfingers, + delegatorDomain, projectVersion) if not wfRequest: if debug: - print('DEBUG: announce webfinger failed for '+handle) + print('DEBUG: announce webfinger failed for ' + handle) return 1 - postToBox='outbox' + postToBox = 'outbox' # get the actor inbox for the To handle - inboxUrl,pubKeyId,pubKey,fromPersonId,sharedInbox,capabilityAcquisition,avatarUrl,displayName= \ - getPersonBox(baseDir,session,wfRequest,personCache, \ - projectVersion,httpPrefix, \ - delegatorNickname,delegatorDomain,postToBox) + (inboxUrl, pubKeyId, pubKey, + fromPersonId, sharedInbox, + capabilityAcquisition, + avatarUrl, displayName) = getPersonBox(baseDir, session, + wfRequest, personCache, + projectVersion, httpPrefix, + delegatorNickname, + delegatorDomain, postToBox) if not inboxUrl: if debug: - print('DEBUG: No '+postToBox+' was found for '+handle) + print('DEBUG: No ' + postToBox + ' was found for ' + handle) return 3 if not fromPersonId: if debug: - print('DEBUG: No actor was found for '+handle) + print('DEBUG: No actor was found for ' + handle) return 4 - authHeader=createBasicAuthHeader(delegatorNickname,password) + authHeader = createBasicAuthHeader(delegatorNickname, password) - headers={ - 'host': delegatorDomain, \ - 'Content-type': 'application/json', \ + headers = { + 'host': delegatorDomain, + 'Content-type': 'application/json', 'Authorization': authHeader } - postResult= \ - postJson(session,newRoleJson,[],inboxUrl,headers,"inbox:write") - #if not postResult: - # if debug: - # print('DEBUG: POST announce failed for c2s to '+inboxUrl) - # return 5 + postResult = \ + postJson(session, newRoleJson, [], inboxUrl, headers, "inbox:write") + if not postResult: + if debug: + print('DEBUG: POST announce failed for c2s to '+inboxUrl) +# return 5 if debug: print('DEBUG: c2s POST role success')