Restart the inbox queue when full

main
Bob Mottram 2020-04-27 10:41:38 +01:00
parent 2af1b5cb3b
commit c79efe6d6f
2 changed files with 18 additions and 1 deletions

View File

@ -790,6 +790,12 @@ class PubServer(BaseHTTPRequestHandler):
messageBytes: str) -> int: messageBytes: str) -> int:
"""Update the inbox queue """Update the inbox queue
""" """
if self.server.restartInboxQueueInProgress:
self._503()
print('Message arrrived but currently restarting inbox queue')
self.server.POSTbusy = False
return 2
# check for blocked domains so that they can be rejected early # check for blocked domains so that they can be rejected early
messageDomain = None messageDomain = None
if messageJson.get('actor'): if messageJson.get('actor'):
@ -810,6 +816,8 @@ class PubServer(BaseHTTPRequestHandler):
print('Queue: Inbox queue is full') print('Queue: Inbox queue is full')
self._503() self._503()
self.server.POSTbusy = False self.server.POSTbusy = False
if not restartInboxQueueInProgress:
self.server.restartInboxQueue=True
return 2 return 2
# Convert the headers needed for signature verification to dict # Convert the headers needed for signature verification to dict
@ -7258,6 +7266,11 @@ def runDaemon(blogsInstance: bool, mediaInstance: bool,
httpd.thrPostSchedule = \ httpd.thrPostSchedule = \
threadWithTrace(target=runPostSchedule, threadWithTrace(target=runPostSchedule,
args=(baseDir, httpd, 20), daemon=True) args=(baseDir, httpd, 20), daemon=True)
# flags used when restarting the inbox queue
httpd.restartInboxQueueInProgress=False
httpd.restartInboxQueue=False
if not unitTest: if not unitTest:
print('Creating inbox queue watchdog') print('Creating inbox queue watchdog')
httpd.thrWatchdog = \ httpd.thrWatchdog = \

View File

@ -2185,11 +2185,15 @@ def runInboxQueueWatchdog(projectVersion: str, httpd) -> None:
httpd.thrInboxQueue.start() httpd.thrInboxQueue.start()
while True: while True:
time.sleep(20) time.sleep(20)
if not httpd.thrInboxQueue.isAlive(): if not httpd.thrInboxQueue.isAlive() or httpd.restartInboxQueue:
httpd.restartInboxQueueInProgress=True
httpd.thrInboxQueue.kill() httpd.thrInboxQueue.kill()
httpd.thrInboxQueue = inboxQueueOriginal.clone(runInboxQueue) httpd.thrInboxQueue = inboxQueueOriginal.clone(runInboxQueue)
httpd.inboxQueue.clear()
httpd.thrInboxQueue.start() httpd.thrInboxQueue.start()
print('Restarting inbox queue...') print('Restarting inbox queue...')
httpd.restartInboxQueueInProgress=False
httpd.restartInboxQueue=False
def runInboxQueue(recentPostsCache: {}, maxRecentPosts: int, def runInboxQueue(recentPostsCache: {}, maxRecentPosts: int,