epicyon/utils.py

3023 lines
97 KiB
Python
Raw Normal View History

2020-04-04 13:44:49 +00:00
__filename__ = "utils.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
2021-01-26 10:07:42 +00:00
__version__ = "1.2.0"
2020-04-04 13:44:49 +00:00
__maintainer__ = "Bob Mottram"
2021-09-10 16:14:50 +00:00
__email__ = "bob@libreserver.org"
2020-04-04 13:44:49 +00:00
__status__ = "Production"
2021-06-26 11:16:41 +00:00
__module_group__ = "Core"
2019-07-02 09:25:29 +00:00
import os
import re
2019-10-11 18:03:58 +00:00
import time
2019-09-29 18:48:34 +00:00
import shutil
2019-07-02 09:25:29 +00:00
import datetime
2019-11-23 10:20:30 +00:00
import json
import idna
2021-03-18 17:27:46 +00:00
import locale
2020-06-06 18:16:16 +00:00
from pprint import pprint
from followingCalendar import addPersonToCalendar
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
2021-02-11 10:33:56 +00:00
# posts containing these strings will always get screened out,
# both incoming and outgoing.
# Could include dubious clacks or admin dogwhistles
invalidCharacters = (
2021-08-30 18:15:51 +00:00
'', '', '', '', '', '', 'ϟϟ', '🏳️‍🌈🚫', '⚡⚡'
2021-02-11 10:33:56 +00:00
)
2021-08-14 11:13:39 +00:00
def localActorUrl(httpPrefix: str, nickname: str, domainFull: str) -> str:
    """Builds the actor url for a nickname hosted on this instance,
    in the form httpPrefix://domainFull/users/nickname
    """
    urlParts = (httpPrefix, '://', domainFull, '/users/', nickname)
    return ''.join(urlParts)
def getActorLanguagesList(actorJson: {}) -> []:
    """Returns a sorted list containing languages used by the given actor

    The languages are read from the first attachment of type
    PropertyValue whose name begins with "languages". Its value may be
    a list, or a string separated by one of , ; / + or space.
    A string without any separator is treated as a single language.
    Returns an empty list if no languages are declared.
    """
    attachments = actorJson.get('attachment')
    if not attachments or not isinstance(attachments, list):
        return []
    for propertyValue in attachments:
        if not isinstance(propertyValue, dict):
            continue
        name = propertyValue.get('name')
        if not isinstance(name, str):
            continue
        if not name.lower().startswith('languages'):
            continue
        if propertyValue.get('type') != 'PropertyValue':
            continue
        value = propertyValue.get('value')
        if not value:
            continue
        if isinstance(value, list):
            # sort a copy so that the actor json is not altered
            return sorted(value)
        if isinstance(value, str):
            # with no separator the whole string is one language
            langListTemp = [value]
            for separator in (',', ';', '/', '+', ' '):
                if separator in value:
                    langListTemp = value.split(separator)
                    break
            langList = []
            for lang in langListTemp:
                lang = lang.strip()
                # skip blanks left by trailing or doubled separators
                if lang and lang not in langList:
                    langList.append(lang)
            langList.sort()
            return langList
    return []
def getContentFromPost(postJsonObject: {}, systemLanguage: str,
                       languagesUnderstood: []) -> str:
    """Returns the content of a post, preferring the contentMap entry
    for the system language, then for any of the understood languages.
    When there is no contentMap the plain content field is returned.
    An empty string is returned if nothing suitable is found.
    """
    post = postJsonObject
    if hasObjectDict(postJsonObject):
        post = postJsonObject['object']
    if not post.get('content'):
        return ''

    contentMap = post.get('contentMap')
    if not contentMap:
        # no language map, so fall back to the plain content
        plainContent = post['content']
        return plainContent if isinstance(plainContent, str) else ''
    if not isinstance(contentMap, dict):
        return ''

    preferred = contentMap.get(systemLanguage)
    if preferred:
        return preferred if isinstance(preferred, str) else ''

    # is there a contentMap entry for one of
    # the understood languages?
    for lang in languagesUnderstood:
        if contentMap.get(lang):
            return contentMap[lang]
    return ''
2021-07-19 19:40:04 +00:00
def getBaseContentFromPost(postJsonObject: {}, systemLanguage: str) -> str:
    """Returns the plain content field of the post, or of its object
    if it has one. Any contentMap is ignored and the systemLanguage
    argument is not used. Returns an empty string if there is no content
    """
    post = postJsonObject
    if hasObjectDict(post):
        post = post['object']
    baseContent = post.get('content')
    if baseContent:
        return baseContent
    return ''
2021-07-13 21:59:53 +00:00
def acctDir(baseDir: str, nickname: str, domain: str) -> str:
    """Returns the directory for the given account,
    in the form baseDir/accounts/nickname@domain
    """
    handle = nickname + '@' + domain
    return baseDir + '/accounts/' + handle
2021-02-13 11:37:02 +00:00
def isFeaturedWriter(baseDir: str, nickname: str, domain: str) -> bool:
    """Is the given account a featured writer, appearing in the features
    timeline on news instances?
    An account is featured unless a .nofeatures file exists
    within its account directory
    """
    accountDir = acctDir(baseDir, nickname, domain)
    if os.path.isfile(accountDir + '/.nofeatures'):
        return False
    return True
def refreshNewswire(baseDir: str):
    """Causes the newswire to be updated after a change to user accounts
    This creates the accounts/.refresh_newswire flag file
    if it does not already exist
    """
    flagFilename = baseDir + '/accounts/.refresh_newswire'
    if not os.path.isfile(flagFilename):
        with open(flagFilename, 'w+') as flagFile:
            flagFile.write('\n')
def getSHA256(msg: str):
    """Returns a SHA256 hash of the given message as raw bytes
    The message may be bytes, or a string which is encoded
    as utf-8 before being hashed
    """
    if isinstance(msg, str):
        # the hash context only accepts bytes-like data
        msg = msg.encode('utf-8')
    digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
    digest.update(msg)
    return digest.finalize()
2021-09-08 10:05:45 +00:00
def getSHA512(msg: str):
    """Returns a SHA512 hash of the given message as raw bytes
    The message may be bytes, or a string which is encoded
    as utf-8 before being hashed
    """
    if isinstance(msg, str):
        # the hash context only accepts bytes-like data
        msg = msg.encode('utf-8')
    digest = hashes.Hash(hashes.SHA512(), backend=default_backend())
    digest.update(msg)
    return digest.finalize()
2019-07-02 09:25:29 +00:00
2020-04-04 13:44:49 +00:00
def _localNetworkHost(host: str) -> bool:
"""Returns true if the given host is on the local network
"""
2021-01-25 11:51:42 +00:00
if host.startswith('localhost') or \
host.startswith('192.') or \
host.startswith('127.') or \
host.startswith('10.'):
return True
return False
def decodedHost(host: str) -> str:
    """Convert hostname to internationalized domain
    https://en.wikipedia.org/wiki/Internationalized_domain_name
    Hosts with a port, local network hosts and onion or i2p
    addresses are returned unchanged
    """
    if ':' in host:
        # eg. mydomain:8000
        return host
    if _localNetworkHost(host):
        return host
    if host.endswith('.onion') or host.endswith('.i2p'):
        return host
    return idna.decode(host)
def getLockedAccount(actorJson: {}) -> bool:
    """Returns whether the given account requires follower approval,
    which is only when manuallyApprovesFollowers is exactly True
    """
    approvalNeeded = actorJson.get('manuallyApprovesFollowers')
    return approvalNeeded is True
2020-12-23 10:57:44 +00:00
def hasUsersPath(pathStr: str) -> bool:
    """Whether there is a /users/ path (or equivalent) in the given string
    A url of the form scheme://domain/nickname also counts, provided
    that the part after the domain contains no further / or . characters
    """
    if any(usersStr in pathStr for usersStr in getUserPaths()):
        return True
    if '://' not in pathStr:
        return False
    domain = pathStr.split('://')[1].split('/')[0]
    domainPrefix = '://' + domain + '/'
    if domainPrefix not in pathStr:
        return False
    # NOTE(review): scheme://domain/ gives an empty nickname and
    # so returns True - confirm that this is intended
    nickname = pathStr.split(domainPrefix)[1]
    return not ('/' in nickname or '.' in nickname)
2021-06-20 11:28:35 +00:00
def validPostDate(published: str, maxAgeDays: int = 90,
                  debug: bool = False) -> bool:
    """Returns true if the published date is recent and is not in the future

    published is expected in the format YYYY-MM-DDTHH:MM:SSZ
    and anything which cannot be parsed is treated as invalid.
    Dates are compared as whole days since the epoch, so a post is
    rejected when it is maxAgeDays or more days old, or is dated
    on a later day than today (UTC).
    """
    baselineTime = datetime.datetime(1970, 1, 1)
    try:
        postTimeObject = \
            datetime.datetime.strptime(published, "%Y-%m-%dT%H:%M:%SZ")
    except (ValueError, TypeError):
        # malformed date string, or not a string at all
        return False

    # naive UTC time, without using the deprecated utcnow
    currTime = \
        datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    nowDaysSinceEpoch = (currTime - baselineTime).days
    postDaysSinceEpoch = (postTimeObject - baselineTime).days

    if postDaysSinceEpoch > nowDaysSinceEpoch:
        if debug:
            print("Inbox post has a published date in the future!")
        return False

    if nowDaysSinceEpoch - postDaysSinceEpoch >= maxAgeDays:
        if debug:
            print("Inbox post is not recent enough")
        return False
    return True
2020-12-16 10:30:54 +00:00
def getFullDomain(domain: str, port: int) -> str:
    """Returns the full domain name, including port number
    The port is not appended if it is missing, is 80 or 443,
    or if the domain already contains a colon
    """
    if not port or ':' in domain or port in (80, 443):
        return domain
    return domain + ':' + str(port)
def isDormant(baseDir: str, nickname: str, domain: str, actor: str,
              dormantMonths: int = 3) -> bool:
    """Is the given followed actor dormant, from the standpoint
    of the given account

    The lastseen file for the actor holds the number of days since
    the epoch at which they were last seen. The actor is dormant if
    that is dormantMonths or more 30 day months ago.
    Returns False if the file is missing, unreadable or does not
    contain a number.
    """
    lastSeenFilename = acctDir(baseDir, nickname, domain) + \
        '/lastseen/' + actor.replace('/', '#') + '.txt'

    if not os.path.isfile(lastSeenFilename):
        return False

    try:
        with open(lastSeenFilename, 'r') as lastSeenFile:
            daysSinceEpoch = int(lastSeenFile.read())
    except (OSError, ValueError):
        # an empty or corrupt file gives no evidence of dormancy
        return False

    currTime = datetime.datetime.utcnow()
    currDaysSinceEpoch = (currTime - datetime.datetime(1970, 1, 1)).days
    timeDiffMonths = \
        int((currDaysSinceEpoch - daysSinceEpoch) / 30)
    return timeDiffMonths >= dormantMonths
2020-12-01 21:44:27 +00:00
def isEditor(baseDir: str, nickname: str) -> bool:
    """Returns true if the given nickname is an editor
    Editors are listed one per line within accounts/editors.txt
    If that file is missing or empty then only the admin is an editor
    """
    editorsFilename = baseDir + '/accounts/editors.txt'

    editors = []
    if os.path.isfile(editorsFilename):
        with open(editorsFilename, 'r') as editorsFile:
            editors = editorsFile.readlines()

    if not editors:
        # no editors are listed, so fall back to the admin
        adminName = getConfigParam(baseDir, 'admin')
        if not adminName:
            return False
        return adminName == nickname

    return any(line.strip('\n').strip('\r') == nickname
               for line in editors)
def isArtist(baseDir: str, nickname: str) -> bool:
    """Returns true if the given nickname is an artist
    Artists are listed one per line within accounts/artists.txt
    If that file is missing or empty then only the admin is an artist
    """
    artistsFilename = baseDir + '/accounts/artists.txt'

    artists = []
    if os.path.isfile(artistsFilename):
        with open(artistsFilename, 'r') as artistsFile:
            artists = artistsFile.readlines()

    if not artists:
        # no artists are listed, so fall back to the admin
        adminName = getConfigParam(baseDir, 'admin')
        if not adminName:
            return False
        return adminName == nickname

    return any(line.strip('\n').strip('\r') == nickname
               for line in artists)
2021-08-03 09:09:04 +00:00
def getVideoExtensions() -> []:
    """Returns the possible video file extensions as a tuple,
    without any leading dot
    """
    videoExtensions = ('mp4', 'webm', 'ogv')
    return videoExtensions
def getAudioExtensions() -> []:
    """Returns the possible audio file extensions as a tuple,
    without any leading dot
    """
    audioExtensions = ('mp3', 'ogg', 'flac')
    return audioExtensions
2020-11-21 11:21:05 +00:00
def getImageExtensions() -> []:
    """Returns the possible image file extensions as a tuple,
    without any leading dot
    """
    imageExtensions = (
        'png', 'jpg', 'jpeg', 'gif', 'webp', 'avif', 'svg'
    )
    return imageExtensions
2020-11-21 11:21:05 +00:00
2021-07-09 20:53:49 +00:00
def getImageMimeType(imageFilename: str) -> str:
    """Returns the mime type for the given image filename,
    based upon its extension. Every extension returned by
    getImageExtensions has an entry here, including jpeg.
    image/png is returned if the extension is not recognised
    """
    extensionsToMime = {
        'png': 'png',
        'jpg': 'jpeg',
        'jpeg': 'jpeg',
        'gif': 'gif',
        'avif': 'avif',
        'svg': 'svg+xml',
        'webp': 'webp'
    }
    for ext, mimeExt in extensionsToMime.items():
        if imageFilename.endswith('.' + ext):
            return 'image/' + mimeExt
    return 'image/png'
