__filename__ = "tests.py" __author__ = "Bob Mottram" __license__ = "AGPL3+" __version__ = "1.2.0" __maintainer__ = "Bob Mottram" __email__ = "bob@freedombone.net" __status__ = "Production" import time import os import shutil import json from time import gmtime, strftime from pprint import pprint from httpsig import signPostHeaders from httpsig import verifyPostHeaders from httpsig import messageContentDigest from cache import storePersonInCache from cache import getPersonFromCache from threads import threadWithTrace from daemon import runDaemon from session import createSession from posts import getMentionedPeople from posts import validContentWarning from posts import deleteAllPosts from posts import createPublicPost from posts import sendPost from posts import noOfFollowersOnDomain from posts import groupFollowersByDomain from posts import archivePostsForPerson from posts import sendPostViaServer from follow import clearFollows from follow import clearFollowers from follow import sendFollowRequestViaServer from follow import sendUnfollowRequestViaServer from utils import decodedHost from utils import getFullDomain from utils import validNickname from utils import firstParagraphFromString from utils import removeIdEnding from siteactive import siteIsActive from utils import updateRecentPostsCache from utils import followPerson from utils import getNicknameFromActor from utils import getDomainFromActor from utils import copytree from utils import loadJson from utils import saveJson from utils import getStatusNumber from utils import getFollowersOfPerson from utils import removeHtml from utils import dangerousMarkup from follow import followerOfPerson from follow import unfollowAccount from follow import unfollowerOfAccount from follow import sendFollowRequest from person import createPerson from person import setDisplayNickname from person import setBio # from person import generateRSAKey from skills import setSkillLevel from roles import setRole from roles import outboxDelegate from auth import constantTimeStringCheck from auth import createBasicAuthHeader from auth import authorizeBasic from auth import storeBasicCredentials from like import likePost from like import sendLikeViaServer from announce import announcePublic from announce import sendAnnounceViaServer from media import getMediaPath from media import getAttachmentMediaType from delete import sendDeleteViaServer from inbox import jsonPostAllowsComments from inbox import validInbox from inbox import validInboxFilenames from categories import guessHashtagCategory from content import validHashTag from content import htmlReplaceEmailQuote from content import htmlReplaceQuoteMarks from content import dangerousCSS from content import addWebLinks from content import replaceEmojiFromTags from content import addHtmlTags from content import removeLongWords from content import replaceContentDuplicates from content import removeTextFormatting from content import removeHtmlTag from theme import setCSSparam from linked_data_sig import generateJsonSignature from linked_data_sig import verifyJsonSignature from newsdaemon import hashtagRuleTree from newsdaemon import hashtagRuleResolve from newswire import getNewswireTags from newswire import parseFeedDate from mastoapiv1 import getMastoApiV1IdFromNickname from mastoapiv1 import getNicknameFromMastoApiV1Id from webapp_post import prepareHtmlPostNickname testServerAliceRunning = False testServerBobRunning = False testServerEveRunning = False thrAlice = None thrBob = None thrEve = None def _testHttpsigBase(withDigest): print('testHttpsig(' + str(withDigest) + ')') baseDir = os.getcwd() path = baseDir + '/.testHttpsigBase' if os.path.isdir(path): shutil.rmtree(path) os.mkdir(path) os.chdir(path) contentType = 'application/activity+json' nickname = 'socrates' domain = 'argumentative.social' httpPrefix = 'https' port = 5576 password = 'SuperSecretPassword' privateKeyPem, publicKeyPem, person, wfEndpoint = \ createPerson(path, nickname, domain, port, httpPrefix, False, False, password) assert privateKeyPem messageBodyJson = { "a key": "a value", "another key": "A string", "yet another key": "Another string" } messageBodyJsonStr = json.dumps(messageBodyJson) headersDomain = getFullDomain(domain, port) dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) boxpath = '/inbox' if not withDigest: headers = { 'host': headersDomain, 'date': dateStr, 'content-type': 'application/json' } signatureHeader = \ signPostHeaders(dateStr, privateKeyPem, nickname, domain, port, domain, port, boxpath, httpPrefix, None) else: bodyDigest = messageContentDigest(messageBodyJsonStr) contentLength = len(messageBodyJsonStr) headers = { 'host': headersDomain, 'date': dateStr, 'digest': f'SHA-256={bodyDigest}', 'content-type': contentType, 'content-length': str(contentLength) } signatureHeader = \ signPostHeaders(dateStr, privateKeyPem, nickname, domain, port, domain, port, boxpath, httpPrefix, messageBodyJsonStr) headers['signature'] = signatureHeader assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, boxpath, False, None, messageBodyJsonStr, False) if withDigest: # everything correct except for content-length headers['content-length'] = str(contentLength + 2) assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, boxpath, False, None, messageBodyJsonStr, False) is False assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, '/parambulator' + boxpath, False, None, messageBodyJsonStr, False) is False assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, boxpath, True, None, messageBodyJsonStr, False) is False if not withDigest: # fake domain headers = { 'host': 'bogon.domain', 'date': dateStr, 'content-type': 'application/json' } else: # correct domain but fake message messageBodyJsonStr = \ '{"a key": "a value", "another key": "Fake GNUs", ' + \ '"yet another key": "More Fake GNUs"}' contentLength = len(messageBodyJsonStr) bodyDigest = messageContentDigest(messageBodyJsonStr) headers = { 'host': domain, 'date': dateStr, 'digest': f'SHA-256={bodyDigest}', 'content-type': contentType, 'content-length': str(contentLength) } headers['signature'] = signatureHeader assert verifyPostHeaders(httpPrefix, publicKeyPem, headers, boxpath, True, None, messageBodyJsonStr, False) is False os.chdir(baseDir) shutil.rmtree(path) def testHttpsig(): _testHttpsigBase(True) _testHttpsigBase(False) def testCache(): print('testCache') personUrl = "cat@cardboard.box" personJson = { "id": 123456, "test": "This is a test" } personCache = {} storePersonInCache(None, personUrl, personJson, personCache, True) result = getPersonFromCache(None, personUrl, personCache, True) assert result['id'] == 123456 assert result['test'] == 'This is a test' def testThreadsFunction(param: str): for i in range(10000): time.sleep(2) def testThreads(): print('testThreads') thr = \ threadWithTrace(target=testThreadsFunction, args=('test',), daemon=True) thr.start() assert thr.is_alive() is True time.sleep(1) thr.kill() thr.join() assert thr.is_alive() is False def createServerAlice(path: str, domain: str, port: int, bobAddress: str, federationList: [], hasFollows: bool, hasPosts: bool, sendThreads: []): print('Creating test server: Alice on port ' + str(port)) if os.path.isdir(path): shutil.rmtree(path) os.mkdir(path) os.chdir(path) nickname = 'alice' httpPrefix = 'http' proxyType = None password = 'alicepass' maxReplies = 64 domainMaxPostsPerDay = 1000 accountMaxPostsPerDay = 1000 allowDeletion = True privateKeyPem, publicKeyPem, person, wfEndpoint = \ createPerson(path, nickname, domain, port, httpPrefix, True, False, password) deleteAllPosts(path, nickname, domain, 'inbox') deleteAllPosts(path, nickname, domain, 'outbox') assert setSkillLevel(path, nickname, domain, 'hacking', 90) assert setRole(path, nickname, domain, 'someproject', 'guru') if hasFollows: followPerson(path, nickname, domain, 'bob', bobAddress, federationList, False) followerOfPerson(path, nickname, domain, 'bob', bobAddress, federationList, False) if hasPosts: testFollowersOnly = False testSaveToFile = True clientToServer = False testCommentsEnabled = True testAttachImageFilename = None testMediaType = None testImageDescription = None createPublicPost(path, nickname, domain, port, httpPrefix, "No wise fish would go anywhere without a porpoise", testFollowersOnly, testSaveToFile, clientToServer, testCommentsEnabled, testAttachImageFilename, testMediaType, testImageDescription) createPublicPost(path, nickname, domain, port, httpPrefix, "Curiouser and curiouser!", testFollowersOnly, testSaveToFile, clientToServer, testCommentsEnabled, testAttachImageFilename, testMediaType, testImageDescription) createPublicPost(path, nickname, domain, port, httpPrefix, "In the gardens of memory, in the palace " + "of dreams, that is where you and I shall meet", testFollowersOnly, testSaveToFile, clientToServer, testCommentsEnabled, testAttachImageFilename, testMediaType, testImageDescription) global testServerAliceRunning testServerAliceRunning = True maxMentions = 10 maxEmoji = 10 onionDomain = None i2pDomain = None allowLocalNetworkAccess = True maxNewswirePosts = 20 dormantMonths = 3 sendThreadsTimeoutMins = 30 maxFollowers = 10 verifyAllSignatures = True print('Server running: Alice') runDaemon(verifyAllSignatures, sendThreadsTimeoutMins, dormantMonths, maxNewswirePosts, allowLocalNetworkAccess, 2048, False, True, False, False, True, maxFollowers, 0, 100, 1024, 5, False, 0, False, 1, False, False, False, 5, True, True, 'en', __version__, "instanceId", False, path, domain, onionDomain, i2pDomain, None, port, port, httpPrefix, federationList, maxMentions, maxEmoji, False, proxyType, maxReplies, domainMaxPostsPerDay, accountMaxPostsPerDay, allowDeletion, True, True, False, sendThreads, False) def createServerBob(path: str, domain: str, port: int, aliceAddress: str, federationList: [], hasFollows: bool, hasPosts: bool, sendThreads: []): print('Creating test server: Bob on port ' + str(port)) if os.path.isdir(path): shutil.rmtree(path) os.mkdir(path) os.chdir(path) nickname = 'bob' httpPrefix = 'http' proxyType = None clientToServer = False password = 'bobpass' maxReplies = 64 domainMaxPostsPerDay = 1000 accountMaxPostsPerDay = 1000 allowDeletion = True privateKeyPem, publicKeyPem, person, wfEndpoint = \ createPerson(path, nickname, domain, port, httpPrefix, True, False, password) deleteAllPosts(path, nickname, domain, 'inbox') deleteAllPosts(path, nickname, domain, 'outbox') assert setRole(path, nickname, domain, 'bandname', 'bass player') assert setRole(path, nickname, domain, 'bandname', 'publicist') if hasFollows: followPerson(path, nickname, domain, 'alice', aliceAddress, federationList, False) followerOfPerson(path, nickname, domain, 'alice', aliceAddress, federationList, False) if hasPosts: testFollowersOnly = False testSaveToFile = True testCommentsEnabled = True testAttachImageFilename = None testImageDescription = None testMediaType = None createPublicPost(path, nickname, domain, port, httpPrefix, "It's your life, live it your way.", testFollowersOnly, testSaveToFile, clientToServer, testCommentsEnabled, testAttachImageFilename, testMediaType, testImageDescription) createPublicPost(path, nickname, domain, port, httpPrefix, "One of the things I've realised is that " + "I am very simple", testFollowersOnly, testSaveToFile, clientToServer, testCommentsEnabled, testAttachImageFilename, testMediaType, testImageDescription) createPublicPost(path, nickname, domain, port, httpPrefix, "Quantum physics is a bit of a passion of mine", testFollowersOnly, testSaveToFile, clientToServer, testCommentsEnabled, testAttachImageFilename, testMediaType, testImageDescription) global testServerBobRunning testServerBobRunning = True maxMentions = 10 maxEmoji = 10 onionDomain = None i2pDomain = None allowLocalNetworkAccess = True maxNewswirePosts = 20 dormantMonths = 3 sendThreadsTimeoutMins = 30 maxFollowers = 10 verifyAllSignatures = True print('Server running: Bob') runDaemon(verifyAllSignatures, sendThreadsTimeoutMins, dormantMonths, maxNewswirePosts, allowLocalNetworkAccess, 2048, False, True, False, False, True, maxFollowers, 0, 100, 1024, 5, False, 0, False, 1, False, False, False, 5, True, True, 'en', __version__, "instanceId", False, path, domain, onionDomain, i2pDomain, None, port, port, httpPrefix, federationList, maxMentions, maxEmoji, False, proxyType, maxReplies, domainMaxPostsPerDay, accountMaxPostsPerDay, allowDeletion, True, True, False, sendThreads, False) def createServerEve(path: str, domain: str, port: int, federationList: [], hasFollows: bool, hasPosts: bool, sendThreads: []): print('Creating test server: Eve on port ' + str(port)) if os.path.isdir(path): shutil.rmtree(path) os.mkdir(path) os.chdir(path) nickname = 'eve' httpPrefix = 'http' proxyType = None password = 'evepass' maxReplies = 64 allowDeletion = True privateKeyPem, publicKeyPem, person, wfEndpoint = \ createPerson(path, nickname, domain, port, httpPrefix, True, False, password) deleteAllPosts(path, nickname, domain, 'inbox') deleteAllPosts(path, nickname, domain, 'outbox') global testServerEveRunning testServerEveRunning = True maxMentions = 10 maxEmoji = 10 onionDomain = None i2pDomain = None allowLocalNetworkAccess = True maxNewswirePosts = 20 dormantMonths = 3 sendThreadsTimeoutMins = 30 maxFollowers = 10 verifyAllSignatures = True print('Server running: Eve') runDaemon(verifyAllSignatures, sendThreadsTimeoutMins, dormantMonths, maxNewswirePosts, allowLocalNetworkAccess, 2048, False, True, False, False, True, maxFollowers, 0, 100, 1024, 5, False, 0, False, 1, False, False, False, 5, True, True, 'en', __version__, "instanceId", False, path, domain, onionDomain, i2pDomain, None, port, port, httpPrefix, federationList, maxMentions, maxEmoji, False, proxyType, maxReplies, allowDeletion, True, True, False, sendThreads, False) def testPostMessageBetweenServers(): print('Testing sending message from one server to the inbox of another') global testServerAliceRunning global testServerBobRunning testServerAliceRunning = False testServerBobRunning = False httpPrefix = 'http' proxyType = None baseDir = os.getcwd() if os.path.isdir(baseDir + '/.tests'): shutil.rmtree(baseDir + '/.tests') os.mkdir(baseDir + '/.tests') # create the servers aliceDir = baseDir + '/.tests/alice' aliceDomain = '127.0.0.50' alicePort = 61935 aliceAddress = aliceDomain + ':' + str(alicePort) bobDir = baseDir + '/.tests/bob' bobDomain = '127.0.0.100' bobPort = 61936 federationList = [bobDomain, aliceDomain] aliceSendThreads = [] bobSendThreads = [] bobAddress = bobDomain + ':' + str(bobPort) global thrAlice if thrAlice: while thrAlice.is_alive(): thrAlice.stop() time.sleep(1) thrAlice.kill() thrAlice = \ threadWithTrace(target=createServerAlice, args=(aliceDir, aliceDomain, alicePort, bobAddress, federationList, False, False, aliceSendThreads), daemon=True) global thrBob if thrBob: while thrBob.is_alive(): thrBob.stop() time.sleep(1) thrBob.kill() thrBob = \ threadWithTrace(target=createServerBob, args=(bobDir, bobDomain, bobPort, aliceAddress, federationList, False, False, bobSendThreads), daemon=True) thrAlice.start() thrBob.start() assert thrAlice.is_alive() is True assert thrBob.is_alive() is True # wait for both servers to be running while not (testServerAliceRunning and testServerBobRunning): time.sleep(1) time.sleep(1) print('\n\n*******************************************************') print('Alice sends to Bob') os.chdir(aliceDir) sessionAlice = createSession(proxyType) inReplyTo = None inReplyToAtomUri = None subject = None alicePostLog = [] followersOnly = False saveToFile = True clientToServer = False ccUrl = None alicePersonCache = {} aliceCachedWebfingers = {} attachedImageFilename = baseDir + '/img/logo.png' mediaType = getAttachmentMediaType(attachedImageFilename) attachedImageDescription = 'Logo' isArticle = False # nothing in Alice's outbox outboxPath = aliceDir + '/accounts/alice@' + aliceDomain + '/outbox' assert len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) == 0 sendResult = \ sendPost(__version__, sessionAlice, aliceDir, 'alice', aliceDomain, alicePort, 'bob', bobDomain, bobPort, ccUrl, httpPrefix, 'Why is a mouse when it spins? ' + 'यह एक परीक्षण है #sillyquestion', followersOnly, saveToFile, clientToServer, True, attachedImageFilename, mediaType, attachedImageDescription, federationList, aliceSendThreads, alicePostLog, aliceCachedWebfingers, alicePersonCache, isArticle, inReplyTo, inReplyToAtomUri, subject) print('sendResult: ' + str(sendResult)) queuePath = bobDir + '/accounts/bob@' + bobDomain + '/queue' inboxPath = bobDir + '/accounts/bob@' + bobDomain + '/inbox' mPath = getMediaPath() mediaPath = aliceDir + '/' + mPath for i in range(30): if os.path.isdir(inboxPath): if len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) > 0: if len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) == 1: if len([name for name in os.listdir(mediaPath) if os.path.isfile(os.path.join(mediaPath, name))]) > 0: if len([name for name in os.listdir(queuePath) if os.path.isfile(os.path.join(queuePath, name))]) == 0: break time.sleep(1) # check that a news account exists newsActorDir = aliceDir + '/accounts/news@' + aliceDomain print("newsActorDir: " + newsActorDir) assert os.path.isdir(newsActorDir) newsActorFile = newsActorDir + '.json' assert os.path.isfile(newsActorFile) newsActorJson = loadJson(newsActorFile) assert newsActorJson assert newsActorJson.get("id") # check the id of the news actor print('News actor Id: ' + newsActorJson["id"]) assert (newsActorJson["id"] == httpPrefix + '://' + aliceAddress + '/users/news') # Image attachment created assert len([name for name in os.listdir(mediaPath) if os.path.isfile(os.path.join(mediaPath, name))]) > 0 # inbox item created assert len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) == 1 # queue item removed testval = len([name for name in os.listdir(queuePath) if os.path.isfile(os.path.join(queuePath, name))]) print('queuePath: ' + queuePath + ' '+str(testval)) assert testval == 0 assert validInbox(bobDir, 'bob', bobDomain) assert validInboxFilenames(bobDir, 'bob', bobDomain, aliceDomain, alicePort) print('Check that message received from Alice contains the expected text') for name in os.listdir(inboxPath): filename = os.path.join(inboxPath, name) assert os.path.isfile(filename) receivedJson = loadJson(filename, 0) if receivedJson: pprint(receivedJson['object']['content']) assert receivedJson assert 'Why is a mouse when it spins?' in \ receivedJson['object']['content'] assert 'यह एक परीक्षण है' in receivedJson['object']['content'] print('\n\n*******************************************************') print("Bob likes Alice's post") aliceDomainStr = aliceDomain + ':' + str(alicePort) followerOfPerson(bobDir, 'bob', bobDomain, 'alice', aliceDomainStr, federationList, False) bobDomainStr = bobDomain + ':' + str(bobPort) followPerson(aliceDir, 'alice', aliceDomain, 'bob', bobDomainStr, federationList, False) sessionBob = createSession(proxyType) bobPostLog = [] bobPersonCache = {} bobCachedWebfingers = {} statusNumber = None outboxPostFilename = None outboxPath = aliceDir + '/accounts/alice@' + aliceDomain + '/outbox' for name in os.listdir(outboxPath): if '#statuses#' in name: statusNumber = \ int(name.split('#statuses#')[1].replace('.json', '')) outboxPostFilename = outboxPath + '/' + name assert statusNumber > 0 assert outboxPostFilename assert likePost({}, sessionBob, bobDir, federationList, 'bob', bobDomain, bobPort, httpPrefix, 'alice', aliceDomain, alicePort, [], statusNumber, False, bobSendThreads, bobPostLog, bobPersonCache, bobCachedWebfingers, True, __version__) for i in range(20): if 'likes' in open(outboxPostFilename).read(): break time.sleep(1) alicePostJson = loadJson(outboxPostFilename, 0) if alicePostJson: pprint(alicePostJson) assert 'likes' in open(outboxPostFilename).read() print('\n\n*******************************************************') print("Bob repeats Alice's post") objectUrl = \ httpPrefix + '://' + aliceDomain + ':' + str(alicePort) + \ '/users/alice/statuses/' + str(statusNumber) inboxPath = aliceDir + '/accounts/alice@' + aliceDomain + '/inbox' outboxPath = bobDir + '/accounts/bob@' + bobDomain + '/outbox' outboxBeforeAnnounceCount = \ len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) beforeAnnounceCount = \ len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) print('inbox items before announce: ' + str(beforeAnnounceCount)) print('outbox items before announce: ' + str(outboxBeforeAnnounceCount)) assert outboxBeforeAnnounceCount == 0 assert beforeAnnounceCount == 0 announcePublic(sessionBob, bobDir, federationList, 'bob', bobDomain, bobPort, httpPrefix, objectUrl, False, bobSendThreads, bobPostLog, bobPersonCache, bobCachedWebfingers, True, __version__) announceMessageArrived = False outboxMessageArrived = False for i in range(10): time.sleep(1) if not os.path.isdir(inboxPath): continue if len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) > 0: outboxMessageArrived = True print('Announce created by Bob') if len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) > 0: announceMessageArrived = True print('Announce message sent to Alice!') if announceMessageArrived and outboxMessageArrived: break afterAnnounceCount = \ len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) outboxAfterAnnounceCount = \ len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) print('inbox items after announce: ' + str(afterAnnounceCount)) print('outbox items after announce: ' + str(outboxAfterAnnounceCount)) assert afterAnnounceCount == beforeAnnounceCount+1 assert outboxAfterAnnounceCount == outboxBeforeAnnounceCount + 1 # stop the servers thrAlice.kill() thrAlice.join() assert thrAlice.is_alive() is False thrBob.kill() thrBob.join() assert thrBob.is_alive() is False os.chdir(baseDir) shutil.rmtree(aliceDir) shutil.rmtree(bobDir) def testFollowBetweenServers(): print('Testing sending a follow request from one server to another') global testServerAliceRunning global testServerBobRunning testServerAliceRunning = False testServerBobRunning = False httpPrefix = 'http' proxyType = None federationList = [] baseDir = os.getcwd() if os.path.isdir(baseDir + '/.tests'): shutil.rmtree(baseDir + '/.tests') os.mkdir(baseDir + '/.tests') # create the servers aliceDir = baseDir + '/.tests/alice' aliceDomain = '127.0.0.47' alicePort = 61935 aliceSendThreads = [] aliceAddress = aliceDomain + ':' + str(alicePort) bobDir = baseDir + '/.tests/bob' bobDomain = '127.0.0.79' bobPort = 61936 bobSendThreads = [] bobAddress = bobDomain + ':' + str(bobPort) global thrAlice if thrAlice: while thrAlice.is_alive(): thrAlice.stop() time.sleep(1) thrAlice.kill() thrAlice = \ threadWithTrace(target=createServerAlice, args=(aliceDir, aliceDomain, alicePort, bobAddress, federationList, False, False, aliceSendThreads), daemon=True) global thrBob if thrBob: while thrBob.is_alive(): thrBob.stop() time.sleep(1) thrBob.kill() thrBob = \ threadWithTrace(target=createServerBob, args=(bobDir, bobDomain, bobPort, aliceAddress, federationList, False, False, bobSendThreads), daemon=True) thrAlice.start() thrBob.start() assert thrAlice.is_alive() is True assert thrBob.is_alive() is True # wait for all servers to be running ctr = 0 while not (testServerAliceRunning and testServerBobRunning): time.sleep(1) ctr += 1 if ctr > 60: break print('Alice online: ' + str(testServerAliceRunning)) print('Bob online: ' + str(testServerBobRunning)) assert ctr <= 60 time.sleep(1) # In the beginning all was calm and there were no follows print('*********************************************************') print('Alice sends a follow request to Bob') os.chdir(aliceDir) sessionAlice = createSession(proxyType) inReplyTo = None inReplyToAtomUri = None subject = None alicePostLog = [] followersOnly = False saveToFile = True clientToServer = False ccUrl = None alicePersonCache = {} aliceCachedWebfingers = {} alicePostLog = [] bobActor = httpPrefix + '://' + bobAddress + '/users/bob' sendResult = \ sendFollowRequest(sessionAlice, aliceDir, 'alice', aliceDomain, alicePort, httpPrefix, 'bob', bobDomain, bobActor, bobPort, httpPrefix, clientToServer, federationList, aliceSendThreads, alicePostLog, aliceCachedWebfingers, alicePersonCache, True, __version__) print('sendResult: ' + str(sendResult)) for t in range(16): if os.path.isfile(bobDir + '/accounts/bob@' + bobDomain + '/followers.txt'): if os.path.isfile(aliceDir + '/accounts/alice@' + aliceDomain + '/following.txt'): if os.path.isfile(aliceDir + '/accounts/alice@' + aliceDomain + '/followingCalendar.txt'): break time.sleep(1) assert validInbox(bobDir, 'bob', bobDomain) assert validInboxFilenames(bobDir, 'bob', bobDomain, aliceDomain, alicePort) assert 'alice@' + aliceDomain in open(bobDir + '/accounts/bob@' + bobDomain + '/followers.txt').read() assert 'bob@' + bobDomain in open(aliceDir + '/accounts/alice@' + aliceDomain + '/following.txt').read() assert 'bob@' + bobDomain in open(aliceDir + '/accounts/alice@' + aliceDomain + '/followingCalendar.txt').read() print('\n\n*********************************************************') print('Alice sends a message to Bob') alicePostLog = [] alicePersonCache = {} aliceCachedWebfingers = {} alicePostLog = [] isArticle = False sendResult = \ sendPost(__version__, sessionAlice, aliceDir, 'alice', aliceDomain, alicePort, 'bob', bobDomain, bobPort, ccUrl, httpPrefix, 'Alice message', followersOnly, saveToFile, clientToServer, True, None, None, None, federationList, aliceSendThreads, alicePostLog, aliceCachedWebfingers, alicePersonCache, isArticle, inReplyTo, inReplyToAtomUri, subject) print('sendResult: ' + str(sendResult)) queuePath = bobDir + '/accounts/bob@' + bobDomain + '/queue' inboxPath = bobDir + '/accounts/bob@' + bobDomain + '/inbox' aliceMessageArrived = False for i in range(20): time.sleep(1) if os.path.isdir(inboxPath): if len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) > 0: aliceMessageArrived = True print('Alice message sent to Bob!') break assert aliceMessageArrived is True print('Message from Alice to Bob succeeded') # stop the servers thrAlice.kill() thrAlice.join() assert thrAlice.is_alive() is False thrBob.kill() thrBob.join() assert thrBob.is_alive() is False # queue item removed time.sleep(4) assert len([name for name in os.listdir(queuePath) if os.path.isfile(os.path.join(queuePath, name))]) == 0 os.chdir(baseDir) shutil.rmtree(baseDir + '/.tests') def testFollowersOfPerson(): print('testFollowersOfPerson') currDir = os.getcwd() nickname = 'mxpop' domain = 'diva.domain' password = 'birb' port = 80 httpPrefix = 'https' federationList = [] baseDir = currDir + '/.tests_followersofperson' if os.path.isdir(baseDir): shutil.rmtree(baseDir) os.mkdir(baseDir) os.chdir(baseDir) createPerson(baseDir, nickname, domain, port, httpPrefix, True, False, password) createPerson(baseDir, 'maxboardroom', domain, port, httpPrefix, True, False, password) createPerson(baseDir, 'ultrapancake', domain, port, httpPrefix, True, False, password) createPerson(baseDir, 'drokk', domain, port, httpPrefix, True, False, password) createPerson(baseDir, 'sausagedog', domain, port, httpPrefix, True, False, password) clearFollows(baseDir, nickname, domain) followPerson(baseDir, nickname, domain, 'maxboardroom', domain, federationList, False) followPerson(baseDir, 'drokk', domain, 'ultrapancake', domain, federationList, False) # deliberate duplication followPerson(baseDir, 'drokk', domain, 'ultrapancake', domain, federationList, False) followPerson(baseDir, 'sausagedog', domain, 'ultrapancake', domain, federationList, False) followPerson(baseDir, nickname, domain, 'ultrapancake', domain, federationList, False) followPerson(baseDir, nickname, domain, 'someother', 'randodomain.net', federationList, False) followList = getFollowersOfPerson(baseDir, 'ultrapancake', domain) assert len(followList) == 3 assert 'mxpop@' + domain in followList assert 'drokk@' + domain in followList assert 'sausagedog@' + domain in followList os.chdir(currDir) shutil.rmtree(baseDir) def testNoOfFollowersOnDomain(): print('testNoOfFollowersOnDomain') currDir = os.getcwd() nickname = 'mxpop' domain = 'diva.domain' otherdomain = 'soup.dragon' password = 'birb' port = 80 httpPrefix = 'https' federationList = [] baseDir = currDir + '/.tests_nooffollowersOndomain' if os.path.isdir(baseDir): shutil.rmtree(baseDir) os.mkdir(baseDir) os.chdir(baseDir) createPerson(baseDir, nickname, domain, port, httpPrefix, True, False, password) createPerson(baseDir, 'maxboardroom', otherdomain, port, httpPrefix, True, False, password) createPerson(baseDir, 'ultrapancake', otherdomain, port, httpPrefix, True, False, password) createPerson(baseDir, 'drokk', otherdomain, port, httpPrefix, True, False, password) createPerson(baseDir, 'sausagedog', otherdomain, port, httpPrefix, True, False, password) followPerson(baseDir, 'drokk', otherdomain, nickname, domain, federationList, False) followPerson(baseDir, 'sausagedog', otherdomain, nickname, domain, federationList, False) followPerson(baseDir, 'maxboardroom', otherdomain, nickname, domain, federationList, False) followerOfPerson(baseDir, nickname, domain, 'cucumber', 'sandwiches.party', federationList, False) followerOfPerson(baseDir, nickname, domain, 'captainsensible', 'damned.zone', federationList, False) followerOfPerson(baseDir, nickname, domain, 'pilchard', 'zombies.attack', federationList, False) followerOfPerson(baseDir, nickname, domain, 'drokk', otherdomain, federationList, False) followerOfPerson(baseDir, nickname, domain, 'sausagedog', otherdomain, federationList, False) followerOfPerson(baseDir, nickname, domain, 'maxboardroom', otherdomain, federationList, False) followersOnOtherDomain = \ noOfFollowersOnDomain(baseDir, nickname + '@' + domain, otherdomain) assert followersOnOtherDomain == 3 unfollowerOfAccount(baseDir, nickname, domain, 'sausagedog', otherdomain) followersOnOtherDomain = \ noOfFollowersOnDomain(baseDir, nickname + '@' + domain, otherdomain) assert followersOnOtherDomain == 2 os.chdir(currDir) shutil.rmtree(baseDir) def testGroupFollowers(): print('testGroupFollowers') currDir = os.getcwd() nickname = 'test735' domain = 'mydomain.com' password = 'somepass' port = 80 httpPrefix = 'https' federationList = [] baseDir = currDir + '/.tests_testgroupfollowers' if os.path.isdir(baseDir): shutil.rmtree(baseDir) os.mkdir(baseDir) os.chdir(baseDir) createPerson(baseDir, nickname, domain, port, httpPrefix, True, False, password) clearFollowers(baseDir, nickname, domain) followerOfPerson(baseDir, nickname, domain, 'badger', 'wild.domain', federationList, False) followerOfPerson(baseDir, nickname, domain, 'squirrel', 'wild.domain', federationList, False) followerOfPerson(baseDir, nickname, domain, 'rodent', 'wild.domain', federationList, False) followerOfPerson(baseDir, nickname, domain, 'utterly', 'clutterly.domain', federationList, False) followerOfPerson(baseDir, nickname, domain, 'zonked', 'zzz.domain', federationList, False) followerOfPerson(baseDir, nickname, domain, 'nap', 'zzz.domain', federationList, False) grouped = groupFollowersByDomain(baseDir, nickname, domain) assert len(grouped.items()) == 3 assert grouped.get('zzz.domain') assert grouped.get('clutterly.domain') assert grouped.get('wild.domain') assert len(grouped['zzz.domain']) == 2 assert len(grouped['wild.domain']) == 3 assert len(grouped['clutterly.domain']) == 1 os.chdir(currDir) shutil.rmtree(baseDir) def testFollows(): print('testFollows') currDir = os.getcwd() nickname = 'test529' domain = 'testdomain.com' password = 'mypass' port = 80 httpPrefix = 'https' federationList = ['wild.com', 'mesh.com'] baseDir = currDir + '/.tests_testfollows' if os.path.isdir(baseDir): shutil.rmtree(baseDir) os.mkdir(baseDir) os.chdir(baseDir) createPerson(baseDir, nickname, domain, port, httpPrefix, True, False, password) clearFollows(baseDir, nickname, domain) followPerson(baseDir, nickname, domain, 'badger', 'wild.com', federationList, False) followPerson(baseDir, nickname, domain, 'squirrel', 'secret.com', federationList, False) followPerson(baseDir, nickname, domain, 'rodent', 'drainpipe.com', federationList, False) followPerson(baseDir, nickname, domain, 'batman', 'mesh.com', federationList, False) followPerson(baseDir, nickname, domain, 'giraffe', 'trees.com', federationList, False) f = open(baseDir + '/accounts/' + nickname + '@' + domain + '/following.txt', "r") domainFound = False for followingDomain in f: testDomain = followingDomain.split('@')[1] testDomain = testDomain.replace('\n', '').replace('\r', '') if testDomain == 'mesh.com': domainFound = True if testDomain not in federationList: print(testDomain) assert(False) assert(domainFound) unfollowAccount(baseDir, nickname, domain, 'batman', 'mesh.com') domainFound = False for followingDomain in f: testDomain = followingDomain.split('@')[1] testDomain = testDomain.replace('\n', '').replace('\r', '') if testDomain == 'mesh.com': domainFound = True assert(domainFound is False) clearFollowers(baseDir, nickname, domain) followerOfPerson(baseDir, nickname, domain, 'badger', 'wild.com', federationList, False) followerOfPerson(baseDir, nickname, domain, 'squirrel', 'secret.com', federationList, False) followerOfPerson(baseDir, nickname, domain, 'rodent', 'drainpipe.com', federationList, False) followerOfPerson(baseDir, nickname, domain, 'batman', 'mesh.com', federationList, False) followerOfPerson(baseDir, nickname, domain, 'giraffe', 'trees.com', federationList, False) f = open(baseDir + '/accounts/' + nickname + '@' + domain + '/followers.txt', "r") for followerDomain in f: testDomain = followerDomain.split('@')[1] testDomain = testDomain.replace('\n', '').replace('\r', '') if testDomain not in federationList: print(testDomain) assert(False) os.chdir(currDir) shutil.rmtree(baseDir) def testCreatePerson(): print('testCreatePerson') currDir = os.getcwd() nickname = 'test382' domain = 'badgerdomain.com' password = 'mypass' port = 80 httpPrefix = 'https' clientToServer = False baseDir = currDir + '/.tests_createperson' if os.path.isdir(baseDir): shutil.rmtree(baseDir) os.mkdir(baseDir) os.chdir(baseDir) privateKeyPem, publicKeyPem, person, wfEndpoint = \ createPerson(baseDir, nickname, domain, port, httpPrefix, True, False, password) assert os.path.isfile(baseDir + '/accounts/passwords') deleteAllPosts(baseDir, nickname, domain, 'inbox') deleteAllPosts(baseDir, nickname, domain, 'outbox') setDisplayNickname(baseDir, nickname, domain, 'badger') setBio(baseDir, nickname, domain, 'Randomly roaming in your backyard') archivePostsForPerson(nickname, domain, baseDir, 'inbox', None, {}, 4) archivePostsForPerson(nickname, domain, baseDir, 'outbox', None, {}, 4) createPublicPost(baseDir, nickname, domain, port, httpPrefix, "G'day world!", False, True, clientToServer, True, None, None, None, None, 'Not suitable for Vogons') os.chdir(currDir) shutil.rmtree(baseDir) def testDelegateRoles(): print('testDelegateRoles') currDir = os.getcwd() nickname = 'test382' nicknameDelegated = 'test383' domain = 'badgerdomain.com' password = 'mypass' port = 80 httpPrefix = 'https' baseDir = currDir + '/.tests_delegaterole' if os.path.isdir(baseDir): shutil.rmtree(baseDir) os.mkdir(baseDir) os.chdir(baseDir) privateKeyPem, publicKeyPem, person, wfEndpoint = \ createPerson(baseDir, nickname, domain, port, httpPrefix, True, False, password) privateKeyPem, publicKeyPem, person, wfEndpoint = \ createPerson(baseDir, nicknameDelegated, domain, port, httpPrefix, True, False, 'insecure') httpPrefix = 'http' project = 'artechoke' role = 'delegator' actorDelegated = \ httpPrefix + '://' + domain + '/users/' + nicknameDelegated newRoleJson = { 'type': 'Delegate', 'actor': httpPrefix + '://' + domain + '/users/' + nickname, 'object': { 'type': 'Role', 'actor': actorDelegated, 'object': project + ';' + role, 'to': [], 'cc': [] }, 'to': [], 'cc': [] } assert outboxDelegate(baseDir, nickname, newRoleJson, False) # second time delegation has already happened so should return false assert outboxDelegate(baseDir, nickname, newRoleJson, False) is False assert '"delegator"' in open(baseDir + '/accounts/' + nickname + '@' + domain + '.json').read() assert '"delegator"' in open(baseDir + '/accounts/' + nicknameDelegated + '@' + domain + '.json').read() newRoleJson = { 'type': 'Delegate', 'actor': httpPrefix + '://' + domain + '/users/' + nicknameDelegated, 'object': { 'type': 'Role', 'actor': httpPrefix + '://' + domain + '/users/' + nickname, 'object': 'otherproject;otherrole', 'to': [], 'cc': [] }, 'to': [], 'cc': [] } # non-delegators cannot assign roles assert outboxDelegate(baseDir, nicknameDelegated, newRoleJson, False) is False assert '"otherrole"' not in open(baseDir + '/accounts/' + nickname + '@' + domain + '.json').read() os.chdir(currDir) shutil.rmtree(baseDir) def testAuthentication(): print('testAuthentication') currDir = os.getcwd() nickname = 'test8743' password = 'SuperSecretPassword12345' baseDir = currDir + '/.tests_authentication' if os.path.isdir(baseDir): shutil.rmtree(baseDir) os.mkdir(baseDir) os.chdir(baseDir) assert storeBasicCredentials(baseDir, 'othernick', 'otherpass') assert storeBasicCredentials(baseDir, 'bad:nick', 'otherpass') is False assert storeBasicCredentials(baseDir, 'badnick', 'otherpa:ss') is False assert storeBasicCredentials(baseDir, nickname, password) authHeader = createBasicAuthHeader(nickname, password) assert authorizeBasic(baseDir, '/users/' + nickname + '/inbox', authHeader, False) assert authorizeBasic(baseDir, '/users/' + nickname, authHeader, False) is False assert authorizeBasic(baseDir, '/users/othernick/inbox', authHeader, False) is False authHeader = createBasicAuthHeader(nickname, password + '1') assert authorizeBasic(baseDir, '/users/' + nickname + '/inbox', authHeader, False) is False password = 'someOtherPassword' assert storeBasicCredentials(baseDir, nickname, password) authHeader = createBasicAuthHeader(nickname, password) assert authorizeBasic(baseDir, '/users/' + nickname + '/inbox', authHeader, False) os.chdir(currDir) shutil.rmtree(baseDir) def testClientToServer(): print('Testing sending a post via c2s') global testServerAliceRunning global testServerBobRunning testServerAliceRunning = False testServerBobRunning = False httpPrefix = 'http' proxyType = None federationList = [] baseDir = os.getcwd() if os.path.isdir(baseDir + '/.tests'): shutil.rmtree(baseDir + '/.tests') os.mkdir(baseDir + '/.tests') # create the servers aliceDir = baseDir + '/.tests/alice' aliceDomain = '127.0.0.42' alicePort = 61935 aliceSendThreads = [] aliceAddress = aliceDomain + ':' + str(alicePort) bobDir = baseDir + '/.tests/bob' bobDomain = '127.0.0.64' bobPort = 61936 bobSendThreads = [] bobAddress = bobDomain + ':' + str(bobPort) global thrAlice if thrAlice: while thrAlice.is_alive(): thrAlice.stop() time.sleep(1) thrAlice.kill() thrAlice = \ threadWithTrace(target=createServerAlice, args=(aliceDir, aliceDomain, alicePort, bobAddress, federationList, False, False, aliceSendThreads), daemon=True) global thrBob if thrBob: while thrBob.is_alive(): thrBob.stop() time.sleep(1) thrBob.kill() thrBob = \ threadWithTrace(target=createServerBob, args=(bobDir, bobDomain, bobPort, aliceAddress, federationList, False, False, bobSendThreads), daemon=True) thrAlice.start() thrBob.start() assert thrAlice.is_alive() is True assert thrBob.is_alive() is True # wait for both servers to be running ctr = 0 while not (testServerAliceRunning and testServerBobRunning): time.sleep(1) ctr += 1 if ctr > 60: break print('Alice online: ' + str(testServerAliceRunning)) print('Bob online: ' + str(testServerBobRunning)) time.sleep(1) print('\n\n*******************************************************') print('Alice sends to Bob via c2s') sessionAlice = createSession(proxyType) followersOnly = False attachedImageFilename = baseDir+'/img/logo.png' mediaType = getAttachmentMediaType(attachedImageFilename) attachedImageDescription = 'Logo' isArticle = False cachedWebfingers = {} personCache = {} password = 'alicepass' outboxPath = aliceDir + '/accounts/alice@' + aliceDomain + '/outbox' inboxPath = bobDir + '/accounts/bob@' + bobDomain + '/inbox' assert len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) == 0 assert len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) == 0 sendResult = \ sendPostViaServer(__version__, aliceDir, sessionAlice, 'alice', password, aliceDomain, alicePort, 'bob', bobDomain, bobPort, None, httpPrefix, 'Sent from my ActivityPub client', followersOnly, True, attachedImageFilename, mediaType, attachedImageDescription, cachedWebfingers, personCache, isArticle, True, None, None, None) print('sendResult: ' + str(sendResult)) for i in range(30): if os.path.isdir(outboxPath): if len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) == 1: break time.sleep(1) assert len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) == 1 print(">>> c2s post arrived in Alice's outbox") for i in range(30): if os.path.isdir(inboxPath): if len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) == 1: break time.sleep(1) assert len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) == 1 print(">>> s2s post arrived in Bob's inbox") print("c2s send success") print('\n\nGetting message id for the post') statusNumber = 0 outboxPostFilename = None outboxPostId = None for name in os.listdir(outboxPath): if '#statuses#' in name: statusNumber = name.split('#statuses#')[1].replace('.json', '') statusNumber = int(statusNumber.replace('#activity', '')) outboxPostFilename = outboxPath + '/' + name postJsonObject = loadJson(outboxPostFilename, 0) if postJsonObject: outboxPostId = removeIdEnding(postJsonObject['id']) assert outboxPostId print('message id obtained: ' + outboxPostId) assert validInbox(bobDir, 'bob', bobDomain) assert validInboxFilenames(bobDir, 'bob', bobDomain, aliceDomain, alicePort) print('\n\nAlice follows Bob') sendFollowRequestViaServer(aliceDir, sessionAlice, 'alice', password, aliceDomain, alicePort, 'bob', bobDomain, bobPort, httpPrefix, cachedWebfingers, personCache, True, __version__) alicePetnamesFilename = aliceDir + '/accounts/' + \ 'alice@' + aliceDomain + '/petnames.txt' aliceFollowingFilename = \ aliceDir + '/accounts/alice@' + aliceDomain + '/following.txt' bobFollowersFilename = \ bobDir + '/accounts/bob@' + bobDomain + '/followers.txt' for t in range(10): if os.path.isfile(bobFollowersFilename): if 'alice@' + aliceDomain + ':' + str(alicePort) in \ open(bobFollowersFilename).read(): if os.path.isfile(aliceFollowingFilename) and \ os.path.isfile(alicePetnamesFilename): if 'bob@' + bobDomain + ':' + str(bobPort) in \ open(aliceFollowingFilename).read(): break time.sleep(1) assert os.path.isfile(bobFollowersFilename) assert os.path.isfile(aliceFollowingFilename) assert os.path.isfile(alicePetnamesFilename) assert 'bob bob@' + bobDomain in \ open(alicePetnamesFilename).read() print('alice@' + aliceDomain + ':' + str(alicePort) + ' in ' + bobFollowersFilename) assert 'alice@' + aliceDomain + ':' + str(alicePort) in \ open(bobFollowersFilename).read() print('bob@' + bobDomain + ':' + str(bobPort) + ' in ' + aliceFollowingFilename) assert 'bob@' + bobDomain + ':' + str(bobPort) in \ open(aliceFollowingFilename).read() assert validInbox(bobDir, 'bob', bobDomain) assert validInboxFilenames(bobDir, 'bob', bobDomain, aliceDomain, alicePort) print('\n\nBob follows Alice') sendFollowRequestViaServer(aliceDir, sessionAlice, 'bob', 'bobpass', bobDomain, bobPort, 'alice', aliceDomain, alicePort, httpPrefix, cachedWebfingers, personCache, True, __version__) for t in range(10): if os.path.isfile(aliceDir + '/accounts/alice@' + aliceDomain + '/followers.txt'): if 'bob@' + bobDomain + ':' + str(bobPort) in \ open(aliceDir + '/accounts/alice@' + aliceDomain + '/followers.txt').read(): if os.path.isfile(bobDir + '/accounts/bob@' + bobDomain + '/following.txt'): aliceHandleStr = \ 'alice@' + aliceDomain + ':' + str(alicePort) if aliceHandleStr in \ open(bobDir + '/accounts/bob@' + bobDomain + '/following.txt').read(): if os.path.isfile(bobDir + '/accounts/bob@' + bobDomain + '/followingCalendar.txt'): if aliceHandleStr in \ open(bobDir + '/accounts/bob@' + bobDomain + '/followingCalendar.txt').read(): break time.sleep(1) assert os.path.isfile(aliceDir + '/accounts/alice@' + aliceDomain + '/followers.txt') assert os.path.isfile(bobDir + '/accounts/bob@' + bobDomain + '/following.txt') assert 'bob@' + bobDomain + ':' + str(bobPort) in \ open(aliceDir + '/accounts/alice@' + aliceDomain + '/followers.txt').read() assert 'alice@' + aliceDomain + ':' + str(alicePort) in \ open(bobDir + '/accounts/bob@' + bobDomain + '/following.txt').read() print('\n\nBob likes the post') sessionBob = createSession(proxyType) password = 'bobpass' outboxPath = bobDir + '/accounts/bob@' + bobDomain + '/outbox' inboxPath = aliceDir + '/accounts/alice@' + aliceDomain + '/inbox' print(str(len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]))) assert len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) == 1 print(str(len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]))) assert len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) == 1 sendLikeViaServer(bobDir, sessionBob, 'bob', 'bobpass', bobDomain, bobPort, httpPrefix, outboxPostId, cachedWebfingers, personCache, True, __version__) for i in range(20): if os.path.isdir(outboxPath) and os.path.isdir(inboxPath): if len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) == 2: test = len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) if test == 1: break time.sleep(1) assert len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) == 2 assert len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) == 1 print('Post liked') print('\n\nBob repeats the post') print(str(len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]))) assert len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) == 2 print(str(len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]))) assert len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) == 1 sendAnnounceViaServer(bobDir, sessionBob, 'bob', password, bobDomain, bobPort, httpPrefix, outboxPostId, cachedWebfingers, personCache, True, __version__) for i in range(20): if os.path.isdir(outboxPath) and os.path.isdir(inboxPath): if len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) == 3: if len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) == 2: break time.sleep(1) assert len([name for name in os.listdir(outboxPath) if os.path.isfile(os.path.join(outboxPath, name))]) == 3 assert len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) == 2 print('Post repeated') inboxPath = bobDir + '/accounts/bob@' + bobDomain + '/inbox' outboxPath = aliceDir + '/accounts/alice@' + aliceDomain + '/outbox' postsBefore = \ len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) print('\n\nAlice deletes her post: ' + outboxPostId + ' ' + str(postsBefore)) password = 'alicepass' sendDeleteViaServer(aliceDir, sessionAlice, 'alice', password, aliceDomain, alicePort, httpPrefix, outboxPostId, cachedWebfingers, personCache, True, __version__) for i in range(30): if os.path.isdir(inboxPath): test = len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) if test == postsBefore-1: break time.sleep(1) test = len([name for name in os.listdir(inboxPath) if os.path.isfile(os.path.join(inboxPath, name))]) assert test == postsBefore - 1 print(">>> post deleted from Alice's outbox and Bob's inbox") assert validInbox(bobDir, 'bob', bobDomain) assert validInboxFilenames(bobDir, 'bob', bobDomain, aliceDomain, alicePort) print('\n\nAlice unfollows Bob') password = 'alicepass' sendUnfollowRequestViaServer(baseDir, sessionAlice, 'alice', password, aliceDomain, alicePort, 'bob', bobDomain, bobPort, httpPrefix, cachedWebfingers, personCache, True, __version__) for t in range(10): if 'alice@' + aliceDomain + ':' + str(alicePort) not in \ open(bobFollowersFilename).read(): if 'bob@' + bobDomain + ':' + str(bobPort) not in \ open(aliceFollowingFilename).read(): break time.sleep(1) assert os.path.isfile(bobFollowersFilename) assert os.path.isfile(aliceFollowingFilename) assert 'alice@' + aliceDomain + ':' + str(alicePort) \ not in open(bobFollowersFilename).read() assert 'bob@' + bobDomain + ':' + str(bobPort) \ not in open(aliceFollowingFilename).read() assert validInbox(bobDir, 'bob', bobDomain) assert validInboxFilenames(bobDir, 'bob', bobDomain, aliceDomain, alicePort) assert validInbox(aliceDir, 'alice', aliceDomain) assert validInboxFilenames(aliceDir, 'alice', aliceDomain, bobDomain, bobPort) # stop the servers thrAlice.kill() thrAlice.join() assert thrAlice.is_alive() is False thrBob.kill() thrBob.join() assert thrBob.is_alive() is False os.chdir(baseDir) # shutil.rmtree(aliceDir) # shutil.rmtree(bobDir) def testActorParsing(): print('testActorParsing') actor = 'https://mydomain:72/users/mynick' domain, port = getDomainFromActor(actor) assert domain == 'mydomain' assert port == 72 nickname = getNicknameFromActor(actor) assert nickname == 'mynick' actor = 'https://element/accounts/badger' domain, port = getDomainFromActor(actor) assert domain == 'element' nickname = getNicknameFromActor(actor) assert nickname == 'badger' actor = 'egg@chicken.com' domain, port = getDomainFromActor(actor) assert domain == 'chicken.com' nickname = getNicknameFromActor(actor) assert nickname == 'egg' actor = '@waffle@cardboard' domain, port = getDomainFromActor(actor) assert domain == 'cardboard' nickname = getNicknameFromActor(actor) assert nickname == 'waffle' actor = 'https://astral/channel/sky' domain, port = getDomainFromActor(actor) assert domain == 'astral' nickname = getNicknameFromActor(actor) assert nickname == 'sky' actor = 'https://randomain/users/rando' domain, port = getDomainFromActor(actor) assert domain == 'randomain' nickname = getNicknameFromActor(actor) assert nickname == 'rando' actor = 'https://otherdomain:49/@othernick' domain, port = getDomainFromActor(actor) assert domain == 'otherdomain' assert port == 49 nickname = getNicknameFromActor(actor) assert nickname == 'othernick' def testWebLinks(): print('testWebLinks') exampleText = \ '

