Function to start threads

merge-requests/30/head
Bob Mottram 2022-07-28 10:59:18 +01:00
parent cf44fd6ee4
commit db95a22ee4
9 changed files with 81 additions and 51 deletions

View File

@ -124,6 +124,7 @@ from auth import create_password
from auth import create_basic_auth_header from auth import create_basic_auth_header
from auth import authorize_basic from auth import authorize_basic
from auth import store_basic_credentials from auth import store_basic_credentials
from threads import begin_thread
from threads import thread_with_trace from threads import thread_with_trace
from threads import remove_dormant_threads from threads import remove_dormant_threads
from media import process_meta_data from media import process_meta_data
@ -1660,7 +1661,9 @@ class PubServer(BaseHTTPRequestHandler):
curr_session, proxy_type), curr_session, proxy_type),
daemon=True) daemon=True)
print('Starting outbox thread') print('Starting outbox thread')
self.server.outboxThread[account_outbox_thread_name][index].start() outbox_thread = \
self.server.outboxThread[account_outbox_thread_name][index]
begin_thread(outbox_thread, '_post_to_outbox_thread')
return True return True
def _update_inbox_queue(self, nickname: str, message_json: {}, def _update_inbox_queue(self, nickname: str, message_json: {},
@ -8082,7 +8085,8 @@ class PubServer(BaseHTTPRequestHandler):
self.server.person_cache, self.server.person_cache,
self.server.check_actor_timeout), self.server.check_actor_timeout),
daemon=True) daemon=True)
self.server.thrCheckActor[nickname].start() begin_thread(self.server.thrCheckActor[nickname],
'_show_person_options')
msg = \ msg = \
html_person_options(self.server.default_timeline, html_person_options(self.server.default_timeline,
@ -20962,7 +20966,7 @@ def run_posts_watchdog(project_version: str, httpd) -> None:
""" """
print('THREAD: Starting posts queue watchdog') print('THREAD: Starting posts queue watchdog')
posts_queue_original = httpd.thrPostsQueue.clone(run_posts_queue) posts_queue_original = httpd.thrPostsQueue.clone(run_posts_queue)
httpd.thrPostsQueue.start() begin_thread(httpd.thrPostsQueue, 'run_posts_watchdog')
while True: while True:
time.sleep(20) time.sleep(20)
if httpd.thrPostsQueue.is_alive(): if httpd.thrPostsQueue.is_alive():
@ -20970,7 +20974,7 @@ def run_posts_watchdog(project_version: str, httpd) -> None:
httpd.thrPostsQueue.kill() httpd.thrPostsQueue.kill()
print('THREAD: restarting posts queue') print('THREAD: restarting posts queue')
httpd.thrPostsQueue = posts_queue_original.clone(run_posts_queue) httpd.thrPostsQueue = posts_queue_original.clone(run_posts_queue)
httpd.thrPostsQueue.start() begin_thread(httpd.thrPostsQueue, 'run_posts_watchdog 2')
print('Restarting posts queue...') print('Restarting posts queue...')
@ -20979,7 +20983,7 @@ def run_shares_expire_watchdog(project_version: str, httpd) -> None:
""" """
print('THREAD: Starting shares expiry watchdog') print('THREAD: Starting shares expiry watchdog')
shares_expire_original = httpd.thrSharesExpire.clone(run_shares_expire) shares_expire_original = httpd.thrSharesExpire.clone(run_shares_expire)
httpd.thrSharesExpire.start() begin_thread(httpd.thrSharesExpire, 'run_shares_expire_watchdog')
while True: while True:
time.sleep(20) time.sleep(20)
if httpd.thrSharesExpire.is_alive(): if httpd.thrSharesExpire.is_alive():
@ -20987,7 +20991,7 @@ def run_shares_expire_watchdog(project_version: str, httpd) -> None:
httpd.thrSharesExpire.kill() httpd.thrSharesExpire.kill()
print('THREAD: restarting shares watchdog') print('THREAD: restarting shares watchdog')
httpd.thrSharesExpire = shares_expire_original.clone(run_shares_expire) httpd.thrSharesExpire = shares_expire_original.clone(run_shares_expire)
httpd.thrSharesExpire.start() begin_thread(httpd.thrSharesExpire, 'run_shares_expire_watchdog 2')
print('Restarting shares expiry...') print('Restarting shares expiry...')
@ -21511,7 +21515,7 @@ def run_daemon(clacks: str,
httpd.thrFitness = \ httpd.thrFitness = \
thread_with_trace(target=fitness_thread, thread_with_trace(target=fitness_thread,
args=(base_dir, httpd.fitness), daemon=True) args=(base_dir, httpd.fitness), daemon=True)
httpd.thrFitness.start() begin_thread(httpd.thrFitness, 'run_daemon thrFitness')
httpd.recent_posts_cache = {} httpd.recent_posts_cache = {}
@ -21523,7 +21527,7 @@ def run_daemon(clacks: str,
archive_dir, archive_dir,
httpd.recent_posts_cache, httpd.recent_posts_cache,
httpd.maxPostsInBox), daemon=True) httpd.maxPostsInBox), daemon=True)
httpd.thrCache.start() begin_thread(httpd.thrCache, 'run_daemon thrCache')
# number of mins after which sending posts or updates will expire # number of mins after which sending posts or updates will expire
httpd.send_threads_timeout_mins = send_threads_timeout_mins httpd.send_threads_timeout_mins = send_threads_timeout_mins
@ -21538,9 +21542,9 @@ def run_daemon(clacks: str,
httpd.thrPostsWatchdog = \ httpd.thrPostsWatchdog = \
thread_with_trace(target=run_posts_watchdog, thread_with_trace(target=run_posts_watchdog,
args=(project_version, httpd), daemon=True) args=(project_version, httpd), daemon=True)
httpd.thrPostsWatchdog.start() begin_thread(httpd.thrPostsWatchdog, 'run_daemon thrPostWatchdog')
else: else:
httpd.thrPostsQueue.start() begin_thread(httpd.thrPostsQueue, 'run_daemon thrPostWatchdog 2')
print('THREAD: Creating expire thread for shared items') print('THREAD: Creating expire thread for shared items')
httpd.thrSharesExpire = \ httpd.thrSharesExpire = \
@ -21551,9 +21555,11 @@ def run_daemon(clacks: str,
httpd.thrSharesExpireWatchdog = \ httpd.thrSharesExpireWatchdog = \
thread_with_trace(target=run_shares_expire_watchdog, thread_with_trace(target=run_shares_expire_watchdog,
args=(project_version, httpd), daemon=True) args=(project_version, httpd), daemon=True)
httpd.thrSharesExpireWatchdog.start() begin_thread(httpd.thrSharesExpireWatchdog,
'run_daemon thrSharesExpireWatchdog')
else: else:
httpd.thrSharesExpire.start() begin_thread(httpd.thrSharesExpire,
'run_daemon thrSharesExpireWatchdog 2')
httpd.max_recent_posts = max_recent_posts httpd.max_recent_posts = max_recent_posts
httpd.iconsCache = {} httpd.iconsCache = {}
@ -21649,38 +21655,44 @@ def run_daemon(clacks: str,
httpd.thrImportFollowing = \ httpd.thrImportFollowing = \
thread_with_trace(target=run_import_following_watchdog, thread_with_trace(target=run_import_following_watchdog,
args=(project_version, httpd), daemon=True) args=(project_version, httpd), daemon=True)
httpd.thrImportFollowing.start() begin_thread(httpd.thrImportFollowing,
'run_daemon thrImportFollowing')
print('THREAD: Creating inbox queue watchdog') print('THREAD: Creating inbox queue watchdog')
httpd.thrWatchdog = \ httpd.thrWatchdog = \
thread_with_trace(target=run_inbox_queue_watchdog, thread_with_trace(target=run_inbox_queue_watchdog,
args=(project_version, httpd), daemon=True) args=(project_version, httpd), daemon=True)
httpd.thrWatchdog.start() begin_thread(httpd.thrWatchdog, 'run_daemon thrWatchdog')
print('THREAD: Creating scheduled post watchdog') print('THREAD: Creating scheduled post watchdog')
httpd.thrWatchdogSchedule = \ httpd.thrWatchdogSchedule = \
thread_with_trace(target=run_post_schedule_watchdog, thread_with_trace(target=run_post_schedule_watchdog,
args=(project_version, httpd), daemon=True) args=(project_version, httpd), daemon=True)
httpd.thrWatchdogSchedule.start() begin_thread(httpd.thrWatchdogSchedule,
'run_daemon thrWatchdogSchedule')
print('THREAD: Creating newswire watchdog') print('THREAD: Creating newswire watchdog')
httpd.thrNewswireWatchdog = \ httpd.thrNewswireWatchdog = \
thread_with_trace(target=run_newswire_watchdog, thread_with_trace(target=run_newswire_watchdog,
args=(project_version, httpd), daemon=True) args=(project_version, httpd), daemon=True)
httpd.thrNewswireWatchdog.start() begin_thread(httpd.thrNewswireWatchdog,
'run_daemon thrNewswireWatchdog')
print('THREAD: Creating federated shares watchdog') print('THREAD: Creating federated shares watchdog')
httpd.thrFederatedSharesWatchdog = \ httpd.thrFederatedSharesWatchdog = \
thread_with_trace(target=run_federated_shares_watchdog, thread_with_trace(target=run_federated_shares_watchdog,
args=(project_version, httpd), daemon=True) args=(project_version, httpd), daemon=True)
httpd.thrFederatedSharesWatchdog.start() begin_thread(httpd.thrFederatedSharesWatchdog,
'run_daemon thrFederatedSharesWatchdog')
else: else:
print('Starting inbox queue') print('Starting inbox queue')
httpd.thrInboxQueue.start() begin_thread(httpd.thrInboxQueue, 'run_daemon start inbox')
print('Starting scheduled posts daemon') print('Starting scheduled posts daemon')
httpd.thrPostSchedule.start() begin_thread(httpd.thrPostSchedule,
'run_daemon start scheduled posts')
print('Starting federated shares daemon') print('Starting federated shares daemon')
httpd.thrFederatedSharesDaemon.start() begin_thread(httpd.thrFederatedSharesDaemon,
'run_daemon start federated shares')
if client_to_server: if client_to_server:
print('Running ActivityPub client on ' + print('Running ActivityPub client on ' +

View File

@ -17,6 +17,7 @@ from follow import is_following_actor
from follow import send_follow_request from follow import send_follow_request
from session import create_session from session import create_session
from session import set_session_for_sender from session import set_session_for_sender
from threads import begin_thread
def _establish_import_session(httpd, def _establish_import_session(httpd,
@ -206,7 +207,8 @@ def run_import_following_watchdog(project_version: str, httpd) -> None:
print('THREAD: Starting import following watchdog ' + project_version) print('THREAD: Starting import following watchdog ' + project_version)
import_following_original = \ import_following_original = \
httpd.thrImportFollowing.clone(run_import_following) httpd.thrImportFollowing.clone(run_import_following)
httpd.thrImportFollowing.start() begin_thread(httpd.thrImportFollowing,
'run_import_following_watchdog')
while True: while True:
time.sleep(50) time.sleep(50)
if httpd.thrImportFollowing.is_alive(): if httpd.thrImportFollowing.is_alive():
@ -215,5 +217,6 @@ def run_import_following_watchdog(project_version: str, httpd) -> None:
print('THREAD: restarting import following watchdog') print('THREAD: restarting import following watchdog')
httpd.thrImportFollowing = \ httpd.thrImportFollowing = \
import_following_original.clone(run_import_following) import_following_original.clone(run_import_following)
httpd.thrImportFollowing.start() begin_thread(httpd.thrImportFollowing,
'run_import_following_watchdog 2')
print('Restarting import following...') print('Restarting import following...')

View File

@ -135,6 +135,7 @@ from content import reject_twitter_summary
from content import load_dogwhistles from content import load_dogwhistles
from content import valid_url_lengths from content import valid_url_lengths
from content import remove_script from content import remove_script
from threads import begin_thread
def cache_svg_images(session, base_dir: str, http_prefix: str, def cache_svg_images(session, base_dir: str, http_prefix: str,
@ -4433,7 +4434,7 @@ def run_inbox_queue_watchdog(project_version: str, httpd) -> None:
""" """
print('THREAD: Starting inbox queue watchdog ' + project_version) print('THREAD: Starting inbox queue watchdog ' + project_version)
inbox_queue_original = httpd.thrInboxQueue.clone(run_inbox_queue) inbox_queue_original = httpd.thrInboxQueue.clone(run_inbox_queue)
httpd.thrInboxQueue.start() begin_thread(httpd.thrInboxQueue, 'run_inbox_queue_watchdog')
while True: while True:
time.sleep(20) time.sleep(20)
if not httpd.thrInboxQueue.is_alive() or httpd.restart_inbox_queue: if not httpd.thrInboxQueue.is_alive() or httpd.restart_inbox_queue:
@ -4442,7 +4443,7 @@ def run_inbox_queue_watchdog(project_version: str, httpd) -> None:
print('THREAD: restarting inbox queue watchdog') print('THREAD: restarting inbox queue watchdog')
httpd.thrInboxQueue = inbox_queue_original.clone(run_inbox_queue) httpd.thrInboxQueue = inbox_queue_original.clone(run_inbox_queue)
httpd.inbox_queue.clear() httpd.inbox_queue.clear()
httpd.thrInboxQueue.start() begin_thread(httpd.thrInboxQueue, 'run_inbox_queue_watchdog 2')
print('Restarting inbox queue...') print('Restarting inbox queue...')
httpd.restart_inbox_queue_in_progress = False httpd.restart_inbox_queue_in_progress = False
httpd.restart_inbox_queue = False httpd.restart_inbox_queue = False

View File

@ -19,6 +19,7 @@ from utils import acct_dir
from utils import text_in_file from utils import text_in_file
from utils import remove_eol from utils import remove_eol
from threads import thread_with_trace from threads import thread_with_trace
from threads import begin_thread
from session import create_session from session import create_session
@ -105,7 +106,7 @@ def manual_deny_follow_request_thread(session, session_onion, session_i2p,
debug, debug,
project_version, project_version,
signing_priv_key_pem), daemon=True) signing_priv_key_pem), daemon=True)
thr.start() begin_thread(thr, 'manual_deny_follow_request_thread')
send_threads.append(thr) send_threads.append(thr)
@ -361,5 +362,5 @@ def manual_approve_follow_request_thread(session, session_onion, session_i2p,
project_version, project_version,
signing_priv_key_pem, signing_priv_key_pem,
proxy_type), daemon=True) proxy_type), daemon=True)
thr.start() begin_thread(thr, 'manual_approve_follow_request_thread')
send_threads.append(thr) send_threads.append(thr)

View File

@ -37,6 +37,7 @@ from utils import local_actor_url
from utils import text_in_file from utils import text_in_file
from inbox import store_hash_tags from inbox import store_hash_tags
from session import create_session from session import create_session
from threads import begin_thread
def _update_feeds_outbox_index(base_dir: str, domain: str, def _update_feeds_outbox_index(base_dir: str, domain: str,
@ -867,7 +868,7 @@ def run_newswire_watchdog(project_version: str, httpd) -> None:
print('THREAD: Starting newswire watchdog') print('THREAD: Starting newswire watchdog')
newswire_original = \ newswire_original = \
httpd.thrPostSchedule.clone(run_newswire_daemon) httpd.thrPostSchedule.clone(run_newswire_daemon)
httpd.thrNewswireDaemon.start() begin_thread(httpd.thrNewswireDaemon, 'run_newswire_watchdog')
while True: while True:
time.sleep(50) time.sleep(50)
if httpd.thrNewswireDaemon.is_alive(): if httpd.thrNewswireDaemon.is_alive():
@ -876,5 +877,5 @@ def run_newswire_watchdog(project_version: str, httpd) -> None:
print('THREAD: restarting newswire watchdog') print('THREAD: restarting newswire watchdog')
httpd.thrNewswireDaemon = \ httpd.thrNewswireDaemon = \
newswire_original.clone(run_newswire_daemon) newswire_original.clone(run_newswire_daemon)
httpd.thrNewswireDaemon.start() begin_thread(httpd.thrNewswireDaemon, 'run_newswire_watchdog 2')
print('Restarting newswire daemon...') print('Restarting newswire daemon...')

View File

@ -15,10 +15,10 @@ import shutil
import sys import sys
import time import time
import random import random
from socket import error as SocketError
from time import gmtime, strftime from time import gmtime, strftime
from collections import OrderedDict from collections import OrderedDict
from threads import thread_with_trace from threads import thread_with_trace
from threads import begin_thread
from cache import store_person_in_cache from cache import store_person_in_cache
from cache import get_person_from_cache from cache import get_person_from_cache
from cache import expire_person_cache from cache import expire_person_cache
@ -2548,7 +2548,7 @@ def send_post(signing_priv_key_pem: str, project_version: str,
post_log, debug, http_prefix, post_log, debug, http_prefix,
domain_full), daemon=True) domain_full), daemon=True)
send_threads.append(thr) send_threads.append(thr)
thr.start() begin_thread(thr, 'send_post')
return 0 return 0
@ -2967,7 +2967,7 @@ def send_signed_json(post_json_object: {}, session, base_dir: str,
post_log, debug, post_log, debug,
http_prefix, domain_full), daemon=True) http_prefix, domain_full), daemon=True)
send_threads.append(thr) send_threads.append(thr)
# thr.start() # begin_thread(thr, 'send_signed_json')
return 0 return 0
@ -3272,15 +3272,9 @@ def send_to_named_addresses_thread(server, session, session_onion, session_i2p,
shared_item_federation_tokens, shared_item_federation_tokens,
signing_priv_key_pem, signing_priv_key_pem,
proxy_type), daemon=True) proxy_type), daemon=True)
try: if not begin_thread(send_thread, 'send_to_named_addresses_thread'):
send_thread.start()
except SocketError as ex:
print('WARN: socket error while starting ' + print('WARN: socket error while starting ' +
'thread to send to named addresses. ' + str(ex)) 'thread to send to named addresses.')
return None
except ValueError as ex:
print('WARN: error while starting ' +
'thread to send to named addresses. ' + str(ex))
return None return None
return send_thread return send_thread
@ -3572,15 +3566,9 @@ def send_to_followers_thread(server, session, session_onion, session_i2p,
shared_items_federated_domains, shared_items_federated_domains,
shared_item_federation_tokens, shared_item_federation_tokens,
signing_priv_key_pem), daemon=True) signing_priv_key_pem), daemon=True)
try: if not begin_thread(send_thread, 'send_to_followers_thread'):
send_thread.start()
except SocketError as ex:
print('WARN: socket error while starting ' +
'thread to send to followers. ' + str(ex))
return None
except ValueError as ex:
print('WARN: error while starting ' + print('WARN: error while starting ' +
'thread to send to followers. ' + str(ex)) 'thread to send to followers.')
return None return None
return send_thread return send_thread

