Snake case

merge-requests/30/head
Bob Mottram 2021-12-28 21:55:38 +00:00
parent f96495a356
commit 30e04544ae
9 changed files with 156 additions and 154 deletions

View File

@ -37,8 +37,8 @@ from conversation import muteConversation
from conversation import unmuteConversation
def addGlobalBlock(base_dir: str,
blockNickname: str, blockDomain: str) -> bool:
def add_global_block(base_dir: str,
blockNickname: str, blockDomain: str) -> bool:
"""Global block which applies to all accounts
"""
blockingFilename = base_dir + '/accounts/blocking.txt'
@ -71,8 +71,8 @@ def addGlobalBlock(base_dir: str,
return True
def addBlock(base_dir: str, nickname: str, domain: str,
blockNickname: str, blockDomain: str) -> bool:
def add_block(base_dir: str, nickname: str, domain: str,
blockNickname: str, blockDomain: str) -> bool:
"""Block the given account
"""
if blockDomain.startswith(domain) and nickname == blockNickname:
@ -139,9 +139,9 @@ def addBlock(base_dir: str, nickname: str, domain: str,
return True
def removeGlobalBlock(base_dir: str,
unblockNickname: str,
unblockDomain: str) -> bool:
def remove_global_block(base_dir: str,
unblockNickname: str,
unblockDomain: str) -> bool:
"""Unblock the given global block
"""
unblockingFilename = base_dir + '/accounts/blocking.txt'
@ -198,8 +198,8 @@ def removeGlobalBlock(base_dir: str,
return False
def removeBlock(base_dir: str, nickname: str, domain: str,
unblockNickname: str, unblockDomain: str) -> bool:
def remove_block(base_dir: str, nickname: str, domain: str,
unblockNickname: str, unblockDomain: str) -> bool:
"""Unblock the given account
"""
domain = remove_domain_port(domain)
@ -229,7 +229,7 @@ def removeBlock(base_dir: str, nickname: str, domain: str,
return False
def isBlockedHashtag(base_dir: str, hashtag: str) -> bool:
def is_blocked_hashtag(base_dir: str, hashtag: str) -> bool:
"""Is the given hashtag blocked?
"""
# avoid very long hashtags
@ -245,7 +245,7 @@ def isBlockedHashtag(base_dir: str, hashtag: str) -> bool:
return False
def getDomainBlocklist(base_dir: str) -> str:
def get_domain_blocklist(base_dir: str) -> str:
"""Returns all globally blocked domains as a string
This can be used for fast matching to mitigate flooding
"""
@ -266,10 +266,10 @@ def getDomainBlocklist(base_dir: str) -> str:
return blockedStr
def updateBlockedCache(base_dir: str,
blockedCache: [],
blockedCacheLastUpdated: int,
blockedCacheUpdateSecs: int) -> int:
def update_blocked_cache(base_dir: str,
blockedCache: [],
blockedCacheLastUpdated: int,
blockedCacheUpdateSecs: int) -> int:
"""Updates the cache of globally blocked domains held in memory
"""
curr_time = int(time.time())
@ -308,8 +308,8 @@ def _getShortDomain(domain: str) -> str:
return None
def isBlockedDomain(base_dir: str, domain: str,
blockedCache: [] = None) -> bool:
def is_blocked_domain(base_dir: str, domain: str,
blockedCache: [] = None) -> bool:
"""Is the given domain blocked?
"""
if '.' not in domain:
@ -320,7 +320,7 @@ def isBlockedDomain(base_dir: str, domain: str,
shortDomain = _getShortDomain(domain)
if not broch_modeIsActive(base_dir):
if not broch_mode_is_active(base_dir):
if blockedCache:
for blockedStr in blockedCache:
if '*@' + domain in blockedStr:
@ -368,7 +368,7 @@ def isBlocked(base_dir: str, nickname: str, domain: str,
if blockNickname and blockDomain:
blockHandle = blockNickname + '@' + blockDomain
if not broch_modeIsActive(base_dir):
if not broch_mode_is_active(base_dir):
# instance level block list
if blockedCache:
for blockedStr in blockedCache:
@ -455,8 +455,8 @@ def outboxBlock(base_dir: str, http_prefix: str,
domainBlocked, portBlocked = get_domain_from_actor(message_json['object'])
domainBlockedFull = get_full_domain(domainBlocked, portBlocked)
addBlock(base_dir, nickname, domain,
nicknameBlocked, domainBlockedFull)
add_block(base_dir, nickname, domain,
nicknameBlocked, domainBlockedFull)
if debug:
print('DEBUG: post blocked via c2s - ' + post_filename)
@ -513,27 +513,27 @@ def outboxUndoBlock(base_dir: str, http_prefix: str,
domainBlocked, portBlocked = get_domain_from_actor(domainObject)
domainBlockedFull = get_full_domain(domainBlocked, portBlocked)
removeBlock(base_dir, nickname, domain,
nicknameBlocked, domainBlockedFull)
remove_block(base_dir, nickname, domain,
nicknameBlocked, domainBlockedFull)
if debug:
print('DEBUG: post undo blocked via c2s - ' + post_filename)
def mutePost(base_dir: str, nickname: str, domain: str, port: int,
http_prefix: str, post_id: str, recent_posts_cache: {},
debug: bool) -> None:
def mute_post(base_dir: str, nickname: str, domain: str, port: int,
http_prefix: str, post_id: str, recent_posts_cache: {},
debug: bool) -> None:
""" Mutes the given post
"""
print('mutePost: post_id ' + post_id)
print('mute_post: post_id ' + post_id)
post_filename = locate_post(base_dir, nickname, domain, post_id)
if not post_filename:
print('mutePost: file not found ' + post_id)
print('mute_post: file not found ' + post_id)
return
post_json_object = load_json(post_filename)
if not post_json_object:
print('mutePost: object not loaded ' + post_id)
print('mute_post: object not loaded ' + post_id)
return
print('mutePost: ' + str(post_json_object))
print('mute_post: ' + str(post_json_object))
postJsonObj = post_json_object
alsoUpdatePostId = None
@ -582,7 +582,7 @@ def mutePost(base_dir: str, nickname: str, domain: str, port: int,
postJsonObj['ignores']['totalItems'] = igIt
postJsonObj['muted'] = True
if save_json(post_json_object, post_filename):
print('mutePost: saved ' + post_filename)
print('mute_post: saved ' + post_filename)
# remove cached post so that the muted version gets recreated
# without its content text and/or image
@ -653,9 +653,9 @@ def mutePost(base_dir: str, nickname: str, domain: str, port: int,
print('MUTE: ' + alsoUpdatePostId + ' removed referenced html')
def unmutePost(base_dir: str, nickname: str, domain: str, port: int,
http_prefix: str, post_id: str, recent_posts_cache: {},
debug: bool) -> None:
def unmute_post(base_dir: str, nickname: str, domain: str, port: int,
http_prefix: str, post_id: str, recent_posts_cache: {},
debug: bool) -> None:
""" Unmutes the given post
"""
post_filename = locate_post(base_dir, nickname, domain, post_id)
@ -671,7 +671,7 @@ def unmutePost(base_dir: str, nickname: str, domain: str, port: int,
os.remove(muteFilename)
except OSError:
if debug:
print('EX: unmutePost mute filename not deleted ' +
print('EX: unmute_post mute filename not deleted ' +
str(muteFilename))
print('UNMUTE: ' + muteFilename + ' file removed')
@ -721,7 +721,7 @@ def unmutePost(base_dir: str, nickname: str, domain: str, port: int,
os.remove(cachedPostFilename)
except OSError:
if debug:
print('EX: unmutePost cached post not deleted ' +
print('EX: unmute_post cached post not deleted ' +
str(cachedPostFilename))
# if the post is in the recent posts cache then mark it as unmuted
@ -755,7 +755,7 @@ def unmutePost(base_dir: str, nickname: str, domain: str, port: int,
except OSError:
if debug:
print('EX: ' +
'unmutePost cached ref post not removed ' +
'unmute_post cached ref post not removed ' +
str(cachedPostFilename))
if recent_posts_cache.get('json'):
@ -811,9 +811,9 @@ def outboxMute(base_dir: str, http_prefix: str,
print('WARN: unable to find nickname in ' + message_json['object'])
return
mutePost(base_dir, nickname, domain, port,
http_prefix, message_json['object'], recent_posts_cache,
debug)
mute_post(base_dir, nickname, domain, port,
http_prefix, message_json['object'], recent_posts_cache,
debug)
if debug:
print('DEBUG: post muted via c2s - ' + post_filename)
@ -867,22 +867,22 @@ def outboxUndoMute(base_dir: str, http_prefix: str,
message_json['object']['object'])
return
unmutePost(base_dir, nickname, domain, port,
http_prefix, message_json['object']['object'],
recent_posts_cache, debug)
unmute_post(base_dir, nickname, domain, port,
http_prefix, message_json['object']['object'],
recent_posts_cache, debug)
if debug:
print('DEBUG: post undo mute via c2s - ' + post_filename)
def broch_modeIsActive(base_dir: str) -> bool:
def broch_mode_is_active(base_dir: str) -> bool:
"""Returns true if broch mode is active
"""
allowFilename = base_dir + '/accounts/allowedinstances.txt'
return os.path.isfile(allowFilename)
def setBrochMode(base_dir: str, domain_full: str, enabled: bool) -> None:
def set_broch_mode(base_dir: str, domain_full: str, enabled: bool) -> None:
"""Broch mode can be used to lock down the instance during
a period of time when it is temporarily under attack.
For example, where an adversary is constantly spinning up new
@ -899,7 +899,7 @@ def setBrochMode(base_dir: str, domain_full: str, enabled: bool) -> None:
try:
os.remove(allowFilename)
except OSError:
print('EX: setBrochMode allow file not deleted ' +
print('EX: set_broch_mode allow file not deleted ' +
str(allowFilename))
print('Broch mode turned off')
else:
@ -982,7 +982,7 @@ def broch_modeLapses(base_dir: str, lapseDays: int) -> bool:
return False
def loadCWLists(base_dir: str, verbose: bool) -> {}:
def load_cw_lists(base_dir: str, verbose: bool) -> {}:
"""Load lists used for content warnings
"""
if not os.path.isdir(base_dir + '/cwlists'):
@ -1067,7 +1067,7 @@ def addCWfromLists(post_json_object: {}, cw_lists: {}, translate: {},
post_json_object['object']['sensitive'] = True
def getCWlistVariable(listName: str) -> str:
def get_cw_list_variable(listName: str) -> str:
"""Returns the variable associated with a CW list
"""
return 'list' + listName.replace(' ', '').replace("'", '')

168
daemon.py
View File

@ -128,20 +128,20 @@ from media import replace_twitter
from media import attach_media
from media import path_is_video
from media import path_is_audio
from blocking import getCWlistVariable
from blocking import loadCWLists
from blocking import updateBlockedCache
from blocking import mutePost
from blocking import unmutePost
from blocking import setBrochMode
from blocking import broch_modeIsActive
from blocking import addBlock
from blocking import removeBlock
from blocking import addGlobalBlock
from blocking import removeGlobalBlock
from blocking import isBlockedHashtag
from blocking import isBlockedDomain
from blocking import getDomainBlocklist
from blocking import get_cw_list_variable
from blocking import load_cw_lists
from blocking import update_blocked_cache
from blocking import mute_post
from blocking import unmute_post
from blocking import set_broch_mode
from blocking import broch_mode_is_active
from blocking import add_block
from blocking import remove_block
from blocking import add_global_block
from blocking import remove_global_block
from blocking import is_blocked_hashtag
from blocking import is_blocked_domain
from blocking import get_domain_blocklist
from roles import getActorRolesList
from roles import setRole
from roles import clearModeratorStatus
@ -614,13 +614,13 @@ class PubServer(BaseHTTPRequestHandler):
blockedUA = False
if not agentDomain.startswith(calling_domain):
self.server.blockedCacheLastUpdated = \
updateBlockedCache(self.server.base_dir,
self.server.blockedCache,
self.server.blockedCacheLastUpdated,
self.server.blockedCacheUpdateSecs)
update_blocked_cache(self.server.base_dir,
self.server.blockedCache,
self.server.blockedCacheLastUpdated,
self.server.blockedCacheUpdateSecs)
blockedUA = isBlockedDomain(self.server.base_dir, agentDomain,
self.server.blockedCache)
blockedUA = is_blocked_domain(self.server.base_dir, agentDomain,
self.server.blockedCache)
# if self.server.debug:
if blockedUA:
print('Blocked User agent: ' + agentDomain)
@ -1061,7 +1061,7 @@ class PubServer(BaseHTTPRequestHandler):
print('mastodon api v1: nickname ' + str(nickname))
self._update_known_crawlers(uaStr)
broch_mode = broch_modeIsActive(base_dir)
broch_mode = broch_mode_is_active(base_dir)
sendJson, sendJsonStr = \
masto_api_v1_response(path,
calling_domain,
@ -1134,7 +1134,7 @@ class PubServer(BaseHTTPRequestHandler):
# For example, if this or allied instances are being attacked
# then numbers of accounts may be changing as people
# migrate, and that information may be useful to an adversary
broch_mode = broch_modeIsActive(self.server.base_dir)
broch_mode = broch_mode_is_active(self.server.base_dir)
nodeInfoVersion = self.server.project_version
if not self.server.show_node_info_version or broch_mode:
@ -1461,13 +1461,13 @@ class PubServer(BaseHTTPRequestHandler):
get_domain_from_actor(message_json['actor'])
self.server.blockedCacheLastUpdated = \
updateBlockedCache(self.server.base_dir,
self.server.blockedCache,
self.server.blockedCacheLastUpdated,
self.server.blockedCacheUpdateSecs)
update_blocked_cache(self.server.base_dir,
self.server.blockedCache,
self.server.blockedCacheLastUpdated,
self.server.blockedCacheUpdateSecs)
if isBlockedDomain(self.server.base_dir, messageDomain,
self.server.blockedCache):
if is_blocked_domain(self.server.base_dir, messageDomain,
self.server.blockedCache):
print('POST from blocked domain ' + messageDomain)
self._400()
self.server.POSTbusy = False
@ -1527,10 +1527,10 @@ class PubServer(BaseHTTPRequestHandler):
return 5
self.server.blockedCacheLastUpdated = \
updateBlockedCache(self.server.base_dir,
self.server.blockedCache,
self.server.blockedCacheLastUpdated,
self.server.blockedCacheUpdateSecs)
update_blocked_cache(self.server.base_dir,
self.server.blockedCache,
self.server.blockedCacheLastUpdated,
self.server.blockedCacheUpdateSecs)
queueFilename = \
save_post_to_inbox_queue(self.server.base_dir,
@ -1981,7 +1981,7 @@ class PubServer(BaseHTTPRequestHandler):
nickname = '*'
fullBlockDomain = moderationText.strip()
if fullBlockDomain or nickname.startswith('#'):
addGlobalBlock(base_dir, nickname, fullBlockDomain)
add_global_block(base_dir, nickname, fullBlockDomain)
if moderationButton == 'unblock':
fullBlockDomain = None
if moderationText.startswith('http') or \
@ -2000,7 +2000,8 @@ class PubServer(BaseHTTPRequestHandler):
nickname = '*'
fullBlockDomain = moderationText.strip()
if fullBlockDomain or nickname.startswith('#'):
removeGlobalBlock(base_dir, nickname, fullBlockDomain)
remove_global_block(base_dir, nickname,
fullBlockDomain)
if moderationButton == 'remove':
if '/statuses/' not in moderationText:
remove_account(base_dir, nickname, domain, port)
@ -2618,9 +2619,9 @@ class PubServer(BaseHTTPRequestHandler):
if '&submitBlock=' in optionsConfirmParams:
print('Adding block by ' + chooserNickname +
' of ' + optionsActor)
if addBlock(base_dir, chooserNickname,
domain,
optionsNickname, optionsDomainFull):
if add_block(base_dir, chooserNickname,
domain,
optionsNickname, optionsDomainFull):
# send block activity
self._sendBlock(http_prefix,
chooserNickname, domain_full,
@ -3136,10 +3137,10 @@ class PubServer(BaseHTTPRequestHandler):
else:
print('Adding block by ' + blockerNickname +
' of ' + blockingActor)
if addBlock(base_dir, blockerNickname,
domain,
blockingNickname,
blockingDomainFull):
if add_block(base_dir, blockerNickname,
domain,
blockingNickname,
blockingDomainFull):
# send block activity
self._sendBlock(http_prefix,
blockerNickname, domain_full,
@ -3224,9 +3225,9 @@ class PubServer(BaseHTTPRequestHandler):
if debug:
print(blockerNickname + ' stops blocking ' +
blockingActor)
removeBlock(base_dir,
blockerNickname, domain,
blockingNickname, blockingDomainFull)
remove_block(base_dir,
blockerNickname, domain,
blockingNickname, blockingDomainFull)
if calling_domain.endswith('.onion') and onion_domain:
originPathStr = 'http://' + onion_domain + usersPath
elif (calling_domain.endswith('.i2p') and i2p_domain):
@ -4281,7 +4282,7 @@ class PubServer(BaseHTTPRequestHandler):
if fields.get('hashtagCategory'):
categoryStr = fields['hashtagCategory'].lower()
if not isBlockedHashtag(base_dir, categoryStr) and \
if not is_blocked_hashtag(base_dir, categoryStr) and \
not isFiltered(base_dir, nickname, domain, categoryStr):
setHashtagCategory(base_dir, hashtag, categoryStr, False)
else:
@ -5576,9 +5577,9 @@ class PubServer(BaseHTTPRequestHandler):
currBrochMode = \
get_config_param(base_dir, "broch_mode")
if broch_mode != currBrochMode:
setBrochMode(self.server.base_dir,
self.server.domain_full,
broch_mode)
set_broch_mode(self.server.base_dir,
self.server.domain_full,
broch_mode)
set_config_param(base_dir, "broch_mode",
broch_mode)
@ -6381,7 +6382,7 @@ class PubServer(BaseHTTPRequestHandler):
# set selected content warning lists
newListsEnabled = ''
for name, item in self.server.cw_lists.items():
listVarName = getCWlistVariable(name)
listVarName = get_cw_list_variable(name)
if fields.get(listVarName):
if fields[listVarName] == 'on':
if newListsEnabled:
@ -7515,7 +7516,7 @@ class PubServer(BaseHTTPRequestHandler):
if '?page=' in hashtag:
hashtag = hashtag.split('?page=')[0]
hashtag = urllib.parse.unquote_plus(hashtag)
if isBlockedHashtag(base_dir, hashtag):
if is_blocked_hashtag(base_dir, hashtag):
print('BLOCK: hashtag #' + hashtag)
msg = htmlHashtagBlocked(self.server.css_cache, base_dir,
self.server.translate).encode('utf-8')
@ -7584,7 +7585,7 @@ class PubServer(BaseHTTPRequestHandler):
"""Return an RSS 2 feed for a hashtag
"""
hashtag = path.split('/tags/rss2/')[1]
if isBlockedHashtag(base_dir, hashtag):
if is_blocked_hashtag(base_dir, hashtag):
self._400()
return
nickname = None
@ -9237,27 +9238,28 @@ class PubServer(BaseHTTPRequestHandler):
actor = \
http_prefix + '://' + domain_full + path.split('?mute=')[0]
nickname = get_nickname_from_actor(actor)
mutePost(base_dir, nickname, domain, port,
http_prefix, muteUrl,
self.server.recent_posts_cache, debug)
mute_post(base_dir, nickname, domain, port,
http_prefix, muteUrl,
self.server.recent_posts_cache, debug)
muteFilename = \
locate_post(base_dir, nickname, domain, muteUrl)
if muteFilename:
print('mutePost: Regenerating html post for changed mute status')
mutePostJson = load_json(muteFilename, 0, 1)
if mutePostJson:
print('mute_post: Regenerating html post for changed mute status')
mute_postJson = load_json(muteFilename, 0, 1)
if mute_postJson:
cachedPostFilename = \
get_cached_post_filename(base_dir, nickname,
domain, mutePostJson)
print('mutePost: Muted post json: ' + str(mutePostJson))
print('mutePost: Muted post nickname: ' +
domain, mute_postJson)
print('mute_post: Muted post json: ' + str(mute_postJson))
print('mute_post: Muted post nickname: ' +
nickname + ' ' + domain)
print('mutePost: Muted post cache: ' + str(cachedPostFilename))
print('mute_post: Muted post cache: ' +
str(cachedPostFilename))
showIndividualPostIcons = True
manuallyApproveFollowers = \
follower_approval_active(base_dir,
nickname, domain)
showRepeats = not is_dm(mutePostJson)
showRepeats = not is_dm(mute_postJson)
showPublicOnly = False
storeToCache = True
useCacheOnly = False
@ -9274,7 +9276,7 @@ class PubServer(BaseHTTPRequestHandler):
self.server.cached_webfingers,
self.server.person_cache,
nickname, domain,
self.server.port, mutePostJson,
self.server.port, mute_postJson,
avatarUrl, showAvatarOptions,
self.server.allow_deletion,
http_prefix,
@ -9346,28 +9348,28 @@ class PubServer(BaseHTTPRequestHandler):
actor = \
http_prefix + '://' + domain_full + path.split('?unmute=')[0]
nickname = get_nickname_from_actor(actor)
unmutePost(base_dir, nickname, domain, port,
http_prefix, muteUrl,
self.server.recent_posts_cache, debug)
unmute_post(base_dir, nickname, domain, port,
http_prefix, muteUrl,
self.server.recent_posts_cache, debug)
muteFilename = \
locate_post(base_dir, nickname, domain, muteUrl)
if muteFilename:
print('unmutePost: ' +
print('unmute_post: ' +
'Regenerating html post for changed unmute status')
mutePostJson = load_json(muteFilename, 0, 1)
if mutePostJson:
mute_postJson = load_json(muteFilename, 0, 1)
if mute_postJson:
cachedPostFilename = \
get_cached_post_filename(base_dir, nickname,
domain, mutePostJson)
print('unmutePost: Unmuted post json: ' + str(mutePostJson))
print('unmutePost: Unmuted post nickname: ' +
domain, mute_postJson)
print('unmute_post: Unmuted post json: ' + str(mute_postJson))
print('unmute_post: Unmuted post nickname: ' +
nickname + ' ' + domain)
print('unmutePost: Unmuted post cache: ' +
print('unmute_post: Unmuted post cache: ' +
str(cachedPostFilename))
showIndividualPostIcons = True
manuallyApproveFollowers = \
follower_approval_active(base_dir, nickname, domain)
showRepeats = not is_dm(mutePostJson)
showRepeats = not is_dm(mute_postJson)
showPublicOnly = False
storeToCache = True
useCacheOnly = False
@ -9384,7 +9386,7 @@ class PubServer(BaseHTTPRequestHandler):
self.server.cached_webfingers,
self.server.person_cache,
nickname, domain,
self.server.port, mutePostJson,
self.server.port, mute_postJson,
avatarUrl, showAvatarOptions,
self.server.allow_deletion,
http_prefix,
@ -16119,7 +16121,7 @@ class PubServer(BaseHTTPRequestHandler):
blockDomain = urllib.parse.unquote_plus(blockDomain.strip())
if '?' in blockDomain:
blockDomain = blockDomain.split('?')[0]
addGlobalBlock(self.server.base_dir, '*', blockDomain)
add_global_block(self.server.base_dir, '*', blockDomain)
msg = \
htmlAccountInfo(self.server.css_cache,
self.server.translate,
@ -16156,7 +16158,7 @@ class PubServer(BaseHTTPRequestHandler):
searchHandle = urllib.parse.unquote_plus(searchHandle)
blockDomain = blockDomain.split('?handle=')[0]
blockDomain = urllib.parse.unquote_plus(blockDomain.strip())
removeGlobalBlock(self.server.base_dir, '*', blockDomain)
remove_global_block(self.server.base_dir, '*', blockDomain)
msg = \
htmlAccountInfo(self.server.css_cache,
self.server.translate,
@ -18634,7 +18636,7 @@ def runDaemon(content_license_url: str,
# It helps to avoid touching the disk and so improves flooding resistance
httpd.blocklistUpdateCtr = 0
httpd.blocklistUpdateInterval = 100
httpd.domainBlocklist = getDomainBlocklist(base_dir)
httpd.domainBlocklist = get_domain_blocklist(base_dir)
httpd.manual_follower_approval = manual_follower_approval
httpd.onion_domain = onion_domain
@ -18792,9 +18794,9 @@ def runDaemon(content_license_url: str,
httpd.blockedCacheLastUpdated = 0
httpd.blockedCacheUpdateSecs = 120
httpd.blockedCacheLastUpdated = \
updateBlockedCache(base_dir, httpd.blockedCache,
httpd.blockedCacheLastUpdated,
httpd.blockedCacheUpdateSecs)
update_blocked_cache(base_dir, httpd.blockedCache,
httpd.blockedCacheLastUpdated,
httpd.blockedCacheUpdateSecs)
# cache to store css files
httpd.css_cache = {}
@ -18804,7 +18806,7 @@ def runDaemon(content_license_url: str,
metadata_custom_emoji(base_dir, http_prefix, httpd.domain_full)
# whether to enable broch mode, which locks down the instance
setBrochMode(base_dir, httpd.domain_full, broch_mode)
set_broch_mode(base_dir, httpd.domain_full, broch_mode)
if not os.path.isdir(base_dir + '/accounts/inbox@' + domain):
print('Creating shared inbox: inbox@' + domain)
@ -18828,7 +18830,7 @@ def runDaemon(content_license_url: str,
httpd.lists_enabled = lists_enabled
else:
httpd.lists_enabled = get_config_param(base_dir, "lists_enabled")
httpd.cw_lists = loadCWLists(base_dir, True)
httpd.cw_lists = load_cw_lists(base_dir, True)
# set the avatar for the news account
httpd.theme_name = get_config_param(base_dir, 'theme')

View File

@ -83,7 +83,7 @@ from acceptreject import receiveAcceptReject
from bookmarks import updateBookmarksCollection
from bookmarks import undoBookmarksCollectionEntry
from blocking import isBlocked
from blocking import isBlockedDomain
from blocking import is_blocked_domain
from blocking import broch_modeLapses
from filters import isFiltered
from utils import update_announce_collection
@ -500,7 +500,7 @@ def save_post_to_inbox_queue(base_dir: str, http_prefix: str,
post_json_object['object']['inReplyTo']
replyDomain, replyPort = \
get_domain_from_actor(inReplyTo)
if isBlockedDomain(base_dir, replyDomain, blockedCache):
if is_blocked_domain(base_dir, replyDomain, blockedCache):
if debug:
print('WARN: post contains reply from ' +
str(actor) +
@ -1866,7 +1866,7 @@ def _receiveAnnounce(recent_posts_cache: {},
objectDomain = objectDomain.replace(prefix, '')
if '/' in objectDomain:
objectDomain = objectDomain.split('/')[0]
if isBlockedDomain(base_dir, objectDomain):
if is_blocked_domain(base_dir, objectDomain):
if debug:
print('DEBUG: announced domain is blocked')
return False

View File

@ -33,8 +33,8 @@ from utils import remove_html
from utils import is_account_dir
from utils import acct_dir
from utils import local_actor_url
from blocking import isBlockedDomain
from blocking import isBlockedHashtag
from blocking import is_blocked_domain
from blocking import is_blocked_hashtag
from filters import isFiltered
from session import downloadImageAnyMimeType
@ -234,7 +234,7 @@ def _addNewswireDictEntry(base_dir: str, domain: str,
# check that no tags are blocked
for tag in postTags:
if isBlockedHashtag(base_dir, tag):
if is_blocked_hashtag(base_dir, tag):
return
_downloadNewswireFeedFavicon(session, base_dir, link, debug)
@ -372,7 +372,7 @@ def _xml2StrToHashtagCategories(base_dir: str, xmlStr: str,
if 'CDATA' in hashtagListStr:
continue
hashtagList = hashtagListStr.split(' ')
if not isBlockedHashtag(base_dir, categoryStr):
if not is_blocked_hashtag(base_dir, categoryStr):
for hashtag in hashtagList:
setHashtagCategory(base_dir, hashtag, categoryStr,
False, force)
@ -437,7 +437,7 @@ def _xml2StrToDict(base_dir: str, domain: str, xmlStr: str,
itemDomain = link.split('://')[1]
if '/' in itemDomain:
itemDomain = itemDomain.split('/')[0]
if isBlockedDomain(base_dir, itemDomain):
if is_blocked_domain(base_dir, itemDomain):
continue
pubDate = rssItem.split('<pubDate>')[1]
pubDate = pubDate.split('</pubDate>')[0]
@ -525,7 +525,7 @@ def _xml1StrToDict(base_dir: str, domain: str, xmlStr: str,
itemDomain = link.split('://')[1]
if '/' in itemDomain:
itemDomain = itemDomain.split('/')[0]
if isBlockedDomain(base_dir, itemDomain):
if is_blocked_domain(base_dir, itemDomain):
continue
pubDate = rssItem.split('<dc:date>')[1]
pubDate = pubDate.split('</dc:date>')[0]
@ -601,7 +601,7 @@ def _atomFeedToDict(base_dir: str, domain: str, xmlStr: str,
itemDomain = link.split('://')[1]
if '/' in itemDomain:
itemDomain = itemDomain.split('/')[0]
if isBlockedDomain(base_dir, itemDomain):
if is_blocked_domain(base_dir, itemDomain):
continue
pubDate = atomItem.split('<updated>')[1]
pubDate = pubDate.split('</updated>')[0]
@ -707,7 +707,7 @@ def _jsonFeedV1ToDict(base_dir: str, domain: str, xmlStr: str,
itemDomain = link.split('://')[1]
if '/' in itemDomain:
itemDomain = itemDomain.split('/')[0]
if isBlockedDomain(base_dir, itemDomain):
if is_blocked_domain(base_dir, itemDomain):
continue
if jsonFeedItem.get('date_published'):
if not isinstance(jsonFeedItem['date_published'], str):
@ -747,7 +747,7 @@ def _atomFeedYTToDict(base_dir: str, domain: str, xmlStr: str,
"""
if '<entry>' not in xmlStr:
return {}
if isBlockedDomain(base_dir, 'www.youtube.com'):
if is_blocked_domain(base_dir, 'www.youtube.com'):
return {}
result = {}
atomItems = xmlStr.split('<entry>')

View File

@ -30,7 +30,7 @@ from utils import save_json
from utils import acct_dir
from utils import local_actor_url
from utils import has_actor
from blocking import isBlockedDomain
from blocking import is_blocked_domain
from blocking import outboxBlock
from blocking import outboxUndoBlock
from blocking import outboxMute
@ -281,7 +281,7 @@ def postMessageToOutbox(session, translate: {},
testDomain, testPort = get_domain_from_actor(message_json['actor'])
testDomain = get_full_domain(testDomain, testPort)
if isBlockedDomain(base_dir, testDomain):
if is_blocked_domain(base_dir, testDomain):
if debug:
print('DEBUG: domain is blocked: ' + message_json['actor'])
return False

View File

@ -81,7 +81,7 @@ from content import replaceEmojiFromTags
from content import removeTextFormatting
from auth import create_basic_auth_header
from blocking import isBlocked
from blocking import isBlockedDomain
from blocking import is_blocked_domain
from filters import isFiltered
from git import convertPostToPatch
from linked_data_sig import generateJsonSignature
@ -853,7 +853,7 @@ def _getPostsForBlockedDomains(base_dir: str,
if isinstance(item['object']['inReplyTo'], str):
postDomain, postPort = \
get_domain_from_actor(item['object']['inReplyTo'])
if isBlockedDomain(base_dir, postDomain):
if is_blocked_domain(base_dir, postDomain):
if item['object'].get('url'):
url = item['object']['url']
else:
@ -872,7 +872,7 @@ def _getPostsForBlockedDomains(base_dir: str,
if tagType == 'mention' and tagItem.get('href'):
postDomain, postPort = \
get_domain_from_actor(tagItem['href'])
if isBlockedDomain(base_dir, postDomain):
if is_blocked_domain(base_dir, postDomain):
if item['object'].get('url'):
url = item['object']['url']
else:

View File

@ -170,7 +170,7 @@ from shares import updateSharedItemFederationToken
from shares import mergeSharedItemTokens
from shares import sendShareViaServer
from shares import getSharedItemsCatalogViaServer
from blocking import loadCWLists
from blocking import load_cw_lists
from blocking import addCWfromLists
testServerGroupRunning = False
@ -5860,7 +5860,7 @@ def _testWordsSimilarity() -> None:
def _testAddCWfromLists(base_dir: str) -> None:
print('testAddCWfromLists')
translate = {}
cw_lists = loadCWLists(base_dir, True)
cw_lists = load_cw_lists(base_dir, True)
assert cw_lists
post_json_object = {

View File

@ -25,7 +25,7 @@ from webapp_timeline import htmlTimeline
from webapp_utils import getContentWarningButton
from webapp_utils import htmlHeaderWithExternalStyle
from webapp_utils import htmlFooter
from blocking import isBlockedDomain
from blocking import is_blocked_domain
from blocking import isBlocked
from session import create_session
@ -170,7 +170,7 @@ def htmlAccountInfo(css_cache: {}, translate: {},
http_prefix + '://' + postDomain + '" ' + \
'target="_blank" rel="nofollow noopener noreferrer">' + \
postDomain + '</a> '
if isBlockedDomain(base_dir, postDomain):
if is_blocked_domain(base_dir, postDomain):
blockedPostsLinks = ''
urlCtr = 0
for url in blockedPostUrls:

View File

@ -73,7 +73,7 @@ from webapp_utils import endEditSection
from blog import getBlogAddress
from webapp_post import individualPostAsHtml
from webapp_timeline import htmlIndividualShare
from blocking import getCWlistVariable
from blocking import get_cw_list_variable
def _validProfilePreviewPost(post_json_object: {},
@ -1777,7 +1777,7 @@ def _htmlEditProfileFiltering(base_dir: str, nickname: str, domain: str,
cw_listsStr = ''
for name, item in cw_lists.items():
variableName = getCWlistVariable(name)
variableName = get_cw_list_variable(name)
listIsEnabled = False
if lists_enabled:
if name in lists_enabled: