forked from indymedia/epicyon
				
			
		
			
				
	
	
		
			1485 lines
		
	
	
		
			51 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			1485 lines
		
	
	
		
			51 KiB
		
	
	
	
		
			Python
		
	
	
| __filename__ = "utils.py"
 | |
| __author__ = "Bob Mottram"
 | |
| __license__ = "AGPL3+"
 | |
| __version__ = "1.1.0"
 | |
| __maintainer__ = "Bob Mottram"
 | |
| __email__ = "bob@freedombone.net"
 | |
| __status__ = "Production"
 | |
| 
 | |
| import os
 | |
| import time
 | |
| import shutil
 | |
| import datetime
 | |
| import json
 | |
| from socket import error as SocketError
 | |
| import errno
 | |
| import urllib.request
 | |
| from pprint import pprint
 | |
| from calendar import monthrange
 | |
| from followingCalendar import addPersonToCalendar
 | |
| 
 | |
| 
 | |
| def removeHtml(content: str) -> str:
 | |
|     """Removes html links from the given content.
 | |
|     Used to ensure that profile descriptions don't contain dubious content
 | |
|     """
 | |
|     if '<' not in content:
 | |
|         return content
 | |
|     removing = False
 | |
|     content = content.replace('<q>', '"').replace('</q>', '"')
 | |
|     result = ''
 | |
|     for ch in content:
 | |
|         if ch == '<':
 | |
|             removing = True
 | |
|         elif ch == '>':
 | |
|             removing = False
 | |
|         elif not removing:
 | |
|             result += ch
 | |
|     return result
 | |
| 
 | |
| 
 | |
| def isSystemAccount(nickname: str) -> bool:
 | |
|     """Returns true if the given nickname is a system account
 | |
|     """
 | |
|     if nickname == 'news' or nickname == 'inbox':
 | |
|         return True
 | |
|     return False
 | |
| 
 | |
| 
 | |
| def createConfig(baseDir: str) -> None:
 | |
|     """Creates a configuration file
 | |
|     """
 | |
|     configFilename = baseDir + '/config.json'
 | |
|     if os.path.isfile(configFilename):
 | |
|         return
 | |
|     configJson = {
 | |
|     }
 | |
|     saveJson(configJson, configFilename)
 | |
| 
 | |
| 
 | |
| def setConfigParam(baseDir: str, variableName: str, variableValue) -> None:
 | |
|     """Sets a configuration value
 | |
|     """
 | |
|     createConfig(baseDir)
 | |
|     configFilename = baseDir + '/config.json'
 | |
|     configJson = {}
 | |
|     if os.path.isfile(configFilename):
 | |
|         configJson = loadJson(configFilename)
 | |
|     configJson[variableName] = variableValue
 | |
|     saveJson(configJson, configFilename)
 | |
| 
 | |
| 
 | |
| def getConfigParam(baseDir: str, variableName: str):
 | |
|     """Gets a configuration value
 | |
|     """
 | |
|     createConfig(baseDir)
 | |
|     configFilename = baseDir + '/config.json'
 | |
|     configJson = loadJson(configFilename)
 | |
|     if configJson:
 | |
|         if variableName in configJson:
 | |
|             return configJson[variableName]
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def isSuspended(baseDir: str, nickname: str) -> bool:
 | |
|     """Returns true if the given nickname is suspended
 | |
|     """
 | |
|     adminNickname = getConfigParam(baseDir, 'admin')
 | |
|     if not adminNickname:
 | |
|         return False
 | |
|     if nickname == adminNickname:
 | |
|         return False
 | |
| 
 | |
|     suspendedFilename = baseDir + '/accounts/suspended.txt'
 | |
|     if os.path.isfile(suspendedFilename):
 | |
|         with open(suspendedFilename, "r") as f:
 | |
|             lines = f.readlines()
 | |
|         for suspended in lines:
 | |
|             if suspended.strip('\n').strip('\r') == nickname:
 | |
|                 return True
 | |
|     return False
 | |
| 
 | |
| 
 | |
| def getFollowersList(baseDir: str,
 | |
|                      nickname: str, domain: str,
 | |
|                      followFile='following.txt') -> []:
 | |
|     """Returns a list of followers for the given account
 | |
|     """
 | |
|     filename = \
 | |
|         baseDir + '/accounts/' + nickname + '@' + domain + '/' + followFile
 | |
| 
 | |
|     if not os.path.isfile(filename):
 | |
|         return []
 | |
| 
 | |
|     with open(filename, "r") as f:
 | |
|         lines = f.readlines()
 | |
|         for i in range(len(lines)):
 | |
|             lines[i] = lines[i].strip()
 | |
|         return lines
 | |
|     return []
 | |
| 
 | |
| 
 | |
| def getFollowersOfPerson(baseDir: str,
 | |
|                          nickname: str, domain: str,
 | |
|                          followFile='following.txt') -> []:
 | |
|     """Returns a list containing the followers of the given person
 | |
|     Used by the shared inbox to know who to send incoming mail to
 | |
|     """
 | |
|     followers = []
 | |
|     if ':' in domain:
 | |
|         domain = domain.split(':')[0]
 | |
|     handle = nickname + '@' + domain
 | |
|     if not os.path.isdir(baseDir + '/accounts/' + handle):
 | |
|         return followers
 | |
|     for subdir, dirs, files in os.walk(baseDir + '/accounts'):
 | |
|         for account in dirs:
 | |
|             filename = os.path.join(subdir, account) + '/' + followFile
 | |
|             if account == handle or account.startswith('inbox@'):
 | |
|                 continue
 | |
|             if not os.path.isfile(filename):
 | |
|                 continue
 | |
|             with open(filename, 'r') as followingfile:
 | |
|                 for followingHandle in followingfile:
 | |
|                     followingHandle2 = followingHandle.replace('\n', '')
 | |
|                     followingHandle2 = followingHandle2.replace('\r', '')
 | |
|                     if followingHandle2 == handle:
 | |
|                         if account not in followers:
 | |
|                             followers.append(account)
 | |
|                         break
 | |
|     return followers
 | |
| 
 | |
| 
 | |
| def removeIdEnding(idStr: str) -> str:
 | |
|     """Removes endings such as /activity and /undo
 | |
|     """
 | |
|     if idStr.endswith('/activity'):
 | |
|         idStr = idStr[:-len('/activity')]
 | |
|     elif idStr.endswith('/undo'):
 | |
|         idStr = idStr[:-len('/undo')]
 | |
|     elif idStr.endswith('/event'):
 | |
|         idStr = idStr[:-len('/event')]
 | |
|     elif idStr.endswith('/replies'):
 | |
|         idStr = idStr[:-len('/replies')]
 | |
|     return idStr
 | |
| 
 | |
| 
 | |
| def getProtocolPrefixes() -> []:
 | |
|     """Returns a list of valid prefixes
 | |
|     """
 | |
|     return ('https://', 'http://', 'dat://', 'i2p://', 'gnunet://',
 | |
|             'hyper://', 'gemini://', 'gopher://')
 | |
| 
 | |
| 
 | |
| def getLinkPrefixes() -> []:
 | |
|     """Returns a list of valid web link prefixes
 | |
|     """
 | |
|     return ('https://', 'http://', 'dat://', 'i2p://', 'gnunet://',
 | |
|             'hyper://', 'gemini://', 'gopher://', 'briar:')
 | |
| 
 | |
| 
 | |
| def removeAvatarFromCache(baseDir: str, actorStr: str) -> None:
 | |
|     """Removes any existing avatar entries from the cache
 | |
|     This avoids duplicate entries with differing extensions
 | |
|     """
 | |
|     avatarFilenameExtensions = ('png', 'jpg', 'gif', 'webp', 'avif')
 | |
|     for extension in avatarFilenameExtensions:
 | |
|         avatarFilename = \
 | |
|             baseDir + '/cache/avatars/' + actorStr + '.' + extension
 | |
|         if os.path.isfile(avatarFilename):
 | |
|             os.remove(avatarFilename)
 | |
| 
 | |
| 
 | |
| def saveJson(jsonObject: {}, filename: str) -> bool:
 | |
|     """Saves json to a file
 | |
|     """
 | |
|     tries = 0
 | |
|     while tries < 5:
 | |
|         try:
 | |
