Consolidate permissions functions

master
Bob Mottram 2019-07-02 11:39:55 +01:00
parent e77cf39f4c
commit faaccbe1db
5 changed files with 27 additions and 31 deletions

View File

@ -11,11 +11,12 @@ from pprint import pprint
import os import os
import sys import sys
from person import validUsername from person import validUsername
from utils import domainPermitted
def followPerson(baseDir: str,username: str, domain: str, followUsername: str, followDomain: str, federationList: [], followFile='following.txt') -> bool: def followPerson(baseDir: str,username: str, domain: str, followUsername: str, followDomain: str, federationList: [], followFile='following.txt') -> bool:
"""Adds a person to the follow list """Adds a person to the follow list
""" """
if followDomain.lower().replace('\n','') not in federationList: if not domainPermitted(followDomain.lower().replace('\n',''), federationList):
return False return False
handle=username.lower()+'@'+domain.lower() handle=username.lower()+'@'+domain.lower()
handleToFollow=followUsername.lower()+'@'+followDomain.lower() handleToFollow=followUsername.lower()+'@'+followDomain.lower()

View File

@ -9,6 +9,7 @@ __status__ = "Production"
import json import json
import os import os
import datetime import datetime
from utils import urlPermitted
def inboxPermittedMessage(domain: str,messageJson: {},federationList: []) -> bool: def inboxPermittedMessage(domain: str,messageJson: {},federationList: []) -> bool:
""" check that we are receiving from a permitted domain """ check that we are receiving from a permitted domain
@ -21,23 +22,13 @@ def inboxPermittedMessage(domain: str,messageJson: {},federationList: []) -> boo
if domain in actor: if domain in actor:
return True return True
permittedDomain=False if not urlPermitted(actor,federationList):
for domain in federationList:
if domain in actor:
permittedDomain=True
break
if not permittedDomain:
return False return False
if messageJson.get('object'): if messageJson.get('object'):
if messageJson['object'].get('inReplyTo'): if messageJson['object'].get('inReplyTo'):
inReplyTo=messageJson['object']['inReplyTo'] inReplyTo=messageJson['object']['inReplyTo']
permittedReplyDomain=False if not urlPermitted(inReplyTo, federationList):
for domain in federationList:
if domain in inReplyTo:
permittedReplyDomain=True
break
if not permittedReplyDomain:
return False return False
return True return True

View File

@ -28,6 +28,7 @@ from webfinger import webfingerHandle
from httpsig import createSignedHeader from httpsig import createSignedHeader
from utils import getStatusNumber from utils import getStatusNumber
from utils import createOutboxDir from utils import createOutboxDir
from utils import urlPermitted
try: try:
from BeautifulSoup import BeautifulSoup from BeautifulSoup import BeautifulSoup
except ImportError: except ImportError:
@ -46,14 +47,6 @@ def getPersonKey(username: str,domain: str,baseDir: str,keyType='public'):
if len(keyPem)<20: if len(keyPem)<20:
return '' return ''
return keyPem return keyPem
def permitted(url: str,federationList: []) -> bool:
"""Is a url from one of the permitted domains?
"""
for domain in federationList:
if domain in url:
return True
return False
def cleanHtml(rawHtml: str) -> str: def cleanHtml(rawHtml: str) -> str:
text = BeautifulSoup(rawHtml, 'html.parser').get_text() text = BeautifulSoup(rawHtml, 'html.parser').get_text()
@ -153,7 +146,7 @@ def getPosts(session,outboxUrl: str,maxPosts: int,maxMentions: int,maxEmoji: int
if tagItem.get('name') and tagItem.get('icon'): if tagItem.get('name') and tagItem.get('icon'):
if tagItem['icon'].get('url'): if tagItem['icon'].get('url'):
# No emoji from non-permitted domains # No emoji from non-permitted domains
if permitted(tagItem['icon']['url'],federationList): if urlPermitted(tagItem['icon']['url'],federationList):
emojiName=tagItem['name'] emojiName=tagItem['name']
emojiIcon=tagItem['icon']['url'] emojiIcon=tagItem['icon']['url']
emoji[emojiName]=emojiIcon emoji[emojiName]=emojiIcon
@ -175,7 +168,7 @@ def getPosts(session,outboxUrl: str,maxPosts: int,maxMentions: int,maxEmoji: int
if item['object'].get('inReplyTo'): if item['object'].get('inReplyTo'):
if item['object']['inReplyTo']: if item['object']['inReplyTo']:
# No replies to non-permitted domains # No replies to non-permitted domains
if not permitted(item['object']['inReplyTo'],federationList): if not urlPermitted(item['object']['inReplyTo'],federationList):
continue continue
inReplyTo = item['object']['inReplyTo'] inReplyTo = item['object']['inReplyTo']
@ -183,7 +176,7 @@ def getPosts(session,outboxUrl: str,maxPosts: int,maxMentions: int,maxEmoji: int
if item['object'].get('conversation'): if item['object'].get('conversation'):
if item['object']['conversation']: if item['object']['conversation']:
# no conversations originated in non-permitted domains # no conversations originated in non-permitted domains
if permitted(item['object']['conversation'],federationList): if urlPermitted(item['object']['conversation'],federationList):
conversation = item['object']['conversation'] conversation = item['object']['conversation']
attachment = [] attachment = []
@ -192,7 +185,7 @@ def getPosts(session,outboxUrl: str,maxPosts: int,maxMentions: int,maxEmoji: int
for attach in item['object']['attachment']: for attach in item['object']['attachment']:
if attach.get('name') and attach.get('url'): if attach.get('name') and attach.get('url'):
# no attachments from non-permitted domains # no attachments from non-permitted domains
if permitted(attach['url'],federationList): if urlPermitted(attach['url'],federationList):
attachment.append([attach['name'],attach['url']]) attachment.append([attach['name'],attach['url']])
sensitive = False sensitive = False

View File

@ -8,6 +8,7 @@ __status__ = "Production"
import requests import requests
from requests_toolbelt.adapters.source import SourceAddressAdapter from requests_toolbelt.adapters.source import SourceAddressAdapter
from utils import urlPermitted
import json import json
baseDirectory=None baseDirectory=None
@ -40,12 +41,7 @@ def postJson(session,postJsonObject: {},federationList: [],inboxUrl: str,headers
"""Post a json message to the inbox of another person """Post a json message to the inbox of another person
""" """
# check that we are posting to a permitted domain # check that we are posting to a permitted domain
permittedDomain=False if not urlPermitted(inboxUrl,federationList):
for domain in federationList:
if domain in inboxUrl:
permittedDomain=True
break
if not permittedDomain:
return None return None
postResult = session.post(url = inboxUrl, data = json.dumps(postJsonObject), headers=headers) postResult = session.post(url = inboxUrl, data = json.dumps(postJsonObject), headers=headers)

View File

@ -30,3 +30,18 @@ def createOutboxDir(username: str,domain: str,baseDir: str) -> str:
if not os.path.isdir(outboxDir): if not os.path.isdir(outboxDir):
os.mkdir(outboxDir) os.mkdir(outboxDir)
return outboxDir return outboxDir
def domainPermitted(domain: str, federationList: []):
if len(federationList)==0:
return True
if domain in federationList:
return True
return False
def urlPermitted(url: str, federationList: []):
if len(federationList)==0:
return True
for domain in federationList:
if domain in url:
return True
return False