Merge branch 'main' of ssh://code.freedombone.net:2222/bashrc/epicyon

main
Bob Mottram 2021-06-20 19:19:12 +01:00
commit e6ca85c08e
4 changed files with 101 additions and 51 deletions

View File

@ -207,6 +207,7 @@ from shares import addShare
from shares import removeShare
from shares import expireShares
from categories import setHashtagCategory
from utils import userAgentDomain
from utils import isLocalNetworkAddress
from utils import permittedDir
from utils import isAccountDir
@ -452,37 +453,43 @@ class PubServer(BaseHTTPRequestHandler):
else:
print('ERROR: unable to create vote')
def _userAgentDomain(self) -> str:
"""Returns the domain specified within User-Agent header
"""
if not self.headers.get('User-Agent'):
return None
agentStr = self.headers.get('User-Agent')
if '+' not in agentStr:
return None
agentDomain = agentStr.split('+')[1].strip()
if '://' in agentDomain:
agentDomain = agentDomain.split('://')[1]
if '/' in agentDomain:
agentDomain = agentDomain.split('/')[0]
if ' ' in agentDomain:
agentDomain = agentDomain.replace(' ', '')
if ';' in agentDomain:
agentDomain = agentDomain.replace(';', '')
if '.' not in agentDomain:
return None
return agentDomain
def _blockedUserAgent(self) -> bool:
def _blockedUserAgent(self, callingDomain: str) -> bool:
"""Should a GET or POST be blocked based upon its user agent?
"""
agentDomain = self._userAgentDomain()
if not agentDomain:
if self.server.userAgentDomainRequired:
agentDomain = None
agentStr = None
if self.headers.get('User-Agent'):
agentStr = self.headers['User-Agent']
# is this a web crawler? If so the block it
agentStrLower = agentStr.lower()
if 'bot/' in agentStrLower or 'bot-' in agentStrLower:
print('Blocked Crawler: ' + agentStr)
return True
# get domain name from User-Agent
agentDomain = userAgentDomain(agentStr, self.server.debug)
else:
# no User-Agent header is present
return True
# is the User-Agent type blocked? eg. "Mastodon"
if self.server.userAgentsBlocked:
blockedUA = False
for agentName in self.server.userAgentsBlocked:
if agentName in agentStr:
blockedUA = True
break
if blockedUA:
return True
if not agentDomain:
return False
# is the User-Agent domain blocked
blockedUA = False
if not agentDomain.startswith(callingDomain):
blockedUA = isBlockedDomain(self.server.baseDir, agentDomain)
if blockedUA and self.server.debug:
# if self.server.debug:
if blockedUA:
print('Blocked User agent: ' + agentDomain)
return blockedUA
@ -10628,7 +10635,7 @@ class PubServer(BaseHTTPRequestHandler):
self._400()
return
if self._blockedUserAgent():
if self._blockedUserAgent(callingDomain):
self._400()
return
@ -14130,6 +14137,10 @@ class PubServer(BaseHTTPRequestHandler):
self._400()
return
if self._blockedUserAgent(callingDomain):
self._400()
return
self.server.POSTbusy = True
if not self.headers.get('Content-type'):
print('Content-type header missing')
@ -14881,7 +14892,7 @@ def loadTokens(baseDir: str, tokensDict: {}, tokensLookup: {}) -> None:
break
def runDaemon(userAgentDomainRequired: bool,
def runDaemon(userAgentsBlocked: [],
logLoginFailures: bool,
city: str,
showNodeInfoAccounts: bool,
@ -15008,9 +15019,8 @@ def runDaemon(userAgentDomainRequired: bool,
httpd.keyShortcuts = {}
loadAccessKeysForAccounts(baseDir, httpd.keyShortcuts, httpd.accessKeys)
# if set to True then the calling domain must be specified
# within the User-Agent header
httpd.userAgentDomainRequired = userAgentDomainRequired
# list of blocked user agent types within the User-Agent header
httpd.userAgentsBlocked = userAgentsBlocked
httpd.unitTest = unitTest
httpd.allowLocalNetworkAccess = allowLocalNetworkAccess

View File

@ -104,6 +104,9 @@ def str2bool(v) -> bool:
parser = argparse.ArgumentParser(description='ActivityPub Server')
parser.add_argument('--userAgentBlocks', type=str,
default=None,
help='List of blocked user agents, separated by commas')
parser.add_argument('-n', '--nickname', dest='nickname', type=str,
default=None,
help='Nickname of the account to use')
@ -274,12 +277,6 @@ parser.add_argument("--repliesEnabled", "--commentsEnabled",
type=str2bool, nargs='?',
const=True, default=True,
help="Enable replies to a post")
parser.add_argument("--userAgentDomainRequired",
dest='userAgentDomainRequired',
type=str2bool, nargs='?',
const=True, default=False,
help="Whether User-Agent header must " +
"contain the calling domain")
parser.add_argument("--showPublishAsIcon",
dest='showPublishAsIcon',
type=str2bool, nargs='?',
@ -2522,10 +2519,17 @@ showNodeInfoVersion = \
if showNodeInfoVersion is not None:
args.showNodeInfoVersion = bool(showNodeInfoVersion)
userAgentDomainRequired = \
getConfigParam(baseDir, 'userAgentDomainRequired')
if userAgentDomainRequired is not None:
args.userAgentDomainRequired = bool(userAgentDomainRequired)
userAgentsBlocked = []
if args.userAgentBlocks:
userAgentsBlockedStr = args.userAgentBlocks
setConfigParam(baseDir, 'userAgentsBlocked', userAgentsBlockedStr)
else:
userAgentsBlockedStr = \
getConfigParam(baseDir, 'userAgentsBlocked')
if userAgentsBlockedStr:
agentBlocksList = userAgentsBlockedStr.split(',')
for agentBlockStr in agentBlocksList:
userAgentsBlocked.append(agentBlockStr.strip())
city = \
getConfigParam(baseDir, 'city')
@ -2563,7 +2567,7 @@ if args.registration:
print('New registrations closed')
if __name__ == "__main__":
runDaemon(args.userAgentDomainRequired,
runDaemon(userAgentsBlocked,
args.logLoginFailures,
args.city,
args.showNodeInfoAccounts,

View File

@ -37,13 +37,14 @@ from follow import clearFollows
from follow import clearFollowers
from follow import sendFollowRequestViaServer
from follow import sendUnfollowRequestViaServer
from siteactive import siteIsActive
from utils import userAgentDomain
from utils import camelCaseSplit
from utils import decodedHost
from utils import getFullDomain
from utils import validNickname
from utils import firstParagraphFromString
from utils import removeIdEnding
from siteactive import siteIsActive
from utils import updateRecentPostsCache
from utils import followPerson
from utils import getNicknameFromActor
@ -519,9 +520,9 @@ def createServerAlice(path: str, domain: str, port: int,
showNodeInfoVersion = True
city = 'London, England'
logLoginFailures = False
userAgentDomainRequired = False
userAgentsBlocked = []
print('Server running: Alice')
runDaemon(userAgentDomainRequired,
runDaemon(userAgentsBlocked,
logLoginFailures, city,
showNodeInfoAccounts,
showNodeInfoVersion,
@ -624,9 +625,9 @@ def createServerBob(path: str, domain: str, port: int,
showNodeInfoVersion = True
city = 'London, England'
logLoginFailures = False
userAgentDomainRequired = False
userAgentsBlocked = []
print('Server running: Bob')
runDaemon(userAgentDomainRequired,
runDaemon(userAgentsBlocked,
logLoginFailures, city,
showNodeInfoAccounts,
showNodeInfoVersion,
@ -684,9 +685,9 @@ def createServerEve(path: str, domain: str, port: int, federationList: [],
showNodeInfoVersion = True
city = 'London, England'
logLoginFailures = False
userAgentDomainRequired = False
userAgentsBlocked = []
print('Server running: Eve')
runDaemon(userAgentDomainRequired,
runDaemon(userAgentsBlocked,
logLoginFailures, city,
showNodeInfoAccounts,
showNodeInfoVersion,
@ -3938,10 +3939,21 @@ def _testRoles() -> None:
assert not actorHasRole(actorJson, "artist")
def _testUserAgentDomain() -> None:
print('testUserAgentDomain')
userAgent = \
'http.rb/4.4.1 (Mastodon/9.10.11; +https://mastodon.something/)'
assert userAgentDomain(userAgent, False) == 'mastodon.something'
userAgent = \
'Mozilla/70.0 (X11; Linux x86_64; rv:1.0) Gecko/20450101 Firefox/1.0'
assert userAgentDomain(userAgent, False) is None
def runAllTests():
print('Running tests...')
updateDefaultThemesList(os.getcwd())
_testFunctions()
_testUserAgentDomain()
_testRoles()
_testSkills()
_testSpoofGeolocation()

View File

@ -2433,3 +2433,27 @@ def permittedDir(path: str) -> bool:
path.startswith('/accounts'):
return False
return True
def userAgentDomain(userAgent: str, debug: bool) -> str:
"""If the User-Agent string contains a domain
then return it
"""
if '+http' not in userAgent:
return None
agentDomain = userAgent.split('+http')[1].strip()
if '://' in agentDomain:
agentDomain = agentDomain.split('://')[1]
if '/' in agentDomain:
agentDomain = agentDomain.split('/')[0]
if ')' in agentDomain:
agentDomain = agentDomain.split(')')[0].strip()
if ' ' in agentDomain:
agentDomain = agentDomain.replace(' ', '')
if ';' in agentDomain:
agentDomain = agentDomain.replace(';', '')
if '.' not in agentDomain:
return None
if debug:
print('User-Agent Domain: ' + agentDomain)
return agentDomain