|             with open(filename, 'w+') as fp:
 | |
|                 fp.write(json.dumps(jsonObject))
 | |
|                 return True
 | |
|         except BaseException:
 | |
|             print('WARN: saveJson ' + str(tries))
 | |
|             time.sleep(1)
 | |
|             tries += 1
 | |
|     return False
 | |
| 
 | |
| 
 | |
| def loadJson(filename: str, delaySec=2, maxTries=5) -> {}:
 | |
|     """Makes a few attempts to load a json formatted file
 | |
|     """
 | |
|     jsonObject = None
 | |
|     tries = 0
 | |
|     while tries < maxTries:
 | |
|         try:
 | |
|             with open(filename, 'r') as fp:
 | |
|                 data = fp.read()
 | |
|                 jsonObject = json.loads(data)
 | |
|                 break
 | |
|         except BaseException:
 | |
|             print('WARN: loadJson exception')
 | |
|             if delaySec > 0:
 | |
|                 time.sleep(delaySec)
 | |
|             tries += 1
 | |
|     return jsonObject
 | |
| 
 | |
| 
 | |
| def loadJsonOnionify(filename: str, domain: str, onionDomain: str,
 | |
|                      delaySec=2) -> {}:
 | |
|     """Makes a few attempts to load a json formatted file
 | |
|     This also converts the domain name to the onion domain
 | |
|     """
 | |
|     jsonObject = None
 | |
|     tries = 0
 | |
|     while tries < 5:
 | |
|         try:
 | |
|             with open(filename, 'r') as fp:
 | |
|                 data = fp.read()
 | |
|                 if data:
 | |
|                     data = data.replace(domain, onionDomain)
 | |
|                     data = data.replace('https:', 'http:')
 | |
|                     print('*****data: ' + data)
 | |
|                 jsonObject = json.loads(data)
 | |
|                 break
 | |
|         except BaseException:
 | |
|             print('WARN: loadJson exception')
 | |
|             if delaySec > 0:
 | |
|                 time.sleep(delaySec)
 | |
|             tries += 1
 | |
|     return jsonObject
 | |
| 
 | |
| 
 | |
| def getStatusNumber(publishedStr=None) -> (str, str):
 | |
|     """Returns the status number and published date
 | |
|     """
 | |
|     if not publishedStr:
 | |
|         currTime = datetime.datetime.utcnow()
 | |
|     else:
 | |
|         currTime = \
 | |
|             datetime.datetime.strptime(publishedStr, '%Y-%m-%dT%H:%M:%SZ')
 | |
|     daysSinceEpoch = (currTime - datetime.datetime(1970, 1, 1)).days
 | |
|     # status is the number of seconds since epoch
 | |
|     statusNumber = \
 | |
|         str(((daysSinceEpoch * 24 * 60 * 60) +
 | |
|              (currTime.hour * 60 * 60) +
 | |
|              (currTime.minute * 60) +
 | |
|              currTime.second) * 1000 +
 | |
|             int(currTime.microsecond / 1000))
 | |
|     # See https://github.com/tootsuite/mastodon/blob/
 | |
|     # 995f8b389a66ab76ec92d9a240de376f1fc13a38/lib/mastodon/snowflake.rb
 | |
|     # use the leftover microseconds as the sequence number
 | |
|     sequenceId = currTime.microsecond % 1000
 | |
|     # shift by 16bits "sequence data"
 | |
|     statusNumber = str((int(statusNumber) << 16) + sequenceId)
 | |
|     published = currTime.strftime("%Y-%m-%dT%H:%M:%SZ")
 | |
|     return statusNumber, published
 | |
| 
 | |
| 
 | |
| def evilIncarnate() -> []:
 | |
|     return ('gab.com', 'gabfed.com', 'spinster.xyz',
 | |
|             'kiwifarms.cc', 'djitter.com')
 | |
| 
 | |
| 
 | |
| def isEvil(domain: str) -> bool:
 | |
|     if not isinstance(domain, str):
 | |
|         print('WARN: Malformed domain ' + str(domain))
 | |
|         return True
 | |
|     # https://www.youtube.com/watch?v=5qw1hcevmdU
 | |
|     evilDomains = evilIncarnate()
 | |
|     for concentratedEvil in evilDomains:
 | |
|         if domain.endswith(concentratedEvil):
 | |
|             return True
 | |
|     return False
 | |
| 
 | |
| 
 | |
| def containsInvalidChars(jsonStr: str) -> bool:
 | |
|     """Does the given json string contain invalid characters?
 | |
|     e.g. dubious clacks/admin dogwhistles
 | |
|     """
 | |
|     invalidStrings = {
 | |
|         '卐', '卍', '࿕', '࿖', '࿗', '࿘'
 | |
|     }
 | |
|     for isInvalid in invalidStrings:
 | |
|         if isInvalid in jsonStr:
 | |
|             return True
 | |
|     return False
 | |
| 
 | |
| 
 | |
| def createPersonDir(nickname: str, domain: str, baseDir: str,
 | |
|                     dirname: str) -> str:
 | |
|     """Create a directory for a person
 | |
|     """
 | |
|     handle = nickname + '@' + domain
 | |
|     if not os.path.isdir(baseDir + '/accounts/' + handle):
 | |
|         os.mkdir(baseDir + '/accounts/' + handle)
 | |
|     boxDir = baseDir + '/accounts/' + handle + '/' + dirname
 | |
|     if not os.path.isdir(boxDir):
 | |
|         os.mkdir(boxDir)
 | |
|     return boxDir
 | |
| 
 | |
| 
 | |
| def createOutboxDir(nickname: str, domain: str, baseDir: str) -> str:
 | |
|     """Create an outbox for a person
 | |
|     """
 | |
|     return createPersonDir(nickname, domain, baseDir, 'outbox')
 | |
| 
 | |
| 
 | |
| def createInboxQueueDir(nickname: str, domain: str, baseDir: str) -> str:
 | |
|     """Create an inbox queue and returns the feed filename and directory
 | |
|     """
 | |
|     return createPersonDir(nickname, domain, baseDir, 'queue')
 | |
| 
 | |
| 
 | |
| def domainPermitted(domain: str, federationList: []):
 | |
|     if len(federationList) == 0:
 | |
|         return True
 | |
|     if ':' in domain:
 | |
|         domain = domain.split(':')[0]
 | |
|     if domain in federationList:
 | |
|         return True
 | |
|     return False
 | |
| 
 | |
| 
 | |
| def urlPermitted(url: str, federationList: []):
 | |
|     if isEvil(url):
 | |
|         return False
 | |
|     if not federationList:
 | |
|         return True
 | |
|     for domain in federationList:
 | |
|         if domain in url:
 | |
|             return True
 | |
|     return False
 | |
| 
 | |
| 
 | |
| def getDisplayName(baseDir: str, actor: str, personCache: {}) -> str:
 | |
|     """Returns the display name for the given actor
 | |
|     """
 | |
|     if '/statuses/' in actor:
 | |
|         actor = actor.split('/statuses/')[0]
 | |
|     if not personCache.get(actor):
 | |
|         return None
 | |
|     if personCache[actor].get('actor'):
 | |
|         if personCache[actor]['actor'].get('name'):
 | |
|             return personCache[actor]['actor']['name']
 | |
|     else:
 | |
|         # Try to obtain from the cached actors
 | |
|         cachedActorFilename = \
 | |
|             baseDir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
 | |
|         if os.path.isfile(cachedActorFilename):
 | |
|             actorJson = loadJson(cachedActorFilename, 1)
 | |
|             if actorJson:
 | |
|                 if actorJson.get('name'):
 | |
|                     return(actorJson['name'])
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def getNicknameFromActor(actor: str) -> str:
 | |
|     """Returns the nickname from an actor url
 | |
|     """
 | |
|     if actor.startswith('@'):
 | |
|         actor = actor[1:]
 | |
|     if '/users/' not in actor:
 | |
|         if '/profile/' in actor:
 | |
|             nickStr = actor.split('/profile/')[1].replace('@', '')
 | |
|             if '/' not in nickStr:
 | |
|                 return nickStr
 | |
|             else:
 | |
|                 return nickStr.split('/')[0]
 | |
|         elif '/channel/' in actor:
 | |
