inbox queue

master
Bob Mottram 2019-07-04 13:23:53 +01:00
parent 0ddcbdcd96
commit 08134954bc
2 changed files with 100 additions and 64 deletions

View File

@ -23,8 +23,10 @@ from posts import outboxMessageCreateWrap
from posts import savePostToOutbox from posts import savePostToOutbox
from inbox import inboxPermittedMessage from inbox import inboxPermittedMessage
from inbox import inboxMessageHasParams from inbox import inboxMessageHasParams
from inbox import runInboxQueue
from follow import getFollowingFeed from follow import getFollowingFeed
from auth import authorize from auth import authorize
from threads import threadWithTrace
import os import os
import sys import sys
@ -367,6 +369,9 @@ class PubServer(BaseHTTPRequestHandler):
if not headers.get('keyId'): if not headers.get('keyId'):
if self.server.debug: if self.server.debug:
print('DEBUG: POST to inbox has no keyId in header') print('DEBUG: POST to inbox has no keyId in header')
self.send_response(403)
self.end_headers()
self.server.POSTbusy=False
return return
if self.server.debug: if self.server.debug:
@ -385,78 +390,26 @@ class PubServer(BaseHTTPRequestHandler):
headers['keyId'], \ headers['keyId'], \
self.postToNickname, \ self.postToNickname, \
self.server.domain, \ self.server.domain, \
messageJson) messageJson,
self.headers)
if cacheFilename: if cacheFilename:
if cacheFilename not in self.server.inboxQueue: if cacheFilename not in self.server.inboxQueue:
self.server.inboxQueue.append(cacheFilename) self.server.inboxQueue.append(cacheFilename)
self.send_response(201)
self.end_headers()
self.server.POSTbusy=False
return
self.send_response(403)
self.end_headers()
self.server.POSTbusy=False
return return
else: else:
print('DEBUG: POST to shared inbox') print('DEBUG: POST to shared inbox')
return self.send_response(201)
currSessionTime=int(time.time())
if currSessionTime-self.server.sessionLastUpdate>1200:
self.server.sessionLastUpdate=currSessionTime
self.server.session = \
createSession(self.server.domain,self.server.port, \
self.server.useTor)
if self.server.debug:
print('DEBUG: POST started new session')
if self.server.debug:
print('DEBUG: POST get actor url from '+self.server.baseDir)
personUrl=messageJson['actor']
if self.server.debug:
print('DEBUG: POST get public key of '+personUrl+' from '+self.server.baseDir)
pubKey=getPersonPubKey(self.server.session,personUrl, \
self.server.personCache)
if not pubKey:
if self.server.debug:
print('DEBUG: POST no sender public key')
self.send_response(401)
self.end_headers() self.end_headers()
self.server.POSTbusy=False self.server.POSTbusy=False
return return
if self.server.debug:
print('DEBUG: POST check signature')
if not verifyPostHeaders(self.server.httpPrefix, pubKey, self.headers, \
'/inbox' ,False, json.dumps(messageJson)):
print('**************** POST signature verification failed')
self.send_response(401)
self.end_headers()
self.server.POSTbusy=False
return
if self.server.debug:
print('DEBUG: POST valid')
if receiveFollowRequest(self.server.baseDir,messageJson, \
self.server.federationList):
self.send_response(200)
self.end_headers()
self.server.POSTbusy=False
return
pprint(messageJson)
# add a property to the object, just to mess with data
#message['received'] = 'ok'
# send the message back
#self._set_headers('application/json')
#self.wfile.write(json.dumps(message).encode('utf-8'))
self.server.receivedMessage=True
self.send_response(200)
self.end_headers()
self.server.POSTbusy=False
def runDaemon(domain: str,port=80,httpPrefix='https',fedList=[],useTor=False,debug=False) -> None: def runDaemon(domain: str,port=80,httpPrefix='https',fedList=[],useTor=False,debug=False) -> None:
if len(domain)==0: if len(domain)==0:
domain='localhost' domain='localhost'
@ -485,4 +438,6 @@ def runDaemon(domain: str,port=80,httpPrefix='https',fedList=[],useTor=False,deb
httpd.receivedMessage=False httpd.receivedMessage=False
httpd.inboxQueue=[] httpd.inboxQueue=[]
print('Running ActivityPub daemon on ' + domain + ' port ' + str(port)) print('Running ActivityPub daemon on ' + domain + ' port ' + str(port))
httpd.thrInboxQueue=threadWithTrace(target=runInboxQueue,args=(baseDir,httpPrefix,httpd.personCache,httpd.inboxQueue,domain,port,useTor,httpd.federationList,debug),daemon=True)
httpd.thrInboxQueue.start()
httpd.serve_forever() httpd.serve_forever()

View File

@ -9,8 +9,15 @@ __status__ = "Production"
import json import json
import os import os
import datetime import datetime
import time
import json
import commentjson
from utils import urlPermitted from utils import urlPermitted
from utils import createInboxQueueDir from utils import createInboxQueueDir
from posts import getPersonPubKey
from httpsig import verifyPostHeaders
from session import createSession
from follow import receiveFollowRequest
def inboxMessageHasParams(messageJson: {}) -> bool: def inboxMessageHasParams(messageJson: {}) -> bool:
"""Checks whether an incoming message contains expected parameters """Checks whether an incoming message contains expected parameters
@ -51,7 +58,7 @@ def validPublishedDate(published) -> bool:
return False return False
return True return True
def savePostToInboxQueue(baseDir: str,httpPrefix: str,keyId: str,nickname: str, domain: str,postJson: {}) -> str: def savePostToInboxQueue(baseDir: str,httpPrefix: str,keyId: str,nickname: str, domain: str,postJson: {},headers: {}) -> str:
"""Saves the give json to the inbox queue for the person """Saves the give json to the inbox queue for the person
keyId specifies the actor sending the post keyId specifies the actor sending the post
""" """
@ -78,6 +85,7 @@ def savePostToInboxQueue(baseDir: str,httpPrefix: str,keyId: str,nickname: str,
newBufferItem = { newBufferItem = {
'published': published, 'published': published,
'keyId': keyid, 'keyId': keyid,
'headers': headers,
'post': postJson, 'post': postJson,
'filename': filename, 'filename': filename,
'destination': destination 'destination': destination
@ -86,3 +94,76 @@ def savePostToInboxQueue(baseDir: str,httpPrefix: str,keyId: str,nickname: str,
with open(filename, 'w') as fp: with open(filename, 'w') as fp:
commentjson.dump(newQueueItem, fp, indent=4, sort_keys=False) commentjson.dump(newQueueItem, fp, indent=4, sort_keys=False)
return filename return filename
def runInboxQueue(baseDir: str,httpPrefix: str,personCache: {},queue: [],domain: str,port: int,useTor: bool,federationList: [],debug: bool) -> None:
"""Processes received items and moves them to
the appropriate directories
"""
currSessionTime=int(time.time())
sessionLastUpdate=currSessionTime
session=createSession(domain,port,useTor)
if debug:
print('DEBUG: Inbox queue running')
while True:
if len(queue)>0:
currSessionTime=int(time.time())
if currSessionTime-sessionLastUpdate>1200:
session=createSession(domain,port,useTor)
sessionLastUpdate=currSessionTime
# oldest item first
queue.sort()
queueFilename=queue[0]
if not os.path.isfile(queueFilename):
if debug:
print("DEBUG: queue item rejected becase it has no file: "+queueFilename)
queue.pop(0)
continue
# Load the queue json
with open(queueFilename, 'r') as fp:
queueJson=commentjson.load(fp)
# Try a few times to obtain teh public key
pubKey=None
for tries in range(5):
pubKey=getPersonPubKey(session,queueJson['keyId'],personCache)
if not pubKey:
if debug:
print('DEBUG: Retry '+str(tries+1)+' obtaining public key for '+queueJson['keyId'])
time.sleep(5)
if not pubKey:
if debug:
print('DEBUG: public key could not be obtained from '+queueJson['keyId'])
os.remove(queueFilename)
queue.pop(0)
continue
# check the signature
if not verifyPostHeaders(httpPrefix, \
pubKey, queueJson.headers, \
'/inbox', False, \
json.dumps(messageJson)):
if debug:
print('DEBUG: Header signature check failed')
os.remove(queueFilename)
queue.pop(0)
continue
if receiveFollowRequest(baseDir, \
queueJson.post, \
federationList):
if debug:
print('DEBUG: Follow accepted from '+queueJson['keyId'])
os.remove(queueFilename)
queue.pop(0)
continue
if debug:
print('DEBUG: Queue post accepted')
# move to the destination inbox
os.rename(queueFilename,queueJson['destination'])
queue.pop(0)
time.sleep(2)