epicyon/utils.py

496 lines
18 KiB
Python
Raw Normal View History

2019-07-02 09:25:29 +00:00
__filename__ = "utils.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
2019-08-29 13:35:29 +00:00
__version__ = "1.0.0"
2019-07-02 09:25:29 +00:00
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
2019-10-11 18:03:58 +00:00
import time
2019-09-29 18:48:34 +00:00
import shutil
2019-07-02 09:25:29 +00:00
import datetime
2019-11-23 10:20:30 +00:00
import json
2019-07-02 09:25:29 +00:00
2019-10-22 11:55:06 +00:00
def saveJson(jsonObject: dict, filename: str) -> bool:
    """Saves json to a file

    The object is serialised before the file is opened, so an object
    which cannot be converted to json never truncates an existing file.
    Writing is attempted up to five times, pausing for one second
    between failed attempts.

    Returns True if the file was written, otherwise False.
    """
    try:
        jsonStr = json.dumps(jsonObject)
    except (TypeError, ValueError, RecursionError) as e:
        # Retrying cannot help here: the object itself is not serialisable
        print('WARN: saveJson ' + str(e))
        return False

    maxTries = 5
    for tries in range(maxTries):
        try:
            with open(filename, 'w') as fp:
                fp.write(jsonStr)
            return True
        except OSError:
            print('WARN: saveJson ' + str(tries))
            # no point in waiting after the final attempt
            if tries < maxTries - 1:
                time.sleep(1)
    return False
def loadJson(filename: str, delaySec=2) -> dict:
    """Makes a few attempts to load a json formatted file

    Up to five attempts are made. After a failed attempt (file missing,
    unreadable or not valid json) the function waits delaySec seconds
    before trying again, unless delaySec is zero or negative.

    Returns the decoded json, or None if every attempt failed.
    """
    maxTries = 5
    for tries in range(maxTries):
        try:
            with open(filename, 'r') as fp:
                return json.loads(fp.read())
        except (OSError, ValueError):
            # ValueError covers both json and unicode decoding errors
            print('WARN: loadJson exception')
            # no point in waiting after the final attempt
            if delaySec > 0 and tries < maxTries - 1:
                time.sleep(delaySec)
    return None
2019-07-02 09:25:29 +00:00
def getStatusNumber() -> (str, str):
    """Returns the status number and published date

    The status number is the number of milliseconds since the unix
    epoch (UTC), shifted left by 16 bits, plus the sub-millisecond part
    of the current time used as a sequence number.
    The published date is the same instant formatted as
    YYYY-MM-DDTHH:MM:SSZ.
    """
    # timezone-aware replacement for the deprecated datetime.utcnow()
    currTime = datetime.datetime.now(datetime.timezone.utc)
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    sinceEpoch = currTime - epoch
    # milliseconds (not seconds) since epoch
    millisSinceEpoch = \
        (sinceEpoch.days * 24 * 60 * 60 + sinceEpoch.seconds) * 1000 + \
        currTime.microsecond // 1000
    # See https://github.com/tootsuite/mastodon/blob/995f8b389a66ab76ec92d9a240de376f1fc13a38/lib/mastodon/snowflake.rb
    # use the leftover microseconds as the sequence number
    sequenceId = currTime.microsecond % 1000
    # shift by 16bits "sequence data"
    statusNumber = str((millisSinceEpoch << 16) + sequenceId)
    published = currTime.strftime("%Y-%m-%dT%H:%M:%SZ")
    return statusNumber, published
2019-09-09 15:53:23 +00:00
def isEvil(domain: str) -> bool:
    """Returns true if the given domain ends with one of the
    domains on the built-in block list
    """
    # https://www.youtube.com/watch?v=5qw1hcevmdU
    evilDomains = ('gab.com', 'gabfed.com', 'spinster.xyz', 'kiwifarms.cc', 'djitter.com')
    # str.endswith accepts a tuple of candidate suffixes
    return domain.endswith(evilDomains)