def getImageExtensionFromMimeType(contentType: str) -> str:
    """Returns the image file extension for a mime type such as
    image/jpeg, by matching the end of the content type.
    png is returned if the mime type is not recognised
    """
    mimeToExtension = (
        ('png', 'png'),
        ('jpeg', 'jpg'),
        ('gif', 'gif'),
        ('svg+xml', 'svg'),
        ('webp', 'webp'),
        ('avif', 'avif')
    )
    matches = [ext for mimeExt, ext in mimeToExtension
               if contentType.endswith(mimeExt)]
    if matches:
        return matches[0]
    return 'png'
2020-11-21 11:54:29 +00:00
def getMediaExtensions() -> []:
    """Returns the possible media file extensions, which are the
    image extensions followed by the video and then audio extensions
    """
    mediaExtensions = getImageExtensions()
    mediaExtensions = mediaExtensions + getVideoExtensions()
    mediaExtensions = mediaExtensions + getAudioExtensions()
    return mediaExtensions
2020-11-21 11:21:05 +00:00
def getImageFormats() -> str:
    """Returns a string of permissable image formats
    used when selecting an image for a new post
    Each extension has a leading dot and they are comma separated
    """
    dottedExtensions = ['.' + ext for ext in getImageExtensions()]
    return ', '.join(dottedExtensions)
2021-07-01 09:51:16 +00:00
def isImageFile(filename: str) -> bool:
    """Is the given filename an image?
    True if it ends with a dot followed by one of the image extensions
    """
    return any(filename.endswith('.' + ext)
               for ext in getImageExtensions())
2020-11-21 11:54:29 +00:00
def getMediaFormats() -> str:
    """Returns a string of permissable media formats
    used when selecting an attachment for a new post
    Each extension has a leading dot and they are comma separated
    """
    dottedExtensions = ['.' + ext for ext in getMediaExtensions()]
    return ', '.join(dottedExtensions)
def removeHtml(content: str) -> str:
    """Removes html markup from the given content, returning plain text.
    Used to ensure that profile descriptions don't contain dubious content

    Quote tags become double quote characters, paragraph endings and
    line breaks become newlines, all other tags are dropped, double
    spaces are reduced to single ones and a space is inserted after
    any full stop which is directly followed by a capital letter.
    Content without any < character is returned unchanged.
    """
    if '<' not in content:
        return content
    content = content.replace('<a href', ' <a href')
    content = content.replace('<q>', '"').replace('</q>', '"')
    content = content.replace('</p>', '\n\n').replace('<br>', '\n')

    # drop everything between angle brackets
    visibleChars = []
    removing = False
    for ch in content:
        if ch == '<':
            removing = True
        elif ch == '>':
            removing = False
        elif not removing:
            visibleChars.append(ch)
    # dropping tags can leave doubled spaces behind
    plainText = ''.join(visibleChars).replace('  ', ' ')

    # insert spaces after full stops
    spacedChars = []
    lastIndex = len(plainText) - 1
    for i, ch in enumerate(plainText):
        spacedChars.append(ch)
        if ch == '.' and i < lastIndex:
            if 'A' <= plainText[i + 1] <= 'Z':
                spacedChars.append(' ')

    return ''.join(spacedChars).replace('  ', ' ').strip()
2020-11-08 10:52:07 +00:00
def firstParagraphFromString(content: str) -> str:
    """Get the first paragraph from a blog post
    to be used as a summary in the newswire feed
    If there are no paragraph tags then all of the content is used.
    Html markup is removed from the result
    """
    if '<p>' in content and '</p>' in content:
        afterOpeningTag = content.split('<p>')[1]
        firstParagraph = afterOpeningTag.split('</p>')[0]
        return removeHtml(firstParagraph)
    return removeHtml(content)
2020-10-13 11:13:32 +00:00
def isSystemAccount(nickname: str) -> bool:
    """Returns true if the given nickname is a system account,
    which is either news or inbox
    """
    return nickname in ('news', 'inbox')
def _createConfig(baseDir: str) -> None:
    """Creates an empty configuration file at baseDir/config.json
    if one does not already exist
    """
    configFilename = baseDir + '/config.json'
    if not os.path.isfile(configFilename):
        saveJson({}, configFilename)
def setConfigParam(baseDir: str, variableName: str, variableValue) -> None:
    """Sets a configuration value within baseDir/config.json,
    creating the configuration file if needed
    """
    _createConfig(baseDir)
    configFilename = baseDir + '/config.json'
    if os.path.isfile(configFilename):
        # NOTE(review): if the file cannot be loaded then this is None
        # and the assignment below fails - confirm that is acceptable
        configJson = loadJson(configFilename)
    else:
        configJson = {}
    configJson[variableName] = variableValue
    saveJson(configJson, configFilename)
def getConfigParam(baseDir: str, variableName: str):
    """Gets a configuration value from baseDir/config.json,
    creating the configuration file if needed
    Returns None if the value has not been set
    """
    _createConfig(baseDir)
    configJson = loadJson(baseDir + '/config.json')
    if configJson and variableName in configJson:
        return configJson[variableName]
    return None
def isSuspended(baseDir: str, nickname: str) -> bool:
    """Returns true if the given nickname is suspended
    Suspended nicknames are listed one per line within
    accounts/suspended.txt
    The admin is never suspended, and nobody is considered to be
    suspended if no admin has been configured
    """
    adminNickname = getConfigParam(baseDir, 'admin')
    if not adminNickname or nickname == adminNickname:
        return False

    suspendedFilename = baseDir + '/accounts/suspended.txt'
    if not os.path.isfile(suspendedFilename):
        return False

    with open(suspendedFilename, 'r') as suspendedFile:
        suspendedLines = suspendedFile.readlines()
    return any(line.strip('\n').strip('\r') == nickname
               for line in suspendedLines)
2020-09-25 14:14:59 +00:00
def getFollowersList(baseDir: str,
                     nickname: str, domain: str,
                     followFile='following.txt') -> []:
    """Returns the lines of the given follow file for the account,
    with surrounding whitespace removed from each line
    Returns an empty list if the file does not exist
    """
    filename = acctDir(baseDir, nickname, domain) + '/' + followFile

    if not os.path.isfile(filename):
        return []

    with open(filename, 'r') as followsFile:
        return [line.strip() for line in followsFile.readlines()]
2020-09-25 13:21:56 +00:00
def getFollowersOfPerson(baseDir: str,
                         nickname: str, domain: str,
                         followFile='following.txt') -> []:
    """Returns a list containing the followers of the given person
    Used by the shared inbox to know who to send incoming mail to
    Each account on this instance, other than the person themselves
    and the inbox and news accounts, is included if its follow file
    has a line matching the handle of the person
    """
    followers = []
    domain = removeDomainPort(domain)
    handle = nickname + '@' + domain
    accountsDir = baseDir + '/accounts'
    if not os.path.isdir(accountsDir + '/' + handle):
        return followers

    for topDir, accountDirs, _ in os.walk(accountsDir):
        for account in accountDirs:
            if account == handle:
                continue
            if account.startswith(('inbox@', 'news@')):
                continue
            filename = os.path.join(topDir, account) + '/' + followFile
            if not os.path.isfile(filename):
                continue
            with open(filename, 'r') as followingFile:
                isFollower = any(
                    line.replace('\n', '').replace('\r', '') == handle
                    for line in followingFile)
            if isFollower and account not in followers:
                followers.append(account)
        # only the top level of the accounts directory is examined
        break
    return followers
2020-08-23 11:13:35 +00:00
def removeIdEnding(idStr: str) -> str:
    """Removes one ending such as /activity, /undo, /event
    or /replies from the given id
    """
    for ending in ('/activity', '/undo', '/event', '/replies'):
        if idStr.endswith(ending):
            # at most one ending is removed
            return idStr[:-len(ending)]
    return idStr
2020-06-11 12:26:15 +00:00
def getProtocolPrefixes() -> []:
    """Returns the valid protocol prefixes as a tuple
    """
    protocolPrefixes = (
        'https://', 'http://', 'ftp://',
        'dat://', 'i2p://', 'gnunet://',
        'hyper://', 'gemini://', 'gopher://'
    )
    return protocolPrefixes
def getLinkPrefixes() -> []:
    """Returns the valid web link prefixes as a tuple
    """
    linkPrefixes = (
        'https://', 'http://', 'ftp://',
        'dat://', 'i2p://', 'gnunet://', 'payto://',
        'hyper://', 'gemini://', 'gopher://', 'briar:'
    )
    return linkPrefixes
2020-04-04 13:44:49 +00:00
def removeAvatarFromCache(baseDir: str, actorStr: str) -> None:
    """Removes any existing avatar entries from the cache
    This avoids duplicate entries with differing extensions
    Removal is best effort, so a file which can't be deleted
    is reported and then skipped
    """
    avatarFilenameExtensions = getImageExtensions()
    for extension in avatarFilenameExtensions:
        avatarFilename = \
            baseDir + '/cache/avatars/' + actorStr + '.' + extension
        if not os.path.isfile(avatarFilename):
            continue
        try:
            os.remove(avatarFilename)
        except OSError as ex:
            # only filesystem errors are expected here, so that
            # KeyboardInterrupt and SystemExit are not swallowed
            print('WARN: removeAvatarFromCache unable to remove ' +
                  avatarFilename + ' ' + str(ex))
2020-04-04 13:44:49 +00:00
def saveJson(jsonObject: {}, filename: str) -> bool:
    """Saves json to a file
    Returns True on success. The object is serialized before the
    file is opened, so that an object which can't be serialized
    returns False without truncating any existing file. Failures
    to write are retried up to five times, a second apart
    """
    try:
        jsonStr = json.dumps(jsonObject)
    except (TypeError, ValueError) as ex:
        # retrying cannot fix an object which isn't serializable
        print('WARN: saveJson unable to serialize ' + str(ex))
        return False

    maxTries = 5
    for tries in range(maxTries):
        try:
            with open(filename, 'w+') as fp:
                fp.write(jsonStr)
            return True
        except OSError:
            print('WARN: saveJson ' + str(tries))
            if tries < maxTries - 1:
                time.sleep(1)
    return False
2020-04-04 13:44:49 +00:00
2021-07-13 15:49:29 +00:00
def loadJson(filename: str, delaySec: int = 2, maxTries: int = 5) -> {}:
    """Makes a few attempts to load a json formatted file
    Returns the decoded json, or None if the file could not be read
    or decoded within maxTries attempts. delaySec is the number of
    seconds to wait after each failed attempt
    """
    jsonObject = None
    tries = 0
    while tries < maxTries:
        try:
            with open(filename, 'r') as fp:
                jsonObject = json.loads(fp.read())
            break
        except (OSError, ValueError):
            # ValueError covers json and character decoding errors.
            # KeyboardInterrupt and SystemExit are not caught, so
            # that the retries can be interrupted
            print('WARN: loadJson exception')
            if delaySec > 0:
                time.sleep(delaySec)
            tries += 1
    return jsonObject
2020-04-04 13:44:49 +00:00
def loadJsonOnionify(filename: str, domain: str, onionDomain: str,
                     delaySec: int = 2) -> {}:
    """Makes a few attempts to load a json formatted file
    This also converts the domain name to the onion domain
    and https: to http: within the text before it is decoded
    Returns None if the file could not be read or decoded
    within five attempts
    """
    jsonObject = None
    tries = 0
    while tries < 5:
        try:
            with open(filename, 'r') as fp:
                data = fp.read()
            if data:
                data = data.replace(domain, onionDomain)
                data = data.replace('https:', 'http:')
            jsonObject = json.loads(data)
            break
        except (OSError, ValueError):
            # ValueError covers json and character decoding errors.
            # KeyboardInterrupt and SystemExit are not caught, so
            # that the retries can be interrupted
            print('WARN: loadJson exception')
            if delaySec > 0:
                time.sleep(delaySec)
            tries += 1
    return jsonObject
2020-04-04 13:44:49 +00:00
2021-06-20 11:28:35 +00:00
def getStatusNumber(publishedStr: str = None) -> (str, str):
    """Returns the status number and published date
    The current UTC time is used if no published date is given
    The status number is the number of milliseconds since the epoch,
    shifted left by 16 bits, plus a sequence number
    """
    timeFormat = '%Y-%m-%dT%H:%M:%SZ'
    if publishedStr:
        currTime = datetime.datetime.strptime(publishedStr, timeFormat)
    else:
        currTime = datetime.datetime.utcnow()

    sinceEpoch = currTime - datetime.datetime(1970, 1, 1)
    secondsOfDay = \
        (currTime.hour * 60 * 60) + (currTime.minute * 60) + currTime.second
    milliseconds = \
        ((sinceEpoch.days * 24 * 60 * 60) + secondsOfDay) * 1000 + \
        int(currTime.microsecond / 1000)

    # See https://github.com/tootsuite/mastodon/blob/
    # 995f8b389a66ab76ec92d9a240de376f1fc13a38/lib/mastodon/snowflake.rb
    # use the leftover microseconds as the sequence number
    sequenceId = currTime.microsecond % 1000
    # shift by 16bits "sequence data"
    statusNumber = str((milliseconds << 16) + sequenceId)
    return statusNumber, currTime.strftime(timeFormat)
2019-07-02 09:25:29 +00:00
2020-03-28 10:33:04 +00:00
def evilIncarnate() -> []:
    """Returns a tuple of domains which are always considered hostile
    """
    hostileDomains = (
        'gab.com', 'gabfed.com', 'spinster.xyz',
        'kiwifarms.cc', 'djitter.com'
    )
    return hostileDomains
2020-03-28 10:33:04 +00:00
2019-09-09 15:53:23 +00:00
def isEvil(domain: str) -> bool:
    """Returns true if the given domain is hostile
    That is the case if it is not a string, if it contains a
    hostile substring, or if it ends with one of the domains
    returned by evilIncarnate
    """
    # https://www.youtube.com/watch?v=5qw1hcevmdU
    if not isinstance(domain, str):
        print('WARN: Malformed domain ' + str(domain))
        return True
    # if a domain contains any of these strings then it is
    # declaring itself to be hostile
    evilEmporium = (
        'nazi', 'extremis', 'extreemis', 'gendercritic',
        'kiwifarm', 'illegal', 'raplst', 'rapist',
        'antivax', 'plandemic'
    )
    if any(hostileStr in domain for hostileStr in evilEmporium):
        return True
    return any(domain.endswith(concentratedEvil)
               for concentratedEvil in evilIncarnate())
