Sending scheduled posts

main
Bob Mottram 2020-01-12 20:13:44 +00:00
parent 768d8fff4e
commit 28ab6b8983
3 changed files with 171 additions and 22 deletions

View File

@ -166,6 +166,8 @@ from cache import storePersonInCache
from cache import getPersonFromCache from cache import getPersonFromCache
from httpsig import verifyPostHeaders from httpsig import verifyPostHeaders
from theme import setTheme from theme import setTheme
from schedule import runPostSchedule
from schedule import runPostScheduleWatchdog
import os import os
import sys import sys
@ -224,8 +226,7 @@ class PubServer(BaseHTTPRequestHandler):
messageId,messageId,None, \ messageId,messageId,None, \
False,None,None,None) False,None,None,None)
if messageJson: if messageJson:
self.postToNickname=nickname if self._postToOutbox(messageJson,__version__,nickname):
if self._postToOutbox(messageJson,__version__):
postFilename= \ postFilename= \
locatePost(self.server.baseDir,nickname, \ locatePost(self.server.baseDir,nickname, \
self.server.domain,messageId) self.server.domain,messageId)
@ -639,7 +640,7 @@ class PubServer(BaseHTTPRequestHandler):
return False return False
return True return True
def _postToOutbox(self,messageJson: {},version: str) -> bool: def _postToOutbox(self,messageJson: {},version: str,postToNickname=None) -> bool:
"""post is received by the outbox """post is received by the outbox
Client to server message post Client to server message post
https://www.w3.org/TR/activitypub/#client-to-server-outbox-delivery https://www.w3.org/TR/activitypub/#client-to-server-outbox-delivery
@ -648,6 +649,8 @@ class PubServer(BaseHTTPRequestHandler):
if self.server.debug: if self.server.debug:
print('DEBUG: POST to outbox has no "type" parameter') print('DEBUG: POST to outbox has no "type" parameter')
return False return False
if postToNickname:
self.postToNickname=postToNickname
if not messageJson.get('object') and messageJson.get('content'): if not messageJson.get('object') and messageJson.get('content'):
if messageJson['type']!='Create': if messageJson['type']!='Create':
# https://www.w3.org/TR/activitypub/#object-without-create # https://www.w3.org/TR/activitypub/#object-without-create
@ -3746,8 +3749,7 @@ class PubServer(BaseHTTPRequestHandler):
if messageJson: if messageJson:
if fields['schedulePost']: if fields['schedulePost']:
return 1 return 1
self.postToNickname=nickname if self._postToOutbox(messageJson,__version__,nickname):
if self._postToOutbox(messageJson,__version__):
populateReplies(self.server.baseDir, \ populateReplies(self.server.baseDir, \
self.server.httpPrefix, \ self.server.httpPrefix, \
self.server.domainFull, \ self.server.domainFull, \
@ -3774,8 +3776,7 @@ class PubServer(BaseHTTPRequestHandler):
if messageJson: if messageJson:
if fields['schedulePost']: if fields['schedulePost']:
return 1 return 1
self.postToNickname=nickname if self._postToOutbox(messageJson,__version__,nickname):
if self._postToOutbox(messageJson,__version__):
populateReplies(self.server.baseDir, \ populateReplies(self.server.baseDir, \
self.server.httpPrefix, \ self.server.httpPrefix, \
self.server.domain, \ self.server.domain, \
@ -3802,8 +3803,7 @@ class PubServer(BaseHTTPRequestHandler):
if messageJson: if messageJson:
if fields['schedulePost']: if fields['schedulePost']:
return 1 return 1
self.postToNickname=nickname if self._postToOutbox(messageJson,__version__,nickname):
if self._postToOutbox(messageJson,__version__):
populateReplies(self.server.baseDir, \ populateReplies(self.server.baseDir, \
self.server.httpPrefix, \ self.server.httpPrefix, \
self.server.domain, \ self.server.domain, \
@ -3834,10 +3834,9 @@ class PubServer(BaseHTTPRequestHandler):
if messageJson: if messageJson:
if fields['schedulePost']: if fields['schedulePost']:
return 1 return 1
self.postToNickname=nickname
if self.server.debug: if self.server.debug:
print('DEBUG: new DM to '+str(messageJson['object']['to'])) print('DEBUG: new DM to '+str(messageJson['object']['to']))
if self._postToOutbox(messageJson,__version__): if self._postToOutbox(messageJson,__version__,nickname):
populateReplies(self.server.baseDir, \ populateReplies(self.server.baseDir, \
self.server.httpPrefix, \ self.server.httpPrefix, \
self.server.domain, \ self.server.domain, \
@ -3866,8 +3865,7 @@ class PubServer(BaseHTTPRequestHandler):
self.server.useBlurHash, \ self.server.useBlurHash, \
self.server.debug,fields['subject']) self.server.debug,fields['subject'])
if messageJson: if messageJson:
self.postToNickname=nickname if self._postToOutbox(messageJson,__version__,nickname):
if self._postToOutbox(messageJson,__version__):
return 1 return 1
else: else:
return -1 return -1
@ -3895,10 +3893,9 @@ class PubServer(BaseHTTPRequestHandler):
self.server.useBlurHash, \ self.server.useBlurHash, \
fields['subject'],int(fields['duration'])) fields['subject'],int(fields['duration']))
if messageJson: if messageJson:
self.postToNickname=nickname
if self.server.debug: if self.server.debug:
print('DEBUG: new Question') print('DEBUG: new Question')
if self._postToOutbox(messageJson,__version__): if self._postToOutbox(messageJson,__version__,nickname):
return 1 return 1
return -1 return -1
elif postType=='newshare': elif postType=='newshare':
@ -4524,8 +4521,7 @@ class PubServer(BaseHTTPRequestHandler):
'cc': [], 'cc': [],
'object': actorJson 'object': actorJson
} }
self.postToNickname=nickname self._postToOutbox(updateActorJson,__version__,nickname)
self._postToOutbox(updateActorJson,__version__)
if fields.get('deactivateThisAccount'): if fields.get('deactivateThisAccount'):
if fields['deactivateThisAccount']=='on': if fields['deactivateThisAccount']=='on':
deactivateAccount(self.server.baseDir,nickname,self.server.domain) deactivateAccount(self.server.baseDir,nickname,self.server.domain)
@ -5826,13 +5822,25 @@ def runDaemon(mediaInstance: bool,maxRecentPosts: int, \
allowDeletion,debug,maxMentions,maxEmoji, \ allowDeletion,debug,maxMentions,maxEmoji, \
httpd.translate, \ httpd.translate, \
unitTest,httpd.acceptedCaps),daemon=True) unitTest,httpd.acceptedCaps),daemon=True)
print('Creating scheduled post thread')
httpd.thrPostSchedule= \
threadWithTrace(target=runPostSchedule, \
args=(baseDir,httpd,20),daemon=True)
if not unitTest: if not unitTest:
print('Creating inbox queue watchdog')
httpd.thrWatchdog= \ httpd.thrWatchdog= \
threadWithTrace(target=runInboxQueueWatchdog, \ threadWithTrace(target=runInboxQueueWatchdog, \
args=(projectVersion,httpd),daemon=True) args=(projectVersion,httpd),daemon=True)
httpd.thrWatchdog.start() httpd.thrWatchdog.start()
print('Creating scheduled post watchdog')
httpd.thrWatchdogSchedule= \
threadWithTrace(target=runPostScheduleWatchdog, \
args=(projectVersion,httpd),daemon=True)
httpd.thrWatchdogSchedule.start()
else: else:
httpd.thrInboxQueue.start() httpd.thrInboxQueue.start()
httpd.thrPostSchedule.start()
if clientToServer: if clientToServer:
print('Running ActivityPub client on ' + domain + ' port ' + str(proxyPort)) print('Running ActivityPub client on ' + domain + ' port ' + str(proxyPort))

