epicyon/threads.py

187 lines
5.7 KiB
Python
Raw Normal View History

2020-04-04 12:09:57 +00:00
__filename__ = "threads.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
2023-01-21 23:03:30 +00:00
__version__ = "1.4.0"
2020-04-04 12:09:57 +00:00
__maintainer__ = "Bob Mottram"
2021-09-10 16:14:50 +00:00
__email__ = "bob@libreserver.org"
2020-04-04 12:09:57 +00:00
__status__ = "Production"
2021-06-26 11:16:41 +00:00
__module_group__ = "Core"
2019-06-30 16:36:58 +00:00
import threading
import sys
2019-06-30 16:50:43 +00:00
import time
2019-10-16 15:17:00 +00:00
import datetime
2022-07-28 09:59:18 +00:00
from socket import error as SocketError
2019-06-30 16:36:58 +00:00
2020-04-04 12:09:57 +00:00
2021-12-28 21:36:27 +00:00
class thread_with_trace(threading.Thread):
    """A thread which can be asked to stop from the outside.

    start() swaps run() for a wrapper which installs a trace function
    in the new thread. Once kill() has been called, the trace function
    raises SystemExit on the next traced 'line' event, which ends the
    thread.

    Attributes:
        start_time: naive UTC datetime at which the object was created
            (not the time at which start() was called)
        is_started: True once start() has been called, even if every
            attempt to start the thread failed
        killed: True once kill() has been called
    """

    def __init__(self, *args, **keywords):
        """Create the thread, trying the base class initialiser
        up to three times. The arguments are passed unchanged to
        threading.Thread
        """
        self.start_time = datetime.datetime.utcnow()
        self.is_started = False
        # set before the retries so that kill() and localtrace() always
        # find the flag, even if the base initialiser never succeeds
        self.killed = False
        tries = 0
        while tries < 3:
            try:
                # NOTE(review): CPython's Thread.__init__ appears to
                # overwrite self._args with its own 'args' parameter,
                # which is the value clone() later reuses - confirm
                self._args, self._keywords = args, keywords
                threading.Thread.__init__(self, *args, **keywords)
                break
            except Exception as ex:
                print('ERROR: threads.py/__init__ failed - ' + str(ex))
                time.sleep(1)
                tries += 1

    def start(self):
        """Start the thread with tracing enabled, trying up to
        three times with a one second pause after each failure
        """
        tries = 0
        while tries < 3:
            try:
                # Wrap run() only once. After a failed attempt self.run
                # is already the wrapper, and recording it as the backup
                # would make __run call itself forever.
                if self.run != self.__run:
                    self.__run_backup = self.run
                    self.run = self.__run
                threading.Thread.start(self)
                break
            except Exception as ex:
                print('ERROR: threads.py/start failed - ' + str(ex))
                time.sleep(1)
                tries += 1
        # note that this is set True even if all tries failed
        self.is_started = True

    def __run(self):
        """Runs inside the new thread: installs the trace function
        and then calls the original run()
        """
        sys.settrace(self.globaltrace)
        if not callable(self.__run_backup):
            print('ERROR: threads.py/__run ' +
                  str(self.__run_backup) + ' is not callable')
            return
        # exceptions raised by the original run() are not caught here
        self.__run_backup()
        # put the original run() back now that it has finished
        self.run = self.__run_backup

    def globaltrace(self, frame, event, arg):
        """Trace function installed with sys.settrace.
        Returns the local trace function for 'call' events
        and None for any other event
        """
        if event == 'call':
            return self.localtrace
        return None

    def localtrace(self, frame, event, arg):
        """Local trace function. Raises SystemExit on a 'line' event
        once the thread has been killed, otherwise returns itself
        so that tracing continues
        """
        if self.killed:
            if event == 'line':
                raise SystemExit()
        return self.localtrace

    def kill(self):
        """Kill the thread. This only sets a flag, which is acted
        upon by localtrace
        """
        self.killed = True

    def clone(self, func):
        """Create a clone: a new unstarted daemon thread which
        runs func with the arguments stored in self._args
        """
        print('THREAD: clone')
        return thread_with_trace(target=func,
                                 args=self._args,
                                 daemon=True)
