Test for threads

master
Bob Mottram 2019-06-30 17:50:43 +01:00
parent 74629df5c0
commit aa87daa0a1
2 changed files with 17 additions and 0 deletions

View File

@ -21,6 +21,7 @@ from person import personKeyLookup
from person import personOutboxJson
from inbox import inboxPermittedMessage
from follow import getFollowingFeed
from threads import testThreads
import os
import sys
@ -220,6 +221,7 @@ def runDaemon(domain: str,port=80,fedList=[],useTor=False) -> None:
print('Running tests...')
testHttpsig()
testCache()
testThreads()
print('Tests succeeded\n')
serverAddress = ('', port)

View File

@ -9,6 +9,7 @@ __status__ = "Production"
import threading
import sys
import trace
import time
class threadWithTrace(threading.Thread):
def __init__(self, *args, **keywords):
@ -39,3 +40,17 @@ class threadWithTrace(threading.Thread):
def kill(self):
self.killed = True
def testThreadsFunction(param: str):
for i in range(10000):
time.sleep(2)
def testThreads():
print('testThreads')
thr = threadWithTrace(target=testThreadsFunction,args=('test',),daemon=True)
thr.start()
assert thr.isAlive()==True
time.sleep(1)
thr.kill()
thr.join()
assert thr.isAlive()==False