diff --git a/daemon.py b/daemon.py index 3c76d90a3..955bebeee 100644 --- a/daemon.py +++ b/daemon.py @@ -207,6 +207,7 @@ from shares import addShare from shares import removeShare from shares import expireShares from categories import setHashtagCategory +from utils import userAgentDomain from utils import isLocalNetworkAddress from utils import permittedDir from utils import isAccountDir @@ -452,38 +453,44 @@ class PubServer(BaseHTTPRequestHandler): else: print('ERROR: unable to create vote') - def _userAgentDomain(self) -> str: - """Returns the domain specified within User-Agent header - """ - if not self.headers.get('User-Agent'): - return None - agentStr = self.headers.get('User-Agent') - if '+' not in agentStr: - return None - agentDomain = agentStr.split('+')[1].strip() - if '://' in agentDomain: - agentDomain = agentDomain.split('://')[1] - if '/' in agentDomain: - agentDomain = agentDomain.split('/')[0] - if ' ' in agentDomain: - agentDomain = agentDomain.replace(' ', '') - if ';' in agentDomain: - agentDomain = agentDomain.replace(';', '') - if '.' not in agentDomain: - return None - return agentDomain - - def _blockedUserAgent(self) -> bool: + def _blockedUserAgent(self, callingDomain: str) -> bool: """Should a GET or POST be blocked based upon its user agent? """ - agentDomain = self._userAgentDomain() - if not agentDomain: - if self.server.userAgentDomainRequired: + agentDomain = None + agentStr = None + if self.headers.get('User-Agent'): + agentStr = self.headers['User-Agent'] + # is this a web crawler? If so the block it + agentStrLower = agentStr.lower() + if 'bot/' in agentStrLower or 'bot-' in agentStrLower: + print('Blocked Crawler: ' + agentStr) return True + # get domain name from User-Agent + agentDomain = userAgentDomain(agentStr, self.server.debug) + else: + # no User-Agent header is present + return True + + # is the User-Agent type blocked? eg. "Mastodon" + if self.server.userAgentsBlocked: + blockedUA = False + for agentName in self.server.userAgentsBlocked: + if agentName in agentStr: + blockedUA = True + break + if blockedUA: + return True + + if not agentDomain: return False - blockedUA = isBlockedDomain(self.server.baseDir, agentDomain) - if blockedUA and self.server.debug: - print('Blocked User agent: ' + agentDomain) + + # is the User-Agent domain blocked + blockedUA = False + if not agentDomain.startswith(callingDomain): + blockedUA = isBlockedDomain(self.server.baseDir, agentDomain) + # if self.server.debug: + if blockedUA: + print('Blocked User agent: ' + agentDomain) return blockedUA def _requestHTTP(self) -> bool: @@ -10628,7 +10635,7 @@ class PubServer(BaseHTTPRequestHandler): self._400() return - if self._blockedUserAgent(): + if self._blockedUserAgent(callingDomain): self._400() return @@ -14130,6 +14137,10 @@ class PubServer(BaseHTTPRequestHandler): self._400() return + if self._blockedUserAgent(callingDomain): + self._400() + return + self.server.POSTbusy = True if not self.headers.get('Content-type'): print('Content-type header missing') @@ -14881,7 +14892,7 @@ def loadTokens(baseDir: str, tokensDict: {}, tokensLookup: {}) -> None: break -def runDaemon(userAgentDomainRequired: bool, +def runDaemon(userAgentsBlocked: [], logLoginFailures: bool, city: str, showNodeInfoAccounts: bool, @@ -15008,9 +15019,8 @@ def runDaemon(userAgentDomainRequired: bool, httpd.keyShortcuts = {} loadAccessKeysForAccounts(baseDir, httpd.keyShortcuts, httpd.accessKeys) - # if set to True then the calling domain must be specified - # within the User-Agent header - httpd.userAgentDomainRequired = userAgentDomainRequired + # list of blocked user agent types within the User-Agent header + httpd.userAgentsBlocked = userAgentsBlocked httpd.unitTest = unitTest httpd.allowLocalNetworkAccess = allowLocalNetworkAccess diff --git a/epicyon.py b/epicyon.py index 406ba3856..a36e4af93 100644 --- a/epicyon.py +++ b/epicyon.py @@ -104,6 +104,9 @@ def str2bool(v) -> bool: parser = argparse.ArgumentParser(description='ActivityPub Server') +parser.add_argument('--userAgentBlocks', type=str, + default=None, + help='List of blocked user agents, separated by commas') parser.add_argument('-n', '--nickname', dest='nickname', type=str, default=None, help='Nickname of the account to use') @@ -274,12 +277,6 @@ parser.add_argument("--repliesEnabled", "--commentsEnabled", type=str2bool, nargs='?', const=True, default=True, help="Enable replies to a post") -parser.add_argument("--userAgentDomainRequired", - dest='userAgentDomainRequired', - type=str2bool, nargs='?', - const=True, default=False, - help="Whether User-Agent header must " + - "contain the calling domain") parser.add_argument("--showPublishAsIcon", dest='showPublishAsIcon', type=str2bool, nargs='?', @@ -2522,10 +2519,17 @@ showNodeInfoVersion = \ if showNodeInfoVersion is not None: args.showNodeInfoVersion = bool(showNodeInfoVersion) -userAgentDomainRequired = \ - getConfigParam(baseDir, 'userAgentDomainRequired') -if userAgentDomainRequired is not None: - args.userAgentDomainRequired = bool(userAgentDomainRequired) +userAgentsBlocked = [] +if args.userAgentBlocks: + userAgentsBlockedStr = args.userAgentBlocks + setConfigParam(baseDir, 'userAgentsBlocked', userAgentsBlockedStr) +else: + userAgentsBlockedStr = \ + getConfigParam(baseDir, 'userAgentsBlocked') +if userAgentsBlockedStr: + agentBlocksList = userAgentsBlockedStr.split(',') + for agentBlockStr in agentBlocksList: + userAgentsBlocked.append(agentBlockStr.strip()) city = \ getConfigParam(baseDir, 'city') @@ -2563,7 +2567,7 @@ if args.registration: print('New registrations closed') if __name__ == "__main__": - runDaemon(args.userAgentDomainRequired, + runDaemon(userAgentsBlocked, args.logLoginFailures, args.city, args.showNodeInfoAccounts, diff --git a/tests.py b/tests.py index 54f17b7d3..f270ae9e1 100644 --- a/tests.py +++ b/tests.py @@ -37,13 +37,14 @@ from follow import clearFollows from follow import clearFollowers from follow import sendFollowRequestViaServer from follow import sendUnfollowRequestViaServer +from siteactive import siteIsActive +from utils import userAgentDomain from utils import camelCaseSplit from utils import decodedHost from utils import getFullDomain from utils import validNickname from utils import firstParagraphFromString from utils import removeIdEnding -from siteactive import siteIsActive from utils import updateRecentPostsCache from utils import followPerson from utils import getNicknameFromActor @@ -519,9 +520,9 @@ def createServerAlice(path: str, domain: str, port: int, showNodeInfoVersion = True city = 'London, England' logLoginFailures = False - userAgentDomainRequired = False + userAgentsBlocked = [] print('Server running: Alice') - runDaemon(userAgentDomainRequired, + runDaemon(userAgentsBlocked, logLoginFailures, city, showNodeInfoAccounts, showNodeInfoVersion, @@ -624,9 +625,9 @@ def createServerBob(path: str, domain: str, port: int, showNodeInfoVersion = True city = 'London, England' logLoginFailures = False - userAgentDomainRequired = False + userAgentsBlocked = [] print('Server running: Bob') - runDaemon(userAgentDomainRequired, + runDaemon(userAgentsBlocked, logLoginFailures, city, showNodeInfoAccounts, showNodeInfoVersion, @@ -684,9 +685,9 @@ def createServerEve(path: str, domain: str, port: int, federationList: [], showNodeInfoVersion = True city = 'London, England' logLoginFailures = False - userAgentDomainRequired = False + userAgentsBlocked = [] print('Server running: Eve') - runDaemon(userAgentDomainRequired, + runDaemon(userAgentsBlocked, logLoginFailures, city, showNodeInfoAccounts, showNodeInfoVersion, @@ -3938,10 +3939,21 @@ def _testRoles() -> None: assert not actorHasRole(actorJson, "artist") +def _testUserAgentDomain() -> None: + print('testUserAgentDomain') + userAgent = \ + 'http.rb/4.4.1 (Mastodon/9.10.11; +https://mastodon.something/)' + assert userAgentDomain(userAgent, False) == 'mastodon.something' + userAgent = \ + 'Mozilla/70.0 (X11; Linux x86_64; rv:1.0) Gecko/20450101 Firefox/1.0' + assert userAgentDomain(userAgent, False) is None + + def runAllTests(): print('Running tests...') updateDefaultThemesList(os.getcwd()) _testFunctions() + _testUserAgentDomain() _testRoles() _testSkills() _testSpoofGeolocation() diff --git a/utils.py b/utils.py index 0905a810f..ad277004b 100644 --- a/utils.py +++ b/utils.py @@ -2433,3 +2433,27 @@ def permittedDir(path: str) -> bool: path.startswith('/accounts'): return False return True + + +def userAgentDomain(userAgent: str, debug: bool) -> str: + """If the User-Agent string contains a domain + then return it + """ + if '+http' not in userAgent: + return None + agentDomain = userAgent.split('+http')[1].strip() + if '://' in agentDomain: + agentDomain = agentDomain.split('://')[1] + if '/' in agentDomain: + agentDomain = agentDomain.split('/')[0] + if ')' in agentDomain: + agentDomain = agentDomain.split(')')[0].strip() + if ' ' in agentDomain: + agentDomain = agentDomain.replace(' ', '') + if ';' in agentDomain: + agentDomain = agentDomain.replace(';', '') + if '.' not in agentDomain: + return None + if debug: + print('User-Agent Domain: ' + agentDomain) + return agentDomain