# Module metadata: file name, author, license, version,
# maintainer contact details and release status
__filename__ = "utils.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
import time
import shutil
import datetime
import json
from socket import error as SocketError
import errno
import urllib.request
from pprint import pprint
from calendar import monthrange
from followingCalendar import addPersonToCalendar
def isDormant(baseDir: str, nickname: str, domain: str, actor: str,
              dormantMonths=3) -> bool:
    """Is the given followed actor dormant, from the standpoint
    of the given account

    The last seen file for the actor is read as a number of days since
    1970-01-01. The actor is dormant if at least dormantMonths have
    passed since then, where a month is counted as 30 days.
    Returns False if there is no last seen file for the actor, or if
    its contents can't be read as a whole number.
    """
    lastSeenFilename = \
        baseDir + '/accounts/' + nickname + '@' + domain + \
        '/lastseen/' + actor.replace('/', '#') + '.txt'

    if not os.path.isfile(lastSeenFilename):
        return False

    with open(lastSeenFilename, 'r') as lastSeenFile:
        daysSinceEpochStr = lastSeenFile.read().strip()

    # an empty or corrupted file is not evidence of dormancy,
    # and should not crash the caller
    try:
        daysSinceEpoch = int(daysSinceEpochStr)
    except ValueError:
        return False

    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    currTime = datetime.datetime.now(datetime.timezone.utc)
    currDaysSinceEpoch = (currTime - epoch).days
    timeDiffMonths = int((currDaysSinceEpoch - daysSinceEpoch) / 30)
    return timeDiffMonths >= dormantMonths
def getHashtagCategory(baseDir: str, hashtag: str) -> str:
    """Looks up the category assigned to a hashtag

    The category file is searched for using the hashtag as given,
    then in Title case, then in UPPER case. The contents of the first
    file found are returned, or an empty string if none exists.
    """
    tagsPrefix = baseDir + '/tags/'
    for variant in (hashtag, hashtag.title(), hashtag.upper()):
        candidate = tagsPrefix + variant + '.category'
        if not os.path.isfile(candidate):
            continue
        with open(candidate, 'r') as categoryFile:
            contents = categoryFile.read()
        return contents if contents else ''
    return ''
