flake8 format

merge-requests/30/head
Bob Mottram 2020-04-04 14:44:49 +01:00
parent 6d19fa47a1
commit b63bf2c72d
1 changed files with 304 additions and 225 deletions

185
utils.py
View File

@ -13,16 +13,19 @@ import datetime
import json import json
from calendar import monthrange from calendar import monthrange
def removeAvatarFromCache(baseDir: str, actorStr: str) -> None: def removeAvatarFromCache(baseDir: str, actorStr: str) -> None:
"""Removes any existing avatar entries from the cache """Removes any existing avatar entries from the cache
This avoids duplicate entries with differing extensions This avoids duplicate entries with differing extensions
""" """
avatarFilenameExtensions = ('png', 'jpg', 'gif', 'webp') avatarFilenameExtensions = ('png', 'jpg', 'gif', 'webp')
for extension in avatarFilenameExtensions: for extension in avatarFilenameExtensions:
avatarFilename=baseDir+'/cache/avatars/'+actorStr+'.'+extension avatarFilename = \
baseDir + '/cache/avatars/' + actorStr + '.' + extension
if os.path.isfile(avatarFilename): if os.path.isfile(avatarFilename):
os.remove(avatarFilename) os.remove(avatarFilename)
def saveJson(jsonObject: {}, filename: str) -> bool: def saveJson(jsonObject: {}, filename: str) -> bool:
"""Saves json to a file """Saves json to a file
""" """
@ -32,12 +35,13 @@ def saveJson(jsonObject: {},filename: str) -> bool:
with open(filename, 'w') as fp: with open(filename, 'w') as fp:
fp.write(json.dumps(jsonObject)) fp.write(json.dumps(jsonObject))
return True return True
except: except BaseException:
print('WARN: saveJson ' + str(tries)) print('WARN: saveJson ' + str(tries))
time.sleep(1) time.sleep(1)
tries += 1 tries += 1
return False return False
def loadJson(filename: str, delaySec=2) -> {}: def loadJson(filename: str, delaySec=2) -> {}:
"""Makes a few attempts to load a json formatted file """Makes a few attempts to load a json formatted file
""" """
@ -49,14 +53,16 @@ def loadJson(filename: str,delaySec=2) -> {}:
data = fp.read() data = fp.read()
jsonObject = json.loads(data) jsonObject = json.loads(data)
break break
except: except BaseException:
print('WARN: loadJson exception') print('WARN: loadJson exception')
if delaySec > 0: if delaySec > 0:
time.sleep(delaySec) time.sleep(delaySec)
tries += 1 tries += 1
return jsonObject return jsonObject
def loadJsonOnionify(filename: str,domain: str,onionDomain: str,delaySec=2) -> {}:
def loadJsonOnionify(filename: str, domain: str, onionDomain: str,
delaySec=2) -> {}:
"""Makes a few attempts to load a json formatted file """Makes a few attempts to load a json formatted file
This also converts the domain name to the onion domain This also converts the domain name to the onion domain
""" """
@ -67,25 +73,33 @@ def loadJsonOnionify(filename: str,domain: str,onionDomain: str,delaySec=2) -> {
with open(filename, 'r') as fp: with open(filename, 'r') as fp:
data = fp.read() data = fp.read()
if data: if data:
data=data.replace(domain,onionDomain).replace('https:','http:') data = data.replace(domain, onionDomain)
data = data.replace('https:', 'http:')
print('*****data: ' + data) print('*****data: ' + data)
jsonObject = json.loads(data) jsonObject = json.loads(data)
break break
except: except BaseException:
print('WARN: loadJson exception') print('WARN: loadJson exception')
if delaySec > 0: if delaySec > 0:
time.sleep(delaySec) time.sleep(delaySec)
tries += 1 tries += 1
return jsonObject return jsonObject
def getStatusNumber() -> (str, str): def getStatusNumber() -> (str, str):
"""Returns the status number and published date """Returns the status number and published date
""" """
currTime = datetime.datetime.utcnow() currTime = datetime.datetime.utcnow()
daysSinceEpoch = (currTime - datetime.datetime(1970, 1, 1)).days daysSinceEpoch = (currTime - datetime.datetime(1970, 1, 1)).days
# status is the number of seconds since epoch # status is the number of seconds since epoch
statusNumber=str(((daysSinceEpoch*24*60*60) + (currTime.hour*60*60) + (currTime.minute*60) + currTime.second)*1000 + int(currTime.microsecond/1000)) statusNumber = \
# See https://github.com/tootsuite/mastodon/blob/995f8b389a66ab76ec92d9a240de376f1fc13a38/lib/mastodon/snowflake.rb str(((daysSinceEpoch * 24 * 60 * 60) +
(currTime.hour * 60 * 60) +
(currTime.minute * 60) +
currTime.second) * 1000 +
int(currTime.microsecond / 1000))
# See https://github.com/tootsuite/mastodon/blob/
# 995f8b389a66ab76ec92d9a240de376f1fc13a38/lib/mastodon/snowflake.rb
# use the leftover microseconds as the sequence number # use the leftover microseconds as the sequence number
sequenceId = currTime.microsecond % 1000 sequenceId = currTime.microsecond % 1000
# shift by 16bits "sequence data" # shift by 16bits "sequence data"
@ -93,8 +107,11 @@ def getStatusNumber() -> (str,str):
published = currTime.strftime("%Y-%m-%dT%H:%M:%SZ") published = currTime.strftime("%Y-%m-%dT%H:%M:%SZ")
return statusNumber, published return statusNumber, published
def evilIncarnate() -> []: def evilIncarnate() -> []:
return ('gab.com','gabfed.com','spinster.xyz','kiwifarms.cc','djitter.com') return ('gab.com', 'gabfed.com', 'spinster.xyz',
'kiwifarms.cc', 'djitter.com')
def isEvil(domain: str) -> bool: def isEvil(domain: str) -> bool:
if not isinstance(domain, str): if not isinstance(domain, str):
@ -107,7 +124,9 @@ def isEvil(domain: str) -> bool:
return True return True
return False return False
def createPersonDir(nickname: str,domain: str,baseDir: str,dirname: str) -> str:
def createPersonDir(nickname: str, domain: str, baseDir: str,
dirname: str) -> str:
"""Create a directory for a person """Create a directory for a person
""" """
handle = nickname + '@' + domain handle = nickname + '@' + domain
@ -118,16 +137,19 @@ def createPersonDir(nickname: str,domain: str,baseDir: str,dirname: str) -> str:
os.mkdir(boxDir) os.mkdir(boxDir)
return boxDir return boxDir
def createOutboxDir(nickname: str, domain: str, baseDir: str) -> str: def createOutboxDir(nickname: str, domain: str, baseDir: str) -> str:
"""Create an outbox for a person """Create an outbox for a person
""" """
return createPersonDir(nickname, domain, baseDir, 'outbox') return createPersonDir(nickname, domain, baseDir, 'outbox')
def createInboxQueueDir(nickname: str, domain: str, baseDir: str) -> str: def createInboxQueueDir(nickname: str, domain: str, baseDir: str) -> str:
"""Create an inbox queue and returns the feed filename and directory """Create an inbox queue and returns the feed filename and directory
""" """
return createPersonDir(nickname, domain, baseDir, 'queue') return createPersonDir(nickname, domain, baseDir, 'queue')
def domainPermitted(domain: str, federationList: []): def domainPermitted(domain: str, federationList: []):
if len(federationList) == 0: if len(federationList) == 0:
return True return True
@ -137,6 +159,7 @@ def domainPermitted(domain: str, federationList: []):
return True return True
return False return False
def urlPermitted(url: str, federationList: [], capability: str): def urlPermitted(url: str, federationList: [], capability: str):
if isEvil(url): if isEvil(url):
return False return False
@ -147,6 +170,7 @@ def urlPermitted(url: str,federationList: [],capability: str):
return True return True
return False return False
def getDisplayName(baseDir: str, actor: str, personCache: {}) -> str: def getDisplayName(baseDir: str, actor: str, personCache: {}) -> str:
"""Returns the display name for the given actor """Returns the display name for the given actor
""" """
@ -159,7 +183,8 @@ def getDisplayName(baseDir: str,actor: str,personCache: {}) -> str:
return personCache[actor]['actor']['name'] return personCache[actor]['actor']['name']
else: else:
# Try to obtain from the cached actors # Try to obtain from the cached actors
cachedActorFilename=baseDir+'/cache/actors/'+(actor.replace('/','#'))+'.json' cachedActorFilename = \
baseDir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
if os.path.isfile(cachedActorFilename): if os.path.isfile(cachedActorFilename):
actorJson = loadJson(cachedActorFilename, 1) actorJson = loadJson(cachedActorFilename, 1)
if actorJson: if actorJson:
@ -167,6 +192,7 @@ def getDisplayName(baseDir: str,actor: str,personCache: {}) -> str:
return(actorJson['name']) return(actorJson['name'])
return None return None
def getNicknameFromActor(actor: str) -> str: def getNicknameFromActor(actor: str) -> str:
"""Returns the nickname from an actor url """Returns the nickname from an actor url
""" """
@ -196,26 +222,30 @@ def getNicknameFromActor(actor: str) -> str:
else: else:
return nickStr.split('/')[0] return nickStr.split('/')[0]
def getDomainFromActor(actor: str) -> (str, int): def getDomainFromActor(actor: str) -> (str, int):
"""Returns the domain name from an actor url """Returns the domain name from an actor url
""" """
port = None port = None
if '/profile/' in actor: if '/profile/' in actor:
domain= \ domain = actor.split('/profile/')[0].replace('https://', '')
actor.split('/profile/')[0].replace('https://','').replace('http://','').replace('i2p://','').replace('dat://','') domain = domain.replace('http://', '').replace('i2p://', '')
domain = domain.replace('dat://', '')
else: else:
if '/channel/' in actor: if '/channel/' in actor:
domain= \ domain = actor.split('/channel/')[0].replace('https://', '')
actor.split('/channel/')[0].replace('https://','').replace('http://','').replace('i2p://','').replace('dat://','') domain = domain.replace('http://', '').replace('i2p://', '')
domain = domain.replace('dat://', '')
else: else:
if '/users/' not in actor: if '/users/' not in actor:
domain= \ domain = actor.replace('https://', '').replace('http://', '')
actor.replace('https://','').replace('http://','').replace('i2p://','').replace('dat://','') domain = domain.replace('i2p://', '').replace('dat://', '')
if '/' in actor: if '/' in actor:
domain = domain.split('/')[0] domain = domain.split('/')[0]
else: else:
domain= \ domain = actor.split('/users/')[0].replace('https://', '')
actor.split('/users/')[0].replace('https://','').replace('http://','').replace('i2p://','').replace('dat://','') domain = domain.replace('http://', '').replace('i2p://', '')
domain = domain.replace('dat://', '')
if ':' in domain: if ':' in domain:
portStr = domain.split(':')[1] portStr = domain.split(':')[1]
if not portStr.isdigit(): if not portStr.isdigit():
@ -224,16 +254,18 @@ def getDomainFromActor(actor: str) -> (str,int):
domain = domain.split(':')[0] domain = domain.split(':')[0]
return domain, port return domain, port
def followPerson(baseDir: str,nickname: str, domain: str, \
followNickname: str, followDomain: str, \ def followPerson(baseDir: str, nickname: str, domain: str,
federationList: [],debug: bool, \ followNickname: str, followDomain: str,
federationList: [], debug: bool,
followFile='following.txt') -> bool: followFile='following.txt') -> bool:
"""Adds a person to the follow list """Adds a person to the follow list
""" """
if not domainPermitted(followDomain.lower().replace('\n',''), \ if not domainPermitted(followDomain.lower().replace('\n', ''),
federationList): federationList):
if debug: if debug:
print('DEBUG: follow of domain '+followDomain+' not permitted') print('DEBUG: follow of domain ' +
followDomain + ' not permitted')
return False return False
if debug: if debug:
print('DEBUG: follow of domain ' + followDomain) print('DEBUG: follow of domain ' + followDomain)
@ -285,14 +317,17 @@ def followPerson(baseDir: str,nickname: str, domain: str, \
print('DEBUG: follow added') print('DEBUG: follow added')
return True return True
except Exception as e: except Exception as e:
print('WARN: Failed to write entry to follow file '+filename+' '+str(e)) print('WARN: Failed to write entry to follow file ' +
filename + ' ' + str(e))
if debug: if debug:
print('DEBUG: creating new following file to follow ' + handleToFollow) print('DEBUG: creating new following file to follow ' + handleToFollow)
with open(filename, "w") as followfile: with open(filename, "w") as followfile:
followfile.write(handleToFollow + '\n') followfile.write(handleToFollow + '\n')
return True return True
def locatePost(baseDir: str,nickname: str,domain: str,postUrl: str,replies=False) -> str:
def locatePost(baseDir: str, nickname: str, domain: str,
postUrl: str, replies=False) -> str:
"""Returns the filename for the given status post url """Returns the filename for the given status post url
""" """
if not replies: if not replies:
@ -301,43 +336,49 @@ def locatePost(baseDir: str,nickname: str,domain: str,postUrl: str,replies=False
extension = 'replies' extension = 'replies'
# if this post in the shared inbox? # if this post in the shared inbox?
handle='inbox@'+domain
postUrl = postUrl.replace('/', '#').replace('/activity', '').strip() postUrl = postUrl.replace('/', '#').replace('/activity', '').strip()
boxName = 'inbox' boxName = 'inbox'
postFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/'+boxName+'/'+postUrl+'.'+extension postFilename = baseDir + '/accounts/' + nickname + '@' + domain + \
'/' + boxName + '/' + postUrl + '.' + extension
if os.path.isfile(postFilename): if os.path.isfile(postFilename):
return postFilename return postFilename
boxName = 'outbox' boxName = 'outbox'
postFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/'+boxName+'/'+postUrl+'.'+extension postFilename = baseDir + '/accounts/' + nickname + '@' + domain + \
'/' + boxName + '/' + postUrl + '.' + extension
if os.path.isfile(postFilename): if os.path.isfile(postFilename):
return postFilename return postFilename
boxName = 'tlblogs' boxName = 'tlblogs'
postFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/'+boxName+'/'+postUrl+'.'+extension postFilename = baseDir + '/accounts/' + nickname + '@' + domain + \
'/' + boxName + '/' + postUrl + '.'+extension
if os.path.isfile(postFilename): if os.path.isfile(postFilename):
return postFilename return postFilename
postFilename=baseDir+'/cache/announce/'+nickname+'/'+postUrl+'.'+extension postFilename = baseDir + '/cache/announce/' + \
nickname + '/' + postUrl + '.' + extension
if os.path.isfile(postFilename): if os.path.isfile(postFilename):
return postFilename return postFilename
print('WARN: unable to locate '+nickname+' '+postUrl+'.'+extension) print('WARN: unable to locate ' + nickname + ' ' +
postUrl + '.' + extension)
return None return None
def removeAttachment(baseDir: str, httpPrefix: str, domain: str, postJson: {}): def removeAttachment(baseDir: str, httpPrefix: str, domain: str, postJson: {}):
if not postJson.get('attachment'): if not postJson.get('attachment'):
return return
if not postJson['attachment'][0].get('url'): if not postJson['attachment'][0].get('url'):
return return
if port: # if port:
if port!=80 and port!=443: # if port != 80 and port != 443:
if ':' not in domain: # if ':' not in domain:
domain=domain+':'+str(port) # domain = domain + ':' + str(port)
attachmentUrl = postJson['attachment'][0]['url'] attachmentUrl = postJson['attachment'][0]['url']
if not attachmentUrl: if not attachmentUrl:
return return
mediaFilename=baseDir+'/'+attachmentUrl.replace(httpPrefix+'://'+domain+'/','') mediaFilename = baseDir + '/' + \
attachmentUrl.replace(httpPrefix + '://' + domain + '/', '')
if os.path.isfile(mediaFilename): if os.path.isfile(mediaFilename):
os.remove(mediaFilename) os.remove(mediaFilename)
etagFilename = mediaFilename + '.etag' etagFilename = mediaFilename + '.etag'
@ -345,7 +386,9 @@ def removeAttachment(baseDir: str,httpPrefix: str,domain: str,postJson: {}):
os.remove(etagFilename) os.remove(etagFilename)
postJson['attachment'] = [] postJson['attachment'] = []
def removeModerationPostFromIndex(baseDir: str,postUrl: str,debug: bool) -> None:
def removeModerationPostFromIndex(baseDir: str, postUrl: str,
debug: bool) -> None:
"""Removes a url from the moderation index """Removes a url from the moderation index
""" """
moderationIndexFile = baseDir + '/accounts/moderation.txt' moderationIndexFile = baseDir + '/accounts/moderation.txt'
@ -361,15 +404,21 @@ def removeModerationPostFromIndex(baseDir: str,postUrl: str,debug: bool) -> None
f.write(line) f.write(line)
else: else:
if debug: if debug:
print('DEBUG: removed '+postId+' from moderation index') print('DEBUG: removed ' + postId +
' from moderation index')
def deletePost(baseDir: str,httpPrefix: str,nickname: str,domain: str,postFilename: str,debug: bool) -> None:
def deletePost(baseDir: str, httpPrefix: str,
nickname: str, domain: str, postFilename: str,
debug: bool) -> None:
"""Recursively deletes a post and its replies and attachments """Recursively deletes a post and its replies and attachments
""" """
postJsonObject = loadJson(postFilename, 1) postJsonObject = loadJson(postFilename, 1)
if postJsonObject: if postJsonObject:
# don't allow deletion of bookmarked posts # don't allow deletion of bookmarked posts
bookmarksIndexFilename=baseDir+'/accounts/'+nickname+'@'+domain+'/bookmarks.index' bookmarksIndexFilename = \
baseDir + '/accounts/' + nickname + '@' + domain + \
'/bookmarks.index'
if os.path.isfile(bookmarksIndexFilename): if os.path.isfile(bookmarksIndexFilename):
bookmarkIndex = postFilename.split('/')[-1] + '\n' bookmarkIndex = postFilename.split('/')[-1] + '\n'
if bookmarkIndex in open(bookmarksIndexFilename).read(): if bookmarkIndex in open(bookmarksIndexFilename).read():
@ -411,16 +460,19 @@ def deletePost(baseDir: str,httpPrefix: str,nickname: str,domain: str,postFilena
if '#' in postJsonObject['object']['content']: if '#' in postJsonObject['object']['content']:
removeHashtagIndex = True removeHashtagIndex = True
if removeHashtagIndex: if removeHashtagIndex:
if postJsonObject['object'].get('id') and postJsonObject['object'].get('tag'): if postJsonObject['object'].get('id') and \
postJsonObject['object'].get('tag'):
# get the id of the post # get the id of the post
postId=postJsonObject['object']['id'].replace('/activity','') postId = \
postJsonObject['object']['id'].replace('/activity', '')
for tag in postJsonObject['object']['tag']: for tag in postJsonObject['object']['tag']:
if tag['type'] != 'Hashtag': if tag['type'] != 'Hashtag':
continue continue
if not tag.get('name'): if not tag.get('name'):
continue continue
# find the index file for this tag # find the index file for this tag
tagIndexFilename=baseDir+'/tags/'+tag['name'][1:]+'.txt' tagIndexFilename = \
baseDir + '/tags/' + tag['name'][1:] + '.txt'
if not os.path.isfile(tagIndexFilename): if not os.path.isfile(tagIndexFilename):
continue continue
# remove postId from the tag index file # remove postId from the tag index file
@ -434,7 +486,8 @@ def deletePost(baseDir: str,httpPrefix: str,nickname: str,domain: str,postFilena
continue continue
newlines += l newlines += l
if not newlines.strip(): if not newlines.strip():
# if there are no lines then remove the hashtag file # if there are no lines then remove the
# hashtag file
os.remove(tagIndexFilename) os.remove(tagIndexFilename)
else: else:
with open(tagIndexFilename, "w+") as f: with open(tagIndexFilename, "w+") as f:
@ -450,24 +503,35 @@ def deletePost(baseDir: str,httpPrefix: str,nickname: str,domain: str,postFilena
replyFile = locatePost(baseDir, nickname, domain, replyId) replyFile = locatePost(baseDir, nickname, domain, replyId)
if replyFile: if replyFile:
if os.path.isfile(replyFile): if os.path.isfile(replyFile):
deletePost(baseDir,httpPrefix,nickname,domain,replyFile,debug) deletePost(baseDir, httpPrefix,
nickname, domain, replyFile, debug)
# remove the replies file # remove the replies file
os.remove(repliesFilename) os.remove(repliesFilename)
# finally, remove the post itself # finally, remove the post itself
os.remove(postFilename) os.remove(postFilename)
def validNickname(domain: str, nickname: str) -> bool: def validNickname(domain: str, nickname: str) -> bool:
forbiddenChars=['.',' ','/','?',':',';','@'] forbiddenChars = ('.', ' ', '/', '?', ':', ';', '@')
for c in forbiddenChars: for c in forbiddenChars:
if c in nickname: if c in nickname:
return False return False
if nickname == domain: if nickname == domain:
return False return False
reservedNames=['inbox','dm','outbox','following','public','followers','profile','channel','capabilities','calendar','tlreplies','tlmedia','tlblogs','moderation','activity','undo','reply','replies','question','like','likes','users','statuses','updates','repeat','announce','shares'] reservedNames = ('inbox', 'dm', 'outbox', 'following',
'public', 'followers', 'profile',
'channel', 'capabilities', 'calendar',
'tlreplies', 'tlmedia', 'tlblogs',
'moderation', 'activity', 'undo',
'reply', 'replies', 'question', 'like',
'likes', 'users', 'statuses',
'updates', 'repeat', 'announce',
'shares')
if nickname in reservedNames: if nickname in reservedNames:
return False return False
return True return True
def noOfAccounts(baseDir: str) -> bool: def noOfAccounts(baseDir: str) -> bool:
"""Returns the number of accounts on the system """Returns the number of accounts on the system
""" """
@ -479,6 +543,7 @@ def noOfAccounts(baseDir: str) -> bool:
accountCtr += 1 accountCtr += 1
return accountCtr return accountCtr
def noOfActiveAccountsMonthly(baseDir: str, months: int) -> bool: def noOfActiveAccountsMonthly(baseDir: str, months: int) -> bool:
"""Returns the number of accounts on the system this month """Returns the number of accounts on the system this month
""" """
@ -489,7 +554,8 @@ def noOfActiveAccountsMonthly(baseDir: str,months: int) -> bool:
for account in dirs: for account in dirs:
if '@' in account: if '@' in account:
if not account.startswith('inbox@'): if not account.startswith('inbox@'):
lastUsedFilename=baseDir+'/accounts/'+account+'/.lastUsed' lastUsedFilename = \
baseDir + '/accounts/' + account + '/.lastUsed'
if os.path.isfile(lastUsedFilename): if os.path.isfile(lastUsedFilename):
with open(lastUsedFilename, 'r') as lastUsedFile: with open(lastUsedFilename, 'r') as lastUsedFile:
lastUsed = lastUsedFile.read() lastUsed = lastUsedFile.read()
@ -499,7 +565,9 @@ def noOfActiveAccountsMonthly(baseDir: str,months: int) -> bool:
accountCtr += 1 accountCtr += 1
return accountCtr return accountCtr
def isPublicPostFromUrl(baseDir: str,nickname: str,domain: str,postUrl: str) -> bool:
def isPublicPostFromUrl(baseDir: str, nickname: str, domain: str,
postUrl: str) -> bool:
"""Returns whether the given url is a public post """Returns whether the given url is a public post
""" """
postFilename = locatePost(baseDir, nickname, domain, postUrl) postFilename = locatePost(baseDir, nickname, domain, postUrl)
@ -510,6 +578,7 @@ def isPublicPostFromUrl(baseDir: str,nickname: str,domain: str,postUrl: str) ->
return False return False
return isPublicPost(postJsonObject) return isPublicPost(postJsonObject)
def isPublicPost(postJsonObject: {}) -> bool: def isPublicPost(postJsonObject: {}) -> bool:
"""Returns true if the given post is public """Returns true if the given post is public
""" """
@ -528,6 +597,7 @@ def isPublicPost(postJsonObject: {}) -> bool:
return True return True
return False return False
def copytree(src: str, dst: str, symlinks=False, ignore=None): def copytree(src: str, dst: str, symlinks=False, ignore=None):
"""Copy a directory """Copy a directory
""" """
@ -539,13 +609,16 @@ def copytree(src: str, dst: str, symlinks=False, ignore=None):
else: else:
shutil.copy2(s, d) shutil.copy2(s, d)
def getCachedPostDirectory(baseDir: str, nickname: str, domain: str) -> str: def getCachedPostDirectory(baseDir: str, nickname: str, domain: str) -> str:
"""Returns the directory where the html post cache exists """Returns the directory where the html post cache exists
""" """
htmlPostCacheDir=baseDir+'/accounts/'+nickname+'@'+domain+'/postcache' htmlPostCacheDir = baseDir + '/accounts/' + \
nickname + '@' + domain + '/postcache'
return htmlPostCacheDir return htmlPostCacheDir
def getCachedPostFilename(baseDir: str,nickname: str,domain: str, \
def getCachedPostFilename(baseDir: str, nickname: str, domain: str,
postJsonObject: {}) -> str: postJsonObject: {}) -> str:
"""Returns the html cache filename for the given post """Returns the html cache filename for the given post
""" """
@ -562,6 +635,7 @@ def getCachedPostFilename(baseDir: str,nickname: str,domain: str, \
cachedPostFilename = cachedPostFilename + '.html' cachedPostFilename = cachedPostFilename + '.html'
return cachedPostFilename return cachedPostFilename
def removePostFromCache(postJsonObject: {}, recentPostsCache: {}): def removePostFromCache(postJsonObject: {}, recentPostsCache: {}):
""" if the post exists in the recent posts cache then remove it """ if the post exists in the recent posts cache then remove it
""" """
@ -584,7 +658,8 @@ def removePostFromCache(postJsonObject: {},recentPostsCache: {}):
del recentPostsCache['html'][postId] del recentPostsCache['html'][postId]
recentPostsCache['index'].remove(postId) recentPostsCache['index'].remove(postId)
def updateRecentPostsCache(recentPostsCache: {},maxRecentPosts: int, \
def updateRecentPostsCache(recentPostsCache: {}, maxRecentPosts: int,
postJsonObject: {}, htmlStr: str) -> None: postJsonObject: {}, htmlStr: str) -> None:
"""Store recent posts in memory so that they can be quickly recalled """Store recent posts in memory so that they can be quickly recalled
""" """
@ -613,6 +688,7 @@ def updateRecentPostsCache(recentPostsCache: {},maxRecentPosts: int, \
recentPostsCache['json'][postId] = json.dumps(postJsonObject) recentPostsCache['json'][postId] = json.dumps(postJsonObject)
recentPostsCache['html'][postId] = htmlStr recentPostsCache['html'][postId] = htmlStr
def fileLastModified(filename: str) -> str: def fileLastModified(filename: str) -> str:
"""Returns the date when a file was last modified """Returns the date when a file was last modified
""" """
@ -620,6 +696,7 @@ def fileLastModified(filename: str) -> str:
modifiedTime = datetime.datetime.fromtimestamp(t) modifiedTime = datetime.datetime.fromtimestamp(t)
return modifiedTime.strftime("%Y-%m-%dT%H:%M:%SZ") return modifiedTime.strftime("%Y-%m-%dT%H:%M:%SZ")
def daysInMonth(year: int, monthNumber: int) -> int: def daysInMonth(year: int, monthNumber: int) -> int:
"""Returns the number of days in the month """Returns the number of days in the month
""" """
@ -628,12 +705,14 @@ def daysInMonth(year: int,monthNumber: int) -> int:
daysRange = monthrange(year, monthNumber) daysRange = monthrange(year, monthNumber)
return daysRange[1] return daysRange[1]
def mergeDicts(dict1: {}, dict2: {}) -> {}: def mergeDicts(dict1: {}, dict2: {}) -> {}:
"""Merges two dictionaries """Merges two dictionaries
""" """
res = {**dict1, **dict2} res = {**dict1, **dict2}
return res return res
def isBlogPost(postJsonObject: {}) -> bool: def isBlogPost(postJsonObject: {}) -> bool:
"""Is the given post a blog post? """Is the given post a blog post?
""" """