2020-04-04 13:44:49 +00:00
2020-10-15 08:59:08 +00:00
def containsInvalidChars(jsonStr: str) -> bool:
    """Does the given json string contain invalid characters?
    These are the entries of invalidCharacters. Any empty entry
    is ignored, since an empty string is contained within every
    string and so would cause everything to be rejected
    """
    return any(isInvalid and isInvalid in jsonStr
               for isInvalid in invalidCharacters)
2021-02-11 11:02:05 +00:00
def removeInvalidChars(text: str) -> str:
    """Returns the given text with every occurrence of each entry
    of invalidCharacters removed
    """
    cleanedText = text
    for unwanted in invalidCharacters:
        if unwanted in cleanedText:
            cleanedText = cleanedText.replace(unwanted, '')
    return cleanedText
2020-04-04 13:44:49 +00:00
def createPersonDir(nickname: str, domain: str, baseDir: str,
                    dirname: str) -> str:
    """Create a directory for a person
    Returns the path baseDir/accounts/nickname@domain/dirname
    The directory and any missing parent directories are created,
    and it is not an error if the directory already exists
    """
    handle = nickname + '@' + domain
    boxDir = baseDir + '/accounts/' + handle + '/' + dirname
    # a single call avoids the race between checking for
    # the directory and then creating it
    os.makedirs(boxDir, exist_ok=True)
    return boxDir
2020-04-04 13:44:49 +00:00
def createOutboxDir(nickname: str, domain: str, baseDir: str) -> str:
    """Create an outbox directory for a person and return its path
    """
    outboxDir = createPersonDir(nickname, domain, baseDir, 'outbox')
    return outboxDir
2019-07-04 10:02:56 +00:00
2020-04-04 13:44:49 +00:00
def createInboxQueueDir(nickname: str, domain: str, baseDir: str) -> str:
    """Create an inbox queue directory for a person and return its path
    """
    queueDir = createPersonDir(nickname, domain, baseDir, 'queue')
    return queueDir
2019-07-02 10:39:55 +00:00
def domainPermitted(domain: str, federationList: []):
    """Returns true if the given domain may be federated with
    Every domain is permitted when the federation list is empty,
    otherwise the domain without any port number must be in the list
    """
    if not len(federationList):
        return True
    return removeDomainPort(domain) in federationList
2020-04-04 13:44:49 +00:00
2020-09-27 19:27:24 +00:00
def urlPermitted(url: str, federationList: []):
    """Returns true if the given url may be federated with
    Hostile urls are never permitted. Otherwise every url is permitted
    when there is no federation list, or else the url must contain
    one of the domains in the list
    """
    if isEvil(url):
        return False
    if not federationList:
        return True
    return any(domain in url for domain in federationList)
2019-07-06 15:17:21 +00:00
2020-04-04 13:44:49 +00:00
2021-02-15 10:06:49 +00:00
def getLocalNetworkAddresses() -> []:
    """Returns a tuple of patterns for local network address detection
    """
    localPatterns = ('localhost', '127.0.', '192.168', '10.0.')
    return localPatterns
2021-06-09 14:01:26 +00:00
def isLocalNetworkAddress(ipAddress: str) -> bool:
    """Returns true if the given address begins with one of
    the local network address patterns
    """
    return any(ipAddress.startswith(localPattern)
               for localPattern in getLocalNetworkAddresses())
def _isDangerousString(content: str, allowLocalNetworkAccess: bool,
separators: [], invalidStrings: []) -> bool:
"""Returns true if the given string is dangerous
"""
2021-05-19 11:29:37 +00:00
for separatorStyle in separators:
startChar = separatorStyle[0]
endChar = separatorStyle[1]
if startChar not in content:
continue
2021-05-19 11:29:37 +00:00
if endChar not in content:
continue
contentSections = content.split(startChar)
invalidPartials = ()
if not allowLocalNetworkAccess:
invalidPartials = getLocalNetworkAddresses()
for markup in contentSections:
if endChar not in markup:
continue
markup = markup.split(endChar)[0].strip()
for partialMatch in invalidPartials:
if partialMatch in markup:
return True
2021-05-19 11:29:37 +00:00
if ' ' not in markup:
for badStr in invalidStrings:
if badStr in markup:
return True
else:
for badStr in invalidStrings:
if badStr + ' ' in markup:
return True
return False
def dangerousMarkup(content: str, allowLocalNetworkAccess: bool) -> bool:
    """Returns true if the given content contains dangerous html markup
    Both literal and html escaped angle brackets are checked
    """
    tagSeparators = [['<', '>'], ['&lt;', '&gt;']]
    forbiddenMarkup = [
        'script', 'noscript', 'code', 'pre',
        'canvas', 'style', 'abbr',
        'frame', 'iframe', 'html', 'body',
        'hr', 'allow-popups', 'allow-scripts'
    ]
    isDangerous = _isDangerousString(content, allowLocalNetworkAccess,
                                     tagSeparators, forbiddenMarkup)
    return isDangerous
def dangerousSVG(content: str, allowLocalNetworkAccess: bool) -> bool:
    """Returns true if the given svg file content contains dangerous scripts
    Both literal and html escaped angle brackets are checked
    """
    tagSeparators = [['<', '>'], ['&lt;', '&gt;']]
    forbiddenMarkup = ['script']
    isDangerous = _isDangerousString(content, allowLocalNetworkAccess,
                                     tagSeparators, forbiddenMarkup)
    return isDangerous
2020-04-04 13:44:49 +00:00
def getDisplayName(baseDir: str, actor: str, personCache: {}) -> str:
    """Returns the display name for the given actor
    The name comes from the person cache entry, or from the cached
    actor file if that entry has no actor. None is returned if the
    actor is not in the person cache or has no name, and a name
    containing dangerous markup is replaced by *ADVERSARY*
    """
    actor = actor.split('/statuses/')[0]
    cacheEntry = personCache.get(actor)
    if not cacheEntry:
        return None

    nameFound = None
    if cacheEntry.get('actor'):
        if cacheEntry['actor'].get('name'):
            nameFound = cacheEntry['actor']['name']
    else:
        # Try to obtain from the cached actors
        cachedActorFilename = \
            baseDir + '/cache/actors/' + actor.replace('/', '#') + '.json'
        if os.path.isfile(cachedActorFilename):
            actorJson = loadJson(cachedActorFilename, 1)
            if actorJson and actorJson.get('name'):
                nameFound = actorJson['name']

    if nameFound and dangerousMarkup(nameFound, False):
        return "*ADVERSARY*"
    return nameFound
2019-08-22 12:41:16 +00:00
2020-04-04 13:44:49 +00:00
2021-06-24 19:25:39 +00:00
def _genderFromString(translate: {}, text: str) -> str:
2021-06-24 19:28:26 +00:00
"""Given some text, does it contain a gender description?
"""
2021-06-24 19:25:39 +00:00
gender = None
2021-07-23 14:32:21 +00:00
if not text:
return None
2021-06-24 19:25:39 +00:00
textOrig = text
text = text.lower()
if translate['He/Him'].lower() in text or \
translate['boy'].lower() in text:
gender = 'He/Him'
elif (translate['She/Her'].lower() in text or
translate['girl'].lower() in text):
gender = 'She/Her'
elif 'him' in text or 'male' in text:
gender = 'He/Him'
elif 'her' in text or 'she' in text or \
'fem' in text or 'woman' in text:
gender = 'She/Her'
elif 'man' in text or 'He' in textOrig:
gender = 'He/Him'
return gender
2021-03-03 13:37:18 +00:00
def getGenderFromBio(baseDir: str, actor: str, personCache: {},
                     translate: {}) -> str:
    """Tries to ascertain gender from bio description
    This is for use by text-to-speech for pitch setting

    Profile attachments named as gender or pronouns are used first,
    then the value of any other attachment, and finally the summary.
    They/Them is returned if nothing can be ascertained.
    """
    defaultGender = 'They/Them'
    if '/statuses/' in actor:
        actor = actor.split('/statuses/')[0]
    if not personCache.get(actor):
        return defaultGender

    # field names to look for, untranslated if there are no translations
    if translate:
        pronounStr = translate['pronoun'].lower()
        genderStr = translate.get('gender', 'gender').lower()
    else:
        pronounStr = 'pronoun'
        genderStr = 'gender'

    actorJson = None
    if personCache[actor].get('actor'):
        actorJson = personCache[actor]['actor']
    else:
        # Try to obtain from the cached actors
        cachedActorFilename = \
            baseDir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
        if os.path.isfile(cachedActorFilename):
            actorJson = loadJson(cachedActorFilename, 1)
    if not actorJson:
        return defaultGender

    # is gender defined as a profile tag?
    bioFound = None
    tagsList = actorJson.get('attachment')
    if tagsList and isinstance(tagsList, list):
        validTags = [tag for tag in tagsList
                     if isinstance(tag, dict) and
                     tag.get('name') and tag.get('value')]
        # look for a gender field name
        for tag in validTags:
            tagName = tag['name'].lower()
            if tagName == genderStr or tagName.startswith(pronounStr):
                bioFound = tag['value']
                break
        # the field name could be anything,
        # just look at the value
        if not bioFound:
            for tag in validTags:
                gender = _genderFromString(translate, tag['value'])
                if gender:
                    return gender

    # if not then use the bio
    if not bioFound and actorJson.get('summary'):
        bioFound = actorJson['summary']
    if not bioFound:
        return defaultGender
    gender = _genderFromString(translate, bioFound)
    if not gender:
        gender = defaultGender
    return gender
2019-07-06 15:17:21 +00:00
def getNicknameFromActor(actor: str) -> str:
    """Returns the nickname from an actor url
    This handles urls containing a users path or /@nickname,
    handles in the form nickname@domain, and urls in the form
    scheme://domain/nickname. None is returned if no nickname is found
    """
    if actor.startswith('@'):
        actor = actor[1:]

    for possiblePath in getUserPaths():
        if possiblePath not in actor:
            continue
        afterPath = actor.split(possiblePath)[1].replace('@', '')
        return afterPath.split('/')[0]

    if '/@' in actor:
        # https://domain/@nick
        return actor.split('/@')[1].split('/')[0]
    if '@' in actor:
        return actor.split('@')[0]
    if '://' not in actor:
        return None

    domain = actor.split('://')[1].split('/')[0]
    domainPrefix = '://' + domain + '/'
    if domainPrefix not in actor:
        return None
    nickStr = actor.split(domainPrefix)[1]
    if '/' in nickStr or '.' in nickStr:
        return None
    return nickStr
2019-07-06 15:17:21 +00:00
2020-04-04 13:44:49 +00:00
2021-07-04 22:58:01 +00:00
def getUserPaths() -> []:
    """Returns the url paths under which users may be found
    e.g. /users/nickname, /channel/nickname
    """
    userPaths = (
        '/users/', '/profile/', '/accounts/', '/channel/', '/u/',
        '/c/', '/video-channels/'
    )
    return userPaths
2021-07-04 22:58:01 +00:00
2021-07-30 13:00:23 +00:00
def getGroupPaths() -> []:
    """Returns the url paths under which groups may be found
    e.g. https://lemmy/c/groupname
    """
    groupPaths = ['/c/', '/video-channels/']
    return groupPaths
2021-07-30 13:00:23 +00:00
2020-04-04 13:44:49 +00:00
def getDomainFromActor(actor: str) -> (str, int):
    """Returns the domain name and port number from an actor url
    or handle. The port is None if the domain doesn't contain one
    """
    if actor.startswith('@'):
        actor = actor[1:]
    prefixes = getProtocolPrefixes()

    def _withoutPrefixes(text: str) -> str:
        # removes every prefix given by getProtocolPrefixes
        for prefix in prefixes:
            text = text.replace(prefix, '')
        return text

    # NOTE(review): the domain found by this loop is always replaced
    # by one of the branches which follow it - confirm whether the
    # loop is still needed
    for possiblePath in getUserPaths():
        if possiblePath in actor:
            domain = _withoutPrefixes(actor.split(possiblePath)[0])
            break

    if '/@' in actor:
        # https://domain/@nick
        domain = _withoutPrefixes(actor.split('/@')[0])
    elif '@' in actor:
        # nick@domain
        domain = actor.split('@')[1].strip()
    else:
        domain = _withoutPrefixes(actor)
        if '/' in actor:
            domain = domain.split('/')[0]

    if ':' not in domain:
        return domain, None
    port = getPortFromDomain(domain)
    return removeDomainPort(domain), port
def _setDefaultPetName(baseDir: str, nickname: str, domain: str,
                       followNickname: str, followDomain: str) -> None:
    """Sets a default petname for the followed account, unless
    a petname beginning with the followed nickname already exists
    This helps especially when using onion or i2p address
    """
    domain = removeDomainPort(domain)
    petnamesFilename = \
        acctDir(baseDir, nickname, domain) + '/petnames.txt'
    petnamePrefix = followNickname + ' '
    petnameLookupEntry = \
        petnamePrefix + followNickname + '@' + followDomain + '\n'

    if not os.path.isfile(petnamesFilename):
        # if there is no existing petnames lookup file
        with open(petnamesFilename, 'w+') as petnamesFile:
            petnamesFile.write(petnameLookupEntry)
        return

    with open(petnamesFilename, 'r') as petnamesFile:
        existingPetnames = petnamesFile.read().split('\n')
    if any(pet.startswith(petnamePrefix) for pet in existingPetnames):
        # petname already exists
        return

    # petname doesn't already exist
    with open(petnamesFilename, 'a+') as petnamesFile:
        petnamesFile.write(petnameLookupEntry)
