Snake case

merge-requests/30/head
Bob Mottram 2021-12-28 12:15:46 +00:00
parent 49ebb6c88c
commit 98e787fe1e
4 changed files with 295 additions and 290 deletions

View File

@ -246,8 +246,8 @@ from reaction import updateReactionCollection
from utils import undo_reaction_collection_entry
from utils import get_new_post_endpoints
from utils import has_actor
from utils import setReplyIntervalHours
from utils import canReplyTo
from utils import set_reply_interval_hours
from utils import can_reply_to
from utils import is_dm
from utils import replace_users_with_at
from utils import local_actor_url
@ -4981,8 +4981,9 @@ class PubServer(BaseHTTPRequestHandler):
# reply interval in hours
if fields.get('replyhours'):
if fields['replyhours'].isdigit():
setReplyIntervalHours(base_dir, nickname, domain,
fields['replyhours'])
set_reply_interval_hours(base_dir,
nickname, domain,
fields['replyhours'])
# change city
if fields.get('cityDropdown'):
@ -12960,8 +12961,8 @@ class PubServer(BaseHTTPRequestHandler):
if inReplyToUrl:
replyIntervalHours = self.server.default_reply_interval_hrs
if not canReplyTo(base_dir, nickname, domain,
inReplyToUrl, replyIntervalHours):
if not can_reply_to(base_dir, nickname, domain,
inReplyToUrl, replyIntervalHours):
print('Reply outside of time window ' + inReplyToUrl +
str(replyIntervalHours) + ' hours')
self._403()

View File

