diff --git a/daemon.py b/daemon.py index 8eb1651d..6b33b36d 100644 --- a/daemon.py +++ b/daemon.py @@ -166,6 +166,8 @@ from cache import storePersonInCache from cache import getPersonFromCache from httpsig import verifyPostHeaders from theme import setTheme +from schedule import runPostSchedule +from schedule import runPostScheduleWatchdog import os import sys @@ -224,8 +226,7 @@ class PubServer(BaseHTTPRequestHandler): messageId,messageId,None, \ False,None,None,None) if messageJson: - self.postToNickname=nickname - if self._postToOutbox(messageJson,__version__): + if self._postToOutbox(messageJson,__version__,nickname): postFilename= \ locatePost(self.server.baseDir,nickname, \ self.server.domain,messageId) @@ -639,7 +640,7 @@ class PubServer(BaseHTTPRequestHandler): return False return True - def _postToOutbox(self,messageJson: {},version: str) -> bool: + def _postToOutbox(self,messageJson: {},version: str,postToNickname=None) -> bool: """post is received by the outbox Client to server message post https://www.w3.org/TR/activitypub/#client-to-server-outbox-delivery @@ -648,6 +649,8 @@ class PubServer(BaseHTTPRequestHandler): if self.server.debug: print('DEBUG: POST to outbox has no "type" parameter') return False + if postToNickname: + self.postToNickname=postToNickname if not messageJson.get('object') and messageJson.get('content'): if messageJson['type']!='Create': # https://www.w3.org/TR/activitypub/#object-without-create @@ -3746,8 +3749,7 @@ class PubServer(BaseHTTPRequestHandler): if messageJson: if fields['schedulePost']: return 1 - self.postToNickname=nickname - if self._postToOutbox(messageJson,__version__): + if self._postToOutbox(messageJson,__version__,nickname): populateReplies(self.server.baseDir, \ self.server.httpPrefix, \ self.server.domainFull, \ @@ -3774,8 +3776,7 @@ class PubServer(BaseHTTPRequestHandler): if messageJson: if fields['schedulePost']: return 1 - self.postToNickname=nickname - if self._postToOutbox(messageJson,__version__): + if self._postToOutbox(messageJson,__version__,nickname): populateReplies(self.server.baseDir, \ self.server.httpPrefix, \ self.server.domain, \ @@ -3802,8 +3803,7 @@ class PubServer(BaseHTTPRequestHandler): if messageJson: if fields['schedulePost']: return 1 - self.postToNickname=nickname - if self._postToOutbox(messageJson,__version__): + if self._postToOutbox(messageJson,__version__,nickname): populateReplies(self.server.baseDir, \ self.server.httpPrefix, \ self.server.domain, \ @@ -3834,10 +3834,9 @@ class PubServer(BaseHTTPRequestHandler): if messageJson: if fields['schedulePost']: return 1 - self.postToNickname=nickname if self.server.debug: print('DEBUG: new DM to '+str(messageJson['object']['to'])) - if self._postToOutbox(messageJson,__version__): + if self._postToOutbox(messageJson,__version__,nickname): populateReplies(self.server.baseDir, \ self.server.httpPrefix, \ self.server.domain, \ @@ -3866,8 +3865,7 @@ class PubServer(BaseHTTPRequestHandler): self.server.useBlurHash, \ self.server.debug,fields['subject']) if messageJson: - self.postToNickname=nickname - if self._postToOutbox(messageJson,__version__): + if self._postToOutbox(messageJson,__version__,nickname): return 1 else: return -1 @@ -3895,10 +3893,9 @@ class PubServer(BaseHTTPRequestHandler): self.server.useBlurHash, \ fields['subject'],int(fields['duration'])) if messageJson: - self.postToNickname=nickname if self.server.debug: print('DEBUG: new Question') - if self._postToOutbox(messageJson,__version__): + if self._postToOutbox(messageJson,__version__,nickname): return 1 return -1 elif postType=='newshare': @@ -4524,8 +4521,7 @@ class PubServer(BaseHTTPRequestHandler): 'cc': [], 'object': actorJson } - self.postToNickname=nickname - self._postToOutbox(updateActorJson,__version__) + self._postToOutbox(updateActorJson,__version__,nickname) if fields.get('deactivateThisAccount'): if fields['deactivateThisAccount']=='on': deactivateAccount(self.server.baseDir,nickname,self.server.domain) @@ -5826,13 +5822,25 @@ def runDaemon(mediaInstance: bool,maxRecentPosts: int, \ allowDeletion,debug,maxMentions,maxEmoji, \ httpd.translate, \ unitTest,httpd.acceptedCaps),daemon=True) + print('Creating scheduled post thread') + httpd.thrPostSchedule= \ + threadWithTrace(target=runPostSchedule, \ + args=(baseDir,httpd,20),daemon=True) if not unitTest: + print('Creating inbox queue watchdog') httpd.thrWatchdog= \ threadWithTrace(target=runInboxQueueWatchdog, \ args=(projectVersion,httpd),daemon=True) httpd.thrWatchdog.start() + + print('Creating scheduled post watchdog') + httpd.thrWatchdogSchedule= \ + threadWithTrace(target=runPostScheduleWatchdog, \ + args=(projectVersion,httpd),daemon=True) + httpd.thrWatchdogSchedule.start() else: httpd.thrInboxQueue.start() + httpd.thrPostSchedule.start() if clientToServer: print('Running ActivityPub client on ' + domain + ' port ' + str(proxyPort)) diff --git a/posts.py b/posts.py index 9f7da4f8..efc4bdfb 100644 --- a/posts.py +++ b/posts.py @@ -50,6 +50,7 @@ from content import replaceEmojiFromTags from auth import createBasicAuthHeader from config import getConfigParam from blocking import isBlocked +from schedule import addSchedulePost try: from BeautifulSoup import BeautifulSoup except ImportError: @@ -722,11 +723,7 @@ def createPostBase(baseDir: str,nickname: str,domain: str,port: int, \ if eventDate and eventTime: outboxName='scheduled' # add an item to the scheduled post index file - scheduleIndexFile=baseDir+'/accounts/schedule.txt' - scheduleFile=open(scheduleIndexFile, "a+") - if scheduleFile: - scheduleFile.write(eventDateStr+' '+newPostId+'\n') - scheduleFile.close() + addSchedulePost(baseDir,nickname,domain,eventDateStr,postId) else: print('Unable to create scheduled post without date and time values') return newPost diff --git a/schedule.py b/schedule.py new file mode 100644 index 00000000..ce75c989 --- /dev/null +++ b/schedule.py @@ -0,0 +1,144 @@ +__filename__ = "schedule.py" +__author__ = "Bob Mottram" +__license__ = "AGPL3+" +__version__ = "1.1.0" +__maintainer__ = "Bob Mottram" +__email__ = "bob@freedombone.net" +__status__ = "Production" + +import os +import datetime + +def addSchedulePost(baseDir: str,nickname: str,domain: str, \ + eventDateStr: str,postId: str) -> None: + """Adds a scheduled post to the index + """ + handle=nickname+'@'+domain + scheduleIndexFilename=baseDir+'/accounts/'+handle+'/schedule.index' + + indexStr=eventDateStr+' '+postId + if os.path.isfile(scheduleIndexFilename): + if indexStr not in open(scheduleIndexFilename).read(): + try: + with open(scheduleIndexFilename, 'r+') as scheduleFile: + content = scheduleFile.read() + scheduleFile.seek(0, 0) + scheduleFile.write(indexStr+'\n'+content) + if debug: + print('DEBUG: scheduled post added to index') + except Exception as e: + print('WARN: Failed to write entry to scheduled posts index '+ \ + scheduleIndexFilename+' '+str(e)) + else: + scheduleFile=open(scheduleIndexFilename,'w') + if scheduleFile: + scheduleFile.write(indexStr+'\n') + scheduleFile.close() + +def updatePostSchedule(baseDir: str,handle: str,httpd,maxScheduledPosts: int) -> None: + """Checks if posts are due to be delivered and if so moves them to the outbox + """ + scheduleIndexFilename=baseDir+'/accounts/'+handle+'/schedule.index' + if not os.path.isfile(scheduleIndexFilename): + return + + # get the current time as an int + currTime=datetime.datetime.utcnow() + daysSinceEpoch=(currTime - datetime.datetime(1970,1,1)).days + + scheduleDir=baseDir+'/accounts/'+handle+'/scheduled/' + indexLines=[] + deleteSchedulePost=False + nickname=handle.split('@')[0] + with open(scheduleIndexFilename, 'r') as fp: + for line in fp: + if ' ' not in line: + continue + dateStr=line.split(' ')[0] + if 'T' not in dateStr: + continue + postId=line.split(' ',1)[1].replace('\n','') + postFilename=scheduleDir+postId+'.json' + if deleteSchedulePost: + # delete extraneous scheduled posts + if os.path.isfile(postFilename): + os.remove(postFilename) + continue + # create the new index file + indexLines.append(line) + # convert string date to int + postTime= \ + datetime.datetime.strptime(dateStr,"%Y-%m-%dT%H:%M:%S%z") + postDaysSinceEpoch= \ + (postTime - datetime.datetime(1970,1,1)).days + if daysSinceEpoch < postDaysSinceEpoch: + continue + if currTime.time().hour < postTime.time().hour: + continue + if currTime.time().minute < postTime.time().minute: + continue + if not os.path.isfile(postFilename): + indexLines.remove(line) + continue + # load post + postJsonObject=loadJson(postFilename) + if not postJsonObject: + indexLines.remove(line) + continue + + print('Sending scheduled post '+postId) + + if not httpd._postToOutbox(postJsonObject,__version__,nickname): + indexLines.remove(line) + continue + + # move to the outbox + outboxPostFilename= \ + postFilename.replace('/scheduled/','/outbox/') + os.rename(postFilename,outboxPostFilename) + + print('Scheduled post sent '+postId) + + indexLines.remove(line) + if len(indexLines)>maxScheduledPosts: + deleteSchedulePost=True + + # write the new schedule index file + scheduleIndexFile=baseDir+'/accounts/'+handle+'/schedule.index' + scheduleFile=open(scheduleIndexFile, "w+") + if scheduleFile: + for line in indexLines: + scheduleFile.write(line) + scheduleFile.close() + +def runPostSchedule(baseDir: str,httpd,maxScheduledPosts: int): + """Dispatches scheduled posts + """ + while True: + time.sleep(60) + # for each account + for subdir,dirs,files in os.walk(baseDir+'/accounts'): + for account in dirs: + if '@' not in account: + continue + # scheduled posts index for this account + scheduleIndexFilename=baseDir+'/accounts/'+account+'/schedule.index' + if not os.path.isdir(scheduleIndexFilename): + continue + updatePostSchedule(baseDir,account,httpd,maxScheduledPosts) + +def runPostScheduleWatchdog(projectVersion: str,httpd) -> None: + """This tries to keep the scheduled post thread running even if it dies + """ + print('Starting scheduled post watchdog') + postScheduleOriginal= \ + httpd.thrPostSchedule.clone(runPostSchedule) + httpd.thrPostSchedule.start() + while True: + time.sleep(20) + if not httpd.thrPostSchedule.isAlive(): + httpd.thrPostSchedule.kill() + httpd.thrPostSchedule= \ + postScheduleOriginal.clone(runPostSchedule) + httpd.thrPostSchedule.start() + print('Restarting scheduled posts...')