2020-11-23 15:07:55 +00:00
2020-04-04 13:44:49 +00:00
def followPerson(baseDir: str, nickname: str, domain: str,
                 followNickname: str, followDomain: str,
                 federationList: [], debug: bool,
                 groupAccount: bool,
                 followFile: str = 'following.txt') -> bool:
    """Adds a person to the follow list
    Returns False if the followed domain is not permitted or if the
    account doing the following does not exist, otherwise True
    followFile is the name of the file within the account directory
    to which the followed handle is added. The handles of group
    accounts are stored with a leading '!'
    """
    followDomainStrLower = followDomain.lower().replace('\n', '')
    if not domainPermitted(followDomainStrLower, federationList):
        if debug:
            print('DEBUG: follow of domain ' +
                  followDomain + ' not permitted')
        return False
    if debug:
        print('DEBUG: follow of domain ' + followDomain)

    if ':' in domain:
        handle = nickname + '@' + removeDomainPort(domain)
    else:
        handle = nickname + '@' + domain

    accountDir = baseDir + '/accounts/' + handle
    if not os.path.isdir(accountDir):
        print('WARN: account for ' + handle + ' does not exist')
        return False

    # the unfollowed file is searched without any port number
    if ':' in followDomain:
        handleToFollow = \
            followNickname + '@' + removeDomainPort(followDomain)
    else:
        handleToFollow = followNickname + '@' + followDomain
    if groupAccount:
        handleToFollow = '!' + handleToFollow

    # was this person previously unfollowed?
    unfollowedFilename = accountDir + '/unfollowed.txt'
    if os.path.isfile(unfollowedFilename):
        with open(unfollowedFilename, 'r') as f:
            lines = f.readlines()
        if handleToFollow in ''.join(lines):
            # remove them from the unfollowed file
            with open(unfollowedFilename, 'w+') as f:
                for line in lines:
                    if handleToFollow not in line:
                        f.write(line)

    # the follow file entry keeps any port number on the domain
    handleToFollow = followNickname + '@' + followDomain
    if groupAccount:
        handleToFollow = '!' + handleToFollow
    filename = accountDir + '/' + followFile
    if os.path.isfile(filename):
        with open(filename, 'r') as f:
            alreadyFollowing = handleToFollow in f.read()
        if alreadyFollowing:
            if debug:
                print('DEBUG: follow already exists')
            return True
        # prepend to follow file
        try:
            with open(filename, 'r+') as f:
                content = f.read()
                if handleToFollow + '\n' not in content:
                    f.seek(0, 0)
                    f.write(handleToFollow + '\n' + content)
                    print('DEBUG: follow added')
        except Exception as e:
            print('WARN: Failed to write entry to follow file ' +
                  filename + ' ' + str(e))
    else:
        # first follow
        if debug:
            print('DEBUG: ' + handle +
                  ' creating new following file to follow ' +
                  handleToFollow +
                  ', filename is ' + filename)
        with open(filename, 'w+') as f:
            f.write(handleToFollow + '\n')

    if followFile.endswith('following.txt'):
        # Default to adding new follows to the calendar.
        # Possibly this could be made optional

        # if following a person add them to the list of
        # calendar follows
        print('DEBUG: adding ' +
              followNickname + '@' + followDomain + ' to calendar of ' +
              nickname + '@' + domain)
        addPersonToCalendar(baseDir, nickname, domain,
                            followNickname, followDomain)
        # add a default petname
        _setDefaultPetName(baseDir, nickname, domain,
                           followNickname, followDomain)
    return True
2019-07-11 12:29:31 +00:00
2020-04-04 13:44:49 +00:00
2020-10-08 19:47:23 +00:00
def votesOnNewswireItem(status: []) -> int:
    """Returns the number of votes on a newswire item, which is
    the number of status entries containing 'vote:'
    """
    return sum(1 for entry in status if 'vote:' in entry)
def locateNewsVotes(baseDir: str, domain: str,
                    postUrl: str) -> str:
    """Returns the votes filename for a news post
    within the news user account, or None if there isn't one
    """
    postUrl = postUrl.strip().replace('\n', '').replace('\r', '')
    # filenames use # in place of the / within the post url
    postId = removeIdEnding(postUrl.strip()).replace('/', '#')
    if not postId.endswith('.json'):
        postId += '.json'
    votesFilename = \
        baseDir + '/accounts/news@' + domain + '/outbox/' + \
        postId + '.votes'
    if not os.path.isfile(votesFilename):
        return None
    return votesFilename
