diff --git a/daemon.py b/daemon.py index 2da9ec8e0..b99d17507 100644 --- a/daemon.py +++ b/daemon.py @@ -60,6 +60,7 @@ from auth import createPassword from auth import createBasicAuthHeader from auth import authorizeBasic from threads import threadWithTrace +from threads import removeDormantThreads from media import getMediaPath from media import createMediaDirs from delete import outboxDelete @@ -4614,7 +4615,28 @@ class PubServer(BaseHTTPRequestHandler): class PubServerUnitTest(PubServer): protocol_version = 'HTTP/1.0' - + +def runPostsQueue(baseDir: str,sendThreads: [],debug: bool) -> None: + """Manages the threads used to send posts + """ + while True: + time.sleep(1) + removeDormantThreads(baseDir,sendThreads,debug) + +def runPostsWatchdog(httpd) -> None: + """This tries to keep the posts thread running even if it dies + """ + print('Starting posts queue watchdog') + postsQueueOriginal=httpd.thrPostsQueue.clone(runPostsQueue) + httpd.thrPostsQueue.start() + while True: + time.sleep(20) + if not httpd.thrPostsQueue.isAlive(): + httpd.thrPostsQueue.kill() + httpd.thrPostsQueue=postsQueueOriginal.clone(runPostsQueue) + httpd.thrPostsQueue.start() + print('Restarting posts queue...') + def runDaemon(projectVersion, \ instanceId,clientToServer: bool, \ baseDir: str,domain: str, \ @@ -4626,7 +4648,7 @@ def runDaemon(projectVersion, \ useTor=False,maxReplies=64, \ domainMaxPostsPerDay=8640,accountMaxPostsPerDay=8640, \ allowDeletion=False,debug=False,unitTest=False, \ - instanceOnlySkillsSearch=False) -> None: + instanceOnlySkillsSearch=False,sendThreads=[]) -> None: if len(domain)==0: domain='localhost' if '.' not in domain: @@ -4701,7 +4723,7 @@ def runDaemon(projectVersion, \ httpd.POSTbusy=False httpd.receivedMessage=False httpd.inboxQueue=[] - httpd.sendThreads=[] + httpd.sendThreads=sendThreads httpd.postLog=[] httpd.maxQueueLength=16 httpd.ocapAlways=ocapAlways @@ -4754,6 +4776,18 @@ def runDaemon(projectVersion, \ httpd.maxPostsInBox),daemon=True) httpd.thrCache.start() + print('Creating posts queue') + httpd.thrPostsQueue= \ + threadWithTrace(target=runPostsQueue, \ + args=(baseDir,httpd.sendThreads,debug),daemon=True) + if not unitTest: + httpd.thrPostsWatchdog= \ + threadWithTrace(target=runPostsWatchdog, \ + args=(httpd),daemon=True) + httpd.thrPostsWatchdog.start() + else: + httpd.thrPostsQueue.start() + print('Creating inbox queue') httpd.thrInboxQueue= \ threadWithTrace(target=runInboxQueue, \ diff --git a/posts.py b/posts.py index 785ad95a2..93f6d302f 100644 --- a/posts.py +++ b/posts.py @@ -20,7 +20,6 @@ import time from time import gmtime, strftime from collections import OrderedDict from threads import threadWithTrace -from threads import removeDormantThreads from cache import storePersonInCache from cache import getPersonFromCache from cache import expirePersonCache @@ -1130,8 +1129,6 @@ def sendPost(projectVersion: str, \ toDomain,toPort, \ postPath,httpPrefix,withDigest,postJsonStr) - removeDormantThreads(sendThreads,debug) - # Keep the number of threads being used small while len(sendThreads)>20: print('WARN: Maximum threads reached - killing send thread') @@ -1410,15 +1407,14 @@ def sendSignedJson(postJsonObject: {},session,baseDir: str, \ createSignedHeader(privateKeyPem,nickname,domain,port, \ toDomain,toPort, \ postPath,httpPrefix,withDigest,postJsonStr) - - removeDormantThreads(sendThreads,debug) # Keep the number of threads being used small - while len(sendThreads)>20: + while len(sendThreads)>1000: print('WARN: Maximum threads reached - killing send thread') sendThreads[0].kill() sendThreads.pop(0) print('WARN: thread killed') + if debug: print('DEBUG: starting thread to send post') pprint(postJsonObject) @@ -1431,7 +1427,7 @@ def sendSignedJson(postJsonObject: {},session,baseDir: str, \ postLog, debug),daemon=True) sendThreads.append(thr) - thr.start() + #thr.start() return 0 def addToField(activityType: str,postJsonObject: {},debug: bool) -> ({},bool): diff --git a/tests.py b/tests.py index c13780457..33aea7ad4 100644 --- a/tests.py +++ b/tests.py @@ -174,7 +174,7 @@ def testThreads(): assert thr.isAlive()==False def createServerAlice(path: str,domain: str,port: int,federationList: [], \ - hasFollows: bool,hasPosts :bool,ocapAlways: bool): + hasFollows: bool,hasPosts :bool,ocapAlways: bool,sendThreads: []): print('Creating test server: Alice on port '+str(port)) if os.path.isdir(path): shutil.rmtree(path) @@ -224,10 +224,10 @@ def createServerAlice(path: str,domain: str,port: int,federationList: [], \ noreply,nolike,nopics,noannounce,cw,ocapAlways, \ useTor,maxReplies, \ domainMaxPostsPerDay,accountMaxPostsPerDay, \ - allowDeletion,True,True,False) + allowDeletion,True,True,False,sendThreads) def createServerBob(path: str,domain: str,port: int,federationList: [], \ - hasFollows: bool,hasPosts :bool,ocapAlways :bool): + hasFollows: bool,hasPosts :bool,ocapAlways :bool,sendThreads: []): print('Creating test server: Bob on port '+str(port)) if os.path.isdir(path): shutil.rmtree(path) @@ -278,10 +278,10 @@ def createServerBob(path: str,domain: str,port: int,federationList: [], \ noreply,nolike,nopics,noannounce,cw,ocapAlways, \ useTor,maxReplies, \ domainMaxPostsPerDay,accountMaxPostsPerDay, \ - allowDeletion,True,True,False) + allowDeletion,True,True,False,sendThreads) def createServerEve(path: str,domain: str,port: int,federationList: [], \ - hasFollows: bool,hasPosts :bool,ocapAlways :bool): + hasFollows: bool,hasPosts :bool,ocapAlways :bool,sendThreads: []): print('Creating test server: Eve on port '+str(port)) if os.path.isdir(path): shutil.rmtree(path) @@ -310,7 +310,7 @@ def createServerEve(path: str,domain: str,port: int,federationList: [], \ runDaemon(__version__,"instanceId",False,path,domain,port,port, \ httpPrefix,federationList,maxMentions,False, \ noreply,nolike,nopics,noannounce,cw,ocapAlways, \ - useTor,maxReplies,allowDeletion,True,True,False) + useTor,maxReplies,allowDeletion,True,True,False,sendThreads) def testPostMessageBetweenServers(): print('Testing sending message from one server to the inbox of another') @@ -338,17 +338,19 @@ def testPostMessageBetweenServers(): bobDomain='127.0.0.100' bobPort=61936 federationList=[bobDomain,aliceDomain] + aliceSendThreads=[] + bobSendThreads=[] thrAlice = \ threadWithTrace(target=createServerAlice, \ args=(aliceDir,aliceDomain,alicePort, \ federationList,False,False, \ - ocapAlways),daemon=True) + ocapAlways,aliceSendThreads),daemon=True) thrBob = \ threadWithTrace(target=createServerBob, \ args=(bobDir,bobDomain,bobPort, \ federationList,False,False, \ - ocapAlways),daemon=True) + ocapAlways,bobSendThreads),daemon=True) thrAlice.start() thrBob.start() @@ -368,7 +370,6 @@ def testPostMessageBetweenServers(): inReplyTo=None inReplyToAtomUri=None subject=None - aliceSendThreads = [] alicePostLog = [] followersOnly=False saveToFile=True @@ -412,7 +413,9 @@ def testPostMessageBetweenServers(): # inbox item created assert len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))])==1 # queue item removed - assert len([name for name in os.listdir(queuePath) if os.path.isfile(os.path.join(queuePath, name))])==0 + testval=len([name for name in os.listdir(queuePath) if os.path.isfile(os.path.join(queuePath, name))]) + print('queuePath: '+queuePath+' '+str(testval)) + assert testval==0 assert validInbox(bobDir,'bob',bobDomain) assert validInboxFilenames(bobDir,'bob',bobDomain,aliceDomain,alicePort) @@ -425,7 +428,6 @@ def testPostMessageBetweenServers(): bobDomain+':'+str(bobPort),federationList,False) sessionBob = createSession(bobDomain,bobPort,useTor) - bobSendThreads = [] bobPostLog = [] bobPersonCache={} bobCachedWebfingers={} @@ -522,29 +524,32 @@ def testFollowBetweenServersWithCapabilities(): aliceDir=baseDir+'/.tests/alice' aliceDomain='127.0.0.42' alicePort=61935 + aliceSendThreads=[] thrAlice = \ threadWithTrace(target=createServerAlice, \ args=(aliceDir,aliceDomain,alicePort, \ federationList,False,False, \ - ocapAlways),daemon=True) + ocapAlways,aliceSendThreads),daemon=True) bobDir=baseDir+'/.tests/bob' bobDomain='127.0.0.64' bobPort=61936 + bobSendThreads=[] thrBob = \ threadWithTrace(target=createServerBob, \ args=(bobDir,bobDomain,bobPort, \ federationList,False,False, \ - ocapAlways),daemon=True) + ocapAlways,bobSendThreads),daemon=True) eveDir=baseDir+'/.tests/eve' eveDomain='127.0.0.55' evePort=61937 + eveSendThreads=[] thrEve = \ threadWithTrace(target=createServerEve, \ args=(eveDir,eveDomain,evePort, \ federationList,False,False, \ - False),daemon=True) + False,eveSendThreads),daemon=True) thrAlice.start() thrBob.start() @@ -576,7 +581,6 @@ def testFollowBetweenServersWithCapabilities(): inReplyTo=None inReplyToAtomUri=None subject=None - aliceSendThreads = [] alicePostLog = [] followersOnly=False saveToFile=True @@ -584,7 +588,6 @@ def testFollowBetweenServersWithCapabilities(): ccUrl=None alicePersonCache={} aliceCachedWebfingers={} - aliceSendThreads=[] alicePostLog=[] sendResult = \ sendFollowRequest(sessionAlice,aliceDir, \ @@ -619,11 +622,9 @@ def testFollowBetweenServersWithCapabilities(): print('\n\n*********************************************************') print('Eve tries to send to Bob') sessionEve = createSession(eveDomain,evePort,useTor) - eveSendThreads = [] evePostLog = [] evePersonCache={} eveCachedWebfingers={} - eveSendThreads=[] evePostLog=[] useBlurhash=False sendResult = \ @@ -654,11 +655,9 @@ def testFollowBetweenServersWithCapabilities(): print('\n\n*********************************************************') print('Alice sends a message to Bob') - aliceSendThreads = [] alicePostLog = [] alicePersonCache={} aliceCachedWebfingers={} - aliceSendThreads=[] alicePostLog=[] useBlurhash=False sendResult = \ @@ -694,7 +693,6 @@ def testFollowBetweenServersWithCapabilities(): aliceDir+'/accounts/alice@'+aliceDomain+'/ocap/granted/'+ \ httpPrefix+':##'+bobDomain+':'+str(bobPort)+'#users#bob.json' sessionBob = createSession(bobDomain,bobPort,useTor) - bobSendThreads = [] bobPostLog = [] bobPersonCache={} bobCachedWebfingers={} @@ -804,20 +802,22 @@ def testFollowBetweenServers(): aliceDir=baseDir+'/.tests/alice' aliceDomain='127.0.0.42' alicePort=61935 + aliceSendThreads=[] thrAlice = \ threadWithTrace(target=createServerAlice, \ args=(aliceDir,aliceDomain,alicePort, \ federationList,False,False, \ - ocapAlways),daemon=True) + ocapAlways,aliceSendThreads),daemon=True) bobDir=baseDir+'/.tests/bob' bobDomain='127.0.0.64' bobPort=61936 + bobSendThreads=[] thrBob = \ threadWithTrace(target=createServerBob, \ args=(bobDir,bobDomain,bobPort, \ federationList,False,False, \ - ocapAlways),daemon=True) + ocapAlways,bobSendThreads),daemon=True) thrAlice.start() thrBob.start() @@ -845,7 +845,6 @@ def testFollowBetweenServers(): inReplyTo=None inReplyToAtomUri=None subject=None - aliceSendThreads = [] alicePostLog = [] followersOnly=False saveToFile=True @@ -853,7 +852,6 @@ def testFollowBetweenServers(): ccUrl=None alicePersonCache={} aliceCachedWebfingers={} - aliceSendThreads=[] alicePostLog=[] sendResult = \ sendFollowRequest(sessionAlice,aliceDir, \ @@ -876,11 +874,9 @@ def testFollowBetweenServers(): print('\n\n*********************************************************') print('Alice sends a message to Bob') - aliceSendThreads = [] alicePostLog = [] alicePersonCache={} aliceCachedWebfingers={} - aliceSendThreads=[] alicePostLog=[] useBlurhash=False sendResult = \ @@ -1253,20 +1249,22 @@ def testClientToServer(): aliceDir=baseDir+'/.tests/alice' aliceDomain='127.0.0.42' alicePort=61935 + aliceSendThreads=[] thrAlice = \ threadWithTrace(target=createServerAlice, \ args=(aliceDir,aliceDomain,alicePort, \ federationList,False,False, \ - ocapAlways),daemon=True) + ocapAlways,aliceSendThreads),daemon=True) bobDir=baseDir+'/.tests/bob' bobDomain='127.0.0.64' bobPort=61936 + bobSendThreads=[] thrBob = \ threadWithTrace(target=createServerBob, \ args=(bobDir,bobDomain,bobPort, \ federationList,False,False, \ - ocapAlways),daemon=True) + ocapAlways,bobSendThreads),daemon=True) thrAlice.start() thrBob.start() diff --git a/threads.py b/threads.py index 1d1eb5ba0..9db10231d 100644 --- a/threads.py +++ b/threads.py @@ -15,6 +15,7 @@ import datetime class threadWithTrace(threading.Thread): def __init__(self, *args, **keywords): self.startTime=datetime.datetime.utcnow() + self.isStarted=False tries=0 while tries<3: try: @@ -39,19 +40,24 @@ class threadWithTrace(threading.Thread): print('ERROR: threads.py/start failed - '+str(e)) time.sleep(1) tries+=1 - + # note that this is set True even if all tries failed + self.isStarted=True + def __run(self): - tries=0 - while tries<3: - try: - sys.settrace(self.globaltrace) - self.__run_backup() - self.run = self.__run_backup - break - except Exception as e: - print('ERROR: threads.py/__run failed - '+str(e)) - time.sleep(1) - tries+=1 + sys.settrace(self.globaltrace) + self.__run_backup() + self.run = self.__run_backup + #tries=0 + #while tries<3: + # try: + # sys.settrace(self.globaltrace) + # self.__run_backup() + # self.run = self.__run_backup + # break + # except Exception as e: + # print('ERROR: threads.py/__run failed - '+str(e)) + # time.sleep(1) + # tries+=1 def globaltrace(self, frame, event, arg): if event == 'call': @@ -73,7 +79,7 @@ class threadWithTrace(threading.Thread): args=self._args, \ daemon=True) -def removeDormantThreads(threadsList: [],debug: bool) -> None: +def removeDormantThreads(baseDir: str,threadsList: [],debug: bool) -> None: """Removes threads whose execution has completed """ if len(threadsList)==0: @@ -81,24 +87,22 @@ def removeDormantThreads(threadsList: [],debug: bool) -> None: dormantThreads=[] currTime=datetime.datetime.utcnow() + changed=False # which threads are dormant? noOfActiveThreads=0 for th in threadsList: removeThread=False - if not th.is_alive(): - if debug: - print('DEBUG: thread is not alive') - removeThread=True - elif not th.startTime: - if debug: - print('DEBUG: thread has no start time') - removeThread=True - elif (currTime-th.startTime).total_seconds()>200: - if debug: - print('DEBUG: thread is too old') - removeThread=True + if th.isStarted: + if not th.is_alive(): + if debug: + print('DEBUG: thread is not alive') + removeThread=True + elif (currTime-th.startTime).total_seconds()>60: + if debug: + print('DEBUG: thread timed out') + removeThread=True if removeThread: dormantThreads.append(th) @@ -115,3 +119,23 @@ def removeDormantThreads(threadsList: [],debug: bool) -> None: dormantCtr+=1 threadsList.remove(th) th.kill() + changed=True + + # start scheduled threads + if len(threadsList)<10: + ctr=0 + for th in threadsList: + if not th.isStarted: + print('Starting new send thread '+str(ctr)) + th.start() + changed=True + break + ctr+=1 + + if not changed: + return + + if debug: + sendLogFilename=baseDir+'/send.csv' + with open(sendLogFilename, "a+") as logFile: + logFile.write(currTime.strftime("%Y-%m-%dT%H:%M:%SZ")+','+str(noOfActiveThreads)+','+str(len(threadsList))+'\n')