@foo Some ' + \ 'random text.

AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA' + \ 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA' + \ 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA' + \ 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA' + \ 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA' + \ 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA

' resultText = removeLongWords(exampleText, 40, []) assert resultText == \ '

@foo ' + \ 'Some random text.

' exampleText = \ 'This post has a web links https://somesite.net\n\nAnd some other text' linkedText = addWebLinks(exampleText) assert \ 'somesite.net1. HAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAH' + \ 'AHAHAHHAHAHAHAHAHAHAHAHAHAHAHAHHAHAHAHAHAHAHAHAH

' resultText = removeLongWords(exampleText, 40, []) assert resultText == '

1. HAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHA

' exampleText = \ '

Tox address is 88AB9DED6F9FBEF43E105FB72060A2D89F9B93C74' + \ '4E8C45AB3C5E42C361C837155AFCFD9D448

' resultText = removeLongWords(exampleText, 40, []) assert resultText == exampleText exampleText = \ 'some.incredibly.long.and.annoying.word.which.should.be.removed: ' + \ 'The remaining text' resultText = removeLongWords(exampleText, 40, []) assert resultText == \ 'some.incredibly.long.and.annoying.word.w\n' + \ 'hich.should.be.removed: The remaining text' exampleText = \ '

Tox address is 88AB9DED6F9FBEF43E105FB72060A2D89F9B93C74' + \ '4E8C45AB3C5E42C361C837155AFCFD9D448

