From d21b584f786069bb6ee7980046fa5f3aa9e53851 Mon Sep 17 00:00:00 2001 From: Bob Mottram Date: Thu, 16 Apr 2020 10:49:57 +0100 Subject: [PATCH] Tidying --- inbox.py | 772 +++++++++++++++++++++++++++---------------------------- 1 file changed, 386 insertions(+), 386 deletions(-) diff --git a/inbox.py b/inbox.py index 4d52e7287..3799ed362 100644 --- a/inbox.py +++ b/inbox.py @@ -2249,403 +2249,403 @@ def runInboxQueue(recentPostsCache: {}, maxRecentPosts: int, if queueRestoreCtr >= 30: queueRestoreCtr = 0 restoreQueueItems(baseDir, queue) - else: - currTime = int(time.time()) + continue + currTime = int(time.time()) - # recreate the session periodically - if not session or currTime - sessionLastUpdate > 1200: - print('Creating inbox session') - session = createSession(useTor) - sessionLastUpdate = currTime + # recreate the session periodically + if not session or currTime - sessionLastUpdate > 1200: + print('Creating inbox session') + session = createSession(useTor) + sessionLastUpdate = currTime - # oldest item first - queue.sort() - queueFilename = queue[0] - if not os.path.isfile(queueFilename): - print("Queue: queue item rejected because it has no file: " + - queueFilename) - if len(queue) > 0: - queue.pop(0) - continue + # oldest item first + queue.sort() + queueFilename = queue[0] + if not os.path.isfile(queueFilename): + print("Queue: queue item rejected because it has no file: " + + queueFilename) + if len(queue) > 0: + queue.pop(0) + continue - print('Loading queue item ' + queueFilename) + print('Loading queue item ' + queueFilename) - # Load the queue json - queueJson = loadJson(queueFilename, 1) - if not queueJson: - print('Queue: runInboxQueue failed to load inbox queue item ' + - queueFilename) - # Assume that the file is probably corrupt/unreadable - if len(queue) > 0: - queue.pop(0) - # delete the queue file - if os.path.isfile(queueFilename): - try: - os.remove(queueFilename) - except BaseException: - pass - continue - - # clear the daily quotas for maximum numbers of received posts - if currTime-quotasLastUpdateDaily > 60 * 60 * 24: - quotasDaily = { - 'domains': {}, - 'accounts': {} - } - quotasLastUpdateDaily = currTime - - # clear the per minute quotas for maximum numbers of received posts - if currTime-quotasLastUpdatePerMin > 60: - quotasPerMin = { - 'domains': {}, - 'accounts': {} - } - quotasLastUpdatePerMin = currTime - - # limit the number of posts which can arrive per domain per day - postDomain = queueJson['postDomain'] - if postDomain: - if domainMaxPostsPerDay > 0: - if quotasDaily['domains'].get(postDomain): - if quotasDaily['domains'][postDomain] > \ - domainMaxPostsPerDay: - print('Queue: Quota per day - Maximum posts for ' + - postDomain + ' reached (' + - str(domainMaxPostsPerDay) + ')') - if len(queue) > 0: - try: - os.remove(queueFilename) - except BaseException: - pass - queue.pop(0) - continue - quotasDaily['domains'][postDomain] += 1 - else: - quotasDaily['domains'][postDomain] = 1 - - if quotasPerMin['domains'].get(postDomain): - domainMaxPostsPerMin = \ - int(domainMaxPostsPerDay / (24 * 60)) - if domainMaxPostsPerMin < 10: - domainMaxPostsPerMin = 10 - if quotasPerMin['domains'][postDomain] > \ - domainMaxPostsPerMin: - print('Queue: Quota per min - Maximum posts for ' + - postDomain + ' reached (' + - str(domainMaxPostsPerMin) + ')') - if len(queue) > 0: - try: - os.remove(queueFilename) - except BaseException: - pass - queue.pop(0) - continue - quotasPerMin['domains'][postDomain] += 1 - else: - quotasPerMin['domains'][postDomain] = 1 - - if accountMaxPostsPerDay > 0: - postHandle = queueJson['postNickname'] + '@' + postDomain - if quotasDaily['accounts'].get(postHandle): - if quotasDaily['accounts'][postHandle] > \ - accountMaxPostsPerDay: - print('Queue: Quota account posts per day -' + - ' Maximum posts for ' + - postHandle + ' reached (' + - str(accountMaxPostsPerDay) + ')') - if len(queue) > 0: - try: - os.remove(queueFilename) - except BaseException: - pass - queue.pop(0) - continue - quotasDaily['accounts'][postHandle] += 1 - else: - quotasDaily['accounts'][postHandle] = 1 - - if quotasPerMin['accounts'].get(postHandle): - accountMaxPostsPerMin = \ - int(accountMaxPostsPerDay / (24 * 60)) - if accountMaxPostsPerMin < 10: - accountMaxPostsPerMin = 10 - if quotasPerMin['accounts'][postHandle] > \ - accountMaxPostsPerMin: - print('Queue: Quota account posts per min -' + - ' Maximum posts for ' + - postHandle + ' reached (' + - str(accountMaxPostsPerMin) + ')') - if len(queue) > 0: - try: - os.remove(queueFilename) - except BaseException: - pass - queue.pop(0) - continue - quotasPerMin['accounts'][postHandle] += 1 - else: - quotasPerMin['accounts'][postHandle] = 1 - - if debug: - if accountMaxPostsPerDay > 0 or domainMaxPostsPerDay > 0: - pprint(quotasDaily) - - print('Obtaining public key for actor ' + queueJson['actor']) - - # Try a few times to obtain the public key - pubKey = None - keyId = None - for tries in range(8): - keyId = None - signatureParams = \ - queueJson['httpHeaders']['signature'].split(',') - for signatureItem in signatureParams: - if signatureItem.startswith('keyId='): - if '"' in signatureItem: - keyId = signatureItem.split('"')[1] - break - if not keyId: - print('Queue: No keyId in signature: ' + - queueJson['httpHeaders']['signature']) - pubKey = None - break - - pubKey = \ - getPersonPubKey(baseDir, session, keyId, - personCache, debug, - projectVersion, httpPrefix, - domain, onionDomain) - if pubKey: - if debug: - print('DEBUG: public key: ' + str(pubKey)) - break - - if debug: - print('DEBUG: Retry ' + str(tries+1) + - ' obtaining public key for ' + keyId) - time.sleep(5) - - if not pubKey: - print('Queue: public key could not be obtained from ' + keyId) - if os.path.isfile(queueFilename): + # Load the queue json + queueJson = loadJson(queueFilename, 1) + if not queueJson: + print('Queue: runInboxQueue failed to load inbox queue item ' + + queueFilename) + # Assume that the file is probably corrupt/unreadable + if len(queue) > 0: + queue.pop(0) + # delete the queue file + if os.path.isfile(queueFilename): + try: os.remove(queueFilename) - if len(queue) > 0: - queue.pop(0) - continue + except BaseException: + pass + continue - # check the signature - if debug: - print('DEBUG: checking http headers') - pprint(queueJson['httpHeaders']) - if not verifyPostHeaders(httpPrefix, - pubKey, - queueJson['httpHeaders'], - queueJson['path'], False, - queueJson['digest'], - json.dumps(queueJson['post']), - debug): - print('Queue: Header signature check failed') - pprint(queueJson['httpHeaders']) - if os.path.isfile(queueFilename): - os.remove(queueFilename) - if len(queue) > 0: - queue.pop(0) - continue + # clear the daily quotas for maximum numbers of received posts + if currTime-quotasLastUpdateDaily > 60 * 60 * 24: + quotasDaily = { + 'domains': {}, + 'accounts': {} + } + quotasLastUpdateDaily = currTime - if debug: - print('DEBUG: Signature check success') + # clear the per minute quotas for maximum numbers of received posts + if currTime-quotasLastUpdatePerMin > 60: + quotasPerMin = { + 'domains': {}, + 'accounts': {} + } + quotasLastUpdatePerMin = currTime - # set the id to the same as the post filename - # This makes the filename and the id consistent - # if queueJson['post'].get('id'): - # queueJson['post']['id']=queueJson['id'] - - if receiveUndo(session, - baseDir, httpPrefix, port, - sendThreads, postLog, - cachedWebfingers, - personCache, - queueJson['post'], - federationList, - debug, - acceptedCaps=["inbox:write", "objects:read"]): - print('Queue: Undo accepted from ' + keyId) - if os.path.isfile(queueFilename): - os.remove(queueFilename) - if len(queue) > 0: - queue.pop(0) - continue - - if debug: - print('DEBUG: checking for follow requests') - if receiveFollowRequest(session, - baseDir, httpPrefix, port, - sendThreads, postLog, - cachedWebfingers, - personCache, - queueJson['post'], - federationList, - debug, projectVersion, - acceptedCaps=["inbox:write", - "objects:read"]): - if os.path.isfile(queueFilename): - os.remove(queueFilename) - if len(queue) > 0: - queue.pop(0) - print('Queue: Follow activity for ' + keyId + - ' removed from accepted from queue') - continue - else: - if debug: - print('DEBUG: No follow requests') - - if receiveAcceptReject(session, - baseDir, httpPrefix, domain, port, - sendThreads, postLog, - cachedWebfingers, personCache, - queueJson['post'], - federationList, debug): - print('Queue: Accept/Reject received from ' + keyId) - if os.path.isfile(queueFilename): - os.remove(queueFilename) - if len(queue) > 0: - queue.pop(0) - continue - - if receiveUpdate(recentPostsCache, session, - baseDir, httpPrefix, - domain, port, - sendThreads, postLog, - cachedWebfingers, - personCache, - queueJson['post'], - federationList, - queueJson['postNickname'], - debug): - print('Queue: Update accepted from ' + keyId) - if os.path.isfile(queueFilename): - os.remove(queueFilename) - if len(queue) > 0: - queue.pop(0) - continue - - # get recipients list - recipientsDict, recipientsDictFollowers = \ - inboxPostRecipients(baseDir, queueJson['post'], - httpPrefix, domain, port, debug) - if len(recipientsDict.items()) == 0 and \ - len(recipientsDictFollowers.items()) == 0: - print('Queue: no recipients were resolved ' + - 'for post arriving in inbox') - if os.path.isfile(queueFilename): - os.remove(queueFilename) - if len(queue) > 0: - queue.pop(0) - continue - - # if there are only a small number of followers then - # process them as if they were specifically - # addresses to particular accounts - noOfFollowItems = len(recipientsDictFollowers.items()) - if noOfFollowItems > 0: - # always deliver to individual inboxes - if noOfFollowItems < 999999: - if debug: - print('DEBUG: moving ' + str(noOfFollowItems) + - ' inbox posts addressed to followers') - for handle, postItem in recipientsDictFollowers.items(): - recipientsDict[handle] = postItem - recipientsDictFollowers = {} -# recipientsList = [recipientsDict, recipientsDictFollowers] - - if debug: - print('*************************************') - print('Resolved recipients list:') - pprint(recipientsDict) - print('Resolved followers list:') - pprint(recipientsDictFollowers) - print('*************************************') - - if queueJson['post'].get('capability'): - if not isinstance(queueJson['post']['capability'], list): - print('Queue: capability on post should be a list') - if os.path.isfile(queueFilename): - os.remove(queueFilename) - if len(queue) > 0: - queue.pop(0) - continue - - # Copy any posts addressed to followers into the shared inbox - # this avoid copying file multiple times to potentially many - # individual inboxes - # This obviously bypasses object capabilities and so - # any checking will needs to be handled at the time when inbox - # GET happens on individual accounts. - # See posts.py/createBoxBase - if len(recipientsDictFollowers) > 0: - sharedInboxPostFilename = \ - queueJson['destination'].replace(inboxHandle, inboxHandle) - if not os.path.isfile(sharedInboxPostFilename): - saveJson(queueJson['post'], sharedInboxPostFilename) - - # for posts addressed to specific accounts - for handle, capsId in recipientsDict.items(): - destination = \ - queueJson['destination'].replace(inboxHandle, handle) - # check that capabilities are accepted - if queueJson['post'].get('capability'): - capabilityIdList = queueJson['post']['capability'] - # does the capability id list within the post - # contain the id of the recipient with this handle? - # Here the capability id begins with the handle, - # so this could also be matched separately, but it's - # probably not necessary - if capsId in capabilityIdList: - inboxAfterCapabilities(recentPostsCache, - maxRecentPosts, - session, keyId, handle, - queueJson['post'], - baseDir, httpPrefix, - sendThreads, postLog, - cachedWebfingers, - personCache, queue, - domain, onionDomain, - port, useTor, - federationList, ocapAlways, - debug, acceptedCaps, - queueFilename, destination, - maxReplies, allowDeletion, - maxMentions, maxEmoji, - translate, unitTest) - else: - print('Queue: object capabilities check has failed') - if debug: - pprint(queueJson['post']) + # limit the number of posts which can arrive per domain per day + postDomain = queueJson['postDomain'] + if postDomain: + if domainMaxPostsPerDay > 0: + if quotasDaily['domains'].get(postDomain): + if quotasDaily['domains'][postDomain] > \ + domainMaxPostsPerDay: + print('Queue: Quota per day - Maximum posts for ' + + postDomain + ' reached (' + + str(domainMaxPostsPerDay) + ')') + if len(queue) > 0: + try: + os.remove(queueFilename) + except BaseException: + pass + queue.pop(0) + continue + quotasDaily['domains'][postDomain] += 1 else: - if not ocapAlways: - inboxAfterCapabilities(recentPostsCache, - maxRecentPosts, - session, keyId, handle, - queueJson['post'], - baseDir, httpPrefix, - sendThreads, postLog, - cachedWebfingers, - personCache, queue, - domain, onionDomain, - port, useTor, - federationList, ocapAlways, - debug, acceptedCaps, - queueFilename, destination, - maxReplies, allowDeletion, - maxMentions, maxEmoji, - translate, unitTest) - if debug: - pprint(queueJson['post']) - print('No capability list within post') - print('ocapAlways: ' + str(ocapAlways)) + quotasDaily['domains'][postDomain] = 1 - print('Queue: Queue post accepted') + if quotasPerMin['domains'].get(postDomain): + domainMaxPostsPerMin = \ + int(domainMaxPostsPerDay / (24 * 60)) + if domainMaxPostsPerMin < 10: + domainMaxPostsPerMin = 10 + if quotasPerMin['domains'][postDomain] > \ + domainMaxPostsPerMin: + print('Queue: Quota per min - Maximum posts for ' + + postDomain + ' reached (' + + str(domainMaxPostsPerMin) + ')') + if len(queue) > 0: + try: + os.remove(queueFilename) + except BaseException: + pass + queue.pop(0) + continue + quotasPerMin['domains'][postDomain] += 1 + else: + quotasPerMin['domains'][postDomain] = 1 + + if accountMaxPostsPerDay > 0: + postHandle = queueJson['postNickname'] + '@' + postDomain + if quotasDaily['accounts'].get(postHandle): + if quotasDaily['accounts'][postHandle] > \ + accountMaxPostsPerDay: + print('Queue: Quota account posts per day -' + + ' Maximum posts for ' + + postHandle + ' reached (' + + str(accountMaxPostsPerDay) + ')') + if len(queue) > 0: + try: + os.remove(queueFilename) + except BaseException: + pass + queue.pop(0) + continue + quotasDaily['accounts'][postHandle] += 1 + else: + quotasDaily['accounts'][postHandle] = 1 + + if quotasPerMin['accounts'].get(postHandle): + accountMaxPostsPerMin = \ + int(accountMaxPostsPerDay / (24 * 60)) + if accountMaxPostsPerMin < 10: + accountMaxPostsPerMin = 10 + if quotasPerMin['accounts'][postHandle] > \ + accountMaxPostsPerMin: + print('Queue: Quota account posts per min -' + + ' Maximum posts for ' + + postHandle + ' reached (' + + str(accountMaxPostsPerMin) + ')') + if len(queue) > 0: + try: + os.remove(queueFilename) + except BaseException: + pass + queue.pop(0) + continue + quotasPerMin['accounts'][postHandle] += 1 + else: + quotasPerMin['accounts'][postHandle] = 1 + + if debug: + if accountMaxPostsPerDay > 0 or domainMaxPostsPerDay > 0: + pprint(quotasDaily) + + print('Obtaining public key for actor ' + queueJson['actor']) + + # Try a few times to obtain the public key + pubKey = None + keyId = None + for tries in range(8): + keyId = None + signatureParams = \ + queueJson['httpHeaders']['signature'].split(',') + for signatureItem in signatureParams: + if signatureItem.startswith('keyId='): + if '"' in signatureItem: + keyId = signatureItem.split('"')[1] + break + if not keyId: + print('Queue: No keyId in signature: ' + + queueJson['httpHeaders']['signature']) + pubKey = None + break + + pubKey = \ + getPersonPubKey(baseDir, session, keyId, + personCache, debug, + projectVersion, httpPrefix, + domain, onionDomain) + if pubKey: + if debug: + print('DEBUG: public key: ' + str(pubKey)) + break + + if debug: + print('DEBUG: Retry ' + str(tries+1) + + ' obtaining public key for ' + keyId) + time.sleep(5) + + if not pubKey: + print('Queue: public key could not be obtained from ' + keyId) if os.path.isfile(queueFilename): os.remove(queueFilename) if len(queue) > 0: queue.pop(0) + continue + + # check the signature + if debug: + print('DEBUG: checking http headers') + pprint(queueJson['httpHeaders']) + if not verifyPostHeaders(httpPrefix, + pubKey, + queueJson['httpHeaders'], + queueJson['path'], False, + queueJson['digest'], + json.dumps(queueJson['post']), + debug): + print('Queue: Header signature check failed') + pprint(queueJson['httpHeaders']) + if os.path.isfile(queueFilename): + os.remove(queueFilename) + if len(queue) > 0: + queue.pop(0) + continue + + if debug: + print('DEBUG: Signature check success') + + # set the id to the same as the post filename + # This makes the filename and the id consistent + # if queueJson['post'].get('id'): + # queueJson['post']['id']=queueJson['id'] + + if receiveUndo(session, + baseDir, httpPrefix, port, + sendThreads, postLog, + cachedWebfingers, + personCache, + queueJson['post'], + federationList, + debug, + acceptedCaps=["inbox:write", "objects:read"]): + print('Queue: Undo accepted from ' + keyId) + if os.path.isfile(queueFilename): + os.remove(queueFilename) + if len(queue) > 0: + queue.pop(0) + continue + + if debug: + print('DEBUG: checking for follow requests') + if receiveFollowRequest(session, + baseDir, httpPrefix, port, + sendThreads, postLog, + cachedWebfingers, + personCache, + queueJson['post'], + federationList, + debug, projectVersion, + acceptedCaps=["inbox:write", + "objects:read"]): + if os.path.isfile(queueFilename): + os.remove(queueFilename) + if len(queue) > 0: + queue.pop(0) + print('Queue: Follow activity for ' + keyId + + ' removed from accepted from queue') + continue + else: + if debug: + print('DEBUG: No follow requests') + + if receiveAcceptReject(session, + baseDir, httpPrefix, domain, port, + sendThreads, postLog, + cachedWebfingers, personCache, + queueJson['post'], + federationList, debug): + print('Queue: Accept/Reject received from ' + keyId) + if os.path.isfile(queueFilename): + os.remove(queueFilename) + if len(queue) > 0: + queue.pop(0) + continue + + if receiveUpdate(recentPostsCache, session, + baseDir, httpPrefix, + domain, port, + sendThreads, postLog, + cachedWebfingers, + personCache, + queueJson['post'], + federationList, + queueJson['postNickname'], + debug): + print('Queue: Update accepted from ' + keyId) + if os.path.isfile(queueFilename): + os.remove(queueFilename) + if len(queue) > 0: + queue.pop(0) + continue + + # get recipients list + recipientsDict, recipientsDictFollowers = \ + inboxPostRecipients(baseDir, queueJson['post'], + httpPrefix, domain, port, debug) + if len(recipientsDict.items()) == 0 and \ + len(recipientsDictFollowers.items()) == 0: + print('Queue: no recipients were resolved ' + + 'for post arriving in inbox') + if os.path.isfile(queueFilename): + os.remove(queueFilename) + if len(queue) > 0: + queue.pop(0) + continue + + # if there are only a small number of followers then + # process them as if they were specifically + # addresses to particular accounts + noOfFollowItems = len(recipientsDictFollowers.items()) + if noOfFollowItems > 0: + # always deliver to individual inboxes + if noOfFollowItems < 999999: + if debug: + print('DEBUG: moving ' + str(noOfFollowItems) + + ' inbox posts addressed to followers') + for handle, postItem in recipientsDictFollowers.items(): + recipientsDict[handle] = postItem + recipientsDictFollowers = {} +# recipientsList = [recipientsDict, recipientsDictFollowers] + + if debug: + print('*************************************') + print('Resolved recipients list:') + pprint(recipientsDict) + print('Resolved followers list:') + pprint(recipientsDictFollowers) + print('*************************************') + + if queueJson['post'].get('capability'): + if not isinstance(queueJson['post']['capability'], list): + print('Queue: capability on post should be a list') + if os.path.isfile(queueFilename): + os.remove(queueFilename) + if len(queue) > 0: + queue.pop(0) + continue + + # Copy any posts addressed to followers into the shared inbox + # this avoid copying file multiple times to potentially many + # individual inboxes + # This obviously bypasses object capabilities and so + # any checking will needs to be handled at the time when inbox + # GET happens on individual accounts. + # See posts.py/createBoxBase + if len(recipientsDictFollowers) > 0: + sharedInboxPostFilename = \ + queueJson['destination'].replace(inboxHandle, inboxHandle) + if not os.path.isfile(sharedInboxPostFilename): + saveJson(queueJson['post'], sharedInboxPostFilename) + + # for posts addressed to specific accounts + for handle, capsId in recipientsDict.items(): + destination = \ + queueJson['destination'].replace(inboxHandle, handle) + # check that capabilities are accepted + if queueJson['post'].get('capability'): + capabilityIdList = queueJson['post']['capability'] + # does the capability id list within the post + # contain the id of the recipient with this handle? + # Here the capability id begins with the handle, + # so this could also be matched separately, but it's + # probably not necessary + if capsId in capabilityIdList: + inboxAfterCapabilities(recentPostsCache, + maxRecentPosts, + session, keyId, handle, + queueJson['post'], + baseDir, httpPrefix, + sendThreads, postLog, + cachedWebfingers, + personCache, queue, + domain, onionDomain, + port, useTor, + federationList, ocapAlways, + debug, acceptedCaps, + queueFilename, destination, + maxReplies, allowDeletion, + maxMentions, maxEmoji, + translate, unitTest) + else: + print('Queue: object capabilities check has failed') + if debug: + pprint(queueJson['post']) + else: + if not ocapAlways: + inboxAfterCapabilities(recentPostsCache, + maxRecentPosts, + session, keyId, handle, + queueJson['post'], + baseDir, httpPrefix, + sendThreads, postLog, + cachedWebfingers, + personCache, queue, + domain, onionDomain, + port, useTor, + federationList, ocapAlways, + debug, acceptedCaps, + queueFilename, destination, + maxReplies, allowDeletion, + maxMentions, maxEmoji, + translate, unitTest) + if debug: + pprint(queueJson['post']) + print('No capability list within post') + print('ocapAlways: ' + str(ocapAlways)) + + print('Queue: Queue post accepted') + if os.path.isfile(queueFilename): + os.remove(queueFilename) + if len(queue) > 0: + queue.pop(0)