View File

@ -18,6 +18,7 @@ from utils import acct_dir
from utils import remove_eol from utils import remove_eol
from outbox import post_message_to_outbox from outbox import post_message_to_outbox
from session import create_session from session import create_session
from threads import begin_thread
def _update_post_schedule(base_dir: str, handle: str, httpd, def _update_post_schedule(base_dir: str, handle: str, httpd,
@ -199,7 +200,7 @@ def run_post_schedule_watchdog(project_version: str, httpd) -> None:
print('THREAD: Starting scheduled post watchdog ' + project_version) print('THREAD: Starting scheduled post watchdog ' + project_version)
post_schedule_original = \ post_schedule_original = \
httpd.thrPostSchedule.clone(run_post_schedule) httpd.thrPostSchedule.clone(run_post_schedule)
httpd.thrPostSchedule.start() begin_thread(httpd.thrPostSchedule, 'run_post_schedule_watchdog')
while True: while True:
time.sleep(20) time.sleep(20)
if httpd.thrPostSchedule.is_alive(): if httpd.thrPostSchedule.is_alive():
@ -208,7 +209,7 @@ def run_post_schedule_watchdog(project_version: str, httpd) -> None:
print('THREAD: restarting scheduled post watchdog') print('THREAD: restarting scheduled post watchdog')
httpd.thrPostSchedule = \ httpd.thrPostSchedule = \
post_schedule_original.clone(run_post_schedule) post_schedule_original.clone(run_post_schedule)
httpd.thrPostSchedule.start() begin_thread(httpd.thrPostSchedule, 'run_post_schedule_watchdog')
print('Restarting scheduled posts...') print('Restarting scheduled posts...')

View File

@ -45,6 +45,7 @@ from filters import is_filtered_globally
from siteactive import site_is_active from siteactive import site_is_active
from content import get_price_from_string from content import get_price_from_string
from blocking import is_blocked from blocking import is_blocked
from threads import begin_thread
def _load_dfc_ids(base_dir: str, system_language: str, def _load_dfc_ids(base_dir: str, system_language: str,
@ -1610,7 +1611,8 @@ def run_federated_shares_watchdog(project_version: str, httpd) -> None:
print('THREAD: Starting federated shares watchdog') print('THREAD: Starting federated shares watchdog')
federated_shares_original = \ federated_shares_original = \
httpd.thrPostSchedule.clone(run_federated_shares_daemon) httpd.thrPostSchedule.clone(run_federated_shares_daemon)
httpd.thrFederatedSharesDaemon.start() begin_thread(httpd.thrFederatedSharesDaemon,
'run_federated_shares_watchdog')
while True: while True:
time.sleep(55) time.sleep(55)
if httpd.thrFederatedSharesDaemon.is_alive(): if httpd.thrFederatedSharesDaemon.is_alive():
@ -1619,7 +1621,8 @@ def run_federated_shares_watchdog(project_version: str, httpd) -> None:
print('THREAD: restarting federated shares watchdog') print('THREAD: restarting federated shares watchdog')
httpd.thrFederatedSharesDaemon = \ httpd.thrFederatedSharesDaemon = \
federated_shares_original.clone(run_federated_shares_daemon) federated_shares_original.clone(run_federated_shares_daemon)
httpd.thrFederatedSharesDaemon.start() begin_thread(httpd.thrFederatedSharesDaemon,
'run_federated_shares_watchdog 2')
print('Restarting federated shares daemon...') print('Restarting federated shares daemon...')

View File

@ -11,6 +11,7 @@ import threading
import sys import sys
import time import time
import datetime import datetime
from socket import error as SocketError
class thread_with_trace(threading.Thread): class thread_with_trace(threading.Thread):
@ -163,3 +164,22 @@ def remove_dormant_threads(base_dir: str, threads_list: [], debug: bool,
except OSError: except OSError:
print('EX: remove_dormant_threads unable to write ' + print('EX: remove_dormant_threads unable to write ' +
send_log_filename) send_log_filename)
def begin_thread(thread, calling_function: str) -> bool:
"""Start a thread
"""
try:
if not thread.is_alive():
thread.start()
except SocketError as ex:
print('WARN: socket error while starting ' +
'thread. ' + calling_function + ' ' + str(ex))
return False
except ValueError as ex:
print('WARN: value error while starting ' +
'thread. ' + calling_function + ' ' + str(ex))
return False
except BaseException:
pass
return True