|             nickStr = actor.split('/channel/')[1].replace('@', '')
 | |
|             if '/' not in nickStr:
 | |
|                 return nickStr
 | |
|             else:
 | |
|                 return nickStr.split('/')[0]
 | |
|         elif '/accounts/' in actor:
 | |
|             nickStr = actor.split('/accounts/')[1].replace('@', '')
 | |
|             if '/' not in nickStr:
 | |
|                 return nickStr
 | |
|             else:
 | |
|                 return nickStr.split('/')[0]
 | |
|         elif '/@' in actor:
 | |
|             # https://domain/@nick
 | |
|             nickStr = actor.split('/@')[1]
 | |
|             if '/' in nickStr:
 | |
|                 nickStr = nickStr.split('/')[0]
 | |
|             return nickStr
 | |
|         elif '@' in actor:
 | |
|             nickStr = actor.split('@')[0]
 | |
|             return nickStr
 | |
|         return None
 | |
|     nickStr = actor.split('/users/')[1].replace('@', '')
 | |
|     if '/' not in nickStr:
 | |
|         return nickStr
 | |
|     else:
 | |
|         return nickStr.split('/')[0]
 | |
| 
 | |
| 
 | |
| def getDomainFromActor(actor: str) -> (str, int):
 | |
|     """Returns the domain name from an actor url
 | |
|     """
 | |
|     if actor.startswith('@'):
 | |
|         actor = actor[1:]
 | |
|     port = None
 | |
|     prefixes = getProtocolPrefixes()
 | |
|     if '/profile/' in actor:
 | |
|         domain = actor.split('/profile/')[0]
 | |
|         for prefix in prefixes:
 | |
|             domain = domain.replace(prefix, '')
 | |
|     elif '/accounts/' in actor:
 | |
|         domain = actor.split('/accounts/')[0]
 | |
|         for prefix in prefixes:
 | |
|             domain = domain.replace(prefix, '')
 | |
|     elif '/channel/' in actor:
 | |
|         domain = actor.split('/channel/')[0]
 | |
|         for prefix in prefixes:
 | |
|             domain = domain.replace(prefix, '')
 | |
|     elif '/users/' in actor:
 | |
|         domain = actor.split('/users/')[0]
 | |
|         for prefix in prefixes:
 | |
|             domain = domain.replace(prefix, '')
 | |
|     elif '/@' in actor:
 | |
|         domain = actor.split('/@')[0]
 | |
|         for prefix in prefixes:
 | |
|             domain = domain.replace(prefix, '')
 | |
|     elif '@' in actor:
 | |
|         domain = actor.split('@')[1].strip()
 | |
|     else:
 | |
|         domain = actor
 | |
|         for prefix in prefixes:
 | |
|             domain = domain.replace(prefix, '')
 | |
|         if '/' in actor:
 | |
|             domain = domain.split('/')[0]
 | |
|     if ':' in domain:
 | |
|         portStr = domain.split(':')[1]
 | |
|         if not portStr.isdigit():
 | |
|             return None, None
 | |
|         port = int(portStr)
 | |
|         domain = domain.split(':')[0]
 | |
|     return domain, port
 | |
| 
 | |
| 
 | |
| def followPerson(baseDir: str, nickname: str, domain: str,
 | |
|                  followNickname: str, followDomain: str,
 | |
|                  federationList: [], debug: bool,
 | |
|                  followFile='following.txt') -> bool:
 | |
|     """Adds a person to the follow list
 | |
|     """
 | |
|     if not domainPermitted(followDomain.lower().replace('\n', ''),
 | |
|                            federationList):
 | |
|         if debug:
 | |
|             print('DEBUG: follow of domain ' +
 | |
|                   followDomain + ' not permitted')
 | |
|         return False
 | |
|     if debug:
 | |
|         print('DEBUG: follow of domain ' + followDomain)
 | |
| 
 | |
|     if ':' in domain:
 | |
|         handle = nickname + '@' + domain.split(':')[0]
 | |
|     else:
 | |
|         handle = nickname + '@' + domain
 | |
| 
 | |
|     if not os.path.isdir(baseDir + '/accounts/' + handle):
 | |
|         print('WARN: account for ' + handle + ' does not exist')
 | |
|         return False
 | |
| 
 | |
|     if ':' in followDomain:
 | |
|         handleToFollow = followNickname + '@' + followDomain.split(':')[0]
 | |
|     else:
 | |
|         handleToFollow = followNickname + '@' + followDomain
 | |
| 
 | |
|     # was this person previously unfollowed?
 | |
|     unfollowedFilename = baseDir + '/accounts/' + handle + '/unfollowed.txt'
 | |
|     if os.path.isfile(unfollowedFilename):
 | |
|         if handleToFollow in open(unfollowedFilename).read():
 | |
|             # remove them from the unfollowed file
 | |
|             newLines = ''
 | |
|             with open(unfollowedFilename, "r") as f:
 | |
|                 lines = f.readlines()
 | |
|                 for line in lines:
 | |
|                     if handleToFollow not in line:
 | |
|                         newLines += line
 | |
|             with open(unfollowedFilename, 'w+') as f:
 | |
|                 f.write(newLines)
 | |
| 
 | |
|     if not os.path.isdir(baseDir + '/accounts'):
 | |
|         os.mkdir(baseDir + '/accounts')
 | |
|     handleToFollow = followNickname + '@' + followDomain
 | |
|     filename = baseDir + '/accounts/' + handle + '/' + followFile
 | |
|     if os.path.isfile(filename):
 | |
|         if handleToFollow in open(filename).read():
 | |
|             if debug:
 | |
|                 print('DEBUG: follow already exists')
 | |
|             return True
 | |
|         # prepend to follow file
 | |
|         try:
 | |
|             with open(filename, 'r+') as f:
 | |
|                 content = f.read()
 | |
|                 f.seek(0, 0)
 | |
|                 f.write(handleToFollow + '\n' + content)
 | |
|                 print('DEBUG: follow added')
 | |
|         except Exception as e:
 | |
|             print('WARN: Failed to write entry to follow file ' +
 | |
|                   filename + ' ' + str(e))
 | |
|     else:
 | |
|         # first follow
 | |
|         if debug:
 | |
|             print('DEBUG: ' + handle +
 | |
|                   ' creating new following file to follow ' + handleToFollow +
 | |
|                   ', filename is ' + filename)
 | |
|         with open(filename, 'w+') as f:
 | |
|             f.write(handleToFollow + '\n')
 | |
| 
 | |
|     # Default to adding new follows to the calendar.
 | |
|     # Possibly this could be made optional
 | |
|     if followFile.endswith('following.txt'):
 | |
|         # if following a person add them to the list of
 | |
|         # calendar follows
 | |
|         print('DEBUG: adding ' +
 | |
|               followNickname + '@' + followDomain + ' to calendar of ' +
 | |
|               nickname + '@' + domain)
 | |
|         addPersonToCalendar(baseDir, nickname, domain,
 | |
|                             followNickname, followDomain)
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def votesOnNewswireItem(status: []) -> int:
 | |
|     """Returns the number of votes on a newswire item
 | |
|     """
 | |
|     totalVotes = 0
 | |
|     for line in status:
 | |
|         if 'vote:' in line:
 | |
|             totalVotes += 1
 | |
|     return totalVotes
 | |
| 
 | |
| 
 | |
| def locateNewsVotes(baseDir: str, domain: str,
 | |
|                     postUrl: str) -> str:
 | |
|     """Returns the votes filename for a news post
 | |
|     within the news user account
 | |
|     """
 | |
|     postUrl = \
 | |
|         postUrl.strip().replace('\n', '').replace('\r', '')
 | |
| 
 | |
|     # if this post in the shared inbox?
 | |
|     postUrl = removeIdEnding(postUrl.strip()).replace('/', '#')
 | |
| 
 | |
|     if postUrl.endswith('.json'):
 | |
|         postUrl = postUrl + '.votes'
 | |
|     else:
 | |
|         postUrl = postUrl + '.json.votes'
 | |
| 
 | |
|     accountDir = baseDir + '/accounts/news@' + domain + '/'
 | |
|     postFilename = accountDir + 'outbox/' + postUrl
 | |
|     if os.path.isfile(postFilename):
 | |
|         return postFilename
 | |
