Add queue to send out posts

main2
Bob Mottram 2019-10-16 19:19:18 +01:00
parent 7c77f16ff7
commit d8c4661741
4 changed files with 116 additions and 64 deletions

View File

@ -60,6 +60,7 @@ from auth import createPassword
from auth import createBasicAuthHeader
from auth import authorizeBasic
from threads import threadWithTrace
from threads import removeDormantThreads
from media import getMediaPath
from media import createMediaDirs
from delete import outboxDelete
@ -4614,7 +4615,28 @@ class PubServer(BaseHTTPRequestHandler):
class PubServerUnitTest(PubServer):
protocol_version = 'HTTP/1.0'
def runPostsQueue(baseDir: str,sendThreads: [],debug: bool) -> None:
"""Manages the threads used to send posts
"""
while True:
time.sleep(1)
removeDormantThreads(baseDir,sendThreads,debug)
def runPostsWatchdog(httpd) -> None:
"""This tries to keep the posts thread running even if it dies
"""
print('Starting posts queue watchdog')
postsQueueOriginal=httpd.thrPostsQueue.clone(runPostsQueue)
httpd.thrPostsQueue.start()
while True:
time.sleep(20)
if not httpd.thrPostsQueue.isAlive():
httpd.thrPostsQueue.kill()
httpd.thrPostsQueue=postsQueueOriginal.clone(runPostsQueue)
httpd.thrPostsQueue.start()
print('Restarting posts queue...')
def runDaemon(projectVersion, \
instanceId,clientToServer: bool, \
baseDir: str,domain: str, \
@ -4626,7 +4648,7 @@ def runDaemon(projectVersion, \
useTor=False,maxReplies=64, \
domainMaxPostsPerDay=8640,accountMaxPostsPerDay=8640, \
allowDeletion=False,debug=False,unitTest=False, \
instanceOnlySkillsSearch=False) -> None:
instanceOnlySkillsSearch=False,sendThreads=[]) -> None:
if len(domain)==0:
domain='localhost'
if '.' not in domain:
@ -4701,7 +4723,7 @@ def runDaemon(projectVersion, \
httpd.POSTbusy=False
httpd.receivedMessage=False
httpd.inboxQueue=[]
httpd.sendThreads=[]
httpd.sendThreads=sendThreads
httpd.postLog=[]
httpd.maxQueueLength=16
httpd.ocapAlways=ocapAlways
@ -4754,6 +4776,18 @@ def runDaemon(projectVersion, \
httpd.maxPostsInBox),daemon=True)
httpd.thrCache.start()
print('Creating posts queue')
httpd.thrPostsQueue= \
threadWithTrace(target=runPostsQueue, \
args=(baseDir,httpd.sendThreads,debug),daemon=True)
if not unitTest:
httpd.thrPostsWatchdog= \
threadWithTrace(target=runPostsWatchdog, \
args=(httpd),daemon=True)
httpd.thrPostsWatchdog.start()
else:
httpd.thrPostsQueue.start()
print('Creating inbox queue')
httpd.thrInboxQueue= \
threadWithTrace(target=runInboxQueue, \

View File

@ -20,7 +20,6 @@ import time
from time import gmtime, strftime
from collections import OrderedDict
from threads import threadWithTrace
from threads import removeDormantThreads
from cache import storePersonInCache
from cache import getPersonFromCache
from cache import expirePersonCache
@ -1130,8 +1129,6 @@ def sendPost(projectVersion: str, \
toDomain,toPort, \
postPath,httpPrefix,withDigest,postJsonStr)
removeDormantThreads(sendThreads,debug)
# Keep the number of threads being used small
while len(sendThreads)>20:
print('WARN: Maximum threads reached - killing send thread')
@ -1410,15 +1407,14 @@ def sendSignedJson(postJsonObject: {},session,baseDir: str, \
createSignedHeader(privateKeyPem,nickname,domain,port, \
toDomain,toPort, \
postPath,httpPrefix,withDigest,postJsonStr)
removeDormantThreads(sendThreads,debug)
# Keep the number of threads being used small
while len(sendThreads)>20:
while len(sendThreads)>1000:
print('WARN: Maximum threads reached - killing send thread')
sendThreads[0].kill()
sendThreads.pop(0)
print('WARN: thread killed')
if debug:
print('DEBUG: starting thread to send post')
pprint(postJsonObject)
@ -1431,7 +1427,7 @@ def sendSignedJson(postJsonObject: {},session,baseDir: str, \
postLog,
debug),daemon=True)
sendThreads.append(thr)
thr.start()
#thr.start()
return 0
def addToField(activityType: str,postJsonObject: {},debug: bool) -> ({},bool):

View File

@ -174,7 +174,7 @@ def testThreads():
assert thr.isAlive()==False
def createServerAlice(path: str,domain: str,port: int,federationList: [], \
hasFollows: bool,hasPosts :bool,ocapAlways: bool):
hasFollows: bool,hasPosts :bool,ocapAlways: bool,sendThreads: []):
print('Creating test server: Alice on port '+str(port))
if os.path.isdir(path):
shutil.rmtree(path)
@ -224,10 +224,10 @@ def createServerAlice(path: str,domain: str,port: int,federationList: [], \
noreply,nolike,nopics,noannounce,cw,ocapAlways, \
useTor,maxReplies, \
domainMaxPostsPerDay,accountMaxPostsPerDay, \
allowDeletion,True,True,False)
allowDeletion,True,True,False,sendThreads)
def createServerBob(path: str,domain: str,port: int,federationList: [], \
hasFollows: bool,hasPosts :bool,ocapAlways :bool):
hasFollows: bool,hasPosts :bool,ocapAlways :bool,sendThreads: []):
print('Creating test server: Bob on port '+str(port))
if os.path.isdir(path):
shutil.rmtree(path)
@ -278,10 +278,10 @@ def createServerBob(path: str,domain: str,port: int,federationList: [], \
noreply,nolike,nopics,noannounce,cw,ocapAlways, \
useTor,maxReplies, \
domainMaxPostsPerDay,accountMaxPostsPerDay, \
allowDeletion,True,True,False)
allowDeletion,True,True,False,sendThreads)
def createServerEve(path: str,domain: str,port: int,federationList: [], \
hasFollows: bool,hasPosts :bool,ocapAlways :bool):
hasFollows: bool,hasPosts :bool,ocapAlways :bool,sendThreads: []):
print('Creating test server: Eve on port '+str(port))
if os.path.isdir(path):
shutil.rmtree(path)
@ -310,7 +310,7 @@ def createServerEve(path: str,domain: str,port: int,federationList: [], \
runDaemon(__version__,"instanceId",False,path,domain,port,port, \
httpPrefix,federationList,maxMentions,False, \
noreply,nolike,nopics,noannounce,cw,ocapAlways, \
useTor,maxReplies,allowDeletion,True,True,False)
useTor,maxReplies,allowDeletion,True,True,False,sendThreads)
def testPostMessageBetweenServers():
print('Testing sending message from one server to the inbox of another')
@ -338,17 +338,19 @@ def testPostMessageBetweenServers():
bobDomain='127.0.0.100'
bobPort=61936
federationList=[bobDomain,aliceDomain]
aliceSendThreads=[]
bobSendThreads=[]
thrAlice = \
threadWithTrace(target=createServerAlice, \
args=(aliceDir,aliceDomain,alicePort, \
federationList,False,False, \
ocapAlways),daemon=True)
ocapAlways,aliceSendThreads),daemon=True)
thrBob = \
threadWithTrace(target=createServerBob, \
args=(bobDir,bobDomain,bobPort, \
federationList,False,False, \
ocapAlways),daemon=True)
ocapAlways,bobSendThreads),daemon=True)
thrAlice.start()
thrBob.start()
@ -368,7 +370,6 @@ def testPostMessageBetweenServers():
inReplyTo=None
inReplyToAtomUri=None
subject=None
aliceSendThreads = []
alicePostLog = []
followersOnly=False
saveToFile=True
@ -412,7 +413,9 @@ def testPostMessageBetweenServers():
# inbox item created
assert len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))])==1
# queue item removed
assert len([name for name in os.listdir(queuePath) if os.path.isfile(os.path.join(queuePath, name))])==0
testval=len([name for name in os.listdir(queuePath) if os.path.isfile(os.path.join(queuePath, name))])
print('queuePath: '+queuePath+' '+str(testval))
assert testval==0
assert validInbox(bobDir,'bob',bobDomain)
assert validInboxFilenames(bobDir,'bob',bobDomain,aliceDomain,alicePort)
@ -425,7 +428,6 @@ def testPostMessageBetweenServers():
bobDomain+':'+str(bobPort),federationList,False)
sessionBob = createSession(bobDomain,bobPort,useTor)
bobSendThreads = []
bobPostLog = []
bobPersonCache={}
bobCachedWebfingers={}
@ -522,29 +524,32 @@ def testFollowBetweenServersWithCapabilities():
aliceDir=baseDir+'/.tests/alice'
aliceDomain='127.0.0.42'
alicePort=61935
aliceSendThreads=[]
thrAlice = \
threadWithTrace(target=createServerAlice, \
args=(aliceDir,aliceDomain,alicePort, \
federationList,False,False, \
ocapAlways),daemon=True)
ocapAlways,aliceSendThreads),daemon=True)
bobDir=baseDir+'/.tests/bob'
bobDomain='127.0.0.64'
bobPort=61936
bobSendThreads=[]
thrBob = \
threadWithTrace(target=createServerBob, \
args=(bobDir,bobDomain,bobPort, \
federationList,False,False, \
ocapAlways),daemon=True)
ocapAlways,bobSendThreads),daemon=True)
eveDir=baseDir+'/.tests/eve'
eveDomain='127.0.0.55'
evePort=61937
eveSendThreads=[]
thrEve = \
threadWithTrace(target=createServerEve, \
args=(eveDir,eveDomain,evePort, \
federationList,False,False, \
False),daemon=True)
False,eveSendThreads),daemon=True)
thrAlice.start()
thrBob.start()
@ -576,7 +581,6 @@ def testFollowBetweenServersWithCapabilities():
inReplyTo=None
inReplyToAtomUri=None
subject=None
aliceSendThreads = []
alicePostLog = []
followersOnly=False
saveToFile=True
@ -584,7 +588,6 @@ def testFollowBetweenServersWithCapabilities():
ccUrl=None
alicePersonCache={}
aliceCachedWebfingers={}
aliceSendThreads=[]
alicePostLog=[]
sendResult = \
sendFollowRequest(sessionAlice,aliceDir, \
@ -619,11 +622,9 @@ def testFollowBetweenServersWithCapabilities():
print('\n\n*********************************************************')
print('Eve tries to send to Bob')
sessionEve = createSession(eveDomain,evePort,useTor)
eveSendThreads = []
evePostLog = []
evePersonCache={}
eveCachedWebfingers={}
eveSendThreads=[]
evePostLog=[]
useBlurhash=False
sendResult = \
@ -654,11 +655,9 @@ def testFollowBetweenServersWithCapabilities():
print('\n\n*********************************************************')
print('Alice sends a message to Bob')
aliceSendThreads = []
alicePostLog = []
alicePersonCache={}
aliceCachedWebfingers={}
aliceSendThreads=[]
alicePostLog=[]
useBlurhash=False
sendResult = \
@ -694,7 +693,6 @@ def testFollowBetweenServersWithCapabilities():
aliceDir+'/accounts/alice@'+aliceDomain+'/ocap/granted/'+ \
httpPrefix+':##'+bobDomain+':'+str(bobPort)+'#users#bob.json'
sessionBob = createSession(bobDomain,bobPort,useTor)
bobSendThreads = []
bobPostLog = []
bobPersonCache={}
bobCachedWebfingers={}
@ -804,20 +802,22 @@ def testFollowBetweenServers():
aliceDir=baseDir+'/.tests/alice'
aliceDomain='127.0.0.42'
alicePort=61935
aliceSendThreads=[]
thrAlice = \
threadWithTrace(target=createServerAlice, \
args=(aliceDir,aliceDomain,alicePort, \
federationList,False,False, \
ocapAlways),daemon=True)
ocapAlways,aliceSendThreads),daemon=True)
bobDir=baseDir+'/.tests/bob'
bobDomain='127.0.0.64'
bobPort=61936
bobSendThreads=[]
thrBob = \
threadWithTrace(target=createServerBob, \
args=(bobDir,bobDomain,bobPort, \
federationList,False,False, \
ocapAlways),daemon=True)
ocapAlways,bobSendThreads),daemon=True)
thrAlice.start()
thrBob.start()
@ -845,7 +845,6 @@ def testFollowBetweenServers():
inReplyTo=None
inReplyToAtomUri=None
subject=None
aliceSendThreads = []
alicePostLog = []
followersOnly=False
saveToFile=True
@ -853,7 +852,6 @@ def testFollowBetweenServers():
ccUrl=None
alicePersonCache={}
aliceCachedWebfingers={}
aliceSendThreads=[]
alicePostLog=[]
sendResult = \
sendFollowRequest(sessionAlice,aliceDir, \
@ -876,11 +874,9 @@ def testFollowBetweenServers():
print('\n\n*********************************************************')
print('Alice sends a message to Bob')
aliceSendThreads = []
alicePostLog = []
alicePersonCache={}
aliceCachedWebfingers={}
aliceSendThreads=[]
alicePostLog=[]
useBlurhash=False
sendResult = \
@ -1253,20 +1249,22 @@ def testClientToServer():
aliceDir=baseDir+'/.tests/alice'
aliceDomain='127.0.0.42'
alicePort=61935
aliceSendThreads=[]
thrAlice = \
threadWithTrace(target=createServerAlice, \
args=(aliceDir,aliceDomain,alicePort, \
federationList,False,False, \
ocapAlways),daemon=True)
ocapAlways,aliceSendThreads),daemon=True)
bobDir=baseDir+'/.tests/bob'
bobDomain='127.0.0.64'
bobPort=61936
bobSendThreads=[]
thrBob = \
threadWithTrace(target=createServerBob, \
args=(bobDir,bobDomain,bobPort, \
federationList,False,False, \
ocapAlways),daemon=True)
ocapAlways,bobSendThreads),daemon=True)
thrAlice.start()
thrBob.start()

