Add quotas

master
Bob Mottram 2019-07-15 11:22:19 +01:00
parent 46788b44c6
commit ce6a60e66e
4 changed files with 75 additions and 9 deletions

View File

@ -663,7 +663,9 @@ def runDaemon(clientToServer: bool,baseDir: str,domain: str, \
port=80,httpPrefix='https', \ port=80,httpPrefix='https', \
fedList=[],noreply=False,nolike=False,nopics=False, \ fedList=[],noreply=False,nolike=False,nopics=False, \
noannounce=False,cw=False,ocapAlways=False, \ noannounce=False,cw=False,ocapAlways=False, \
useTor=False,maxReplies=64,debug=False) -> None: useTor=False,maxReplies=64, \
domainMaxPostsPerDay=1000,accountMaxPostsPerDay=1000, \
debug=False) -> None:
if len(domain)==0: if len(domain)==0:
domain='localhost' domain='localhost'
if '.' not in domain: if '.' not in domain:
@ -716,6 +718,7 @@ def runDaemon(clientToServer: bool,baseDir: str,domain: str, \
httpd.personCache,httpd.inboxQueue, \ httpd.personCache,httpd.inboxQueue, \
domain,port,useTor,httpd.federationList, \ domain,port,useTor,httpd.federationList, \
httpd.ocapAlways,maxReplies, \ httpd.ocapAlways,maxReplies, \
domainMaxPostsPerDay,accountMaxPostsPerDay, \
debug,httpd.acceptedCaps),daemon=True) debug,httpd.acceptedCaps),daemon=True)
httpd.thrInboxQueue.start() httpd.thrInboxQueue.start()
if clientToServer: if clientToServer:

View File

@ -197,6 +197,10 @@ parser.add_argument('--filter', dest='filterStr', type=str,default=None, \
help='Adds a word or phrase which if present will cause a message to be ignored') help='Adds a word or phrase which if present will cause a message to be ignored')
parser.add_argument('--unfilter', dest='unfilterStr', type=str,default=None, \ parser.add_argument('--unfilter', dest='unfilterStr', type=str,default=None, \
help='Remove a filter on a particular word or phrase') help='Remove a filter on a particular word or phrase')
parser.add_argument('--domainmax', dest='domainMaxPostsPerDay', type=int,default=1000, \
help='Maximum number of received posts from a domain per day')
parser.add_argument('--accountmax', dest='accountMaxPostsPerDay', type=int,default=1000, \
help='Maximum number of received posts from an account per day')
args = parser.parse_args() args = parser.parse_args()
debug=False debug=False
@ -664,4 +668,6 @@ runDaemon(args.client,baseDir,domain,port,httpPrefix, \
federationList, \ federationList, \
args.noreply,args.nolike,args.nopics, \ args.noreply,args.nolike,args.nopics, \
args.noannounce,args.cw,ocapAlways, \ args.noannounce,args.cw,ocapAlways, \
useTor,args.maxReplies,debug) useTor,args.maxReplies, \
args.domainMaxPostsPerDay,args.accountMaxPostsPerDay, \
debug)

View File