| 
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def locateNewsArrival(baseDir: str, domain: str,
 | |
|                       postUrl: str) -> str:
 | |
|     """Returns the arrival time for a news post
 | |
|     within the news user account
 | |
|     """
 | |
|     postUrl = \
 | |
|         postUrl.strip().replace('\n', '').replace('\r', '')
 | |
| 
 | |
|     # if this post in the shared inbox?
 | |
|     postUrl = removeIdEnding(postUrl.strip()).replace('/', '#')
 | |
| 
 | |
|     if postUrl.endswith('.json'):
 | |
|         postUrl = postUrl + '.arrived'
 | |
|     else:
 | |
|         postUrl = postUrl + '.json.arrived'
 | |
| 
 | |
|     accountDir = baseDir + '/accounts/news@' + domain + '/'
 | |
|     postFilename = accountDir + 'outbox/' + postUrl
 | |
|     if os.path.isfile(postFilename):
 | |
|         with open(postFilename, 'r') as arrivalFile:
 | |
|             arrival = arrivalFile.read()
 | |
|             if arrival:
 | |
|                 arrivalDate = \
 | |
|                     datetime.datetime.strptime(arrival,
 | |
|                                                "%Y-%m-%dT%H:%M:%SZ")
 | |
|                 return arrivalDate
 | |
| 
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def clearFromPostCaches(baseDir: str, recentPostsCache: {},
 | |
|                         postId: str) -> None:
 | |
|     """Clears cached html for the given post, so that edits
 | |
|     to news will appear
 | |
|     """
 | |
|     filename = '/postcache/' + postId + '.html'
 | |
|     for subdir, dirs, files in os.walk(baseDir + '/accounts'):
 | |
|         for acct in dirs:
 | |
|             if '@' not in acct:
 | |
|                 continue
 | |
|             if 'inbox@' in acct:
 | |
|                 continue
 | |
|             cacheDir = os.path.join(baseDir + '/accounts', acct)
 | |
|             postFilename = cacheDir + filename
 | |
|             if os.path.isfile(postFilename):
 | |
|                 try:
 | |
|                     os.remove(postFilename)
 | |
|                 except BaseException:
 | |
|                     print('WARN: clearFromPostCaches file not removed ' +
 | |
|                           postFilename)
 | |
|                     pass
 | |
|             # if the post is in the recent posts cache then remove it
 | |
|             if recentPostsCache.get('index'):
 | |
|                 if postId in recentPostsCache['index']:
 | |
|                     recentPostsCache['index'].remove(postId)
 | |
|             if recentPostsCache.get('json'):
 | |
|                 if recentPostsCache['json'].get(postId):
 | |
|                     del recentPostsCache['json'][postId]
 | |
|             if recentPostsCache.get('html'):
 | |
|                 if recentPostsCache['html'].get(postId):
 | |
|                     del recentPostsCache['html'][postId]
 | |
| 
 | |
| 
 | |
| def locatePost(baseDir: str, nickname: str, domain: str,
 | |
|                postUrl: str, replies=False) -> str:
 | |
|     """Returns the filename for the given status post url
 | |
|     """
 | |
|     if not replies:
 | |
|         extension = 'json'
 | |
|     else:
 | |
|         extension = 'replies'
 | |
| 
 | |
|     # if this post in the shared inbox?
 | |
|     postUrl = removeIdEnding(postUrl.strip()).replace('/', '#')
 | |
| 
 | |
|     # add the extension
 | |
|     postUrl = postUrl + '.' + extension
 | |
| 
 | |
|     # search boxes
 | |
|     boxes = ('inbox', 'outbox', 'tlblogs', 'tlevents')
 | |
|     accountDir = baseDir + '/accounts/' + nickname + '@' + domain + '/'
 | |
|     for boxName in boxes:
 | |
|         postFilename = accountDir + boxName + '/' + postUrl
 | |
|         if os.path.isfile(postFilename):
 | |
|             return postFilename
 | |
| 
 | |
|     # check news posts
 | |
|     accountDir = baseDir + '/accounts/news' + '@' + domain + '/'
 | |
|     postFilename = accountDir + 'outbox/' + postUrl
 | |
|     if os.path.isfile(postFilename):
 | |
|         return postFilename
 | |
| 
 | |
|     # is it in the announce cache?
 | |
|     postFilename = baseDir + '/cache/announce/' + nickname + '/' + postUrl
 | |
|     if os.path.isfile(postFilename):
 | |
|         return postFilename
 | |
| 
 | |
|     # print('WARN: unable to locate ' + nickname + ' ' + postUrl)
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def removeAttachment(baseDir: str, httpPrefix: str, domain: str, postJson: {}):
 | |
|     if not postJson.get('attachment'):
 | |
|         return
 | |
|     if not postJson['attachment'][0].get('url'):
 | |
|         return
 | |
| #    if port:
 | |
| #        if port != 80 and port != 443:
 | |
| #            if ':' not in domain:
 | |
| #                domain = domain + ':' + str(port)
 | |
|     attachmentUrl = postJson['attachment'][0]['url']
 | |
|     if not attachmentUrl:
 | |
|         return
 | |
|     mediaFilename = baseDir + '/' + \
 | |
|         attachmentUrl.replace(httpPrefix + '://' + domain + '/', '')
 | |
|     if os.path.isfile(mediaFilename):
 | |
|         os.remove(mediaFilename)
 | |
|     etagFilename = mediaFilename + '.etag'
 | |
|     if os.path.isfile(etagFilename):
 | |
|         os.remove(etagFilename)
 | |
|     postJson['attachment'] = []
 | |
| 
 | |
| 
 | |
| def removeModerationPostFromIndex(baseDir: str, postUrl: str,
 | |
|                                   debug: bool) -> None:
 | |
|     """Removes a url from the moderation index
 | |
|     """
 | |
|     moderationIndexFile = baseDir + '/accounts/moderation.txt'
 | |
|     if not os.path.isfile(moderationIndexFile):
 | |
|         return
 | |
|     postId = removeIdEnding(postUrl)
 | |
|     if postId in open(moderationIndexFile).read():
 | |
|         with open(moderationIndexFile, "r") as f:
 | |
|             lines = f.readlines()
 | |
|             with open(moderationIndexFile, "w+") as f:
 | |
|                 for line in lines:
 | |
|                     if line.strip("\n").strip("\r") != postId:
 | |
|                         f.write(line)
 | |
|                     else:
 | |
|                         if debug:
 | |
|                             print('DEBUG: removed ' + postId +
 | |
|                                   ' from moderation index')
 | |
| 
 | |
| 
 | |
| def isReplyToBlogPost(baseDir: str, nickname: str, domain: str,
 | |
|                       postJsonObject: str):
 | |
|     """Is the given post a reply to a blog post?
 | |
|     """
 | |
|     if not postJsonObject.get('object'):
 | |
|         return False
 | |
|     if not isinstance(postJsonObject['object'], dict):
 | |
|         return False
 | |
|     if not postJsonObject['object'].get('inReplyTo'):
 | |
|         return False
 | |
|     if not isinstance(postJsonObject['object']['inReplyTo'], str):
 | |
|         return False
 | |
|     blogsIndexFilename = baseDir + '/accounts/' + \
 | |
|         nickname + '@' + domain + '/tlblogs.index'
 | |
|     if not os.path.isfile(blogsIndexFilename):
 | |
|         return False
 | |
|     postId = removeIdEnding(postJsonObject['object']['inReplyTo'])
 | |
|     postId = postId.replace('/', '#')
 | |
|     if postId in open(blogsIndexFilename).read():
 | |
|         return True
 | |
|     return False
 | |
| 
 | |
| 
 | |
| def deletePost(baseDir: str, httpPrefix: str,
 | |
|                nickname: str, domain: str, postFilename: str,
 | |
|                debug: bool, recentPostsCache: {}) -> None:
 | |
|     """Recursively deletes a post and its replies and attachments
 | |
|     """
 | |
|     postJsonObject = loadJson(postFilename, 1)
 | |
|     if postJsonObject:
 | |
|         # don't allow deletion of bookmarked posts
 | |
|         bookmarksIndexFilename = \
 | |
|             baseDir + '/accounts/' + nickname + '@' + domain + \
 | |
|             '/bookmarks.index'
 | |
