Parameter types

master
Bob Mottram 2019-07-01 12:09:09 +01:00
parent 888e4831f2
commit d160c060c9
10 changed files with 32 additions and 32 deletions

View File

@ -16,7 +16,7 @@ personCache = {}
# cached webfinger endpoints
cachedWebfingers = {}
def storePersonInCache(personUrl: str,personJson) -> None:
def storePersonInCache(personUrl: str,personJson: {}) -> None:
"""Store an actor in the cache
"""
currTime=datetime.datetime.utcnow()
@ -27,7 +27,7 @@ def storeWebfingerInCache(handle: str,wf) -> None:
"""
cachedWebfingers[handle]=wf
def getPersonFromCache(personUrl: str):
def getPersonFromCache(personUrl: str) -> {}:
"""Get an actor from the cache
"""
if personCache.get(personUrl):
@ -40,7 +40,7 @@ def getPersonFromCache(personUrl: str):
return personCache[personUrl]['actor']
return None
def getWebfingerFromCache(handle: str):
def getWebfingerFromCache(handle: str) -> {}:
"""Get webfinger endpoint from the cache
"""
if cachedWebfingers.get(handle):

View File

@ -87,7 +87,7 @@ testPostMessageBetweenServers()
#runDaemon(domain,port,https,federationList,useTor)
#testHttpsig()
#sys.exit()
sys.exit()
#pprint(person)
#print('\n')

View File

@ -12,7 +12,7 @@ import os
import sys
from person import validUsername
def followPerson(baseDir: str,username: str, domain: str, followUsername: str, followDomain: str, federationList, followFile='following.txt') -> bool:
def followPerson(baseDir: str,username: str, domain: str, followUsername: str, followDomain: str, federationList: [], followFile='following.txt') -> bool:
"""Adds a person to the follow list
"""
if followDomain.lower().replace('\n','') not in federationList:
@ -32,7 +32,7 @@ def followPerson(baseDir: str,username: str, domain: str, followUsername: str, f
followfile.write(handleToFollow+'\n')
return True
def followerOfPerson(baseDir: str,username: str, domain: str, followerUsername: str, followerDomain: str, federationList) -> bool:
def followerOfPerson(baseDir: str,username: str, domain: str, followerUsername: str, followerDomain: str, federationList: []) -> bool:
"""Adds a follower of the given person
"""
return followPerson(baseDir,username, domain, followerUsername, followerDomain, federationList, 'followers.txt')

View File

@ -15,7 +15,7 @@ from requests.auth import AuthBase
import base64
import json
def signPostHeaders(privateKeyPem: str, username: str, domain: str, port: int,path: str, https: bool, messageBodyJson) -> str:
def signPostHeaders(privateKeyPem: str, username: str, domain: str, port: int,path: str, https: bool, messageBodyJson: {}) -> str:
"""Returns a raw signature string that can be plugged into a header and
used to verify the authenticity of an HTTP transmission.
"""
@ -59,7 +59,7 @@ def signPostHeaders(privateKeyPem: str, username: str, domain: str, port: int,pa
[f'{k}="{v}"' for k, v in signatureDict.items()])
return signatureHeader
def createSignedHeader(privateKeyPem: str,username: str,domain: str,port: int,path: str,https: bool,withDigest: bool,messageBodyJson) -> {}:
def createSignedHeader(privateKeyPem: str,username: str,domain: str,port: int,path: str,https: bool,withDigest: bool,messageBodyJson: {}) -> {}:
headerDomain=domain
if port!=80 and port!=443:
@ -74,6 +74,7 @@ def createSignedHeader(privateKeyPem: str,username: str,domain: str,port: int,pa
path='/inbox'
signatureHeader = signPostHeaders(privateKeyPem, username, domain, port, path, https, None)
headers['signature'] = signatureHeader
headers['Content-type'] = 'application/json'
return headers
def verifyPostHeaders(https: bool, publicKeyPem: str, headers: dict, path: str, GETmethod: bool, messageBodyJsonStr: str) -> bool:

View File

@ -10,7 +10,7 @@ import json
import os
import datetime
def inboxPermittedMessage(messageJson,federationList) -> bool:
def inboxPermittedMessage(messageJson: {},federationList: []) -> bool:
""" check that we are receiving from a permitted domain
"""
testParam='actor'
@ -42,7 +42,7 @@ def inboxPermittedMessage(messageJson,federationList) -> bool:
return True
def receivePublicMessage(message) -> bool:
def receivePublicMessage(message: {}) -> bool:
print("TODO")
def validPublishedDate(published):
@ -53,7 +53,7 @@ def validPublishedDate(published):
return False
return True
def receiveMessage(message,baseDir: str):
def receiveMessage(message: {},baseDir: str):
if not message.get('type'):
return
if message['type']!='Create':

View File

@ -106,7 +106,7 @@ def createPerson(baseDir: str,username: str,domain: str,port: int,https: bool, s
return privateKeyPem,publicKeyPem,newPerson,webfingerEndpoint
def validUsername(username):
def validUsername(username: str) -> bool:
forbiddenChars=['.',' ','/','?',':',';','@']
for c in forbiddenChars:
if c in username:

View File

@ -44,7 +44,7 @@ def getPersonKey(username: str,domain: str,baseDir: str,keyType='public'):
return ''
return keyPem
def permitted(url: str,federationList) -> bool:
def permitted(url: str,federationList: []) -> bool:
"""Is a url from one of the permitted domains?
"""
for domain in federationList:
@ -64,7 +64,7 @@ def getUserUrl(wfRequest) -> str:
return link['href']
return None
def parseUserFeed(session,feedUrl,asHeader) -> None:
def parseUserFeed(session,feedUrl: str,asHeader: {}) -> None:
feedJson = getJson(session,feedUrl,asHeader,None)
pprint(feedJson)
@ -90,7 +90,6 @@ def getPersonBox(session,wfRequest,boxName='inbox') -> (str,str,str,str):
personJson = getPersonFromCache(personUrl)
if not personJson:
personJson = getJson(session,personUrl,asHeader,None)
pprint(personJson)
if not personJson.get(boxName):
return personPosts
personId=None
@ -108,7 +107,7 @@ def getPersonBox(session,wfRequest,boxName='inbox') -> (str,str,str,str):
return personJson[boxName],pubKeyId,pubKey,personId
def getUserPosts(session,wfRequest,maxPosts,maxMentions,maxEmoji,maxAttachments,federationList) -> {}:
def getUserPosts(session,wfRequest: {},maxPosts: int,maxMentions: int,maxEmoji: int,maxAttachments: int,federationList: []) -> {}:
userPosts={}
feedUrl,pubKeyId,pubKey,personId = getPersonBox(session,wfRequest,'outbox')
if not feedUrl:
@ -317,13 +316,13 @@ def createPublicPost(username: str, domain: str, https: bool, content: str, foll
prefix='http'
return createPostBase(username, domain, 'https://www.w3.org/ns/activitystreams#Public', prefix+'://'+domain+'/users/'+username+'/followers', https, content, followersOnly, saveToFile, inReplyTo, inReplyToAtomUri, subject)
def threadSendPost(session,postJsonObject,federationList,inboxUrl: str,baseDir: str,signatureHeader,postLog) -> None:
def threadSendPost(session,postJsonObject: {},federationList: [],inboxUrl: str,baseDir: str,signatureHeaderJson: {},postLog: []) -> None:
"""Sends a post with exponential backoff
"""
tries=0
backoffTime=60
for attempt in range(20):
postResult = postJson(session,postJsonObject,federationList,inboxUrl,signatureHeader)
postResult = postJson(session,postJsonObject,federationList,inboxUrl,signatureHeaderJson)
if postResult:
postLog.append(postJsonObject['published']+' '+postResult+'\n')
# keep the length of the log finite
@ -340,7 +339,7 @@ def threadSendPost(session,postJsonObject,federationList,inboxUrl: str,baseDir:
time.sleep(backoffTime)
backoffTime *= 2
def sendPost(session,baseDir,username: str, domain: str, port: int, toUsername: str, toDomain: str, toPort: int, cc: str, https: bool, content: str, followersOnly: bool, saveToFile: bool, federationList, sendThreads, postLog, inReplyTo=None, inReplyToAtomUri=None, subject=None) -> int:
def sendPost(session,baseDir: str,username: str, domain: str, port: int, toUsername: str, toDomain: str, toPort: int, cc: str, https: bool, content: str, followersOnly: bool, saveToFile: bool, federationList: [], sendThreads: [], postLog: [], inReplyTo=None, inReplyToAtomUri=None, subject=None) -> int:
"""Post to another inbox
"""
prefix='https'
@ -377,7 +376,6 @@ def sendPost(session,baseDir,username: str, domain: str, port: int, toUsername:
# construct the http header
signatureHeaderJson = createSignedHeader(privateKeyPem, username, domain, port, '/inbox', https, withDigest, postJsonObject)
signatureHeaderJson['Content-type'] = 'application/json'
# Keep the number of threads being used small
while len(sendThreads)>10:

View File

@ -19,7 +19,7 @@ def createSession(onionRoute: bool):
session.proxies['https'] = 'socks5h://localhost:9050'
return session
def getJson(session,url: str,headers,params):
def getJson(session,url: str,headers: {},params: {}) -> {}:
sessionParams={}
sessionHeaders={}
if headers:
@ -30,7 +30,7 @@ def getJson(session,url: str,headers,params):
session.cookies.clear()
return session.get(url, headers=sessionHeaders, params=sessionParams).json()
def postJson(session,postJsonObject,federationList,inboxUrl: str,headers) -> str:
def postJson(session,postJsonObject: {},federationList: [],inboxUrl: str,headers: {}) -> str:
"""Post a json message to the inbox of another person
"""
# check that we are posting to a permitted domain
@ -44,6 +44,3 @@ def postJson(session,postJsonObject,federationList,inboxUrl: str,headers) -> str
postResult = session.post(url = inboxUrl, data = json.dumps(postJsonObject), headers=headers)
return postResult.text
def getBaseDirectory():
baseDirectory = os.getcwd()

View File

@ -23,8 +23,12 @@ from person import createPerson
from posts import deleteAllPosts
from posts import createPublicPost
from posts import sendPost
from follow import clearFollows
from follow import clearFollowers
from follow import followPerson
from follow import followerOfPerson
from follow import unfollowPerson
from follow import unfollowerOfPerson
testServerAliceRunning = False
testServerBobRunning = False
@ -185,7 +189,7 @@ def testPostMessageBetweenServers():
sendResult = sendPost(sessionAlice,aliceDir,'alice', '127.0.0.1', alicePort, 'bob', '127.0.0.1', bobPort, '', https, 'Why is a mouse when it spins?', False, True, federationList, aliceSendThreads, alicePostLog, inReplyTo, inReplyToAtomUri, subject)
print('sendResult: '+str(sendResult))
time.sleep(5)
time.sleep(15)
# stop the servers
thrAlice.kill()

View File

@ -17,7 +17,7 @@ from session import getJson
from cache import storeWebfingerInCache
from cache import getWebfingerFromCache
def parseHandle(handle):
def parseHandle(handle: str) -> (str,str):
if '.' not in handle:
return None, None
if '/@' in handle:
@ -34,7 +34,7 @@ def parseHandle(handle):
return username, domain
def webfingerHandle(session,handle: str,https: bool):
def webfingerHandle(session,handle: str,https: bool) -> {}:
username, domain = parseHandle(handle)
if not username:
return None
@ -57,7 +57,7 @@ def webfingerHandle(session,handle: str,https: bool):
storeWebfingerInCache(username+'@'+domain, result)
return result
def generateMagicKey(publicKeyPem):
def generateMagicKey(publicKeyPem) -> str:
"""See magic_key method in
https://github.com/tootsuite/mastodon/blob/707ddf7808f90e3ab042d7642d368c2ce8e95e6f/app/models/account.rb
"""
@ -66,7 +66,7 @@ def generateMagicKey(publicKeyPem):
pubexp = base64.urlsafe_b64encode(number.long_to_bytes(privkey.e)).decode("utf-8")
return f"data:application/magic-public-key,RSA.{mod}.{pubexp}"
def storeWebfingerEndpoint(username: str,domain: str,baseDir: str,wfJson) -> bool:
def storeWebfingerEndpoint(username: str,domain: str,baseDir: str,wfJson: {}) -> bool:
"""Stores webfinger endpoint for a user to a file
"""
handle=username+'@'+domain
@ -78,7 +78,7 @@ def storeWebfingerEndpoint(username: str,domain: str,baseDir: str,wfJson) -> boo
commentjson.dump(wfJson, fp, indent=4, sort_keys=False)
return True
def createWebfingerEndpoint(username,domain,port,https,publicKeyPem) -> {}:
def createWebfingerEndpoint(username: str,domain: str,port: int,https: bool,publicKeyPem) -> {}:
"""Creates a webfinger endpoint for a user
"""
prefix='https'
@ -141,7 +141,7 @@ def webfingerMeta() -> str:
" </Link>" \
"</XRD>"
def webfingerLookup(path: str,baseDir: str):
def webfingerLookup(path: str,baseDir: str) -> {}:
"""Lookup the webfinger endpoint for an account
"""
if not path.startswith('/.well-known/webfinger?'):