@ -124,11 +124,16 @@ def savePostToInboxQueue(baseDir: str,httpPrefix: str,nickname: str, domain: str
# block at the ealiest stage possible, which means the data # block at the ealiest stage possible, which means the data
# isn't written to file # isn't written to file
postNickname=None
postDomain=None
if postJsonObject.get('actor'): if postJsonObject.get('actor'):
postNickname=getNicknameFromActor(postJsonObject['actor']) postNickname=getNicknameFromActor(postJsonObject['actor'])
postDomain,postPort=getDomainFromActor(postJsonObject['actor']) postDomain,postPort=getDomainFromActor(postJsonObject['actor'])
if isBlocked(baseDir,nickname,domain,postNickname,postDomain): if isBlocked(baseDir,nickname,domain,postNickname,postDomain):
return None return None
if postPort:
if postPort!=80 and postPort!=443:
postDomain=postDomain+':'+str(postPort)
if postJsonObject.get('object'): if postJsonObject.get('object'):
if isinstance(postJsonObject['object'], dict): if isinstance(postJsonObject['object'], dict):
@ -164,6 +169,8 @@ def savePostToInboxQueue(baseDir: str,httpPrefix: str,nickname: str, domain: str
'id': postId, 'id': postId,
'nickname': nickname, 'nickname': nickname,
'domain': domain, 'domain': domain,
'postNickname': postNickname,
'postDomain': postDomain,
'sharedInbox': sharedInboxItem, 'sharedInbox': sharedInboxItem,
'published': published, 'published': published,
'host': host, 'host': host,
@ -810,7 +817,9 @@ def restoreQueueItems(baseDir: str,queue: []) -> None:
def runInboxQueue(baseDir: str,httpPrefix: str,sendThreads: [],postLog: [], \ def runInboxQueue(baseDir: str,httpPrefix: str,sendThreads: [],postLog: [], \
cachedWebfingers: {},personCache: {},queue: [], \ cachedWebfingers: {},personCache: {},queue: [], \
domain: str,port: int,useTor: bool,federationList: [], \ domain: str,port: int,useTor: bool,federationList: [], \
ocapAlways: bool,maxReplies: int,debug: bool, \ ocapAlways: bool,maxReplies: int, \
domainMaxPostsPerDay: int,accountMaxPostsPerDay: int, \
debug: bool, \
acceptedCaps=["inbox:write","objects:read"]) -> None: acceptedCaps=["inbox:write","objects:read"]) -> None:
"""Processes received items and moves them to """Processes received items and moves them to
the appropriate directories the appropriate directories
@ -825,14 +834,23 @@ def runInboxQueue(baseDir: str,httpPrefix: str,sendThreads: [],postLog: [], \
# if queue processing was interrupted (eg server crash) # if queue processing was interrupted (eg server crash)
# then this loads any outstanding items back into the queue # then this loads any outstanding items back into the queue
restoreQueueItems(baseDir,queue) restoreQueueItems(baseDir,queue)
# keep track of numbers of incoming posts per unit of time
quotasLastUpdate=int(time.time())
quotas={
'domains': {},
'accounts': {}
}
while True: while True:
time.sleep(1) time.sleep(1)
if len(queue)>0: if len(queue)>0:
currSessionTime=int(time.time()) currTime=int(time.time())
if currSessionTime-sessionLastUpdate>1200:
# recreate the session periodically
if currTime-sessionLastUpdate>1200:
session=createSession(domain,port,useTor) session=createSession(domain,port,useTor)
sessionLastUpdate=currSessionTime sessionLastUpdate=currTime
# oldest item first # oldest item first
queue.sort() queue.sort()
@ -847,6 +865,37 @@ def runInboxQueue(baseDir: str,httpPrefix: str,sendThreads: [],postLog: [], \
with open(queueFilename, 'r') as fp: with open(queueFilename, 'r') as fp:
queueJson=commentjson.load(fp) queueJson=commentjson.load(fp)
# clear the daily quotas for maximum numbers of received posts
if currTime-quotasLastUpdate>60*60*24:
quotas={
'domains': {},
'accounts': {}
}
quotasLastUpdate=currTime
# limit the number of posts which can arrive per domain per day
postDomain=queueJson['postDomain']
if postDomain:
if quotas['domains'].get(postDomain):
if quotas['domains'][postDomain]>domainMaxPostsPerDay:
queue.pop(0)
continue
quotas['domains'][postDomain]+=1
else:
quotas['domains'][postDomain]=1
postHandle=queueJson['postNickname']+'@'+postDomain
if quotas['accounts'].get(postHandle):
if quotas['accounts'][postHandle]>accountMaxPostsPerDay:
queue.pop(0)
continue
quotas['accounts'][postHandle]+=1
else:
quotas['accounts'][postHandle]=1
if debug:
pprint(quotas)
# Try a few times to obtain the public key # Try a few times to obtain the public key
pubKey=None pubKey=None
keyId=None keyId=None

View File

@ -145,6 +145,8 @@ def createServerAlice(path: str,domain: str,port: int,federationList: [], \
cw=False cw=False
useBlurhash=True useBlurhash=True
maxReplies=64 maxReplies=64
domainMaxPostsPerDay=1000
accountMaxPostsPerDay=1000
privateKeyPem,publicKeyPem,person,wfEndpoint= \ privateKeyPem,publicKeyPem,person,wfEndpoint= \
createPerson(path,nickname,domain,port,httpPrefix,True,password) createPerson(path,nickname,domain,port,httpPrefix,True,password)
deleteAllPosts(path,nickname,domain,'inbox') deleteAllPosts(path,nickname,domain,'inbox')
@ -171,7 +173,9 @@ def createServerAlice(path: str,domain: str,port: int,federationList: [], \
print('Server running: Alice') print('Server running: Alice')
runDaemon(False,path,domain,port,httpPrefix,federationList, \ runDaemon(False,path,domain,port,httpPrefix,federationList, \
noreply,nolike,nopics,noannounce,cw,ocapAlways, \ noreply,nolike,nopics,noannounce,cw,ocapAlways, \
useTor,maxReplies,True) useTor,maxReplies, \
domainMaxPostsPerDay,accountMaxPostsPerDay, \
True)
def createServerBob(path: str,domain: str,port: int,federationList: [], \ def createServerBob(path: str,domain: str,port: int,federationList: [], \
hasFollows: bool,hasPosts :bool,ocapAlways :bool): hasFollows: bool,hasPosts :bool,ocapAlways :bool):
@ -192,6 +196,8 @@ def createServerBob(path: str,domain: str,port: int,federationList: [], \
cw=False cw=False
useBlurhash=False useBlurhash=False
maxReplies=64 maxReplies=64
domainMaxPostsPerDay=1000
accountMaxPostsPerDay=1000
privateKeyPem,publicKeyPem,person,wfEndpoint= \ privateKeyPem,publicKeyPem,person,wfEndpoint= \
createPerson(path,nickname,domain,port,httpPrefix,True,password) createPerson(path,nickname,domain,port,httpPrefix,True,password)
deleteAllPosts(path,nickname,domain,'inbox') deleteAllPosts(path,nickname,domain,'inbox')
@ -218,7 +224,9 @@ def createServerBob(path: str,domain: str,port: int,federationList: [], \
print('Server running: Bob') print('Server running: Bob')
runDaemon(False,path,domain,port,httpPrefix,federationList, \ runDaemon(False,path,domain,port,httpPrefix,federationList, \
noreply,nolike,nopics,noannounce,cw,ocapAlways, \ noreply,nolike,nopics,noannounce,cw,ocapAlways, \
useTor,maxReplies,True) useTor,maxReplies, \
domainMaxPostsPerDay,accountMaxPostsPerDay, \
True)
def createServerEve(path: str,domain: str,port: int,federationList: [], \ def createServerEve(path: str,domain: str,port: int,federationList: [], \
hasFollows: bool,hasPosts :bool,ocapAlways :bool): hasFollows: bool,hasPosts :bool,ocapAlways :bool):