|         if os.path.isfile(bookmarksIndexFilename):
 | |
|             bookmarkIndex = postFilename.split('/')[-1] + '\n'
 | |
|             if bookmarkIndex in open(bookmarksIndexFilename).read():
 | |
|                 return
 | |
| 
 | |
|         # don't remove replies to blog posts
 | |
|         if isReplyToBlogPost(baseDir, nickname, domain,
 | |
|                              postJsonObject):
 | |
|             return
 | |
| 
 | |
|         # remove from recent posts cache in memory
 | |
|         if recentPostsCache:
 | |
|             postId = \
 | |
|                 removeIdEnding(postJsonObject['id']).replace('/', '#')
 | |
|             if recentPostsCache.get('index'):
 | |
|                 if postId in recentPostsCache['index']:
 | |
|                     recentPostsCache['index'].remove(postId)
 | |
|             if recentPostsCache.get('json'):
 | |
|                 if recentPostsCache['json'].get(postId):
 | |
|                     del recentPostsCache['json'][postId]
 | |
|             if recentPostsCache.get('html'):
 | |
|                 if recentPostsCache['html'].get(postId):
 | |
|                     del recentPostsCache['html'][postId]
 | |
| 
 | |
|         # remove any attachment
 | |
|         removeAttachment(baseDir, httpPrefix, domain, postJsonObject)
 | |
| 
 | |
|         extensions = ('votes', 'arrived', 'muted')
 | |
|         for ext in extensions:
 | |
|             extFilename = postFilename + '.' + ext
 | |
|             if os.path.isfile(extFilename):
 | |
|                 os.remove(extFilename)
 | |
| 
 | |
|         # remove cached html version of the post
 | |
|         cachedPostFilename = \
 | |
|             getCachedPostFilename(baseDir, nickname, domain, postJsonObject)
 | |
|         if cachedPostFilename:
 | |
|             if os.path.isfile(cachedPostFilename):
 | |
|                 os.remove(cachedPostFilename)
 | |
|         # removePostFromCache(postJsonObject,recentPostsCache)
 | |
| 
 | |
|         hasObject = False
 | |
|         if postJsonObject.get('object'):
 | |
|             hasObject = True
 | |
| 
 | |
|         # remove from moderation index file
 | |
|         if hasObject:
 | |
|             if isinstance(postJsonObject['object'], dict):
 | |
|                 if postJsonObject['object'].get('moderationStatus'):
 | |
|                     if postJsonObject.get('id'):
 | |
|                         postId = removeIdEnding(postJsonObject['id'])
 | |
|                         removeModerationPostFromIndex(baseDir, postId, debug)
 | |
| 
 | |
|         # remove any hashtags index entries
 | |
|         removeHashtagIndex = False
 | |
|         if hasObject:
 | |
|             if hasObject and isinstance(postJsonObject['object'], dict):
 | |
|                 if postJsonObject['object'].get('content'):
 | |
|                     if '#' in postJsonObject['object']['content']:
 | |
|                         removeHashtagIndex = True
 | |
|         if removeHashtagIndex:
 | |
|             if postJsonObject['object'].get('id') and \
 | |
|                postJsonObject['object'].get('tag'):
 | |
|                 # get the id of the post
 | |
|                 postId = removeIdEnding(postJsonObject['object']['id'])
 | |
|                 for tag in postJsonObject['object']['tag']:
 | |
|                     if tag['type'] != 'Hashtag':
 | |
|                         continue
 | |
|                     if not tag.get('name'):
 | |
|                         continue
 | |
|                     # find the index file for this tag
 | |
|                     tagIndexFilename = \
 | |
|                         baseDir + '/tags/' + tag['name'][1:] + '.txt'
 | |
|                     if not os.path.isfile(tagIndexFilename):
 | |
|                         continue
 | |
|                     # remove postId from the tag index file
 | |
|                     lines = None
 | |
|                     with open(tagIndexFilename, "r") as f:
 | |
|                         lines = f.readlines()
 | |
|                     if lines:
 | |
|                         newlines = ''
 | |
|                         for fileLine in lines:
 | |
|                             if postId in fileLine:
 | |
|                                 continue
 | |
|                             newlines += fileLine
 | |
|                         if not newlines.strip():
 | |
|                             # if there are no lines then remove the
 | |
|                             # hashtag file
 | |
|                             os.remove(tagIndexFilename)
 | |
|                         else:
 | |
|                             with open(tagIndexFilename, "w+") as f:
 | |
|                                 f.write(newlines)
 | |
| 
 | |
|     # remove any replies
 | |
|     repliesFilename = postFilename.replace('.json', '.replies')
 | |
|     if os.path.isfile(repliesFilename):
 | |
|         if debug:
 | |
|             print('DEBUG: removing replies to ' + postFilename)
 | |
|         with open(repliesFilename, 'r') as f:
 | |
|             for replyId in f:
 | |
|                 replyFile = locatePost(baseDir, nickname, domain, replyId)
 | |
|                 if replyFile:
 | |
|                     if os.path.isfile(replyFile):
 | |
|                         deletePost(baseDir, httpPrefix,
 | |
|                                    nickname, domain, replyFile, debug,
 | |
|                                    recentPostsCache)
 | |
|         # remove the replies file
 | |
|         os.remove(repliesFilename)
 | |
|     # finally, remove the post itself
 | |
|     os.remove(postFilename)
 | |
| 
 | |
| 
 | |
| def validNickname(domain: str, nickname: str) -> bool:
 | |
|     forbiddenChars = ('.', ' ', '/', '?', ':', ';', '@', '#')
 | |
|     for c in forbiddenChars:
 | |
|         if c in nickname:
 | |
|             return False
 | |
|     if nickname == domain:
 | |
|         return False
 | |
|     reservedNames = ('inbox', 'dm', 'outbox', 'following',
 | |
|                      'public', 'followers',
 | |
|                      'channel', 'calendar',
 | |
|                      'tlreplies', 'tlmedia', 'tlblogs',
 | |
|                      'tlevents', 'tlblogs',
 | |
|                      'moderation', 'activity', 'undo',
 | |
|                      'reply', 'replies', 'question', 'like',
 | |
|                      'likes', 'users', 'statuses',
 | |
|                      'accounts', 'channels', 'profile',
 | |
|                      'updates', 'repeat', 'announce',
 | |
|                      'shares', 'fonts', 'icons', 'avatars')
 | |
|     if nickname in reservedNames:
 | |
|         return False
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def noOfAccounts(baseDir: str) -> bool:
 | |
|     """Returns the number of accounts on the system
 | |
|     """
 | |
|     accountCtr = 0
 | |
|     for subdir, dirs, files in os.walk(baseDir + '/accounts'):
 | |
|         for account in dirs:
 | |
|             if '@' in account:
 | |
|                 if not account.startswith('inbox@'):
 | |
|                     accountCtr += 1
 | |
|     return accountCtr
 | |
| 
 | |
| 
 | |
| def noOfActiveAccountsMonthly(baseDir: str, months: int) -> bool:
 | |
|     """Returns the number of accounts on the system this month
 | |
|     """
 | |
|     accountCtr = 0
 | |
|     currTime = int(time.time())
 | |
|     monthSeconds = int(60*60*24*30*months)
 | |
|     for subdir, dirs, files in os.walk(baseDir + '/accounts'):
 | |
|         for account in dirs:
 | |
|             if '@' in account:
 | |
|                 if not account.startswith('inbox@'):
 | |
|                     lastUsedFilename = \
 | |
|                         baseDir + '/accounts/' + account + '/.lastUsed'
 | |
|                     if os.path.isfile(lastUsedFilename):
 | |
|                         with open(lastUsedFilename, 'r') as lastUsedFile:
 | |
|                             lastUsed = lastUsedFile.read()
 | |
|                             if lastUsed.isdigit():
 | |
|                                 timeDiff = (currTime - int(lastUsed))
 | |
|                                 if timeDiff < monthSeconds:
 | |
|                                     accountCtr += 1
 | |
|     return accountCtr
 | |
| 
 | |
| 
 | |
| def isPublicPostFromUrl(baseDir: str, nickname: str, domain: str,
 | |
|                         postUrl: str) -> bool:
 | |
|     """Returns whether the given url is a public post
 | |
|     """
 | |
