Use a ring buffer for outbox threads

merge-requests/30/head
Bob Mottram 2021-10-23 12:58:38 +01:00
parent 343c9771bb
commit 2f0f48c346
2 changed files with 125 additions and 40 deletions

View File

@ -296,8 +296,8 @@ from utils import dangerousMarkup
from utils import refreshNewswire from utils import refreshNewswire
from utils import isImageFile from utils import isImageFile
from utils import hasGroupType from utils import hasGroupType
from manualapprove import manualDenyFollowRequest from manualapprove import manualDenyFollowRequestThread
from manualapprove import manualApproveFollowRequest from manualapprove import manualApproveFollowRequestThread
from announce import createAnnounce from announce import createAnnounce
from content import getPriceFromString from content import getPriceFromString
from content import replaceEmojiFromTags from content import replaceEmojiFromTags
@ -1235,6 +1235,37 @@ class PubServer(BaseHTTPRequestHandler):
self.server.CWlists, self.server.CWlists,
self.server.listsEnabled) self.server.listsEnabled)
def _getOutboxThreadIndex(self, nickname: str,
maxOutboxThreadsPerAccount: int) -> int:
"""Returns the outbox thread index for the given account
This is a ring buffer used to store the thread objects which
are sending out posts
"""
accountOutboxThreadName = nickname
if not accountOutboxThreadName:
accountOutboxThreadName = '*'
# create the buffer for the given account
if not self.server.outboxThread.get(accountOutboxThreadName):
self.server.outboxThread[accountOutboxThreadName] = \
[None] * maxOutboxThreadsPerAccount
self.server.outboxThreadIndex[accountOutboxThreadName] = 0
return 0
# increment the ring buffer index
index = self.server.outboxThreadIndex[accountOutboxThreadName] + 1
if index >= maxOutboxThreadsPerAccount:
index = 0
self.server.outboxThreadIndex[accountOutboxThreadName] = index
# remove any existing thread from the current index in the buffer
if self.server.outboxThread.get(accountOutboxThreadName):
acct = accountOutboxThreadName
if self.server.outboxThread[acct][index].is_alive():
self.server.outboxThread[acct][index].kill()
return index
def _postToOutboxThread(self, messageJson: {}) -> bool: def _postToOutboxThread(self, messageJson: {}) -> bool:
"""Creates a thread to send a post """Creates a thread to send a post
""" """
@ -1242,24 +1273,18 @@ class PubServer(BaseHTTPRequestHandler):
if not accountOutboxThreadName: if not accountOutboxThreadName:
accountOutboxThreadName = '*' accountOutboxThreadName = '*'
if self.server.outboxThread.get(accountOutboxThreadName): index = self._getOutboxThreadIndex(accountOutboxThreadName, 8)
print('Waiting for previous outbox thread to end')
waitCtr = 0
thName = accountOutboxThreadName
while self.server.outboxThread[thName].is_alive() and waitCtr < 8:
time.sleep(1)
waitCtr += 1
if waitCtr >= 8:
self.server.outboxThread[accountOutboxThreadName].kill()
print('Creating outbox thread') print('Creating outbox thread ' +
self.server.outboxThread[accountOutboxThreadName] = \ accountOutboxThreadName + '/' +
str(self.server.outboxThreadIndex[accountOutboxThreadName]))
self.server.outboxThread[accountOutboxThreadName][index] = \
threadWithTrace(target=self._postToOutbox, threadWithTrace(target=self._postToOutbox,
args=(messageJson.copy(), args=(messageJson.copy(),
self.server.projectVersion, None), self.server.projectVersion, None),
daemon=True) daemon=True)
print('Starting outbox thread') print('Starting outbox thread')
self.server.outboxThread[accountOutboxThreadName].start() self.server.outboxThread[accountOutboxThreadName][index].start()
return True return True
def _updateInboxQueue(self, nickname: str, messageJson: {}, def _updateInboxQueue(self, nickname: str, messageJson: {},
@ -7126,7 +7151,7 @@ class PubServer(BaseHTTPRequestHandler):
self._404() self._404()
self.server.GETbusy = False self.server.GETbusy = False
return return
manualApproveFollowRequest(self.server.session, manualApproveFollowRequestThread(self.server.session,
baseDir, httpPrefix, baseDir, httpPrefix,
followerNickname, followerNickname,
domain, port, domain, port,
@ -7284,7 +7309,7 @@ class PubServer(BaseHTTPRequestHandler):
followingHandle = \ followingHandle = \
handleNickname + '@' + getFullDomain(handleDomain, handlePort) handleNickname + '@' + getFullDomain(handleDomain, handlePort)
if '@' in followingHandle: if '@' in followingHandle:
manualDenyFollowRequest(self.server.session, manualDenyFollowRequestThread(self.server.session,
baseDir, httpPrefix, baseDir, httpPrefix,
followerNickname, followerNickname,
domain, port, domain, port,
@ -17197,6 +17222,7 @@ def runDaemon(listsEnabled: str,
httpd.registration = False httpd.registration = False
httpd.enableSharedInbox = enableSharedInbox httpd.enableSharedInbox = enableSharedInbox
httpd.outboxThread = {} httpd.outboxThread = {}
httpd.outboxThreadIndex = {}
httpd.newPostThread = {} httpd.newPostThread = {}
httpd.projectVersion = projectVersion httpd.projectVersion = projectVersion
httpd.secureMode = secureMode httpd.secureMode = secureMode

View File

@ -16,6 +16,7 @@ from utils import removeDomainPort
from utils import getPortFromDomain from utils import getPortFromDomain
from utils import getUserPaths from utils import getUserPaths
from utils import acctDir from utils import acctDir
from threads import threadWithTrace
def manualDenyFollowRequest(session, baseDir: str, def manualDenyFollowRequest(session, baseDir: str,
@ -67,6 +68,35 @@ def manualDenyFollowRequest(session, baseDir: str,
print('Follow request from ' + denyHandle + ' was denied.') print('Follow request from ' + denyHandle + ' was denied.')
def manualDenyFollowRequestThread(session, baseDir: str,
httpPrefix: str,
nickname: str, domain: str, port: int,
denyHandle: str,
federationList: [],
sendThreads: [], postLog: [],
cachedWebfingers: {}, personCache: {},
debug: bool,
projectVersion: str,
signingPrivateKeyPem: str) -> None:
"""Manually deny a follow request, within a thread so that the
user interface doesn't lag
"""
thr = \
threadWithTrace(target=manualDenyFollowRequest,
args=(session, baseDir,
httpPrefix,
nickname, domain, port,
denyHandle,
federationList,
sendThreads, postLog,
cachedWebfingers, personCache,
debug,
projectVersion,
signingPrivateKeyPem), daemon=True)
thr.start()
sendThreads.append(thr)
def _approveFollowerHandle(accountDir: str, approveHandle: str) -> None: def _approveFollowerHandle(accountDir: str, approveHandle: str) -> None:
""" Record manually approved handles so that if they unfollow and then """ Record manually approved handles so that if they unfollow and then
re-follow later then they don't need to be manually approved again re-follow later then they don't need to be manually approved again
@ -231,3 +261,32 @@ def manualApproveFollowRequest(session, baseDir: str,
os.remove(approveFollowsFilename + '.new') os.remove(approveFollowsFilename + '.new')
except BaseException: except BaseException:
pass pass
def manualApproveFollowRequestThread(session, baseDir: str,
httpPrefix: str,
nickname: str, domain: str, port: int,
approveHandle: str,
federationList: [],
sendThreads: [], postLog: [],
cachedWebfingers: {}, personCache: {},
debug: bool,
projectVersion: str,
signingPrivateKeyPem: str) -> None:
"""Manually approve a follow request, in a thread so as not to cause
the UI to lag
"""
thr = \
threadWithTrace(target=manualApproveFollowRequest,
args=(session, baseDir,
httpPrefix,
nickname, domain, port,
approveHandle,
federationList,
sendThreads, postLog,
cachedWebfingers, personCache,
debug,
projectVersion,
signingPrivateKeyPem), daemon=True)
thr.start()
sendThreads.append(thr)