2019-07-04 10:02:56 +00:00
def createPersonDir(nickname: str, domain: str, baseDir: str, dirname: str) -> str:
    """Create a directory for a person

    Ensures that baseDir/accounts/nickname@domain/dirname exists,
    creating any missing parent directories, and returns its path.
    """
    boxDir = baseDir + '/accounts/' + nickname + '@' + domain + '/' + dirname
    # exist_ok avoids a race between checking for and creating the directory
    os.makedirs(boxDir, exist_ok=True)
    return boxDir
def createOutboxDir(nickname: str, domain: str, baseDir: str) -> str:
    """Create the outbox directory for a person and return
    whatever createPersonDir returns for it
    """
    outboxDir = createPersonDir(nickname, domain, baseDir, 'outbox')
    return outboxDir
def createInboxQueueDir(nickname: str, domain: str, baseDir: str) -> str:
    """Create the inbox queue directory for a person and return
    whatever createPersonDir returns for it
    """
    queueDir = createPersonDir(nickname, domain, baseDir, 'queue')
    return queueDir
2019-07-02 10:39:55 +00:00
def domainPermitted(domain: str, federationList: list):
    """Returns true if the given domain is permitted to federate

    An empty (or missing) federation list permits every domain.
    Otherwise the domain, with any port number removed, must appear
    in the list.
    """
    # "not federationList" also tolerates None, as urlPermitted does
    if not federationList:
        return True
    domainWithoutPort = domain.split(':')[0]
    return domainWithoutPort in federationList
2019-11-16 12:07:57 +00:00
def urlPermitted(url: str, federationList: list, capability: str):
    """Returns true if the given url is permitted

    A url is refused if it, or the host which it refers to, is on the
    isEvil block list. Otherwise it is permitted if the federation list
    is empty or if one of the listed domains occurs within the url.
    The capability argument is accepted but not used by this function.
    """
    # isEvil does a suffix match, so it has to be given the host rather
    # than a full url with a path. Extract the host by removing any
    # scheme, path and port number.
    host = url.split('://', 1)[-1].split('/', 1)[0].split(':', 1)[0]
    if isEvil(url) or isEvil(host):
        return False
    if not federationList:
        return True
    for domain in federationList:
        if domain in url:
            return True
    return False
2019-07-06 15:17:21 +00:00
def getDisplayName(baseDir: str, actor: str, personCache: {}) -> str:
    """Returns the display name for the given actor

    The name is taken from the in-memory person cache. If the cache
    entry has no actor details then the actor file stored under
    baseDir/cache/actors is consulted instead.
    Returns None if no name could be found.
    """
    # a status url is reduced to the actor which it belongs to
    actor = actor.split('/statuses/')[0]

    cacheEntry = personCache.get(actor)
    if not cacheEntry:
        return None

    cachedActor = cacheEntry.get('actor')
    if cachedActor:
        return cachedActor.get('name') or None

    # Try to obtain from the cached actors
    cachedActorFilename = \
        baseDir + '/cache/actors/' + actor.replace('/', '#') + '.json'
    if not os.path.isfile(cachedActorFilename):
        return None
    actorJson = loadJson(cachedActorFilename, 1)
    if actorJson and actorJson.get('name'):
        return actorJson['name']
    return None
2019-07-06 15:17:21 +00:00
def getNicknameFromActor(actor: str) -> str:
    """Returns the nickname from an actor url

    Recognises urls containing /users/, /profile/ or /channel/
    (checked in that order) and urls of the form https://domain/@nick.
    Returns None for anything else.
    """
    for marker in ('/users/', '/profile/', '/channel/'):
        if marker in actor:
            afterMarker = actor.split(marker)[1].replace('@', '')
            # anything after the nickname is part of the path
            return afterMarker.split('/')[0]
    # https://domain/@nick
    if '/@' in actor:
        return actor.split('/@')[1].split('/')[0]
    return None