|     postFilename = locatePost(baseDir, nickname, domain, postUrl)
 | |
|     if not postFilename:
 | |
|         return False
 | |
|     postJsonObject = loadJson(postFilename, 1)
 | |
|     if not postJsonObject:
 | |
|         return False
 | |
|     return isPublicPost(postJsonObject)
 | |
| 
 | |
| 
 | |
| def isPublicPost(postJsonObject: {}) -> bool:
 | |
|     """Returns true if the given post is public
 | |
|     """
 | |
|     if not postJsonObject.get('type'):
 | |
|         return False
 | |
|     if postJsonObject['type'] != 'Create':
 | |
|         return False
 | |
|     if not postJsonObject.get('object'):
 | |
|         return False
 | |
|     if not isinstance(postJsonObject['object'], dict):
 | |
|         return False
 | |
|     if not postJsonObject['object'].get('to'):
 | |
|         return False
 | |
|     for recipient in postJsonObject['object']['to']:
 | |
|         if recipient.endswith('#Public'):
 | |
|             return True
 | |
|     return False
 | |
| 
 | |
| 
 | |
| def copytree(src: str, dst: str, symlinks=False, ignore=None):
 | |
|     """Copy a directory
 | |
|     """
 | |
|     for item in os.listdir(src):
 | |
|         s = os.path.join(src, item)
 | |
|         d = os.path.join(dst, item)
 | |
|         if os.path.isdir(s):
 | |
|             shutil.copytree(s, d, symlinks, ignore)
 | |
|         else:
 | |
|             shutil.copy2(s, d)
 | |
| 
 | |
| 
 | |
| def getCachedPostDirectory(baseDir: str, nickname: str, domain: str) -> str:
 | |
|     """Returns the directory where the html post cache exists
 | |
|     """
 | |
|     htmlPostCacheDir = baseDir + '/accounts/' + \
 | |
|         nickname + '@' + domain + '/postcache'
 | |
|     return htmlPostCacheDir
 | |
| 
 | |
| 
 | |
| def getCachedPostFilename(baseDir: str, nickname: str, domain: str,
 | |
|                           postJsonObject: {}) -> str:
 | |
|     """Returns the html cache filename for the given post
 | |
|     """
 | |
|     cachedPostDir = getCachedPostDirectory(baseDir, nickname, domain)
 | |
|     if not os.path.isdir(cachedPostDir):
 | |
|         # print('ERROR: invalid html cache directory '+cachedPostDir)
 | |
|         return None
 | |
|     if '@' not in cachedPostDir:
 | |
|         # print('ERROR: invalid html cache directory '+cachedPostDir)
 | |
|         return None
 | |
|     cachedPostId = removeIdEnding(postJsonObject['id'])
 | |
|     cachedPostFilename = cachedPostDir + '/' + cachedPostId.replace('/', '#')
 | |
|     return cachedPostFilename + '.html'
 | |
| 
 | |
| 
 | |
| def removePostFromCache(postJsonObject: {}, recentPostsCache: {}):
 | |
|     """ if the post exists in the recent posts cache then remove it
 | |
|     """
 | |
|     if not postJsonObject.get('id'):
 | |
|         return
 | |
| 
 | |
|     if not recentPostsCache.get('index'):
 | |
|         return
 | |
| 
 | |
|     postId = postJsonObject['id']
 | |
|     if '#' in postId:
 | |
|         postId = postId.split('#', 1)[0]
 | |
|     postId = removeIdEnding(postId).replace('/', '#')
 | |
|     if postId not in recentPostsCache['index']:
 | |
|         return
 | |
| 
 | |
|     if recentPostsCache['json'].get(postId):
 | |
|         del recentPostsCache['json'][postId]
 | |
|     if recentPostsCache['html'].get(postId):
 | |
|         del recentPostsCache['html'][postId]
 | |
|     recentPostsCache['index'].remove(postId)
 | |
| 
 | |
| 
 | |
| def updateRecentPostsCache(recentPostsCache: {}, maxRecentPosts: int,
 | |
|                            postJsonObject: {}, htmlStr: str) -> None:
 | |
|     """Store recent posts in memory so that they can be quickly recalled
 | |
|     """
 | |
|     if not postJsonObject.get('id'):
 | |
|         return
 | |
|     postId = postJsonObject['id']
 | |
|     if '#' in postId:
 | |
|         postId = postId.split('#', 1)[0]
 | |
|     postId = removeIdEnding(postId).replace('/', '#')
 | |
|     if recentPostsCache.get('index'):
 | |
|         if postId in recentPostsCache['index']:
 | |
|             return
 | |
|         recentPostsCache['index'].append(postId)
 | |
|         postJsonObject['muted'] = False
 | |
|         recentPostsCache['json'][postId] = json.dumps(postJsonObject)
 | |
|         recentPostsCache['html'][postId] = htmlStr
 | |
| 
 | |
|         while len(recentPostsCache['html'].items()) > maxRecentPosts:
 | |
|             postId = recentPostsCache['index'][0]
 | |
|             recentPostsCache['index'].pop(0)
 | |
|             del recentPostsCache['json'][postId]
 | |
|             del recentPostsCache['html'][postId]
 | |
|     else:
 | |
|         recentPostsCache['index'] = [postId]
 | |
|         recentPostsCache['json'] = {}
 | |
|         recentPostsCache['html'] = {}
 | |
|         recentPostsCache['json'][postId] = json.dumps(postJsonObject)
 | |
|         recentPostsCache['html'][postId] = htmlStr
 | |
| 
 | |
| 
 | |
| def fileLastModified(filename: str) -> str:
 | |
|     """Returns the date when a file was last modified
 | |
|     """
 | |
|     t = os.path.getmtime(filename)
 | |
|     modifiedTime = datetime.datetime.fromtimestamp(t)
 | |
|     return modifiedTime.strftime("%Y-%m-%dT%H:%M:%SZ")
 | |
| 
 | |
| 
 | |
| def getCSS(baseDir: str, cssFilename: str, cssCache: {}) -> str:
 | |
|     """Retrieves the css for a given file, or from a cache
 | |
|     """
 | |
|     # does the css file exist?
 | |
|     if not os.path.isfile(cssFilename):
 | |
|         return None
 | |
| 
 | |
|     lastModified = fileLastModified(cssFilename)
 | |
| 
 | |
|     # has this already been loaded into the cache?
 | |
|     if cssCache.get(cssFilename):
 | |
|         if cssCache[cssFilename][0] == lastModified:
 | |
|             # file hasn't changed, so return the version in the cache
 | |
|             return cssCache[cssFilename][1]
 | |
| 
 | |
|     with open(cssFilename, 'r') as fpCSS:
 | |
|         css = fpCSS.read()
 | |
|         if cssCache.get(cssFilename):
 | |
|             # alter the cache contents
 | |
|             cssCache[cssFilename][0] = lastModified
 | |
|             cssCache[cssFilename][1] = css
 | |
|         else:
 | |
|             # add entry to the cache
 | |
|             cssCache[cssFilename] = [lastModified, css]
 | |
|         return css
 | |
| 
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def daysInMonth(year: int, monthNumber: int) -> int:
 | |
|     """Returns the number of days in the month
 | |
|     """
 | |
|     if monthNumber < 1 or monthNumber > 12:
 | |
|         return None
 | |
|     daysRange = monthrange(year, monthNumber)
 | |
|     return daysRange[1]
 | |
| 
 | |
| 
 | |
| def mergeDicts(dict1: {}, dict2: {}) -> {}:
 | |
|     """Merges two dictionaries
 | |
|     """
 | |
|     res = {**dict1, **dict2}
 | |
|     return res
 | |
| 
 | |
| 
 | |
| def isEventPost(messageJson: {}) -> bool:
 | |
|     """Is the given post a mobilizon-type event activity?
 | |
|     See https://framagit.org/framasoft/mobilizon/-/blob/
 | |
|     master/lib/federation/activity_stream/converter/event.ex
 | |
|     """
 | |
|     if not messageJson.get('id'):
 | |
|         return False
 | |
|     if not messageJson.get('actor'):
 | |
|         return False
 | |
|     if not messageJson.get('object'):
 | |
|         return False
 | |
|     if not isinstance(messageJson['object'], dict):
 | |