View File

@ -15,6 +15,7 @@ import datetime
class threadWithTrace(threading.Thread):
def __init__(self, *args, **keywords):
self.startTime=datetime.datetime.utcnow()
self.isStarted=False
tries=0
while tries<3:
try:
@ -39,19 +40,24 @@ class threadWithTrace(threading.Thread):
print('ERROR: threads.py/start failed - '+str(e))
time.sleep(1)
tries+=1
# note that this is set True even if all tries failed
self.isStarted=True
def __run(self):
tries=0
while tries<3:
try:
sys.settrace(self.globaltrace)
self.__run_backup()
self.run = self.__run_backup
break
except Exception as e:
print('ERROR: threads.py/__run failed - '+str(e))
time.sleep(1)
tries+=1
sys.settrace(self.globaltrace)
self.__run_backup()
self.run = self.__run_backup
#tries=0
#while tries<3:
# try:
# sys.settrace(self.globaltrace)
# self.__run_backup()
# self.run = self.__run_backup
# break
# except Exception as e:
# print('ERROR: threads.py/__run failed - '+str(e))
# time.sleep(1)
# tries+=1
def globaltrace(self, frame, event, arg):
if event == 'call':
@ -73,7 +79,7 @@ class threadWithTrace(threading.Thread):
args=self._args, \
daemon=True)
def removeDormantThreads(threadsList: [],debug: bool) -> None:
def removeDormantThreads(baseDir: str,threadsList: [],debug: bool) -> None:
"""Removes threads whose execution has completed
"""
if len(threadsList)==0:
@ -81,24 +87,22 @@ def removeDormantThreads(threadsList: [],debug: bool) -> None:
dormantThreads=[]
currTime=datetime.datetime.utcnow()
changed=False
# which threads are dormant?
noOfActiveThreads=0
for th in threadsList:
removeThread=False
if not th.is_alive():
if debug:
print('DEBUG: thread is not alive')
removeThread=True
elif not th.startTime:
if debug:
print('DEBUG: thread has no start time')
removeThread=True
elif (currTime-th.startTime).total_seconds()>200:
if debug:
print('DEBUG: thread is too old')
removeThread=True
if th.isStarted:
if not th.is_alive():
if debug:
print('DEBUG: thread is not alive')
removeThread=True
elif (currTime-th.startTime).total_seconds()>60:
if debug:
print('DEBUG: thread timed out')
removeThread=True
if removeThread:
dormantThreads.append(th)
@ -115,3 +119,23 @@ def removeDormantThreads(threadsList: [],debug: bool) -> None:
dormantCtr+=1
threadsList.remove(th)
th.kill()
changed=True
# start scheduled threads
if len(threadsList)<10:
ctr=0
for th in threadsList:
if not th.isStarted:
print('Starting new send thread '+str(ctr))
th.start()
changed=True
break
ctr+=1
if not changed:
return
if debug:
sendLogFilename=baseDir+'/send.csv'
with open(sendLogFilename, "a+") as logFile:
logFile.write(currTime.strftime("%Y-%m-%dT%H:%M:%SZ")+','+str(noOfActiveThreads)+','+str(len(threadsList))+'\n')