@ -26,7 +26,7 @@ from utils import fileLastModified
from utils import has_object_string
from utils import has_object_string_object
from utils import get_reply_interval_hours
from utils import canReplyTo
from utils import can_reply_to
from utils import get_user_paths
from utils import get_base_content_from_post
from utils import acct_dir
@ -3113,8 +3113,8 @@ def _createReplyNotificationFile(base_dir: str, nickname: str, domain: str,
replyIntervalHours = \
get_reply_interval_hours(base_dir, nickname, domain,
default_reply_interval_hrs)
if canReplyTo(base_dir, nickname, domain, inReplyTo,
replyIntervalHours):
if can_reply_to(base_dir, nickname, domain, inReplyTo,
replyIntervalHours):
actUrl = local_actor_url(http_prefix, nickname, domain)
_replyNotify(base_dir, handle, actUrl + '/tlreplies')
else:

View File

@ -56,7 +56,7 @@ from follow import sendUnfollowRequestViaServer
from siteactive import siteIsActive
from utils import get_sha_256
from utils import dangerous_svg
from utils import canReplyTo
from utils import can_reply_to
from utils import is_group_account
from utils import get_actor_languages_list
from utils import get_category_types
@ -5807,17 +5807,17 @@ def _testCanReplyTo(base_dir: str) -> None:
postUrl = post_json_object['object']['id']
replyIntervalHours = 2
currDateStr = "2021-09-08T21:32:10Z"
assert canReplyTo(base_dir, nickname, domain,
postUrl, replyIntervalHours,
currDateStr,
post_json_object)
assert can_reply_to(base_dir, nickname, domain,
postUrl, replyIntervalHours,
currDateStr,
post_json_object)
# test a post outside of the reply interval
currDateStr = "2021-09-09T09:24:47Z"
assert not canReplyTo(base_dir, nickname, domain,
postUrl, replyIntervalHours,
currDateStr,
post_json_object)
assert not can_reply_to(base_dir, nickname, domain,
postUrl, replyIntervalHours,
currDateStr,
post_json_object)
def _testSecondsBetweenPublished() -> None:

548
utils.py
View File

@ -1330,7 +1330,7 @@ def clear_from_post_caches(base_dir: str, recent_posts_cache: {},
def locate_post(base_dir: str, nickname: str, domain: str,
postUrl: str, replies: bool = False) -> str:
post_url: str, replies: bool = False) -> str:
"""Returns the filename for the given status post url
"""
if not replies:
@ -1339,31 +1339,31 @@ def locate_post(base_dir: str, nickname: str, domain: str,
extension = 'replies'
# if this post in the shared inbox?
postUrl = remove_id_ending(postUrl.strip()).replace('/', '#')
post_url = remove_id_ending(post_url.strip()).replace('/', '#')
# add the extension
postUrl = postUrl + '.' + extension
post_url = post_url + '.' + extension
# search boxes
boxes = ('inbox', 'outbox', 'tlblogs')
account_dir = acct_dir(base_dir, nickname, domain) + '/'
for boxName in boxes:
post_filename = account_dir + boxName + '/' + postUrl
for box_name in boxes:
post_filename = account_dir + box_name + '/' + post_url
if os.path.isfile(post_filename):
return post_filename
# check news posts
account_dir = base_dir + '/accounts/news' + '@' + domain + '/'
post_filename = account_dir + 'outbox/' + postUrl
post_filename = account_dir + 'outbox/' + post_url
if os.path.isfile(post_filename):
return post_filename
# is it in the announce cache?
post_filename = base_dir + '/cache/announce/' + nickname + '/' + postUrl
post_filename = base_dir + '/cache/announce/' + nickname + '/' + post_url
if os.path.isfile(post_filename):
return post_filename
# print('WARN: unable to locate ' + nickname + ' ' + postUrl)
# print('WARN: unable to locate ' + nickname + ' ' + post_url)
return None
@ -1390,7 +1390,7 @@ def get_reply_interval_hours(base_dir: str, nickname: str, domain: str,
during which replies are allowed
"""
reply_interval_filename = \
acct_dir(base_dir, nickname, domain) + '/.replyIntervalHours'
acct_dir(base_dir, nickname, domain) + '/.reply_interval_hours'
if os.path.isfile(reply_interval_filename):
with open(reply_interval_filename, 'r') as interval_file:
hours_str = interval_file.read()
@ -1399,38 +1399,37 @@ def get_reply_interval_hours(base_dir: str, nickname: str, domain: str,
return default_reply_interval_hrs
def setReplyIntervalHours(base_dir: str, nickname: str, domain: str,
replyIntervalHours: int) -> bool:
def set_reply_interval_hours(base_dir: str, nickname: str, domain: str,
reply_interval_hours: int) -> bool:
"""Sets the reply interval for the given account.
The reply interval is the number of hours after a post being made
during which replies are allowed
"""
reply_interval_filename = \
acct_dir(base_dir, nickname, domain) + '/.replyIntervalHours'
with open(reply_interval_filename, 'w+') as interval_file:
try:
interval_file.write(str(replyIntervalHours))
acct_dir(base_dir, nickname, domain) + '/.reply_interval_hours'
try:
with open(reply_interval_filename, 'w+') as interval_file:
interval_file.write(str(reply_interval_hours))
return True
except OSError:
print('EX: setReplyIntervalHours ' +
'unable to save reply interval ' +
str(reply_interval_filename) + ' ' +
str(replyIntervalHours))
except OSError:
print('EX: set_reply_interval_hours unable to save reply interval ' +
str(reply_interval_filename) + ' ' +
str(reply_interval_hours))
return False
def canReplyTo(base_dir: str, nickname: str, domain: str,
postUrl: str, replyIntervalHours: int,
currDateStr: str = None,
post_json_object: {} = None) -> bool:
def can_reply_to(base_dir: str, nickname: str, domain: str,
post_url: str, reply_interval_hours: int,
curr_date_str: str = None,
post_json_object: {} = None) -> bool:
"""Is replying to the given post permitted?
This is a spam mitigation feature, so that spammers can't
add a lot of replies to old post which you don't notice.
"""
if '/statuses/' not in postUrl:
if '/statuses/' not in post_url:
return True
if not post_json_object:
post_filename = locate_post(base_dir, nickname, domain, postUrl)
post_filename = locate_post(base_dir, nickname, domain, post_url)
if not post_filename:
return False
post_json_object = load_json(post_filename)
@ -1440,73 +1439,74 @@ def canReplyTo(base_dir: str, nickname: str, domain: str,
if not published:
return False
try:
pubDate = datetime.datetime.strptime(published, '%Y-%m-%dT%H:%M:%SZ')
pub_date = datetime.datetime.strptime(published, '%Y-%m-%dT%H:%M:%SZ')
except BaseException:
print('EX: canReplyTo unrecognized published date ' + str(published))
print('EX: can_reply_to unrecognized published date ' + str(published))
return False
if not currDateStr:
currDate = datetime.datetime.utcnow()
if not curr_date_str:
curr_date = datetime.datetime.utcnow()
else:
try:
currDate = datetime.datetime.strptime(currDateStr,
'%Y-%m-%dT%H:%M:%SZ')
curr_date = \
datetime.datetime.strptime(curr_date_str, '%Y-%m-%dT%H:%M:%SZ')
except BaseException:
print('EX: canReplyTo unrecognized current date ' +
str(currDateStr))
print('EX: can_reply_to unrecognized current date ' +
str(curr_date_str))
return False
hoursSincePublication = int((currDate - pubDate).total_seconds() / 3600)
if hoursSincePublication < 0 or \
hoursSincePublication >= replyIntervalHours:
hours_since_publication = \
int((curr_date - pub_date).total_seconds() / 3600)
if hours_since_publication < 0 or \
hours_since_publication >= reply_interval_hours:
return False
return True
def _removeAttachment(base_dir: str, http_prefix: str, domain: str,
postJson: {}):
if not postJson.get('attachment'):
post_json: {}):
if not post_json.get('attachment'):
return
if not postJson['attachment'][0].get('url'):
if not post_json['attachment'][0].get('url'):
return
attachmentUrl = postJson['attachment'][0]['url']
if not attachmentUrl:
attachment_url = post_json['attachment'][0]['url']
if not attachment_url:
return
mediaFilename = base_dir + '/' + \
attachmentUrl.replace(http_prefix + '://' + domain + '/', '')
if os.path.isfile(mediaFilename):
media_filename = base_dir + '/' + \
attachment_url.replace(http_prefix + '://' + domain + '/', '')
if os.path.isfile(media_filename):
try:
os.remove(mediaFilename)
os.remove(media_filename)
except OSError:
print('EX: _removeAttachment unable to delete media file ' +
str(mediaFilename))
etagFilename = mediaFilename + '.etag'
if os.path.isfile(etagFilename):
str(media_filename))
etag_filename = media_filename + '.etag'
if os.path.isfile(etag_filename):
try:
os.remove(etagFilename)
os.remove(etag_filename)
except OSError:
print('EX: _removeAttachment unable to delete etag file ' +
str(etagFilename))
postJson['attachment'] = []
str(etag_filename))
post_json['attachment'] = []
def removeModerationPostFromIndex(base_dir: str, postUrl: str,
def removeModerationPostFromIndex(base_dir: str, post_url: str,
debug: bool) -> None:
"""Removes a url from the moderation index
"""
moderation_index_file = base_dir + '/accounts/moderation.txt'
if not os.path.isfile(moderation_index_file):
return
post_id = remove_id_ending(postUrl)
post_id = remove_id_ending(post_url)
if post_id in open(moderation_index_file).read():
with open(moderation_index_file, 'r') as f:
lines = f.readlines()
with open(moderation_index_file, 'w+') as f:
with open(moderation_index_file, 'r') as file1:
lines = file1.readlines()
with open(moderation_index_file, 'w+') as file2:
for line in lines:
if line.strip("\n").strip("\r") != post_id:
f.write(line)
else:
if debug:
print('DEBUG: removed ' + post_id +
' from moderation index')
file2.write(line)
continue
if debug:
print('DEBUG: removed ' + post_id +
' from moderation index')
def _is_reply_to_blog_post(base_dir: str, nickname: str, domain: str,
@ -1535,37 +1535,37 @@ def _deletePostRemoveReplies(base_dir: str, nickname: str, domain: str,
recent_posts_cache: {}, debug: bool) -> None:
"""Removes replies when deleting a post
"""
repliesFilename = post_filename.replace('.json', '.replies')
if not os.path.isfile(repliesFilename):
replies_filename = post_filename.replace('.json', '.replies')
if not os.path.isfile(replies_filename):
return
if debug:
print('DEBUG: removing replies to ' + post_filename)
with open(repliesFilename, 'r') as f:
for replyId in f:
replyFile = locate_post(base_dir, nickname, domain, replyId)
if not replyFile:
with open(replies_filename, 'r') as replies_file:
for reply_id in replies_file:
reply_file = locate_post(base_dir, nickname, domain, reply_id)
if not reply_file:
continue
if os.path.isfile(replyFile):
if os.path.isfile(reply_file):
deletePost(base_dir, http_prefix,
nickname, domain, replyFile, debug,
nickname, domain, reply_file, debug,
recent_posts_cache)
# remove the replies file
try:
os.remove(repliesFilename)
os.remove(replies_filename)
except OSError:
print('EX: _deletePostRemoveReplies unable to delete replies file ' +
str(repliesFilename))
str(replies_filename))
def _isBookmarked(base_dir: str, nickname: str, domain: str,
post_filename: str) -> bool:
"""Returns True if the given post is bookmarked
"""
bookmarksIndexFilename = \
bookmarks_index_filename = \
acct_dir(base_dir, nickname, domain) + '/bookmarks.index'
if os.path.isfile(bookmarksIndexFilename):
bookmarkIndex = post_filename.split('/')[-1] + '\n'
if bookmarkIndex in open(bookmarksIndexFilename).read():
if os.path.isfile(bookmarks_index_filename):
bookmark_index = post_filename.split('/')[-1] + '\n'
if bookmark_index in open(bookmarks_index_filename).read():
return True
return False
@ -1622,13 +1622,13 @@ def _deleteCachedHtml(base_dir: str, nickname: str, domain: str,
def _deleteHashtagsOnPost(base_dir: str, post_json_object: {}) -> None:
"""Removes hashtags when a post is deleted
"""
removeHashtagIndex = False
remove_hashtag_index = False
if has_object_dict(post_json_object):
if post_json_object['object'].get('content'):
if '#' in post_json_object['object']['content']:
removeHashtagIndex = True
remove_hashtag_index = True
if not removeHashtagIndex:
if not remove_hashtag_index:
return
if not post_json_object['object'].get('id') or \
@ -1645,32 +1645,32 @@ def _deleteHashtagsOnPost(base_dir: str, post_json_object: {}) -> None:
if not tag.get('name'):
continue
# find the index file for this tag
tagIndexFilename = base_dir + '/tags/' + tag['name'][1:] + '.txt'
if not os.path.isfile(tagIndexFilename):
tag_index_filename = base_dir + '/tags/' + tag['name'][1:] + '.txt'
if not os.path.isfile(tag_index_filename):
continue
# remove post_id from the tag index file
lines = None
with open(tagIndexFilename, 'r') as f:
lines = f.readlines()
with open(tag_index_filename, 'r') as index_file:
lines = index_file.readlines()
if not lines:
continue
newlines = ''
for fileLine in lines:
if post_id in fileLine:
for file_line in lines:
if post_id in file_line:
# skip over the deleted post
continue
newlines += fileLine
newlines += file_line
if not newlines.strip():
# if there are no lines then remove the hashtag file
try:
os.remove(tagIndexFilename)
os.remove(tag_index_filename)
except OSError:
print('EX: _deleteHashtagsOnPost unable to delete tag index ' +
str(tagIndexFilename))
str(tag_index_filename))
else:
# write the new hashtag index without the given post in it
with open(tagIndexFilename, 'w+') as f:
f.write(newlines)
with open(tag_index_filename, 'w+') as index_file:
index_file.write(newlines)
def _deleteConversationPost(base_dir: str, nickname: str, domain: str,
@ -1683,36 +1683,37 @@ def _deleteConversationPost(base_dir: str, nickname: str, domain: str,
return False
if not post_json_object['object'].get('id'):
return False
conversationDir = acct_dir(base_dir, nickname, domain) + '/conversation'
conversationId = post_json_object['object']['conversation']
conversationId = conversationId.replace('/', '#')
conversation_dir = \
acct_dir(base_dir, nickname, domain) + '/conversation'
conversation_id = post_json_object['object']['conversation']
conversation_id = conversation_id.replace('/', '#')
post_id = post_json_object['object']['id']
conversationFilename = conversationDir + '/' + conversationId
if not os.path.isfile(conversationFilename):
conversation_filename = conversation_dir + '/' + conversation_id
if not os.path.isfile(conversation_filename):
return False
conversationStr = ''
with open(conversationFilename, 'r') as fp:
conversationStr = fp.read()
if post_id + '\n' not in conversationStr:
conversation_str = ''
with open(conversation_filename, 'r') as conv_file:
conversation_str = conv_file.read()
if post_id + '\n' not in conversation_str:
return False
conversationStr = conversationStr.replace(post_id + '\n', '')
if conversationStr:
with open(conversationFilename, 'w+') as fp:
fp.write(conversationStr)
conversation_str = conversation_str.replace(post_id + '\n', '')
if conversation_str:
with open(conversation_filename, 'w+') as conv_file:
conv_file.write(conversation_str)
else:
if os.path.isfile(conversationFilename + '.muted'):
if os.path.isfile(conversation_filename + '.muted'):
try:
os.remove(conversationFilename + '.muted')
os.remove(conversation_filename + '.muted')
except OSError:
print('EX: _deleteConversationPost ' +
'unable to remove conversation ' +
str(conversationFilename) + '.muted')
str(conversation_filename) + '.muted')
try:
os.remove(conversationFilename)
os.remove(conversation_filename)
except OSError:
print('EX: _deleteConversationPost ' +
'unable to remove conversation ' +
str(conversationFilename))
str(conversation_filename))
def deletePost(base_dir: str, http_prefix: str,
@ -1755,13 +1756,13 @@ def deletePost(base_dir: str, http_prefix: str,
extensions = ('votes', 'arrived', 'muted', 'tts', 'reject')
for ext in extensions:
extFilename = post_filename + '.' + ext
if os.path.isfile(extFilename):
ext_filename = post_filename + '.' + ext
if os.path.isfile(ext_filename):
try:
os.remove(extFilename)
os.remove(ext_filename)
except OSError:
print('EX: deletePost unable to remove ext ' +
str(extFilename))
str(ext_filename))
# remove cached html version of the post
_deleteCachedHtml(base_dir, nickname, domain, post_json_object)
@ -1798,7 +1799,7 @@ def isValidLanguage(text: str) -> bool:
"""Returns true if the given text contains a valid
natural language string
"""
naturalLanguages = {
natural_languages = {
"Latin": [65, 866],
"Cyrillic": [1024, 1274],
"Greek": [880, 1280],
@ -1830,15 +1831,15 @@ def isValidLanguage(text: str) -> bool:
"Khmer": [6016, 6144],
"Mongolian": [6144, 6320]
}
for langName, langRange in naturalLanguages.items():
okLang = True
for ch in text:
if ch.isdigit():
for lang_name, lang_range in natural_languages.items():
ok_lang = True
for char in text:
if char.isdigit():
continue
if ord(ch) not in range(langRange[0], langRange[1]):
okLang = False
if ord(char) not in range(lang_range[0], lang_range[1]):
ok_lang = False
break
if okLang:
if ok_lang:
return True
return False
@ -1870,9 +1871,9 @@ def _getReservedWords() -> str:
def getNicknameValidationPattern() -> str:
"""Returns a html text input validation pattern for nickname
"""
reservedNames = _getReservedWords()
reserved_names = _getReservedWords()
pattern = ''
for word in reservedNames:
for word in reserved_names:
if pattern:
pattern += '(?!.*\\b' + word + '\\b)'
else:
@ -1883,8 +1884,8 @@ def getNicknameValidationPattern() -> str:
def _isReservedName(nickname: str) -> bool:
"""Is the given nickname reserved for some special function?
"""
reservedNames = _getReservedWords()
if nickname in reservedNames:
reserved_names = _getReservedWords()
if nickname in reserved_names:
return True
return False
@ -1898,8 +1899,8 @@ def validNickname(domain: str, nickname: str) -> bool:
return False
if not isValidLanguage(nickname):
return False
forbiddenChars = ('.', ' ', '/', '?', ':', ';', '@', '#', '!')
for c in forbiddenChars:
forbidden_chars = ('.', ' ', '/', '?', ':', ';', '@', '#', '!')
for c in forbidden_chars:
if c in nickname:
return False
# this should only apply for the shared inbox
@ -1913,44 +1914,44 @@ def validNickname(domain: str, nickname: str) -> bool:
def noOfAccounts(base_dir: str) -> bool:
"""Returns the number of accounts on the system
"""
accountCtr = 0
account_ctr = 0
for subdir, dirs, files in os.walk(base_dir + '/accounts'):
for account in dirs:
if is_account_dir(account):
accountCtr += 1
account_ctr += 1
break
return accountCtr
return account_ctr
def noOfActiveAccountsMonthly(base_dir: str, months: int) -> bool:
"""Returns the number of accounts on the system this month
"""
accountCtr = 0
account_ctr = 0
curr_time = int(time.time())
monthSeconds = int(60*60*24*30*months)
month_seconds = int(60*60*24*30*months)
for subdir, dirs, files in os.walk(base_dir + '/accounts'):
for account in dirs:
if not is_account_dir(account):
continue
lastUsedFilename = \
last_used_filename = \
base_dir + '/accounts/' + account + '/.lastUsed'
if not os.path.isfile(lastUsedFilename):
if not os.path.isfile(last_used_filename):
continue
with open(lastUsedFilename, 'r') as lastUsedFile:
lastUsed = lastUsedFile.read()
if lastUsed.isdigit():
timeDiff = (curr_time - int(lastUsed))
if timeDiff < monthSeconds:
accountCtr += 1
with open(last_used_filename, 'r') as last_used_file:
last_used = last_used_file.read()
if last_used.isdigit():
time_diff = (curr_time - int(last_used))
if time_diff < month_seconds:
account_ctr += 1
break
return accountCtr
return account_ctr
def isPublicPostFromUrl(base_dir: str, nickname: str, domain: str,
postUrl: str) -> bool:
post_url: str) -> bool:
"""Returns whether the given url is a public post
"""
post_filename = locate_post(base_dir, nickname, domain, postUrl)
post_filename = locate_post(base_dir, nickname, domain, post_url)
if not post_filename:
return False
post_json_object = load_json(post_filename, 1)
@ -1980,12 +1981,12 @@ def copytree(src: str, dst: str, symlinks: str = False, ignore: bool = None):
"""Copy a directory
"""
for item in os.listdir(src):
s = os.path.join(src, item)
d = os.path.join(dst, item)
if os.path.isdir(s):
shutil.copytree(s, d, symlinks, ignore)
s_dir = os.path.join(src, item)
d_dir = os.path.join(dst, item)
if os.path.isdir(s_dir):
shutil.copytree(s_dir, d_dir, symlinks, ignore)
else:
shutil.copy2(s, d)
shutil.copy2(s_dir, d_dir)
def get_cached_post_directory(base_dir: str,
@ -2000,15 +2001,16 @@ def get_cached_post_filename(base_dir: str, nickname: str, domain: str,
post_json_object: {}) -> str:
"""Returns the html cache filename for the given post
"""
cachedPostDir = get_cached_post_directory(base_dir, nickname, domain)
if not os.path.isdir(cachedPostDir):
# print('ERROR: invalid html cache directory ' + cachedPostDir)
cached_post_dir = get_cached_post_directory(base_dir, nickname, domain)
if not os.path.isdir(cached_post_dir):
# print('ERROR: invalid html cache directory ' + cached_post_dir)
return None
if '@' not in cachedPostDir:
# print('ERROR: invalid html cache directory ' + cachedPostDir)
if '@' not in cached_post_dir:
# print('ERROR: invalid html cache directory ' + cached_post_dir)
return None
cachedPostId = remove_id_ending(post_json_object['id'])
cached_post_filename = cachedPostDir + '/' + cachedPostId.replace('/', '#')
cached_post_id = remove_id_ending(post_json_object['id'])
cached_post_filename = \
cached_post_dir + '/' + cached_post_id.replace('/', '#')
return cached_post_filename + '.html'
@ -2048,35 +2050,35 @@ def updateRecentPostsCache(recent_posts_cache: {}, max_recent_posts: int,
def fileLastModified(filename: str) -> str:
"""Returns the date when a file was last modified
"""
t = os.path.getmtime(filename)
modifiedTime = datetime.datetime.fromtimestamp(t)
return modifiedTime.strftime("%Y-%m-%dT%H:%M:%SZ")
time_val = os.path.getmtime(filename)
modified_time = datetime.datetime.fromtimestamp(time_val)
return modified_time.strftime("%Y-%m-%dT%H:%M:%SZ")
def getCSS(base_dir: str, cssFilename: str, cssCache: {}) -> str:
def getCSS(base_dir: str, css_filename: str, cssCache: {}) -> str:
"""Retrieves the css for a given file, or from a cache
"""
# does the css file exist?
if not os.path.isfile(cssFilename):
if not os.path.isfile(css_filename):
return None
lastModified = fileLastModified(cssFilename)
last_modified = fileLastModified(css_filename)
# has this already been loaded into the cache?
if cssCache.get(cssFilename):
if cssCache[cssFilename][0] == lastModified:
if cssCache.get(css_filename):
if cssCache[css_filename][0] == last_modified:
# file hasn't changed, so return the version in the cache
return cssCache[cssFilename][1]
return cssCache[css_filename][1]
with open(cssFilename, 'r') as fpCSS:
css = fpCSS.read()
if cssCache.get(cssFilename):
with open(css_filename, 'r') as fp_css:
css = fp_css.read()
if cssCache.get(css_filename):
# alter the cache contents
cssCache[cssFilename][0] = lastModified
cssCache[cssFilename][1] = css
cssCache[css_filename][0] = last_modified
cssCache[css_filename][1] = css
else:
# add entry to the cache
cssCache[cssFilename] = [lastModified, css]
cssCache[css_filename] = [last_modified, css]
return css
return None
@ -2105,33 +2107,33 @@ def isNewsPost(post_json_object: {}) -> bool:
def _searchVirtualBoxPosts(base_dir: str, nickname: str, domain: str,
searchStr: str, maxResults: int,
boxName: str) -> []:
search_str: str, max_results: int,
box_name: str) -> []:
"""Searches through a virtual box, which is typically an index on the inbox
"""
indexFilename = \
acct_dir(base_dir, nickname, domain) + '/' + boxName + '.index'
if boxName == 'bookmarks':
boxName = 'inbox'
path = acct_dir(base_dir, nickname, domain) + '/' + boxName
index_filename = \
acct_dir(base_dir, nickname, domain) + '/' + box_name + '.index'
if box_name == 'bookmarks':
box_name = 'inbox'
path = acct_dir(base_dir, nickname, domain) + '/' + box_name
if not os.path.isdir(path):
return []
searchStr = searchStr.lower().strip()
search_str = search_str.lower().strip()
if '+' in searchStr:
searchWords = searchStr.split('+')
for index in range(len(searchWords)):
searchWords[index] = searchWords[index].strip()
print('SEARCH: ' + str(searchWords))
if '+' in search_str:
search_words = search_str.split('+')
for index in range(len(search_words)):
search_words[index] = search_words[index].strip()
print('SEARCH: ' + str(search_words))
else:
searchWords = [searchStr]
search_words = [search_str]
res = []
with open(indexFilename, 'r') as indexFile:
with open(index_filename, 'r') as index_file:
post_filename = 'start'
while post_filename:
post_filename = indexFile.readline()
post_filename = index_file.readline()
if not post_filename:
break
if '.json' not in post_filename:
@ -2139,63 +2141,63 @@ def _searchVirtualBoxPosts(base_dir: str, nickname: str, domain: str,
post_filename = path + '/' + post_filename.strip()
if not os.path.isfile(post_filename):
continue
with open(post_filename, 'r') as postFile:
data = postFile.read().lower()
with open(post_filename, 'r') as post_file:
data = post_file.read().lower()
notFound = False
for keyword in searchWords:
not_found = False
for keyword in search_words:
if keyword not in data:
notFound = True
not_found = True
break
if notFound:
if not_found:
continue
res.append(post_filename)
if len(res) >= maxResults:
if len(res) >= max_results:
return res
return res
def searchBoxPosts(base_dir: str, nickname: str, domain: str,
searchStr: str, maxResults: int,
boxName='outbox') -> []:
search_str: str, max_results: int,
box_name='outbox') -> []:
"""Search your posts and return a list of the filenames
containing matching strings
"""
path = acct_dir(base_dir, nickname, domain) + '/' + boxName
path = acct_dir(base_dir, nickname, domain) + '/' + box_name
# is this a virtual box, such as direct messages?
if not os.path.isdir(path):
if os.path.isfile(path + '.index'):
return _searchVirtualBoxPosts(base_dir, nickname, domain,
searchStr, maxResults, boxName)
search_str, max_results, box_name)
return []
searchStr = searchStr.lower().strip()
search_str = search_str.lower().strip()
if '+' in searchStr:
searchWords = searchStr.split('+')
for index in range(len(searchWords)):
searchWords[index] = searchWords[index].strip()
print('SEARCH: ' + str(searchWords))
if '+' in search_str:
search_words = search_str.split('+')
for index in range(len(search_words)):
search_words[index] = search_words[index].strip()
print('SEARCH: ' + str(search_words))
else:
searchWords = [searchStr]
search_words = [search_str]
res = []
for root, dirs, fnames in os.walk(path):
for fname in fnames:
filePath = os.path.join(root, fname)
with open(filePath, 'r') as postFile:
data = postFile.read().lower()
file_path = os.path.join(root, fname)
with open(file_path, 'r') as post_file:
data = post_file.read().lower()
notFound = False
for keyword in searchWords:
not_found = False
for keyword in search_words:
if keyword not in data:
notFound = True
not_found = True
break
if notFound:
if not_found:
continue
res.append(filePath)
if len(res) >= maxResults:
res.append(file_path)
if len(res) >= max_results:
return res
break
return res
@ -2255,16 +2257,16 @@ def undo_likes_collection_entry(recent_posts_cache: {},
total_items = 0
if obj['likes'].get('totalItems'):
total_items = obj['likes']['totalItems']
itemFound = False
for likeItem in obj['likes']['items']:
if likeItem.get('actor'):
if likeItem['actor'] == actor:
item_found = False
for like_item in obj['likes']['items']:
if like_item.get('actor'):
if like_item['actor'] == actor:
if debug:
print('DEBUG: like was removed for ' + actor)
obj['likes']['items'].remove(likeItem)
itemFound = True
obj['likes']['items'].remove(like_item)
item_found = True
break
if not itemFound:
if not item_found:
return
if total_items == 1:
if debug:
@ -2282,7 +2284,7 @@ def undo_reaction_collection_entry(recent_posts_cache: {},
object_url: str,
actor: str, domain: str, debug: bool,
post_json_object: {},
emojiContent: str) -> None:
emoji_content: str) -> None:
"""Undoes an emoji reaction for a particular actor
"""
if not post_json_object:
@ -2321,17 +2323,17 @@ def undo_reaction_collection_entry(recent_posts_cache: {},
total_items = 0
if obj['reactions'].get('totalItems'):
total_items = obj['reactions']['totalItems']
itemFound = False
for likeItem in obj['reactions']['items']:
if likeItem.get('actor'):
if likeItem['actor'] == actor and \
likeItem['content'] == emojiContent:
item_found = False
for like_item in obj['reactions']['items']:
if like_item.get('actor'):
if like_item['actor'] == actor and \
like_item['content'] == emoji_content:
if debug:
print('DEBUG: emoji reaction was removed for ' + actor)
obj['reactions']['items'].remove(likeItem)
itemFound = True
obj['reactions']['items'].remove(like_item)
item_found = True
break
if not itemFound:
if not item_found:
return
if total_items == 1:
if debug:
@ -2389,17 +2391,17 @@ def undo_announce_collection_entry(recent_posts_cache: {},
total_items = 0
if post_json_object['object']['shares'].get('totalItems'):
total_items = post_json_object['object']['shares']['totalItems']
itemFound = False
for announceItem in post_json_object['object']['shares']['items']:
if announceItem.get('actor'):
if announceItem['actor'] == actor:
item_found = False
for announce_item in post_json_object['object']['shares']['items']:
if announce_item.get('actor'):
if announce_item['actor'] == actor:
if debug:
print('DEBUG: Announce was removed for ' + actor)
anIt = announceItem
anIt = announce_item
post_json_object['object']['shares']['items'].remove(anIt)
itemFound = True
item_found = True
break
if not itemFound:
if not item_found:
return
if total_items == 1:
if debug:
@ -2446,14 +2448,14 @@ def update_announce_collection(recent_posts_cache: {},
pprint(post_json_object)
print('DEBUG: post ' + post_filename + ' has no object')
return
postUrl = remove_id_ending(post_json_object['id']) + '/shares'
post_url = remove_id_ending(post_json_object['id']) + '/shares'
if not post_json_object['object'].get('shares'):
if debug:
print('DEBUG: Adding initial shares (announcements) to ' +
postUrl)
announcementsJson = {
post_url)
announcements_json = {
"@context": "https://www.w3.org/ns/activitystreams",
'id': postUrl,
'id': post_url,
'type': 'Collection',
"totalItems": 1,
'items': [{
@ -2461,13 +2463,13 @@ def update_announce_collection(recent_posts_cache: {},
'actor': actor
}]
}
post_json_object['object']['shares'] = announcementsJson
post_json_object['object']['shares'] = announcements_json
else:
if post_json_object['object']['shares'].get('items'):
sharesItems = post_json_object['object']['shares']['items']
for announceItem in sharesItems:
if announceItem.get('actor'):
if announceItem['actor'] == actor:
shares_items = post_json_object['object']['shares']['items']
for announce_item in shares_items:
if announce_item.get('actor'):
if announce_item['actor'] == actor:
return
new_announce = {
'type': 'Announce',
@ -2583,18 +2585,18 @@ def reject_post_id(base_dir: str, nickname: str, domain: str,
# filename of the post without any extension or path
# This should also correspond to any index entry in
# the posts cache
postUrl = \
post_url = \
index_filename.replace('\n', '').replace('\r', '')
postUrl = postUrl.replace('.json', '').strip()
post_url = post_url.replace('.json', '').strip()
if postUrl in recent_posts_cache['index']:
if recent_posts_cache['json'].get(postUrl):
del recent_posts_cache['json'][postUrl]
if recent_posts_cache['html'].get(postUrl):
del recent_posts_cache['html'][postUrl]
if post_url in recent_posts_cache['index']:
if recent_posts_cache['json'].get(post_url):
del recent_posts_cache['json'][post_url]
if recent_posts_cache['html'].get(post_url):
del recent_posts_cache['html'][post_url]
with open(post_filename + '.reject', 'w+') as rejectFile:
rejectFile.write('\n')
with open(post_filename + '.reject', 'w+') as reject_file:
reject_file.write('\n')
def is_dm(post_json_object: {}) -> bool:
@ -2613,10 +2615,10 @@ def is_dm(post_json_object: {}) -> bool:
if post_json_object['object'].get('moderationStatus'):
return False
fields = ('to', 'cc')
for f in fields:
if not post_json_object['object'].get(f):
for field_name in fields:
if not post_json_object['object'].get(field_name):
continue
for to_address in post_json_object['object'][f]:
for to_address in post_json_object['object'][field_name]:
if to_address.endswith('#Public'):
return False
if to_address.endswith('followers'):
@ -2748,7 +2750,7 @@ def get_occupation_skills(actor_json: {}) -> []:
continue
if isinstance(occupation_item['skills'], list):
return occupation_item['skills']
elif isinstance(occupation_item['skills'], str):
if isinstance(occupation_item['skills'], str):
return [occupation_item['skills']]
break
return []
@ -2903,12 +2905,12 @@ def get_actor_property_url(actor_json: {}, property_name: str) -> str:
continue
property_value['value'] = property_value['value'].strip()
prefixes = get_protocol_prefixes()
prefixFound = False
prefix_found = False
for prefix in prefixes:
if property_value['value'].startswith(prefix):
prefixFound = True
prefix_found = True
break
if not prefixFound:
if not prefix_found:
continue
if '.' not in property_value['value']:
continue
@ -2938,9 +2940,9 @@ def get_port_from_domain(domain: str) -> int:
if ':' in domain:
if domain.startswith('did:'):
return None
portStr = domain.split(':')[1]
if portStr.isdigit():
return int(portStr)
port_str = domain.split(':')[1]
if port_str.isdigit():
return int(port_str)
return None
@ -2972,7 +2974,9 @@ def valid_password(password: str) -> bool:
return True
def is_float(value):
def is_float(value) -> bool:
"""Is the given value a float?
"""
try:
float(value)
return True
@ -3124,10 +3128,10 @@ def get_supported_languages(base_dir: str) -> []:
translations_dir = base_dir + '/translations'
languages_str = []
for _, _, files in os.walk(translations_dir):
for f in files:
if not f.endswith('.json'):
for fname in files:
if not fname.endswith('.json'):
continue
lang = f.split('.')[0]
lang = fname.split('.')[0]
if len(lang) == 2:
languages_str.append(lang)
break
@ -3140,14 +3144,14 @@ def get_category_types(base_dir: str) -> []:
ontology_dir = base_dir + '/ontology'
categories = []
for _, _, files in os.walk(ontology_dir):
for f in files:
if not f.endswith('.json'):
for fname in files:
if not fname.endswith('.json'):
continue
if '#' in f or '~' in f:
if '#' in fname or '~' in fname:
continue
if f.startswith('custom'):
if fname.startswith('custom'):
continue
ontology_filename = f.split('.')[0]
ontology_filename = fname.split('.')[0]
if 'Types' in ontology_filename:
categories.append(ontology_filename.replace('Types', ''))
break