|         return False
 | |
|     if not messageJson['object'].get('type'):
 | |
|         return False
 | |
|     if messageJson['object']['type'] != 'Event':
 | |
|         return False
 | |
|     print('Event arriving')
 | |
|     if not messageJson['object'].get('startTime'):
 | |
|         print('No event start time')
 | |
|         return False
 | |
|     if not messageJson['object'].get('actor'):
 | |
|         print('No event actor')
 | |
|         return False
 | |
|     if not messageJson['object'].get('content'):
 | |
|         print('No event content')
 | |
|         return False
 | |
|     if not messageJson['object'].get('name'):
 | |
|         print('No event name')
 | |
|         return False
 | |
|     if not messageJson['object'].get('uuid'):
 | |
|         print('No event UUID')
 | |
|         return False
 | |
|     print('Event detected')
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def isBlogPost(postJsonObject: {}) -> bool:
 | |
|     """Is the given post a blog post?
 | |
|     """
 | |
|     if postJsonObject['type'] != 'Create':
 | |
|         return False
 | |
|     if not postJsonObject.get('object'):
 | |
|         return False
 | |
|     if not isinstance(postJsonObject['object'], dict):
 | |
|         return False
 | |
|     if not postJsonObject['object'].get('type'):
 | |
|         return False
 | |
|     if not postJsonObject['object'].get('content'):
 | |
|         return False
 | |
|     if postJsonObject['object']['type'] != 'Article':
 | |
|         return False
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def isNewsPost(postJsonObject: {}) -> bool:
 | |
|     """Is the given post a blog post?
 | |
|     """
 | |
|     return postJsonObject.get('news')
 | |
| 
 | |
| 
 | |
| def searchBoxPosts(baseDir: str, nickname: str, domain: str,
 | |
|                    searchStr: str, maxResults: int,
 | |
|                    boxName='outbox') -> []:
 | |
|     """Search your posts and return a list of the filenames
 | |
|     containing matching strings
 | |
|     """
 | |
|     path = baseDir + '/accounts/' + nickname + '@' + domain + '/' + boxName
 | |
|     if not os.path.isdir(path):
 | |
|         return []
 | |
|     searchStr = searchStr.lower().strip()
 | |
| 
 | |
|     if '+' in searchStr:
 | |
|         searchWords = searchStr.split('+')
 | |
|         for index in range(len(searchWords)):
 | |
|             searchWords[index] = searchWords[index].strip()
 | |
|         print('SEARCH: ' + str(searchWords))
 | |
|     else:
 | |
|         searchWords = [searchStr]
 | |
| 
 | |
|     res = []
 | |
|     for root, dirs, fnames in os.walk(path):
 | |
|         for fname in fnames:
 | |
|             filePath = os.path.join(root, fname)
 | |
|             with open(filePath, 'r') as postFile:
 | |
|                 data = postFile.read().lower()
 | |
| 
 | |
|                 notFound = False
 | |
|                 for keyword in searchWords:
 | |
|                     if keyword not in data:
 | |
|                         notFound = True
 | |
|                         break
 | |
|                 if notFound:
 | |
|                     continue
 | |
| 
 | |
|                 res.append(filePath)
 | |
|                 if len(res) >= maxResults:
 | |
|                     return res
 | |
|     return res
 | |
| 
 | |
| 
 | |
| def getFileCaseInsensitive(path: str) -> str:
 | |
|     """Returns a case specific filename given a case insensitive version of it
 | |
|     """
 | |
|     if os.path.isfile(path):
 | |
|         return path
 | |
|     if path != path.lower():
 | |
|         if os.path.isfile(path.lower()):
 | |
|             return path.lower()
 | |
|     # directory, filename = os.path.split(path)
 | |
|     # directory, filename = (directory or '.'), filename.lower()
 | |
|     # for f in os.listdir(directory):
 | |
|     #     if f.lower() == filename:
 | |
|     #         newpath = os.path.join(directory, f)
 | |
|     #         if os.path.isfile(newpath):
 | |
|     #             return newpath
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def undoLikesCollectionEntry(recentPostsCache: {},
 | |
|                              baseDir: str, postFilename: str, objectUrl: str,
 | |
|                              actor: str, domain: str, debug: bool) -> None:
 | |
|     """Undoes a like for a particular actor
 | |
|     """
 | |
|     postJsonObject = loadJson(postFilename)
 | |
|     if postJsonObject:
 | |
|         # remove any cached version of this post so that the
 | |
|         # like icon is changed
 | |
|         nickname = getNicknameFromActor(actor)
 | |
|         cachedPostFilename = getCachedPostFilename(baseDir, nickname,
 | |
|                                                    domain, postJsonObject)
 | |
|         if cachedPostFilename:
 | |
|             if os.path.isfile(cachedPostFilename):
 | |
|                 os.remove(cachedPostFilename)
 | |
|         removePostFromCache(postJsonObject, recentPostsCache)
 | |
| 
 | |
|         if not postJsonObject.get('type'):
 | |
|             return
 | |
|         if postJsonObject['type'] != 'Create':
 | |
|             return
 | |
|         if not postJsonObject.get('object'):
 | |
|             if debug:
 | |
|                 pprint(postJsonObject)
 | |
|                 print('DEBUG: post '+objectUrl+' has no object')
 | |
|             return
 | |
|         if not isinstance(postJsonObject['object'], dict):
 | |
|             return
 | |
|         if not postJsonObject['object'].get('likes'):
 | |
|             return
 | |
|         if not isinstance(postJsonObject['object']['likes'], dict):
 | |
|             return
 | |
|         if not postJsonObject['object']['likes'].get('items'):
 | |
|             return
 | |
|         totalItems = 0
 | |
|         if postJsonObject['object']['likes'].get('totalItems'):
 | |
|             totalItems = postJsonObject['object']['likes']['totalItems']
 | |
|         itemFound = False
 | |
|         for likeItem in postJsonObject['object']['likes']['items']:
 | |
|             if likeItem.get('actor'):
 | |
|                 if likeItem['actor'] == actor:
 | |
|                     if debug:
 | |
|                         print('DEBUG: like was removed for ' + actor)
 | |
|                     postJsonObject['object']['likes']['items'].remove(likeItem)
 | |
|                     itemFound = True
 | |
|                     break
 | |
|         if itemFound:
 | |
|             if totalItems == 1:
 | |
|                 if debug:
 | |
|                     print('DEBUG: likes was removed from post')
 | |
|                 del postJsonObject['object']['likes']
 | |
|             else:
 | |
|                 itlen = len(postJsonObject['object']['likes']['items'])
 | |
|                 postJsonObject['object']['likes']['totalItems'] = itlen
 | |
| 
 | |
|             saveJson(postJsonObject, postFilename)
 | |
| 
 | |
| 
 | |
| def updateLikesCollection(recentPostsCache: {},
 | |
|                           baseDir: str, postFilename: str,
 | |
|                           objectUrl: str,
 | |
|                           actor: str, domain: str, debug: bool) -> None:
 | |
|     """Updates the likes collection within a post
 | |
|     """
 | |
|     postJsonObject = loadJson(postFilename)
 | |
|     if not postJsonObject:
 | |
|         return
 | |
|     # remove any cached version of this post so that the
 | |
|     # like icon is changed
 | |
|     nickname = getNicknameFromActor(actor)
 | |
|     cachedPostFilename = getCachedPostFilename(baseDir, nickname,
 | |
|                                                domain, postJsonObject)
 | |
|     if cachedPostFilename:
 | |
|         if os.path.isfile(cachedPostFilename):
 | |
|             os.remove(cachedPostFilename)
 | |
|     removePostFromCache(postJsonObject, recentPostsCache)
 | |
| 
 | |
|     if not postJsonObject.get('object'):
 | |
|         if debug:
 | |
|             pprint(postJsonObject)
 | |
|             print('DEBUG: post ' + objectUrl + ' has no object')
 | |
|         return
 | |
|     if not isinstance(postJsonObject['object'], dict):
 | |
|         return
 | |
|     if not objectUrl.endswith('/likes'):
 | |
|         objectUrl = objectUrl + '/likes'
 | |
|     if not postJsonObject['object'].get('likes'):
 | |
|         if debug:
 | |