View File

@ -50,6 +50,7 @@ from content import replaceEmojiFromTags
from auth import createBasicAuthHeader from auth import createBasicAuthHeader
from config import getConfigParam from config import getConfigParam
from blocking import isBlocked from blocking import isBlocked
from schedule import addSchedulePost
try: try:
from BeautifulSoup import BeautifulSoup from BeautifulSoup import BeautifulSoup
except ImportError: except ImportError:
@ -722,11 +723,7 @@ def createPostBase(baseDir: str,nickname: str,domain: str,port: int, \
if eventDate and eventTime: if eventDate and eventTime:
outboxName='scheduled' outboxName='scheduled'
# add an item to the scheduled post index file # add an item to the scheduled post index file
scheduleIndexFile=baseDir+'/accounts/schedule.txt' addSchedulePost(baseDir,nickname,domain,eventDateStr,postId)
scheduleFile=open(scheduleIndexFile, "a+")
if scheduleFile:
scheduleFile.write(eventDateStr+' '+newPostId+'\n')
scheduleFile.close()
else: else:
print('Unable to create scheduled post without date and time values') print('Unable to create scheduled post without date and time values')
return newPost return newPost

144
schedule.py 100644
View File

@ -0,0 +1,144 @@
__filename__ = "schedule.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
import datetime
def addSchedulePost(baseDir: str,nickname: str,domain: str, \
eventDateStr: str,postId: str) -> None:
"""Adds a scheduled post to the index
"""
handle=nickname+'@'+domain
scheduleIndexFilename=baseDir+'/accounts/'+handle+'/schedule.index'
indexStr=eventDateStr+' '+postId
if os.path.isfile(scheduleIndexFilename):
if indexStr not in open(scheduleIndexFilename).read():
try:
with open(scheduleIndexFilename, 'r+') as scheduleFile:
content = scheduleFile.read()
scheduleFile.seek(0, 0)
scheduleFile.write(indexStr+'\n'+content)
if debug:
print('DEBUG: scheduled post added to index')
except Exception as e:
print('WARN: Failed to write entry to scheduled posts index '+ \
scheduleIndexFilename+' '+str(e))
else:
scheduleFile=open(scheduleIndexFilename,'w')
if scheduleFile:
scheduleFile.write(indexStr+'\n')
scheduleFile.close()
def updatePostSchedule(baseDir: str,handle: str,httpd,maxScheduledPosts: int) -> None:
"""Checks if posts are due to be delivered and if so moves them to the outbox
"""
scheduleIndexFilename=baseDir+'/accounts/'+handle+'/schedule.index'
if not os.path.isfile(scheduleIndexFilename):
return
# get the current time as an int
currTime=datetime.datetime.utcnow()
daysSinceEpoch=(currTime - datetime.datetime(1970,1,1)).days
scheduleDir=baseDir+'/accounts/'+handle+'/scheduled/'
indexLines=[]
deleteSchedulePost=False
nickname=handle.split('@')[0]
with open(scheduleIndexFilename, 'r') as fp:
for line in fp:
if ' ' not in line:
continue
dateStr=line.split(' ')[0]
if 'T' not in dateStr:
continue
postId=line.split(' ',1)[1].replace('\n','')
postFilename=scheduleDir+postId+'.json'
if deleteSchedulePost:
# delete extraneous scheduled posts
if os.path.isfile(postFilename):
os.remove(postFilename)
continue
# create the new index file
indexLines.append(line)
# convert string date to int
postTime= \
datetime.datetime.strptime(dateStr,"%Y-%m-%dT%H:%M:%S%z")
postDaysSinceEpoch= \
(postTime - datetime.datetime(1970,1,1)).days
if daysSinceEpoch < postDaysSinceEpoch:
continue
if currTime.time().hour < postTime.time().hour:
continue
if currTime.time().minute < postTime.time().minute:
continue
if not os.path.isfile(postFilename):
indexLines.remove(line)
continue
# load post
postJsonObject=loadJson(postFilename)
if not postJsonObject:
indexLines.remove(line)
continue
print('Sending scheduled post '+postId)
if not httpd._postToOutbox(postJsonObject,__version__,nickname):
indexLines.remove(line)
continue
# move to the outbox
outboxPostFilename= \
postFilename.replace('/scheduled/','/outbox/')
os.rename(postFilename,outboxPostFilename)
print('Scheduled post sent '+postId)
indexLines.remove(line)
if len(indexLines)>maxScheduledPosts:
deleteSchedulePost=True
# write the new schedule index file
scheduleIndexFile=baseDir+'/accounts/'+handle+'/schedule.index'
scheduleFile=open(scheduleIndexFile, "w+")
if scheduleFile:
for line in indexLines:
scheduleFile.write(line)
scheduleFile.close()
def runPostSchedule(baseDir: str,httpd,maxScheduledPosts: int):
"""Dispatches scheduled posts
"""
while True:
time.sleep(60)
# for each account
for subdir,dirs,files in os.walk(baseDir+'/accounts'):
for account in dirs:
if '@' not in account:
continue
# scheduled posts index for this account
scheduleIndexFilename=baseDir+'/accounts/'+account+'/schedule.index'
if not os.path.isdir(scheduleIndexFilename):
continue
updatePostSchedule(baseDir,account,httpd,maxScheduledPosts)
def runPostScheduleWatchdog(projectVersion: str,httpd) -> None:
"""This tries to keep the scheduled post thread running even if it dies
"""
print('Starting scheduled post watchdog')
postScheduleOriginal= \
httpd.thrPostSchedule.clone(runPostSchedule)
httpd.thrPostSchedule.start()
while True:
time.sleep(20)
if not httpd.thrPostSchedule.isAlive():
httpd.thrPostSchedule.kill()
httpd.thrPostSchedule= \
postScheduleOriginal.clone(runPostSchedule)
httpd.thrPostSchedule.start()
print('Restarting scheduled posts...')