flake8 format

main
Bob Mottram 2020-04-04 11:28:58 +01:00
parent 3334cc0466
commit d55fe8bb59
1 changed files with 142 additions and 119 deletions

261
roles.py
View File

@ -1,14 +1,12 @@
__filename__="roles.py"
__author__="Bob Mottram"
__license__="AGPL3+"
__version__="1.1.0"
__maintainer__="Bob Mottram"
__email__="bob@freedombone.net"
__status__="Production"
__filename__ = "roles.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import json
import os
import time
from webfinger import webfingerHandle
from auth import createBasicAuthHeader
from posts import getPersonBox
@ -18,109 +16,118 @@ from utils import getDomainFromActor
from utils import loadJson
from utils import saveJson
def clearModeratorStatus(baseDir: str) -> None:
"""Removes moderator status from all accounts
This could be slow if there are many users, but only happens
rarely when moderators are appointed or removed
"""
directory=os.fsencode(baseDir+'/accounts/')
directory = os.fsencode(baseDir + '/accounts/')
for f in os.scandir(directory):
f=f.name
filename=os.fsdecode(f)
f = f.name
filename = os.fsdecode(f)
if filename.endswith(".json") and '@' in filename:
filename=os.path.join(baseDir+'/accounts/', filename)
filename = os.path.join(baseDir + '/accounts/', filename)
if '"moderator"' in open(filename).read():
actorJson=loadJson(filename)
actorJson = loadJson(filename)
if actorJson:
if actorJson['roles'].get('instance'):
if 'moderator' in actorJson['roles']['instance']:
actorJson['roles']['instance'].remove('moderator')
saveJson(actorJson,filename)
saveJson(actorJson, filename)
def addModerator(baseDir: str,nickname: str,domain: str) -> None:
def addModerator(baseDir: str, nickname: str, domain: str) -> None:
"""Adds a moderator nickname to the file
"""
if ':' in domain:
domain=domain.split(':')[0]
moderatorsFile=baseDir+'/accounts/moderators.txt'
domain = domain.split(':')[0]
moderatorsFile = baseDir + '/accounts/moderators.txt'
if os.path.isfile(moderatorsFile):
# is this nickname already in the file?
with open(moderatorsFile, "r") as f:
lines=f.readlines()
lines = f.readlines()
for moderator in lines:
moderator=moderator.strip('\n')
if line==nickname:
moderator = moderator.strip('\n')
if moderator == nickname:
return
lines.append(nickname)
with open(moderatorsFile, "w") as f:
for moderator in lines:
moderator=moderator.strip('\n')
if len(moderator)>1:
if os.path.isdir(baseDir+'/accounts/'+moderator+'@'+domain):
f.write(moderator+'\n')
moderator = moderator.strip('\n')
if len(moderator) > 1:
if os.path.isdir(baseDir + '/accounts/' +
moderator + '@' + domain):
f.write(moderator + '\n')
else:
with open(moderatorsFile, "w+") as f:
if os.path.isdir(baseDir+'/accounts/'+nickname+'@'+domain):
f.write(nickname+'\n')
if os.path.isdir(baseDir + '/accounts/' +
nickname + '@' + domain):
f.write(nickname + '\n')
def removeModerator(baseDir: str,nickname: str):
def removeModerator(baseDir: str, nickname: str):
"""Removes a moderator nickname from the file
"""
moderatorsFile=baseDir+'/accounts/moderators.txt'
moderatorsFile = baseDir + '/accounts/moderators.txt'
if not os.path.isfile(moderatorsFile):
return
with open(moderatorsFile, "r") as f:
lines=f.readlines()
lines = f.readlines()
with open(moderatorsFile, "w") as f:
for moderator in lines:
moderator=moderator.strip('\n')
if len(moderator)>1 and moderator!=nickname:
f.write(moderator+'\n')
moderator = moderator.strip('\n')
if len(moderator) > 1 and moderator != nickname:
f.write(moderator + '\n')
def setRole(baseDir: str,nickname: str,domain: str, \
project: str,role: str) -> bool:
def setRole(baseDir: str, nickname: str, domain: str,
project: str, role: str) -> bool:
"""Set a person's role within a project
Setting the role to an empty string or None will remove it
"""
# avoid giant strings
if len(role)>128 or len(project)>128:
if len(role) > 128 or len(project) > 128:
return False
actorFilename=baseDir+'/accounts/'+nickname+'@'+domain+'.json'
actorFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '.json'
if not os.path.isfile(actorFilename):
return False
actorJson=loadJson(actorFilename)
actorJson = loadJson(actorFilename)
if actorJson:
if role:
# add the role
if project=='instance' and 'role'=='moderator':
addModerator(baseDir,nickname,domain)
if project == 'instance' and 'role' == 'moderator':
addModerator(baseDir, nickname, domain)
if actorJson['roles'].get(project):
if role not in actorJson['roles'][project]:
actorJson['roles'][project].append(role)
else:
actorJson['roles'][project]=[role]
actorJson['roles'][project] = [role]
else:
# remove the role
if project=='instance':
removeModerator(baseDir,nickname)
if project == 'instance':
removeModerator(baseDir, nickname)
if actorJson['roles'].get(project):
actorJson['roles'][project].remove(role)
# if the project contains no roles then remove it
if len(actorJson['roles'][project])==0:
if len(actorJson['roles'][project]) == 0:
del actorJson['roles'][project]
saveJson(actorJson,actorFilename)
saveJson(actorJson, actorFilename)
return True
def getRoles(baseDir: str,nickname: str,domain: str, \
def getRoles(baseDir: str, nickname: str, domain: str,
project: str) -> []:
"""Returns the roles for a given person on a given project
"""
actorFilename=baseDir+'/accounts/'+nickname+'@'+domain+'.json'
actorFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '.json'
if not os.path.isfile(actorFilename):
return False
actorJson=loadJson(actorFilename)
actorJson = loadJson(actorFilename)
if actorJson:
if not actorJson.get('roles'):
return None
@ -129,12 +136,14 @@ def getRoles(baseDir: str,nickname: str,domain: str, \
return actorJson['roles'][project]
return None
def outboxDelegate(baseDir: str,authenticatedNickname: str,messageJson: {},debug: bool) -> bool:
def outboxDelegate(baseDir: str, authenticatedNickname: str,
messageJson: {}, debug: bool) -> bool:
"""Handles receiving a delegation request
"""
if not messageJson.get('type'):
return False
if not messageJson['type']=='Delegate':
if not messageJson['type'] == 'Delegate':
return False
if not messageJson.get('object'):
return False
@ -142,7 +151,7 @@ def outboxDelegate(baseDir: str,authenticatedNickname: str,messageJson: {},debug
return False
if not messageJson['object'].get('type'):
return False
if not messageJson['object']['type']=='Role':
if not messageJson['object']['type'] == 'Role':
return False
if not messageJson['object'].get('object'):
return False
@ -154,68 +163,70 @@ def outboxDelegate(baseDir: str,authenticatedNickname: str,messageJson: {},debug
print('WARN: No ; separator between project and role')
return False
delegatorNickname=getNicknameFromActor(messageJson['actor'])
if delegatorNickname!=authenticatedNickname:
delegatorNickname = getNicknameFromActor(messageJson['actor'])
if delegatorNickname != authenticatedNickname:
return
domain,port=getDomainFromActor(messageJson['actor'])
project=messageJson['object']['object'].split(';')[0].strip()
domain, port = getDomainFromActor(messageJson['actor'])
project = messageJson['object']['object'].split(';')[0].strip()
# instance delegators can delagate to other projects
# than their own
canDelegate=False
delegatorRoles=getRoles(baseDir,delegatorNickname, \
domain,'instance')
canDelegate = False
delegatorRoles = getRoles(baseDir, delegatorNickname,
domain, 'instance')
if delegatorRoles:
if 'delegator' in delegatorRoles:
canDelegate=True
canDelegate = True
if canDelegate==False:
canDelegate=True
if not canDelegate:
canDelegate = True
# non-instance delegators can only delegate within their project
delegatorRoles=getRoles(baseDir,delegatorNickname, \
domain,project)
delegatorRoles = getRoles(baseDir, delegatorNickname,
domain, project)
if delegatorRoles:
if 'delegator' not in delegatorRoles:
return False
else:
return False
if canDelegate==False:
if not canDelegate:
return False
nickname=getNicknameFromActor(messageJson['object']['actor'])
nickname = getNicknameFromActor(messageJson['object']['actor'])
if not nickname:
print('WARN: unable to find nickname in '+messageJson['object']['actor'])
print('WARN: unable to find nickname in ' +
messageJson['object']['actor'])
return False
domainFull=domain
if port:
if port!=80 and port!=443:
if ':' not in domain:
domainFull=domain+':'+str(port)
role=messageJson['object']['object'].split(';')[1].strip().lower()
role = \
messageJson['object']['object'].split(';')[1].strip().lower()
if not role:
setRole(baseDir,nickname,domain,project,None)
setRole(baseDir, nickname, domain, project, None)
return True
# what roles is this person already assigned to?
existingRoles=getRoles(baseDir,nickname,domain,project)
existingRoles = getRoles(baseDir, nickname, domain, project)
if existingRoles:
if role in existingRoles:
if debug:
print(nickname+'@'+domain+' is already assigned to the role '+role+' within the project '+project)
print(nickname + '@' + domain +
' is already assigned to the role ' +
role + ' within the project ' + project)
return False
setRole(baseDir,nickname,domain,project,role)
setRole(baseDir, nickname, domain, project, role)
if debug:
print(nickname+'@'+domain+' assigned to the role '+role+' within the project '+project)
print(nickname + '@' + domain +
' assigned to the role ' + role +
' within the project ' + project)
return True
def sendRoleViaServer(baseDir: str,session, \
delegatorNickname: str,password: str, \
delegatorDomain: str,delegatorPort: int, \
httpPrefix: str,nickname: str, \
project: str,role: str, \
cachedWebfingers: {},personCache: {}, \
debug: bool,projectVersion: str) -> {}:
def sendRoleViaServer(baseDir: str, session,
delegatorNickname: str, password: str,
delegatorDomain: str, delegatorPort: int,
httpPrefix: str, nickname: str,
project: str, role: str,
cachedWebfingers: {}, personCache: {},
debug: bool, projectVersion: str) -> {}:
"""A delegator creates a role for a person via c2s
Setting role to an empty string or None removes the role
"""
@ -223,28 +234,34 @@ def sendRoleViaServer(baseDir: str,session, \
print('WARN: No session for sendRoleViaServer')
return 6
delegatorDomainFull=delegatorDomain
if fromPort:
if fromPort!=80 and fromPort!=443:
delegatorDomainFull = delegatorDomain
if delegatorPort:
if delegatorPort != 80 and delegatorPort != 443:
if ':' not in delegatorDomain:
delegatorDomainFull=delegatorDomain+':'+str(fromPort)
delegatorDomainFull = \
delegatorDomain + ':' + str(delegatorPort)
toUrl= \
httpPrefix+'://'+delegatorDomainFull+'/users/'+nickname
ccUrl= \
httpPrefix+'://'+delegatorDomainFull+'/users/'+ \
delegatorNickname+'/followers'
toUrl = \
httpPrefix + '://' + delegatorDomainFull + '/users/' + nickname
ccUrl = \
httpPrefix + '://' + delegatorDomainFull + '/users/' + \
delegatorNickname + '/followers'
if role:
roleStr=project.lower()+';'+role.lower()
roleStr = project.lower() + ';' + role.lower()
else:
roleStr=project.lower()+';'
newRoleJson={
roleStr = project.lower() + ';'
actor = \
httpPrefix + '://' + delegatorDomainFull + \
'/users/' + delegatorNickname
delegateActor = \
httpPrefix + '://' + delegatorDomainFull + '/users/' + nickname
newRoleJson = {
'type': 'Delegate',
'actor': httpPrefix+'://'+delegatorDomainFull+'/users/'+delegatorNickname,
'actor': actor,
'object': {
'type': 'Role',
'actor': httpPrefix+'://'+delegatorDomainFull+'/users/'+nickname,
'actor': delegateActor,
'object': roleStr,
'to': [toUrl],
'cc': [ccUrl]
@ -253,46 +270,52 @@ def sendRoleViaServer(baseDir: str,session, \
'cc': [ccUrl]
}
handle=httpPrefix+'://'+delegatorDomainFull+'/@'+delegatorNickname
handle = \
httpPrefix + '://' + delegatorDomainFull + '/@' + delegatorNickname
# lookup the inbox for the To handle
wfRequest=webfingerHandle(session,handle,httpPrefix,cachedWebfingers, \
delegatorDomain,projectVersion)
wfRequest = webfingerHandle(session, handle, httpPrefix,
cachedWebfingers,
delegatorDomain, projectVersion)
if not wfRequest:
if debug:
print('DEBUG: announce webfinger failed for '+handle)
print('DEBUG: announce webfinger failed for ' + handle)
return 1
postToBox='outbox'
postToBox = 'outbox'
# get the actor inbox for the To handle
inboxUrl,pubKeyId,pubKey,fromPersonId,sharedInbox,capabilityAcquisition,avatarUrl,displayName= \
getPersonBox(baseDir,session,wfRequest,personCache, \
projectVersion,httpPrefix, \
delegatorNickname,delegatorDomain,postToBox)
(inboxUrl, pubKeyId, pubKey,
fromPersonId, sharedInbox,
capabilityAcquisition,
avatarUrl, displayName) = getPersonBox(baseDir, session,
wfRequest, personCache,
projectVersion, httpPrefix,
delegatorNickname,
delegatorDomain, postToBox)
if not inboxUrl:
if debug:
print('DEBUG: No '+postToBox+' was found for '+handle)
print('DEBUG: No ' + postToBox + ' was found for ' + handle)
return 3
if not fromPersonId:
if debug:
print('DEBUG: No actor was found for '+handle)
print('DEBUG: No actor was found for ' + handle)
return 4
authHeader=createBasicAuthHeader(delegatorNickname,password)
authHeader = createBasicAuthHeader(delegatorNickname, password)
headers={
'host': delegatorDomain, \
'Content-type': 'application/json', \
headers = {
'host': delegatorDomain,
'Content-type': 'application/json',
'Authorization': authHeader
}
postResult= \
postJson(session,newRoleJson,[],inboxUrl,headers,"inbox:write")
#if not postResult:
# if debug:
# print('DEBUG: POST announce failed for c2s to '+inboxUrl)
# return 5
postResult = \
postJson(session, newRoleJson, [], inboxUrl, headers, "inbox:write")
if not postResult:
if debug:
print('DEBUG: POST announce failed for c2s to '+inboxUrl)
# return 5
if debug:
print('DEBUG: c2s POST role success')