def locateNewsArrival(baseDir: str, domain: str,
                      postUrl: str) -> datetime.datetime:
    """Returns the arrival time for a news post
    within the news user account
    Returns None if no arrival time has been stored for the post,
    or if the stored value is not in the format YYYY-MM-DDTHH:MM:SSZ
    """
    postUrl = \
        postUrl.strip().replace('\n', '').replace('\r', '')

    # filenames use # in place of the / within the post url
    postUrl = removeIdEnding(postUrl.strip()).replace('/', '#')
    if postUrl.endswith('.json'):
        postUrl = postUrl + '.arrived'
    else:
        postUrl = postUrl + '.json.arrived'

    accountDir = baseDir + '/accounts/news@' + domain + '/'
    postFilename = accountDir + 'outbox/' + postUrl
    if not os.path.isfile(postFilename):
        return None
    with open(postFilename, 'r') as arrivalFile:
        arrival = arrivalFile.read()
    if not arrival:
        return None
    try:
        return datetime.datetime.strptime(arrival,
                                          "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        # a damaged arrival file is treated as no arrival time
        print('WARN: locateNewsArrival invalid arrival time in ' +
              postFilename)
    return None
def clearFromPostCaches(baseDir: str, recentPostsCache: {},
                        postId: str) -> None:
    """Clears cached html for the given post, so that edits
    to news will appear
    The cached html file is removed from the postcache directory
    of every account, other than inbox@ accounts, and the post is
    removed from the recent posts cache held in memory
    """
    filename = '/postcache/' + postId + '.html'
    accountsDir = baseDir + '/accounts'
    for subdir, dirs, files in os.walk(accountsDir):
        for acct in dirs:
            if '@' not in acct:
                continue
            if acct.startswith('inbox@'):
                continue
            postFilename = os.path.join(accountsDir, acct) + filename
            if not os.path.isfile(postFilename):
                continue
            try:
                os.remove(postFilename)
            except OSError:
                print('WARN: clearFromPostCaches file not removed ' +
                      postFilename)
        # only the top level of the accounts directory is needed
        break

    # if the post is in the recent posts cache then remove it.
    # This doesn't depend upon any account, so is done once
    cacheIndex = recentPostsCache.get('index')
    if cacheIndex and postId in cacheIndex:
        cacheIndex.remove(postId)
    for section in ('json', 'html'):
        cached = recentPostsCache.get(section)
        if cached and cached.get(postId):
            del cached[postId]
2020-10-18 16:19:28 +00:00
2020-04-04 13:44:49 +00:00
def locatePost(baseDir: str, nickname: str, domain: str,
               postUrl: str, replies: bool = False) -> str:
    """Returns the filename for the given status post url,
    or None if it can't be found
    If replies is True then the replies file for the post is
    located, rather than the post itself
    """
    extension = 'replies' if replies else 'json'

    # filenames use # in place of the / within the post url
    postFile = \
        removeIdEnding(postUrl.strip()).replace('/', '#') + \
        '.' + extension

    # search the boxes of the account
    accountDir = acctDir(baseDir, nickname, domain) + '/'
    candidates = [
        accountDir + boxName + '/' + postFile
        for boxName in ('inbox', 'outbox', 'tlblogs')
    ]
    # then news posts
    candidates.append(baseDir + '/accounts/news@' + domain +
                      '/outbox/' + postFile)
    # then the announce cache
    candidates.append(baseDir + '/cache/announce/' + nickname +
                      '/' + postFile)

    for postFilename in candidates:
        if os.path.isfile(postFilename):
            return postFilename
    return None
2019-07-14 16:37:01 +00:00
2020-04-04 13:44:49 +00:00
def _getPublishedDate(postJsonObject: {}) -> str:
    """Returns the published date on the given post, taken from
    the post itself or otherwise from its object
    Returns None if there is no published date, or if it
    isn't a string
    """
    published = postJsonObject.get('published')
    if not published:
        postObject = postJsonObject.get('object')
        if postObject and isinstance(postObject, dict):
            published = postObject.get('published')
    if published and isinstance(published, str):
        return published
    return None
def getReplyIntervalHours(baseDir: str, nickname: str, domain: str,
                          defaultReplyIntervalHours: int) -> int:
    """Returns the reply interval for the given account.
    The reply interval is the number of hours after a post being made
    during which replies are allowed
    The default is returned if the account has no stored interval,
    or if the stored value isn't made up only of digits
    """
    replyIntervalFilename = \
        acctDir(baseDir, nickname, domain) + '/.replyIntervalHours'
    if not os.path.isfile(replyIntervalFilename):
        return defaultReplyIntervalHours
    with open(replyIntervalFilename, 'r') as fp:
        hoursStr = fp.read()
    if not hoursStr.isdigit():
        return defaultReplyIntervalHours
    return int(hoursStr)
def setReplyIntervalHours(baseDir: str, nickname: str, domain: str,
                          replyIntervalHours: int) -> bool:
    """Sets the reply interval for the given account.
    The reply interval is the number of hours after a post being made
    during which replies are allowed
    Returns True if the interval was saved, or False if the file
    could not be opened or written
    """
    replyIntervalFilename = \
        acctDir(baseDir, nickname, domain) + '/.replyIntervalHours'
    try:
        # opening the file is inside the try, so that a failure to
        # open is reported in the same way as a failure to write
        with open(replyIntervalFilename, 'w+') as fp:
            fp.write(str(replyIntervalHours))
    except OSError as e:
        print('WARN: unable to save reply interval ' +
              replyIntervalFilename + ' ' + str(e))
        return False
    return True
def canReplyTo(baseDir: str, nickname: str, domain: str,
               postUrl: str, replyIntervalHours: int,
               currDateStr: str = None,
               postJsonObject: {} = None) -> bool:
    """Is replying to the given post permitted?
    This is a spam mitigation feature, so that spammers can't
    add a lot of replies to old post which you don't notice.
    Urls which don't contain /statuses/ may always be replied to.
    Otherwise the post must be locatable, have a published date and
    have been published less than replyIntervalHours ago
    currDateStr may be given, in the format YYYY-MM-DDTHH:MM:SSZ,
    to be used in place of the current time
    postJsonObject may be given to avoid loading the post from file
    """
    if '/statuses/' not in postUrl:
        return True
    if not postJsonObject:
        postFilename = locatePost(baseDir, nickname, domain, postUrl)
        if not postFilename:
            return False
        postJsonObject = loadJson(postFilename)
    if not postJsonObject:
        return False
    published = _getPublishedDate(postJsonObject)
    if not published:
        return False

    dateFormat = '%Y-%m-%dT%H:%M:%SZ'
    try:
        pubDate = datetime.datetime.strptime(published, dateFormat)
    except (ValueError, TypeError):
        # the published date is not in the expected format
        return False
    if not currDateStr:
        currDate = datetime.datetime.utcnow()
    else:
        try:
            currDate = \
                datetime.datetime.strptime(currDateStr, dateFormat)
        except (ValueError, TypeError):
            return False

    hoursSincePublication = \
        int((currDate - pubDate).total_seconds() / 3600)
    # posts dated in the future also can't be replied to
    if hoursSincePublication < 0 or \
       hoursSincePublication >= replyIntervalHours:
        return False
    return True
def _removeAttachment(baseDir: str, httpPrefix: str, domain: str,
                      postJson: {}):
    """Removes the media file for the first attachment on the given
    post, together with its etag file, then clears the attachment
    list on the post
    Nothing is changed if the post has no attachment list, or if
    the first attachment has no url
    """
    attachments = postJson.get('attachment')
    if not attachments or not isinstance(attachments, list):
        return
    if not isinstance(attachments[0], dict):
        return
    attachmentUrl = attachments[0].get('url')
    if not attachmentUrl or not isinstance(attachmentUrl, str):
        return

    # NOTE(review): the remainder of the url is used as a path below
    # baseDir without checking that it stays inside baseDir - confirm
    # that only urls for media on this instance reach this function
    mediaFilename = baseDir + '/' + \
        attachmentUrl.replace(httpPrefix + '://' + domain + '/', '')
    for filename in (mediaFilename, mediaFilename + '.etag'):
        if not os.path.isfile(filename):
            continue
        try:
            os.remove(filename)
        except OSError:
            print('WARN: _removeAttachment unable to remove ' + filename)
    postJson['attachment'] = []
2019-07-14 16:57:06 +00:00
2020-04-04 13:44:49 +00:00
def removeModerationPostFromIndex(baseDir: str, postUrl: str,
                                  debug: bool) -> None:
    """Removes a url from the moderation index
    The index is only rewritten if it contains the post id, and
    only lines which exactly match the post id are removed
    """
    moderationIndexFile = baseDir + '/accounts/moderation.txt'
    if not os.path.isfile(moderationIndexFile):
        return
    postId = removeIdEnding(postUrl)
    with open(moderationIndexFile, 'r') as f:
        lines = f.readlines()
    if postId not in ''.join(lines):
        return
    with open(moderationIndexFile, 'w+') as f:
        for line in lines:
            if line.strip("\n").strip("\r") != postId:
                f.write(line)
            elif debug:
                print('DEBUG: removed ' + postId +
                      ' from moderation index')
2019-08-12 18:02:29 +00:00
def _isReplyToBlogPost(baseDir: str, nickname: str, domain: str,
                       postJsonObject: {}):
    """Is the given post a reply to a blog post?
    Returns True if the id of the post being replied to appears
    within the blogs timeline index of the account
    """
    if not hasObjectDict(postJsonObject):
        return False
    inReplyTo = postJsonObject['object'].get('inReplyTo')
    if not inReplyTo:
        return False
    if not isinstance(inReplyTo, str):
        return False
    blogsIndexFilename = \
        acctDir(baseDir, nickname, domain) + '/tlblogs.index'
    if not os.path.isfile(blogsIndexFilename):
        return False
    # index entries use # in place of the / within the post id
    postId = removeIdEnding(inReplyTo).replace('/', '#')
    with open(blogsIndexFilename, 'r') as f:
        return postId in f.read()
2021-07-05 09:24:29 +00:00
def _deletePostRemoveReplies(baseDir: str, nickname: str, domain: str,
                             httpPrefix: str, postFilename: str,
                             recentPostsCache: {}, debug: bool) -> None:
    """Removes replies when deleting a post
    Each post listed within the replies file of the given post is
    deleted, and then the replies file itself is removed
    """
    repliesFilename = postFilename.replace('.json', '.replies')
    if not os.path.isfile(repliesFilename):
        return
    if debug:
        print('DEBUG: removing replies to ' + postFilename)
    with open(repliesFilename, 'r') as f:
        for replyId in f:
            replyFile = locatePost(baseDir, nickname, domain, replyId)
            if not replyFile:
                continue
            if os.path.isfile(replyFile):
                deletePost(baseDir, httpPrefix,
                           nickname, domain, replyFile, debug,
                           recentPostsCache)
    # remove the replies file
    try:
        os.remove(repliesFilename)
    except OSError as e:
        if debug:
            print('DEBUG: unable to remove replies file ' +
                  repliesFilename + ' ' + str(e))
2021-07-05 09:24:29 +00:00
def _isBookmarked(baseDir: str, nickname: str, domain: str,
                  postFilename: str) -> bool:
    """Returns True if the given post is bookmarked, which is when
    its filename appears as a line within the bookmarks index of
    the account
    """
    bookmarksIndexFilename = \
        acctDir(baseDir, nickname, domain) + '/bookmarks.index'
    if not os.path.isfile(bookmarksIndexFilename):
        return False
    # the index contains filenames without any directory
    bookmarkIndex = postFilename.split('/')[-1] + '\n'
    with open(bookmarksIndexFilename, 'r') as f:
        return bookmarkIndex in f.read()
2021-07-05 10:09:11 +00:00
def removePostFromCache(postJsonObject: {}, recentPostsCache: {}) -> None:
    """ if the post exists in the recent posts cache then remove
    it from the index, and remove its cached json and html
    """
    if not recentPostsCache or not postJsonObject.get('id'):
        return
    cacheIndex = recentPostsCache.get('index')
    if not cacheIndex:
        return

    # drop any fragment, then use # in place of the / within the id
    postId = postJsonObject['id'].split('#', 1)[0]
    postId = removeIdEnding(postId).replace('/', '#')
    if postId not in cacheIndex:
        return

    cacheIndex.remove(postId)
    for section in ('json', 'html'):
        cached = recentPostsCache.get(section)
        if cached and cached.get(postId):
            del cached[postId]
def _deleteCachedHtml(baseDir: str, nickname: str, domain: str,
                      postJsonObject: {}):
    """Removes cached html file for the given post
    Nothing is done if the post has no id, since the cached
    filename is derived from it
    """
    if not postJsonObject.get('id'):
        return
    cachedPostFilename = \
        getCachedPostFilename(baseDir, nickname, domain, postJsonObject)
    if not cachedPostFilename:
        return
    if not os.path.isfile(cachedPostFilename):
        return
    try:
        os.remove(cachedPostFilename)
    except OSError:
        print('WARN: _deleteCachedHtml unable to remove ' +
              cachedPostFilename)
2021-07-05 09:45:55 +00:00
def _deleteHashtagsOnPost(baseDir: str, postJsonObject: {}) -> None:
    """Removes hashtags when a post is deleted
    For each Hashtag on the post, the lines containing the post id
    are removed from the index file of that tag, and the index file
    is removed if nothing remains within it
    """
    if not hasObjectDict(postJsonObject):
        return
    postObject = postJsonObject['object']
    content = postObject.get('content')
    # content without a # has no hashtags to be removed
    if not content or '#' not in content:
        return
    if not postObject.get('id') or not postObject.get('tag'):
        return

    # get the id of the post
    postId = removeIdEnding(postObject['id'])
    for tag in postObject['tag']:
        # skip over anything which isn't a well formed hashtag
        if not isinstance(tag, dict):
            continue
        if tag.get('type') != 'Hashtag':
            continue
        if not tag.get('name'):
            continue
        # find the index file for this tag.
        # The first character of the name is not part of the filename
        tagIndexFilename = baseDir + '/tags/' + tag['name'][1:] + '.txt'
        if not os.path.isfile(tagIndexFilename):
            continue
        # remove postId from the tag index file
        with open(tagIndexFilename, 'r') as f:
            lines = f.readlines()
        if not lines:
            continue
        newlines = ''.join(fileLine for fileLine in lines
                           if postId not in fileLine)
        if not newlines.strip():
            # if there are no lines then remove the hashtag file
            try:
                os.remove(tagIndexFilename)
            except OSError:
                print('WARN: _deleteHashtagsOnPost unable to remove ' +
                      tagIndexFilename)
        else:
            # write the new hashtag index without the given post in it
            with open(tagIndexFilename, 'w+') as f:
                f.write(newlines)
2021-08-12 10:22:04 +00:00
def _deleteConversationPost(baseDir: str, nickname: str, domain: str,
                            postJsonObject: {}) -> None:
    """Deletes a post from a conversation
    The id of the post is removed from the conversation file within
    the conversation directory of the account. If no ids remain then
    the conversation file, and any .muted file for it, are removed
    Returns False if there was nothing to be removed. Nothing is
    explicitly returned after a removal
    """
    if not hasObjectDict(postJsonObject):
        return False
    postObject = postJsonObject['object']
    if not postObject.get('conversation'):
        return False
    if not postObject.get('id'):
        return False
    conversationDir = acctDir(baseDir, nickname, domain) + '/conversation'
    # filenames use # in place of the / within the conversation id
    conversationId = postObject['conversation'].replace('/', '#')
    postId = postObject['id']
    conversationFilename = conversationDir + '/' + conversationId
    if not os.path.isfile(conversationFilename):
        return False
    with open(conversationFilename, 'r') as fp:
        conversationStr = fp.read()
    if postId + '\n' not in conversationStr:
        return False
    conversationStr = conversationStr.replace(postId + '\n', '')
    if conversationStr:
        with open(conversationFilename, 'w+') as fp:
            fp.write(conversationStr)
        return

    # no posts remain within the conversation
    mutedFilename = conversationFilename + '.muted'
    if os.path.isfile(mutedFilename):
        try:
            os.remove(mutedFilename)
        except OSError:
            print('WARN: _deleteConversationPost unable to remove ' +
                  mutedFilename)
    try:
        os.remove(conversationFilename)
    except OSError:
        print('WARN: _deleteConversationPost unable to remove ' +
              conversationFilename)
2021-08-12 10:22:04 +00:00
2020-04-04 13:44:49 +00:00
def deletePost(baseDir: str, httpPrefix: str,
               nickname: str, domain: str, postFilename: str,
               debug: bool, recentPostsCache: {}) -> None:
    """Recursively deletes a post and its replies and attachments
    Bookmarked posts and replies to blog posts are not deleted.
    If the post can't be loaded then only its replies and the
    post file itself are removed
    """
    postJsonObject = loadJson(postFilename, 1)
    if postJsonObject:
        # don't allow deletion of bookmarked posts
        if _isBookmarked(baseDir, nickname, domain, postFilename):
            return

        # don't remove replies to blog posts
        if _isReplyToBlogPost(baseDir, nickname, domain,
                              postJsonObject):
            return

        # remove from recent posts cache in memory
        removePostFromCache(postJsonObject, recentPostsCache)

        # remove from conversation index
        _deleteConversationPost(baseDir, nickname, domain, postJsonObject)

        # remove any attachment
        _removeAttachment(baseDir, httpPrefix, domain, postJsonObject)

        # remove the files which accompany the post
        for ext in ('votes', 'arrived', 'muted', 'tts', 'reject'):
            extFilename = postFilename + '.' + ext
            if not os.path.isfile(extFilename):
                continue
            try:
                os.remove(extFilename)
            except OSError as e:
                if debug:
                    print('DEBUG: deletePost unable to remove ' +
                          extFilename + ' ' + str(e))

        # remove cached html version of the post
        _deleteCachedHtml(baseDir, nickname, domain, postJsonObject)

        if postJsonObject.get('object'):
            # remove from moderation index file
            if hasObjectDict(postJsonObject):
                if postJsonObject['object'].get('moderationStatus'):
                    if postJsonObject.get('id'):
                        postId = removeIdEnding(postJsonObject['id'])
                        removeModerationPostFromIndex(baseDir, postId,
                                                      debug)

            # remove any hashtags index entries
            _deleteHashtagsOnPost(baseDir, postJsonObject)

    # remove any replies
    _deletePostRemoveReplies(baseDir, nickname, domain,
                             httpPrefix, postFilename,
                             recentPostsCache, debug)
    # finally, remove the post itself
    try:
        os.remove(postFilename)
    except OSError as e:
        if debug:
            print('DEBUG: deletePost unable to remove ' +
                  postFilename + ' ' + str(e))
2019-07-27 22:48:34 +00:00
2020-04-04 13:44:49 +00:00
def isValidLanguage(text: str) -> bool:
    """Returns true if the given text contains a valid
    natural language string
    Ignoring digits, every character of the text must have a
    code point within the range of one single language
    """
    naturalLanguages = {
        "Latin": [65, 866],
        "Cyrillic": [1024, 1274],
        "Greek": [880, 1280],
        "isArmenian": [1328, 1424],
        "isHebrew": [1424, 1536],
        "Arabic": [1536, 1792],
        "Syriac": [1792, 1872],
        "Thaan": [1920, 1984],
        "Devanagari": [2304, 2432],
        "Bengali": [2432, 2560],
        "Gurmukhi": [2560, 2688],
        "Gujarati": [2688, 2816],
        "Oriya": [2816, 2944],
        "Tamil": [2944, 3072],
        "Telugu": [3072, 3200],
        "Kannada": [3200, 3328],
        "Malayalam": [3328, 3456],
        "Sinhala": [3456, 3584],
        "Thai": [3584, 3712],
        "Lao": [3712, 3840],
        "Tibetan": [3840, 4096],
        "Myanmar": [4096, 4256],
        "Georgian": [4256, 4352],
        "HangulJamo": [4352, 4608],
        "Cherokee": [5024, 5120],
        "UCAS": [5120, 5760],
        "Ogham": [5760, 5792],
        "Runic": [5792, 5888],
        "Khmer": [6016, 6144],
        "Mongolian": [6144, 6320]
    }
    codePoints = [ord(ch) for ch in text if not ch.isdigit()]
    for rangeStart, rangeEnd in naturalLanguages.values():
        # the end of each range is exclusive
        if all(rangeStart <= cp < rangeEnd for cp in codePoints):
            return True
    return False
def _getReservedWords() -> tuple:
    """Returns the words which are reserved for some special
    function, and so which can't be used as nicknames
    Each word appears only once
    """
    return ('inbox', 'dm', 'outbox', 'following',
            'public', 'followers', 'category',
            'channel', 'calendar', 'video-channels',
            'tlreplies', 'tlmedia', 'tlblogs',
            'tlfeatures',
            'moderation', 'moderationaction',
            'activity', 'undo', 'pinned',
            'actor', 'Actor',
            'reply', 'replies', 'question', 'like',
            'likes', 'users', 'statuses', 'tags',
            'accounts', 'headers',
            'channels', 'profile', 'u', 'c',
            'updates', 'repeat', 'announce',
            'shares', 'fonts', 'icons', 'avatars',
            'welcome', 'helpimages',
            'bookmark', 'bookmarks', 'tlbookmarks',
            'ignores', 'linksmobile', 'newswiremobile',
            'minimal', 'search', 'eventdelete',
            'searchemoji', 'catalog', 'conversationId',
            'mention', 'http', 'https',
            'ontologies', 'data')
def getNicknameValidationPattern() -> str:
    """Returns a html text input validation pattern for nickname
    The pattern has a negative lookahead for each reserved word
    """
    lookaheads = ''.join(
        '(?!.*\\b' + word + '\\b)' for word in _getReservedWords()
    )
    if lookaheads:
        lookaheads = '^' + lookaheads
    # NOTE(review): the {1,30} below applies to the $ rather than to
    # the preceding characters - confirm that this is what's intended
    return lookaheads + '.*${1,30}'
def _isReservedName(nickname: str) -> bool:
    """Is the given nickname reserved for some special function?
    The comparison with the reserved words is case sensitive
    """
    return nickname in _getReservedWords()
def validNickname(domain: str, nickname: str) -> bool:
    """Is the given nickname valid?
    It must be between 1 and 30 characters in a single language,
    without any forbidden characters, and must not be the same as
    the domain or be a reserved word
    """
    if not 0 < len(nickname) <= 30:
        return False
    if not isValidLanguage(nickname):
        return False
    forbiddenChars = ('.', ' ', '/', '?', ':', ';', '@', '#', '!')
    if any(c in nickname for c in forbiddenChars):
        return False
    # this should only apply for the shared inbox
    if nickname == domain:
        return False
    return not _isReservedName(nickname)
2019-08-08 11:24:26 +00:00
2020-04-04 13:44:49 +00:00
2019-08-08 11:24:26 +00:00
def noOfAccounts(baseDir: str) -> int:
    """Returns the number of accounts on the system, which is the
    number of directories directly within the accounts directory
    for which isAccountDir is true
    """
    accountCtr = 0
    for subdir, dirs, files in os.walk(baseDir + '/accounts'):
        accountCtr = sum(1 for account in dirs if isAccountDir(account))
        # only the top level of the accounts directory is counted
        break
    return accountCtr
2019-08-10 11:31:42 +00:00
2020-04-04 13:44:49 +00:00
def noOfActiveAccountsMonthly(baseDir: str, months: int) -> int:
    """Returns the number of accounts on the system which have been
    used within the given number of months, with a month being
    counted as 30 days
    The time of last use is read from the .lastUsed file within each
    account directory. Accounts without a numeric time of last use
    are not counted
    """
    accountCtr = 0
    currTime = int(time.time())
    monthSeconds = int(60*60*24*30*months)
    for subdir, dirs, files in os.walk(baseDir + '/accounts'):
        for account in dirs:
            if not isAccountDir(account):
                continue
            lastUsedFilename = \
                baseDir + '/accounts/' + account + '/.lastUsed'
            if not os.path.isfile(lastUsedFilename):
                continue
            with open(lastUsedFilename, 'r') as lastUsedFile:
                lastUsed = lastUsedFile.read()
            if not lastUsed.isdigit():
                continue
            if currTime - int(lastUsed) < monthSeconds:
                accountCtr += 1
        # only the top level of the accounts directory is counted
        break
    return accountCtr
2020-04-04 13:44:49 +00:00
def isPublicPostFromUrl(baseDir: str, nickname: str, domain: str,
                        postUrl: str) -> bool:
    """Returns whether the given url is a public post
    False is returned if the post can't be located or loaded
    """
    postFilename = locatePost(baseDir, nickname, domain, postUrl)
    postJsonObject = None
    if postFilename:
        postJsonObject = loadJson(postFilename, 1)
    if postJsonObject:
        return isPublicPost(postJsonObject)
    return False
2020-04-04 13:44:49 +00:00
2019-08-10 11:31:42 +00:00
def isPublicPost(postJsonObject: {}) -> bool:
    """Returns true if the given post is public, which is when it
    is a Create whose object is sent to a recipient ending
    with #Public
    """
    if postJsonObject.get('type') != 'Create':
        return False
    if not hasObjectDict(postJsonObject):
        return False
    recipients = postJsonObject['object'].get('to')
    if not recipients:
        return False
    # NOTE(review): assumes that 'to' is a list of strings - confirm
    # whether a single recipient given as a string also needs handling
    return any(recipient.endswith('#Public') for recipient in recipients)
2019-09-29 18:48:34 +00:00
2020-04-04 13:44:49 +00:00
2021-06-20 11:28:35 +00:00
def copytree(src: str, dst: str, symlinks: bool = False, ignore=None):
    """Copy the contents of the directory src into the directory dst
    The destination directory is created if it doesn't exist.
    Files are copied with shutil.copy2 and subdirectories with
    shutil.copytree, to which symlinks and ignore are passed
    """
    # copying a file fails if the destination directory is missing
    os.makedirs(dst, exist_ok=True)
    for item in os.listdir(src):
        s = os.path.join(src, item)
        d = os.path.join(dst, item)
        if os.path.isdir(s):
            shutil.copytree(s, d, symlinks, ignore)
        else:
            shutil.copy2(s, d)
2019-10-19 17:50:05 +00:00
2020-04-04 13:44:49 +00:00
def getCachedPostDirectory(baseDir: str, nickname: str, domain: str) -> str:
    """Returns the directory where the html post cache exists,
    which is the postcache directory within the account directory
    """
    return acctDir(baseDir, nickname, domain) + '/postcache'
2020-04-04 13:44:49 +00:00
def getCachedPostFilename(baseDir: str, nickname: str, domain: str,
                          postJsonObject: {}) -> str:
    """Returns the html cache filename for the given post
    None is returned if the html cache directory doesn't exist,
    or if its path doesn't contain an @
    """
    cachedPostDir = getCachedPostDirectory(baseDir, nickname, domain)
    if not os.path.isdir(cachedPostDir) or '@' not in cachedPostDir:
        # print('ERROR: invalid html cache directory ' + cachedPostDir)
        return None
    # filenames use # in place of the / within the post id
    cachedPostId = removeIdEnding(postJsonObject['id']).replace('/', '#')
    return cachedPostDir + '/' + cachedPostId + '.html'
2019-11-24 13:46:28 +00:00
2020-04-04 13:44:49 +00:00
def updateRecentPostsCache(recentPostsCache: {}, maxRecentPosts: int,
                           postJsonObject: {}, htmlStr: str) -> None:
    """Adds the given post and its html to the in-memory cache of
    recent posts, removing the oldest entries once there are more
    than maxRecentPosts. Posts without an id are ignored
    """
    if not postJsonObject.get('id'):
        return
    # the cache key is the post id without any fragment or ending,
    # and with slashes replaced
    cacheKey = postJsonObject['id'].split('#', 1)[0]
    cacheKey = removeIdEnding(cacheKey).replace('/', '#')

    if not recentPostsCache.get('index'):
        # nothing cached yet, so start a new cache with this post
        recentPostsCache['index'] = [cacheKey]
        recentPostsCache['json'] = {}
        recentPostsCache['html'] = {}
        recentPostsCache['json'][cacheKey] = json.dumps(postJsonObject)
        recentPostsCache['html'][cacheKey] = htmlStr
        return

    cacheIndex = recentPostsCache['index']
    if cacheKey in cacheIndex:
        return
    cacheIndex.append(cacheKey)
    postJsonObject['muted'] = False
    recentPostsCache['json'][cacheKey] = json.dumps(postJsonObject)
    recentPostsCache['html'][cacheKey] = htmlStr

    # remove the oldest entries while the cache is over its limit
    while len(recentPostsCache['html']) > maxRecentPosts:
        oldestKey = cacheIndex.pop(0)
        for section in ('json', 'html'):
            if recentPostsCache[section].get(oldestKey):
                del recentPostsCache[section][oldestKey]
2020-02-21 10:19:02 +00:00
def fileLastModified(filename: str) -> str:
    """Returns the time when the given file was last modified,
    formatted as YYYY-MM-DDTHH:MM:SSZ
    """
    # NOTE(review): fromtimestamp gives local time although the
    # string ends with 'Z' - confirm that callers only compare values
    modifiedSeconds = os.path.getmtime(filename)
    return datetime.datetime.fromtimestamp(
        modifiedSeconds).strftime("%Y-%m-%dT%H:%M:%SZ")
2020-02-22 16:00:27 +00:00
2020-04-04 13:44:49 +00:00
2020-10-29 12:48:58 +00:00
def getCSS(baseDir: str, cssFilename: str, cssCache: {}) -> str:
    """Returns the contents of the given css file, using the copy
    within cssCache if the file has not been modified since it was
    cached. Returns None if the file does not exist.
    baseDir is not used
    """
    if not os.path.isfile(cssFilename):
        return None

    modified = fileLastModified(cssFilename)
    # cache entries are a list of [last modified, css text]
    cached = cssCache.get(cssFilename)
    if cached and cached[0] == modified:
        # unchanged file, so the cached text is still valid
        return cached[1]

    with open(cssFilename, 'r') as fpCSS:
        cssText = fpCSS.read()

    if cached:
        # update the existing cache entry in place
        cached[0] = modified
        cached[1] = cssText
    else:
        cssCache[cssFilename] = [modified, cssText]
    return cssText
2020-02-24 23:14:49 +00:00
def isBlogPost(postJsonObject: {}) -> bool:
    """Returns true if the given post is a Create activity whose
    object is an Article with some content
    """
    if postJsonObject['type'] != 'Create':
        return False
    if not hasObjectDict(postJsonObject):
        return False
    postObject = postJsonObject['object']
    if not postObject.get('type') or not postObject.get('content'):
        return False
    return postObject['type'] == 'Article'
2020-04-11 10:19:35 +00:00
def isNewsPost(postJsonObject: {}) -> bool:
    """Is the given post a news post?
    Returns true if the post has a 'news' field with a truthy value
    """
    # coerce to bool so that a missing field gives False rather
    # than None, matching the declared return type
    return bool(postJsonObject.get('news'))
2021-05-03 22:31:06 +00:00
def _searchVirtualBoxPosts(baseDir: str, nickname: str, domain: str,
                           searchStr: str, maxResults: int,
                           boxName: str) -> []:
    """Searches the posts listed within the index file of a box
    and returns the filenames of those containing every search word.
    Search words are separated by '+' and matching ignores case.
    At most maxResults filenames are returned
    """
    accountDirectory = acctDir(baseDir, nickname, domain)
    indexFilename = accountDirectory + '/' + boxName + '.index'
    # posts listed in the bookmarks index are looked up in the inbox
    if boxName == 'bookmarks':
        boxName = 'inbox'
    path = accountDirectory + '/' + boxName
    if not os.path.isdir(path):
        return []

    searchStr = searchStr.lower().strip()
    if '+' in searchStr:
        searchWords = [word.strip() for word in searchStr.split('+')]
        print('SEARCH: ' + str(searchWords))
    else:
        searchWords = [searchStr]

    res = []
    with open(indexFilename, 'r') as indexFile:
        for indexLine in indexFile:
            # stop at the first line which isn't a post filename
            if '.json' not in indexLine:
                break
            postFilename = path + '/' + indexLine.strip()
            if not os.path.isfile(postFilename):
                continue
            with open(postFilename, 'r') as postFile:
                data = postFile.read().lower()
            if not all(keyword in data for keyword in searchWords):
                continue
            res.append(postFilename)
            if len(res) >= maxResults:
                return res
    return res
2020-04-11 10:19:35 +00:00
def searchBoxPosts(baseDir: str, nickname: str, domain: str,
                   searchStr: str, maxResults: int,
                   boxName='outbox') -> []:
    """Searches the posts within a box and returns the filenames of
    those containing every search word. Search words are separated
    by '+' and matching ignores case.
    At most maxResults filenames are returned
    """
    path = acctDir(baseDir, nickname, domain) + '/' + boxName
    if not os.path.isdir(path):
        # a box having only an index file is searched via its index
        if os.path.isfile(path + '.index'):
            return _searchVirtualBoxPosts(baseDir, nickname, domain,
                                          searchStr, maxResults, boxName)
        return []

    searchStr = searchStr.lower().strip()
    if '+' in searchStr:
        searchWords = [word.strip() for word in searchStr.split('+')]
        print('SEARCH: ' + str(searchWords))
    else:
        searchWords = [searchStr]

    res = []
    # only the files directly within the box directory are searched
    topLevel = next(os.walk(path), None)
    if topLevel is None:
        return res
    root, _, fnames = topLevel
    for fname in fnames:
        filePath = os.path.join(root, fname)
        with open(filePath, 'r') as postFile:
            data = postFile.read().lower()
        if not all(keyword in data for keyword in searchWords):
            continue
        res.append(filePath)
        if len(res) >= maxResults:
            return res
    return res
2020-05-04 18:24:30 +00:00
2020-05-04 18:29:30 +00:00
def getFileCaseInsensitive(path: str) -> str:
    """Returns the given path if it is an existing file, otherwise
    the lower case version of the path if that is an existing file,
    otherwise None
    """
    for candidate in (path, path.lower()):
        if os.path.isfile(candidate):
            return candidate
    return None
2020-06-06 18:16:16 +00:00
def undoLikesCollectionEntry(recentPostsCache: {},
                             baseDir: str, postFilename: str, objectUrl: str,
                             actor: str, domain: str, debug: bool) -> None:
    """Undoes a like for a particular actor by removing it from the
    "likes" collection within the post stored at postFilename.
    Any cached html for the post is removed, both on file and within
    recentPostsCache, and the post is saved if a like was removed.
    objectUrl is only used within debug output
    """
    postJsonObject = loadJson(postFilename)
    if not postJsonObject:
        return
    # remove any cached version of this post so that the
    # like icon is changed
    nickname = getNicknameFromActor(actor)
    cachedPostFilename = getCachedPostFilename(baseDir, nickname,
                                               domain, postJsonObject)
    if cachedPostFilename:
        if os.path.isfile(cachedPostFilename):
            try:
                os.remove(cachedPostFilename)
            except OSError:
                # removal is best effort, but only filesystem errors
                # are ignored
                pass
    removePostFromCache(postJsonObject, recentPostsCache)

    if not postJsonObject.get('type'):
        return
    if postJsonObject['type'] != 'Create':
        return
    if not hasObjectDict(postJsonObject):
        if debug:
            pprint(postJsonObject)
            print('DEBUG: post ' + objectUrl + ' has no object')
        return
    if not postJsonObject['object'].get('likes'):
        return
    if not isinstance(postJsonObject['object']['likes'], dict):
        return
    if not postJsonObject['object']['likes'].get('items'):
        return
    # the count as stored in the post before the like is removed
    totalItems = 0
    if postJsonObject['object']['likes'].get('totalItems'):
        totalItems = postJsonObject['object']['likes']['totalItems']
    itemFound = False
    for likeItem in postJsonObject['object']['likes']['items']:
        if likeItem.get('actor'):
            if likeItem['actor'] == actor:
                if debug:
                    print('DEBUG: like was removed for ' + actor)
                postJsonObject['object']['likes']['items'].remove(likeItem)
                itemFound = True
                # stop iterating, since the list has been altered
                break
    if not itemFound:
        return
    if totalItems == 1:
        # that was the only like, so remove the whole collection
        if debug:
            print('DEBUG: likes was removed from post')
        del postJsonObject['object']['likes']
    else:
        itlen = len(postJsonObject['object']['likes']['items'])
        postJsonObject['object']['likes']['totalItems'] = itlen

    saveJson(postJsonObject, postFilename)
2020-06-06 18:16:16 +00:00
def undoAnnounceCollectionEntry(recentPostsCache: {},
                                baseDir: str, postFilename: str,
                                actor: str, domain: str, debug: bool) -> None:
    """Undoes an announce for a particular actor by removing it from
    the "shares" collection within a post. Note that the "shares"
    collection has no relation to shared items in shares.py. It's
    shares of posts, not shares of physical objects.
    Any cached html for the post is removed, both on file and within
    recentPostsCache, and the post is saved if an announce was removed
    """
    postJsonObject = loadJson(postFilename)
    if not postJsonObject:
        return
    # remove any cached version of this announce so that the announce
    # icon is changed
    nickname = getNicknameFromActor(actor)
    cachedPostFilename = getCachedPostFilename(baseDir, nickname, domain,
                                               postJsonObject)
    if cachedPostFilename:
        if os.path.isfile(cachedPostFilename):
            try:
                os.remove(cachedPostFilename)
            except OSError:
                # removal is best effort, but only filesystem errors
                # are ignored
                pass
    removePostFromCache(postJsonObject, recentPostsCache)

    if not postJsonObject.get('type'):
        return
    if postJsonObject['type'] != 'Create':
        return
    if not hasObjectDict(postJsonObject):
        if debug:
            pprint(postJsonObject)
            print('DEBUG: post has no object')
        return
    if not postJsonObject['object'].get('shares'):
        return
    # a shares field which isn't a collection can't be searched
    if not isinstance(postJsonObject['object']['shares'], dict):
        return
    if not postJsonObject['object']['shares'].get('items'):
        return
    # the count as stored in the post before the announce is removed
    totalItems = 0
    if postJsonObject['object']['shares'].get('totalItems'):
        totalItems = postJsonObject['object']['shares']['totalItems']
    itemFound = False
    for announceItem in postJsonObject['object']['shares']['items']:
        if announceItem.get('actor'):
            if announceItem['actor'] == actor:
                if debug:
                    print('DEBUG: Announce was removed for ' + actor)
                anIt = announceItem
                postJsonObject['object']['shares']['items'].remove(anIt)
                itemFound = True
                # stop iterating, since the list has been altered
                break
    if not itemFound:
        return
    if totalItems == 1:
        # that was the only announce, so remove the whole collection
        if debug:
            print('DEBUG: shares (announcements) ' +
                  'was removed from post')
        del postJsonObject['object']['shares']
    else:
        itlen = len(postJsonObject['object']['shares']['items'])
        postJsonObject['object']['shares']['totalItems'] = itlen

    saveJson(postJsonObject, postFilename)
def updateAnnounceCollection(recentPostsCache: {},
                             baseDir: str, postFilename: str,
                             actor: str,
                             nickname: str, domain: str, debug: bool) -> None:
    """Updates the announcements collection within a post
    Confusingly this is known as "shares", but isn't the
    same as shared items within shares.py
    It's shares of posts, not shares of physical objects.
    An Announce by the given actor is added to the collection, unless
    that actor already has one, and the post is then saved.
    Any cached html for the post is removed, both on file and within
    recentPostsCache
    """
    postJsonObject = loadJson(postFilename)
    if not postJsonObject:
        return
    # remove any cached version of this announce so that the announce
    # icon is changed
    cachedPostFilename = getCachedPostFilename(baseDir, nickname, domain,
                                               postJsonObject)
    if cachedPostFilename:
        if os.path.isfile(cachedPostFilename):
            try:
                os.remove(cachedPostFilename)
            except OSError:
                # removal is best effort, but only filesystem errors
                # are ignored
                pass
    removePostFromCache(postJsonObject, recentPostsCache)

    if not hasObjectDict(postJsonObject):
        if debug:
            pprint(postJsonObject)
            print('DEBUG: post ' + postFilename + ' has no object')
        return
    postUrl = removeIdEnding(postJsonObject['id']) + '/shares'
    if not postJsonObject['object'].get('shares'):
        # first announce of this post, so create the collection
        if debug:
            print('DEBUG: Adding initial shares (announcements) to ' +
                  postUrl)
        announcementsJson = {
            "@context": "https://www.w3.org/ns/activitystreams",
            'id': postUrl,
            'type': 'Collection',
            "totalItems": 1,
            'items': [{
                'type': 'Announce',
                'actor': actor
            }]
        }
        postJsonObject['object']['shares'] = announcementsJson
    else:
        if postJsonObject['object']['shares'].get('items'):
            sharesItems = postJsonObject['object']['shares']['items']
            # nothing to do if this actor has already announced the post
            for announceItem in sharesItems:
                if announceItem.get('actor'):
                    if announceItem['actor'] == actor:
                        return
            newAnnounce = {
                'type': 'Announce',
                'actor': actor
            }
            postJsonObject['object']['shares']['items'].append(newAnnounce)
            itlen = len(postJsonObject['object']['shares']['items'])
            postJsonObject['object']['shares']['totalItems'] = itlen
        else:
            if debug:
                print('DEBUG: shares (announcements) section of post ' +
                      'has no items list')

    if debug:
        print('DEBUG: saving post with shares (announcements) added')
        pprint(postJsonObject)
    saveJson(postJsonObject, postFilename)
2020-06-22 16:55:19 +00:00
2020-11-09 19:41:01 +00:00
def weekDayOfMonthStart(monthNumber: int, year: int) -> int:
    """Returns the day of the week on which the given month begins
    1=sun, 7=sat
    """
    monthStart = datetime.date(year, monthNumber, 1)
    # isoweekday is 1=mon to 7=sun, so wrap sunday around to the start
    return monthStart.isoweekday() % 7 + 1
2020-11-13 13:34:14 +00:00
def mediaFileMimeType(filename: str) -> str:
    """Given a media filename return its mime type
    The type is chosen from the filename extension, ignoring case.
    Filenames without an extension, or with an unknown extension,
    are given the type image/png
    """
    if '.' not in filename:
        return 'image/png'
    extensions = {
        'json': 'application/json',
        'png': 'image/png',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'gif': 'image/gif',
        'svg': 'image/svg+xml',
        'webp': 'image/webp',
        'avif': 'image/avif',
        'mp3': 'audio/mpeg',
        'ogg': 'audio/ogg',
        'flac': 'audio/flac',
        'mp4': 'video/mp4',
        # NOTE(review): the registered type for ogv is video/ogg.
        # Left unchanged in case other code matches on this string
        'ogv': 'video/ogv'
    }
    # lower case the extension so that IMAGE.JPG is also recognised
    fileExt = filename.split('.')[-1].lower()
    return extensions.get(fileExt, 'image/png')
2021-07-13 15:49:29 +00:00
def isRecentPost(postJsonObject: {}, maxDays: int = 3) -> bool:
    """ Is the given post recent?
    Returns true if the published date of the post object is no more
    than maxDays days before the current UTC date. Returns false if
    there is no published date or if it can't be parsed
    """
    if not hasObjectDict(postJsonObject):
        return False
    if not postJsonObject['object'].get('published'):
        return False
    if not isinstance(postJsonObject['object']['published'], str):
        return False
    epoch = datetime.datetime(1970, 1, 1)
    currTime = datetime.datetime.utcnow()
    daysSinceEpoch = (currTime - epoch).days
    recently = daysSinceEpoch - maxDays

    publishedDateStr = postJsonObject['object']['published']
    try:
        publishedDate = \
            datetime.datetime.strptime(publishedDateStr,
                                       "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        # the date isn't in the expected format
        return False

    publishedDaysSinceEpoch = (publishedDate - epoch).days
    if publishedDaysSinceEpoch < recently:
        return False
    return True
def camelCaseSplit(text: str) -> str:
    """ Splits CamelCase into "Camel Case"
    A run of capitals followed by a capitalised word is kept together,
    so that "HTTPRequest" becomes "HTTP Request"
    """
    wordPattern = '.+?(?:(?<=[a-z])(?=[A-Z])|' + \
        '(?<=[A-Z])(?=[A-Z][a-z])|$)'
    words = [found.group(0) for found in re.finditer(wordPattern, text)]
    return ' '.join(words).strip()
2021-03-05 19:00:37 +00:00
def rejectPostId(baseDir: str, nickname: str, domain: str,
                 postId: str, recentPostsCache: {}) -> None:
    """ Marks the given post as rejected,
    for example an announce which is too old
    A file with the .reject extension is created alongside the post
    and any json or html for the post is removed from the recent
    posts cache
    """
    postFilename = locatePost(baseDir, nickname, domain, postId)
    if not postFilename:
        return

    if recentPostsCache.get('index'):
        # the filename of the post without any path or extension
        # This should also correspond to any index entry in
        # the posts cache
        cacheKey = postFilename.split('/')[-1]
        cacheKey = cacheKey.replace('\n', '').replace('\r', '')
        cacheKey = cacheKey.replace('.json', '').strip()
        if cacheKey in recentPostsCache['index']:
            for section in ('json', 'html'):
                if recentPostsCache[section].get(cacheKey):
                    del recentPostsCache[section][cacheKey]

    with open(postFilename + '.reject', 'w+') as rejectFile:
        rejectFile.write('\n')
def isDM(postJsonObject: {}) -> bool:
    """Returns true if the given post is a DM
    This is a Create activity for a Note, Patch, EncryptedMessage or
    Article which has no moderation status and is not addressed to
    the public or to followers
    """
    if postJsonObject['type'] != 'Create':
        return False
    if not hasObjectDict(postJsonObject):
        return False
    postObject = postJsonObject['object']
    dmTypes = ('Note', 'Patch', 'EncryptedMessage', 'Article')
    if postObject['type'] not in dmTypes:
        return False
    if postObject.get('moderationStatus'):
        return False
    for fieldName in ('to', 'cc'):
        recipients = postObject.get(fieldName)
        if not recipients:
            continue
        for toAddress in recipients:
            if toAddress.endswith(('#Public', 'followers')):
                return False
    return True
def isReply(postJsonObject: {}, actor: str) -> bool:
    """Returns true if the given post is a reply to the given actor
    This is the case if inReplyTo begins with the actor, or if the
    actor appears within the href of a Mention tag
    """
    if postJsonObject['type'] != 'Create':
        return False
    if not hasObjectDict(postJsonObject):
        return False
    postObject = postJsonObject['object']
    if postObject.get('moderationStatus'):
        return False
    if postObject['type'] not in ('Note', 'EncryptedMessage', 'Article'):
        return False

    replyTo = postObject.get('inReplyTo')
    if replyTo and isinstance(replyTo, str) and replyTo.startswith(actor):
        return True

    tags = postObject.get('tag')
    if not tags or not isinstance(tags, list):
        return False
    for tag in tags:
        if tag.get('type') != 'Mention':
            continue
        mentionUrl = tag.get('href')
        if mentionUrl and actor in mentionUrl:
            return True
    return False
2021-03-12 12:04:34 +00:00
def containsPGPPublicKey(content: str) -> bool:
    """Returns true if the given content contains both the begin
    and end markers of a PGP public key block
    """
    keyMarkers = ('--BEGIN PGP PUBLIC KEY BLOCK--',
                  '--END PGP PUBLIC KEY BLOCK--')
    return all(marker in content for marker in keyMarkers)
def isPGPEncrypted(content: str) -> bool:
    """Returns true if the given content contains both the begin
    and end markers of a PGP message
    """
    messageMarkers = ('--BEGIN PGP MESSAGE--', '--END PGP MESSAGE--')
    return all(marker in content for marker in messageMarkers)
2021-03-18 17:27:46 +00:00
def loadTranslationsFromFile(baseDir: str, language: str) -> ({}, str):
    """Returns the translations dictionary and the language used
    If no language is given then the system locale is used, falling
    back to 'en'. If there is no translations file for the language
    then the 'en' translations are loaded.
    Returns (None, None) if the translations directory is missing, so
    that callers can always unpack two values
    """
    if not os.path.isdir(baseDir + '/translations'):
        print('ERROR: translations directory not found')
        return None, None
    if not language:
        systemLanguage = locale.getdefaultlocale()[0]
    else:
        systemLanguage = language
    if not systemLanguage:
        systemLanguage = 'en'
    # reduce something like en_GB.UTF-8 to en
    if '_' in systemLanguage:
        systemLanguage = systemLanguage.split('_')[0]
    while '/' in systemLanguage:
        systemLanguage = systemLanguage.split('/')[1]
    if '.' in systemLanguage:
        systemLanguage = systemLanguage.split('.')[0]
    translationsFile = baseDir + '/translations/' + \
        systemLanguage + '.json'
    if not os.path.isfile(translationsFile):
        systemLanguage = 'en'
        translationsFile = baseDir + '/translations/' + \
            systemLanguage + '.json'
    return loadJson(translationsFile), systemLanguage
2021-04-22 09:27:20 +00:00
def dmAllowedFromDomain(baseDir: str,
                        nickname: str, domain: str,
                        sendingActorDomain: str) -> bool:
    """When a DM is received and the .followDMs flag file exists
    Then optionally some domains can be specified as allowed,
    regardless of individual follows.
    i.e. Mostly you only want DMs from followers, but there are
    a few particular instances that you trust
    Returns true if sendingActorDomain is one of the lines within
    the dmAllowedInstances.txt file of the account
    """
    if not sendingActorDomain:
        return False
    dmAllowedInstancesFilename = \
        acctDir(baseDir, nickname, domain) + '/dmAllowedInstances.txt'
    if not os.path.isfile(dmAllowedInstancesFilename):
        return False
    with open(dmAllowedInstancesFilename, 'r') as allowedFile:
        for allowedLine in allowedFile:
            # whole lines are compared, so that a domain which is only
            # the ending of an allowed domain is not accepted
            if allowedLine.strip() == sendingActorDomain:
                return True
    return False
2021-05-16 15:10:39 +00:00
def getOccupationSkills(actorJson: {}) -> []:
    """Returns the list of skills for an actor
    These are the skills of the first Occupation within hasOccupation
    which has any. A single skill given as a string is returned as a
    list of one item
    """
    if 'hasOccupation' not in actorJson:
        return []
    occupations = actorJson['hasOccupation']
    if not isinstance(occupations, list):
        return []
    for occupationItem in occupations:
        if not isinstance(occupationItem, dict):
            continue
        if occupationItem.get('@type') != 'Occupation':
            continue
        skills = occupationItem.get('skills')
        if not skills:
            continue
        if isinstance(skills, list):
            return skills
        if isinstance(skills, str):
            return [skills]
        # skills of any other type end the search
        break
    return []
def getOccupationName(actorJson: {}) -> str:
    """Returns the occupation name of an actor
    This is the name of the first Occupation within hasOccupation
    which has one, or an empty string if that name is not a string
    or there is no such Occupation
    """
    occupations = actorJson.get('hasOccupation')
    if not occupations or not isinstance(occupations, list):
        return ""
    for occupationItem in occupations:
        if not isinstance(occupationItem, dict):
            continue
        if occupationItem.get('@type') != 'Occupation':
            continue
        occupationName = occupationItem.get('name')
        if not occupationName:
            continue
        if isinstance(occupationName, str):
            return occupationName
        # a name of any other type ends the search
        break
    return ""
def setOccupationName(actorJson: {}, name: str) -> bool:
    """Sets the name of the first Occupation within the hasOccupation
    list of an actor. Returns true if the name was set
    """
    occupations = actorJson.get('hasOccupation')
    if not occupations or not isinstance(occupations, list):
        return False
    for occupationItem in occupations:
        if not isinstance(occupationItem, dict):
            continue
        if occupationItem.get('@type') != 'Occupation':
            continue
        occupationItem['name'] = name
        return True
    return False
def setOccupationSkillsList(actorJson: {}, skillsList: []) -> bool:
    """Sets the skills of the first Occupation within the
    hasOccupation list of an actor. Returns true if the skills
    were set
    """
    if 'hasOccupation' not in actorJson:
        return False
    occupations = actorJson['hasOccupation']
    if not isinstance(occupations, list):
        return False
    for occupationItem in occupations:
        if not isinstance(occupationItem, dict):
            continue
        if occupationItem.get('@type') != 'Occupation':
            continue
        occupationItem['skills'] = skillsList
        return True
    return False
def isAccountDir(dirName: str) -> bool:
    """Is the given directory an account within /accounts ?
    Account directory names contain an '@', but the inbox@ and news@
    directories are not counted as accounts
    """
    if '@' not in dirName:
        return False
    return 'inbox@' not in dirName and 'news@' not in dirName
2021-06-07 19:18:13 +00:00
def permittedDir(path: str) -> bool:
    """Returns false for the special paths which should not be
    accessible directly via GET or POST
    """
    forbiddenPaths = ('/wfendpoints', '/keys', '/accounts')
    return not path.startswith(forbiddenPaths)
2021-06-20 15:45:29 +00:00
def userAgentDomain(userAgent: str, debug: bool) -> str:
    """Returns the domain within a User-Agent string
    The domain is taken from the url which follows a '+http' marker.
    Returns None if there is no such marker or if the text found
    contains no dot
    """
    urlMarker = '+http'
    if urlMarker not in userAgent:
        return None
    remainder = userAgent.split(urlMarker)[1].strip()
    if '://' in remainder:
        remainder = remainder.split('://')[1]
    # drop any path which follows the domain
    remainder = remainder.split('/')[0]
    if ')' in remainder:
        remainder = remainder.split(')')[0].strip()
    agentDomain = remainder.replace(' ', '').replace(';', '')
    if '.' not in agentDomain:
        return None
    if debug:
        print('User-Agent Domain: ' + agentDomain)
    return agentDomain
def hasObjectDict(postJsonObject: {}) -> bool:
    """Returns true if the given post has an object field which is
    a non-empty dict
    """
    postObject = postJsonObject.get('object')
    return bool(postObject) and isinstance(postObject, dict)
2021-06-26 11:16:41 +00:00
def getAltPath(actor: str, domainFull: str, callingDomain: str) -> str:
    """Returns alternate path from the actor
    eg. https://clearnetdomain/path becomes http://oniondomain/path
    The actor is only changed when it contains domainFull and not
    callingDomain, and callingDomain is an onion or i2p address
    """
    if callingDomain in actor or domainFull not in actor:
        return actor
    if not callingDomain.endswith(('.onion', '.i2p')):
        return actor
    actorPath = actor.split(domainFull)[1]
    postActor = 'http://' + callingDomain + actorPath
    print('Changed POST domain from ' + actor + ' to ' + postActor)
    return postActor
def getActorPropertyUrl(actorJson: {}, propertyName: str) -> str:
    """Returns a url property from an actor
    This is the value of the first PropertyValue attachment whose
    name begins with propertyName, ignoring case, and whose value
    looks like a url. Returns an empty string if there is none.
    Note that the value of a matching attachment is stripped of
    surrounding whitespace within actorJson
    """
    if not actorJson.get('attachment'):
        return ''
    propertyName = propertyName.lower()
    for propertyValue in actorJson['attachment']:
        if not propertyValue.get('name'):
            continue
        if not propertyValue['name'].lower().startswith(propertyName):
            continue
        if not propertyValue.get('value'):
            continue
        if propertyValue.get('type') != 'PropertyValue':
            continue
        urlStr = propertyValue['value'].strip()
        propertyValue['value'] = urlStr
        # the value must begin with one of the known protocol prefixes
        if not any(urlStr.startswith(prefix)
                   for prefix in getProtocolPrefixes()):
            continue
        if '.' not in urlStr:
            continue
        if ' ' in urlStr or ',' in urlStr:
            continue
        return urlStr
    return ''
2021-06-26 14:21:24 +00:00
def removeDomainPort(domain: str) -> str:
    """If the domain has a port appended then remove it
    eg. mydomain.com:80 becomes mydomain.com
    Domains beginning with did: are returned unchanged
    """
    if ':' in domain and not domain.startswith('did:'):
        return domain.split(':')[0]
    return domain
def getPortFromDomain(domain: str) -> int:
    """If the domain has a port number appended then return it
    eg. mydomain.com:80 returns 80
    Returns None if there is no port number, or if the domain
    begins with did:
    """
    if ':' not in domain or domain.startswith('did:'):
        return None
    portStr = domain.split(':')[1]
    return int(portStr) if portStr.isdigit() else None
2021-07-06 09:44:45 +00:00
def validUrlPrefix(url: str) -> bool:
    """Returns true if the given url contains a slash and begins
    with one of the supported protocol prefixes
    """
    if '/' not in url:
        return False
    return url.startswith(('https:', 'http:', 'hyper:', 'i2p:', 'gnunet:'))
def removeLineEndings(text: str) -> str:
    """Removes every newline and carriage return from the given
    text, then strips any surrounding whitespace
    """
    return text.replace('\n', '').replace('\r', '').strip()
2021-07-20 20:39:26 +00:00
def validPassword(password: str) -> bool:
    """Returns true if the given password is valid
    The only requirement is a length of at least 8 characters
    """
    minimumLength = 8
    return len(password) >= minimumLength
2021-07-25 13:09:39 +00:00
def isfloat(value):
    """Returns true if the given value can be converted to a float
    Values which float() can't accept, such as None or an integer
    too large to be represented, give False rather than an exception
    """
    try:
        float(value)
    except (ValueError, TypeError, OverflowError):
        return False
    return True
2021-07-28 09:35:21 +00:00
def dateStringToSeconds(dateStr: str) -> int:
    """Converts a date string (eg "published") into seconds since epoch
    The string is expected to have the format YYYY-MM-DDTHH:MM:SSZ
    Returns None if the date can't be parsed
    """
    try:
        expiryTime = \
            datetime.datetime.strptime(dateStr, '%Y-%m-%dT%H:%M:%SZ')
    except (ValueError, TypeError):
        # the wrong format, or not a string
        return None
    # NOTE(review): the parsed date has no timezone, so timestamp()
    # treats it as local time even though the string ends with 'Z'
    return int(datetime.datetime.timestamp(expiryTime))
def dateSecondsToString(dateSec: int) -> str:
    """Converts a date in seconds since epoch to a string
    having the format YYYY-MM-DDTHH:MM:SSZ
    """
    dateFormat = "%Y-%m-%dT%H:%M:%SZ"
    return datetime.datetime.fromtimestamp(dateSec).strftime(dateFormat)
2021-07-30 16:06:34 +00:00
2021-08-01 13:25:11 +00:00
def hasGroupType(baseDir: str, actor: str, personCache: {},
                 debug: bool = False) -> bool:
    """Does the given actor url have a group type?
    """
    # does the actor path clearly indicate that this is a group?
    # eg. https://lemmy/c/groupname
    for grpPath in getGroupPaths():
        if grpPath not in actor:
            continue
        if debug:
            print('grpPath ' + grpPath + ' in ' + actor)
        return True
    # otherwise look for a cached actor having the Group type
    return isGroupActor(baseDir, actor, personCache, debug)
2021-07-31 11:56:28 +00:00
2021-08-01 13:25:11 +00:00
def isGroupActor(baseDir: str, actor: str, personCache: {},
                 debug: bool = False) -> bool:
    """Is the given actor a group?
    The actor within personCache is checked first. If it isn't there
    then the cached actor file within baseDir/cache/actors is
    searched for a Group type
    """
    if personCache:
        if personCache.get(actor):
            if personCache[actor].get('actor'):
                if personCache[actor]['actor'].get('type'):
                    if personCache[actor]['actor']['type'] == 'Group':
                        if debug:
                            print('Cached actor ' + actor + ' has Group type')
                        return True
                # the actor is cached and it isn't a group
                return False

    if debug:
        print('Actor ' + actor + ' not in cache')

    cachedActorFilename = \
        baseDir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
    if not os.path.isfile(cachedActorFilename):
        if debug:
            print('Cached actor file not found ' + cachedActorFilename)
        return False
    # read within a context manager so that the file is always closed
    with open(cachedActorFilename, 'r') as actorFile:
        actorText = actorFile.read()
    if '"type": "Group"' in actorText:
        if debug:
            print('Group type found in ' + cachedActorFilename)
        return True
    return False
2021-08-07 17:44:25 +00:00
def isGroupAccount(baseDir: str, nickname: str, domain: str) -> bool:
    """Returns true if the given account is a group
    This is the case if the json file for the account contains
    a Group type
    """
    accountFilename = acctDir(baseDir, nickname, domain) + '.json'
    if not os.path.isfile(accountFilename):
        return False
    # read within a context manager so that the file is always closed
    with open(accountFilename, 'r') as accountFile:
        accountText = accountFile.read()
    return '"type": "Group"' in accountText
2021-08-07 17:44:25 +00:00
def getCurrencies() -> {}:
    """Returns a dictionary of currencies
    The keys are currency symbols and the values are the
    corresponding three letter currency codes
    """
    # Every symbol has to be distinct. Empty or repeated keys would
    # silently replace each other within the dictionary
    return {
        "CA$": "CAD",
        "J$": "JMD",
        "£": "GBP",
        "€": "EUR",
        "؋": "AFN",
        "ƒ": "AWG",
        "₼": "AZN",
        "Br": "BYN",
        "BZ$": "BZD",
        "$b": "BOB",
        "KM": "BAM",
        "P": "BWP",
        "лв": "BGN",
        "R$": "BRL",
        "៛": "KHR",
        "$U": "UYU",
        "RD$": "DOP",
        "$": "USD",
        "₡": "CRC",
        "kn": "HRK",
        "₱": "CUP",
        "Kč": "CZK",
        "kr": "NOK",
        "¢": "GHS",
        "Q": "GTQ",
        "L": "HNL",
        "Ft": "HUF",
        "Rp": "IDR",
        "₹": "INR",
        "﷼": "IRR",
        "₪": "ILS",
        "¥": "JPY",
        "₩": "KRW",
        "₭": "LAK",
        "ден": "MKD",
        "RM": "MYR",
        "₨": "MUR",
        "₮": "MNT",
        "MT": "MZN",
        "C$": "NIO",
        "₦": "NGN",
        "Gs": "PYG",
        "zł": "PLN",
        "lei": "RON",
        "₽": "RUB",
        "Дин": "RSD",
        "S": "SOS",
        "R": "ZAR",
        "CHF": "CHF",
        "NT$": "TWD",
        "฿": "THB",
        "TT$": "TTD",
        "₴": "UAH",
        "Bs": "VEF",
        "₫": "VND",
        # NOTE(review): ZQD doesn't look like a known currency code.
        # Possibly ZWD was intended - confirm before changing
        "Z$": "ZQD"
    }
2021-08-08 11:16:18 +00:00
def getSupportedLanguages(baseDir: str) -> []:
    """Returns a list of supported languages
    These are the two letter names of the json files found directly
    within the translations directory
    """
    topLevel = next(os.walk(baseDir + '/translations'), None)
    if topLevel is None:
        return []
    _, _, translationFiles = topLevel
    languagesStr = []
    for filename in translationFiles:
        if not filename.endswith('.json'):
            continue
        languageCode = filename.split('.')[0]
        if len(languageCode) == 2:
            languagesStr.append(languageCode)
    return languagesStr
def getCategoryTypes(baseDir: str) -> []:
    """Returns the list of ontologies
    These are taken from the names of the json files directly within
    the ontology directory which contain 'Types', with 'Types'
    removed. Files beginning with 'custom', or with '#' or '~' in
    their name, are ignored
    """
    topLevel = next(os.walk(baseDir + '/ontology'), None)
    if topLevel is None:
        return []
    _, _, ontologyFiles = topLevel
    categories = []
    for filename in ontologyFiles:
        if not filename.endswith('.json'):
            continue
        if '#' in filename or '~' in filename:
            continue
        if filename.startswith('custom'):
            continue
        ontologyName = filename.split('.')[0]
        if 'Types' in ontologyName:
            categories.append(ontologyName.replace('Types', ''))
    return categories
def getSharesFilesList() -> []:
    """Returns the names of the possible shares files
    """
    sharesFileTypes = ('shares', 'wanted')
    return sharesFileTypes
def replaceUsersWithAt(actor: str) -> str:
    """ https://domain/users/nick becomes https://domain/@nick
    Only the first of the user paths found within the actor
    is replaced
    """
    for userPath in getUserPaths():
        if userPath in actor:
            return actor.replace(userPath, '/@')
    return actor