' resultText = removeLongWords(exampleText, 40, []) assert resultText == \ '

Tox address is 88AB9DED6F9FBEF43E105FB72060A2D89F9B93C7\n' + \ '44E8C45AB3C5E42C361C837155AFCFD9D448

' exampleText = \ '

ABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCA' + \ 'BCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCAB' + \ 'CABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABC' + \ 'ABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCA' + \ 'BCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCAB' + \ 'CABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABC' + \ 'ABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCA' + \ 'BCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCABCAB' + \ 'CABCABCABCABCABCABCABCABC

' resultText = removeLongWords(exampleText, 40, []) assert resultText == r'

ABCABCABCABCABCABCABCABCABCABCABCABCABCA<\p>' exampleText = \ '"the nucleus of mutual-support institutions, habits, and customs ' + \ 'remains alive with the millions; it keeps them together; and ' + \ 'they prefer to cling to their customs, beliefs, and traditions ' + \ 'rather than to accept the teachings of a war of each ' + \ 'against all"\n\n--Peter Kropotkin' testFnStr = addWebLinks(exampleText) resultText = removeLongWords(testFnStr, 40, []) assert resultText == exampleText assert 'ellipsis' not in resultText exampleText = \ '

filepopout=' + \ 'TemplateAttachmentRichPopout<<\\p>' resultText = replaceContentDuplicates(exampleText) assert resultText == \ '