2019-07-06 15:17:21 +00:00
def getDomainFromActor(actor: str) -> (str,int):
    """Returns the domain name from an actor url

    The result is a (domain, port) tuple, where port is None if the
    url does not contain a port number.
    """
    def withoutScheme(text: str) -> str:
        # remove the url scheme, as the original chain of replaces does
        for scheme in ('https://', 'http://', 'dat://'):
            text = text.replace(scheme, '')
        return text

    for marker in ('/profile/', '/channel/', '/users/'):
        if marker in actor:
            domain = withoutScheme(actor.split(marker)[0])
            break
    else:
        # no recognised marker: the domain is everything up to the first slash
        domain = withoutScheme(actor).split('/')[0]

    if ':' not in domain:
        return domain, None
    domainSections = domain.split(':')
    return domainSections[0], int(domainSections[1])
2019-07-06 19:24:52 +00:00
def followPerson(baseDir: str, nickname: str, domain: str,
                 followNickname: str, followDomain: str,
                 federationList: list, debug: bool,
                 followFile='following.txt') -> bool:
    """Adds a person to the follow list

    The entry followNickname@followDomain is prepended to the follow
    file within the account directory of nickname@domain, creating the
    directory and the file if they do not yet exist.

    Returns True if the entry was added or was already present.
    Returns False if followDomain is not permitted by the federation
    list, or if an existing follow file could not be updated.
    """
    if not domainPermitted(followDomain.lower().replace('\n', ''),
                           federationList):
        if debug:
            print('DEBUG: follow of domain ' + followDomain + ' not permitted')
        return False
    if debug:
        print('DEBUG: follow of domain ' + followDomain)

    # handles are lower case and have no port number
    handle = nickname + '@' + domain.split(':')[0].lower()
    handleToFollow = \
        followNickname + '@' + followDomain.split(':')[0].lower()
    newEntry = followNickname + '@' + followDomain + '\n'

    accountDir = baseDir + '/accounts/' + handle
    os.makedirs(accountDir, exist_ok=True)
    filename = accountDir + '/' + followFile

    if os.path.isfile(filename):
        try:
            with open(filename, 'r+') as fp:
                content = fp.read()
                if handleToFollow in content:
                    if debug:
                        print('DEBUG: follow already exists')
                    return True
                # prepend to follow file
                fp.seek(0, 0)
                fp.write(newEntry + content)
        except (OSError, ValueError) as e:
            print('WARN: Failed to write entry to follow file ' +
                  filename + ' ' + str(e))
            # Do not fall back to recreating the file, since that would
            # discard every existing entry
            return False
        if debug:
            print('DEBUG: follow added')
        return True

    if debug:
        print('DEBUG: creating new following file')
    with open(filename, 'w') as fp:
        fp.write(newEntry)
    return True
2019-07-11 12:29:31 +00:00
2019-07-13 19:28:14 +00:00
def locatePost(baseDir: str, nickname: str, domain: str, postUrl: str, replies=False) -> str:
    """Returns the filename for the given status post url

    The inbox and then the outbox of nickname@domain are searched,
    followed by the announce cache for the nickname. If replies is True
    then the .replies file is looked for rather than the .json file.

    Returns None if no matching file exists.
    """
    extension = 'replies' if replies else 'json'

    # '/activity' has to be removed before the slashes are replaced,
    # otherwise it can never match
    postId = postUrl.strip().replace('/activity', '').replace('/', '#')

    accountDir = baseDir + '/accounts/' + nickname + '@' + domain
    candidateFilenames = (
        accountDir + '/inbox/' + postId + '.' + extension,
        accountDir + '/outbox/' + postId + '.' + extension,
        baseDir + '/cache/announce/' + nickname + '/' + postId + '.' + extension
    )
    for postFilename in candidateFilenames:
        if os.path.isfile(postFilename):
            return postFilename
    print('WARN: unable to locate ' + nickname + ' ' + postId + '.' + extension)
    return None
