__filename__ = "utils.py" __author__ = "Bob Mottram" __license__ = "AGPL3+" __version__ = "1.1.0" __maintainer__ = "Bob Mottram" __email__ = "bob@freedombone.net" __status__ = "Production" import os import time import shutil import datetime import json from socket import error as SocketError import errno import urllib.request from pprint import pprint from calendar import monthrange from followingCalendar import addPersonToCalendar def isSystemAccount(nickname: str) -> bool: """Returns true if the given nickname is a system account """ if nickname == 'news' or nickname == 'inbox': return True return False def createConfig(baseDir: str) -> None: """Creates a configuration file """ configFilename = baseDir + '/config.json' if os.path.isfile(configFilename): return configJson = { } saveJson(configJson, configFilename) def setConfigParam(baseDir: str, variableName: str, variableValue) -> None: """Sets a configuration value """ createConfig(baseDir) configFilename = baseDir + '/config.json' configJson = {} if os.path.isfile(configFilename): configJson = loadJson(configFilename) configJson[variableName] = variableValue saveJson(configJson, configFilename) def getConfigParam(baseDir: str, variableName: str): """Gets a configuration value """ createConfig(baseDir) configFilename = baseDir + '/config.json' configJson = loadJson(configFilename) if configJson: if variableName in configJson: return configJson[variableName] return None def isSuspended(baseDir: str, nickname: str) -> bool: """Returns true if the given nickname is suspended """ adminNickname = getConfigParam(baseDir, 'admin') if not adminNickname: return False if nickname == adminNickname: return False suspendedFilename = baseDir + '/accounts/suspended.txt' if os.path.isfile(suspendedFilename): with open(suspendedFilename, "r") as f: lines = f.readlines() for suspended in lines: if suspended.strip('\n').strip('\r') == nickname: return True return False def getFollowersList(baseDir: str, nickname: str, domain: str, followFile='following.txt') -> []: """Returns a list of followers for the given account """ filename = \ baseDir + '/accounts/' + nickname + '@' + domain + '/' + followFile if not os.path.isfile(filename): return [] with open(filename, "r") as f: lines = f.readlines() for i in range(len(lines)): lines[i] = lines[i].strip() return lines return [] def getFollowersOfPerson(baseDir: str, nickname: str, domain: str, followFile='following.txt') -> []: """Returns a list containing the followers of the given person Used by the shared inbox to know who to send incoming mail to """ followers = [] if ':' in domain: domain = domain.split(':')[0] handle = nickname + '@' + domain if not os.path.isdir(baseDir + '/accounts/' + handle): return followers for subdir, dirs, files in os.walk(baseDir + '/accounts'): for account in dirs: filename = os.path.join(subdir, account) + '/' + followFile if account == handle or account.startswith('inbox@'): continue if not os.path.isfile(filename): continue with open(filename, 'r') as followingfile: for followingHandle in followingfile: followingHandle2 = followingHandle.replace('\n', '') followingHandle2 = followingHandle2.replace('\r', '') if followingHandle2 == handle: if account not in followers: followers.append(account) break return followers def removeIdEnding(idStr: str) -> str: """Removes endings such as /activity and /undo """ if idStr.endswith('/activity'): idStr = idStr[:-len('/activity')] elif idStr.endswith('/undo'): idStr = idStr[:-len('/undo')] elif idStr.endswith('/event'): idStr = idStr[:-len('/event')] elif idStr.endswith('/replies'): idStr = idStr[:-len('/replies')] return idStr def getProtocolPrefixes() -> []: """Returns a list of valid prefixes """ return ('https://', 'http://', 'dat://', 'i2p://', 'gnunet://', 'hyper://', 'gemini://', 'gopher://') def getLinkPrefixes() -> []: """Returns a list of valid web link prefixes """ return ('https://', 'http://', 'dat://', 'i2p://', 'gnunet://', 'hyper://', 'gemini://', 'gopher://', 'briar:') def removeAvatarFromCache(baseDir: str, actorStr: str) -> None: """Removes any existing avatar entries from the cache This avoids duplicate entries with differing extensions """ avatarFilenameExtensions = ('png', 'jpg', 'gif', 'webp', 'avif') for extension in avatarFilenameExtensions: avatarFilename = \ baseDir + '/cache/avatars/' + actorStr + '.' + extension if os.path.isfile(avatarFilename): os.remove(avatarFilename) def saveJson(jsonObject: {}, filename: str) -> bool: """Saves json to a file """ tries = 0 while tries < 5: try: with open(filename, 'w+') as fp: fp.write(json.dumps(jsonObject)) return True except BaseException: print('WARN: saveJson ' + str(tries)) time.sleep(1) tries += 1 return False def loadJson(filename: str, delaySec=2, maxTries=5) -> {}: """Makes a few attempts to load a json formatted file """ jsonObject = None tries = 0 while tries < maxTries: try: with open(filename, 'r') as fp: data = fp.read() jsonObject = json.loads(data) break except BaseException: print('WARN: loadJson exception') if delaySec > 0: time.sleep(delaySec) tries += 1 return jsonObject def loadJsonOnionify(filename: str, domain: str, onionDomain: str, delaySec=2) -> {}: """Makes a few attempts to load a json formatted file This also converts the domain name to the onion domain """ jsonObject = None tries = 0 while tries < 5: try: with open(filename, 'r') as fp: data = fp.read() if data: data = data.replace(domain, onionDomain) data = data.replace('https:', 'http:') print('*****data: ' + data) jsonObject = json.loads(data) break except BaseException: print('WARN: loadJson exception') if delaySec > 0: time.sleep(delaySec) tries += 1 return jsonObject def getStatusNumber(publishedStr=None) -> (str, str): """Returns the status number and published date """ if not publishedStr: currTime = datetime.datetime.utcnow() else: currTime = \ datetime.datetime.strptime(publishedStr, '%Y-%m-%dT%H:%M:%SZ') daysSinceEpoch = (currTime - datetime.datetime(1970, 1, 1)).days # status is the number of seconds since epoch statusNumber = \ str(((daysSinceEpoch * 24 * 60 * 60) + (currTime.hour * 60 * 60) + (currTime.minute * 60) + currTime.second) * 1000 + int(currTime.microsecond / 1000)) # See https://github.com/tootsuite/mastodon/blob/ # 995f8b389a66ab76ec92d9a240de376f1fc13a38/lib/mastodon/snowflake.rb # use the leftover microseconds as the sequence number sequenceId = currTime.microsecond % 1000 # shift by 16bits "sequence data" statusNumber = str((int(statusNumber) << 16) + sequenceId) published = currTime.strftime("%Y-%m-%dT%H:%M:%SZ") return statusNumber, published def evilIncarnate() -> []: return ('gab.com', 'gabfed.com', 'spinster.xyz', 'kiwifarms.cc', 'djitter.com') def isEvil(domain: str) -> bool: if not isinstance(domain, str): print('WARN: Malformed domain ' + str(domain)) return True # https://www.youtube.com/watch?v=5qw1hcevmdU evilDomains = evilIncarnate() for concentratedEvil in evilDomains: if domain.endswith(concentratedEvil): return True return False def createPersonDir(nickname: str, domain: str, baseDir: str, dirname: str) -> str: """Create a directory for a person """ handle = nickname + '@' + domain if not os.path.isdir(baseDir + '/accounts/' + handle): os.mkdir(baseDir + '/accounts/' + handle) boxDir = baseDir + '/accounts/' + handle + '/' + dirname if not os.path.isdir(boxDir): os.mkdir(boxDir) return boxDir def createOutboxDir(nickname: str, domain: str, baseDir: str) -> str: """Create an outbox for a person """ return createPersonDir(nickname, domain, baseDir, 'outbox') def createInboxQueueDir(nickname: str, domain: str, baseDir: str) -> str: """Create an inbox queue and returns the feed filename and directory """ return createPersonDir(nickname, domain, baseDir, 'queue') def domainPermitted(domain: str, federationList: []): if len(federationList) == 0: return True if ':' in domain: domain = domain.split(':')[0] if domain in federationList: return True return False def urlPermitted(url: str, federationList: []): if isEvil(url): return False if not federationList: return True for domain in federationList: if domain in url: return True return False def getDisplayName(baseDir: str, actor: str, personCache: {}) -> str: """Returns the display name for the given actor """ if '/statuses/' in actor: actor = actor.split('/statuses/')[0] if not personCache.get(actor): return None if personCache[actor].get('actor'): if personCache[actor]['actor'].get('name'): return personCache[actor]['actor']['name'] else: # Try to obtain from the cached actors cachedActorFilename = \ baseDir + '/cache/actors/' + (actor.replace('/', '#')) + '.json' if os.path.isfile(cachedActorFilename): actorJson = loadJson(cachedActorFilename, 1) if actorJson: if actorJson.get('name'): return(actorJson['name']) return None def getNicknameFromActor(actor: str) -> str: """Returns the nickname from an actor url """ if actor.startswith('@'): actor = actor[1:] if '/users/' not in actor: if '/profile/' in actor: nickStr = actor.split('/profile/')[1].replace('@', '') if '/' not in nickStr: return nickStr else: return nickStr.split('/')[0] elif '/channel/' in actor: nickStr = actor.split('/channel/')[1].replace('@', '') if '/' not in nickStr: return nickStr else: return nickStr.split('/')[0] elif '/accounts/' in actor: nickStr = actor.split('/accounts/')[1].replace('@', '') if '/' not in nickStr: return nickStr else: return nickStr.split('/')[0] elif '/@' in actor: # https://domain/@nick nickStr = actor.split('/@')[1] if '/' in nickStr: nickStr = nickStr.split('/')[0] return nickStr elif '@' in actor: nickStr = actor.split('@')[0] return nickStr return None nickStr = actor.split('/users/')[1].replace('@', '') if '/' not in nickStr: return nickStr else: return nickStr.split('/')[0] def getDomainFromActor(actor: str) -> (str, int): """Returns the domain name from an actor url """ if actor.startswith('@'): actor = actor[1:] port = None prefixes = getProtocolPrefixes() if '/profile/' in actor: domain = actor.split('/profile/')[0] for prefix in prefixes: domain = domain.replace(prefix, '') elif '/accounts/' in actor: domain = actor.split('/accounts/')[0] for prefix in prefixes: domain = domain.replace(prefix, '') elif '/channel/' in actor: domain = actor.split('/channel/')[0] for prefix in prefixes: domain = domain.replace(prefix, '') elif '/users/' in actor: domain = actor.split('/users/')[0] for prefix in prefixes: domain = domain.replace(prefix, '') elif '/@' in actor: domain = actor.split('/@')[0] for prefix in prefixes: domain = domain.replace(prefix, '') elif '@' in actor: domain = actor.split('@')[1].strip() else: domain = actor for prefix in prefixes: domain = domain.replace(prefix, '') if '/' in actor: domain = domain.split('/')[0] if ':' in domain: portStr = domain.split(':')[1] if not portStr.isdigit(): return None, None port = int(portStr) domain = domain.split(':')[0] return domain, port def followPerson(baseDir: str, nickname: str, domain: str, followNickname: str, followDomain: str, federationList: [], debug: bool, followFile='following.txt') -> bool: """Adds a person to the follow list """ if not domainPermitted(followDomain.lower().replace('\n', ''), federationList): if debug: print('DEBUG: follow of domain ' + followDomain + ' not permitted') return False if debug: print('DEBUG: follow of domain ' + followDomain) if ':' in domain: handle = nickname + '@' + domain.split(':')[0] else: handle = nickname + '@' + domain if not os.path.isdir(baseDir + '/accounts/' + handle): print('WARN: account for ' + handle + ' does not exist') return False if ':' in followDomain: handleToFollow = followNickname + '@' + followDomain.split(':')[0] else: handleToFollow = followNickname + '@' + followDomain # was this person previously unfollowed? unfollowedFilename = baseDir + '/accounts/' + handle + '/unfollowed.txt' if os.path.isfile(unfollowedFilename): if handleToFollow in open(unfollowedFilename).read(): # remove them from the unfollowed file newLines = '' with open(unfollowedFilename, "r") as f: lines = f.readlines() for line in lines: if handleToFollow not in line: newLines += line with open(unfollowedFilename, 'w+') as f: f.write(newLines) if not os.path.isdir(baseDir + '/accounts'): os.mkdir(baseDir + '/accounts') handleToFollow = followNickname + '@' + followDomain filename = baseDir + '/accounts/' + handle + '/' + followFile if os.path.isfile(filename): if handleToFollow in open(filename).read(): if debug: print('DEBUG: follow already exists') return True # prepend to follow file try: with open(filename, 'r+') as f: content = f.read() f.seek(0, 0) f.write(handleToFollow + '\n' + content) print('DEBUG: follow added') except Exception as e: print('WARN: Failed to write entry to follow file ' + filename + ' ' + str(e)) else: # first follow if debug: print('DEBUG: ' + handle + ' creating new following file to follow ' + handleToFollow + ', filename is ' + filename) with open(filename, 'w+') as f: f.write(handleToFollow + '\n') # Default to adding new follows to the calendar. # Possibly this could be made optional if followFile.endswith('following.txt'): # if following a person add them to the list of # calendar follows print('DEBUG: adding ' + followNickname + '@' + followDomain + ' to calendar of ' + nickname + '@' + domain) addPersonToCalendar(baseDir, nickname, domain, followNickname, followDomain) return True def votesOnNewswireItem(status: []) -> int: """Returns the number of votes on a newswire item """ totalVotes = 0 for line in status: if 'vote:' in line: totalVotes += 1 return totalVotes def locateNewsVotes(baseDir: str, domain: str, postUrl: str) -> str: """Returns the votes filename for a news post within the news user account """ postUrl = \ postUrl.strip().replace('\n', '').replace('\r', '') # if this post in the shared inbox? postUrl = removeIdEnding(postUrl.strip()).replace('/', '#') if postUrl.endswith('.json'): postUrl = postUrl + '.votes' else: postUrl = postUrl + '.json.votes' accountDir = baseDir + '/accounts/news@' + domain + '/' postFilename = accountDir + 'outbox/' + postUrl if os.path.isfile(postFilename): return postFilename return None def locateNewsArrival(baseDir: str, domain: str, postUrl: str) -> str: """Returns the arrival time for a news post within the news user account """ postUrl = \ postUrl.strip().replace('\n', '').replace('\r', '') # if this post in the shared inbox? postUrl = removeIdEnding(postUrl.strip()).replace('/', '#') if postUrl.endswith('.json'): postUrl = postUrl + '.arrived' else: postUrl = postUrl + '.json.arrived' accountDir = baseDir + '/accounts/news@' + domain + '/' postFilename = accountDir + 'outbox/' + postUrl if os.path.isfile(postFilename): with open(postFilename, 'r') as arrivalFile: arrival = arrivalFile.read() if arrival: arrivalDate = \ datetime.datetime.strptime(arrival, "%Y-%m-%dT%H:%M:%SZ") return arrivalDate return None def locatePost(baseDir: str, nickname: str, domain: str, postUrl: str, replies=False) -> str: """Returns the filename for the given status post url """ if not replies: extension = 'json' else: extension = 'replies' # if this post in the shared inbox? postUrl = removeIdEnding(postUrl.strip()).replace('/', '#') # add the extension postUrl = postUrl + '.' + extension # search boxes boxes = ('inbox', 'outbox', 'tlblogs', 'tlevents') accountDir = baseDir + '/accounts/' + nickname + '@' + domain + '/' for boxName in boxes: postFilename = accountDir + boxName + '/' + postUrl if os.path.isfile(postFilename): return postFilename # check news posts accountDir = baseDir + '/accounts/news' + '@' + domain + '/' postFilename = accountDir + 'outbox/' + postUrl if os.path.isfile(postFilename): return postFilename # is it in the announce cache? postFilename = baseDir + '/cache/announce/' + nickname + '/' + postUrl if os.path.isfile(postFilename): return postFilename # print('WARN: unable to locate ' + nickname + ' ' + postUrl) return None def removeAttachment(baseDir: str, httpPrefix: str, domain: str, postJson: {}): if not postJson.get('attachment'): return if not postJson['attachment'][0].get('url'): return # if port: # if port != 80 and port != 443: # if ':' not in domain: # domain = domain + ':' + str(port) attachmentUrl = postJson['attachment'][0]['url'] if not attachmentUrl: return mediaFilename = baseDir + '/' + \ attachmentUrl.replace(httpPrefix + '://' + domain + '/', '') if os.path.isfile(mediaFilename): os.remove(mediaFilename) etagFilename = mediaFilename + '.etag' if os.path.isfile(etagFilename): os.remove(etagFilename) postJson['attachment'] = [] def removeModerationPostFromIndex(baseDir: str, postUrl: str, debug: bool) -> None: """Removes a url from the moderation index """ moderationIndexFile = baseDir + '/accounts/moderation.txt' if not os.path.isfile(moderationIndexFile): return postId = removeIdEnding(postUrl) if postId in open(moderationIndexFile).read(): with open(moderationIndexFile, "r") as f: lines = f.readlines() with open(moderationIndexFile, "w+") as f: for line in lines: if line.strip("\n").strip("\r") != postId: f.write(line) else: if debug: print('DEBUG: removed ' + postId + ' from moderation index') def isReplyToBlogPost(baseDir: str, nickname: str, domain: str, postJsonObject: str): """Is the given post a reply to a blog post? """ if not postJsonObject.get('object'): return False if not isinstance(postJsonObject['object'], dict): return False if not postJsonObject['object'].get('inReplyTo'): return False if not isinstance(postJsonObject['object']['inReplyTo'], str): return False blogsIndexFilename = baseDir + '/accounts/' + \ nickname + '@' + domain + '/tlblogs.index' if not os.path.isfile(blogsIndexFilename): return False postId = removeIdEnding(postJsonObject['object']['inReplyTo']) postId = postId.replace('/', '#') if postId in open(blogsIndexFilename).read(): return True return False def deletePost(baseDir: str, httpPrefix: str, nickname: str, domain: str, postFilename: str, debug: bool, recentPostsCache: {}) -> None: """Recursively deletes a post and its replies and attachments """ postJsonObject = loadJson(postFilename, 1) if postJsonObject: # don't allow deletion of bookmarked posts bookmarksIndexFilename = \ baseDir + '/accounts/' + nickname + '@' + domain + \ '/bookmarks.index' if os.path.isfile(bookmarksIndexFilename): bookmarkIndex = postFilename.split('/')[-1] + '\n' if bookmarkIndex in open(bookmarksIndexFilename).read(): return # don't remove replies to blog posts if isReplyToBlogPost(baseDir, nickname, domain, postJsonObject): return # remove from recent posts cache in memory if recentPostsCache: postId = \ removeIdEnding(postJsonObject['id']).replace('/', '#') if recentPostsCache.get('index'): if postId in recentPostsCache['index']: recentPostsCache['index'].remove(postId) if recentPostsCache['json'].get(postId): del recentPostsCache['json'][postId] # remove any attachment removeAttachment(baseDir, httpPrefix, domain, postJsonObject) # remove any mute file muteFilename = postFilename + '.muted' if os.path.isfile(muteFilename): os.remove(muteFilename) # remove any votes file votesFilename = postFilename + '.votes' if os.path.isfile(votesFilename): os.remove(votesFilename) # remove any arrived file arrivedFilename = postFilename + '.arrived' if os.path.isfile(arrivedFilename): os.remove(arrivedFilename) # remove cached html version of the post cachedPostFilename = \ getCachedPostFilename(baseDir, nickname, domain, postJsonObject) if cachedPostFilename: if os.path.isfile(cachedPostFilename): os.remove(cachedPostFilename) # removePostFromCache(postJsonObject,recentPostsCache) hasObject = False if postJsonObject.get('object'): hasObject = True # remove from moderation index file if hasObject: if isinstance(postJsonObject['object'], dict): if postJsonObject['object'].get('moderationStatus'): if postJsonObject.get('id'): postId = removeIdEnding(postJsonObject['id']) removeModerationPostFromIndex(baseDir, postId, debug) # remove any hashtags index entries removeHashtagIndex = False if hasObject: if hasObject and isinstance(postJsonObject['object'], dict): if postJsonObject['object'].get('content'): if '#' in postJsonObject['object']['content']: removeHashtagIndex = True if removeHashtagIndex: if postJsonObject['object'].get('id') and \ postJsonObject['object'].get('tag'): # get the id of the post postId = removeIdEnding(postJsonObject['object']['id']) for tag in postJsonObject['object']['tag']: if tag['type'] != 'Hashtag': continue if not tag.get('name'): continue # find the index file for this tag tagIndexFilename = \ baseDir + '/tags/' + tag['name'][1:] + '.txt' if not os.path.isfile(tagIndexFilename): continue # remove postId from the tag index file lines = None with open(tagIndexFilename, "r") as f: lines = f.readlines() if lines: newlines = '' for fileLine in lines: if postId in fileLine: continue newlines += fileLine if not newlines.strip(): # if there are no lines then remove the # hashtag file os.remove(tagIndexFilename) else: with open(tagIndexFilename, "w+") as f: f.write(newlines) # remove any replies repliesFilename = postFilename.replace('.json', '.replies') if os.path.isfile(repliesFilename): if debug: print('DEBUG: removing replies to ' + postFilename) with open(repliesFilename, 'r') as f: for replyId in f: replyFile = locatePost(baseDir, nickname, domain, replyId) if replyFile: if os.path.isfile(replyFile): deletePost(baseDir, httpPrefix, nickname, domain, replyFile, debug, recentPostsCache) # remove the replies file os.remove(repliesFilename) # finally, remove the post itself os.remove(postFilename) def validNickname(domain: str, nickname: str) -> bool: forbiddenChars = ('.', ' ', '/', '?', ':', ';', '@', '#') for c in forbiddenChars: if c in nickname: return False if nickname == domain: return False reservedNames = ('inbox', 'dm', 'outbox', 'following', 'public', 'followers', 'channel', 'calendar', 'tlreplies', 'tlmedia', 'tlblogs', 'tlevents', 'tlblogs', 'moderation', 'activity', 'undo', 'reply', 'replies', 'question', 'like', 'likes', 'users', 'statuses', 'accounts', 'channels', 'profile', 'updates', 'repeat', 'announce', 'shares', 'fonts', 'icons', 'avatars') if nickname in reservedNames: return False return True def noOfAccounts(baseDir: str) -> bool: """Returns the number of accounts on the system """ accountCtr = 0 for subdir, dirs, files in os.walk(baseDir + '/accounts'): for account in dirs: if '@' in account: if not account.startswith('inbox@'): accountCtr += 1 return accountCtr def noOfActiveAccountsMonthly(baseDir: str, months: int) -> bool: """Returns the number of accounts on the system this month """ accountCtr = 0 currTime = int(time.time()) monthSeconds = int(60*60*24*30*months) for subdir, dirs, files in os.walk(baseDir + '/accounts'): for account in dirs: if '@' in account: if not account.startswith('inbox@'): lastUsedFilename = \ baseDir + '/accounts/' + account + '/.lastUsed' if os.path.isfile(lastUsedFilename): with open(lastUsedFilename, 'r') as lastUsedFile: lastUsed = lastUsedFile.read() if lastUsed.isdigit(): timeDiff = (currTime - int(lastUsed)) if timeDiff < monthSeconds: accountCtr += 1 return accountCtr def isPublicPostFromUrl(baseDir: str, nickname: str, domain: str, postUrl: str) -> bool: """Returns whether the given url is a public post """ postFilename = locatePost(baseDir, nickname, domain, postUrl) if not postFilename: return False postJsonObject = loadJson(postFilename, 1) if not postJsonObject: return False return isPublicPost(postJsonObject) def isPublicPost(postJsonObject: {}) -> bool: """Returns true if the given post is public """ if not postJsonObject.get('type'): return False if postJsonObject['type'] != 'Create': return False if not postJsonObject.get('object'): return False if not isinstance(postJsonObject['object'], dict): return False if not postJsonObject['object'].get('to'): return False for recipient in postJsonObject['object']['to']: if recipient.endswith('#Public'): return True return False def copytree(src: str, dst: str, symlinks=False, ignore=None): """Copy a directory """ for item in os.listdir(src): s = os.path.join(src, item) d = os.path.join(dst, item) if os.path.isdir(s): shutil.copytree(s, d, symlinks, ignore) else: shutil.copy2(s, d) def getCachedPostDirectory(baseDir: str, nickname: str, domain: str) -> str: """Returns the directory where the html post cache exists """ htmlPostCacheDir = baseDir + '/accounts/' + \ nickname + '@' + domain + '/postcache' return htmlPostCacheDir def getCachedPostFilename(baseDir: str, nickname: str, domain: str, postJsonObject: {}) -> str: """Returns the html cache filename for the given post """ cachedPostDir = getCachedPostDirectory(baseDir, nickname, domain) if not os.path.isdir(cachedPostDir): # print('ERROR: invalid html cache directory '+cachedPostDir) return None if '@' not in cachedPostDir: # print('ERROR: invalid html cache directory '+cachedPostDir) return None cachedPostId = removeIdEnding(postJsonObject['id']) cachedPostFilename = cachedPostDir + '/' + cachedPostId.replace('/', '#') return cachedPostFilename + '.html' def removePostFromCache(postJsonObject: {}, recentPostsCache: {}): """ if the post exists in the recent posts cache then remove it """ if not postJsonObject.get('id'): return if not recentPostsCache.get('index'): return postId = postJsonObject['id'] if '#' in postId: postId = postId.split('#', 1)[0] postId = removeIdEnding(postId).replace('/', '#') if postId not in recentPostsCache['index']: return if recentPostsCache['json'].get(postId): del recentPostsCache['json'][postId] if recentPostsCache['html'].get(postId): del recentPostsCache['html'][postId] recentPostsCache['index'].remove(postId) def updateRecentPostsCache(recentPostsCache: {}, maxRecentPosts: int, postJsonObject: {}, htmlStr: str) -> None: """Store recent posts in memory so that they can be quickly recalled """ if not postJsonObject.get('id'): return postId = postJsonObject['id'] if '#' in postId: postId = postId.split('#', 1)[0] postId = removeIdEnding(postId).replace('/', '#') if recentPostsCache.get('index'): if postId in recentPostsCache['index']: return recentPostsCache['index'].append(postId) postJsonObject['muted'] = False recentPostsCache['json'][postId] = json.dumps(postJsonObject) recentPostsCache['html'][postId] = htmlStr while len(recentPostsCache['html'].items()) > maxRecentPosts: postId = recentPostsCache['index'][0] recentPostsCache['index'].pop(0) del recentPostsCache['json'][postId] del recentPostsCache['html'][postId] else: recentPostsCache['index'] = [postId] recentPostsCache['json'] = {} recentPostsCache['html'] = {} recentPostsCache['json'][postId] = json.dumps(postJsonObject) recentPostsCache['html'][postId] = htmlStr def fileLastModified(filename: str) -> str: """Returns the date when a file was last modified """ t = os.path.getmtime(filename) modifiedTime = datetime.datetime.fromtimestamp(t) return modifiedTime.strftime("%Y-%m-%dT%H:%M:%SZ") def daysInMonth(year: int, monthNumber: int) -> int: """Returns the number of days in the month """ if monthNumber < 1 or monthNumber > 12: return None daysRange = monthrange(year, monthNumber) return daysRange[1] def mergeDicts(dict1: {}, dict2: {}) -> {}: """Merges two dictionaries """ res = {**dict1, **dict2} return res def isEventPost(messageJson: {}) -> bool: """Is the given post a mobilizon-type event activity? See https://framagit.org/framasoft/mobilizon/-/blob/ master/lib/federation/activity_stream/converter/event.ex """ if not messageJson.get('id'): return False if not messageJson.get('actor'): return False if not messageJson.get('object'): return False if not isinstance(messageJson['object'], dict): return False if not messageJson['object'].get('type'): return False if messageJson['object']['type'] != 'Event': return False print('Event arriving') if not messageJson['object'].get('startTime'): print('No event start time') return False if not messageJson['object'].get('actor'): print('No event actor') return False if not messageJson['object'].get('content'): print('No event content') return False if not messageJson['object'].get('name'): print('No event name') return False if not messageJson['object'].get('uuid'): print('No event UUID') return False print('Event detected') return True def isBlogPost(postJsonObject: {}) -> bool: """Is the given post a blog post? """ if postJsonObject['type'] != 'Create': return False if not postJsonObject.get('object'): return False if not isinstance(postJsonObject['object'], dict): return False if not postJsonObject['object'].get('type'): return False if not postJsonObject['object'].get('content'): return False if postJsonObject['object']['type'] != 'Article': return False return True def isNewsPost(postJsonObject: {}) -> bool: """Is the given post a blog post? """ return postJsonObject.get('news') def searchBoxPosts(baseDir: str, nickname: str, domain: str, searchStr: str, maxResults: int, boxName='outbox') -> []: """Search your posts and return a list of the filenames containing matching strings """ path = baseDir + '/accounts/' + nickname + '@' + domain + '/' + boxName if not os.path.isdir(path): return [] searchStr = searchStr.lower().strip() if '+' in searchStr: searchWords = searchStr.split('+') for index in range(len(searchWords)): searchWords[index] = searchWords[index].strip() print('SEARCH: ' + str(searchWords)) else: searchWords = [searchStr] res = [] for root, dirs, fnames in os.walk(path): for fname in fnames: filePath = os.path.join(root, fname) with open(filePath, 'r') as postFile: data = postFile.read().lower() notFound = False for keyword in searchWords: if keyword not in data: notFound = True break if notFound: continue res.append(filePath) if len(res) >= maxResults: return res return res def getFileCaseInsensitive(path: str) -> str: """Returns a case specific filename given a case insensitive version of it """ if os.path.isfile(path): return path if path != path.lower(): if os.path.isfile(path.lower()): return path.lower() # directory, filename = os.path.split(path) # directory, filename = (directory or '.'), filename.lower() # for f in os.listdir(directory): # if f.lower() == filename: # newpath = os.path.join(directory, f) # if os.path.isfile(newpath): # return newpath return None def undoLikesCollectionEntry(recentPostsCache: {}, baseDir: str, postFilename: str, objectUrl: str, actor: str, domain: str, debug: bool) -> None: """Undoes a like for a particular actor """ postJsonObject = loadJson(postFilename) if postJsonObject: # remove any cached version of this post so that the # like icon is changed nickname = getNicknameFromActor(actor) cachedPostFilename = getCachedPostFilename(baseDir, nickname, domain, postJsonObject) if cachedPostFilename: if os.path.isfile(cachedPostFilename): os.remove(cachedPostFilename) removePostFromCache(postJsonObject, recentPostsCache) if not postJsonObject.get('type'): return if postJsonObject['type'] != 'Create': return if not postJsonObject.get('object'): if debug: pprint(postJsonObject) print('DEBUG: post '+objectUrl+' has no object') return if not isinstance(postJsonObject['object'], dict): return if not postJsonObject['object'].get('likes'): return if not isinstance(postJsonObject['object']['likes'], dict): return if not postJsonObject['object']['likes'].get('items'): return totalItems = 0 if postJsonObject['object']['likes'].get('totalItems'): totalItems = postJsonObject['object']['likes']['totalItems'] itemFound = False for likeItem in postJsonObject['object']['likes']['items']: if likeItem.get('actor'): if likeItem['actor'] == actor: if debug: print('DEBUG: like was removed for ' + actor) postJsonObject['object']['likes']['items'].remove(likeItem) itemFound = True break if itemFound: if totalItems == 1: if debug: print('DEBUG: likes was removed from post') del postJsonObject['object']['likes'] else: itlen = len(postJsonObject['object']['likes']['items']) postJsonObject['object']['likes']['totalItems'] = itlen saveJson(postJsonObject, postFilename) def updateLikesCollection(recentPostsCache: {}, baseDir: str, postFilename: str, objectUrl: str, actor: str, domain: str, debug: bool) -> None: """Updates the likes collection within a post """ postJsonObject = loadJson(postFilename) if not postJsonObject: return # remove any cached version of this post so that the # like icon is changed nickname = getNicknameFromActor(actor) cachedPostFilename = getCachedPostFilename(baseDir, nickname, domain, postJsonObject) if cachedPostFilename: if os.path.isfile(cachedPostFilename): os.remove(cachedPostFilename) removePostFromCache(postJsonObject, recentPostsCache) if not postJsonObject.get('object'): if debug: pprint(postJsonObject) print('DEBUG: post ' + objectUrl + ' has no object') return if not isinstance(postJsonObject['object'], dict): return if not objectUrl.endswith('/likes'): objectUrl = objectUrl + '/likes' if not postJsonObject['object'].get('likes'): if debug: print('DEBUG: Adding initial like to ' + objectUrl) likesJson = { "@context": "https://www.w3.org/ns/activitystreams", 'id': objectUrl, 'type': 'Collection', "totalItems": 1, 'items': [{ 'type': 'Like', 'actor': actor }] } postJsonObject['object']['likes'] = likesJson else: if not postJsonObject['object']['likes'].get('items'): postJsonObject['object']['likes']['items'] = [] for likeItem in postJsonObject['object']['likes']['items']: if likeItem.get('actor'): if likeItem['actor'] == actor: return newLike = { 'type': 'Like', 'actor': actor } postJsonObject['object']['likes']['items'].append(newLike) itlen = len(postJsonObject['object']['likes']['items']) postJsonObject['object']['likes']['totalItems'] = itlen if debug: print('DEBUG: saving post with likes added') pprint(postJsonObject) saveJson(postJsonObject, postFilename) def undoAnnounceCollectionEntry(recentPostsCache: {}, baseDir: str, postFilename: str, actor: str, domain: str, debug: bool) -> None: """Undoes an announce for a particular actor by removing it from the "shares" collection within a post. Note that the "shares" collection has no relation to shared items in shares.py. It's shares of posts, not shares of physical objects. """ postJsonObject = loadJson(postFilename) if postJsonObject: # remove any cached version of this announce so that the announce # icon is changed nickname = getNicknameFromActor(actor) cachedPostFilename = getCachedPostFilename(baseDir, nickname, domain, postJsonObject) if cachedPostFilename: if os.path.isfile(cachedPostFilename): os.remove(cachedPostFilename) removePostFromCache(postJsonObject, recentPostsCache) if not postJsonObject.get('type'): return if postJsonObject['type'] != 'Create': return if not postJsonObject.get('object'): if debug: pprint(postJsonObject) print('DEBUG: post has no object') return if not isinstance(postJsonObject['object'], dict): return if not postJsonObject['object'].get('shares'): return if not postJsonObject['object']['shares'].get('items'): return totalItems = 0 if postJsonObject['object']['shares'].get('totalItems'): totalItems = postJsonObject['object']['shares']['totalItems'] itemFound = False for announceItem in postJsonObject['object']['shares']['items']: if announceItem.get('actor'): if announceItem['actor'] == actor: if debug: print('DEBUG: Announce was removed for ' + actor) anIt = announceItem postJsonObject['object']['shares']['items'].remove(anIt) itemFound = True break if itemFound: if totalItems == 1: if debug: print('DEBUG: shares (announcements) ' + 'was removed from post') del postJsonObject['object']['shares'] else: itlen = len(postJsonObject['object']['shares']['items']) postJsonObject['object']['shares']['totalItems'] = itlen saveJson(postJsonObject, postFilename) def updateAnnounceCollection(recentPostsCache: {}, baseDir: str, postFilename: str, actor: str, domain: str, debug: bool) -> None: """Updates the announcements collection within a post Confusingly this is known as "shares", but isn't the same as shared items within shares.py It's shares of posts, not shares of physical objects. """ postJsonObject = loadJson(postFilename) if postJsonObject: # remove any cached version of this announce so that the announce # icon is changed nickname = getNicknameFromActor(actor) cachedPostFilename = getCachedPostFilename(baseDir, nickname, domain, postJsonObject) if cachedPostFilename: if os.path.isfile(cachedPostFilename): os.remove(cachedPostFilename) removePostFromCache(postJsonObject, recentPostsCache) if not postJsonObject.get('object'): if debug: pprint(postJsonObject) print('DEBUG: post ' + postFilename + ' has no object') return if not isinstance(postJsonObject['object'], dict): return postUrl = removeIdEnding(postJsonObject['id']) + '/shares' if not postJsonObject['object'].get('shares'): if debug: print('DEBUG: Adding initial shares (announcements) to ' + postUrl) announcementsJson = { "@context": "https://www.w3.org/ns/activitystreams", 'id': postUrl, 'type': 'Collection', "totalItems": 1, 'items': [{ 'type': 'Announce', 'actor': actor }] } postJsonObject['object']['shares'] = announcementsJson else: if postJsonObject['object']['shares'].get('items'): sharesItems = postJsonObject['object']['shares']['items'] for announceItem in sharesItems: if announceItem.get('actor'): if announceItem['actor'] == actor: return newAnnounce = { 'type': 'Announce', 'actor': actor } postJsonObject['object']['shares']['items'].append(newAnnounce) itlen = len(postJsonObject['object']['shares']['items']) postJsonObject['object']['shares']['totalItems'] = itlen else: if debug: print('DEBUG: shares (announcements) section of post ' + 'has no items list') if debug: print('DEBUG: saving post with shares (announcements) added') pprint(postJsonObject) saveJson(postJsonObject, postFilename) def siteIsActive(url: str) -> bool: """Returns true if the current url is resolvable. This can be used to check that an instance is online before trying to send posts to it. """ if not url.startswith('http'): return False try: req = urllib.request.Request(url) urllib.request.urlopen(req, timeout=10) # nosec return True except SocketError as e: if e.errno == errno.ECONNRESET: print('WARN: connection was reset during siteIsActive') return False