mirror of https://gitlab.com/bashrc2/epicyon
Snake case
parent
d180ce40cb
commit
651c0d2069
94
threads.py
94
threads.py
|
@ -15,8 +15,8 @@ import datetime
|
|||
|
||||
class thread_with_trace(threading.Thread):
|
||||
def __init__(self, *args, **keywords):
|
||||
self.startTime = datetime.datetime.utcnow()
|
||||
self.isStarted = False
|
||||
self.start_time = datetime.datetime.utcnow()
|
||||
self.is_started = False
|
||||
tries = 0
|
||||
while tries < 3:
|
||||
try:
|
||||
|
@ -42,7 +42,7 @@ class thread_with_trace(threading.Thread):
|
|||
time.sleep(1)
|
||||
tries += 1
|
||||
# note that this is set True even if all tries failed
|
||||
self.isStarted = True
|
||||
self.is_started = True
|
||||
|
||||
def __run(self):
|
||||
sys.settrace(self.globaltrace)
|
||||
|
@ -51,90 +51,96 @@ class thread_with_trace(threading.Thread):
|
|||
self.run = self.__run_backup
|
||||
except Exception as ex:
|
||||
print('ERROR: threads.py/__run failed - ' + str(ex))
|
||||
pass
|
||||
|
||||
def globaltrace(self, frame, event, arg):
|
||||
"""Trace the thread
|
||||
"""
|
||||
if event == 'call':
|
||||
return self.localtrace
|
||||
else:
|
||||
return None
|
||||
|
||||
def localtrace(self, frame, event, arg):
|
||||
"""Trace the thread
|
||||
"""
|
||||
if self.killed:
|
||||
if event == 'line':
|
||||
raise SystemExit()
|
||||
return self.localtrace
|
||||
|
||||
def kill(self):
|
||||
"""Kill the thread
|
||||
"""
|
||||
self.killed = True
|
||||
|
||||
def clone(self, fn):
|
||||
return thread_with_trace(target=fn,
|
||||
def clone(self, func):
|
||||
"""Create a clone
|
||||
"""
|
||||
return thread_with_trace(target=func,
|
||||
args=self._args,
|
||||
daemon=True)
|
||||
|
||||
|
||||
def remove_dormant_threads(base_dir: str, threadsList: [], debug: bool,
|
||||
timeoutMins: int) -> None:
|
||||
def remove_dormant_threads(base_dir: str, threads_list: [], debug: bool,
|
||||
timeout_mins: int) -> None:
|
||||
"""Removes threads whose execution has completed
|
||||
"""
|
||||
if len(threadsList) == 0:
|
||||
if len(threads_list) == 0:
|
||||
return
|
||||
|
||||
timeoutSecs = int(timeoutMins * 60)
|
||||
dormantThreads = []
|
||||
timeout_secs = int(timeout_mins * 60)
|
||||
dormant_threads = []
|
||||
curr_time = datetime.datetime.utcnow()
|
||||
changed = False
|
||||
|
||||
# which threads are dormant?
|
||||
noOfActiveThreads = 0
|
||||
for th in threadsList:
|
||||
removeThread = False
|
||||
no_of_active_threads = 0
|
||||
for thrd in threads_list:
|
||||
remove_thread = False
|
||||
|
||||
if th.isStarted:
|
||||
if not th.is_alive():
|
||||
if (curr_time - th.startTime).total_seconds() > 10:
|
||||
if thrd.is_started:
|
||||
if not thrd.is_alive():
|
||||
if (curr_time - thrd.start_time).total_seconds() > 10:
|
||||
if debug:
|
||||
print('DEBUG: ' +
|
||||
'thread is not alive ten seconds after start')
|
||||
removeThread = True
|
||||
remove_thread = True
|
||||
# timeout for started threads
|
||||
if (curr_time - th.startTime).total_seconds() > timeoutSecs:
|
||||
if (curr_time - thrd.start_time).total_seconds() > timeout_secs:
|
||||
if debug:
|
||||
print('DEBUG: started thread timed out')
|
||||
removeThread = True
|
||||
remove_thread = True
|
||||
else:
|
||||
# timeout for threads which havn't been started
|
||||
if (curr_time - th.startTime).total_seconds() > timeoutSecs:
|
||||
if (curr_time - thrd.start_time).total_seconds() > timeout_secs:
|
||||
if debug:
|
||||
print('DEBUG: unstarted thread timed out')
|
||||
removeThread = True
|
||||
remove_thread = True
|
||||
|
||||
if removeThread:
|
||||
dormantThreads.append(th)
|
||||
if remove_thread:
|
||||
dormant_threads.append(thrd)
|
||||
else:
|
||||
noOfActiveThreads += 1
|
||||
no_of_active_threads += 1
|
||||
if debug:
|
||||
print('DEBUG: ' + str(noOfActiveThreads) +
|
||||
' active threads out of ' + str(len(threadsList)))
|
||||
print('DEBUG: ' + str(no_of_active_threads) +
|
||||
' active threads out of ' + str(len(threads_list)))
|
||||
|
||||
# remove the dormant threads
|
||||
dormantCtr = 0
|
||||
for th in dormantThreads:
|
||||
dormant_ctr = 0
|
||||
for thrd in dormant_threads:
|
||||
if debug:
|
||||
print('DEBUG: Removing dormant thread ' + str(dormantCtr))
|
||||
dormantCtr += 1
|
||||
threadsList.remove(th)
|
||||
th.kill()
|
||||
print('DEBUG: Removing dormant thread ' + str(dormant_ctr))
|
||||
dormant_ctr += 1
|
||||
threads_list.remove(thrd)
|
||||
thrd.kill()
|
||||
changed = True
|
||||
|
||||
# start scheduled threads
|
||||
if len(threadsList) < 10:
|
||||
if len(threads_list) < 10:
|
||||
ctr = 0
|
||||
for th in threadsList:
|
||||
if not th.isStarted:
|
||||
for thrd in threads_list:
|
||||
if not thrd.is_started:
|
||||
print('Starting new send thread ' + str(ctr))
|
||||
th.start()
|
||||
thrd.start()
|
||||
changed = True
|
||||
break
|
||||
ctr += 1
|
||||
|
@ -143,12 +149,12 @@ def remove_dormant_threads(base_dir: str, threadsList: [], debug: bool,
|
|||
return
|
||||
|
||||
if debug:
|
||||
sendLogFilename = base_dir + '/send.csv'
|
||||
send_log_filename = base_dir + '/send.csv'
|
||||
try:
|
||||
with open(sendLogFilename, 'a+') as logFile:
|
||||
logFile.write(curr_time.strftime("%Y-%m-%dT%H:%M:%SZ") +
|
||||
',' + str(noOfActiveThreads) +
|
||||
',' + str(len(threadsList)) + '\n')
|
||||
with open(send_log_filename, 'a+') as fp_log:
|
||||
fp_log.write(curr_time.strftime("%Y-%m-%dT%H:%M:%SZ") +
|
||||
',' + str(no_of_active_threads) +
|
||||
',' + str(len(threads_list)) + '\n')
|
||||
except OSError:
|
||||
print('EX: remove_dormant_threads unable to write ' +
|
||||
sendLogFilename)
|
||||
send_log_filename)
|
||||
|
|
Loading…
Reference in New Issue