2019-07-14 16:37:01 +00:00
2019-07-14 16:57:06 +00:00
def removeAttachment(baseDir: str, httpPrefix: str, domain: str, postJson: dict,
                     port: int = None):
    """Removes the media file for the first attachment of a post

    The url of the first attachment is mapped onto a file beneath
    baseDir by removing the httpPrefix://domain/ prefix. That file is
    deleted if it exists, and the attachment list of the post is then
    emptied. Nothing happens if the post has no attachment with a url.

    port is optional. If given, and it is not 80 or 443, it is appended
    to a domain which does not already include a port number.
    """
    attachments = postJson.get('attachment')
    if not attachments:
        return
    attachmentUrl = attachments[0].get('url')
    if not attachmentUrl:
        return
    if port and port not in (80, 443) and ':' not in domain:
        domain = domain + ':' + str(port)
    mediaFilename = \
        baseDir + '/' + attachmentUrl.replace(httpPrefix + '://' + domain + '/', '')
    if os.path.isfile(mediaFilename):
        os.remove(mediaFilename)
    postJson['attachment'] = []
2019-08-12 18:02:29 +00:00
def removeModerationPostFromIndex(baseDir: str, postUrl: str, debug: bool) -> None:
    """Removes a url from the moderation index

    Any line of baseDir/accounts/moderation.txt which is equal to the
    post url (with '/activity' removed) is deleted. The file is left
    untouched if it contains no such line.
    """
    moderationIndexFile = baseDir + '/accounts/moderation.txt'
    if not os.path.isfile(moderationIndexFile):
        return
    postId = postUrl.replace('/activity', '')

    # read the index once, closing the file afterwards
    with open(moderationIndexFile, 'r') as f:
        lines = f.readlines()
    if not any(line.strip('\n') == postId for line in lines):
        return

    with open(moderationIndexFile, 'w') as f:
        for line in lines:
            if line.strip('\n') != postId:
                f.write(line)
            elif debug:
                print('DEBUG: removed ' + postId + ' from moderation index')
2019-11-17 15:19:34 +00:00
def _removePostFromTagIndexes(baseDir: str, postObject: dict) -> None:
    """Removes the id of a post object from the index file of each
    hashtag which the object is tagged with
    """
    if not (postObject.get('id') and postObject.get('tag')):
        return
    # get the id of the post
    postId = postObject['id'].replace('/activity', '')
    for tag in postObject['tag']:
        if tag['type'] != 'Hashtag':
            continue
        # find the index file for this tag
        tagIndexFilename = baseDir + '/tags/' + tag['name'][1:] + '.txt'
        if not os.path.isfile(tagIndexFilename):
            continue
        # remove postId from the tag index file
        with open(tagIndexFilename, 'r') as f:
            lines = f.readlines()
        with open(tagIndexFilename, 'w+') as f:
            for line in lines:
                if line.strip('\n') != postId:
                    f.write(line)


def deletePost(baseDir: str, httpPrefix: str, nickname: str, domain: str, postFilename: str, debug: bool) -> None:
    """Recursively deletes a post and its replies and attachments

    A post which is listed in the bookmarks index of nickname@domain
    is not deleted. Otherwise any attachment, moderation index entry
    and hashtag index entries are removed, followed by each reply which
    can be located, the replies file and finally the post file itself.
    """
    postJsonObject = loadJson(postFilename, 1)
    if postJsonObject:
        # don't allow deletion of bookmarked posts
        bookmarksIndexFilename = \
            baseDir + '/accounts/' + nickname + '@' + domain + '/bookmarks.index'
        if os.path.isfile(bookmarksIndexFilename):
            bookmarkIndex = postFilename.split('/')[-1] + '\n'
            with open(bookmarksIndexFilename, 'r') as f:
                if bookmarkIndex in f.read():
                    return

        # remove any attachment
        removeAttachment(baseDir, httpPrefix, domain, postJsonObject)

        # the object can be a url string rather than a dict,
        # in which case it has no fields to inspect
        postObject = postJsonObject.get('object')
        if postObject and isinstance(postObject, dict):
            # remove from moderation index file
            if postObject.get('moderationStatus') and postJsonObject.get('id'):
                postId = postJsonObject['id'].replace('/activity', '')
                removeModerationPostFromIndex(baseDir, postId, debug)

            # remove any hashtags index entries
            content = postObject.get('content')
            if content and '#' in content:
                _removePostFromTagIndexes(baseDir, postObject)

    # remove any replies
    repliesFilename = postFilename.replace('.json', '.replies')
    if os.path.isfile(repliesFilename):
        if debug:
            print('DEBUG: removing replies to ' + postFilename)
        with open(repliesFilename, 'r') as f:
            for replyId in f:
                replyFile = locatePost(baseDir, nickname, domain, replyId)
                if replyFile and os.path.isfile(replyFile):
                    # httpPrefix must be passed on to the recursive call
                    deletePost(baseDir, httpPrefix, nickname, domain,
                               replyFile, debug)
        # remove the replies file
        os.remove(repliesFilename)
    # finally, remove the post itself
    os.remove(postFilename)