2019-10-16 10:08:21 +00:00
2020-04-04 12:09:57 +00:00
2022-01-03 18:44:40 +00:00
def _thread_is_dormant(thrd, curr_time, timeout_secs: int,
                       debug: bool) -> bool:
    """Returns true if the given thread should be removed.
    The age of a thread is measured from its start_time attribute
    """
    age_secs = (curr_time - thrd.start_time).total_seconds()
    dormant = False
    if thrd.is_started and not thrd.is_alive() and age_secs > 10:
        if debug:
            print('DEBUG: ' +
                  'thread is not alive ten seconds after start')
        dormant = True
    # the same timeout applies to started and unstarted threads,
    # only the debug message differs
    if age_secs > timeout_secs:
        if debug:
            if thrd.is_started:
                print('DEBUG: started thread timed out')
            else:
                print('DEBUG: unstarted thread timed out')
        dormant = True
    return dormant


def remove_dormant_threads(base_dir: str, threads_list: list, debug: bool,
                           timeout_mins: int) -> None:
    """Removes threads whose execution has completed

    Threads which are no longer alive, or which are older than
    timeout_mins, are removed from threads_list (which is modified in
    place) and killed. Then, if fewer than ten threads remain, the first
    thread which has not yet been started is started. When debug is set
    and something changed, a line is appended to send.csv within base_dir
    """
    if not threads_list:
        return

    timeout_secs = int(timeout_mins * 60)
    curr_time = datetime.datetime.utcnow()
    changed = False

    # which threads are dormant?
    dormant_threads = [
        thrd for thrd in threads_list
        if _thread_is_dormant(thrd, curr_time, timeout_secs, debug)
    ]
    no_of_active_threads = len(threads_list) - len(dormant_threads)
    if debug:
        print('DEBUG: ' + str(no_of_active_threads) +
              ' active threads out of ' + str(len(threads_list)))

    # remove the dormant threads
    for dormant_ctr, thrd in enumerate(dormant_threads):
        if debug:
            print('DEBUG: Removing dormant thread ' + str(dormant_ctr))
        threads_list.remove(thrd)
        thrd.kill()
        changed = True

    # start scheduled threads, at most one per call
    if len(threads_list) < 10:
        for ctr, thrd in enumerate(threads_list):
            if not thrd.is_started:
                print('Starting new send thread ' + str(ctr))
                thrd.start()
                changed = True
                break

    if not changed:
        return

    if debug:
        send_log_filename = base_dir + '/send.csv'
        try:
            with open(send_log_filename, 'a+', encoding='utf-8') as fp_log:
                fp_log.write(curr_time.strftime("%Y-%m-%dT%H:%M:%SZ") +
                             ',' + str(no_of_active_threads) +
                             ',' + str(len(threads_list)) + '\n')
        except OSError:
            print('EX: remove_dormant_threads unable to write ' +
                  send_log_filename)
2022-07-28 09:59:18 +00:00
def begin_thread(thread, calling_function: str) -> bool:
    """Start a thread

    The thread is only started if it is not already alive.
    calling_function is the name of the caller, used within warnings.
    Returns False if starting the thread raised an exception,
    otherwise True (including when the thread was already alive)
    """
    try:
        if not thread.is_alive():
            thread.start()
    except SocketError as ex:
        print('WARN: socket error while starting ' +
              'thread. ' + calling_function + ' ' + str(ex))
        return False
    except ValueError as ex:
        print('WARN: value error while starting ' +
              'thread. ' + calling_function + ' ' + str(ex))
        return False
    except Exception as ex:
        # any other failure is reported rather than silently counted
        # as a success. KeyboardInterrupt and SystemExit are not
        # caught here, so they reach the caller
        print('WARN: unable to start ' +
              'thread. ' + calling_function + ' ' + str(ex))
        return False
    return True