Merge branch 'main' of ssh://code.freedombone.net:2222/bashrc/epicyon

main
Bob Mottram 2021-06-20 14:48:02 +01:00
commit c830c0eab2
14 changed files with 231 additions and 146 deletions

View File

@ -253,7 +253,8 @@ def sendAnnounceViaServer(baseDir: str, session,
'Content-type': 'application/json', 'Content-type': 'application/json',
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = postJson(session, newAnnounceJson, [], inboxUrl, postResult = postJson(httpPrefix, fromDomainFull,
session, newAnnounceJson, [], inboxUrl,
headers, 3, True) headers, 3, True)
if not postResult: if not postResult:
print('WARN: announce not posted') print('WARN: announce not posted')
@ -332,7 +333,8 @@ def sendUndoAnnounceViaServer(baseDir: str, session,
'Content-type': 'application/json', 'Content-type': 'application/json',
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = postJson(session, unAnnounceJson, [], inboxUrl, postResult = postJson(httpPrefix, domainFull,
session, unAnnounceJson, [], inboxUrl,
headers, 3, True) headers, 3, True)
if not postResult: if not postResult:
print('WARN: undo announce not posted') print('WARN: undo announce not posted')

View File

@ -143,7 +143,8 @@ def sendAvailabilityViaServer(baseDir: str, session,
'Content-type': 'application/json', 'Content-type': 'application/json',
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = postJson(session, newAvailabilityJson, [], postResult = postJson(httpPrefix, domainFull,
session, newAvailabilityJson, [],
inboxUrl, headers, 30, True) inboxUrl, headers, 30, True)
if not postResult: if not postResult:
print('WARN: availability failed to post') print('WARN: availability failed to post')

View File

@ -418,7 +418,8 @@ def sendBookmarkViaServer(baseDir: str, session,
'Content-type': 'application/json', 'Content-type': 'application/json',
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = postJson(session, newBookmarkJson, [], inboxUrl, postResult = postJson(httpPrefix, domainFull,
session, newBookmarkJson, [], inboxUrl,
headers, 3, True) headers, 3, True)
if not postResult: if not postResult:
if debug: if debug:
@ -502,7 +503,8 @@ def sendUndoBookmarkViaServer(baseDir: str, session,
'Content-type': 'application/json', 'Content-type': 'application/json',
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = postJson(session, newBookmarkJson, [], inboxUrl, postResult = postJson(httpPrefix, domainFull,
session, newBookmarkJson, [], inboxUrl,
headers, 3, True) headers, 3, True)
if not postResult: if not postResult:
if debug: if debug:

View File

@ -452,6 +452,40 @@ class PubServer(BaseHTTPRequestHandler):
else: else:
print('ERROR: unable to create vote') print('ERROR: unable to create vote')
def _userAgentDomain(self) -> str:
"""Returns the domain specified within User-Agent header
"""
if not self.headers.get('User-Agent'):
return None
agentStr = self.headers.get('User-Agent')
if '+' not in agentStr:
return None
agentDomain = agentStr.split('+')[1].strip()
if '://' in agentDomain:
agentDomain = agentDomain.split('://')[1]
if '/' in agentDomain:
agentDomain = agentDomain.split('/')[0]
if ' ' in agentDomain:
agentDomain = agentDomain.replace(' ', '')
if ';' in agentDomain:
agentDomain = agentDomain.replace(';', '')
if '.' not in agentDomain:
return None
return agentDomain
def _blockedUserAgent(self) -> bool:
"""Should a GET or POST be blocked based upon its user agent?
"""
agentDomain = self._userAgentDomain()
if not agentDomain:
if self.server.userAgentDomainRequired:
return True
return False
blockedUA = isBlockedDomain(self.server.baseDir, agentDomain)
if blockedUA and self.server.debug:
print('Blocked User agent: ' + agentDomain)
return blockedUA
def _requestHTTP(self) -> bool: def _requestHTTP(self) -> bool:
"""Should a http response be given? """Should a http response be given?
""" """
@ -10594,6 +10628,10 @@ class PubServer(BaseHTTPRequestHandler):
self._400() self._400()
return return
if self._blockedUserAgent():
self._400()
return
GETstartTime = time.time() GETstartTime = time.time()
GETtimings = {} GETtimings = {}
@ -14843,7 +14881,8 @@ def loadTokens(baseDir: str, tokensDict: {}, tokensLookup: {}) -> None:
break break
def runDaemon(logLoginFailures: bool, def runDaemon(userAgentDomainRequired: bool,
logLoginFailures: bool,
city: str, city: str,
showNodeInfoAccounts: bool, showNodeInfoAccounts: bool,
showNodeInfoVersion: bool, showNodeInfoVersion: bool,
@ -14969,6 +15008,10 @@ def runDaemon(logLoginFailures: bool,
httpd.keyShortcuts = {} httpd.keyShortcuts = {}
loadAccessKeysForAccounts(baseDir, httpd.keyShortcuts, httpd.accessKeys) loadAccessKeysForAccounts(baseDir, httpd.keyShortcuts, httpd.accessKeys)
# if set to True then the calling domain must be specified
# within the User-Agent header
httpd.userAgentDomainRequired = userAgentDomainRequired
httpd.unitTest = unitTest httpd.unitTest = unitTest
httpd.allowLocalNetworkAccess = allowLocalNetworkAccess httpd.allowLocalNetworkAccess = allowLocalNetworkAccess
if unitTest: if unitTest:

View File

@ -93,7 +93,8 @@ def sendDeleteViaServer(baseDir: str, session,
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = \ postResult = \
postJson(session, newDeleteJson, [], inboxUrl, headers, 3, True) postJson(httpPrefix, fromDomainFull,
session, newDeleteJson, [], inboxUrl, headers, 3, True)
if not postResult: if not postResult:
if debug: if debug:
print('DEBUG: POST delete failed for c2s to ' + inboxUrl) print('DEBUG: POST delete failed for c2s to ' + inboxUrl)

View File

@ -274,6 +274,12 @@ parser.add_argument("--repliesEnabled", "--commentsEnabled",
type=str2bool, nargs='?', type=str2bool, nargs='?',
const=True, default=True, const=True, default=True,
help="Enable replies to a post") help="Enable replies to a post")
parser.add_argument("--userAgentDomainRequired",
dest='userAgentDomainRequired',
type=str2bool, nargs='?',
const=True, default=False,
help="Whether User-Agent header must " +
"contain the calling domain")
parser.add_argument("--showPublishAsIcon", parser.add_argument("--showPublishAsIcon",
dest='showPublishAsIcon', dest='showPublishAsIcon',
type=str2bool, nargs='?', type=str2bool, nargs='?',
@ -2516,6 +2522,11 @@ showNodeInfoVersion = \
if showNodeInfoVersion is not None: if showNodeInfoVersion is not None:
args.showNodeInfoVersion = bool(showNodeInfoVersion) args.showNodeInfoVersion = bool(showNodeInfoVersion)
userAgentDomainRequired = \
getConfigParam(baseDir, 'userAgentDomainRequired')
if userAgentDomainRequired is not None:
args.userAgentDomainRequired = bool(userAgentDomainRequired)
city = \ city = \
getConfigParam(baseDir, 'city') getConfigParam(baseDir, 'city')
if city is not None: if city is not None:
@ -2552,7 +2563,8 @@ if args.registration:
print('New registrations closed') print('New registrations closed')
if __name__ == "__main__": if __name__ == "__main__":
runDaemon(args.logLoginFailures, runDaemon(args.userAgentDomainRequired,
args.logLoginFailures,
args.city, args.city,
args.showNodeInfoAccounts, args.showNodeInfoAccounts,
args.showNodeInfoVersion, args.showNodeInfoVersion,

View File

@ -1029,7 +1029,8 @@ def sendFollowRequestViaServer(baseDir: str, session,
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = \ postResult = \
postJson(session, newFollowJson, [], inboxUrl, headers, 3, True) postJson(httpPrefix, fromDomainFull,
session, newFollowJson, [], inboxUrl, headers, 3, True)
if not postResult: if not postResult:
if debug: if debug:
print('DEBUG: POST follow request failed for c2s to ' + inboxUrl) print('DEBUG: POST follow request failed for c2s to ' + inboxUrl)
@ -1122,7 +1123,8 @@ def sendUnfollowRequestViaServer(baseDir: str, session,
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = \ postResult = \
postJson(session, unfollowJson, [], inboxUrl, headers, 3, True) postJson(httpPrefix, fromDomainFull,
session, unfollowJson, [], inboxUrl, headers, 3, True)
if not postResult: if not postResult:
if debug: if debug:
print('DEBUG: POST unfollow failed for c2s to ' + inboxUrl) print('DEBUG: POST unfollow failed for c2s to ' + inboxUrl)

View File

@ -205,7 +205,8 @@ def sendLikeViaServer(baseDir: str, session,
'Content-type': 'application/json', 'Content-type': 'application/json',
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = postJson(session, newLikeJson, [], inboxUrl, postResult = postJson(httpPrefix, fromDomainFull,
session, newLikeJson, [], inboxUrl,
headers, 3, True) headers, 3, True)
if not postResult: if not postResult:
if debug: if debug:
@ -287,7 +288,8 @@ def sendUndoLikeViaServer(baseDir: str, session,
'Content-type': 'application/json', 'Content-type': 'application/json',
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = postJson(session, newUndoLikeJson, [], inboxUrl, postResult = postJson(httpPrefix, fromDomainFull,
session, newUndoLikeJson, [], inboxUrl,
headers, 3, True) headers, 3, True)
if not postResult: if not postResult:
if debug: if debug:

3
pgp.py
View File

@ -588,7 +588,8 @@ def pgpPublicKeyUpload(baseDir: str, session,
tries = 0 tries = 0
while tries < 4: while tries < 4:
postResult = \ postResult = \
postJson(session, actorUpdate, [], inboxUrl, postJson(httpPrefix, domainFull,
session, actorUpdate, [], inboxUrl,
headers, 5, quiet) headers, 5, quiet)
if postResult: if postResult:
break break

View File

@ -4198,7 +4198,8 @@ def sendBlockViaServer(baseDir: str, session,
'Content-type': 'application/json', 'Content-type': 'application/json',
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = postJson(session, newBlockJson, [], inboxUrl, postResult = postJson(httpPrefix, fromDomainFull,
session, newBlockJson, [], inboxUrl,
headers, 30, True) headers, 30, True)
if not postResult: if not postResult:
print('WARN: block unable to post') print('WARN: block unable to post')
@ -4273,7 +4274,8 @@ def sendMuteViaServer(baseDir: str, session,
'Content-type': 'application/json', 'Content-type': 'application/json',
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = postJson(session, newMuteJson, [], inboxUrl, postResult = postJson(httpPrefix, fromDomainFull,
session, newMuteJson, [], inboxUrl,
headers, 3, True) headers, 3, True)
if postResult is None: if postResult is None:
print('WARN: mute unable to post') print('WARN: mute unable to post')
@ -4354,7 +4356,8 @@ def sendUndoMuteViaServer(baseDir: str, session,
'Content-type': 'application/json', 'Content-type': 'application/json',
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = postJson(session, undoMuteJson, [], inboxUrl, postResult = postJson(httpPrefix, fromDomainFull,
session, undoMuteJson, [], inboxUrl,
headers, 3, True) headers, 3, True)
if postResult is None: if postResult is None:
print('WARN: undo mute unable to post') print('WARN: undo mute unable to post')
@ -4438,7 +4441,8 @@ def sendUndoBlockViaServer(baseDir: str, session,
'Content-type': 'application/json', 'Content-type': 'application/json',
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = postJson(session, newBlockJson, [], inboxUrl, postResult = postJson(httpPrefix, fromDomainFull,
session, newBlockJson, [], inboxUrl,
headers, 30, True) headers, 30, True)
if not postResult: if not postResult:
print('WARN: unblock unable to post') print('WARN: unblock unable to post')

View File

@ -149,7 +149,8 @@ def getJson(session, url: str, headers: {}, params: {}, debug: bool,
return None return None
def postJson(session, postJsonObject: {}, federationList: [], def postJson(httpPrefix: str, domainFull: str,
session, postJsonObject: {}, federationList: [],
inboxUrl: str, headers: {}, timeoutSec: int = 60, inboxUrl: str, headers: {}, timeoutSec: int = 60,
quiet: bool = False) -> str: quiet: bool = False) -> str:
"""Post a json message to the inbox of another person """Post a json message to the inbox of another person
@ -160,6 +161,11 @@ def postJson(session, postJsonObject: {}, federationList: [],
print('postJson: ' + inboxUrl + ' not permitted') print('postJson: ' + inboxUrl + ' not permitted')
return None return None
sessionHeaders = headers
sessionHeaders['User-Agent'] = 'Epicyon/' + __version__
sessionHeaders['User-Agent'] += \
'; +' + httpPrefix + '://' + domainFull + '/'
try: try:
postResult = \ postResult = \
session.post(url=inboxUrl, session.post(url=inboxUrl,

View File

@ -410,7 +410,8 @@ def sendShareViaServer(baseDir, session,
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = \ postResult = \
postJson(session, newShareJson, [], inboxUrl, headers, 30, True) postJson(httpPrefix, fromDomainFull,
session, newShareJson, [], inboxUrl, headers, 30, True)
if not postResult: if not postResult:
if debug: if debug:
print('DEBUG: POST share failed for c2s to ' + inboxUrl) print('DEBUG: POST share failed for c2s to ' + inboxUrl)
@ -500,7 +501,8 @@ def sendUndoShareViaServer(baseDir: str, session,
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = \ postResult = \
postJson(session, undoShareJson, [], inboxUrl, postJson(httpPrefix, fromDomainFull,
session, undoShareJson, [], inboxUrl,
headers, 30, True) headers, 30, True)
if not postResult: if not postResult:
if debug: if debug:

View File

@ -246,7 +246,8 @@ def sendSkillViaServer(baseDir: str, session, nickname: str, password: str,
'Authorization': authHeader 'Authorization': authHeader
} }
postResult = \ postResult = \
postJson(session, newSkillJson, [], inboxUrl, postJson(httpPrefix, domainFull,
session, newSkillJson, [], inboxUrl,
headers, 30, True) headers, 30, True)
if not postResult: if not postResult:
if debug: if debug:

252
tests.py
View File

@ -125,7 +125,7 @@ thrBob = None
thrEve = None thrEve = None
def testHttpSigNew(): def _testHttpSigNew():
print('testHttpSigNew') print('testHttpSigNew')
messageBodyJson = {"hello": "world"} messageBodyJson = {"hello": "world"}
messageBodyJsonStr = json.dumps(messageBodyJson) messageBodyJsonStr = json.dumps(messageBodyJson)
@ -398,12 +398,12 @@ def _testHttpsigBase(withDigest):
shutil.rmtree(path) shutil.rmtree(path)
def testHttpsig(): def _testHttpsig():
_testHttpsigBase(True) _testHttpsigBase(True)
_testHttpsigBase(False) _testHttpsigBase(False)
def testCache(): def _testCache():
print('testCache') print('testCache')
personUrl = "cat@cardboard.box" personUrl = "cat@cardboard.box"
personJson = { personJson = {
@ -417,15 +417,15 @@ def testCache():
assert result['test'] == 'This is a test' assert result['test'] == 'This is a test'
def testThreadsFunction(param: str): def _testThreadsFunction(param: str):
for i in range(10000): for i in range(10000):
time.sleep(2) time.sleep(2)
def testThreads(): def _testThreads():
print('testThreads') print('testThreads')
thr = \ thr = \
threadWithTrace(target=testThreadsFunction, threadWithTrace(target=_testThreadsFunction,
args=('test',), args=('test',),
daemon=True) daemon=True)
thr.start() thr.start()
@ -519,8 +519,10 @@ def createServerAlice(path: str, domain: str, port: int,
showNodeInfoVersion = True showNodeInfoVersion = True
city = 'London, England' city = 'London, England'
logLoginFailures = False logLoginFailures = False
userAgentDomainRequired = False
print('Server running: Alice') print('Server running: Alice')
runDaemon(logLoginFailures, city, runDaemon(userAgentDomainRequired,
logLoginFailures, city,
showNodeInfoAccounts, showNodeInfoAccounts,
showNodeInfoVersion, showNodeInfoVersion,
brochMode, brochMode,
@ -622,8 +624,10 @@ def createServerBob(path: str, domain: str, port: int,
showNodeInfoVersion = True showNodeInfoVersion = True
city = 'London, England' city = 'London, England'
logLoginFailures = False logLoginFailures = False
userAgentDomainRequired = False
print('Server running: Bob') print('Server running: Bob')
runDaemon(logLoginFailures, city, runDaemon(userAgentDomainRequired,
logLoginFailures, city,
showNodeInfoAccounts, showNodeInfoAccounts,
showNodeInfoVersion, showNodeInfoVersion,
brochMode, brochMode,
@ -680,8 +684,10 @@ def createServerEve(path: str, domain: str, port: int, federationList: [],
showNodeInfoVersion = True showNodeInfoVersion = True
city = 'London, England' city = 'London, England'
logLoginFailures = False logLoginFailures = False
userAgentDomainRequired = False
print('Server running: Eve') print('Server running: Eve')
runDaemon(logLoginFailures, city, runDaemon(userAgentDomainRequired,
logLoginFailures, city,
showNodeInfoAccounts, showNodeInfoAccounts,
showNodeInfoVersion, showNodeInfoVersion,
brochMode, brochMode,
@ -1151,7 +1157,7 @@ def testFollowBetweenServers():
shutil.rmtree(baseDir + '/.tests') shutil.rmtree(baseDir + '/.tests')
def testFollowersOfPerson(): def _testFollowersOfPerson():
print('testFollowersOfPerson') print('testFollowersOfPerson')
currDir = os.getcwd() currDir = os.getcwd()
nickname = 'mxpop' nickname = 'mxpop'
@ -1200,7 +1206,7 @@ def testFollowersOfPerson():
shutil.rmtree(baseDir) shutil.rmtree(baseDir)
def testNoOfFollowersOnDomain(): def _testNoOfFollowersOnDomain():
print('testNoOfFollowersOnDomain') print('testNoOfFollowersOnDomain')
currDir = os.getcwd() currDir = os.getcwd()
nickname = 'mxpop' nickname = 'mxpop'
@ -1261,7 +1267,7 @@ def testNoOfFollowersOnDomain():
shutil.rmtree(baseDir) shutil.rmtree(baseDir)
def testGroupFollowers(): def _testGroupFollowers():
print('testGroupFollowers') print('testGroupFollowers')
currDir = os.getcwd() currDir = os.getcwd()
@ -1306,7 +1312,7 @@ def testGroupFollowers():
shutil.rmtree(baseDir) shutil.rmtree(baseDir)
def testFollows(): def _testFollows():
print('testFollows') print('testFollows')
currDir = os.getcwd() currDir = os.getcwd()
nickname = 'test529' nickname = 'test529'
@ -1383,7 +1389,7 @@ def testFollows():
shutil.rmtree(baseDir) shutil.rmtree(baseDir)
def testCreatePerson(): def _testCreatePerson():
print('testCreatePerson') print('testCreatePerson')
currDir = os.getcwd() currDir = os.getcwd()
nickname = 'test382' nickname = 'test382'
@ -1417,7 +1423,7 @@ def testCreatePerson():
shutil.rmtree(baseDir) shutil.rmtree(baseDir)
def testAuthentication(): def _testAuthentication():
print('testAuthentication') print('testAuthentication')
currDir = os.getcwd() currDir = os.getcwd()
nickname = 'test8743' nickname = 'test8743'
@ -1824,7 +1830,7 @@ def testClientToServer():
# shutil.rmtree(bobDir) # shutil.rmtree(bobDir)
def testActorParsing(): def _testActorParsing():
print('testActorParsing') print('testActorParsing')
actor = 'https://mydomain:72/users/mynick' actor = 'https://mydomain:72/users/mynick'
domain, port = getDomainFromActor(actor) domain, port = getDomainFromActor(actor)
@ -1871,7 +1877,7 @@ def testActorParsing():
assert nickname == 'othernick' assert nickname == 'othernick'
def testWebLinks(): def _testWebLinks():
print('testWebLinks') print('testWebLinks')
exampleText = \ exampleText = \
@ -1998,7 +2004,7 @@ def testWebLinks():
assert resultText == exampleText assert resultText == exampleText
def testAddEmoji(): def _testAddEmoji():
print('testAddEmoji') print('testAddEmoji')
content = "Emoji :lemon: :strawberry: :banana:" content = "Emoji :lemon: :strawberry: :banana:"
httpPrefix = 'http' httpPrefix = 'http'
@ -2045,7 +2051,7 @@ def testAddEmoji():
shutil.rmtree(baseDirOriginal + '/.tests') shutil.rmtree(baseDirOriginal + '/.tests')
def testGetStatusNumber(): def _testGetStatusNumber():
print('testGetStatusNumber') print('testGetStatusNumber')
prevStatusNumber = None prevStatusNumber = None
for i in range(1, 20): for i in range(1, 20):
@ -2056,7 +2062,7 @@ def testGetStatusNumber():
prevStatusNumber = int(statusNumber) prevStatusNumber = int(statusNumber)
def testJsonString() -> None: def _testJsonString() -> None:
print('testJsonString') print('testJsonString')
filename = '.epicyon_tests_testJsonString.json' filename = '.epicyon_tests_testJsonString.json'
messageStr = "Crème brûlée यह एक परीक्षण ह" messageStr = "Crème brûlée यह एक परीक्षण ह"
@ -2072,7 +2078,7 @@ def testJsonString() -> None:
os.remove(filename) os.remove(filename)
def testSaveLoadJson(): def _testSaveLoadJson():
print('testSaveLoadJson') print('testSaveLoadJson')
testJson = { testJson = {
"param1": 3, "param1": 3,
@ -2092,7 +2098,7 @@ def testSaveLoadJson():
os.remove(testFilename) os.remove(testFilename)
def testTheme(): def _testTheme():
print('testTheme') print('testTheme')
css = 'somestring --background-value: 24px; --foreground-value: 24px;' css = 'somestring --background-value: 24px; --foreground-value: 24px;'
result = setCSSparam(css, 'background-value', '32px') result = setCSSparam(css, 'background-value', '32px')
@ -2110,7 +2116,7 @@ def testTheme():
assert result == '--background-value: 32px; --foreground-value: 24px;' assert result == '--background-value: 32px; --foreground-value: 24px;'
def testRecentPostsCache(): def _testRecentPostsCache():
print('testRecentPostsCache') print('testRecentPostsCache')
recentPostsCache = {} recentPostsCache = {}
maxRecentPosts = 3 maxRecentPosts = 3
@ -2126,7 +2132,7 @@ def testRecentPostsCache():
assert len(recentPostsCache['html'].items()) == maxRecentPosts assert len(recentPostsCache['html'].items()) == maxRecentPosts
def testRemoveTextFormatting(): def _testRemoveTextFormatting():
print('testRemoveTextFormatting') print('testRemoveTextFormatting')
testStr = '<p>Text without formatting</p>' testStr = '<p>Text without formatting</p>'
resultStr = removeTextFormatting(testStr) resultStr = removeTextFormatting(testStr)
@ -2136,7 +2142,7 @@ def testRemoveTextFormatting():
assert(resultStr == '<p>Text with formatting</p>') assert(resultStr == '<p>Text with formatting</p>')
def testJsonld(): def _testJsonld():
print("testJsonld") print("testJsonld")
jldDocument = { jldDocument = {
@ -2225,14 +2231,14 @@ def testJsonld():
signedDocument2['signature']['signatureValue']) signedDocument2['signature']['signatureValue'])
def testSiteIsActive(): def _testSiteIsActive():
print('testSiteIsActive') print('testSiteIsActive')
assert(siteIsActive('https://archive.org')) assert(siteIsActive('https://archive.org'))
assert(siteIsActive('https://mastodon.social')) assert(siteIsActive('https://mastodon.social'))
assert(not siteIsActive('https://notarealwebsite.a.b.c')) assert(not siteIsActive('https://notarealwebsite.a.b.c'))
def testRemoveHtml(): def _testRemoveHtml():
print('testRemoveHtml') print('testRemoveHtml')
testStr = 'This string has no html.' testStr = 'This string has no html.'
assert(removeHtml(testStr) == testStr) assert(removeHtml(testStr) == testStr)
@ -2249,7 +2255,7 @@ def testRemoveHtml():
'This string contains a url http://somesite.or.other') 'This string contains a url http://somesite.or.other')
def testDangerousCSS(): def _testDangerousCSS():
print('testDangerousCSS') print('testDangerousCSS')
baseDir = os.getcwd() baseDir = os.getcwd()
for subdir, dirs, files in os.walk(baseDir): for subdir, dirs, files in os.walk(baseDir):
@ -2260,7 +2266,7 @@ def testDangerousCSS():
break break
def testDangerousMarkup(): def _testDangerousMarkup():
print('testDangerousMarkup') print('testDangerousMarkup')
allowLocalNetworkAccess = False allowLocalNetworkAccess = False
content = '<p>This is a valid message</p>' content = '<p>This is a valid message</p>'
@ -2322,7 +2328,7 @@ def testDangerousMarkup():
assert(not dangerousMarkup(content, allowLocalNetworkAccess)) assert(not dangerousMarkup(content, allowLocalNetworkAccess))
def runHtmlReplaceQuoteMarks(): def _runHtmlReplaceQuoteMarks():
print('htmlReplaceQuoteMarks') print('htmlReplaceQuoteMarks')
testStr = 'The "cat" "sat" on the mat' testStr = 'The "cat" "sat" on the mat'
result = htmlReplaceQuoteMarks(testStr) result = htmlReplaceQuoteMarks(testStr)
@ -2341,7 +2347,7 @@ def runHtmlReplaceQuoteMarks():
assert result == '“hello” <a href="somesite.html">“test” html</a>' assert result == '“hello” <a href="somesite.html">“test” html</a>'
def testJsonPostAllowsComments(): def _testJsonPostAllowsComments():
print('testJsonPostAllowsComments') print('testJsonPostAllowsComments')
postJsonObject = { postJsonObject = {
"id": "123" "id": "123"
@ -2373,7 +2379,7 @@ def testJsonPostAllowsComments():
assert not jsonPostAllowsComments(postJsonObject) assert not jsonPostAllowsComments(postJsonObject)
def testRemoveIdEnding(): def _testRemoveIdEnding():
print('testRemoveIdEnding') print('testRemoveIdEnding')
testStr = 'https://activitypub.somedomain.net' testStr = 'https://activitypub.somedomain.net'
resultStr = removeIdEnding(testStr) resultStr = removeIdEnding(testStr)
@ -2399,7 +2405,7 @@ def testRemoveIdEnding():
'https://event.somedomain.net/users/foo/statuses/34544814814' 'https://event.somedomain.net/users/foo/statuses/34544814814'
def testValidContentWarning(): def _testValidContentWarning():
print('testValidContentWarning') print('testValidContentWarning')
resultStr = validContentWarning('Valid content warning') resultStr = validContentWarning('Valid content warning')
assert resultStr == 'Valid content warning' assert resultStr == 'Valid content warning'
@ -2412,7 +2418,7 @@ def testValidContentWarning():
assert resultStr == 'Invalid content warning' assert resultStr == 'Invalid content warning'
def testTranslations(): def _testTranslations():
print('testTranslations') print('testTranslations')
languagesStr = ('ar', 'ca', 'cy', 'de', 'es', 'fr', 'ga', languagesStr = ('ar', 'ca', 'cy', 'de', 'es', 'fr', 'ga',
'hi', 'it', 'ja', 'oc', 'pt', 'ru', 'zh') 'hi', 'it', 'ja', 'oc', 'pt', 'ru', 'zh')
@ -2438,7 +2444,7 @@ def testTranslations():
assert langJson.get(englishStr) assert langJson.get(englishStr)
def testConstantTimeStringCheck(): def _testConstantTimeStringCheck():
print('testConstantTimeStringCheck') print('testConstantTimeStringCheck')
assert constantTimeStringCheck('testing', 'testing') assert constantTimeStringCheck('testing', 'testing')
assert not constantTimeStringCheck('testing', '1234') assert not constantTimeStringCheck('testing', '1234')
@ -2476,7 +2482,7 @@ def testConstantTimeStringCheck():
assert int(timeDiffMicroseconds) < 10 assert int(timeDiffMicroseconds) < 10
def testReplaceEmailQuote(): def _testReplaceEmailQuote():
print('testReplaceEmailQuote') print('testReplaceEmailQuote')
testStr = '<p>This content has no quote.</p>' testStr = '<p>This content has no quote.</p>'
assert htmlReplaceEmailQuote(testStr) == testStr assert htmlReplaceEmailQuote(testStr) == testStr
@ -2534,7 +2540,7 @@ def testReplaceEmailQuote():
assert resultStr == expectedStr assert resultStr == expectedStr
def testRemoveHtmlTag(): def _testRemoveHtmlTag():
print('testRemoveHtmlTag') print('testRemoveHtmlTag')
testStr = "<p><img width=\"864\" height=\"486\" " + \ testStr = "<p><img width=\"864\" height=\"486\" " + \
"src=\"https://somesiteorother.com/image.jpg\"></p>" "src=\"https://somesiteorother.com/image.jpg\"></p>"
@ -2543,7 +2549,7 @@ def testRemoveHtmlTag():
"src=\"https://somesiteorother.com/image.jpg\"></p>" "src=\"https://somesiteorother.com/image.jpg\"></p>"
def testHashtagRuleTree(): def _testHashtagRuleTree():
print('testHashtagRuleTree') print('testHashtagRuleTree')
operators = ('not', 'and', 'or', 'xor', 'from', 'contains') operators = ('not', 'and', 'or', 'xor', 'from', 'contains')
@ -2675,7 +2681,7 @@ def testHashtagRuleTree():
assert not hashtagRuleResolve(tree, hashtags, moderated, content, url) assert not hashtagRuleResolve(tree, hashtags, moderated, content, url)
def testGetNewswireTags(): def _testGetNewswireTags():
print('testGetNewswireTags') print('testGetNewswireTags')
rssDescription = '<img src="https://somesite/someimage.jpg" ' + \ rssDescription = '<img src="https://somesite/someimage.jpg" ' + \
'class="misc-stuff" alt="#ExcitingHashtag" ' + \ 'class="misc-stuff" alt="#ExcitingHashtag" ' + \
@ -2689,7 +2695,7 @@ def testGetNewswireTags():
assert '#ExcitingHashtag' in tags assert '#ExcitingHashtag' in tags
def testFirstParagraphFromString(): def _testFirstParagraphFromString():
print('testFirstParagraphFromString') print('testFirstParagraphFromString')
testStr = \ testStr = \
'<p><a href="https://somesite.com/somepath">This is a test</a></p>' + \ '<p><a href="https://somesite.com/somepath">This is a test</a></p>' + \
@ -2704,7 +2710,7 @@ def testFirstParagraphFromString():
assert resultStr == testStr assert resultStr == testStr
def testParseFeedDate(): def _testParseFeedDate():
print('testParseFeedDate') print('testParseFeedDate')
pubDate = "2020-12-14T00:08:06+00:00" pubDate = "2020-12-14T00:08:06+00:00"
@ -2724,7 +2730,7 @@ def testParseFeedDate():
assert publishedDate == "2020-11-22 18:51:33+00:00" assert publishedDate == "2020-11-22 18:51:33+00:00"
def testValidNickname(): def _testValidNickname():
print('testValidNickname') print('testValidNickname')
domain = 'somedomain.net' domain = 'somedomain.net'
@ -2741,7 +2747,7 @@ def testValidNickname():
assert not validNickname(domain, nickname) assert not validNickname(domain, nickname)
def testGuessHashtagCategory() -> None: def _testGuessHashtagCategory() -> None:
print('testGuessHashtagCategory') print('testGuessHashtagCategory')
hashtagCategories = { hashtagCategories = {
"foo": ["swan", "goose"], "foo": ["swan", "goose"],
@ -2754,7 +2760,7 @@ def testGuessHashtagCategory() -> None:
assert guess == "bar" assert guess == "bar"
def testGetMentionedPeople() -> None: def _testGetMentionedPeople() -> None:
print('testGetMentionedPeople') print('testGetMentionedPeople')
baseDir = os.getcwd() baseDir = os.getcwd()
@ -2768,7 +2774,7 @@ def testGetMentionedPeople() -> None:
assert actors[1] == "https://cave.site/users/bat" assert actors[1] == "https://cave.site/users/bat"
def testReplyToPublicPost() -> None: def _testReplyToPublicPost() -> None:
baseDir = os.getcwd() baseDir = os.getcwd()
nickname = 'test7492362' nickname = 'test7492362'
domain = 'other.site' domain = 'other.site'
@ -2810,7 +2816,7 @@ def testReplyToPublicPost() -> None:
httpPrefix + '://rat.site/users/ninjarodent' httpPrefix + '://rat.site/users/ninjarodent'
def getFunctionCallArgs(name: str, lines: [], startLineCtr: int) -> []: def _getFunctionCallArgs(name: str, lines: [], startLineCtr: int) -> []:
"""Returns the arguments of a function call given lines """Returns the arguments of a function call given lines
of source code and a starting line number of source code and a starting line number
""" """
@ -2848,7 +2854,7 @@ def getFunctionCalls(name: str, lines: [], startLineCtr: int,
return callsFunctions return callsFunctions
def functionArgsMatch(callArgs: [], funcArgs: []): def _functionArgsMatch(callArgs: [], funcArgs: []):
"""Do the function artuments match the function call arguments """Do the function artuments match the function call arguments
""" """
if len(callArgs) == len(funcArgs): if len(callArgs) == len(funcArgs):
@ -2872,7 +2878,7 @@ def functionArgsMatch(callArgs: [], funcArgs: []):
return callArgsCtr >= funcArgsCtr return callArgsCtr >= funcArgsCtr
def testFunctions(): def _testFunctions():
print('testFunctions') print('testFunctions')
function = {} function = {}
functionProperties = {} functionProperties = {}
@ -2959,11 +2965,11 @@ def testFunctions():
lineCtr += 1 lineCtr += 1
continue continue
callArgs = \ callArgs = \
getFunctionCallArgs(name, _getFunctionCallArgs(name,
modules[modName]['lines'], modules[modName]['lines'],
lineCtr) lineCtr)
if not functionArgsMatch(callArgs, funcArgs = functionProperties[name]['args']
functionProperties[name]['args']): if not _functionArgsMatch(callArgs, funcArgs):
print('Call to function ' + name + print('Call to function ' + name +
' does not match its arguments') ' does not match its arguments')
print('def args: ' + print('def args: ' +
@ -3012,7 +3018,7 @@ def testFunctions():
'runSharesExpireWatchdog', 'runSharesExpireWatchdog',
'getThisWeeksEvents', 'getThisWeeksEvents',
'getAvailability', 'getAvailability',
'testThreadsFunction', '_testThreadsFunction',
'createServerAlice', 'createServerAlice',
'createServerBob', 'createServerBob',
'createServerEve', 'createServerEve',
@ -3224,7 +3230,7 @@ def testFunctions():
'-Gsep=+120 -Tx11 epicyon.dot') '-Gsep=+120 -Tx11 epicyon.dot')
def testLinksWithinPost() -> None: def _testLinksWithinPost() -> None:
baseDir = os.getcwd() baseDir = os.getcwd()
nickname = 'test27636' nickname = 'test27636'
domain = 'rando.site' domain = 'rando.site'
@ -3276,7 +3282,7 @@ def testLinksWithinPost() -> None:
assert postJsonObject['object']['content'] == content assert postJsonObject['object']['content'] == content
def testMastoApi(): def _testMastoApi():
print('testMastoApi') print('testMastoApi')
nickname = 'ThisIsATestNickname' nickname = 'ThisIsATestNickname'
mastoId = getMastoApiV1IdFromNickname(nickname) mastoId = getMastoApiV1IdFromNickname(nickname)
@ -3287,7 +3293,7 @@ def testMastoApi():
assert nickname2 == nickname assert nickname2 == nickname
def testDomainHandling(): def _testDomainHandling():
print('testDomainHandling') print('testDomainHandling')
testDomain = 'localhost' testDomain = 'localhost'
assert decodedHost(testDomain) == testDomain assert decodedHost(testDomain) == testDomain
@ -3299,7 +3305,7 @@ def testDomainHandling():
assert decodedHost(testDomain) == "españa.icom.museum" assert decodedHost(testDomain) == "españa.icom.museum"
def testPrepareHtmlPostNickname(): def _testPrepareHtmlPostNickname():
print('testPrepareHtmlPostNickname') print('testPrepareHtmlPostNickname')
postHtml = '<a class="imageAnchor" href="/users/bob?replyfollowers=' postHtml = '<a class="imageAnchor" href="/users/bob?replyfollowers='
postHtml += '<a class="imageAnchor" href="/users/bob?repeatprivate=' postHtml += '<a class="imageAnchor" href="/users/bob?repeatprivate='
@ -3314,7 +3320,7 @@ def testPrepareHtmlPostNickname():
assert result == expectedHtml assert result == expectedHtml
def testValidHashTag(): def _testValidHashTag():
print('testValidHashTag') print('testValidHashTag')
assert validHashTag('ThisIsValid') assert validHashTag('ThisIsValid')
assert validHashTag('ThisIsValid12345') assert validHashTag('ThisIsValid12345')
@ -3329,7 +3335,7 @@ def testValidHashTag():
assert not validHashTag('This=IsAlsoNotValid"') assert not validHashTag('This=IsAlsoNotValid"')
def testMarkdownToHtml(): def _testMarkdownToHtml():
print('testMarkdownToHtml') print('testMarkdownToHtml')
markdown = 'This is just plain text' markdown = 'This is just plain text'
assert markdownToHtml(markdown) == markdown assert markdownToHtml(markdown) == markdown
@ -3376,7 +3382,7 @@ def testMarkdownToHtml():
'Or <img class="markdownImage" src="/cat.jpg" alt="pounce" />.' 'Or <img class="markdownImage" src="/cat.jpg" alt="pounce" />.'
def testExtractTextFieldsInPOST(): def _testExtractTextFieldsInPOST():
print('testExtractTextFieldsInPOST') print('testExtractTextFieldsInPOST')
boundary = '-----------------------------116202748023898664511855843036' boundary = '-----------------------------116202748023898664511855843036'
formData = '-----------------------------116202748023898664511855' + \ formData = '-----------------------------116202748023898664511855' + \
@ -3413,7 +3419,7 @@ def testExtractTextFieldsInPOST():
assert fields['message'] == 'This is a ; test' assert fields['message'] == 'This is a ; test'
def testSpeakerReplaceLinks(): def _testSpeakerReplaceLinks():
print('testSpeakerReplaceLinks') print('testSpeakerReplaceLinks')
text = 'The Tor Project: For Snowflake volunteers: If you use ' + \ text = 'The Tor Project: For Snowflake volunteers: If you use ' + \
'Firefox, Brave, or Chrome, our Snowflake extension turns ' + \ 'Firefox, Brave, or Chrome, our Snowflake extension turns ' + \
@ -3431,7 +3437,7 @@ def testSpeakerReplaceLinks():
assert 'Web link support.torproject.org' in result assert 'Web link support.torproject.org' in result
def testCamelCaseSplit(): def _testCamelCaseSplit():
print('testCamelCaseSplit') print('testCamelCaseSplit')
testStr = 'ThisIsCamelCase' testStr = 'ThisIsCamelCase'
assert camelCaseSplit(testStr) == 'This Is Camel Case' assert camelCaseSplit(testStr) == 'This Is Camel Case'
@ -3440,7 +3446,7 @@ def testCamelCaseSplit():
assert camelCaseSplit(testStr) == 'Notcamelcase test' assert camelCaseSplit(testStr) == 'Notcamelcase test'
def testEmojiImages(): def _testEmojiImages():
print('testEmojiImages') print('testEmojiImages')
emojiFilename = 'emoji/default_emoji.json' emojiFilename = 'emoji/default_emoji.json'
assert os.path.isfile(emojiFilename) assert os.path.isfile(emojiFilename)
@ -3454,7 +3460,7 @@ def testEmojiImages():
assert os.path.isfile(emojiImageFilename) assert os.path.isfile(emojiImageFilename)
def testExtractPGPPublicKey(): def _testExtractPGPPublicKey():
print('testExtractPGPPublicKey') print('testExtractPGPPublicKey')
pubKey = \ pubKey = \
'-----BEGIN PGP PUBLIC KEY BLOCK-----\n\n' + \ '-----BEGIN PGP PUBLIC KEY BLOCK-----\n\n' + \
@ -3601,7 +3607,7 @@ def testUpdateActor():
shutil.rmtree(baseDir + '/.tests') shutil.rmtree(baseDir + '/.tests')
def testRemovePostInteractions() -> None: def _testRemovePostInteractions() -> None:
print('testRemovePostInteractions') print('testRemovePostInteractions')
postJsonObject = { postJsonObject = {
"type": "Create", "type": "Create",
@ -3633,7 +3639,7 @@ def testRemovePostInteractions() -> None:
assert not removePostInteractions(postJsonObject, False) assert not removePostInteractions(postJsonObject, False)
def testSpoofGeolocation() -> None: def _testSpoofGeolocation() -> None:
print('testSpoofGeolocation') print('testSpoofGeolocation')
nogoLine = \ nogoLine = \
'NEW YORK, USA: 73.951W,40.879, 73.974W,40.83, ' + \ 'NEW YORK, USA: 73.951W,40.879, 73.974W,40.83, ' + \
@ -3882,7 +3888,7 @@ def testSpoofGeolocation() -> None:
kmlFile.close() kmlFile.close()
def testSkills() -> None: def _testSkills() -> None:
print('testSkills') print('testSkills')
actorJson = { actorJson = {
'hasOccupation': [ 'hasOccupation': [
@ -3908,7 +3914,7 @@ def testSkills() -> None:
assert actorSkillValue(actorJson, 'gardening') == 70 assert actorSkillValue(actorJson, 'gardening') == 70
def testRoles() -> None: def _testRoles() -> None:
print('testRoles') print('testRoles')
actorJson = { actorJson = {
'hasOccupation': [ 'hasOccupation': [
@ -3935,60 +3941,60 @@ def testRoles() -> None:
def runAllTests(): def runAllTests():
print('Running tests...') print('Running tests...')
updateDefaultThemesList(os.getcwd()) updateDefaultThemesList(os.getcwd())
testFunctions() _testFunctions()
testRoles() _testRoles()
testSkills() _testSkills()
testSpoofGeolocation() _testSpoofGeolocation()
testRemovePostInteractions() _testRemovePostInteractions()
testExtractPGPPublicKey() _testExtractPGPPublicKey()
testEmojiImages() _testEmojiImages()
testCamelCaseSplit() _testCamelCaseSplit()
testSpeakerReplaceLinks() _testSpeakerReplaceLinks()
testExtractTextFieldsInPOST() _testExtractTextFieldsInPOST()
testMarkdownToHtml() _testMarkdownToHtml()
testValidHashTag() _testValidHashTag()
testPrepareHtmlPostNickname() _testPrepareHtmlPostNickname()
testDomainHandling() _testDomainHandling()
testMastoApi() _testMastoApi()
testLinksWithinPost() _testLinksWithinPost()
testReplyToPublicPost() _testReplyToPublicPost()
testGetMentionedPeople() _testGetMentionedPeople()
testGuessHashtagCategory() _testGuessHashtagCategory()
testValidNickname() _testValidNickname()
testParseFeedDate() _testParseFeedDate()
testFirstParagraphFromString() _testFirstParagraphFromString()
testGetNewswireTags() _testGetNewswireTags()
testHashtagRuleTree() _testHashtagRuleTree()
testRemoveHtmlTag() _testRemoveHtmlTag()
testReplaceEmailQuote() _testReplaceEmailQuote()
testConstantTimeStringCheck() _testConstantTimeStringCheck()
testTranslations() _testTranslations()
testValidContentWarning() _testValidContentWarning()
testRemoveIdEnding() _testRemoveIdEnding()
testJsonPostAllowsComments() _testJsonPostAllowsComments()
runHtmlReplaceQuoteMarks() _runHtmlReplaceQuoteMarks()
testDangerousCSS() _testDangerousCSS()
testDangerousMarkup() _testDangerousMarkup()
testRemoveHtml() _testRemoveHtml()
testSiteIsActive() _testSiteIsActive()
testJsonld() _testJsonld()
testRemoveTextFormatting() _testRemoveTextFormatting()
testWebLinks() _testWebLinks()
testRecentPostsCache() _testRecentPostsCache()
testTheme() _testTheme()
testSaveLoadJson() _testSaveLoadJson()
testJsonString() _testJsonString()
testGetStatusNumber() _testGetStatusNumber()
testAddEmoji() _testAddEmoji()
testActorParsing() _testActorParsing()
testHttpsig() _testHttpsig()
testHttpSigNew() _testHttpSigNew()
testCache() _testCache()
testThreads() _testThreads()
testCreatePerson() _testCreatePerson()
testAuthentication() _testAuthentication()
testFollowersOfPerson() _testFollowersOfPerson()
testNoOfFollowersOnDomain() _testNoOfFollowersOnDomain()
testFollows() _testFollows()
testGroupFollowers() _testGroupFollowers()
print('Tests succeeded\n') print('Tests succeeded\n')