From 08134954bc1592a9037dbd21dd8aa012e306cfa1 Mon Sep 17 00:00:00 2001 From: Bob Mottram Date: Thu, 4 Jul 2019 13:23:53 +0100 Subject: [PATCH] inbox queue --- daemon.py | 81 ++++++++++++----------------------------------------- inbox.py | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 100 insertions(+), 64 deletions(-) diff --git a/daemon.py b/daemon.py index 52a0cecd..e2cde88d 100644 --- a/daemon.py +++ b/daemon.py @@ -23,8 +23,10 @@ from posts import outboxMessageCreateWrap from posts import savePostToOutbox from inbox import inboxPermittedMessage from inbox import inboxMessageHasParams +from inbox import runInboxQueue from follow import getFollowingFeed from auth import authorize +from threads import threadWithTrace import os import sys @@ -367,6 +369,9 @@ class PubServer(BaseHTTPRequestHandler): if not headers.get('keyId'): if self.server.debug: print('DEBUG: POST to inbox has no keyId in header') + self.send_response(403) + self.end_headers() + self.server.POSTbusy=False return if self.server.debug: @@ -385,78 +390,26 @@ class PubServer(BaseHTTPRequestHandler): headers['keyId'], \ self.postToNickname, \ self.server.domain, \ - messageJson) + messageJson, + self.headers) if cacheFilename: if cacheFilename not in self.server.inboxQueue: self.server.inboxQueue.append(cacheFilename) - return + self.send_response(201) + self.end_headers() + self.server.POSTbusy=False + return + self.send_response(403) + self.end_headers() + self.server.POSTbusy=False + return else: print('DEBUG: POST to shared inbox') - return - - - - - currSessionTime=int(time.time()) - if currSessionTime-self.server.sessionLastUpdate>1200: - self.server.sessionLastUpdate=currSessionTime - self.server.session = \ - createSession(self.server.domain,self.server.port, \ - self.server.useTor) - if self.server.debug: - print('DEBUG: POST started new session') - - if self.server.debug: - print('DEBUG: POST get actor url from '+self.server.baseDir) - personUrl=messageJson['actor'] - - if self.server.debug: - print('DEBUG: POST get public key of '+personUrl+' from '+self.server.baseDir) - - pubKey=getPersonPubKey(self.server.session,personUrl, \ - self.server.personCache) - if not pubKey: - if self.server.debug: - print('DEBUG: POST no sender public key') - self.send_response(401) + self.send_response(201) self.end_headers() self.server.POSTbusy=False return - if self.server.debug: - print('DEBUG: POST check signature') - - if not verifyPostHeaders(self.server.httpPrefix, pubKey, self.headers, \ - '/inbox' ,False, json.dumps(messageJson)): - print('**************** POST signature verification failed') - self.send_response(401) - self.end_headers() - self.server.POSTbusy=False - return - - if self.server.debug: - print('DEBUG: POST valid') - - if receiveFollowRequest(self.server.baseDir,messageJson, \ - self.server.federationList): - self.send_response(200) - self.end_headers() - self.server.POSTbusy=False - return - - pprint(messageJson) - # add a property to the object, just to mess with data - #message['received'] = 'ok' - - # send the message back - #self._set_headers('application/json') - #self.wfile.write(json.dumps(message).encode('utf-8')) - - self.server.receivedMessage=True - self.send_response(200) - self.end_headers() - self.server.POSTbusy=False - def runDaemon(domain: str,port=80,httpPrefix='https',fedList=[],useTor=False,debug=False) -> None: if len(domain)==0: domain='localhost' @@ -485,4 +438,6 @@ def runDaemon(domain: str,port=80,httpPrefix='https',fedList=[],useTor=False,deb httpd.receivedMessage=False httpd.inboxQueue=[] print('Running ActivityPub daemon on ' + domain + ' port ' + str(port)) + httpd.thrInboxQueue=threadWithTrace(target=runInboxQueue,args=(baseDir,httpPrefix,httpd.personCache,httpd.inboxQueue,domain,port,useTor,httpd.federationList,debug),daemon=True) + httpd.thrInboxQueue.start() httpd.serve_forever() diff --git a/inbox.py b/inbox.py index 69a25789..8dd3bc85 100644 --- a/inbox.py +++ b/inbox.py @@ -9,8 +9,15 @@ __status__ = "Production" import json import os import datetime +import time +import json +import commentjson from utils import urlPermitted from utils import createInboxQueueDir +from posts import getPersonPubKey +from httpsig import verifyPostHeaders +from session import createSession +from follow import receiveFollowRequest def inboxMessageHasParams(messageJson: {}) -> bool: """Checks whether an incoming message contains expected parameters @@ -51,7 +58,7 @@ def validPublishedDate(published) -> bool: return False return True -def savePostToInboxQueue(baseDir: str,httpPrefix: str,keyId: str,nickname: str, domain: str,postJson: {}) -> str: +def savePostToInboxQueue(baseDir: str,httpPrefix: str,keyId: str,nickname: str, domain: str,postJson: {},headers: {}) -> str: """Saves the give json to the inbox queue for the person keyId specifies the actor sending the post """ @@ -78,6 +85,7 @@ def savePostToInboxQueue(baseDir: str,httpPrefix: str,keyId: str,nickname: str, newBufferItem = { 'published': published, 'keyId': keyid, + 'headers': headers, 'post': postJson, 'filename': filename, 'destination': destination @@ -86,3 +94,76 @@ def savePostToInboxQueue(baseDir: str,httpPrefix: str,keyId: str,nickname: str, with open(filename, 'w') as fp: commentjson.dump(newQueueItem, fp, indent=4, sort_keys=False) return filename + +def runInboxQueue(baseDir: str,httpPrefix: str,personCache: {},queue: [],domain: str,port: int,useTor: bool,federationList: [],debug: bool) -> None: + """Processes received items and moves them to + the appropriate directories + """ + currSessionTime=int(time.time()) + sessionLastUpdate=currSessionTime + session=createSession(domain,port,useTor) + if debug: + print('DEBUG: Inbox queue running') + + while True: + if len(queue)>0: + currSessionTime=int(time.time()) + if currSessionTime-sessionLastUpdate>1200: + session=createSession(domain,port,useTor) + sessionLastUpdate=currSessionTime + + # oldest item first + queue.sort() + queueFilename=queue[0] + if not os.path.isfile(queueFilename): + if debug: + print("DEBUG: queue item rejected becase it has no file: "+queueFilename) + queue.pop(0) + continue + + # Load the queue json + with open(queueFilename, 'r') as fp: + queueJson=commentjson.load(fp) + + # Try a few times to obtain teh public key + pubKey=None + for tries in range(5): + pubKey=getPersonPubKey(session,queueJson['keyId'],personCache) + if not pubKey: + if debug: + print('DEBUG: Retry '+str(tries+1)+' obtaining public key for '+queueJson['keyId']) + time.sleep(5) + if not pubKey: + if debug: + print('DEBUG: public key could not be obtained from '+queueJson['keyId']) + os.remove(queueFilename) + queue.pop(0) + continue + + # check the signature + if not verifyPostHeaders(httpPrefix, \ + pubKey, queueJson.headers, \ + '/inbox', False, \ + json.dumps(messageJson)): + if debug: + print('DEBUG: Header signature check failed') + os.remove(queueFilename) + queue.pop(0) + continue + + if receiveFollowRequest(baseDir, \ + queueJson.post, \ + federationList): + + if debug: + print('DEBUG: Follow accepted from '+queueJson['keyId']) + os.remove(queueFilename) + queue.pop(0) + continue + + if debug: + print('DEBUG: Queue post accepted') + # move to the destination inbox + os.rename(queueFilename,queueJson['destination']) + queue.pop(0) + time.sleep(2)