flake8 format

main
Bob Mottram 2020-04-04 15:14:25 +01:00
parent b63bf2c72d
commit dc2386c583
1 changed files with 133 additions and 116 deletions

View File

@ -1,10 +1,10 @@
__filename__="webfinger.py" __filename__ = "webfinger.py"
__author__="Bob Mottram" __author__ = "Bob Mottram"
__license__="AGPL3+" __license__ = "AGPL3+"
__version__="1.1.0" __version__ = "1.1.0"
__maintainer__="Bob Mottram" __maintainer__ = "Bob Mottram"
__email__="bob@freedombone.net" __email__ = "bob@freedombone.net"
__status__="Production" __status__ = "Production"
import base64 import base64
try: try:
@ -13,10 +13,7 @@ try:
except ImportError: except ImportError:
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
from Crypto.Util import number from Crypto.Util import number
import requests
import json
import os import os
import time
from session import getJson from session import getJson
from cache import storeWebfingerInCache from cache import storeWebfingerInCache
from cache import getWebfingerFromCache from cache import getWebfingerFromCache
@ -24,116 +21,130 @@ from utils import loadJson
from utils import loadJsonOnionify from utils import loadJsonOnionify
from utils import saveJson from utils import saveJson
def parseHandle(handle: str) -> (str,str):
def parseHandle(handle: str) -> (str, str):
if '.' not in handle: if '.' not in handle:
return None,None return None, None
handleStr = handle.replace('https://', '').replace('http://', '')
handleStr = handleStr.replace('dat://', '').replace('i2p://', '')
if '/@' in handle: if '/@' in handle:
domain,nickname= \ domain, nickname = handleStr.split('/@')
handle.replace('https://','').replace('http://','').replace('dat://','').replace('i2p://','').split('/@')
else: else:
if '/users/' in handle: if '/users/' in handle:
domain,nickname= \ domain, nickname = handleStr.split('/users/')
handle.replace('https://','').replace('http://','').replace('i2p://','').replace('dat://','').split('/users/')
else: else:
if '@' in handle: if '@' in handle:
nickname,domain=handle.split('@') nickname, domain = handle.split('@')
else: else:
return None,None return None, None
return nickname, domain
return nickname,domain
def webfingerHandle(session,handle: str,httpPrefix: str,cachedWebfingers: {}, \ def webfingerHandle(session, handle: str, httpPrefix: str,
fromDomain: str,projectVersion: str) -> {}: cachedWebfingers: {},
fromDomain: str, projectVersion: str) -> {}:
if not session: if not session:
print('WARN: No session specified for webfingerHandle') print('WARN: No session specified for webfingerHandle')
return None return None
nickname,domain=parseHandle(handle) nickname, domain = parseHandle(handle)
if not nickname: if not nickname:
return None return None
wfDomain=domain wfDomain = domain
if ':' in wfDomain: if ':' in wfDomain:
#wfPortStr=wfDomain.split(':')[1] # wfPortStr=wfDomain.split(':')[1]
#if wfPortStr.isdigit(): # if wfPortStr.isdigit():
# wfPort=int(wfPortStr) # wfPort=int(wfPortStr)
#if wfPort==80 or wfPort==443: # if wfPort==80 or wfPort==443:
wfDomain=wfDomain.split(':')[0] wfDomain = wfDomain.split(':')[0]
wf=getWebfingerFromCache(nickname+'@'+wfDomain,cachedWebfingers) wf = getWebfingerFromCache(nickname + '@' + wfDomain,
cachedWebfingers)
if wf: if wf:
return wf return wf
url='{}://{}/.well-known/webfinger'.format(httpPrefix,domain) url = '{}://{}/.well-known/webfinger'.format(httpPrefix, domain)
par={ par = {
'resource': 'acct:{}'.format(nickname+'@'+wfDomain) 'resource': 'acct:{}'.format(nickname + '@' + wfDomain)
} }
hdr={ hdr = {
'Accept': 'application/jrd+json' 'Accept': 'application/jrd+json'
} }
try: try:
result=getJson(session,url,hdr,par,projectVersion,httpPrefix,fromDomain) result = \
getJson(session, url, hdr, par, projectVersion,
httpPrefix, fromDomain)
except Exception as e: except Exception as e:
print("Unable to webfinger " + url) print("Unable to webfinger " + url)
print('nickname: '+str(nickname)) print('nickname: ' + str(nickname))
print('domain: '+str(wfDomain)) print('domain: ' + str(wfDomain))
print('headers: '+str(hdr)) print('headers: ' + str(hdr))
print('params: '+str(par)) print('params: ' + str(par))
print(e) print(e)
return None return None
storeWebfingerInCache(nickname+'@'+wfDomain,result,cachedWebfingers) storeWebfingerInCache(nickname + '@' + wfDomain,
result, cachedWebfingers)
return result return result
def generateMagicKey(publicKeyPem) -> str: def generateMagicKey(publicKeyPem) -> str:
"""See magic_key method in """See magic_key method in
https://github.com/tootsuite/mastodon/blob/707ddf7808f90e3ab042d7642d368c2ce8e95e6f/app/models/account.rb https://github.com/tootsuite/mastodon/blob/
707ddf7808f90e3ab042d7642d368c2ce8e95e6f/app/models/account.rb
""" """
privkey=RSA.importKey(publicKeyPem) privkey = RSA.importKey(publicKeyPem)
mod=base64.urlsafe_b64encode(number.long_to_bytes(privkey.n)).decode("utf-8") modBytes = number.long_to_bytes(privkey.n)
pubexp=base64.urlsafe_b64encode(number.long_to_bytes(privkey.e)).decode("utf-8") mod = base64.urlsafe_b64encode(modBytes).decode("utf-8")
expBytes = number.long_to_bytes(privkey.e)
pubexp = base64.urlsafe_b64encode(expBytes).decode("utf-8")
return f"data:application/magic-public-key,RSA.{mod}.{pubexp}" return f"data:application/magic-public-key,RSA.{mod}.{pubexp}"
def storeWebfingerEndpoint(nickname: str,domain: str,port: int,baseDir: str, \
wfJson: {}) -> bool: def storeWebfingerEndpoint(nickname: str, domain: str, port: int,
baseDir: str, wfJson: {}) -> bool:
"""Stores webfinger endpoint for a user to a file """Stores webfinger endpoint for a user to a file
""" """
originalDomain=domain originalDomain = domain
if port: if port:
if port!=80 and port!=443: if port != 80 and port != 443:
if ':' not in domain: if ':' not in domain:
domain=domain+':'+str(port) domain = domain + ':' + str(port)
handle=nickname+'@'+domain handle = nickname + '@' + domain
wfSubdir='/wfendpoints' wfSubdir = '/wfendpoints'
if not os.path.isdir(baseDir+wfSubdir): if not os.path.isdir(baseDir + wfSubdir):
os.mkdir(baseDir+wfSubdir) os.mkdir(baseDir + wfSubdir)
filename=baseDir+wfSubdir+'/'+handle.lower()+'.json' filename = baseDir + wfSubdir + '/' + handle.lower() + '.json'
saveJson(wfJson,filename) saveJson(wfJson, filename)
if nickname=='inbox': if nickname == 'inbox':
handle=originalDomain+'@'+domain handle = originalDomain + '@' + domain
filename=baseDir+wfSubdir+'/'+handle.lower()+'.json' filename = baseDir + wfSubdir + '/' + handle.lower() + '.json'
saveJson(wfJson,filename) saveJson(wfJson, filename)
return True return True
def createWebfingerEndpoint(nickname: str,domain: str,port: int, \
httpPrefix: str,publicKeyPem) -> {}: def createWebfingerEndpoint(nickname: str, domain: str, port: int,
httpPrefix: str, publicKeyPem) -> {}:
"""Creates a webfinger endpoint for a user """Creates a webfinger endpoint for a user
""" """
originalDomain=domain originalDomain = domain
if port: if port:
if port!=80 and port!=443: if port != 80 and port != 443:
if ':' not in domain: if ':' not in domain:
domain=domain+':'+str(port) domain = domain + ':' + str(port)
personName=nickname personName = nickname
personId=httpPrefix+"://"+domain+"/users/"+personName personId = httpPrefix + "://" + domain + "/users/" + personName
subjectStr="acct:"+personName+"@"+originalDomain subjectStr = "acct:" + personName + "@" + originalDomain
profilePageHref=httpPrefix+"://"+domain+"/@"+nickname profilePageHref = httpPrefix + "://" + domain + "/@" + nickname
if nickname=='inbox' or nickname==originalDomain: if nickname == 'inbox' or nickname == originalDomain:
personName='actor' personName = 'actor'
personId=httpPrefix+"://"+domain+"/"+personName personId = httpPrefix + "://" + domain + "/" + personName
subjectStr="acct:"+originalDomain+"@"+originalDomain subjectStr = "acct:" + originalDomain + "@" + originalDomain
profilePageHref=httpPrefix+'://'+domain+'/about/more?instance_actor=true' profilePageHref = httpPrefix + '://' + domain + \
'/about/more?instance_actor=true'
account={ actor = httpPrefix + "://" + domain + "/users/" + nickname
account = {
"aliases": [ "aliases": [
httpPrefix+"://"+domain+"/@"+personName, httpPrefix + "://" + domain + "/@" + personName,
personId personId
], ],
"links": [ "links": [
@ -143,7 +154,7 @@ def createWebfingerEndpoint(nickname: str,domain: str,port: int, \
"type": "text/html" "type": "text/html"
}, },
{ {
"href": httpPrefix+"://"+domain+"/users/"+nickname+".atom", "href": actor + ".atom",
"rel": "http://schemas.google.com/g/2010#updates-from", "rel": "http://schemas.google.com/g/2010#updates-from",
"type": "application/atom+xml" "type": "application/atom+xml"
}, },
@ -161,92 +172,98 @@ def createWebfingerEndpoint(nickname: str,domain: str,port: int, \
} }
return account return account
def webfingerNodeInfo(httpPrefix: str,domainFull: str) -> {}:
def webfingerNodeInfo(httpPrefix: str, domainFull: str) -> {}:
""" /.well-known/nodeinfo endpoint """ /.well-known/nodeinfo endpoint
""" """
nodeinfo={ nodeinfo = {
'links': [ 'links': [
{ {
'href': httpPrefix+'://'+domainFull+'/nodeinfo/2.0', 'href': httpPrefix + '://' + domainFull + '/nodeinfo/2.0',
'rel': 'http://nodeinfo.diaspora.software/ns/schema/2.0' 'rel': 'http://nodeinfo.diaspora.software/ns/schema/2.0'
} }
] ]
} }
return nodeinfo return nodeinfo
def webfingerMeta(httpPrefix: str,domainFull: str) -> str:
def webfingerMeta(httpPrefix: str, domainFull: str) -> str:
"""Return /.well-known/host-meta """Return /.well-known/host-meta
""" """
metaStr="<?xml version=1.0' encoding=UTF-8'?>" metaStr = "<?xml version=1.0' encoding=UTF-8'?>"
metaStr+="<XRD xmlns=http://docs.oasis-open.org/ns/xri/xrd-1.0'" metaStr += "<XRD xmlns=http://docs.oasis-open.org/ns/xri/xrd-1.0'"
metaStr+=" xmlns:hm=http://host-meta.net/xrd/1.0'>" metaStr += " xmlns:hm=http://host-meta.net/xrd/1.0'>"
metaStr+="" metaStr += ""
metaStr+="<hm:Host>"+domainFull+"</hm:Host>" metaStr += "<hm:Host>" + domainFull + "</hm:Host>"
metaStr+="" metaStr += ""
metaStr+="<Link rel=lrdd" metaStr += "<Link rel=lrdd"
metaStr+=" template="+httpPrefix+"://"+domainFull+"/describe?uri={uri}'>" metaStr += " template=" + httpPrefix + "://" + domainFull + \
metaStr+=" <Title>Resource Descriptor</Title>" "/describe?uri={uri}'>"
metaStr+=" </Link>" metaStr += " <Title>Resource Descriptor</Title>"
metaStr+="</XRD>" metaStr += " </Link>"
metaStr += "</XRD>"
return metaStr return metaStr
def webfingerLookup(path: str,baseDir: str, \
domain: str,onionDomain: str, \ def webfingerLookup(path: str, baseDir: str,
port: int,debug: bool) -> {}: domain: str, onionDomain: str,
port: int, debug: bool) -> {}:
"""Lookup the webfinger endpoint for an account """Lookup the webfinger endpoint for an account
""" """
if not path.startswith('/.well-known/webfinger?'): if not path.startswith('/.well-known/webfinger?'):
return None return None
handle=None handle = None
if 'resource=acct:' in path: if 'resource=acct:' in path:
handle=path.split('resource=acct:')[1].strip() handle = path.split('resource=acct:')[1].strip()
if debug: if debug:
print('DEBUG: WEBFINGER handle '+handle) print('DEBUG: WEBFINGER handle ' + handle)
else: else:
if 'resource=acct%3A' in path: if 'resource=acct%3A' in path:
handle=path.split('resource=acct%3A')[1].replace('%40','@',1).replace('%3A',':',1).strip() handle = path.split('resource=acct%3A')[1]
handle = handle.replace('%40', '@', 1)
handle = handle.replace('%3A', ':', 1).strip()
if debug: if debug:
print('DEBUG: WEBFINGER handle '+handle) print('DEBUG: WEBFINGER handle ' + handle)
if not handle: if not handle:
if debug: if debug:
print('DEBUG: WEBFINGER handle missing') print('DEBUG: WEBFINGER handle missing')
return None return None
if '&' in handle: if '&' in handle:
handle=handle.split('&')[0].strip() handle = handle.split('&')[0].strip()
if debug: if debug:
print('DEBUG: WEBFINGER handle with & removed '+handle) print('DEBUG: WEBFINGER handle with & removed ' + handle)
if '@' not in handle: if '@' not in handle:
if debug: if debug:
print('DEBUG: WEBFINGER no @ in handle '+handle) print('DEBUG: WEBFINGER no @ in handle ' + handle)
return None return None
if port: if port:
if port!=80 and port !=443: if port != 80 and port != 443:
if ':' not in handle: if ':' not in handle:
handle=handle+':'+str(port) handle = handle + ':' + str(port)
# convert @domain@domain to inbox@domain # convert @domain@domain to inbox@domain
if '@' in handle: if '@' in handle:
handleDomain=handle.split('@')[1] handleDomain = handle.split('@')[1]
if handle.startswith(handleDomain+'@'): if handle.startswith(handleDomain + '@'):
handle='inbox@'+handleDomain handle = 'inbox@' + handleDomain
# if this is a lookup for a handle using its onion domain # if this is a lookup for a handle using its onion domain
# then swap the onion domain for the clearnet version # then swap the onion domain for the clearnet version
onionify=False onionify = False
if onionDomain: if onionDomain:
if onionDomain in handle: if onionDomain in handle:
handle=handle.replace(onionDomain,domain) handle = handle.replace(onionDomain, domain)
onionify=True onionify = True
filename=baseDir+'/wfendpoints/'+handle.lower()+'.json' filename = baseDir + '/wfendpoints/' + handle.lower() + '.json'
if debug: if debug:
print('DEBUG: WEBFINGER filename '+filename) print('DEBUG: WEBFINGER filename ' + filename)
if not os.path.isfile(filename): if not os.path.isfile(filename):
if debug: if debug:
print('DEBUG: WEBFINGER filename not found '+filename) print('DEBUG: WEBFINGER filename not found ' + filename)
return None return None
if not onionify: if not onionify:
wfJson=loadJson(filename) wfJson = loadJson(filename)
else: else:
print('Webfinger request for onionified '+handle) print('Webfinger request for onionified ' + handle)
wfJson=loadJsonOnionify(filename,domain,onionDomain) wfJson = loadJsonOnionify(filename, domain, onionDomain)
if not wfJson: if not wfJson:
wfJson={"nickname": "unknown"} wfJson = {"nickname": "unknown"}
return wfJson return wfJson