epicyon/utils.py

1497 lines
52 KiB
Python

__filename__ = "utils.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
import time
import shutil
import datetime
import json
from socket import error as SocketError
import errno
import urllib.request
from pprint import pprint
from calendar import monthrange
from followingCalendar import addPersonToCalendar
def removeHtml(content: str) -> str:
"""Removes html links from the given content.
Used to ensure that profile descriptions don't contain dubious content
"""
if '<' not in content:
return content
removing = False
content = content.replace('<q>', '"').replace('</q>', '"')
result = ''
for ch in content:
if ch == '<':
removing = True
elif ch == '>':
removing = False
elif not removing:
result += ch
return result
def firstParagraphFromString(content: str) -> str:
"""Get the first paragraph from a blog post
to be used as a summary in the newswire feed
"""
if '<p>' not in content or '</p>' not in content:
return removeHtml(content)
paragraph = content.split('<p>')[1]
if '</p>' in paragraph:
paragraph = paragraph.split('</p>')[0]
return removeHtml(paragraph)
def isSystemAccount(nickname: str) -> bool:
"""Returns true if the given nickname is a system account
"""
if nickname == 'news' or nickname == 'inbox':
return True
return False
def createConfig(baseDir: str) -> None:
"""Creates a configuration file
"""
configFilename = baseDir + '/config.json'
if os.path.isfile(configFilename):
return
configJson = {
}
saveJson(configJson, configFilename)
def setConfigParam(baseDir: str, variableName: str, variableValue) -> None:
"""Sets a configuration value
"""
createConfig(baseDir)
configFilename = baseDir + '/config.json'
configJson = {}
if os.path.isfile(configFilename):
configJson = loadJson(configFilename)
configJson[variableName] = variableValue
saveJson(configJson, configFilename)
def getConfigParam(baseDir: str, variableName: str):
"""Gets a configuration value
"""
createConfig(baseDir)
configFilename = baseDir + '/config.json'
configJson = loadJson(configFilename)
if configJson:
if variableName in configJson:
return configJson[variableName]
return None
def isSuspended(baseDir: str, nickname: str) -> bool:
"""Returns true if the given nickname is suspended
"""
adminNickname = getConfigParam(baseDir, 'admin')
if not adminNickname:
return False
if nickname == adminNickname:
return False
suspendedFilename = baseDir + '/accounts/suspended.txt'
if os.path.isfile(suspendedFilename):
with open(suspendedFilename, "r") as f:
lines = f.readlines()
for suspended in lines:
if suspended.strip('\n').strip('\r') == nickname:
return True
return False
def getFollowersList(baseDir: str,
nickname: str, domain: str,
followFile='following.txt') -> []:
"""Returns a list of followers for the given account
"""
filename = \
baseDir + '/accounts/' + nickname + '@' + domain + '/' + followFile
if not os.path.isfile(filename):
return []
with open(filename, "r") as f:
lines = f.readlines()
for i in range(len(lines)):
lines[i] = lines[i].strip()
return lines
return []
def getFollowersOfPerson(baseDir: str,
nickname: str, domain: str,
followFile='following.txt') -> []:
"""Returns a list containing the followers of the given person
Used by the shared inbox to know who to send incoming mail to
"""
followers = []
if ':' in domain:
domain = domain.split(':')[0]
handle = nickname + '@' + domain
if not os.path.isdir(baseDir + '/accounts/' + handle):
return followers
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for account in dirs:
filename = os.path.join(subdir, account) + '/' + followFile
if account == handle or account.startswith('inbox@'):
continue
if not os.path.isfile(filename):
continue
with open(filename, 'r') as followingfile:
for followingHandle in followingfile:
followingHandle2 = followingHandle.replace('\n', '')
followingHandle2 = followingHandle2.replace('\r', '')
if followingHandle2 == handle:
if account not in followers:
followers.append(account)
break
return followers
def removeIdEnding(idStr: str) -> str:
"""Removes endings such as /activity and /undo
"""
if idStr.endswith('/activity'):
idStr = idStr[:-len('/activity')]
elif idStr.endswith('/undo'):
idStr = idStr[:-len('/undo')]
elif idStr.endswith('/event'):
idStr = idStr[:-len('/event')]
elif idStr.endswith('/replies'):
idStr = idStr[:-len('/replies')]
return idStr
def getProtocolPrefixes() -> []:
"""Returns a list of valid prefixes
"""
return ('https://', 'http://', 'dat://', 'i2p://', 'gnunet://',
'hyper://', 'gemini://', 'gopher://')
def getLinkPrefixes() -> []:
"""Returns a list of valid web link prefixes
"""
return ('https://', 'http://', 'dat://', 'i2p://', 'gnunet://',
'hyper://', 'gemini://', 'gopher://', 'briar:')
def removeAvatarFromCache(baseDir: str, actorStr: str) -> None:
"""Removes any existing avatar entries from the cache
This avoids duplicate entries with differing extensions
"""
avatarFilenameExtensions = ('png', 'jpg', 'gif', 'webp', 'avif')
for extension in avatarFilenameExtensions:
avatarFilename = \
baseDir + '/cache/avatars/' + actorStr + '.' + extension
if os.path.isfile(avatarFilename):
os.remove(avatarFilename)
def saveJson(jsonObject: {}, filename: str) -> bool:
"""Saves json to a file
"""
tries = 0
while tries < 5:
try:
with open(filename, 'w+') as fp:
fp.write(json.dumps(jsonObject))
return True
except BaseException:
print('WARN: saveJson ' + str(tries))
time.sleep(1)
tries += 1
return False
def loadJson(filename: str, delaySec=2, maxTries=5) -> {}:
"""Makes a few attempts to load a json formatted file
"""
jsonObject = None
tries = 0
while tries < maxTries:
try:
with open(filename, 'r') as fp:
data = fp.read()
jsonObject = json.loads(data)
break
except BaseException:
print('WARN: loadJson exception')
if delaySec > 0:
time.sleep(delaySec)
tries += 1
return jsonObject
def loadJsonOnionify(filename: str, domain: str, onionDomain: str,
delaySec=2) -> {}:
"""Makes a few attempts to load a json formatted file
This also converts the domain name to the onion domain
"""
jsonObject = None
tries = 0
while tries < 5:
try:
with open(filename, 'r') as fp:
data = fp.read()
if data:
data = data.replace(domain, onionDomain)
data = data.replace('https:', 'http:')
print('*****data: ' + data)
jsonObject = json.loads(data)
break
except BaseException:
print('WARN: loadJson exception')
if delaySec > 0:
time.sleep(delaySec)
tries += 1
return jsonObject
def getStatusNumber(publishedStr=None) -> (str, str):
"""Returns the status number and published date
"""
if not publishedStr:
currTime = datetime.datetime.utcnow()
else:
currTime = \
datetime.datetime.strptime(publishedStr, '%Y-%m-%dT%H:%M:%SZ')
daysSinceEpoch = (currTime - datetime.datetime(1970, 1, 1)).days
# status is the number of seconds since epoch
statusNumber = \
str(((daysSinceEpoch * 24 * 60 * 60) +
(currTime.hour * 60 * 60) +
(currTime.minute * 60) +
currTime.second) * 1000 +
int(currTime.microsecond / 1000))
# See https://github.com/tootsuite/mastodon/blob/
# 995f8b389a66ab76ec92d9a240de376f1fc13a38/lib/mastodon/snowflake.rb
# use the leftover microseconds as the sequence number
sequenceId = currTime.microsecond % 1000
# shift by 16bits "sequence data"
statusNumber = str((int(statusNumber) << 16) + sequenceId)
published = currTime.strftime("%Y-%m-%dT%H:%M:%SZ")
return statusNumber, published
def evilIncarnate() -> []:
return ('gab.com', 'gabfed.com', 'spinster.xyz',
'kiwifarms.cc', 'djitter.com')
def isEvil(domain: str) -> bool:
if not isinstance(domain, str):
print('WARN: Malformed domain ' + str(domain))
return True
# https://www.youtube.com/watch?v=5qw1hcevmdU
evilDomains = evilIncarnate()
for concentratedEvil in evilDomains:
if domain.endswith(concentratedEvil):
return True
return False
def containsInvalidChars(jsonStr: str) -> bool:
"""Does the given json string contain invalid characters?
e.g. dubious clacks/admin dogwhistles
"""
invalidStrings = {
'', '', '', '', '', ''
}
for isInvalid in invalidStrings:
if isInvalid in jsonStr:
return True
return False
def createPersonDir(nickname: str, domain: str, baseDir: str,
dirname: str) -> str:
"""Create a directory for a person
"""
handle = nickname + '@' + domain
if not os.path.isdir(baseDir + '/accounts/' + handle):
os.mkdir(baseDir + '/accounts/' + handle)
boxDir = baseDir + '/accounts/' + handle + '/' + dirname
if not os.path.isdir(boxDir):
os.mkdir(boxDir)
return boxDir
def createOutboxDir(nickname: str, domain: str, baseDir: str) -> str:
"""Create an outbox for a person
"""
return createPersonDir(nickname, domain, baseDir, 'outbox')
def createInboxQueueDir(nickname: str, domain: str, baseDir: str) -> str:
"""Create an inbox queue and returns the feed filename and directory
"""
return createPersonDir(nickname, domain, baseDir, 'queue')
def domainPermitted(domain: str, federationList: []):
if len(federationList) == 0:
return True
if ':' in domain:
domain = domain.split(':')[0]
if domain in federationList:
return True
return False
def urlPermitted(url: str, federationList: []):
if isEvil(url):
return False
if not federationList:
return True
for domain in federationList:
if domain in url:
return True
return False
def getDisplayName(baseDir: str, actor: str, personCache: {}) -> str:
"""Returns the display name for the given actor
"""
if '/statuses/' in actor:
actor = actor.split('/statuses/')[0]
if not personCache.get(actor):
return None
if personCache[actor].get('actor'):
if personCache[actor]['actor'].get('name'):
return personCache[actor]['actor']['name']
else:
# Try to obtain from the cached actors
cachedActorFilename = \
baseDir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
if os.path.isfile(cachedActorFilename):
actorJson = loadJson(cachedActorFilename, 1)
if actorJson:
if actorJson.get('name'):
return(actorJson['name'])
return None
def getNicknameFromActor(actor: str) -> str:
"""Returns the nickname from an actor url
"""
if actor.startswith('@'):
actor = actor[1:]
if '/users/' not in actor:
if '/profile/' in actor:
nickStr = actor.split('/profile/')[1].replace('@', '')
if '/' not in nickStr:
return nickStr
else:
return nickStr.split('/')[0]
elif '/channel/' in actor:
nickStr = actor.split('/channel/')[1].replace('@', '')
if '/' not in nickStr:
return nickStr
else:
return nickStr.split('/')[0]
elif '/accounts/' in actor:
nickStr = actor.split('/accounts/')[1].replace('@', '')
if '/' not in nickStr:
return nickStr
else:
return nickStr.split('/')[0]
elif '/@' in actor:
# https://domain/@nick
nickStr = actor.split('/@')[1]
if '/' in nickStr:
nickStr = nickStr.split('/')[0]
return nickStr
elif '@' in actor:
nickStr = actor.split('@')[0]
return nickStr
return None
nickStr = actor.split('/users/')[1].replace('@', '')
if '/' not in nickStr:
return nickStr
else:
return nickStr.split('/')[0]
def getDomainFromActor(actor: str) -> (str, int):
"""Returns the domain name from an actor url
"""
if actor.startswith('@'):
actor = actor[1:]
port = None
prefixes = getProtocolPrefixes()
if '/profile/' in actor:
domain = actor.split('/profile/')[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
elif '/accounts/' in actor:
domain = actor.split('/accounts/')[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
elif '/channel/' in actor:
domain = actor.split('/channel/')[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
elif '/users/' in actor:
domain = actor.split('/users/')[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
elif '/@' in actor:
domain = actor.split('/@')[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
elif '@' in actor:
domain = actor.split('@')[1].strip()
else:
domain = actor
for prefix in prefixes:
domain = domain.replace(prefix, '')
if '/' in actor:
domain = domain.split('/')[0]
if ':' in domain:
portStr = domain.split(':')[1]
if not portStr.isdigit():
return None, None
port = int(portStr)
domain = domain.split(':')[0]
return domain, port
def followPerson(baseDir: str, nickname: str, domain: str,
followNickname: str, followDomain: str,
federationList: [], debug: bool,
followFile='following.txt') -> bool:
"""Adds a person to the follow list
"""
if not domainPermitted(followDomain.lower().replace('\n', ''),
federationList):
if debug:
print('DEBUG: follow of domain ' +
followDomain + ' not permitted')
return False
if debug:
print('DEBUG: follow of domain ' + followDomain)
if ':' in domain:
handle = nickname + '@' + domain.split(':')[0]
else:
handle = nickname + '@' + domain
if not os.path.isdir(baseDir + '/accounts/' + handle):
print('WARN: account for ' + handle + ' does not exist')
return False
if ':' in followDomain:
handleToFollow = followNickname + '@' + followDomain.split(':')[0]
else:
handleToFollow = followNickname + '@' + followDomain
# was this person previously unfollowed?
unfollowedFilename = baseDir + '/accounts/' + handle + '/unfollowed.txt'
if os.path.isfile(unfollowedFilename):
if handleToFollow in open(unfollowedFilename).read():
# remove them from the unfollowed file
newLines = ''
with open(unfollowedFilename, "r") as f:
lines = f.readlines()
for line in lines:
if handleToFollow not in line:
newLines += line
with open(unfollowedFilename, 'w+') as f:
f.write(newLines)
if not os.path.isdir(baseDir + '/accounts'):
os.mkdir(baseDir + '/accounts')
handleToFollow = followNickname + '@' + followDomain
filename = baseDir + '/accounts/' + handle + '/' + followFile
if os.path.isfile(filename):
if handleToFollow in open(filename).read():
if debug:
print('DEBUG: follow already exists')
return True
# prepend to follow file
try:
with open(filename, 'r+') as f:
content = f.read()
f.seek(0, 0)
f.write(handleToFollow + '\n' + content)
print('DEBUG: follow added')
except Exception as e:
print('WARN: Failed to write entry to follow file ' +
filename + ' ' + str(e))
else:
# first follow
if debug:
print('DEBUG: ' + handle +
' creating new following file to follow ' + handleToFollow +
', filename is ' + filename)
with open(filename, 'w+') as f:
f.write(handleToFollow + '\n')
# Default to adding new follows to the calendar.
# Possibly this could be made optional
if followFile.endswith('following.txt'):
# if following a person add them to the list of
# calendar follows
print('DEBUG: adding ' +
followNickname + '@' + followDomain + ' to calendar of ' +
nickname + '@' + domain)
addPersonToCalendar(baseDir, nickname, domain,
followNickname, followDomain)
return True
def votesOnNewswireItem(status: []) -> int:
"""Returns the number of votes on a newswire item
"""
totalVotes = 0
for line in status:
if 'vote:' in line:
totalVotes += 1
return totalVotes
def locateNewsVotes(baseDir: str, domain: str,
postUrl: str) -> str:
"""Returns the votes filename for a news post
within the news user account
"""
postUrl = \
postUrl.strip().replace('\n', '').replace('\r', '')
# if this post in the shared inbox?
postUrl = removeIdEnding(postUrl.strip()).replace('/', '#')
if postUrl.endswith('.json'):
postUrl = postUrl + '.votes'
else:
postUrl = postUrl + '.json.votes'
accountDir = baseDir + '/accounts/news@' + domain + '/'
postFilename = accountDir + 'outbox/' + postUrl
if os.path.isfile(postFilename):
return postFilename
return None
def locateNewsArrival(baseDir: str, domain: str,
postUrl: str) -> str:
"""Returns the arrival time for a news post
within the news user account
"""
postUrl = \
postUrl.strip().replace('\n', '').replace('\r', '')
# if this post in the shared inbox?
postUrl = removeIdEnding(postUrl.strip()).replace('/', '#')
if postUrl.endswith('.json'):
postUrl = postUrl + '.arrived'
else:
postUrl = postUrl + '.json.arrived'
accountDir = baseDir + '/accounts/news@' + domain + '/'
postFilename = accountDir + 'outbox/' + postUrl
if os.path.isfile(postFilename):
with open(postFilename, 'r') as arrivalFile:
arrival = arrivalFile.read()
if arrival:
arrivalDate = \
datetime.datetime.strptime(arrival,
"%Y-%m-%dT%H:%M:%SZ")
return arrivalDate
return None
def clearFromPostCaches(baseDir: str, recentPostsCache: {},
postId: str) -> None:
"""Clears cached html for the given post, so that edits
to news will appear
"""
filename = '/postcache/' + postId + '.html'
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for acct in dirs:
if '@' not in acct:
continue
if 'inbox@' in acct:
continue
cacheDir = os.path.join(baseDir + '/accounts', acct)
postFilename = cacheDir + filename
if os.path.isfile(postFilename):
try:
os.remove(postFilename)
except BaseException:
print('WARN: clearFromPostCaches file not removed ' +
postFilename)
pass
# if the post is in the recent posts cache then remove it
if recentPostsCache.get('index'):
if postId in recentPostsCache['index']:
recentPostsCache['index'].remove(postId)
if recentPostsCache.get('json'):
if recentPostsCache['json'].get(postId):
del recentPostsCache['json'][postId]
if recentPostsCache.get('html'):
if recentPostsCache['html'].get(postId):
del recentPostsCache['html'][postId]
def locatePost(baseDir: str, nickname: str, domain: str,
postUrl: str, replies=False) -> str:
"""Returns the filename for the given status post url
"""
if not replies:
extension = 'json'
else:
extension = 'replies'
# if this post in the shared inbox?
postUrl = removeIdEnding(postUrl.strip()).replace('/', '#')
# add the extension
postUrl = postUrl + '.' + extension
# search boxes
boxes = ('inbox', 'outbox', 'tlblogs', 'tlevents')
accountDir = baseDir + '/accounts/' + nickname + '@' + domain + '/'
for boxName in boxes:
postFilename = accountDir + boxName + '/' + postUrl
if os.path.isfile(postFilename):
return postFilename
# check news posts
accountDir = baseDir + '/accounts/news' + '@' + domain + '/'
postFilename = accountDir + 'outbox/' + postUrl
if os.path.isfile(postFilename):
return postFilename
# is it in the announce cache?
postFilename = baseDir + '/cache/announce/' + nickname + '/' + postUrl
if os.path.isfile(postFilename):
return postFilename
# print('WARN: unable to locate ' + nickname + ' ' + postUrl)
return None
def removeAttachment(baseDir: str, httpPrefix: str, domain: str, postJson: {}):
if not postJson.get('attachment'):
return
if not postJson['attachment'][0].get('url'):
return
# if port:
# if port != 80 and port != 443:
# if ':' not in domain:
# domain = domain + ':' + str(port)
attachmentUrl = postJson['attachment'][0]['url']
if not attachmentUrl:
return
mediaFilename = baseDir + '/' + \
attachmentUrl.replace(httpPrefix + '://' + domain + '/', '')
if os.path.isfile(mediaFilename):
os.remove(mediaFilename)
etagFilename = mediaFilename + '.etag'
if os.path.isfile(etagFilename):
os.remove(etagFilename)
postJson['attachment'] = []
def removeModerationPostFromIndex(baseDir: str, postUrl: str,
debug: bool) -> None:
"""Removes a url from the moderation index
"""
moderationIndexFile = baseDir + '/accounts/moderation.txt'
if not os.path.isfile(moderationIndexFile):
return
postId = removeIdEnding(postUrl)
if postId in open(moderationIndexFile).read():
with open(moderationIndexFile, "r") as f:
lines = f.readlines()
with open(moderationIndexFile, "w+") as f:
for line in lines:
if line.strip("\n").strip("\r") != postId:
f.write(line)
else:
if debug:
print('DEBUG: removed ' + postId +
' from moderation index')
def isReplyToBlogPost(baseDir: str, nickname: str, domain: str,
postJsonObject: str):
"""Is the given post a reply to a blog post?
"""
if not postJsonObject.get('object'):
return False
if not isinstance(postJsonObject['object'], dict):
return False
if not postJsonObject['object'].get('inReplyTo'):
return False
if not isinstance(postJsonObject['object']['inReplyTo'], str):
return False
blogsIndexFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/tlblogs.index'
if not os.path.isfile(blogsIndexFilename):
return False
postId = removeIdEnding(postJsonObject['object']['inReplyTo'])
postId = postId.replace('/', '#')
if postId in open(blogsIndexFilename).read():
return True
return False
def deletePost(baseDir: str, httpPrefix: str,
nickname: str, domain: str, postFilename: str,
debug: bool, recentPostsCache: {}) -> None:
"""Recursively deletes a post and its replies and attachments
"""
postJsonObject = loadJson(postFilename, 1)
if postJsonObject:
# don't allow deletion of bookmarked posts
bookmarksIndexFilename = \
baseDir + '/accounts/' + nickname + '@' + domain + \
'/bookmarks.index'
if os.path.isfile(bookmarksIndexFilename):
bookmarkIndex = postFilename.split('/')[-1] + '\n'
if bookmarkIndex in open(bookmarksIndexFilename).read():
return
# don't remove replies to blog posts
if isReplyToBlogPost(baseDir, nickname, domain,
postJsonObject):
return
# remove from recent posts cache in memory
if recentPostsCache:
postId = \
removeIdEnding(postJsonObject['id']).replace('/', '#')
if recentPostsCache.get('index'):
if postId in recentPostsCache['index']:
recentPostsCache['index'].remove(postId)
if recentPostsCache.get('json'):
if recentPostsCache['json'].get(postId):
del recentPostsCache['json'][postId]
if recentPostsCache.get('html'):
if recentPostsCache['html'].get(postId):
del recentPostsCache['html'][postId]
# remove any attachment
removeAttachment(baseDir, httpPrefix, domain, postJsonObject)
extensions = ('votes', 'arrived', 'muted')
for ext in extensions:
extFilename = postFilename + '.' + ext
if os.path.isfile(extFilename):
os.remove(extFilename)
# remove cached html version of the post
cachedPostFilename = \
getCachedPostFilename(baseDir, nickname, domain, postJsonObject)
if cachedPostFilename:
if os.path.isfile(cachedPostFilename):
os.remove(cachedPostFilename)
# removePostFromCache(postJsonObject,recentPostsCache)
hasObject = False
if postJsonObject.get('object'):
hasObject = True
# remove from moderation index file
if hasObject:
if isinstance(postJsonObject['object'], dict):
if postJsonObject['object'].get('moderationStatus'):
if postJsonObject.get('id'):
postId = removeIdEnding(postJsonObject['id'])
removeModerationPostFromIndex(baseDir, postId, debug)
# remove any hashtags index entries
removeHashtagIndex = False
if hasObject:
if hasObject and isinstance(postJsonObject['object'], dict):
if postJsonObject['object'].get('content'):
if '#' in postJsonObject['object']['content']:
removeHashtagIndex = True
if removeHashtagIndex:
if postJsonObject['object'].get('id') and \
postJsonObject['object'].get('tag'):
# get the id of the post
postId = removeIdEnding(postJsonObject['object']['id'])
for tag in postJsonObject['object']['tag']:
if tag['type'] != 'Hashtag':
continue
if not tag.get('name'):
continue
# find the index file for this tag
tagIndexFilename = \
baseDir + '/tags/' + tag['name'][1:] + '.txt'
if not os.path.isfile(tagIndexFilename):
continue
# remove postId from the tag index file
lines = None
with open(tagIndexFilename, "r") as f:
lines = f.readlines()
if lines:
newlines = ''
for fileLine in lines:
if postId in fileLine:
continue
newlines += fileLine
if not newlines.strip():
# if there are no lines then remove the
# hashtag file
os.remove(tagIndexFilename)
else:
with open(tagIndexFilename, "w+") as f:
f.write(newlines)
# remove any replies
repliesFilename = postFilename.replace('.json', '.replies')
if os.path.isfile(repliesFilename):
if debug:
print('DEBUG: removing replies to ' + postFilename)
with open(repliesFilename, 'r') as f:
for replyId in f:
replyFile = locatePost(baseDir, nickname, domain, replyId)
if replyFile:
if os.path.isfile(replyFile):
deletePost(baseDir, httpPrefix,
nickname, domain, replyFile, debug,
recentPostsCache)
# remove the replies file
os.remove(repliesFilename)
# finally, remove the post itself
os.remove(postFilename)
def validNickname(domain: str, nickname: str) -> bool:
forbiddenChars = ('.', ' ', '/', '?', ':', ';', '@', '#')
for c in forbiddenChars:
if c in nickname:
return False
if nickname == domain:
return False
reservedNames = ('inbox', 'dm', 'outbox', 'following',
'public', 'followers',
'channel', 'calendar',
'tlreplies', 'tlmedia', 'tlblogs',
'tlevents', 'tlblogs',
'moderation', 'activity', 'undo',
'reply', 'replies', 'question', 'like',
'likes', 'users', 'statuses',
'accounts', 'channels', 'profile',
'updates', 'repeat', 'announce',
'shares', 'fonts', 'icons', 'avatars')
if nickname in reservedNames:
return False
return True
def noOfAccounts(baseDir: str) -> bool:
"""Returns the number of accounts on the system
"""
accountCtr = 0
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for account in dirs:
if '@' in account:
if not account.startswith('inbox@'):
accountCtr += 1
return accountCtr
def noOfActiveAccountsMonthly(baseDir: str, months: int) -> bool:
"""Returns the number of accounts on the system this month
"""
accountCtr = 0
currTime = int(time.time())
monthSeconds = int(60*60*24*30*months)
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for account in dirs:
if '@' in account:
if not account.startswith('inbox@'):
lastUsedFilename = \
baseDir + '/accounts/' + account + '/.lastUsed'
if os.path.isfile(lastUsedFilename):
with open(lastUsedFilename, 'r') as lastUsedFile:
lastUsed = lastUsedFile.read()
if lastUsed.isdigit():
timeDiff = (currTime - int(lastUsed))
if timeDiff < monthSeconds:
accountCtr += 1
return accountCtr
def isPublicPostFromUrl(baseDir: str, nickname: str, domain: str,
postUrl: str) -> bool:
"""Returns whether the given url is a public post
"""
postFilename = locatePost(baseDir, nickname, domain, postUrl)
if not postFilename:
return False
postJsonObject = loadJson(postFilename, 1)
if not postJsonObject:
return False
return isPublicPost(postJsonObject)
def isPublicPost(postJsonObject: {}) -> bool:
"""Returns true if the given post is public
"""
if not postJsonObject.get('type'):
return False
if postJsonObject['type'] != 'Create':
return False
if not postJsonObject.get('object'):
return False
if not isinstance(postJsonObject['object'], dict):
return False
if not postJsonObject['object'].get('to'):
return False
for recipient in postJsonObject['object']['to']:
if recipient.endswith('#Public'):
return True
return False
def copytree(src: str, dst: str, symlinks=False, ignore=None):
"""Copy a directory
"""
for item in os.listdir(src):
s = os.path.join(src, item)
d = os.path.join(dst, item)
if os.path.isdir(s):
shutil.copytree(s, d, symlinks, ignore)
else:
shutil.copy2(s, d)
def getCachedPostDirectory(baseDir: str, nickname: str, domain: str) -> str:
"""Returns the directory where the html post cache exists
"""
htmlPostCacheDir = baseDir + '/accounts/' + \
nickname + '@' + domain + '/postcache'
return htmlPostCacheDir
def getCachedPostFilename(baseDir: str, nickname: str, domain: str,
postJsonObject: {}) -> str:
"""Returns the html cache filename for the given post
"""
cachedPostDir = getCachedPostDirectory(baseDir, nickname, domain)
if not os.path.isdir(cachedPostDir):
# print('ERROR: invalid html cache directory '+cachedPostDir)
return None
if '@' not in cachedPostDir:
# print('ERROR: invalid html cache directory '+cachedPostDir)
return None
cachedPostId = removeIdEnding(postJsonObject['id'])
cachedPostFilename = cachedPostDir + '/' + cachedPostId.replace('/', '#')
return cachedPostFilename + '.html'
def removePostFromCache(postJsonObject: {}, recentPostsCache: {}):
""" if the post exists in the recent posts cache then remove it
"""
if not postJsonObject.get('id'):
return
if not recentPostsCache.get('index'):
return
postId = postJsonObject['id']
if '#' in postId:
postId = postId.split('#', 1)[0]
postId = removeIdEnding(postId).replace('/', '#')
if postId not in recentPostsCache['index']:
return
if recentPostsCache['json'].get(postId):
del recentPostsCache['json'][postId]
if recentPostsCache['html'].get(postId):
del recentPostsCache['html'][postId]
recentPostsCache['index'].remove(postId)
def updateRecentPostsCache(recentPostsCache: {}, maxRecentPosts: int,
postJsonObject: {}, htmlStr: str) -> None:
"""Store recent posts in memory so that they can be quickly recalled
"""
if not postJsonObject.get('id'):
return
postId = postJsonObject['id']
if '#' in postId:
postId = postId.split('#', 1)[0]
postId = removeIdEnding(postId).replace('/', '#')
if recentPostsCache.get('index'):
if postId in recentPostsCache['index']:
return
recentPostsCache['index'].append(postId)
postJsonObject['muted'] = False
recentPostsCache['json'][postId] = json.dumps(postJsonObject)
recentPostsCache['html'][postId] = htmlStr
while len(recentPostsCache['html'].items()) > maxRecentPosts:
postId = recentPostsCache['index'][0]
recentPostsCache['index'].pop(0)
del recentPostsCache['json'][postId]
del recentPostsCache['html'][postId]
else:
recentPostsCache['index'] = [postId]
recentPostsCache['json'] = {}
recentPostsCache['html'] = {}
recentPostsCache['json'][postId] = json.dumps(postJsonObject)
recentPostsCache['html'][postId] = htmlStr
def fileLastModified(filename: str) -> str:
"""Returns the date when a file was last modified
"""
t = os.path.getmtime(filename)
modifiedTime = datetime.datetime.fromtimestamp(t)
return modifiedTime.strftime("%Y-%m-%dT%H:%M:%SZ")
def getCSS(baseDir: str, cssFilename: str, cssCache: {}) -> str:
"""Retrieves the css for a given file, or from a cache
"""
# does the css file exist?
if not os.path.isfile(cssFilename):
return None
lastModified = fileLastModified(cssFilename)
# has this already been loaded into the cache?
if cssCache.get(cssFilename):
if cssCache[cssFilename][0] == lastModified:
# file hasn't changed, so return the version in the cache
return cssCache[cssFilename][1]
with open(cssFilename, 'r') as fpCSS:
css = fpCSS.read()
if cssCache.get(cssFilename):
# alter the cache contents
cssCache[cssFilename][0] = lastModified
cssCache[cssFilename][1] = css
else:
# add entry to the cache
cssCache[cssFilename] = [lastModified, css]
return css
return None
def daysInMonth(year: int, monthNumber: int) -> int:
"""Returns the number of days in the month
"""
if monthNumber < 1 or monthNumber > 12:
return None
daysRange = monthrange(year, monthNumber)
return daysRange[1]
def mergeDicts(dict1: {}, dict2: {}) -> {}:
"""Merges two dictionaries
"""
res = {**dict1, **dict2}
return res
def isEventPost(messageJson: {}) -> bool:
"""Is the given post a mobilizon-type event activity?
See https://framagit.org/framasoft/mobilizon/-/blob/
master/lib/federation/activity_stream/converter/event.ex
"""
if not messageJson.get('id'):
return False
if not messageJson.get('actor'):
return False
if not messageJson.get('object'):
return False
if not isinstance(messageJson['object'], dict):
return False
if not messageJson['object'].get('type'):
return False
if messageJson['object']['type'] != 'Event':
return False
print('Event arriving')
if not messageJson['object'].get('startTime'):
print('No event start time')
return False
if not messageJson['object'].get('actor'):
print('No event actor')
return False
if not messageJson['object'].get('content'):
print('No event content')
return False
if not messageJson['object'].get('name'):
print('No event name')
return False
if not messageJson['object'].get('uuid'):
print('No event UUID')
return False
print('Event detected')
return True
def isBlogPost(postJsonObject: {}) -> bool:
"""Is the given post a blog post?
"""
if postJsonObject['type'] != 'Create':
return False
if not postJsonObject.get('object'):
return False
if not isinstance(postJsonObject['object'], dict):
return False
if not postJsonObject['object'].get('type'):
return False
if not postJsonObject['object'].get('content'):
return False
if postJsonObject['object']['type'] != 'Article':
return False
return True
def isNewsPost(postJsonObject: {}) -> bool:
"""Is the given post a blog post?
"""
return postJsonObject.get('news')
def searchBoxPosts(baseDir: str, nickname: str, domain: str,
searchStr: str, maxResults: int,
boxName='outbox') -> []:
"""Search your posts and return a list of the filenames
containing matching strings
"""
path = baseDir + '/accounts/' + nickname + '@' + domain + '/' + boxName
if not os.path.isdir(path):
return []
searchStr = searchStr.lower().strip()
if '+' in searchStr:
searchWords = searchStr.split('+')
for index in range(len(searchWords)):
searchWords[index] = searchWords[index].strip()
print('SEARCH: ' + str(searchWords))
else:
searchWords = [searchStr]
res = []
for root, dirs, fnames in os.walk(path):
for fname in fnames:
filePath = os.path.join(root, fname)
with open(filePath, 'r') as postFile:
data = postFile.read().lower()
notFound = False
for keyword in searchWords:
if keyword not in data:
notFound = True
break
if notFound:
continue
res.append(filePath)
if len(res) >= maxResults:
return res
return res
def getFileCaseInsensitive(path: str) -> str:
"""Returns a case specific filename given a case insensitive version of it
"""
if os.path.isfile(path):
return path
if path != path.lower():
if os.path.isfile(path.lower()):
return path.lower()
# directory, filename = os.path.split(path)
# directory, filename = (directory or '.'), filename.lower()
# for f in os.listdir(directory):
# if f.lower() == filename:
# newpath = os.path.join(directory, f)
# if os.path.isfile(newpath):
# return newpath
return None
def undoLikesCollectionEntry(recentPostsCache: {},
baseDir: str, postFilename: str, objectUrl: str,
actor: str, domain: str, debug: bool) -> None:
"""Undoes a like for a particular actor
"""
postJsonObject = loadJson(postFilename)
if postJsonObject:
# remove any cached version of this post so that the
# like icon is changed
nickname = getNicknameFromActor(actor)
cachedPostFilename = getCachedPostFilename(baseDir, nickname,
domain, postJsonObject)
if cachedPostFilename:
if os.path.isfile(cachedPostFilename):
os.remove(cachedPostFilename)
removePostFromCache(postJsonObject, recentPostsCache)
if not postJsonObject.get('type'):
return
if postJsonObject['type'] != 'Create':
return
if not postJsonObject.get('object'):
if debug:
pprint(postJsonObject)
print('DEBUG: post '+objectUrl+' has no object')
return
if not isinstance(postJsonObject['object'], dict):
return
if not postJsonObject['object'].get('likes'):
return
if not isinstance(postJsonObject['object']['likes'], dict):
return
if not postJsonObject['object']['likes'].get('items'):
return
totalItems = 0
if postJsonObject['object']['likes'].get('totalItems'):
totalItems = postJsonObject['object']['likes']['totalItems']
itemFound = False
for likeItem in postJsonObject['object']['likes']['items']:
if likeItem.get('actor'):
if likeItem['actor'] == actor:
if debug:
print('DEBUG: like was removed for ' + actor)
postJsonObject['object']['likes']['items'].remove(likeItem)
itemFound = True
break
if itemFound:
if totalItems == 1:
if debug:
print('DEBUG: likes was removed from post')
del postJsonObject['object']['likes']
else:
itlen = len(postJsonObject['object']['likes']['items'])
postJsonObject['object']['likes']['totalItems'] = itlen
saveJson(postJsonObject, postFilename)
def updateLikesCollection(recentPostsCache: {},
baseDir: str, postFilename: str,
objectUrl: str,
actor: str, domain: str, debug: bool) -> None:
"""Updates the likes collection within a post
"""
postJsonObject = loadJson(postFilename)
if not postJsonObject:
return
# remove any cached version of this post so that the
# like icon is changed
nickname = getNicknameFromActor(actor)
cachedPostFilename = getCachedPostFilename(baseDir, nickname,
domain, postJsonObject)
if cachedPostFilename:
if os.path.isfile(cachedPostFilename):
os.remove(cachedPostFilename)
removePostFromCache(postJsonObject, recentPostsCache)
if not postJsonObject.get('object'):
if debug:
pprint(postJsonObject)
print('DEBUG: post ' + objectUrl + ' has no object')
return
if not isinstance(postJsonObject['object'], dict):
return
if not objectUrl.endswith('/likes'):
objectUrl = objectUrl + '/likes'
if not postJsonObject['object'].get('likes'):
if debug:
print('DEBUG: Adding initial like to ' + objectUrl)
likesJson = {
"@context": "https://www.w3.org/ns/activitystreams",
'id': objectUrl,
'type': 'Collection',
"totalItems": 1,
'items': [{
'type': 'Like',
'actor': actor
}]
}
postJsonObject['object']['likes'] = likesJson
else:
if not postJsonObject['object']['likes'].get('items'):
postJsonObject['object']['likes']['items'] = []
for likeItem in postJsonObject['object']['likes']['items']:
if likeItem.get('actor'):
if likeItem['actor'] == actor:
return
newLike = {
'type': 'Like',
'actor': actor
}
postJsonObject['object']['likes']['items'].append(newLike)
itlen = len(postJsonObject['object']['likes']['items'])
postJsonObject['object']['likes']['totalItems'] = itlen
if debug:
print('DEBUG: saving post with likes added')
pprint(postJsonObject)
saveJson(postJsonObject, postFilename)
def undoAnnounceCollectionEntry(recentPostsCache: {},
baseDir: str, postFilename: str,
actor: str, domain: str, debug: bool) -> None:
"""Undoes an announce for a particular actor by removing it from
the "shares" collection within a post. Note that the "shares"
collection has no relation to shared items in shares.py. It's
shares of posts, not shares of physical objects.
"""
postJsonObject = loadJson(postFilename)
if postJsonObject:
# remove any cached version of this announce so that the announce
# icon is changed
nickname = getNicknameFromActor(actor)
cachedPostFilename = getCachedPostFilename(baseDir, nickname, domain,
postJsonObject)
if cachedPostFilename:
if os.path.isfile(cachedPostFilename):
os.remove(cachedPostFilename)
removePostFromCache(postJsonObject, recentPostsCache)
if not postJsonObject.get('type'):
return
if postJsonObject['type'] != 'Create':
return
if not postJsonObject.get('object'):
if debug:
pprint(postJsonObject)
print('DEBUG: post has no object')
return
if not isinstance(postJsonObject['object'], dict):
return
if not postJsonObject['object'].get('shares'):
return
if not postJsonObject['object']['shares'].get('items'):
return
totalItems = 0
if postJsonObject['object']['shares'].get('totalItems'):
totalItems = postJsonObject['object']['shares']['totalItems']
itemFound = False
for announceItem in postJsonObject['object']['shares']['items']:
if announceItem.get('actor'):
if announceItem['actor'] == actor:
if debug:
print('DEBUG: Announce was removed for ' + actor)
anIt = announceItem
postJsonObject['object']['shares']['items'].remove(anIt)
itemFound = True
break
if itemFound:
if totalItems == 1:
if debug:
print('DEBUG: shares (announcements) ' +
'was removed from post')
del postJsonObject['object']['shares']
else:
itlen = len(postJsonObject['object']['shares']['items'])
postJsonObject['object']['shares']['totalItems'] = itlen
saveJson(postJsonObject, postFilename)
def updateAnnounceCollection(recentPostsCache: {},
baseDir: str, postFilename: str,
actor: str, domain: str, debug: bool) -> None:
"""Updates the announcements collection within a post
Confusingly this is known as "shares", but isn't the
same as shared items within shares.py
It's shares of posts, not shares of physical objects.
"""
postJsonObject = loadJson(postFilename)
if postJsonObject:
# remove any cached version of this announce so that the announce
# icon is changed
nickname = getNicknameFromActor(actor)
cachedPostFilename = getCachedPostFilename(baseDir, nickname, domain,
postJsonObject)
if cachedPostFilename:
if os.path.isfile(cachedPostFilename):
os.remove(cachedPostFilename)
removePostFromCache(postJsonObject, recentPostsCache)
if not postJsonObject.get('object'):
if debug:
pprint(postJsonObject)
print('DEBUG: post ' + postFilename + ' has no object')
return
if not isinstance(postJsonObject['object'], dict):
return
postUrl = removeIdEnding(postJsonObject['id']) + '/shares'
if not postJsonObject['object'].get('shares'):
if debug:
print('DEBUG: Adding initial shares (announcements) to ' +
postUrl)
announcementsJson = {
"@context": "https://www.w3.org/ns/activitystreams",
'id': postUrl,
'type': 'Collection',
"totalItems": 1,
'items': [{
'type': 'Announce',
'actor': actor
}]
}
postJsonObject['object']['shares'] = announcementsJson
else:
if postJsonObject['object']['shares'].get('items'):
sharesItems = postJsonObject['object']['shares']['items']
for announceItem in sharesItems:
if announceItem.get('actor'):
if announceItem['actor'] == actor:
return
newAnnounce = {
'type': 'Announce',
'actor': actor
}
postJsonObject['object']['shares']['items'].append(newAnnounce)
itlen = len(postJsonObject['object']['shares']['items'])
postJsonObject['object']['shares']['totalItems'] = itlen
else:
if debug:
print('DEBUG: shares (announcements) section of post ' +
'has no items list')
if debug:
print('DEBUG: saving post with shares (announcements) added')
pprint(postJsonObject)
saveJson(postJsonObject, postFilename)
def siteIsActive(url: str) -> bool:
"""Returns true if the current url is resolvable.
This can be used to check that an instance is online before
trying to send posts to it.
"""
if not url.startswith('http'):
return False
try:
req = urllib.request.Request(url)
urllib.request.urlopen(req, timeout=10) # nosec
return True
except SocketError as e:
if e.errno == errno.ECONNRESET:
print('WARN: connection was reset during siteIsActive')
return False