filepopout=' + \ 'TemplateAttachmentRichPopout' exampleText = \ '

Test1 test2 #YetAnotherExcessivelyLongwindedAndBoringHashtag

' testFnStr = addWebLinks(exampleText) resultText = removeLongWords(testFnStr, 40, []) assert(resultText == '

Test1 test2 ' '#YetAnotherExcessivelyLongwindedAndBorin\ngHashtag

') exampleText = \ "

Don't remove a p2p link " + \ "rad:git:hwd1yrerc3mcgn8ga9rho3dqi4w33nep7kxmqezss4topyfgmexihp" + \ "33xcw

" testFnStr = addWebLinks(exampleText) resultText = removeLongWords(testFnStr, 40, []) assert resultText == exampleText def testAddEmoji(): print('testAddEmoji') content = "Emoji :lemon: :strawberry: :banana:" httpPrefix = 'http' nickname = 'testuser' domain = 'testdomain.net' port = 3682 recipients = [] hashtags = {} baseDir = os.getcwd() baseDirOriginal = os.getcwd() path = baseDir + '/.tests' if not os.path.isdir(path): os.mkdir(path) path = baseDir + '/.tests/emoji' if os.path.isdir(path): shutil.rmtree(path) os.mkdir(path) baseDir = path path = baseDir + '/emoji' if os.path.isdir(path): shutil.rmtree(path) os.mkdir(path) copytree(baseDirOriginal + '/emoji', baseDir + '/emoji') os.chdir(baseDir) privateKeyPem, publicKeyPem, person, wfEndpoint = \ createPerson(baseDir, nickname, domain, port, httpPrefix, True, False, 'password') contentModified = \ addHtmlTags(baseDir, httpPrefix, nickname, domain, content, recipients, hashtags, True) assert ':lemon:' in contentModified assert contentModified.startswith('

