main
Bob Mottram 2020-04-16 10:49:57 +01:00
parent d81c985e44
commit d21b584f78
1 changed files with 386 additions and 386 deletions

772
inbox.py
View File

@ -2249,403 +2249,403 @@ def runInboxQueue(recentPostsCache: {}, maxRecentPosts: int,
if queueRestoreCtr >= 30: if queueRestoreCtr >= 30:
queueRestoreCtr = 0 queueRestoreCtr = 0
restoreQueueItems(baseDir, queue) restoreQueueItems(baseDir, queue)
else: continue
currTime = int(time.time()) currTime = int(time.time())
# recreate the session periodically # recreate the session periodically
if not session or currTime - sessionLastUpdate > 1200: if not session or currTime - sessionLastUpdate > 1200:
print('Creating inbox session') print('Creating inbox session')
session = createSession(useTor) session = createSession(useTor)
sessionLastUpdate = currTime sessionLastUpdate = currTime
# oldest item first # oldest item first
queue.sort() queue.sort()
queueFilename = queue[0] queueFilename = queue[0]
if not os.path.isfile(queueFilename): if not os.path.isfile(queueFilename):
print("Queue: queue item rejected because it has no file: " + print("Queue: queue item rejected because it has no file: " +
queueFilename) queueFilename)
if len(queue) > 0: if len(queue) > 0:
queue.pop(0) queue.pop(0)
continue continue
print('Loading queue item ' + queueFilename) print('Loading queue item ' + queueFilename)
# Load the queue json # Load the queue json
queueJson = loadJson(queueFilename, 1) queueJson = loadJson(queueFilename, 1)
if not queueJson: if not queueJson:
print('Queue: runInboxQueue failed to load inbox queue item ' + print('Queue: runInboxQueue failed to load inbox queue item ' +
queueFilename) queueFilename)
# Assume that the file is probably corrupt/unreadable # Assume that the file is probably corrupt/unreadable
if len(queue) > 0: if len(queue) > 0:
queue.pop(0) queue.pop(0)
# delete the queue file # delete the queue file
if os.path.isfile(queueFilename): if os.path.isfile(queueFilename):
try: try:
os.remove(queueFilename)
except BaseException:
pass
continue
# clear the daily quotas for maximum numbers of received posts
if currTime-quotasLastUpdateDaily > 60 * 60 * 24:
quotasDaily = {
'domains': {},
'accounts': {}
}
quotasLastUpdateDaily = currTime
# clear the per minute quotas for maximum numbers of received posts
if currTime-quotasLastUpdatePerMin > 60:
quotasPerMin = {
'domains': {},
'accounts': {}
}
quotasLastUpdatePerMin = currTime
# limit the number of posts which can arrive per domain per day
postDomain = queueJson['postDomain']
if postDomain:
if domainMaxPostsPerDay > 0:
if quotasDaily['domains'].get(postDomain):
if quotasDaily['domains'][postDomain] > \
domainMaxPostsPerDay:
print('Queue: Quota per day - Maximum posts for ' +
postDomain + ' reached (' +
str(domainMaxPostsPerDay) + ')')
if len(queue) > 0:
try:
os.remove(queueFilename)
except BaseException:
pass
queue.pop(0)
continue
quotasDaily['domains'][postDomain] += 1
else:
quotasDaily['domains'][postDomain] = 1
if quotasPerMin['domains'].get(postDomain):
domainMaxPostsPerMin = \
int(domainMaxPostsPerDay / (24 * 60))
if domainMaxPostsPerMin < 10:
domainMaxPostsPerMin = 10
if quotasPerMin['domains'][postDomain] > \
domainMaxPostsPerMin:
print('Queue: Quota per min - Maximum posts for ' +
postDomain + ' reached (' +
str(domainMaxPostsPerMin) + ')')
if len(queue) > 0:
try:
os.remove(queueFilename)
except BaseException:
pass
queue.pop(0)
continue
quotasPerMin['domains'][postDomain] += 1
else:
quotasPerMin['domains'][postDomain] = 1
if accountMaxPostsPerDay > 0:
postHandle = queueJson['postNickname'] + '@' + postDomain
if quotasDaily['accounts'].get(postHandle):
if quotasDaily['accounts'][postHandle] > \
accountMaxPostsPerDay:
print('Queue: Quota account posts per day -' +
' Maximum posts for ' +
postHandle + ' reached (' +
str(accountMaxPostsPerDay) + ')')
if len(queue) > 0:
try:
os.remove(queueFilename)
except BaseException:
pass
queue.pop(0)
continue
quotasDaily['accounts'][postHandle] += 1
else:
quotasDaily['accounts'][postHandle] = 1
if quotasPerMin['accounts'].get(postHandle):
accountMaxPostsPerMin = \
int(accountMaxPostsPerDay / (24 * 60))
if accountMaxPostsPerMin < 10:
accountMaxPostsPerMin = 10
if quotasPerMin['accounts'][postHandle] > \
accountMaxPostsPerMin:
print('Queue: Quota account posts per min -' +
' Maximum posts for ' +
postHandle + ' reached (' +
str(accountMaxPostsPerMin) + ')')
if len(queue) > 0:
try:
os.remove(queueFilename)
except BaseException:
pass
queue.pop(0)
continue
quotasPerMin['accounts'][postHandle] += 1
else:
quotasPerMin['accounts'][postHandle] = 1
if debug:
if accountMaxPostsPerDay > 0 or domainMaxPostsPerDay > 0:
pprint(quotasDaily)
print('Obtaining public key for actor ' + queueJson['actor'])
# Try a few times to obtain the public key
pubKey = None
keyId = None
for tries in range(8):
keyId = None
signatureParams = \
queueJson['httpHeaders']['signature'].split(',')
for signatureItem in signatureParams:
if signatureItem.startswith('keyId='):
if '"' in signatureItem:
keyId = signatureItem.split('"')[1]
break
if not keyId:
print('Queue: No keyId in signature: ' +
queueJson['httpHeaders']['signature'])
pubKey = None
break
pubKey = \
getPersonPubKey(baseDir, session, keyId,
personCache, debug,
projectVersion, httpPrefix,
domain, onionDomain)
if pubKey:
if debug:
print('DEBUG: public key: ' + str(pubKey))
break
if debug:
print('DEBUG: Retry ' + str(tries+1) +
' obtaining public key for ' + keyId)
time.sleep(5)
if not pubKey:
print('Queue: public key could not be obtained from ' + keyId)
if os.path.isfile(queueFilename):
os.remove(queueFilename) os.remove(queueFilename)
if len(queue) > 0: except BaseException:
queue.pop(0) pass
continue continue
# check the signature # clear the daily quotas for maximum numbers of received posts
if debug: if currTime-quotasLastUpdateDaily > 60 * 60 * 24:
print('DEBUG: checking http headers') quotasDaily = {
pprint(queueJson['httpHeaders']) 'domains': {},
if not verifyPostHeaders(httpPrefix, 'accounts': {}
pubKey, }
queueJson['httpHeaders'], quotasLastUpdateDaily = currTime
queueJson['path'], False,
queueJson['digest'],
json.dumps(queueJson['post']),
debug):
print('Queue: Header signature check failed')
pprint(queueJson['httpHeaders'])
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)
continue
if debug: # clear the per minute quotas for maximum numbers of received posts
print('DEBUG: Signature check success') if currTime-quotasLastUpdatePerMin > 60:
quotasPerMin = {
'domains': {},
'accounts': {}
}
quotasLastUpdatePerMin = currTime
# set the id to the same as the post filename # limit the number of posts which can arrive per domain per day
# This makes the filename and the id consistent postDomain = queueJson['postDomain']
# if queueJson['post'].get('id'): if postDomain:
# queueJson['post']['id']=queueJson['id'] if domainMaxPostsPerDay > 0:
if quotasDaily['domains'].get(postDomain):
if receiveUndo(session, if quotasDaily['domains'][postDomain] > \
baseDir, httpPrefix, port, domainMaxPostsPerDay:
sendThreads, postLog, print('Queue: Quota per day - Maximum posts for ' +
cachedWebfingers, postDomain + ' reached (' +
personCache, str(domainMaxPostsPerDay) + ')')
queueJson['post'], if len(queue) > 0:
federationList, try:
debug, os.remove(queueFilename)
acceptedCaps=["inbox:write", "objects:read"]): except BaseException:
print('Queue: Undo accepted from ' + keyId) pass
if os.path.isfile(queueFilename): queue.pop(0)
os.remove(queueFilename) continue
if len(queue) > 0: quotasDaily['domains'][postDomain] += 1
queue.pop(0)
continue
if debug:
print('DEBUG: checking for follow requests')
if receiveFollowRequest(session,
baseDir, httpPrefix, port,
sendThreads, postLog,
cachedWebfingers,
personCache,
queueJson['post'],
federationList,
debug, projectVersion,
acceptedCaps=["inbox:write",
"objects:read"]):
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)
print('Queue: Follow activity for ' + keyId +
' removed from accepted from queue')
continue
else:
if debug:
print('DEBUG: No follow requests')
if receiveAcceptReject(session,
baseDir, httpPrefix, domain, port,
sendThreads, postLog,
cachedWebfingers, personCache,
queueJson['post'],
federationList, debug):
print('Queue: Accept/Reject received from ' + keyId)
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)
continue
if receiveUpdate(recentPostsCache, session,
baseDir, httpPrefix,
domain, port,
sendThreads, postLog,
cachedWebfingers,
personCache,
queueJson['post'],
federationList,
queueJson['postNickname'],
debug):
print('Queue: Update accepted from ' + keyId)
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)
continue
# get recipients list
recipientsDict, recipientsDictFollowers = \
inboxPostRecipients(baseDir, queueJson['post'],
httpPrefix, domain, port, debug)
if len(recipientsDict.items()) == 0 and \
len(recipientsDictFollowers.items()) == 0:
print('Queue: no recipients were resolved ' +
'for post arriving in inbox')
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)
continue
# if there are only a small number of followers then
# process them as if they were specifically
# addresses to particular accounts
noOfFollowItems = len(recipientsDictFollowers.items())
if noOfFollowItems > 0:
# always deliver to individual inboxes
if noOfFollowItems < 999999:
if debug:
print('DEBUG: moving ' + str(noOfFollowItems) +
' inbox posts addressed to followers')
for handle, postItem in recipientsDictFollowers.items():
recipientsDict[handle] = postItem
recipientsDictFollowers = {}
# recipientsList = [recipientsDict, recipientsDictFollowers]
if debug:
print('*************************************')
print('Resolved recipients list:')
pprint(recipientsDict)
print('Resolved followers list:')
pprint(recipientsDictFollowers)
print('*************************************')
if queueJson['post'].get('capability'):
if not isinstance(queueJson['post']['capability'], list):
print('Queue: capability on post should be a list')
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)
continue
# Copy any posts addressed to followers into the shared inbox
# this avoid copying file multiple times to potentially many
# individual inboxes
# This obviously bypasses object capabilities and so
# any checking will needs to be handled at the time when inbox
# GET happens on individual accounts.
# See posts.py/createBoxBase
if len(recipientsDictFollowers) > 0:
sharedInboxPostFilename = \
queueJson['destination'].replace(inboxHandle, inboxHandle)
if not os.path.isfile(sharedInboxPostFilename):
saveJson(queueJson['post'], sharedInboxPostFilename)
# for posts addressed to specific accounts
for handle, capsId in recipientsDict.items():
destination = \
queueJson['destination'].replace(inboxHandle, handle)
# check that capabilities are accepted
if queueJson['post'].get('capability'):
capabilityIdList = queueJson['post']['capability']
# does the capability id list within the post
# contain the id of the recipient with this handle?
# Here the capability id begins with the handle,
# so this could also be matched separately, but it's
# probably not necessary
if capsId in capabilityIdList:
inboxAfterCapabilities(recentPostsCache,
maxRecentPosts,
session, keyId, handle,
queueJson['post'],
baseDir, httpPrefix,
sendThreads, postLog,
cachedWebfingers,
personCache, queue,
domain, onionDomain,
port, useTor,
federationList, ocapAlways,
debug, acceptedCaps,
queueFilename, destination,
maxReplies, allowDeletion,
maxMentions, maxEmoji,
translate, unitTest)
else:
print('Queue: object capabilities check has failed')
if debug:
pprint(queueJson['post'])
else: else:
if not ocapAlways: quotasDaily['domains'][postDomain] = 1
inboxAfterCapabilities(recentPostsCache,
maxRecentPosts,
session, keyId, handle,
queueJson['post'],
baseDir, httpPrefix,
sendThreads, postLog,
cachedWebfingers,
personCache, queue,
domain, onionDomain,
port, useTor,
federationList, ocapAlways,
debug, acceptedCaps,
queueFilename, destination,
maxReplies, allowDeletion,
maxMentions, maxEmoji,
translate, unitTest)
if debug:
pprint(queueJson['post'])
print('No capability list within post')
print('ocapAlways: ' + str(ocapAlways))
print('Queue: Queue post accepted') if quotasPerMin['domains'].get(postDomain):
domainMaxPostsPerMin = \
int(domainMaxPostsPerDay / (24 * 60))
if domainMaxPostsPerMin < 10:
domainMaxPostsPerMin = 10
if quotasPerMin['domains'][postDomain] > \
domainMaxPostsPerMin:
print('Queue: Quota per min - Maximum posts for ' +
postDomain + ' reached (' +
str(domainMaxPostsPerMin) + ')')
if len(queue) > 0:
try:
os.remove(queueFilename)
except BaseException:
pass
queue.pop(0)
continue
quotasPerMin['domains'][postDomain] += 1
else:
quotasPerMin['domains'][postDomain] = 1
if accountMaxPostsPerDay > 0:
postHandle = queueJson['postNickname'] + '@' + postDomain
if quotasDaily['accounts'].get(postHandle):
if quotasDaily['accounts'][postHandle] > \
accountMaxPostsPerDay:
print('Queue: Quota account posts per day -' +
' Maximum posts for ' +
postHandle + ' reached (' +
str(accountMaxPostsPerDay) + ')')
if len(queue) > 0:
try:
os.remove(queueFilename)
except BaseException:
pass
queue.pop(0)
continue
quotasDaily['accounts'][postHandle] += 1
else:
quotasDaily['accounts'][postHandle] = 1
if quotasPerMin['accounts'].get(postHandle):
accountMaxPostsPerMin = \
int(accountMaxPostsPerDay / (24 * 60))
if accountMaxPostsPerMin < 10:
accountMaxPostsPerMin = 10
if quotasPerMin['accounts'][postHandle] > \
accountMaxPostsPerMin:
print('Queue: Quota account posts per min -' +
' Maximum posts for ' +
postHandle + ' reached (' +
str(accountMaxPostsPerMin) + ')')
if len(queue) > 0:
try:
os.remove(queueFilename)
except BaseException:
pass
queue.pop(0)
continue
quotasPerMin['accounts'][postHandle] += 1
else:
quotasPerMin['accounts'][postHandle] = 1
if debug:
if accountMaxPostsPerDay > 0 or domainMaxPostsPerDay > 0:
pprint(quotasDaily)
print('Obtaining public key for actor ' + queueJson['actor'])
# Try a few times to obtain the public key
pubKey = None
keyId = None
for tries in range(8):
keyId = None
signatureParams = \
queueJson['httpHeaders']['signature'].split(',')
for signatureItem in signatureParams:
if signatureItem.startswith('keyId='):
if '"' in signatureItem:
keyId = signatureItem.split('"')[1]
break
if not keyId:
print('Queue: No keyId in signature: ' +
queueJson['httpHeaders']['signature'])
pubKey = None
break
pubKey = \
getPersonPubKey(baseDir, session, keyId,
personCache, debug,
projectVersion, httpPrefix,
domain, onionDomain)
if pubKey:
if debug:
print('DEBUG: public key: ' + str(pubKey))
break
if debug:
print('DEBUG: Retry ' + str(tries+1) +
' obtaining public key for ' + keyId)
time.sleep(5)
if not pubKey:
print('Queue: public key could not be obtained from ' + keyId)
if os.path.isfile(queueFilename): if os.path.isfile(queueFilename):
os.remove(queueFilename) os.remove(queueFilename)
if len(queue) > 0: if len(queue) > 0:
queue.pop(0) queue.pop(0)
continue
# check the signature
if debug:
print('DEBUG: checking http headers')
pprint(queueJson['httpHeaders'])
if not verifyPostHeaders(httpPrefix,
pubKey,
queueJson['httpHeaders'],
queueJson['path'], False,
queueJson['digest'],
json.dumps(queueJson['post']),
debug):
print('Queue: Header signature check failed')
pprint(queueJson['httpHeaders'])
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)
continue
if debug:
print('DEBUG: Signature check success')
# set the id to the same as the post filename
# This makes the filename and the id consistent
# if queueJson['post'].get('id'):
# queueJson['post']['id']=queueJson['id']
if receiveUndo(session,
baseDir, httpPrefix, port,
sendThreads, postLog,
cachedWebfingers,
personCache,
queueJson['post'],
federationList,
debug,
acceptedCaps=["inbox:write", "objects:read"]):
print('Queue: Undo accepted from ' + keyId)
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)
continue
if debug:
print('DEBUG: checking for follow requests')
if receiveFollowRequest(session,
baseDir, httpPrefix, port,
sendThreads, postLog,
cachedWebfingers,
personCache,
queueJson['post'],
federationList,
debug, projectVersion,
acceptedCaps=["inbox:write",
"objects:read"]):
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)
print('Queue: Follow activity for ' + keyId +
' removed from accepted from queue')
continue
else:
if debug:
print('DEBUG: No follow requests')
if receiveAcceptReject(session,
baseDir, httpPrefix, domain, port,
sendThreads, postLog,
cachedWebfingers, personCache,
queueJson['post'],
federationList, debug):
print('Queue: Accept/Reject received from ' + keyId)
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)
continue
if receiveUpdate(recentPostsCache, session,
baseDir, httpPrefix,
domain, port,
sendThreads, postLog,
cachedWebfingers,
personCache,
queueJson['post'],
federationList,
queueJson['postNickname'],
debug):
print('Queue: Update accepted from ' + keyId)
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)
continue
# get recipients list
recipientsDict, recipientsDictFollowers = \
inboxPostRecipients(baseDir, queueJson['post'],
httpPrefix, domain, port, debug)
if len(recipientsDict.items()) == 0 and \
len(recipientsDictFollowers.items()) == 0:
print('Queue: no recipients were resolved ' +
'for post arriving in inbox')
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)
continue
# if there are only a small number of followers then
# process them as if they were specifically
# addresses to particular accounts
noOfFollowItems = len(recipientsDictFollowers.items())
if noOfFollowItems > 0:
# always deliver to individual inboxes
if noOfFollowItems < 999999:
if debug:
print('DEBUG: moving ' + str(noOfFollowItems) +
' inbox posts addressed to followers')
for handle, postItem in recipientsDictFollowers.items():
recipientsDict[handle] = postItem
recipientsDictFollowers = {}
# recipientsList = [recipientsDict, recipientsDictFollowers]
if debug:
print('*************************************')
print('Resolved recipients list:')
pprint(recipientsDict)
print('Resolved followers list:')
pprint(recipientsDictFollowers)
print('*************************************')
if queueJson['post'].get('capability'):
if not isinstance(queueJson['post']['capability'], list):
print('Queue: capability on post should be a list')
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)
continue
# Copy any posts addressed to followers into the shared inbox
# this avoid copying file multiple times to potentially many
# individual inboxes
# This obviously bypasses object capabilities and so
# any checking will needs to be handled at the time when inbox
# GET happens on individual accounts.
# See posts.py/createBoxBase
if len(recipientsDictFollowers) > 0:
sharedInboxPostFilename = \
queueJson['destination'].replace(inboxHandle, inboxHandle)
if not os.path.isfile(sharedInboxPostFilename):
saveJson(queueJson['post'], sharedInboxPostFilename)
# for posts addressed to specific accounts
for handle, capsId in recipientsDict.items():
destination = \
queueJson['destination'].replace(inboxHandle, handle)
# check that capabilities are accepted
if queueJson['post'].get('capability'):
capabilityIdList = queueJson['post']['capability']
# does the capability id list within the post
# contain the id of the recipient with this handle?
# Here the capability id begins with the handle,
# so this could also be matched separately, but it's
# probably not necessary
if capsId in capabilityIdList:
inboxAfterCapabilities(recentPostsCache,
maxRecentPosts,
session, keyId, handle,
queueJson['post'],
baseDir, httpPrefix,
sendThreads, postLog,
cachedWebfingers,
personCache, queue,
domain, onionDomain,
port, useTor,
federationList, ocapAlways,
debug, acceptedCaps,
queueFilename, destination,
maxReplies, allowDeletion,
maxMentions, maxEmoji,
translate, unitTest)
else:
print('Queue: object capabilities check has failed')
if debug:
pprint(queueJson['post'])
else:
if not ocapAlways:
inboxAfterCapabilities(recentPostsCache,
maxRecentPosts,
session, keyId, handle,
queueJson['post'],
baseDir, httpPrefix,
sendThreads, postLog,
cachedWebfingers,
personCache, queue,
domain, onionDomain,
port, useTor,
federationList, ocapAlways,
debug, acceptedCaps,
queueFilename, destination,
maxReplies, allowDeletion,
maxMentions, maxEmoji,
translate, unitTest)
if debug:
pprint(queueJson['post'])
print('No capability list within post')
print('ocapAlways: ' + str(ocapAlways))
print('Queue: Queue post accepted')
if os.path.isfile(queueFilename):
os.remove(queueFilename)
if len(queue) > 0:
queue.pop(0)