epicyon/utils.py

179 lines
6.5 KiB
Python
Raw Normal View History

2019-07-02 09:25:29 +00:00
__filename__ = "utils.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "0.0.1"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
import os
import datetime
def getStatusNumber() -> (str,str):
"""Returns the status number and published date
"""
currTime=datetime.datetime.utcnow()
daysSinceEpoch=(currTime - datetime.datetime(1970,1,1)).days
# status is the number of seconds since epoch
statusNumber=str(((daysSinceEpoch*24*60*60) + (currTime.hour*60*60) + (currTime.minute*60) + currTime.second)*1000000 + currTime.microsecond)
published=currTime.strftime("%Y-%m-%dT%H:%M:%SZ")
conversationDate=currTime.strftime("%Y-%m-%d")
return statusNumber,published
2019-07-04 10:02:56 +00:00
def createPersonDir(nickname: str,domain: str,baseDir: str,dirname: str) -> str:
"""Create a directory for a person
2019-07-02 09:25:29 +00:00
"""
2019-07-03 09:40:27 +00:00
handle=nickname.lower()+'@'+domain.lower()
2019-07-02 09:25:29 +00:00
if not os.path.isdir(baseDir+'/accounts/'+handle):
os.mkdir(baseDir+'/accounts/'+handle)
2019-07-04 10:02:56 +00:00
boxDir=baseDir+'/accounts/'+handle+'/'+dirname
if not os.path.isdir(boxDir):
os.mkdir(boxDir)
return boxDir
def createOutboxDir(nickname: str,domain: str,baseDir: str) -> str:
"""Create an outbox for a person
"""
return createPersonDir(nickname,domain,baseDir,'outbox')
def createInboxQueueDir(nickname: str,domain: str,baseDir: str) -> str:
"""Create an inbox queue and returns the feed filename and directory
"""
return createPersonDir(nickname,domain,baseDir,'queue')
2019-07-02 10:39:55 +00:00
def domainPermitted(domain: str, federationList: []):
if len(federationList)==0:
return True
2019-07-11 12:29:31 +00:00
if ':' in domain:
domain=domain.split(':')[0]
2019-07-02 10:39:55 +00:00
if domain in federationList:
return True
return False
2019-07-09 14:20:23 +00:00
def urlPermitted(url: str, federationList: [],capability: str):
2019-07-12 20:56:26 +00:00
if url.endswith('gab.com'):
return False
2019-07-02 10:39:55 +00:00
if len(federationList)==0:
return True
for domain in federationList:
if domain in url:
return True
return False
2019-07-06 15:17:21 +00:00
def getNicknameFromActor(actor: str) -> str:
"""Returns the nickname from an actor url
"""
if '/users/' not in actor:
return None
2019-07-10 09:47:07 +00:00
nickStr=actor.split('/users/')[1].replace('@','')
if '/' not in nickStr:
return nickStr
else:
return nickStr.split('/')[0]
2019-07-06 15:17:21 +00:00
def getDomainFromActor(actor: str) -> (str,int):
"""Returns the domain name from an actor url
"""
port=None
if '/users/' not in actor:
domain = actor.replace('https://','').replace('http://','').replace('dat://','')
else:
domain = actor.split('/users/')[0].replace('https://','').replace('http://','').replace('dat://','')
if ':' in domain:
port=int(domain.split(':')[1])
domain=domain.split(':')[0]
return domain,port
2019-07-06 19:24:52 +00:00
def followPerson(baseDir: str,nickname: str, domain: str, \
followNickname: str, followDomain: str, \
federationList: [],debug: bool, \
followFile='following.txt') -> bool:
"""Adds a person to the follow list
"""
if not domainPermitted(followDomain.lower().replace('\n',''), \
federationList):
if debug:
print('DEBUG: follow of domain '+followDomain+' not permitted')
return False
2019-07-11 12:29:31 +00:00
if debug:
print('DEBUG: follow of domain '+followDomain)
2019-07-06 19:24:52 +00:00
handle=nickname.lower()+'@'+domain.lower()
handleToFollow=followNickname.lower()+'@'+followDomain.lower()
if not os.path.isdir(baseDir+'/accounts'):
os.mkdir(baseDir+'/accounts')
if not os.path.isdir(baseDir+'/accounts/'+handle):
os.mkdir(baseDir+'/accounts/'+handle)
filename=baseDir+'/accounts/'+handle+'/'+followFile
if os.path.isfile(filename):
if handleToFollow in open(filename).read():
2019-07-11 12:29:31 +00:00
if debug:
print('DEBUG: follow already exists')
2019-07-06 19:24:52 +00:00
return True
with open(filename, "a") as followfile:
followfile.write(handleToFollow+'\n')
2019-07-11 12:29:31 +00:00
if debug:
print('DEBUG: follow added')
2019-07-06 19:24:52 +00:00
return True
2019-07-11 12:29:31 +00:00
if debug:
print('DEBUG: creating new following file')
2019-07-06 19:24:52 +00:00
with open(filename, "w") as followfile:
followfile.write(handleToFollow+'\n')
return True
2019-07-11 12:29:31 +00:00
2019-07-13 19:28:14 +00:00
def locatePost(baseDir: str,nickname: str,domain: str,postUrl: str,replies=False) -> str:
2019-07-11 12:29:31 +00:00
"""Returns the filename for the given status post url
"""
2019-07-13 19:28:14 +00:00
if not replies:
extension='json'
else:
extension='replies'
2019-07-11 19:31:02 +00:00
# if this post in the shared inbox?
handle='inbox@'+domain
boxName='inbox'
2019-07-13 19:28:14 +00:00
postFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/'+boxName+'/'+postUrl.replace('/','#')+'.'+extension
2019-07-11 12:29:31 +00:00
if not os.path.isfile(postFilename):
2019-07-11 19:31:02 +00:00
boxName='outbox'
2019-07-13 19:28:14 +00:00
postFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/'+boxName+'/'+postUrl.replace('/','#')+'.'+extension
2019-07-11 12:29:31 +00:00
if not os.path.isfile(postFilename):
2019-07-11 19:31:02 +00:00
# if this post in the inbox of the person?
boxName='inbox'
2019-07-13 19:28:14 +00:00
postFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/'+boxName+'/'+postUrl.replace('/','#')+'.'+extension
2019-07-11 12:29:31 +00:00
if not os.path.isfile(postFilename):
postFilename=None
return postFilename
2019-07-14 16:37:01 +00:00
2019-07-14 16:57:06 +00:00
def removeAttachment(baseDir: str,httpPrefix: str,domain: str,postJson: {}):
if not postJson.get('attachment'):
return
if not postJson['attachment'][0].get('url'):
return
if port!=80 and port!=443:
if ':' not in domain:
domain=domain+':'+str(port)
attachmentUrl=postJson['attachment'][0]['url']
if not attachmentUrl:
return
mediaFilename=baseDir+'/'+attachmentUrl.replace(httpPrefix+'://'+domain+'/','')
if os.path.isfile(mediaFilename):
os.remove(mediaFilename)
postJson['attachment']=[]
2019-07-14 16:37:01 +00:00
def deletePost(baseDir: str,nickname: str,domain: str,postFilename: str,debug: bool):
"""Recursively deletes a post and its replies and attachments
"""
2019-07-14 16:57:06 +00:00
with open(postFilename, 'r') as fp:
postJsonObject=commentjson.load(fp)
2019-07-14 16:37:01 +00:00
repliesFilename=postFilename.replace('.json','.replies')
if os.path.isfile(repliesFilename):
if debug:
print('DEBUG: removing replies to '+postFilename)
with open(repliesFilename,'r') as f:
for replyId in f:
replyFile=locatePost(baseDir,nickname,domain,replyId)
if replyFile:
if os.path.isfile(replyFile):
deletePost(baseDir,nickname,domain,replyFile,debug)
# remove the replies file itself
os.remove(repliesFilename)
os.remove(postFilename)