') assert contentModified.endswith('

') tags = [] for tagName, tag in hashtags.items(): tags.append(tag) content = contentModified contentModified = replaceEmojiFromTags(content, tags, 'content') # print('contentModified: '+contentModified) assert contentModified == '

Emoji 🍋 🍓 🍌

' os.chdir(baseDirOriginal) shutil.rmtree(baseDirOriginal + '/.tests') def testGetStatusNumber(): print('testGetStatusNumber') prevStatusNumber = None for i in range(1, 20): statusNumber, published = getStatusNumber() if prevStatusNumber: assert len(statusNumber) == 18 assert int(statusNumber) > prevStatusNumber prevStatusNumber = int(statusNumber) def testJsonString() -> None: print('testJsonString') filename = '.epicyon_tests_testJsonString.json' messageStr = "Crème brûlée यह एक परीक्षण ह" testJson = { "content": messageStr } assert saveJson(testJson, filename) receivedJson = loadJson(filename, 0) assert receivedJson assert receivedJson['content'] == messageStr encodedStr = json.dumps(testJson, ensure_ascii=False) assert messageStr in encodedStr os.remove(filename) def testSaveLoadJson(): print('testSaveLoadJson') testJson = { "param1": 3, "param2": '"Crème brûlée यह एक परीक्षण ह"' } testFilename = '.epicyon_tests_testSaveLoadJson.json' if os.path.isfile(testFilename): os.remove(testFilename) assert saveJson(testJson, testFilename) assert os.path.isfile(testFilename) testLoadJson = loadJson(testFilename) assert(testLoadJson) assert testLoadJson.get('param1') assert testLoadJson.get('param2') assert testLoadJson['param1'] == 3 assert testLoadJson['param2'] == '"Crème brûlée यह एक परीक्षण ह"' os.remove(testFilename) def testTheme(): print('testTheme') css = 'somestring --background-value: 24px; --foreground-value: 24px;' result = setCSSparam(css, 'background-value', '32px') assert result == \ 'somestring --background-value: 32px; --foreground-value: 24px;' css = \ 'somestring --background-value: 24px; --foreground-value: 24px; ' + \ '--background-value: 24px;' result = setCSSparam(css, 'background-value', '32px') assert result == \ 'somestring --background-value: 32px; --foreground-value: 24px; ' + \ '--background-value: 32px;' css = '--background-value: 24px; --foreground-value: 24px;' result = setCSSparam(css, 'background-value', '32px') assert result == '--background-value: 32px; --foreground-value: 24px;' def testRecentPostsCache(): print('testRecentPostsCache') recentPostsCache = {} maxRecentPosts = 3 htmlStr = '' for i in range(5): postJsonObject = { "id": "https://somesite.whatever/users/someuser/statuses/"+str(i) } updateRecentPostsCache(recentPostsCache, maxRecentPosts, postJsonObject, htmlStr) assert len(recentPostsCache['index']) == maxRecentPosts assert len(recentPostsCache['json'].items()) == maxRecentPosts assert len(recentPostsCache['html'].items()) == maxRecentPosts def testRemoveTextFormatting(): print('testRemoveTextFormatting') testStr = '

