Check user agent domain is not blocked

main
Bob Mottram 2021-06-20 14:25:18 +01:00
parent 0e5e60d654
commit c76a10480c
3 changed files with 66 additions and 5 deletions

View File

@ -452,6 +452,40 @@ class PubServer(BaseHTTPRequestHandler):
else:
print('ERROR: unable to create vote')
def _userAgentDomain(self) -> str:
"""Returns the domain specified within User-Agent header
"""
if not self.headers.get('User-Agent'):
return None
agentStr = self.headers.get('User-Agent')
if '+' not in agentStr:
return None
agentDomain = agentStr.split('+')[1].strip()
if '://' in agentDomain:
agentDomain = agentDomain.split('://')[1]
if '/' in agentDomain:
agentDomain = agentDomain.split('/')[0]
if ' ' in agentDomain:
agentDomain = agentDomain.replace(' ', '')
if ';' in agentDomain:
agentDomain = agentDomain.replace(';', '')
if '.' not in agentDomain:
return None
return agentDomain
def _blockedUserAgent(self) -> bool:
"""Should a GET or POST be blocked based upon its user agent?
"""
agentDomain = self._userAgentDomain()
if not agentDomain:
if self.server.userAgentDomainRequired:
return True
return False
blockedUA = isBlockedDomain(self.server.baseDir, agentDomain)
if blockedUA and self.server.debug:
print('Blocked User agent: ' + agentDomain)
return blockedUA
def _requestHTTP(self) -> bool:
"""Should a http response be given?
"""
@ -10594,6 +10628,10 @@ class PubServer(BaseHTTPRequestHandler):
self._400()
return
if self._blockedUserAgent():
self._400()
return
GETstartTime = time.time()
GETtimings = {}
@ -14843,7 +14881,8 @@ def loadTokens(baseDir: str, tokensDict: {}, tokensLookup: {}) -> None:
break
def runDaemon(logLoginFailures: bool,
def runDaemon(userAgentDomainRequired: bool,
logLoginFailures: bool,
city: str,
showNodeInfoAccounts: bool,
showNodeInfoVersion: bool,
@ -14969,6 +15008,10 @@ def runDaemon(logLoginFailures: bool,
httpd.keyShortcuts = {}
loadAccessKeysForAccounts(baseDir, httpd.keyShortcuts, httpd.accessKeys)
# if set to True then the calling domain must be specified
# within the User-Agent header
httpd.userAgentDomainRequired = userAgentDomainRequired
httpd.unitTest = unitTest
httpd.allowLocalNetworkAccess = allowLocalNetworkAccess
if unitTest:

View File

@ -274,6 +274,12 @@ parser.add_argument("--repliesEnabled", "--commentsEnabled",
type=str2bool, nargs='?',
const=True, default=True,
help="Enable replies to a post")
parser.add_argument("--userAgentDomainRequired",
dest='userAgentDomainRequired',
type=str2bool, nargs='?',
const=True, default=False,
help="Whether User-Agent header must " +
"contain the calling domain")
parser.add_argument("--showPublishAsIcon",
dest='showPublishAsIcon',
type=str2bool, nargs='?',
@ -2516,6 +2522,11 @@ showNodeInfoVersion = \
if showNodeInfoVersion is not None:
args.showNodeInfoVersion = bool(showNodeInfoVersion)
userAgentDomainRequired = \
getConfigParam(baseDir, 'userAgentDomainRequired')
if userAgentDomainRequired is not None:
args.userAgentDomainRequired = bool(userAgentDomainRequired)
city = \
getConfigParam(baseDir, 'city')
if city is not None:
@ -2552,7 +2563,8 @@ if args.registration:
print('New registrations closed')
if __name__ == "__main__":
runDaemon(args.logLoginFailures,
runDaemon(args.userAgentDomainRequired,
args.logLoginFailures,
args.city,
args.showNodeInfoAccounts,
args.showNodeInfoVersion,

View File

@ -519,8 +519,10 @@ def createServerAlice(path: str, domain: str, port: int,
showNodeInfoVersion = True
city = 'London, England'
logLoginFailures = False
userAgentDomainRequired = False
print('Server running: Alice')
runDaemon(logLoginFailures, city,
runDaemon(userAgentDomainRequired,
logLoginFailures, city,
showNodeInfoAccounts,
showNodeInfoVersion,
brochMode,
@ -622,8 +624,10 @@ def createServerBob(path: str, domain: str, port: int,
showNodeInfoVersion = True
city = 'London, England'
logLoginFailures = False
userAgentDomainRequired = False
print('Server running: Bob')
runDaemon(logLoginFailures, city,
runDaemon(userAgentDomainRequired,
logLoginFailures, city,
showNodeInfoAccounts,
showNodeInfoVersion,
brochMode,
@ -680,8 +684,10 @@ def createServerEve(path: str, domain: str, port: int, federationList: [],
showNodeInfoVersion = True
city = 'London, England'
logLoginFailures = False
userAgentDomainRequired = False
print('Server running: Eve')
runDaemon(logLoginFailures, city,
runDaemon(userAgentDomainRequired,
logLoginFailures, city,
showNodeInfoAccounts,
showNodeInfoVersion,
brochMode,