epicyon/threads.py

166 lines
5.0 KiB
Python

# Module metadata: plain string constants describing this source file
# (file name, author/maintainer contact, license, version, release status
# and the module group it is filed under).
__filename__ = "threads.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.3.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Core"
import threading
import sys
import time
import datetime
class thread_with_trace(threading.Thread):
    """A threading.Thread which can be asked to stop itself.

    The thread installs a trace function (via sys.settrace) when it begins
    running. Once kill() has been called, the next traced 'line' event
    raises SystemExit inside the thread, which ends it.

    Attributes set by this class:
        start_time: naive UTC datetime recorded when the object is created
        is_started: True once start() has been called (even if it failed)
        killed: True once kill() has been called
    """

    def __init__(self, *args, **keywords):
        """Create the thread. All arguments are passed through to
        threading.Thread.__init__, which is tried up to three times.
        """
        # Naive UTC timestamp, equivalent to the deprecated utcnow().
        # It is kept naive because remove_dormant_threads subtracts it
        # from another naive UTC datetime.
        self.start_time = \
            datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        self.is_started = False
        # Set up front so that kill() and localtrace() always find the
        # attribute, even if Thread.__init__ fails on every try
        self.killed = False
        self._args, self._keywords = args, keywords
        tries = 0
        while tries < 3:
            try:
                threading.Thread.__init__(self, *args, **keywords)
                break
            except Exception as ex:
                print('ERROR: threads.py/__init__ failed - ' + str(ex))
                time.sleep(1)
                tries += 1
        # Thread.__init__ stores the target's argument tuple in _args, but
        # Thread.run deletes that attribute when the target returns.
        # Keep a separate reference so that clone() also works on a
        # thread which has already finished.
        self._clone_args = self._args

    def start(self):
        """Start the thread with tracing enabled, trying up to three times.
        """
        # Install the tracing wrapper only once. Doing this again on a
        # retry would store __run as its own backup and make it recurse.
        if self.run != self.__run:
            self.__run_backup = self.run
            self.run = self.__run
        tries = 0
        while tries < 3:
            try:
                threading.Thread.start(self)
                break
            except Exception as ex:
                print('ERROR: threads.py/start failed - ' + str(ex))
                time.sleep(1)
                tries += 1
        # note that this is set True even if all tries failed
        self.is_started = True

    def __run(self):
        """Runs inside the new thread: installs the trace function and
        then calls the original run method.
        """
        sys.settrace(self.globaltrace)
        if not callable(self.__run_backup):
            print('ERROR: threads.py/__run ' +
                  str(self.__run_backup) + ' is not callable')
            return
        try:
            self.__run_backup()
            # put the original run method back after a normal finish
            self.run = self.__run_backup
        except Exception as ex:
            # SystemExit raised by localtrace is not an Exception
            # subclass, so a kill is not reported here
            print('ERROR: threads.py/__run failed - ' + str(ex))

    def globaltrace(self, frame, event, arg):
        """Trace the thread: returns the local trace function for each
        'call' event and None for any other event
        """
        if event == 'call':
            return self.localtrace
        return None

    def localtrace(self, frame, event, arg):
        """Trace the thread: raises SystemExit on a 'line' event once the
        thread has been killed, otherwise keeps tracing
        """
        if self.killed:
            if event == 'line':
                raise SystemExit()
        return self.localtrace

    def kill(self):
        """Kill the thread by setting the flag checked in localtrace
        """
        self.killed = True

    def clone(self, func):
        """Create a clone: a new unstarted daemon thread which calls func
        with the same positional arguments as this thread's target
        """
        print('THREAD: clone')
        return thread_with_trace(target=func,
                                 args=self._clone_args,
                                 daemon=True)
def remove_dormant_threads(base_dir: str, threads_list: list, debug: bool,
                           timeout_mins: int) -> None:
    """Removes threads whose execution has completed or which have
    timed out, then starts at most one thread which is still waiting.

    base_dir: directory in which send.csv is appended to when debug is set
    threads_list: list of thread objects, modified in place. Each needs
        is_started and start_time attributes plus is_alive(), kill()
        and start() methods
    debug: print diagnostics and log the thread counts to send.csv
    timeout_mins: age in minutes after which any thread is removed
    """
    if not threads_list:
        return

    timeout_secs = int(timeout_mins * 60)
    # Naive UTC, equivalent to the deprecated utcnow(). It stays naive so
    # that it can be subtracted from the naive start_time of each thread.
    curr_time = \
        datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    changed = False

    # which threads are dormant?
    dormant_threads = []
    no_of_active_threads = 0
    for thrd in threads_list:
        elapsed_secs = (curr_time - thrd.start_time).total_seconds()
        remove_thread = False

        if thrd.is_started:
            if not thrd.is_alive() and elapsed_secs > 10:
                if debug:
                    print('DEBUG: ' +
                          'thread is not alive ten seconds after start')
                remove_thread = True
            # timeout for started threads
            if elapsed_secs > timeout_secs:
                if debug:
                    print('DEBUG: started thread timed out')
                remove_thread = True
        elif elapsed_secs > timeout_secs:
            # timeout for threads which haven't been started
            if debug:
                print('DEBUG: unstarted thread timed out')
            remove_thread = True

        if remove_thread:
            dormant_threads.append(thrd)
        else:
            # threads which are waiting to be started count as active
            no_of_active_threads += 1
    if debug:
        print('DEBUG: ' + str(no_of_active_threads) +
              ' active threads out of ' + str(len(threads_list)))

    # remove the dormant threads
    for dormant_ctr, thrd in enumerate(dormant_threads):
        if debug:
            print('DEBUG: Removing dormant thread ' + str(dormant_ctr))
        threads_list.remove(thrd)
        thrd.kill()
        changed = True

    # start scheduled threads: only the first waiting thread is started
    # on each call, and only while the list holds fewer than ten threads
    if len(threads_list) < 10:
        for ctr, thrd in enumerate(threads_list):
            if not thrd.is_started:
                print('Starting new send thread ' + str(ctr))
                thrd.start()
                changed = True
                break

    if not changed:
        return

    if debug:
        send_log_filename = base_dir + '/send.csv'
        try:
            # explicit encoding so that the log does not depend on the
            # locale of the machine
            with open(send_log_filename, 'a+', encoding='utf-8') as fp_log:
                fp_log.write(curr_time.strftime("%Y-%m-%dT%H:%M:%SZ") +
                             ',' + str(no_of_active_threads) +
                             ',' + str(len(threads_list)) + '\n')
        except OSError:
            print('EX: remove_dormant_threads unable to write ' +
                  send_log_filename)