def getHashtagCategories(baseDir: str, recent=False, category=None) -> dict:
    """Returns a dictionary containing hashtag categories

    The dictionary maps each category name to a list of the hashtags
    having that category, as read from the .category files directly
    within the tags directory.
    If recent is True then only hashtags whose .txt file was modified
    today or yesterday (UTC) are included.
    If category is given then only that category is returned.
    """
    hashtagCategories = {}
    tagsDir = baseDir + '/tags'
    secondsPerDay = 24 * 60 * 60

    recently = None
    if recent:
        epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
        currTime = datetime.datetime.now(datetime.timezone.utc)
        daysSinceEpoch = (currTime - epoch).days
        recently = daysSinceEpoch - 1

    for _, _, files in os.walk(tagsDir):
        for f in files:
            if not f.endswith('.category'):
                continue
            categoryFilename = os.path.join(tagsDir, f)
            if not os.path.isfile(categoryFilename):
                continue
            hashtag = f.split('.')[0]
            with open(categoryFilename, 'r') as fp:
                categoryStr = fp.read()

            if not categoryStr:
                continue

            # only return a dictionary for a specific category
            if category and categoryStr != category:
                continue

            if recent:
                tagsFilename = tagsDir + '/' + hashtag + '.txt'
                if not os.path.isfile(tagsFilename):
                    continue
                # the modification time is in seconds since the epoch,
                # so this day number is UTC, like the current day above
                fileDaysSinceEpoch = \
                    int(os.path.getmtime(tagsFilename) // secondsPerDay)
                if fileDaysSinceEpoch < recently:
                    continue

            hashtagList = hashtagCategories.setdefault(categoryStr, [])
            if hashtag not in hashtagList:
                hashtagList.append(hashtag)
        # only the top level of the tags directory is examined
        break
    return hashtagCategories
def updateHashtagCategories(baseDir: str) -> None:
    """Rebuilds the saved list of hashtag category names

    The category names are written in sorted order, one per line,
    to accounts/categoryList.txt. If there are no categories then
    any existing list file is removed.
    """
    listPath = baseDir + '/accounts/categoryList.txt'
    categories = getHashtagCategories(baseDir)

    if categories:
        # save a list of available categories for quick lookup
        listText = ''.join(name + '\n' for name in sorted(categories))
        with open(listPath, 'w+') as listFile:
            listFile.write(listText)
        return

    if os.path.isfile(listPath):
        os.remove(listPath)
def validHashtagCategory(category: str) -> bool:
    """Checks whether a category name is acceptable

    A valid name is non-empty, is at most 40 characters long and
    contains no comma, space, '<', semicolon or backslash.
    """
    maxLength = 40
    forbidden = (',', ' ', '<', ';', '\\')

    if not category:
        return False
    if any(symbol in category for symbol in forbidden):
        return False
    return len(category) <= maxLength
def setHashtagCategory(baseDir: str, hashtag: str, category: str,
                       force=False) -> bool:
    """Sets the category for the hashtag

    Returns True if the category was saved, in which case the list
    of categories is also regenerated.
    Returns False if the category name is not valid. Without force,
    False is also returned if no .txt file exists for the hashtag as
    given, in Title case or in UPPER case. With force, False is
    returned if the hashtag already has a category file.
    """
    if not validHashtagCategory(category):
        return False

    tagsDir = baseDir + '/tags'

    if not force:
        # find which capitalization of the hashtag is in use
        hashtagFilename = tagsDir + '/' + hashtag + '.txt'
        if not os.path.isfile(hashtagFilename):
            hashtag = hashtag.title()
            hashtagFilename = tagsDir + '/' + hashtag + '.txt'
            if not os.path.isfile(hashtagFilename):
                hashtag = hashtag.upper()
                hashtagFilename = tagsDir + '/' + hashtag + '.txt'
                if not os.path.isfile(hashtagFilename):
                    return False

    if not os.path.isdir(tagsDir):
        os.mkdir(tagsDir)

    categoryFilename = tagsDir + '/' + hashtag + '.category'
    if force and os.path.isfile(categoryFilename):
        # don't overwrite any existing categories
        return False

    with open(categoryFilename, 'w+') as fp:
        fp.write(category)

    # Regenerate the list only after the category file has been closed.
    # While the file is still open the written text may be sitting in
    # a buffer, so reading it back could give an empty category.
    updateHashtagCategories(baseDir)
    return True
def isEditor(baseDir: str, nickname: str) -> bool:
    """Decides whether the given nickname has editor status

    Editors are listed one per line in accounts/editors.txt.
    If that file is missing or has no lines then only the nickname
    matching the 'admin' configuration value is an editor.
    """
    editorsPath = baseDir + '/accounts/editors.txt'

    editorLines = []
    if os.path.isfile(editorsPath):
        with open(editorsPath, "r") as editorsFile:
            editorLines = editorsFile.readlines()

    if not editorLines:
        # no editors are listed, so fall back to the administrator
        admin = getConfigParam(baseDir, 'admin')
        return bool(admin) and admin == nickname

    return any(entry.strip('\n').strip('\r') == nickname
               for entry in editorLines)
def getImageExtensions() -> tuple:
    """Returns a tuple of the possible image file extensions,
    without any leading dot
    """
    return ('png', 'jpg', 'jpeg', 'gif', 'webp', 'avif')
def getVideoExtensions() -> tuple:
    """Returns a tuple of the possible video file extensions,
    without any leading dot
    """
    return ('mp4', 'webm', 'ogv')
def getAudioExtensions() -> tuple:
    """Returns a tuple of the possible audio file extensions,
    without any leading dot
    """
    return ('mp3', 'ogg')
def getMediaExtensions() -> tuple:
    """Returns a tuple of the possible media file extensions:
    the image extensions, followed by the video extensions,
    followed by the audio extensions
    """
    return getImageExtensions() + getVideoExtensions() + getAudioExtensions()
def getImageFormats() -> str:
    """Builds the string of permitted image formats offered
    when choosing an image for a new post

    Each image extension is prefixed with a dot and the entries
    are separated by a comma and a space.
    """
    dottedExtensions = ['.' + extension for extension in getImageExtensions()]
    return ', '.join(dottedExtensions)
def getMediaFormats() -> str:
    """Builds the string of permitted media formats offered
    when choosing an attachment for a new post

    Each media extension is prefixed with a dot and the entries
    are separated by a comma and a space.
    """
    dottedExtensions = ['.' + extension for extension in getMediaExtensions()]
    return ', '.join(dottedExtensions)
def removeHtml(content: str) -> str:
    """Removes html markup from the given content.
    Used to ensure that profile descriptions don't contain dubious content

    Everything from a '<' up to and including the next '>' is dropped.
    Quotation tags are first turned into double quote characters.
    Content containing no '<' is returned unchanged.
    """
    if '<' not in content:
        return content
    # NOTE(review): the two tag literals replaced here were not readable
    # in the text this function was restored from. They are assumed to
    # be the opening and closing quotation tags - confirm
    content = content.replace('<q>', '"').replace('</q>', '"')
    removing = False
    kept = []
    for ch in content:
        if ch == '<':
            removing = True
        elif ch == '>':
            # this also drops a '>' which doesn't close any tag
            removing = False
        elif not removing:
            kept.append(ch)
    return ''.join(kept)
def firstParagraphFromString(content: str) -> str:
"""Get the first paragraph from a blog post
to be used as a summary in the newswire feed
"""
if '
' not in content or '
' not in content: return removeHtml(content) paragraph = content.split('')[1] if '
' in paragraph: paragraph = paragraph.split('')[0] return removeHtml(paragraph) def isSystemAccount(nickname: str) -> bool: """Returns true if the given nickname is a system account """ if nickname == 'news' or nickname == 'inbox': return True return False def createConfig(baseDir: str) -> None: """Creates a configuration file """ configFilename = baseDir + '/config.json' if os.path.isfile(configFilename): return configJson = { } saveJson(configJson, configFilename) def setConfigParam(baseDir: str, variableName: str, variableValue) -> None: """Sets a configuration value """ createConfig(baseDir) configFilename = baseDir + '/config.json' configJson = {} if os.path.isfile(configFilename): configJson = loadJson(configFilename) configJson[variableName] = variableValue saveJson(configJson, configFilename) def getConfigParam(baseDir: str, variableName: str): """Gets a configuration value """ createConfig(baseDir) configFilename = baseDir + '/config.json' configJson = loadJson(configFilename) if configJson: if variableName in configJson: return configJson[variableName] return None def isSuspended(baseDir: str, nickname: str) -> bool: """Returns true if the given nickname is suspended """ adminNickname = getConfigParam(baseDir, 'admin') if not adminNickname: return False if nickname == adminNickname: return False suspendedFilename = baseDir + '/accounts/suspended.txt' if os.path.isfile(suspendedFilename): with open(suspendedFilename, "r") as f: lines = f.readlines() for suspended in lines: if suspended.strip('\n').strip('\r') == nickname: return True return False def getFollowersList(baseDir: str, nickname: str, domain: str, followFile='following.txt') -> []: """Returns a list of followers for the given account """ filename = \ baseDir + '/accounts/' + nickname + '@' + domain + '/' + followFile if not os.path.isfile(filename): return [] with open(filename, "r") as f: lines = f.readlines() for i in range(len(lines)): lines[i] = lines[i].strip() return lines return [] def 
getFollowersOfPerson(baseDir: str, nickname: str, domain: str, followFile='following.txt') -> []: """Returns a list containing the followers of the given person Used by the shared inbox to know who to send incoming mail to """ followers = [] if ':' in domain: domain = domain.split(':')[0] handle = nickname + '@' + domain if not os.path.isdir(baseDir + '/accounts/' + handle): return followers for subdir, dirs, files in os.walk(baseDir + '/accounts'): for account in dirs: filename = os.path.join(subdir, account) + '/' + followFile if account == handle or account.startswith('inbox@'): continue if not os.path.isfile(filename): continue with open(filename, 'r') as followingfile: for followingHandle in followingfile: followingHandle2 = followingHandle.replace('\n', '') followingHandle2 = followingHandle2.replace('\r', '') if followingHandle2 == handle: if account not in followers: followers.append(account) break break return followers def removeIdEnding(idStr: str) -> str: """Removes endings such as /activity and /undo """ if idStr.endswith('/activity'): idStr = idStr[:-len('/activity')] elif idStr.endswith('/undo'): idStr = idStr[:-len('/undo')] elif idStr.endswith('/event'): idStr = idStr[:-len('/event')] elif idStr.endswith('/replies'): idStr = idStr[:-len('/replies')] return idStr def getProtocolPrefixes() -> []: """Returns a list of valid prefixes """ return ('https://', 'http://', 'dat://', 'i2p://', 'gnunet://', 'hyper://', 'gemini://', 'gopher://') def getLinkPrefixes() -> []: """Returns a list of valid web link prefixes """ return ('https://', 'http://', 'dat://', 'i2p://', 'gnunet://', 'hyper://', 'gemini://', 'gopher://', 'briar:') def removeAvatarFromCache(baseDir: str, actorStr: str) -> None: """Removes any existing avatar entries from the cache This avoids duplicate entries with differing extensions """ avatarFilenameExtensions = getImageExtensions() for extension in avatarFilenameExtensions: avatarFilename = \ baseDir + '/cache/avatars/' + actorStr + '.' 
+ extension if os.path.isfile(avatarFilename): os.remove(avatarFilename) def saveJson(jsonObject: {}, filename: str) -> bool: """Saves json to a file """ tries = 0 while tries < 5: try: with open(filename, 'w+') as fp: fp.write(json.dumps(jsonObject)) return True except BaseException: print('WARN: saveJson ' + str(tries)) time.sleep(1) tries += 1 return False def loadJson(filename: str, delaySec=2, maxTries=5) -> {}: """Makes a few attempts to load a json formatted file """ jsonObject = None tries = 0 while tries < maxTries: try: with open(filename, 'r') as fp: data = fp.read() jsonObject = json.loads(data) break except BaseException: print('WARN: loadJson exception') if delaySec > 0: time.sleep(delaySec) tries += 1 return jsonObject def loadJsonOnionify(filename: str, domain: str, onionDomain: str, delaySec=2) -> {}: """Makes a few attempts to load a json formatted file This also converts the domain name to the onion domain """ jsonObject = None tries = 0 while tries < 5: try: with open(filename, 'r') as fp: data = fp.read() if data: data = data.replace(domain, onionDomain) data = data.replace('https:', 'http:') print('*****data: ' + data) jsonObject = json.loads(data) break except BaseException: print('WARN: loadJson exception') if delaySec > 0: time.sleep(delaySec) tries += 1 return jsonObject def getStatusNumber(publishedStr=None) -> (str, str): """Returns the status number and published date """ if not publishedStr: currTime = datetime.datetime.utcnow() else: currTime = \ datetime.datetime.strptime(publishedStr, '%Y-%m-%dT%H:%M:%SZ') daysSinceEpoch = (currTime - datetime.datetime(1970, 1, 1)).days # status is the number of seconds since epoch statusNumber = \ str(((daysSinceEpoch * 24 * 60 * 60) + (currTime.hour * 60 * 60) + (currTime.minute * 60) + currTime.second) * 1000 + int(currTime.microsecond / 1000)) # See https://github.com/tootsuite/mastodon/blob/ # 995f8b389a66ab76ec92d9a240de376f1fc13a38/lib/mastodon/snowflake.rb # use the leftover microseconds as 
the sequence number sequenceId = currTime.microsecond % 1000 # shift by 16bits "sequence data" statusNumber = str((int(statusNumber) << 16) + sequenceId) published = currTime.strftime("%Y-%m-%dT%H:%M:%SZ") return statusNumber, published def evilIncarnate() -> []: return ('gab.com', 'gabfed.com', 'spinster.xyz', 'kiwifarms.cc', 'djitter.com') def isEvil(domain: str) -> bool: if not isinstance(domain, str): print('WARN: Malformed domain ' + str(domain)) return True # https://www.youtube.com/watch?v=5qw1hcevmdU evilDomains = evilIncarnate() for concentratedEvil in evilDomains: if domain.endswith(concentratedEvil): return True return False def containsInvalidChars(jsonStr: str) -> bool: """Does the given json string contain invalid characters? e.g. dubious clacks/admin dogwhistles """ invalidStrings = { '卐', '卍', '࿕', '࿖', '࿗', '࿘' } for isInvalid in invalidStrings: if isInvalid in jsonStr: return True return False def createPersonDir(nickname: str, domain: str, baseDir: str, dirname: str) -> str: """Create a directory for a person """ handle = nickname + '@' + domain if not os.path.isdir(baseDir + '/accounts/' + handle): os.mkdir(baseDir + '/accounts/' + handle) boxDir = baseDir + '/accounts/' + handle + '/' + dirname if not os.path.isdir(boxDir): os.mkdir(boxDir) return boxDir def createOutboxDir(nickname: str, domain: str, baseDir: str) -> str: """Create an outbox for a person """ return createPersonDir(nickname, domain, baseDir, 'outbox') def createInboxQueueDir(nickname: str, domain: str, baseDir: str) -> str: """Create an inbox queue and returns the feed filename and directory """ return createPersonDir(nickname, domain, baseDir, 'queue') def domainPermitted(domain: str, federationList: []): if len(federationList) == 0: return True if ':' in domain: domain = domain.split(':')[0] if domain in federationList: return True return False def urlPermitted(url: str, federationList: []): if isEvil(url): return False if not federationList: return True for domain in 
federationList: if domain in url: return True return False def getDisplayName(baseDir: str, actor: str, personCache: {}) -> str: """Returns the display name for the given actor """ if '/statuses/' in actor: actor = actor.split('/statuses/')[0] if not personCache.get(actor): return None if personCache[actor].get('actor'): if personCache[actor]['actor'].get('name'): return personCache[actor]['actor']['name'] else: # Try to obtain from the cached actors cachedActorFilename = \ baseDir + '/cache/actors/' + (actor.replace('/', '#')) + '.json' if os.path.isfile(cachedActorFilename): actorJson = loadJson(cachedActorFilename, 1) if actorJson: if actorJson.get('name'): return(actorJson['name']) return None def getNicknameFromActor(actor: str) -> str: """Returns the nickname from an actor url """ if actor.startswith('@'): actor = actor[1:] if '/users/' not in actor: if '/profile/' in actor: nickStr = actor.split('/profile/')[1].replace('@', '') if '/' not in nickStr: return nickStr else: return nickStr.split('/')[0] elif '/channel/' in actor: nickStr = actor.split('/channel/')[1].replace('@', '') if '/' not in nickStr: return nickStr else: return nickStr.split('/')[0] elif '/accounts/' in actor: nickStr = actor.split('/accounts/')[1].replace('@', '') if '/' not in nickStr: return nickStr else: return nickStr.split('/')[0] elif '/@' in actor: # https://domain/@nick nickStr = actor.split('/@')[1] if '/' in nickStr: nickStr = nickStr.split('/')[0] return nickStr elif '@' in actor: nickStr = actor.split('@')[0] return nickStr return None nickStr = actor.split('/users/')[1].replace('@', '') if '/' not in nickStr: return nickStr else: return nickStr.split('/')[0] def getDomainFromActor(actor: str) -> (str, int): """Returns the domain name from an actor url """ if actor.startswith('@'): actor = actor[1:] port = None prefixes = getProtocolPrefixes() if '/profile/' in actor: domain = actor.split('/profile/')[0] for prefix in prefixes: domain = domain.replace(prefix, '') elif 
'/accounts/' in actor: domain = actor.split('/accounts/')[0] for prefix in prefixes: domain = domain.replace(prefix, '') elif '/channel/' in actor: domain = actor.split('/channel/')[0] for prefix in prefixes: domain = domain.replace(prefix, '') elif '/users/' in actor: domain = actor.split('/users/')[0] for prefix in prefixes: domain = domain.replace(prefix, '') elif '/@' in actor: domain = actor.split('/@')[0] for prefix in prefixes: domain = domain.replace(prefix, '') elif '@' in actor: domain = actor.split('@')[1].strip() else: domain = actor for prefix in prefixes: domain = domain.replace(prefix, '') if '/' in actor: domain = domain.split('/')[0] if ':' in domain: portStr = domain.split(':')[1] if not portStr.isdigit(): return None, None port = int(portStr) domain = domain.split(':')[0] return domain, port def setDefaultPetName(baseDir: str, nickname: str, domain: str, followNickname: str, followDomain: str) -> None: """Sets a default petname This helps especially when using onion or i2p address """ if ':' in domain: domain = domain.split(':')[0] userPath = baseDir + '/accounts/' + nickname + '@' + domain petnamesFilename = userPath + '/petnames.txt' petnameLookupEntry = followNickname + ' ' + \ followNickname + '@' + followDomain + '\n' if not os.path.isfile(petnamesFilename): # if there is no existing petnames lookup file with open(petnamesFilename, 'w+') as petnamesFile: petnamesFile.write(petnameLookupEntry) return with open(petnamesFilename, 'r') as petnamesFile: petnamesStr = petnamesFile.read() if petnamesStr: petnamesList = petnamesStr.split('\n') for pet in petnamesList: if pet.startswith(followNickname + ' '): # petname already exists return # petname doesn't already exist with open(petnamesFilename, 'a+') as petnamesFile: petnamesFile.write(petnameLookupEntry) def followPerson(baseDir: str, nickname: str, domain: str, followNickname: str, followDomain: str, federationList: [], debug: bool, followFile='following.txt') -> bool: """Adds a person to the 
follow list """ if not domainPermitted(followDomain.lower().replace('\n', ''), federationList): if debug: print('DEBUG: follow of domain ' + followDomain + ' not permitted') return False if debug: print('DEBUG: follow of domain ' + followDomain) if ':' in domain: handle = nickname + '@' + domain.split(':')[0] else: handle = nickname + '@' + domain if not os.path.isdir(baseDir + '/accounts/' + handle): print('WARN: account for ' + handle + ' does not exist') return False if ':' in followDomain: handleToFollow = followNickname + '@' + followDomain.split(':')[0] else: handleToFollow = followNickname + '@' + followDomain # was this person previously unfollowed? unfollowedFilename = baseDir + '/accounts/' + handle + '/unfollowed.txt' if os.path.isfile(unfollowedFilename): if handleToFollow in open(unfollowedFilename).read(): # remove them from the unfollowed file newLines = '' with open(unfollowedFilename, "r") as f: lines = f.readlines() for line in lines: if handleToFollow not in line: newLines += line with open(unfollowedFilename, 'w+') as f: f.write(newLines) if not os.path.isdir(baseDir + '/accounts'): os.mkdir(baseDir + '/accounts') handleToFollow = followNickname + '@' + followDomain filename = baseDir + '/accounts/' + handle + '/' + followFile if os.path.isfile(filename): if handleToFollow in open(filename).read(): if debug: print('DEBUG: follow already exists') return True # prepend to follow file try: with open(filename, 'r+') as f: content = f.read() f.seek(0, 0) f.write(handleToFollow + '\n' + content) print('DEBUG: follow added') except Exception as e: print('WARN: Failed to write entry to follow file ' + filename + ' ' + str(e)) else: # first follow if debug: print('DEBUG: ' + handle + ' creating new following file to follow ' + handleToFollow + ', filename is ' + filename) with open(filename, 'w+') as f: f.write(handleToFollow + '\n') if followFile.endswith('following.txt'): # Default to adding new follows to the calendar. 
# Possibly this could be made optional # if following a person add them to the list of # calendar follows print('DEBUG: adding ' + followNickname + '@' + followDomain + ' to calendar of ' + nickname + '@' + domain) addPersonToCalendar(baseDir, nickname, domain, followNickname, followDomain) # add a default petname setDefaultPetName(baseDir, nickname, domain, followNickname, followDomain) return True def votesOnNewswireItem(status: []) -> int: """Returns the number of votes on a newswire item """ totalVotes = 0 for line in status: if 'vote:' in line: totalVotes += 1 return totalVotes def locateNewsVotes(baseDir: str, domain: str, postUrl: str) -> str: """Returns the votes filename for a news post within the news user account """ postUrl = \ postUrl.strip().replace('\n', '').replace('\r', '') # if this post in the shared inbox? postUrl = removeIdEnding(postUrl.strip()).replace('/', '#') if postUrl.endswith('.json'): postUrl = postUrl + '.votes' else: postUrl = postUrl + '.json.votes' accountDir = baseDir + '/accounts/news@' + domain + '/' postFilename = accountDir + 'outbox/' + postUrl if os.path.isfile(postFilename): return postFilename return None def locateNewsArrival(baseDir: str, domain: str, postUrl: str) -> str: """Returns the arrival time for a news post within the news user account """ postUrl = \ postUrl.strip().replace('\n', '').replace('\r', '') # if this post in the shared inbox? 
postUrl = removeIdEnding(postUrl.strip()).replace('/', '#') if postUrl.endswith('.json'): postUrl = postUrl + '.arrived' else: postUrl = postUrl + '.json.arrived' accountDir = baseDir + '/accounts/news@' + domain + '/' postFilename = accountDir + 'outbox/' + postUrl if os.path.isfile(postFilename): with open(postFilename, 'r') as arrivalFile: arrival = arrivalFile.read() if arrival: arrivalDate = \ datetime.datetime.strptime(arrival, "%Y-%m-%dT%H:%M:%SZ") return arrivalDate return None def clearFromPostCaches(baseDir: str, recentPostsCache: {}, postId: str) -> None: """Clears cached html for the given post, so that edits to news will appear """ filename = '/postcache/' + postId + '.html' for subdir, dirs, files in os.walk(baseDir + '/accounts'): for acct in dirs: if '@' not in acct: continue if 'inbox@' in acct: continue cacheDir = os.path.join(baseDir + '/accounts', acct) postFilename = cacheDir + filename if os.path.isfile(postFilename): try: os.remove(postFilename) except BaseException: print('WARN: clearFromPostCaches file not removed ' + postFilename) pass # if the post is in the recent posts cache then remove it if recentPostsCache.get('index'): if postId in recentPostsCache['index']: recentPostsCache['index'].remove(postId) if recentPostsCache.get('json'): if recentPostsCache['json'].get(postId): del recentPostsCache['json'][postId] if recentPostsCache.get('html'): if recentPostsCache['html'].get(postId): del recentPostsCache['html'][postId] break def locatePost(baseDir: str, nickname: str, domain: str, postUrl: str, replies=False) -> str: """Returns the filename for the given status post url """ if not replies: extension = 'json' else: extension = 'replies' # if this post in the shared inbox? postUrl = removeIdEnding(postUrl.strip()).replace('/', '#') # add the extension postUrl = postUrl + '.' 
+ extension # search boxes boxes = ('inbox', 'outbox', 'tlblogs', 'tlevents') accountDir = baseDir + '/accounts/' + nickname + '@' + domain + '/' for boxName in boxes: postFilename = accountDir + boxName + '/' + postUrl if os.path.isfile(postFilename): return postFilename # check news posts accountDir = baseDir + '/accounts/news' + '@' + domain + '/' postFilename = accountDir + 'outbox/' + postUrl if os.path.isfile(postFilename): return postFilename # is it in the announce cache? postFilename = baseDir + '/cache/announce/' + nickname + '/' + postUrl if os.path.isfile(postFilename): return postFilename # print('WARN: unable to locate ' + nickname + ' ' + postUrl) return None def removeAttachment(baseDir: str, httpPrefix: str, domain: str, postJson: {}): if not postJson.get('attachment'): return if not postJson['attachment'][0].get('url'): return # if port: # if port != 80 and port != 443: # if ':' not in domain: # domain = domain + ':' + str(port) attachmentUrl = postJson['attachment'][0]['url'] if not attachmentUrl: return mediaFilename = baseDir + '/' + \ attachmentUrl.replace(httpPrefix + '://' + domain + '/', '') if os.path.isfile(mediaFilename): os.remove(mediaFilename) etagFilename = mediaFilename + '.etag' if os.path.isfile(etagFilename): os.remove(etagFilename) postJson['attachment'] = [] def removeModerationPostFromIndex(baseDir: str, postUrl: str, debug: bool) -> None: """Removes a url from the moderation index """ moderationIndexFile = baseDir + '/accounts/moderation.txt' if not os.path.isfile(moderationIndexFile): return postId = removeIdEnding(postUrl) if postId in open(moderationIndexFile).read(): with open(moderationIndexFile, "r") as f: lines = f.readlines() with open(moderationIndexFile, "w+") as f: for line in lines: if line.strip("\n").strip("\r") != postId: f.write(line) else: if debug: print('DEBUG: removed ' + postId + ' from moderation index') def isReplyToBlogPost(baseDir: str, nickname: str, domain: str, postJsonObject: str): """Is the given 
post a reply to a blog post? """ if not postJsonObject.get('object'): return False if not isinstance(postJsonObject['object'], dict): return False if not postJsonObject['object'].get('inReplyTo'): return False if not isinstance(postJsonObject['object']['inReplyTo'], str): return False blogsIndexFilename = baseDir + '/accounts/' + \ nickname + '@' + domain + '/tlblogs.index' if not os.path.isfile(blogsIndexFilename): return False postId = removeIdEnding(postJsonObject['object']['inReplyTo']) postId = postId.replace('/', '#') if postId in open(blogsIndexFilename).read(): return True return False def deletePost(baseDir: str, httpPrefix: str, nickname: str, domain: str, postFilename: str, debug: bool, recentPostsCache: {}) -> None: """Recursively deletes a post and its replies and attachments """ postJsonObject = loadJson(postFilename, 1) if postJsonObject: # don't allow deletion of bookmarked posts bookmarksIndexFilename = \ baseDir + '/accounts/' + nickname + '@' + domain + \ '/bookmarks.index' if os.path.isfile(bookmarksIndexFilename): bookmarkIndex = postFilename.split('/')[-1] + '\n' if bookmarkIndex in open(bookmarksIndexFilename).read(): return # don't remove replies to blog posts if isReplyToBlogPost(baseDir, nickname, domain, postJsonObject): return # remove from recent posts cache in memory if recentPostsCache: postId = \ removeIdEnding(postJsonObject['id']).replace('/', '#') if recentPostsCache.get('index'): if postId in recentPostsCache['index']: recentPostsCache['index'].remove(postId) if recentPostsCache.get('json'): if recentPostsCache['json'].get(postId): del recentPostsCache['json'][postId] if recentPostsCache.get('html'): if recentPostsCache['html'].get(postId): del recentPostsCache['html'][postId] # remove any attachment removeAttachment(baseDir, httpPrefix, domain, postJsonObject) extensions = ('votes', 'arrived', 'muted') for ext in extensions: extFilename = postFilename + '.' 
+ ext if os.path.isfile(extFilename): os.remove(extFilename) # remove cached html version of the post cachedPostFilename = \ getCachedPostFilename(baseDir, nickname, domain, postJsonObject) if cachedPostFilename: if os.path.isfile(cachedPostFilename): os.remove(cachedPostFilename) # removePostFromCache(postJsonObject,recentPostsCache) hasObject = False if postJsonObject.get('object'): hasObject = True # remove from moderation index file if hasObject: if isinstance(postJsonObject['object'], dict): if postJsonObject['object'].get('moderationStatus'): if postJsonObject.get('id'): postId = removeIdEnding(postJsonObject['id']) removeModerationPostFromIndex(baseDir, postId, debug) # remove any hashtags index entries removeHashtagIndex = False if hasObject: if hasObject and isinstance(postJsonObject['object'], dict): if postJsonObject['object'].get('content'): if '#' in postJsonObject['object']['content']: removeHashtagIndex = True if removeHashtagIndex: if postJsonObject['object'].get('id') and \ postJsonObject['object'].get('tag'): # get the id of the post postId = removeIdEnding(postJsonObject['object']['id']) for tag in postJsonObject['object']['tag']: if tag['type'] != 'Hashtag': continue if not tag.get('name'): continue # find the index file for this tag tagIndexFilename = \ baseDir + '/tags/' + tag['name'][1:] + '.txt' if not os.path.isfile(tagIndexFilename): continue # remove postId from the tag index file lines = None with open(tagIndexFilename, "r") as f: lines = f.readlines() if lines: newlines = '' for fileLine in lines: if postId in fileLine: continue newlines += fileLine if not newlines.strip(): # if there are no lines then remove the # hashtag file os.remove(tagIndexFilename) else: with open(tagIndexFilename, "w+") as f: f.write(newlines) # remove any replies repliesFilename = postFilename.replace('.json', '.replies') if os.path.isfile(repliesFilename): if debug: print('DEBUG: removing replies to ' + postFilename) with open(repliesFilename, 'r') as f: for 
replyId in f: replyFile = locatePost(baseDir, nickname, domain, replyId) if replyFile: if os.path.isfile(replyFile): deletePost(baseDir, httpPrefix, nickname, domain, replyFile, debug, recentPostsCache) # remove the replies file os.remove(repliesFilename) # finally, remove the post itself os.remove(postFilename) def validNickname(domain: str, nickname: str) -> bool: forbiddenChars = ('.', ' ', '/', '?', ':', ';', '@', '#') for c in forbiddenChars: if c in nickname: return False # this should only apply for the shared inbox if nickname == domain: return False reservedNames = ('inbox', 'dm', 'outbox', 'following', 'public', 'followers', 'category', 'channel', 'calendar', 'tlreplies', 'tlmedia', 'tlblogs', 'tlevents', 'tlblogs', 'tlfeatures', 'moderation', 'moderationaction', 'activity', 'undo', 'reply', 'replies', 'question', 'like', 'likes', 'users', 'statuses', 'tags', 'accounts', 'channels', 'profile', 'updates', 'repeat', 'announce', 'shares', 'fonts', 'icons', 'avatars') if nickname in reservedNames: return False return True def noOfAccounts(baseDir: str) -> bool: """Returns the number of accounts on the system """ accountCtr = 0 for subdir, dirs, files in os.walk(baseDir + '/accounts'): for account in dirs: if '@' in account: if not account.startswith('inbox@'): accountCtr += 1 break return accountCtr def noOfActiveAccountsMonthly(baseDir: str, months: int) -> bool: """Returns the number of accounts on the system this month """ accountCtr = 0 currTime = int(time.time()) monthSeconds = int(60*60*24*30*months) for subdir, dirs, files in os.walk(baseDir + '/accounts'): for account in dirs: if '@' in account: if not account.startswith('inbox@'): lastUsedFilename = \ baseDir + '/accounts/' + account + '/.lastUsed' if os.path.isfile(lastUsedFilename): with open(lastUsedFilename, 'r') as lastUsedFile: lastUsed = lastUsedFile.read() if lastUsed.isdigit(): timeDiff = (currTime - int(lastUsed)) if timeDiff < monthSeconds: accountCtr += 1 break return accountCtr def 
isPublicPostFromUrl(baseDir: str, nickname: str, domain: str, postUrl: str) -> bool: """Returns whether the given url is a public post """ postFilename = locatePost(baseDir, nickname, domain, postUrl) if not postFilename: return False postJsonObject = loadJson(postFilename, 1) if not postJsonObject: return False return isPublicPost(postJsonObject) def isPublicPost(postJsonObject: {}) -> bool: """Returns true if the given post is public """ if not postJsonObject.get('type'): return False if postJsonObject['type'] != 'Create': return False if not postJsonObject.get('object'): return False if not isinstance(postJsonObject['object'], dict): return False if not postJsonObject['object'].get('to'): return False for recipient in postJsonObject['object']['to']: if recipient.endswith('#Public'): return True return False def copytree(src: str, dst: str, symlinks=False, ignore=None): """Copy a directory """ for item in os.listdir(src): s = os.path.join(src, item) d = os.path.join(dst, item) if os.path.isdir(s): shutil.copytree(s, d, symlinks, ignore) else: shutil.copy2(s, d) def getCachedPostDirectory(baseDir: str, nickname: str, domain: str) -> str: """Returns the directory where the html post cache exists """ htmlPostCacheDir = baseDir + '/accounts/' + \ nickname + '@' + domain + '/postcache' return htmlPostCacheDir def getCachedPostFilename(baseDir: str, nickname: str, domain: str, postJsonObject: {}) -> str: """Returns the html cache filename for the given post """ cachedPostDir = getCachedPostDirectory(baseDir, nickname, domain) if not os.path.isdir(cachedPostDir): # print('ERROR: invalid html cache directory '+cachedPostDir) return None if '@' not in cachedPostDir: # print('ERROR: invalid html cache directory '+cachedPostDir) return None cachedPostId = removeIdEnding(postJsonObject['id']) cachedPostFilename = cachedPostDir + '/' + cachedPostId.replace('/', '#') return cachedPostFilename + '.html' def removePostFromCache(postJsonObject: {}, recentPostsCache: {}): """ if 
the post exists in the recent posts cache then remove it """ if not postJsonObject.get('id'): return if not recentPostsCache.get('index'): return postId = postJsonObject['id'] if '#' in postId: postId = postId.split('#', 1)[0] postId = removeIdEnding(postId).replace('/', '#') if postId not in recentPostsCache['index']: return if recentPostsCache['json'].get(postId): del recentPostsCache['json'][postId] if recentPostsCache['html'].get(postId): del recentPostsCache['html'][postId] recentPostsCache['index'].remove(postId) def updateRecentPostsCache(recentPostsCache: {}, maxRecentPosts: int, postJsonObject: {}, htmlStr: str) -> None: """Store recent posts in memory so that they can be quickly recalled """ if not postJsonObject.get('id'): return postId = postJsonObject['id'] if '#' in postId: postId = postId.split('#', 1)[0] postId = removeIdEnding(postId).replace('/', '#') if recentPostsCache.get('index'): if postId in recentPostsCache['index']: return recentPostsCache['index'].append(postId) postJsonObject['muted'] = False recentPostsCache['json'][postId] = json.dumps(postJsonObject) recentPostsCache['html'][postId] = htmlStr while len(recentPostsCache['html'].items()) > maxRecentPosts: postId = recentPostsCache['index'][0] recentPostsCache['index'].pop(0) del recentPostsCache['json'][postId] del recentPostsCache['html'][postId] else: recentPostsCache['index'] = [postId] recentPostsCache['json'] = {} recentPostsCache['html'] = {} recentPostsCache['json'][postId] = json.dumps(postJsonObject) recentPostsCache['html'][postId] = htmlStr def fileLastModified(filename: str) -> str: """Returns the date when a file was last modified """ t = os.path.getmtime(filename) modifiedTime = datetime.datetime.fromtimestamp(t) return modifiedTime.strftime("%Y-%m-%dT%H:%M:%SZ") def getCSS(baseDir: str, cssFilename: str, cssCache: {}) -> str: """Retrieves the css for a given file, or from a cache """ # does the css file exist? 
def daysInMonth(year: int, monthNumber: int) -> int:
    """Returns the number of days in the month

    monthNumber runs from 1 (January) to 12 (December).
    None is returned for a month number outside of that range.
    """
    if not 1 <= monthNumber <= 12:
        return None
    return monthrange(year, monthNumber)[1]


def mergeDicts(dict1: {}, dict2: {}) -> {}:
    """Merges two dictionaries into a new dictionary, leaving both
    arguments unaltered. Where both contain the same key the value
    from dict2 is the one which is kept
    """
    return {**dict1, **dict2}


def isEventPost(messageJson: {}) -> bool:
    """Is the given post a mobilizon-type event activity?
    See https://framagit.org/framasoft/mobilizon/-/blob/
    master/lib/federation/activity_stream/converter/event.ex

    Returns True only if the activity has an id, an actor and an
    object of type Event containing every required event field.
    Progress and the first missing event field are printed.
    """
    for fieldName in ('id', 'actor', 'object'):
        if not messageJson.get(fieldName):
            return False
    eventJson = messageJson['object']
    if not isinstance(eventJson, dict):
        return False
    if eventJson.get('type') != 'Event':
        return False
    print('Event arriving')
    # each required event field, with the message shown if it is absent
    requiredFields = (
        ('startTime', 'No event start time'),
        ('actor', 'No event actor'),
        ('content', 'No event content'),
        ('name', 'No event name'),
        ('uuid', 'No event UUID')
    )
    for fieldName, missingMessage in requiredFields:
        if not eventJson.get(fieldName):
            print(missingMessage)
            return False
    print('Event detected')
    return True


def isBlogPost(postJsonObject: {}) -> bool:
    """Is the given post a blog post?

    A blog post is a Create activity whose object is a dict of
    type Article having some content. A post without any type
    is not a blog post.
    """
    # .get is used so that a post lacking a type returns False
    # rather than raising a KeyError
    if postJsonObject.get('type') != 'Create':
        return False
    postObject = postJsonObject.get('object')
    if not isinstance(postObject, dict):
        return False
    if not postObject.get('content'):
        return False
    return postObject.get('type') == 'Article'


def isNewsPost(postJsonObject: {}) -> bool:
    """Is the given post a news post?
    This is indicated by a 'news' field on the post having
    a value which evaluates as true
    """
    return bool(postJsonObject.get('news'))


def searchBoxPosts(baseDir: str, nickname: str, domain: str,
                   searchStr: str, maxResults: int,
                   boxName='outbox') -> []:
    """Search your posts and return a list of the filenames
    containing matching strings

    The search is case insensitive. Multiple search terms can be
    separated with '+', in which case a file only matches if it
    contains all of them. Only files directly within the box
    directory are searched, and no more than maxResults filenames
    are returned. Files which can't be read as text are skipped.
    """
    path = baseDir + '/accounts/' + nickname + '@' + domain + '/' + boxName
    if not os.path.isdir(path):
        return []
    searchStr = searchStr.lower().strip()

    if '+' in searchStr:
        searchWords = [word.strip() for word in searchStr.split('+')]
        print('SEARCH: ' + str(searchWords))
    else:
        searchWords = [searchStr]

    res = []
    for root, dirs, fnames in os.walk(path):
        for fname in fnames:
            filePath = os.path.join(root, fname)
            try:
                with open(filePath, 'r') as postFile:
                    data = postFile.read().lower()
            except (OSError, UnicodeDecodeError):
                # a file which can't be read as text can't match,
                # and shouldn't prevent the other files being searched
                continue

            if not all(keyword in data for keyword in searchWords):
                continue

            res.append(filePath)
            if len(res) >= maxResults:
                return res
        # don't descend into any subdirectories
        break
    return res


def getFileCaseInsensitive(path: str) -> str:
    """Returns a case specific filename given a case insensitive version of it

    The path as given is tried first, followed by its entirely lower
    case version. None is returned if neither of those is a file.
    """
    if os.path.isfile(path):
        return path
    lowerPath = path.lower()
    if path != lowerPath and os.path.isfile(lowerPath):
        return lowerPath
    return None


def _removeCachedPost(recentPostsCache: {}, baseDir: str,
                      actor: str, domain: str, postJsonObject: {}) -> None:
    """Removes any cached version of the given post, so that its
    like and announce icons get updated. The cached post file for
    the nickname of the actor is deleted if it exists, and the post
    is passed to removePostFromCache together with recentPostsCache
    """
    nickname = getNicknameFromActor(actor)
    cachedPostFilename = \
        getCachedPostFilename(baseDir, nickname, domain, postJsonObject)
    if cachedPostFilename:
        if os.path.isfile(cachedPostFilename):
            os.remove(cachedPostFilename)
    removePostFromCache(postJsonObject, recentPostsCache)


def undoLikesCollectionEntry(recentPostsCache: {},
                             baseDir: str, postFilename: str, objectUrl: str,
                             actor: str, domain: str, debug: bool) -> None:
    """Undoes a like for a particular actor

    The post is loaded from postFilename, the first like by the actor
    is removed from the "likes" collection of its object and the post
    is saved again. If no likes remain then the collection itself is
    removed. Nothing is saved if the actor had not liked the post.
    objectUrl is only used within debug messages.
    """
    postJsonObject = loadJson(postFilename)
    if not postJsonObject:
        return
    # remove any cached version of this post so that the
    # like icon is changed
    _removeCachedPost(recentPostsCache, baseDir, actor, domain,
                      postJsonObject)

    if postJsonObject.get('type') != 'Create':
        return
    if not postJsonObject.get('object'):
        if debug:
            pprint(postJsonObject)
            print('DEBUG: post '+objectUrl+' has no object')
        return
    postObject = postJsonObject['object']
    if not isinstance(postObject, dict):
        return
    likes = postObject.get('likes')
    if not likes:
        return
    if not isinstance(likes, dict):
        return
    if not likes.get('items'):
        return
    likeItems = likes['items']

    matchedItem = None
    for likeItem in likeItems:
        if likeItem.get('actor'):
            if likeItem['actor'] == actor:
                matchedItem = likeItem
                break
    if matchedItem is None:
        return

    if debug:
        print('DEBUG: like was removed for ' + actor)
    likeItems.remove(matchedItem)
    # Decide using the items which actually remain, because a stored
    # totalItems value need not agree with the number of items
    if likeItems:
        likes['totalItems'] = len(likeItems)
    else:
        if debug:
            print('DEBUG: likes was removed from post')
        del postObject['likes']

    saveJson(postJsonObject, postFilename)


def updateLikesCollection(recentPostsCache: {},
                          baseDir: str, postFilename: str,
                          objectUrl: str,
                          actor: str, domain: str, debug: bool) -> None:
    """Updates the likes collection within a post

    The post is loaded from postFilename, a Like by the actor is added
    to the "likes" collection of its object (creating the collection
    with an id of objectUrl + '/likes' if there isn't one) and the
    post is saved again. Nothing is saved if the actor has already
    liked the post.
    """
    postJsonObject = loadJson(postFilename)
    if not postJsonObject:
        return
    # remove any cached version of this post so that the
    # like icon is changed
    _removeCachedPost(recentPostsCache, baseDir, actor, domain,
                      postJsonObject)

    if not postJsonObject.get('object'):
        if debug:
            pprint(postJsonObject)
            print('DEBUG: post ' + objectUrl + ' has no object')
        return
    postObject = postJsonObject['object']
    if not isinstance(postObject, dict):
        return
    if not objectUrl.endswith('/likes'):
        objectUrl = objectUrl + '/likes'

    likes = postObject.get('likes')
    if not likes:
        if debug:
            print('DEBUG: Adding initial like to ' + objectUrl)
        postObject['likes'] = {
            "@context": "https://www.w3.org/ns/activitystreams",
            'id': objectUrl,
            'type': 'Collection',
            "totalItems": 1,
            'items': [{
                'type': 'Like',
                'actor': actor
            }]
        }
    else:
        if not isinstance(likes, dict):
            # likes which aren't an embedded collection (eg. a url)
            # can't be added to, so leave the post as it is
            return
        if not likes.get('items'):
            likes['items'] = []
        for likeItem in likes['items']:
            if likeItem.get('actor'):
                if likeItem['actor'] == actor:
                    # this actor already likes the post
                    return
        likes['items'].append({
            'type': 'Like',
            'actor': actor
        })
        likes['totalItems'] = len(likes['items'])

    if debug:
        print('DEBUG: saving post with likes added')
        pprint(postJsonObject)
    saveJson(postJsonObject, postFilename)


def undoAnnounceCollectionEntry(recentPostsCache: {},
                                baseDir: str, postFilename: str,
                                actor: str, domain: str, debug: bool) -> None:
    """Undoes an announce for a particular actor by removing it from
    the "shares" collection within a post. Note that the "shares"
    collection has no relation to shared items in shares.py. It's
    shares of posts, not shares of physical objects.

    If no announces remain then the collection itself is removed.
    Nothing is saved if the actor had not announced the post.
    """
    postJsonObject = loadJson(postFilename)
    if not postJsonObject:
        return
    # remove any cached version of this announce so that the announce
    # icon is changed
    _removeCachedPost(recentPostsCache, baseDir, actor, domain,
                      postJsonObject)

    if postJsonObject.get('type') != 'Create':
        return
    if not postJsonObject.get('object'):
        if debug:
            pprint(postJsonObject)
            print('DEBUG: post has no object')
        return
    postObject = postJsonObject['object']
    if not isinstance(postObject, dict):
        return
    shares = postObject.get('shares')
    if not shares:
        return
    if not isinstance(shares, dict):
        return
    if not shares.get('items'):
        return
    announceItems = shares['items']

    matchedItem = None
    for announceItem in announceItems:
        if announceItem.get('actor'):
            if announceItem['actor'] == actor:
                matchedItem = announceItem
                break
    if matchedItem is None:
        return

    if debug:
        print('DEBUG: Announce was removed for ' + actor)
    announceItems.remove(matchedItem)
    # Decide using the items which actually remain, because a stored
    # totalItems value need not agree with the number of items
    if announceItems:
        shares['totalItems'] = len(announceItems)
    else:
        if debug:
            print('DEBUG: shares (announcements) ' +
                  'was removed from post')
        del postObject['shares']

    saveJson(postJsonObject, postFilename)


def updateAnnounceCollection(recentPostsCache: {},
                             baseDir: str, postFilename: str,
                             actor: str, domain: str, debug: bool) -> None:
    """Updates the announcements collection within a post
    Confusingly this is known as "shares", but isn't the
    same as shared items within shares.py
    It's shares of posts, not shares of physical objects.

    An Announce by the actor is added to the "shares" collection of
    the post object, creating the collection if there isn't one.
    If the collection exists without any items then no announce is
    added, although the post is still saved. Nothing is saved if the
    actor has already announced the post.
    """
    postJsonObject = loadJson(postFilename)
    if not postJsonObject:
        return
    # remove any cached version of this announce so that the announce
    # icon is changed
    _removeCachedPost(recentPostsCache, baseDir, actor, domain,
                      postJsonObject)

    if not postJsonObject.get('object'):
        if debug:
            pprint(postJsonObject)
            print('DEBUG: post ' + postFilename + ' has no object')
        return
    postObject = postJsonObject['object']
    if not isinstance(postObject, dict):
        return
    postUrl = removeIdEnding(postJsonObject['id']) + '/shares'

    shares = postObject.get('shares')
    if not shares:
        if debug:
            print('DEBUG: Adding initial shares (announcements) to ' +
                  postUrl)
        postObject['shares'] = {
            "@context": "https://www.w3.org/ns/activitystreams",
            'id': postUrl,
            'type': 'Collection',
            "totalItems": 1,
            'items': [{
                'type': 'Announce',
                'actor': actor
            }]
        }
    elif not isinstance(shares, dict):
        # shares which aren't an embedded collection (eg. a url)
        # can't be added to, so leave the post as it is
        return
    elif shares.get('items'):
        for announceItem in shares['items']:
            if announceItem.get('actor'):
                if announceItem['actor'] == actor:
                    # this actor has already announced the post
                    return
        shares['items'].append({
            'type': 'Announce',
            'actor': actor
        })
        shares['totalItems'] = len(shares['items'])
    else:
        if debug:
            print('DEBUG: shares (announcements) section of post ' +
                  'has no items list')

    if debug:
        print('DEBUG: saving post with shares (announcements) added')
        pprint(postJsonObject)
    saveJson(postJsonObject, postFilename)


def siteIsActive(url: str) -> bool:
    """Returns true if the current url is resolvable.
    This can be used to check that an instance is online before
    trying to send posts to it.

    The url is opened with a timeout of ten seconds. False is
    returned if the url doesn't begin with 'http', is malformed,
    or if opening it fails.
    """
    # imported here since it is only needed by this function
    from http.client import HTTPException

    if not url.startswith('http'):
        return False
    try:
        req = urllib.request.Request(url)
        # the with statement ensures that the response gets closed
        with urllib.request.urlopen(req, timeout=10):  # nosec
            return True
    except SocketError as e:
        if e.errno == errno.ECONNRESET:
            print('WARN: connection was reset during siteIsActive')
    except (ValueError, HTTPException):
        # a malformed url or an invalid http response means that
        # the site can't be considered to be active
        return False
    return False


def weekDayOfMonthStart(monthNumber: int, year: int) -> int:
    """Gets the day number of the first day of the month
    1=sun, 7=sat
    """
    firstDayOfMonth = datetime.datetime(year, monthNumber, 1, 0, 0)
    # isoweekday gives 1=mon to 7=sun, so the modulo moves sunday to zero
    return firstDayOfMonth.isoweekday() % 7 + 1


def mediaFileMimeType(filename: str) -> str:
    """Given a media filename return its mime type

    The type is chosen from the text after the final dot within the
    filename, ignoring its case. image/png is returned if there is no
    extension or if the extension is not a known one.
    """
    if '.' not in filename:
        return 'image/png'
    extensions = {
        'json': 'application/json',
        'png': 'image/png',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'gif': 'image/gif',
        'webp': 'image/webp',
        'avif': 'image/avif',
        'mp3': 'audio/mpeg',
        'ogg': 'audio/ogg',
        'mp4': 'video/mp4',
        'ogv': 'video/ogv'
    }
    # lower case so that eg. photo.JPG is recognised
    fileExt = filename.split('.')[-1].lower()
    return extensions.get(fileExt, 'image/png')