2019-07-27 22:48:34 +00:00
2019-08-23 13:47:29 +00:00
def validNickname(domain: str, nickname: str) -> bool:
    """Returns true if the given nickname is acceptable

    A nickname may not contain any of the forbidden characters,
    may not be the same as the domain and may not be a reserved name.
    """
    forbiddenChars = '. /?:;@'
    if any(ch in nickname for ch in forbiddenChars):
        return False
    if nickname == domain:
        return False
    reservedNames = (
        'inbox', 'dm', 'outbox', 'following', 'public', 'followers',
        'profile', 'channel', 'capabilities', 'calendar', 'tlreplies',
        'tlmedia', 'moderation'
    )
    return nickname not in reservedNames
2019-08-08 11:24:26 +00:00
def noOfAccounts(baseDir: str) -> int:
    """Returns the number of accounts on the system

    An account is a directory directly beneath baseDir/accounts whose
    name contains '@' and does not begin with 'inbox@'.
    """
    accountCtr = 0
    for _, dirs, _ in os.walk(baseDir + '/accounts'):
        for account in dirs:
            if '@' in account and not account.startswith('inbox@'):
                accountCtr += 1
        # only the top level holds accounts, so don't descend further
        break
    return accountCtr
2019-08-10 11:31:42 +00:00
2019-11-13 15:16:24 +00:00
def noOfActiveAccountsMonthly(baseDir: str, months: int) -> int:
    """Returns the number of accounts which were used within the
    given number of months

    An account is a directory directly beneath baseDir/accounts whose
    name contains '@' and does not begin with 'inbox@'. It counts as
    active if its .lastUsed file holds a timestamp, in seconds since
    epoch, which is less than months*30 days old.
    """
    accountCtr = 0
    currTime = int(time.time())
    monthSeconds = int(60 * 60 * 24 * 30 * months)
    for _, dirs, _ in os.walk(baseDir + '/accounts'):
        for account in dirs:
            if '@' not in account or account.startswith('inbox@'):
                continue
            lastUsedFilename = baseDir + '/accounts/' + account + '/.lastUsed'
            if not os.path.isfile(lastUsedFilename):
                continue
            with open(lastUsedFilename, 'r') as lastUsedFile:
                # tolerate a trailing newline or other whitespace
                lastUsed = lastUsedFile.read().strip()
            if lastUsed.isdigit():
                if currTime - int(lastUsed) < monthSeconds:
                    accountCtr += 1
        # only the top level holds accounts, so don't descend further
        break
    return accountCtr
2019-08-10 11:31:42 +00:00
def isPublicPost(postJsonObject: {}) -> bool:
    """Returns true if the given post is public

    A post is public if it is a Create activity whose object is a dict
    having at least one 'to' recipient which ends with '#Public'.
    """
    if postJsonObject.get('type') != 'Create':
        return False
    postObject = postJsonObject.get('object')
    if not postObject or not isinstance(postObject, dict):
        return False
    recipients = postObject.get('to')
    if not recipients:
        return False
    return any(recipient.endswith('#Public') for recipient in recipients)