|             print('DEBUG: Adding initial like to ' + objectUrl)
 | |
|         likesJson = {
 | |
|             "@context": "https://www.w3.org/ns/activitystreams",
 | |
|             'id': objectUrl,
 | |
|             'type': 'Collection',
 | |
|             "totalItems": 1,
 | |
|             'items': [{
 | |
|                 'type': 'Like',
 | |
|                 'actor': actor
 | |
|             }]
 | |
|         }
 | |
|         postJsonObject['object']['likes'] = likesJson
 | |
|     else:
 | |
|         if not postJsonObject['object']['likes'].get('items'):
 | |
|             postJsonObject['object']['likes']['items'] = []
 | |
|         for likeItem in postJsonObject['object']['likes']['items']:
 | |
|             if likeItem.get('actor'):
 | |
|                 if likeItem['actor'] == actor:
 | |
|                     return
 | |
|         newLike = {
 | |
|             'type': 'Like',
 | |
|             'actor': actor
 | |
|         }
 | |
|         postJsonObject['object']['likes']['items'].append(newLike)
 | |
|         itlen = len(postJsonObject['object']['likes']['items'])
 | |
|         postJsonObject['object']['likes']['totalItems'] = itlen
 | |
| 
 | |
|     if debug:
 | |
|         print('DEBUG: saving post with likes added')
 | |
|         pprint(postJsonObject)
 | |
|     saveJson(postJsonObject, postFilename)
 | |
| 
 | |
| 
 | |
| def undoAnnounceCollectionEntry(recentPostsCache: {},
 | |
|                                 baseDir: str, postFilename: str,
 | |
|                                 actor: str, domain: str, debug: bool) -> None:
 | |
|     """Undoes an announce for a particular actor by removing it from
 | |
|     the "shares" collection within a post. Note that the "shares"
 | |
|     collection has no relation to shared items in shares.py. It's
 | |
|     shares of posts, not shares of physical objects.
 | |
|     """
 | |
|     postJsonObject = loadJson(postFilename)
 | |
|     if postJsonObject:
 | |
|         # remove any cached version of this announce so that the announce
 | |
|         # icon is changed
 | |
|         nickname = getNicknameFromActor(actor)
 | |
|         cachedPostFilename = getCachedPostFilename(baseDir, nickname, domain,
 | |
|                                                    postJsonObject)
 | |
|         if cachedPostFilename:
 | |
|             if os.path.isfile(cachedPostFilename):
 | |
|                 os.remove(cachedPostFilename)
 | |
|         removePostFromCache(postJsonObject, recentPostsCache)
 | |
| 
 | |
|         if not postJsonObject.get('type'):
 | |
|             return
 | |
|         if postJsonObject['type'] != 'Create':
 | |
|             return
 | |
|         if not postJsonObject.get('object'):
 | |
|             if debug:
 | |
|                 pprint(postJsonObject)
 | |
|                 print('DEBUG: post has no object')
 | |
|             return
 | |
|         if not isinstance(postJsonObject['object'], dict):
 | |
|             return
 | |
|         if not postJsonObject['object'].get('shares'):
 | |
|             return
 | |
|         if not postJsonObject['object']['shares'].get('items'):
 | |
|             return
 | |
|         totalItems = 0
 | |
|         if postJsonObject['object']['shares'].get('totalItems'):
 | |
|             totalItems = postJsonObject['object']['shares']['totalItems']
 | |
|         itemFound = False
 | |
|         for announceItem in postJsonObject['object']['shares']['items']:
 | |
|             if announceItem.get('actor'):
 | |
|                 if announceItem['actor'] == actor:
 | |
|                     if debug:
 | |
|                         print('DEBUG: Announce was removed for ' + actor)
 | |
|                     anIt = announceItem
 | |
|                     postJsonObject['object']['shares']['items'].remove(anIt)
 | |
|                     itemFound = True
 | |
|                     break
 | |
|         if itemFound:
 | |
|             if totalItems == 1:
 | |
|                 if debug:
 | |
|                     print('DEBUG: shares (announcements) ' +
 | |
|                           'was removed from post')
 | |
|                 del postJsonObject['object']['shares']
 | |
|             else:
 | |
|                 itlen = len(postJsonObject['object']['shares']['items'])
 | |
|                 postJsonObject['object']['shares']['totalItems'] = itlen
 | |
| 
 | |
|             saveJson(postJsonObject, postFilename)
 | |
| 
 | |
| 
 | |
| def updateAnnounceCollection(recentPostsCache: {},
 | |
|                              baseDir: str, postFilename: str,
 | |
|                              actor: str, domain: str, debug: bool) -> None:
 | |
|     """Updates the announcements collection within a post
 | |
|     Confusingly this is known as "shares", but isn't the
 | |
|     same as shared items within shares.py
 | |
|     It's shares of posts, not shares of physical objects.
 | |
|     """
 | |
|     postJsonObject = loadJson(postFilename)
 | |
|     if postJsonObject:
 | |
|         # remove any cached version of this announce so that the announce
 | |
|         # icon is changed
 | |
|         nickname = getNicknameFromActor(actor)
 | |
|         cachedPostFilename = getCachedPostFilename(baseDir, nickname, domain,
 | |
|                                                    postJsonObject)
 | |
|         if cachedPostFilename:
 | |
|             if os.path.isfile(cachedPostFilename):
 | |
|                 os.remove(cachedPostFilename)
 | |
|         removePostFromCache(postJsonObject, recentPostsCache)
 | |
| 
 | |
|         if not postJsonObject.get('object'):
 | |
|             if debug:
 | |
|                 pprint(postJsonObject)
 | |
|                 print('DEBUG: post ' + postFilename + ' has no object')
 | |
|             return
 | |
|         if not isinstance(postJsonObject['object'], dict):
 | |
|             return
 | |
|         postUrl = removeIdEnding(postJsonObject['id']) + '/shares'
 | |
|         if not postJsonObject['object'].get('shares'):
 | |
|             if debug:
 | |
|                 print('DEBUG: Adding initial shares (announcements) to ' +
 | |
|                       postUrl)
 | |
|             announcementsJson = {
 | |
|                 "@context": "https://www.w3.org/ns/activitystreams",
 | |
|                 'id': postUrl,
 | |
|                 'type': 'Collection',
 | |
|                 "totalItems": 1,
 | |
|                 'items': [{
 | |
|                     'type': 'Announce',
 | |
|                     'actor': actor
 | |
|                 }]
 | |
|             }
 | |
|             postJsonObject['object']['shares'] = announcementsJson
 | |
|         else:
 | |
|             if postJsonObject['object']['shares'].get('items'):
 | |
|                 sharesItems = postJsonObject['object']['shares']['items']
 | |
|                 for announceItem in sharesItems:
 | |
|                     if announceItem.get('actor'):
 | |
|                         if announceItem['actor'] == actor:
 | |
|                             return
 | |
|                 newAnnounce = {
 | |
|                     'type': 'Announce',
 | |
|                     'actor': actor
 | |
|                 }
 | |
|                 postJsonObject['object']['shares']['items'].append(newAnnounce)
 | |
|                 itlen = len(postJsonObject['object']['shares']['items'])
 | |
|                 postJsonObject['object']['shares']['totalItems'] = itlen
 | |
|             else:
 | |
|                 if debug:
 | |
|                     print('DEBUG: shares (announcements) section of post ' +
 | |
|                           'has no items list')
 | |
| 
 | |
|         if debug:
 | |
|             print('DEBUG: saving post with shares (announcements) added')
 | |
|             pprint(postJsonObject)
 | |
|         saveJson(postJsonObject, postFilename)
 | |
| 
 | |
| 
 | |
| def siteIsActive(url: str) -> bool:
 | |
|     """Returns true if the current url is resolvable.
 | |
|     This can be used to check that an instance is online before
 | |
|     trying to send posts to it.
 | |
|     """
 | |
|     if not url.startswith('http'):
 | |
|         return False
 | |
|     try:
 | |
|         req = urllib.request.Request(url)
 | |
|         urllib.request.urlopen(req, timeout=10)  # nosec
 | |
|         return True
 | |
|     except SocketError as e:
 | |
|         if e.errno == errno.ECONNRESET:
 | |
|             print('WARN: connection was reset during siteIsActive')
 | |
|     return False
 |