Text without formatting

' resultStr = removeTextFormatting(testStr) assert(resultStr == testStr) testStr = '

Text with

formatting

' resultStr = removeTextFormatting(testStr) assert(resultStr == '

Text with formatting

') def testJsonld(): print("testJsonld") jldDocument = { "@context": "https://www.w3.org/ns/activitystreams", "actor": "https://somesite.net/users/gerbil", "description": "My json document", "numberField": 83582, "object": { "content": "valid content" } } # privateKeyPem, publicKeyPem = generateRSAKey() privateKeyPem = '-----BEGIN RSA PRIVATE KEY-----\n' \ 'MIIEowIBAAKCAQEAod9iHfIn4ugY/2byFrFjUprrFLkkH5bCrjiBq2/MdHFg99IQ\n' \ '7li2x2mg5fkBMhU5SJIxlN8kiZMFq7JUXSA97Yo4puhVubqTSHihIh6Xn2mTjTgs\n' \ 'zNo9SBbmN3YiyBPTcr0rF4jGWZAduJ8u6i7Eky2QH+UBKyUNRZrcfoVq+7grHUIA\n' \ '45pE7vAfEEWtgRiw32Nwlx55N3hayHax0y8gMdKEF/vfYKRLcM7rZgEASMtlCpgy\n' \ 'fsyHwFCDzl/BP8AhP9u3dM+SEundeAvF58AiXx1pKvBpxqttDNAsKWCRQ06/WI/W\n' \ '2Rwihl9yCjobqRoFsZ/cTEi6FG9AbDAds5YjTwIDAQABAoIBAERL3rbpy8Bl0t43\n' \ 'jh7a+yAIMvVMZBxb3InrV3KAug/LInGNFQ2rKnsaawN8uu9pmwCuhfLc7yqIeJUH\n' \ 'qaadCuPlNJ/fWQQC309tbfbaV3iv78xejjBkSATZfIqb8nLeQpGflMXaNG3na1LQ\n' \ '/tdZoiDC0ZNTaNnOSTo765oKKqhHUTQkwkGChrwG3Js5jekV4zpPMLhUafXk6ksd\n' \ '8XLlZdCF3RUnuguXAg2xP/duxMYmTCx3eeGPkXBPQl0pahu8/6OtBoYvBrqNdQcx\n' \ 'jnEtYX9PCqDY3hAXW9GWsxNfu02DKhWigFHFNRUQtMI++438+QIfzXPslE2bTQIt\n' \ '0OXUlwECgYEAxTKUZ7lwIBb5XKPJq53RQmX66M3ArxI1RzFSKm1+/CmxvYiN0c+5\n' \ '2Aq62WEIauX6hoZ7yQb4zhdeNRzinLR7rsmBvIcP12FidXG37q9v3Vu70KmHniJE\n' \ 'TPbt5lHQ0bNACFxkar4Ab/JZN4CkMRgJdlcZ5boYNmcGOYCvw9izuM8CgYEA0iQ1\n' \ 'khIFZ6fCiXwVRGvEHmqSnkBmBHz8MY8fczv2Z4Gzfq3Tlh9VxpigK2F2pFt7keWc\n' \ '53HerYFHFpf5otDhEyRwA1LyIcwbj5HopumxsB2WG+/M2as45lLfWa6KO73OtPpU\n' \ 'wGZYW+i/otdk9eFphceYtw19mxI+3lYoeI8EjYECgYBxOtTKJkmCs45lqkp/d3QT\n' \ '2zjSempcXGkpQuG6KPtUUaCUgxdj1RISQj792OCbeQh8PDZRvOYaeIKInthkQKIQ\n' \ 'P/Z1yVvIQUvmwfBqZmQmR6k1bFLJ80UiqFr7+BiegH2RD3Q9cnIP1aly3DPrWLD+\n' \ 'OY9OQKfsfQWu+PxzyTeRMwKBgD8Zjlh5PtQ8RKcB8mTkMzSq7bHFRpzsZtH+1wPE\n' \ 'Kp40DRDp41H9wMTsiZPdJUH/EmDh4LaCs8nHuu/m3JfuPtd/pn7pBjntzwzSVFji\n' \ 'bW+jwrJK1Gk8B87pbZXBWlLMEOi5Dn/je37Fqd2c7f0DHauFHq9AxsmsteIPXwGs\n' \ 'eEKBAoGBAIzJX/5yFp3ObkPracIfOJ/U/HF1UdP6Y8qmOJBZOg5s9Y+JAdY76raK\n' \ '0SbZPsOpuFUdTiRkSI3w/p1IuM5dPxgCGH9MHqjqogU5QwXr3vLF+a/PFhINkn1x\n' \ 'lozRZjDcF1y6xHfExotPC973UZnKEviq9/FqOsovZpvSQkzAYSZF\n' \ '-----END RSA PRIVATE KEY-----' publicKeyPem = '-----BEGIN PUBLIC KEY-----\n' \ 'MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAod9iHfIn4ugY/2byFrFj\n' \ 'UprrFLkkH5bCrjiBq2/MdHFg99IQ7li2x2mg5fkBMhU5SJIxlN8kiZMFq7JUXSA9\n' \ '7Yo4puhVubqTSHihIh6Xn2mTjTgszNo9SBbmN3YiyBPTcr0rF4jGWZAduJ8u6i7E\n' \ 'ky2QH+UBKyUNRZrcfoVq+7grHUIA45pE7vAfEEWtgRiw32Nwlx55N3hayHax0y8g\n' \ 'MdKEF/vfYKRLcM7rZgEASMtlCpgyfsyHwFCDzl/BP8AhP9u3dM+SEundeAvF58Ai\n' \ 'Xx1pKvBpxqttDNAsKWCRQ06/WI/W2Rwihl9yCjobqRoFsZ/cTEi6FG9AbDAds5Yj\n' \ 'TwIDAQAB\n' \ '-----END PUBLIC KEY-----' signedDocument = jldDocument.copy() generateJsonSignature(signedDocument, privateKeyPem) assert(signedDocument) assert(signedDocument.get('signature')) assert(signedDocument['signature'].get('signatureValue')) assert(signedDocument['signature'].get('type')) assert(len(signedDocument['signature']['signatureValue']) > 50) # print(str(signedDocument['signature'])) assert(signedDocument['signature']['type'] == 'RsaSignature2017') assert(verifyJsonSignature(signedDocument, publicKeyPem)) # alter the signed document signedDocument['object']['content'] = 'forged content' assert(not verifyJsonSignature(signedDocument, publicKeyPem)) jldDocument2 = { "@context": "https://www.w3.org/ns/activitystreams", "actor": "https://somesite.net/users/gerbil", "description": "Another json document", "numberField": 13353, "object": { "content": "More content" } } signedDocument2 = jldDocument2.copy() generateJsonSignature(signedDocument2, privateKeyPem) assert(signedDocument2) assert(signedDocument2.get('signature')) assert(signedDocument2['signature'].get('signatureValue')) # changed signature on different document if signedDocument['signature']['signatureValue'] == \ signedDocument2['signature']['signatureValue']: print('json signature has not changed for different documents') assert '.' not in str(signedDocument['signature']['signatureValue']) assert len(str(signedDocument['signature']['signatureValue'])) > 340 assert(signedDocument['signature']['signatureValue'] != signedDocument2['signature']['signatureValue']) def testSiteIsActive(): print('testSiteIsActive') assert(siteIsActive('https://archive.org')) assert(siteIsActive('https://mastodon.social')) assert(not siteIsActive('https://notarealwebsite.a.b.c')) def testRemoveHtml(): print('testRemoveHtml') testStr = 'This string has no html.' assert(removeHtml(testStr) == testStr) testStr = 'This string
has html.' assert(removeHtml(testStr) == 'This string has html.') def testDangerousCSS(): print('testDangerousCSS') baseDir = os.getcwd() for subdir, dirs, files in os.walk(baseDir): for f in files: if not f.endswith('.css'): continue assert not dangerousCSS(baseDir + '/' + f, False) break def testDangerousMarkup(): print('testDangerousMarkup') allowLocalNetworkAccess = False content = '

This is a valid message

' assert(not dangerousMarkup(content, allowLocalNetworkAccess)) content = 'This is a valid message without markup' assert(not dangerousMarkup(content, allowLocalNetworkAccess)) content = '

This is a valid-looking message. But wait... ' + \ '

' assert(dangerousMarkup(content, allowLocalNetworkAccess)) content = '

This html contains more than you expected... ' + \ '

' assert(dangerousMarkup(content, allowLocalNetworkAccess)) content = '

This is a valid-looking message. But wait... ' + \ '