2019-09-29 18:48:34 +00:00
def copytree(src: str, dst: str, symlinks=False, ignore=None):
    """Copy a directory

    Copies every entry of the directory src into the directory dst,
    which is created if it does not yet exist. Subdirectories are
    copied with shutil.copytree, which is passed the symlinks and
    ignore arguments, and files are copied with shutil.copy2.
    """
    # shutil.copy2 needs the destination directory to already exist
    os.makedirs(dst, exist_ok=True)
    for item in os.listdir(src):
        sourceItem = os.path.join(src, item)
        destItem = os.path.join(dst, item)
        if os.path.isdir(sourceItem):
            shutil.copytree(sourceItem, destItem, symlinks, ignore)
        else:
            shutil.copy2(sourceItem, destItem)
2019-10-19 17:50:05 +00:00
def getCachedPostDirectory(baseDir: str, nickname: str, domain: str) -> str:
    """Returns the directory where the html post cache exists
    for the account nickname@domain
    """
    handle = nickname + '@' + domain
    return '/'.join((baseDir, 'accounts', handle, 'postcache'))
def getCachedPostFilename(baseDir: str, nickname: str, domain: str, \
                          postJsonObject: {}) -> str:
    """Returns the html cache filename for the given post

    The filename is the id of the post, with '/activity' removed and
    slashes replaced by '#', within the cached post directory of
    the account.
    """
    cacheDir = getCachedPostDirectory(baseDir, nickname, domain)
    postId = postJsonObject['id'].replace('/activity', '').replace('/', '#')
    return cacheDir + '/' + postId + '.html'
2019-11-24 13:46:28 +00:00
def removePostFromCache(postJsonObject: dict, recentPostsCache: dict):
    """ if the post exists in the recent posts cache then remove it

    The post is removed from the index and from the json and html
    stores of the cache. A missing json or html entry is tolerated,
    so that a cache whose index and stores have become inconsistent
    can still be cleaned up.
    """
    if not postJsonObject.get('id'):
        return
    if not recentPostsCache.get('index'):
        return
    postId = postJsonObject['id'].replace('/activity', '').replace('/', '#')
    if postId not in recentPostsCache['index']:
        return
    # pop with a default rather than del, to avoid a KeyError
    recentPostsCache.get('json', {}).pop(postId, None)
    recentPostsCache.get('html', {}).pop(postId, None)
    recentPostsCache['index'].remove(postId)
def updateRecentPostsCache(recentPostsCache: dict, maxRecentPosts: int,
                           postJsonObject: dict, htmlStr: str) -> None:
    """Store recent posts in memory so that they can be quickly recalled

    The post is stored, keyed by its id with '/activity' removed and
    slashes replaced by '#', as a json string and as html. When the
    cache holds more than maxRecentPosts posts the oldest ones are
    evicted. A post which is already cached is left unchanged.
    """
    if not postJsonObject.get('id'):
        return
    postId = postJsonObject['id'].replace('/activity', '').replace('/', '#')

    if not recentPostsCache.get('index'):
        # first post: create the cache structures
        recentPostsCache['index'] = [postId]
        recentPostsCache['json'] = {postId: json.dumps(postJsonObject)}
        recentPostsCache['html'] = {postId: htmlStr}
        return

    if postId in recentPostsCache['index']:
        return
    recentPostsCache['index'].append(postId)
    recentPostsCache['json'][postId] = json.dumps(postJsonObject)
    recentPostsCache['html'][postId] = htmlStr

    # evict the oldest posts, rather than the one which was just added
    while len(recentPostsCache['index']) > maxRecentPosts:
        oldestPostId = recentPostsCache['index'].pop(0)
        recentPostsCache['json'].pop(oldestPostId, None)
        recentPostsCache['html'].pop(oldestPostId, None)