Replace directory checks with functions

main
bashrc 2026-05-02 12:59:08 +01:00
parent 28d17bfda5
commit b57262bda8
47 changed files with 309 additions and 268 deletions

View File

@ -25,6 +25,7 @@ from data import save_string
from data import load_list
from data import move_file
from data import is_a_file
from data import is_a_dir
def _hash_password(password: str) -> str:
@ -189,7 +190,7 @@ def store_basic_credentials(base_dir: str,
password = remove_eol(password).strip()
dir_str = data_dir(base_dir)
if not os.path.isdir(dir_str):
if not is_a_dir(dir_str):
os.mkdir(dir_str)
password_file = dir_str + '/passwords'

View File

@ -51,6 +51,7 @@ from data import load_string
from data import save_string
from data import load_list
from data import is_a_file
from data import is_a_dir
def _no_of_blog_replies(base_dir: str, http_prefix: str, translate: {},
@ -1107,7 +1108,7 @@ def account_has_blog(base_dir: str, nickname: str, domain: str) -> bool:
"""Returns true if the given account has a blog
"""
blogs_dir = acct_dir(base_dir, nickname, domain) + '/tlblogs'
if os.path.isdir(blogs_dir):
if is_a_dir(blogs_dir):
for path in os.listdir(blogs_dir):
if path.endswith('.json'):
return True

View File

@ -35,6 +35,7 @@ from data import save_binary
from data import load_binary
from data import erase_file
from data import is_a_file
from data import is_a_dir
def remove_person_from_cache(base_dir: str, person_url: str,
@ -114,7 +115,7 @@ def store_person_in_cache(base_dir: str, person_url: str,
# store to file
if not allow_write_to_file:
return
if os.path.isdir(base_dir + '/cache/actors'):
if is_a_dir(base_dir + '/cache/actors'):
cache_filename = base_dir + '/cache/actors/' + \
person_url.replace('/', '#') + '.json'
if not is_a_file(cache_filename):

View File

@ -18,6 +18,7 @@ from data import load_string
from data import save_string
from data import erase_file
from data import is_a_file
from data import is_a_dir
MAX_TAG_LENGTH = 42
@ -242,7 +243,7 @@ def set_hashtag_category(base_dir: str, hashtag: str, category: str,
if not is_a_file(hashtag_filename):
return False
if not os.path.isdir(base_dir + '/tags'):
if not is_a_dir(base_dir + '/tags'):
os.mkdir(base_dir + '/tags')
category_filename = base_dir + '/tags/' + hashtag + '.category'
if force:

View File

@ -57,6 +57,7 @@ from data import save_binary
from data import append_string
from data import erase_file
from data import is_a_file
from data import is_a_dir
MUSIC_SITES = ('soundcloud.com', 'bandcamp.com', 'resonate.coop')
@ -375,7 +376,7 @@ def _save_custom_emoji(session, base_dir: str, emoji_name: str, url: str,
return
emoji_name = emoji_name.replace(':', '').strip().lower()
custom_emoji_dir = base_dir + '/emojicustom'
if not os.path.isdir(custom_emoji_dir):
if not is_a_dir(custom_emoji_dir):
os.mkdir(custom_emoji_dir)
emoji_image_filename = custom_emoji_dir + '/' + emoji_name + '.' + ext
if not download_image(session, url,

View File

@ -26,6 +26,7 @@ from data import save_flag_file
from data import append_string
from data import erase_file
from data import is_a_file
from data import is_a_dir
def _get_conversation_filename(base_dir: str, nickname: str, domain: str,
@ -43,7 +44,7 @@ def _get_conversation_filename(base_dir: str, nickname: str, domain: str,
if not post_json_object['object'].get('id'):
return None
conversation_dir = acct_dir(base_dir, nickname, domain) + '/conversation'
if not os.path.isdir(conversation_dir):
if not is_a_dir(conversation_dir):
os.mkdir(conversation_dir)
if post_json_object['object'].get('conversation'):
conversation_id = post_json_object['object']['conversation']

View File

@ -12,12 +12,13 @@ from utils import load_json
from utils import get_content_from_post
from utils import content_is_single_url
from utils import is_yggdrasil_address
from data import is_a_dir
def load_cw_lists(base_dir: str, verbose: bool) -> {}:
"""Load lists used for content warnings
"""
if not os.path.isdir(base_dir + '/cwlists'):
if not is_a_dir(base_dir + '/cwlists'):
return {}
result = {}
# NOTE: here we do want to allow recursive walk through

View File

@ -107,6 +107,7 @@ from poison import load_dictionary
from poison import load_2grams
from data import load_string
from data import is_a_file
from data import is_a_dir
class PubServer(BaseHTTPRequestHandler):
@ -168,8 +169,7 @@ class PubServer(BaseHTTPRequestHandler):
http_400(self)
return
dir_str = data_dir(self.server.base_dir)
if not os.path.isdir(dir_str + '/' +
nickname + '@' + self.server.domain):
if not is_a_dir(dir_str + '/' + nickname + '@' + self.server.domain):
print(endpoint_type.upper() +
' for non-existent account ' + self.path)
http_404(self, 146)
@ -699,7 +699,7 @@ def run_daemon(accounts_data_dir: str,
set_accounts_data_dir(base_dir, accounts_data_dir)
dir_str = data_dir(base_dir)
if not os.path.isdir(dir_str):
if not is_a_dir(dir_str):
print('Creating accounts directory')
os.mkdir(dir_str)
@ -1197,11 +1197,11 @@ def run_daemon(accounts_data_dir: str,
set_broch_mode(base_dir, httpd.domain_full, broch_mode)
dir_str = data_dir(base_dir)
if not os.path.isdir(dir_str + '/inbox@' + domain):
if not is_a_dir(dir_str + '/inbox@' + domain):
print('Creating shared inbox: inbox@' + domain)
create_shared_inbox(base_dir, 'inbox', domain, port, http_prefix)
if not os.path.isdir(dir_str + '/news@' + domain):
if not is_a_dir(dir_str + '/news@' + domain):
print('Creating news inbox: news@' + domain)
create_news_inbox(base_dir, domain, port, http_prefix)
set_config_param(base_dir, "listsEnabled", "Murdoch press")
@ -1243,24 +1243,24 @@ def run_daemon(accounts_data_dir: str,
domain,
httpd.domain_full)
if not os.path.isdir(base_dir + '/cache'):
if not is_a_dir(base_dir + '/cache'):
os.mkdir(base_dir + '/cache')
if not os.path.isdir(base_dir + '/cache/actors'):
if not is_a_dir(base_dir + '/cache/actors'):
print('Creating actors cache')
os.mkdir(base_dir + '/cache/actors')
if not os.path.isdir(base_dir + '/cache/announce'):
if not is_a_dir(base_dir + '/cache/announce'):
print('Creating announce cache')
os.mkdir(base_dir + '/cache/announce')
if not os.path.isdir(base_dir + '/cache/avatars'):
if not is_a_dir(base_dir + '/cache/avatars'):
print('Creating avatars cache')
os.mkdir(base_dir + '/cache/avatars')
archive_dir = base_dir + '/archive'
if not os.path.isdir(archive_dir):
if not is_a_dir(archive_dir):
print('Creating archive')
os.mkdir(archive_dir)
if not os.path.isdir(base_dir + '/sharefiles'):
if not is_a_dir(base_dir + '/sharefiles'):
print('Creating shared item files directory')
os.mkdir(base_dir + '/sharefiles')

View File

@ -20,6 +20,7 @@ from httpcodes import http_404
from fitnessFunctions import fitness_performance
from newswire import rss2header
from newswire import rss2footer
from data import is_a_dir
def get_rss2feed(self, calling_domain: str, path: str,
@ -38,7 +39,7 @@ def get_rss2feed(self, calling_domain: str, path: str,
nickname = nickname.split('/')[0]
if not nickname.startswith('rss.'):
account_dir = acct_dir(base_dir, nickname, domain)
if os.path.isdir(account_dir):
if is_a_dir(account_dir):
curr_session = \
establish_session("RSS request",
curr_session,
@ -154,7 +155,7 @@ def get_rss3feed(self, calling_domain: str, path: str,
nickname = nickname.split('/')[0]
if not nickname.startswith('rss.'):
account_dir = acct_dir(base_dir, nickname, domain)
if os.path.isdir(account_dir):
if is_a_dir(account_dir):
curr_session = \
establish_session("get_rss3feed",
curr_session, proxy_type,

View File

@ -7,7 +7,6 @@ __email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Daemon POST"
import os
import errno
from socket import error as SocketError
from httpcodes import http_404
@ -15,6 +14,7 @@ from utils import acct_dir
from utils import binary_is_image
from formats import get_image_extension_from_mime_type
from data import save_binary
from data import is_a_dir
def receive_image_attachment(self, length: int, path: str, base_dir: str,
@ -37,7 +37,7 @@ def receive_image_attachment(self, length: int, path: str, base_dir: str,
return
self.post_from_nickname = path_users_section.split('/')[0]
accounts_dir = acct_dir(base_dir, self.post_from_nickname, domain)
if not os.path.isdir(accounts_dir):
if not is_a_dir(accounts_dir):
http_404(self, 13)
self.server.postreq_busy = False
return

View File

@ -7,7 +7,6 @@ __email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Daemon POST"
import os
import errno
import urllib.parse
from socket import error as SocketError
@ -37,6 +36,7 @@ from cache import clear_actor_cache
from blocking import add_global_block
from blocking import update_blocked_cache
from blocking import remove_global_block
from data import is_a_dir
def moderator_actions(self, path: str, calling_domain: str, cookie: str,
@ -153,7 +153,7 @@ def moderator_actions(self, path: str, calling_domain: str, cookie: str,
local_handle = \
search_handle + '@' + domain
dir_str = data_dir(base_dir)
if os.path.isdir(dir_str + '/' + local_handle):
if is_a_dir(dir_str + '/' + local_handle):
search_handle = local_handle
else:
search_handle: str = ''

View File

@ -7,7 +7,6 @@ __email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Daemon POST"
import os
import errno
import urllib.parse
from socket import error as SocketError
@ -52,6 +51,7 @@ from flags import is_moderator
from data import save_flag_file
from data import erase_file
from data import is_a_file
from data import is_a_dir
def _person_options_page_number(options_confirm_params: str) -> int:
@ -610,7 +610,7 @@ def _person_options_post_to_news(self, options_confirm_params: str,
newswire_blocked_filename)
refresh_newswire(base_dir)
else:
if os.path.isdir(account_dir):
if is_a_dir(account_dir):
nw_filename = newswire_blocked_filename
if save_flag_file(nw_filename,
'EX: _person_options_post_to_news ' +
@ -662,7 +662,7 @@ def _person_options_post_to_features(self, options_confirm_params: str,
features_blocked_filename)
refresh_newswire(base_dir)
else:
if os.path.isdir(account_dir):
if is_a_dir(account_dir):
feat_filename = features_blocked_filename
if save_flag_file(feat_filename,
'EX: _person_options_post_to_features ' +
@ -713,7 +713,7 @@ def _person_options_mod_news(self, options_confirm_params: str,
'EX: _person_options unable to delete ' +
newswire_mod_filename)
else:
if os.path.isdir(account_dir):
if is_a_dir(account_dir):
nw_filename = newswire_mod_filename
save_flag_file(nw_filename,
'EX: _person_options_mod_news ' +

View File

@ -152,6 +152,7 @@ from data import save_string
from data import save_flag_file
from data import erase_file
from data import is_a_file
from data import is_a_dir
def _profile_post_deactivate_account(base_dir: str, nickname: str, domain: str,
@ -468,7 +469,7 @@ def _profile_post_import_theme(base_dir: str, nickname: str,
""" HTTP POST import theme from file
"""
if fields.get('importTheme'):
if not os.path.isdir(base_dir + '/imports'):
if not is_a_dir(base_dir + '/imports'):
os.mkdir(base_dir + '/imports')
filename_base = base_dir + '/imports/newtheme.zip'
if is_a_file(filename_base):
@ -2723,7 +2724,7 @@ def profile_edit(self, calling_domain: str, cookie: str,
if m_type == 'instanceLogo':
filename_base = data_dir(base_dir) + '/login.temp'
elif m_type == 'importTheme':
if not os.path.isdir(base_dir + '/imports'):
if not is_a_dir(base_dir + '/imports'):
os.mkdir(base_dir + '/imports')
filename_base = base_dir + '/imports/newtheme.zip'
if is_a_file(filename_base):

View File

@ -183,3 +183,9 @@ def is_a_file(filename: str) -> bool:
"""Returns true if the given filename exists
"""
return os.path.isfile(filename)
def is_a_dir(directory: str) -> bool:
"""Returns true if the given directory exists
"""
return os.path.isdir(directory)

View File

@ -76,6 +76,7 @@ from data import save_string
from data import load_string
from data import prepend_string
from data import is_a_file
from data import is_a_dir
def _desktop_help() -> None:
@ -157,9 +158,9 @@ def _create_desktop_config(actor: str) -> None:
"""Sets up directories for desktop client configuration
"""
home_dir = str(Path.home())
if not os.path.isdir(home_dir + '/.config'):
if not is_a_dir(home_dir + '/.config'):
os.mkdir(home_dir + '/.config')
if not os.path.isdir(home_dir + '/.config/epicyon'):
if not is_a_dir(home_dir + '/.config/epicyon'):
os.mkdir(home_dir + '/.config/epicyon')
nickname = get_nickname_from_actor(actor)
domain, port = get_domain_from_actor(actor)
@ -167,7 +168,7 @@ def _create_desktop_config(actor: str) -> None:
if port not in (443, 80):
handle += '_' + str(port)
read_posts_dir = home_dir + '/.config/epicyon/' + handle
if not os.path.isdir(read_posts_dir):
if not is_a_dir(read_posts_dir):
os.mkdir(read_posts_dir)

View File

@ -138,6 +138,7 @@ from siteactive import is_online
from data import save_string
from data import load_list
from data import is_a_file
from data import is_a_dir
def str2bool(value_str) -> bool:
@ -1589,12 +1590,12 @@ def _command_options() -> None:
sys.exit()
# create cache for actors
if not os.path.isdir(base_dir + '/cache'):
if not is_a_dir(base_dir + '/cache'):
os.mkdir(base_dir + '/cache')
if not os.path.isdir(base_dir + '/cache/actors'):
if not is_a_dir(base_dir + '/cache/actors'):
print('Creating actors cache')
os.mkdir(base_dir + '/cache/actors')
if not os.path.isdir(base_dir + '/cache/announce'):
if not is_a_dir(base_dir + '/cache/announce'):
print('Creating announce cache')
os.mkdir(base_dir + '/cache/announce')
@ -3313,10 +3314,10 @@ def _command_options() -> None:
print('Password should be at least 8 characters')
sys.exit()
account_dir = acct_dir(base_dir, nickname, domain)
if os.path.isdir(account_dir):
if is_a_dir(account_dir):
print('Account already exists')
sys.exit()
if os.path.isdir(base_dir + '/deactivated/' + nickname + '@' + domain):
if is_a_dir(base_dir + '/deactivated/' + nickname + '@' + domain):
print('Account is deactivated')
sys.exit()
if domain.endswith('.onion') or \
@ -3331,7 +3332,7 @@ def _command_options() -> None:
http_prefix = 'ipns'
create_person(base_dir, nickname, domain, port, http_prefix,
True, not argb.noapproval, argb.password.strip())
if os.path.isdir(account_dir):
if is_a_dir(account_dir):
print('Account created for ' + nickname + '@' + domain)
else:
print('Account creation failed')
@ -3363,12 +3364,12 @@ def _command_options() -> None:
print('Password should be at least 8 characters')
sys.exit()
account_dir = acct_dir(base_dir, nickname, domain)
if os.path.isdir(account_dir):
if is_a_dir(account_dir):
print('Group already exists')
sys.exit()
create_group(base_dir, nickname, domain, port, http_prefix,
True, argb.password.strip())
if os.path.isdir(account_dir):
if is_a_dir(account_dir):
print('Group created for ' + nickname + '@' + domain)
else:
print('Group creation failed')
@ -3458,7 +3459,7 @@ def _command_options() -> None:
print('Password should be at least 8 characters')
sys.exit()
account_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
print('Account ' + nickname + '@' + domain + ' not found')
sys.exit()
password_file = data_dir(base_dir) + '/passwords'
@ -3886,18 +3887,18 @@ def _command_options() -> None:
password = 'boringpassword'
print('Generating some test data for user: ' + nickname)
if os.path.isdir(base_dir + '/tags'):
if is_a_dir(base_dir + '/tags'):
shutil.rmtree(base_dir + '/tags', ignore_errors=False)
dir_str = data_dir(base_dir)
if os.path.isdir(dir_str):
if is_a_dir(dir_str):
shutil.rmtree(dir_str, ignore_errors=False)
if os.path.isdir(base_dir + '/keys'):
if is_a_dir(base_dir + '/keys'):
shutil.rmtree(base_dir + '/keys', ignore_errors=False)
if os.path.isdir(base_dir + '/media'):
if is_a_dir(base_dir + '/media'):
shutil.rmtree(base_dir + '/media', ignore_errors=False)
if os.path.isdir(base_dir + '/sharefiles'):
if is_a_dir(base_dir + '/sharefiles'):
shutil.rmtree(base_dir + '/sharefiles', ignore_errors=False)
if os.path.isdir(base_dir + '/wfendpoints'):
if is_a_dir(base_dir + '/wfendpoints'):
shutil.rmtree(base_dir + '/wfendpoints', ignore_errors=False)
set_config_param(base_dir, 'registrationsRemaining',

View File

@ -52,6 +52,7 @@ from data import save_string
from data import erase_file
from data import move_file
from data import is_a_file
from data import is_a_dir
def create_initial_last_seen(base_dir: str, http_prefix: str) -> None:
@ -69,7 +70,7 @@ def create_initial_last_seen(base_dir: str, http_prefix: str) -> None:
if not is_a_file(following_filename):
continue
last_seen_dir = account_dir + '/lastseen'
if not os.path.isdir(last_seen_dir):
if not is_a_dir(last_seen_dir):
os.mkdir(last_seen_dir)
following_handles: list[str] = \
load_list(following_filename,
@ -199,7 +200,7 @@ def is_following_actor(base_dir: str,
"""
domain = remove_domain_port(domain)
accounts_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(accounts_dir):
if not is_a_dir(accounts_dir):
return False
following_file = accounts_dir + '/following.txt'
if not is_a_file(following_file):
@ -315,10 +316,10 @@ def unfollow_account(base_dir: str, nickname: str, domain: str,
if group_account:
handle_to_unfollow = '!' + handle_to_unfollow
dir_str = data_dir(base_dir)
if not os.path.isdir(dir_str):
if not is_a_dir(dir_str):
os.mkdir(dir_str)
handle_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
os.mkdir(handle_dir)
accounts_dir = acct_dir(base_dir, nickname, domain)
@ -382,10 +383,10 @@ def clear_follows(base_dir: str, nickname: str, domain: str,
"""Removes all follows
"""
dir_str = data_dir(base_dir)
if not os.path.isdir(dir_str):
if not is_a_dir(dir_str):
os.mkdir(dir_str)
accounts_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(accounts_dir):
if not is_a_dir(accounts_dir):
os.mkdir(accounts_dir)
filename = accounts_dir + '/' + follow_file
if is_a_file(filename):
@ -651,7 +652,7 @@ def store_follow_request(base_dir: str,
"""Stores the follow request for later use
"""
accounts_dir = acct_dir(base_dir, nickname_to_follow, domain_to_follow)
if not os.path.isdir(accounts_dir):
if not is_a_dir(accounts_dir):
return False
domain_full = get_full_domain(domain, from_port)
@ -730,7 +731,7 @@ def store_follow_request(base_dir: str,
# store the follow request in its own directory
# We don't rely upon the inbox because items in there could expire
requests_dir = accounts_dir + '/requests'
if not os.path.isdir(requests_dir):
if not is_a_dir(requests_dir):
os.mkdir(requests_dir)
follow_activity_filename = requests_dir + '/' + approve_handle + '.follow'
return save_json(follow_json, follow_activity_filename)

View File

@ -18,6 +18,7 @@ from utils import get_gemini_blog_title
from utils import get_gemini_blog_published
from utils import get_gemini_blog_filename
from data import save_string
from data import is_a_dir
def blog_to_gemini(base_dir: str, nickname: str, domain: str,
@ -31,10 +32,10 @@ def blog_to_gemini(base_dir: str, nickname: str, domain: str,
account_dir = acct_dir(base_dir, nickname, domain)
else:
account_dir = base_dir
if os.path.isdir(account_dir + '/geminitest'):
if is_a_dir(account_dir + '/geminitest'):
shutil.rmtree(account_dir + '/geminitest', ignore_errors=True)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
if debug:
print('WARN: blog_to_gemini account directory not found ' +
account_dir)
@ -86,7 +87,7 @@ def blog_to_gemini(base_dir: str, nickname: str, domain: str,
gemini_blog_dir = account_dir + '/gemini'
else:
gemini_blog_dir = account_dir + '/geminitest'
if not os.path.isdir(gemini_blog_dir):
if not is_a_dir(gemini_blog_dir):
os.mkdir(gemini_blog_dir)
gemini_blog_filename = \

5
git.py
View File

@ -19,6 +19,7 @@ from utils import get_attributed_to
from utils import string_contains
from data import save_string
from data import is_a_file
from data import is_a_dir
def _git_format_content(content: str) -> str:
@ -202,10 +203,10 @@ def receive_git_patch(base_dir: str, nickname: str, domain: str,
patch_subject = patch_subject.replace(' ', '_')
project_name = \
_get_git_project_name(base_dir, nickname, domain, subject)
if not os.path.isdir(patches_dir):
if not is_a_dir(patches_dir):
os.mkdir(patches_dir)
project_dir = patches_dir + '/' + project_name
if not os.path.isdir(project_dir):
if not is_a_dir(project_dir):
os.mkdir(project_dir)
patch_filename = \
project_dir + '/' + patch_subject + '.patch'

View File

@ -43,6 +43,7 @@ from data import append_string
from data import prepend_string
from data import erase_file
from data import is_a_file
from data import is_a_dir
def _strings_are_digits(strings_list: []) -> bool:
@ -117,10 +118,10 @@ def save_event_post(base_dir: str, handle: str, post_id: str,
master/lib/federation/activity_stream/converter/event.ex
"""
handle_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
print('WARN: Account does not exist at ' + handle_dir)
calendar_path = handle_dir + '/calendar'
if not os.path.isdir(calendar_path):
if not is_a_dir(calendar_path):
os.mkdir(calendar_path)
# get the year, month and day from the event
@ -144,11 +145,11 @@ def save_event_post(base_dir: str, handle: str, post_id: str,
# if this is a full description of an event then save it
# as a separate json file
events_path = handle_dir + '/events'
if not os.path.isdir(events_path):
if not is_a_dir(events_path):
os.mkdir(events_path)
events_year_path = \
handle_dir + '/events/' + str(event_year)
if not os.path.isdir(events_year_path):
if not is_a_dir(events_year_path):
os.mkdir(events_year_path)
event_id = str(event_year) + '-' + event_time.strftime("%m") + '-' + \
event_time.strftime("%d") + '_' + event_json['uuid']
@ -171,7 +172,7 @@ def save_event_post(base_dir: str, handle: str, post_id: str,
tl_events_filename)
# create a directory for the calendar year
if not os.path.isdir(calendar_path + '/' + str(event_year)):
if not is_a_dir(calendar_path + '/' + str(event_year)):
os.mkdir(calendar_path + '/' + str(event_year))
# calendar month file containing event post Ids
@ -1080,7 +1081,7 @@ def _dav_store_event(base_dir: str, nickname: str, domain: str,
handle = nickname + '@' + domain
handle_dir = acct_handle_dir(base_dir, handle)
outbox_dir = handle_dir + '/outbox'
if not os.path.isdir(outbox_dir):
if not is_a_dir(outbox_dir):
return False
filename = outbox_dir + '/' + post_id.replace('/', '#') + '.json'
save_json(event_json, filename)

View File

@ -145,6 +145,7 @@ from data import append_string
from data import prepend_string
from data import erase_file
from data import is_a_file
from data import is_a_dir
def _store_last_post_id(base_dir: str, nickname: str, domain: str,
@ -170,8 +171,8 @@ def _store_last_post_id(base_dir: str, nickname: str, domain: str,
return
account_dir = acct_dir(base_dir, nickname, domain)
lastpost_dir = account_dir + '/lastpost'
if not os.path.isdir(lastpost_dir):
if os.path.isdir(account_dir):
if not is_a_dir(lastpost_dir):
if is_a_dir(account_dir):
os.mkdir(lastpost_dir)
actor_filename = lastpost_dir + '/' + actor.replace('/', '#')
save_string(post_id, actor_filename,
@ -252,7 +253,7 @@ def valid_inbox(base_dir: str, nickname: str, domain: str) -> bool:
"""
domain = remove_domain_port(domain)
inbox_dir = acct_dir(base_dir, nickname, domain) + '/inbox'
if not os.path.isdir(inbox_dir):
if not is_a_dir(inbox_dir):
return True
for subdir, _, files in os.walk(inbox_dir):
for fname in files:
@ -274,7 +275,7 @@ def valid_inbox_filenames(base_dir: str, nickname: str, domain: str,
"""
domain = remove_domain_port(domain)
inbox_dir = acct_dir(base_dir, nickname, domain) + '/inbox'
if not os.path.isdir(inbox_dir):
if not is_a_dir(inbox_dir):
print('Not an inbox directory: ' + inbox_dir)
return True
expected_str = expected_domain + ':' + str(expected_port)
@ -698,7 +699,7 @@ def _inbox_post_recipients_add(base_dir: str, to_list: [],
nickname = recipient.split(domain_match)[1]
handle = nickname + '@' + domain
handle_dir = acct_handle_dir(base_dir, handle)
if os.path.isdir(handle_dir):
if is_a_dir(handle_dir):
if nickname == 'inbox':
# 'to' is the shared inbox
follower_recipients = True
@ -1077,7 +1078,7 @@ def _dm_notify(base_dir: str, handle: str, url: str) -> None:
"""Creates a notification that a new DM has arrived
"""
account_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
return
dm_file = account_dir + '/.newDM'
if not is_a_file(dm_file):
@ -1090,7 +1091,7 @@ def _notify_post_arrival(base_dir: str, handle: str, url: str) -> None:
on the person options screen
"""
account_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
return
notify_file = account_dir + '/.newNotifiedPost'
if is_a_file(notify_file):
@ -1110,7 +1111,7 @@ def _reply_notify(base_dir: str, handle: str, url: str) -> None:
"""Creates a notification that a new reply has arrived
"""
account_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
return
reply_file = account_dir + '/.newReply'
if not is_a_file(reply_file):
@ -1123,7 +1124,7 @@ def _git_patch_notify(base_dir: str, handle: str, subject: str,
"""Creates a notification that a new git patch has arrived
"""
account_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
return
patch_file = account_dir + '/.newPatch'
subject = subject.replace('[PATCH]', '').strip()
@ -1325,12 +1326,12 @@ def _update_last_seen(base_dir: str, handle: str, actor: str) -> None:
domain = handle.split('@')[1]
domain = remove_domain_port(domain)
account_path = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(account_path):
if not is_a_dir(account_path):
return
if not is_following_actor(base_dir, nickname, domain, actor):
return
last_seen_path = account_path + '/lastseen'
if not os.path.isdir(last_seen_path):
if not is_a_dir(last_seen_path):
os.mkdir(last_seen_path)
last_seen_filename = \
last_seen_path + '/' + actor.replace('/', '#') + '.txt'
@ -2808,7 +2809,7 @@ def clear_queue_items(base_dir: str, queue: []) -> None:
for _, dirs, _ in os.walk(dir_str):
for account in dirs:
queue_dir = dir_str + '/' + account + '/queue'
if not os.path.isdir(queue_dir):
if not is_a_dir(queue_dir):
continue
for _, _, queuefiles in os.walk(queue_dir):
for qfile in queuefiles:
@ -2831,7 +2832,7 @@ def _restore_queue_items(base_dir: str, queue: []) -> None:
for _, dirs, _ in os.walk(dir_str):
for account in dirs:
queue_dir = dir_str + '/' + account + '/queue'
if not os.path.isdir(queue_dir):
if not is_a_dir(queue_dir):
continue
for _, _, queuefiles in os.walk(queue_dir):
for qfile in queuefiles:
@ -3162,7 +3163,7 @@ def _receive_follow_request(session, session_onion, session_i2p,
handle_to_follow = nickname_to_follow + '@' + domain_to_follow
if domain_to_follow == domain:
handle_dir = acct_handle_dir(base_dir, handle_to_follow)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
if debug:
print('DEBUG: followed account not found - ' +
handle_dir)
@ -3292,7 +3293,7 @@ def _receive_follow_request(session, session_onion, session_i2p,
# update the followers
account_to_be_followed = \
acct_dir(base_dir, nickname_to_follow, domain_to_follow)
if os.path.isdir(account_to_be_followed):
if is_a_dir(account_to_be_followed):
followers_filename = account_to_be_followed + '/followers.txt'
# for actors which don't follow the mastodon

View File

@ -93,6 +93,7 @@ from data import prepend_string
from data import load_string
from data import erase_file
from data import is_a_file
from data import is_a_dir
def inbox_update_index(boxname: str, base_dir: str, handle: str,
@ -1015,7 +1016,7 @@ def receive_like(recent_posts_cache: {},
message_json['type'])
return False
handle_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
print('DEBUG: unknown recipient of like - ' + handle)
# if this post in the outbox of the person?
handle_name = handle.split('@')[0]
@ -1241,7 +1242,7 @@ def receive_reaction(recent_posts_cache: {},
message_json['type'])
return False
handle_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
print('DEBUG: unknown recipient of emoji reaction - ' + handle)
if is_a_file(handle_dir + '/.hideReactionButton'):
print('Emoji reaction rejected by ' + handle +
@ -1454,7 +1455,7 @@ def receive_zot_reaction(recent_posts_cache: {},
message_json['object']['type'])
return False
handle_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
print('DEBUG: unknown recipient of zot emoji reaction - ' + handle)
if is_a_file(handle_dir + '/.hideReactionButton'):
print('Zot emoji reaction rejected by ' + handle +
@ -1758,7 +1759,7 @@ def receive_delete(handle: str, base_dir: str,
if debug:
print('DEBUG: actor is not the owner of the post to be deleted')
handle_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
print('DEBUG: unknown recipient of like - ' + handle)
# if this post in the outbox of the person?
message_id = remove_id_ending(message_json['object'])
@ -1924,7 +1925,7 @@ def receive_announce(recent_posts_cache: {},
return False
handle_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
print('DEBUG: unknown recipient of announce - ' + handle)
# is the announce actor blocked?

View File

@ -7,7 +7,6 @@ __email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Timeline"
import os
from flags import has_group_type
from timeFunctions import get_account_timezone
from announce import undo_announce_collection_entry
@ -38,6 +37,7 @@ from webapp_post import individual_post_as_html
from reaction import undo_reaction_collection_entry
from data import erase_file
from data import is_a_file
from data import is_a_dir
def _receive_undo_follow(base_dir: str, message_json: {},
@ -215,7 +215,7 @@ def receive_undo_like(recent_posts_cache: {},
message_json['type'])
return False
handle_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
print('DEBUG: unknown recipient of undo like - ' + handle)
# if this post in the outbox of the person?
handle_name = handle.split('@')[0]
@ -365,7 +365,7 @@ def receive_undo_reaction(recent_posts_cache: {},
message_json['type'])
return False
handle_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
print('DEBUG: unknown recipient of undo reaction - ' + handle)
# if this post in the outbox of the person?
handle_name = handle.split('@')[0]
@ -631,7 +631,7 @@ def receive_undo_announce(recent_posts_cache: {},
message_json['type'] + ' announce')
return False
handle_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
print('DEBUG: unknown recipient of undo announce - ' + handle)
# if this post in the outbox of the person?
handle_name = handle.split('@')[0]

View File

@ -18,6 +18,7 @@ from utils import get_markdown_blog_filename
from utils import get_micron_blog_filename
from utils import get_gemini_blog_published
from data import save_string
from data import is_a_dir
def _markdown_get_sections(markdown: str) -> []:
@ -518,10 +519,10 @@ def blog_to_markdown(base_dir: str, nickname: str, domain: str,
account_dir = acct_dir(base_dir, nickname, domain)
else:
account_dir = base_dir
if os.path.isdir(account_dir + '/markdowntest'):
if is_a_dir(account_dir + '/markdowntest'):
shutil.rmtree(account_dir + '/markdowntest', ignore_errors=True)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
if debug:
print('WARN: blog_to_markdown account directory not found ' +
account_dir)
@ -545,7 +546,7 @@ def blog_to_markdown(base_dir: str, nickname: str, domain: str,
markdown_blog_dir = account_dir + '/markdown'
else:
markdown_blog_dir = account_dir + '/markdowntest'
if not os.path.isdir(markdown_blog_dir):
if not is_a_dir(markdown_blog_dir):
os.mkdir(markdown_blog_dir)
markdown_blog_filename = \
@ -594,10 +595,10 @@ def blog_to_micron(base_dir: str, nickname: str, domain: str,
account_dir = acct_dir(base_dir, nickname, domain)
else:
account_dir = base_dir
if os.path.isdir(account_dir + '/microntest'):
if is_a_dir(account_dir + '/microntest'):
shutil.rmtree(account_dir + '/microntest', ignore_errors=True)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
if debug:
print('WARN: blog_to_micron account directory not found ' +
account_dir)
@ -621,7 +622,7 @@ def blog_to_micron(base_dir: str, nickname: str, domain: str,
micron_blog_dir = account_dir + '/micron'
else:
micron_blog_dir = account_dir + '/microntest'
if not os.path.isdir(micron_blog_dir):
if not is_a_dir(micron_blog_dir):
os.mkdir(micron_blog_dir)
micron_blog_filename = \

View File

@ -38,6 +38,7 @@ from data import append_string
from data import erase_file
from data import move_file
from data import is_a_file
from data import is_a_dir
# music file ID3 v1 genres
@ -545,9 +546,9 @@ def _is_media(image_filename: str) -> bool:
def create_media_dirs(base_dir: str, media_path: str) -> None:
"""Creates stored media directories
"""
if not os.path.isdir(base_dir + '/media'):
if not is_a_dir(base_dir + '/media'):
os.mkdir(base_dir + '/media')
if not os.path.isdir(base_dir + '/' + media_path):
if not is_a_dir(base_dir + '/' + media_path):
os.mkdir(base_dir + '/' + media_path)
@ -772,9 +773,9 @@ def archive_media(base_dir: str, archive_directory: str,
min_week = weeks_since_epoch - max_weeks
if archive_directory:
if not os.path.isdir(archive_directory):
if not is_a_dir(archive_directory):
os.mkdir(archive_directory)
if not os.path.isdir(archive_directory + '/media'):
if not is_a_dir(archive_directory + '/media'):
os.mkdir(archive_directory + '/media')
for _, dirs, _ in os.walk(base_dir + '/media'):

View File

@ -49,6 +49,7 @@ from data import append_string
from data import prepend_string
from data import erase_file
from data import is_a_file
from data import is_a_dir
def _update_feeds_outbox_index(base_dir: str, domain: str,
@ -451,7 +452,7 @@ def _create_news_mirror(base_dir: str, domain: str,
return True
mirror_dir = data_dir(base_dir) + '/newsmirror'
if not os.path.isdir(mirror_dir):
if not is_a_dir(mirror_dir):
os.mkdir(mirror_dir)
# count the directories
@ -483,7 +484,7 @@ def _create_news_mirror(base_dir: str, domain: str,
continue
post_id = post_id.strip()
mirror_article_dir = mirror_dir + '/' + post_id
if os.path.isdir(mirror_article_dir):
if is_a_dir(mirror_article_dir):
rmtree(mirror_article_dir,
ignore_errors=False, onexc=None)
removals.append(post_id)
@ -507,7 +508,7 @@ def _create_news_mirror(base_dir: str, domain: str,
mirror_index_filename)
mirror_article_dir = mirror_dir + '/' + post_id_number
if os.path.isdir(mirror_article_dir):
if is_a_dir(mirror_article_dir):
# already mirrored
return True
@ -523,7 +524,7 @@ def _create_news_mirror(base_dir: str, domain: str,
proc = Popen(command_str, shell=True)
os.waitpid(proc.pid, 0)
if not os.path.isdir(mirror_article_dir):
if not is_a_dir(mirror_article_dir):
print('WARN: failed to mirror ' + url)
return True
@ -560,7 +561,7 @@ def _convert_rss_to_activitypub(base_dir: str, http_prefix: str,
return
base_path = data_dir(base_dir) + '/news@' + domain + '/outbox'
if not os.path.isdir(base_path):
if not is_a_dir(base_path):
os.mkdir(base_path)
# oldest items first

View File

@ -59,6 +59,7 @@ from data import load_string
from data import save_binary
from data import erase_file
from data import is_a_file
from data import is_a_dir
def _remove_cdata(text: str) -> str:
@ -188,7 +189,7 @@ def _download_newswire_feed_favicon(session, base_dir: str,
break
# create cached favicons directory if needed
if not os.path.isdir(base_dir + '/favicons'):
if not is_a_dir(base_dir + '/favicons'):
os.mkdir(base_dir + '/favicons')
# check svg for dubious scripts

View File

@ -76,6 +76,7 @@ from markdown import blog_to_micron
from data import erase_file
from data import move_file
from data import is_a_file
from data import is_a_dir
def _localonly_not_local(message_json: {}, domain_full: str) -> bool:
@ -603,7 +604,7 @@ def post_message_to_outbox(session, translate: {},
saved_post_id = saved_filename.split('/')[-1]
blogs_dir = \
data_dir(base_dir) + '/news@' + domain + '/tlblogs'
if not os.path.isdir(blogs_dir):
if not is_a_dir(blogs_dir):
os.mkdir(blogs_dir)
copyfile(saved_filename, blogs_dir + '/' + saved_post_id)
inbox_update_index('tlblogs', base_dir,

View File

@ -99,6 +99,7 @@ from data import append_string
from data import erase_file
from data import move_file
from data import is_a_file
from data import is_a_dir
def generate_rsa_key() -> (str, str):
@ -149,7 +150,7 @@ def set_profile_image(base_dir: str, http_prefix: str,
print('person definition not found: ' + person_filename)
return False
handle_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
print('Account not found: ' + handle_dir)
return False
@ -195,8 +196,8 @@ def _account_exists(base_dir: str, nickname: str, domain: str) -> bool:
"""
domain = remove_domain_port(domain)
account_dir = acct_dir(base_dir, nickname, domain)
return os.path.isdir(account_dir) or \
os.path.isdir(base_dir + '/deactivated/' + nickname + '@' + domain)
return is_a_dir(account_dir) or \
is_a_dir(base_dir + '/deactivated/' + nickname + '@' + domain)
def randomize_actor_images(person_json: {}) -> None:
@ -603,29 +604,26 @@ def _create_person_base(base_dir: str, nickname: str, domain: str, port: int,
if save_to_file:
# save person to file
if not os.path.isdir(base_dir):
if not is_a_dir(base_dir):
os.mkdir(base_dir)
people_subdir = data_dir(base_dir)
if not os.path.isdir(people_subdir):
if not is_a_dir(people_subdir):
os.mkdir(people_subdir)
if not os.path.isdir(people_subdir + '/' + handle):
if not is_a_dir(people_subdir + '/' + handle):
os.mkdir(people_subdir + '/' + handle)
if not os.path.isdir(people_subdir + '/' +
handle + '/inbox'):
if not is_a_dir(people_subdir + '/' + handle + '/inbox'):
os.mkdir(people_subdir + '/' + handle + '/inbox')
if not os.path.isdir(people_subdir + '/' +
handle + '/outbox'):
if not is_a_dir(people_subdir + '/' + handle + '/outbox'):
os.mkdir(people_subdir + '/' + handle + '/outbox')
if not os.path.isdir(people_subdir + '/' +
handle + '/queue'):
if not is_a_dir(people_subdir + '/' + handle + '/queue'):
os.mkdir(people_subdir + '/' + handle + '/queue')
filename = people_subdir + '/' + handle + '.json'
save_json(new_person, filename)
# save to cache
if not os.path.isdir(base_dir + '/cache'):
if not is_a_dir(base_dir + '/cache'):
os.mkdir(base_dir + '/cache')
if not os.path.isdir(base_dir + '/cache/actors'):
if not is_a_dir(base_dir + '/cache/actors'):
os.mkdir(base_dir + '/cache/actors')
cache_filename = base_dir + '/cache/actors/' + \
new_person['id'].replace('/', '#') + '.json'
@ -633,9 +631,9 @@ def _create_person_base(base_dir: str, nickname: str, domain: str, port: int,
# save the private key
private_keys_subdir = '/keys/private'
if not os.path.isdir(base_dir + '/keys'):
if not is_a_dir(base_dir + '/keys'):
os.mkdir(base_dir + '/keys')
if not os.path.isdir(base_dir + private_keys_subdir):
if not is_a_dir(base_dir + private_keys_subdir):
os.mkdir(base_dir + private_keys_subdir)
filename = base_dir + private_keys_subdir + '/' + handle + '.key'
save_string(private_key_pem, filename,
@ -643,7 +641,7 @@ def _create_person_base(base_dir: str, nickname: str, domain: str, port: int,
# save the public key
public_keys_subdir = '/keys/public'
if not os.path.isdir(base_dir + public_keys_subdir):
if not is_a_dir(base_dir + public_keys_subdir):
os.mkdir(base_dir + public_keys_subdir)
filename = base_dir + public_keys_subdir + '/' + handle + '.pem'
save_string(public_key_pem, filename,
@ -755,7 +753,7 @@ def create_person(base_dir: str, nickname: str, domain: str, port: int,
return None, None, None, None
else:
dir_str = data_dir(base_dir)
if os.path.isdir(dir_str + '/news@' + domain):
if is_a_dir(dir_str + '/news@' + domain):
# news account already exists
return None, None, None, None
@ -778,10 +776,10 @@ def create_person(base_dir: str, nickname: str, domain: str, port: int,
set_role(base_dir, nickname, domain, 'editor')
dir_str = data_dir(base_dir)
if not os.path.isdir(dir_str):
if not is_a_dir(dir_str):
os.mkdir(dir_str)
account_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
os.mkdir(account_dir)
if manual_follower_approval:
@ -1449,7 +1447,7 @@ def _remove_tags_for_nickname(base_dir: str, nickname: str,
domain: str, port: int) -> None:
"""Removes tags for a nickname
"""
if not os.path.isdir(base_dir + '/tags'):
if not is_a_dir(base_dir + '/tags'):
return
domain_full = get_full_domain(domain, port)
match_str = domain_full + '/users/' + nickname + '/'
@ -1534,11 +1532,11 @@ def remove_account(base_dir: str, nickname: str,
handle = nickname + '@' + domain
remove_password(base_dir, nickname)
_remove_tags_for_nickname(base_dir, nickname, domain, port)
if os.path.isdir(base_dir + '/deactivated/' + handle):
if is_a_dir(base_dir + '/deactivated/' + handle):
shutil.rmtree(base_dir + '/deactivated/' + handle,
ignore_errors=False)
handle_dir = acct_handle_dir(base_dir, handle)
if os.path.isdir(handle_dir):
if is_a_dir(handle_dir):
shutil.rmtree(handle_dir, ignore_errors=False)
if is_a_file(handle_dir + '.json'):
erase_file(handle_dir + '.json',
@ -1556,14 +1554,14 @@ def remove_account(base_dir: str, nickname: str,
erase_file(base_dir + '/keys/public/' + handle + '.pem',
'EX: remove_account unable to delete ' +
base_dir + '/keys/public/' + handle + '.pem')
if os.path.isdir(base_dir + '/sharefiles/' + nickname):
if is_a_dir(base_dir + '/sharefiles/' + nickname):
shutil.rmtree(base_dir + '/sharefiles/' + nickname,
ignore_errors=False)
if is_a_file(base_dir + '/wfdeactivated/' + handle + '.json'):
erase_file(base_dir + '/wfdeactivated/' + handle + '.json',
'EX: remove_account unable to delete ' +
base_dir + '/wfdeactivated/' + handle + '.json')
if os.path.isdir(base_dir + '/sharefilesdeactivated/' + nickname):
if is_a_dir(base_dir + '/sharefilesdeactivated/' + nickname):
shutil.rmtree(base_dir + '/sharefilesdeactivated/' + nickname,
ignore_errors=False)
@ -1578,30 +1576,30 @@ def deactivate_account(base_dir: str, nickname: str, domain: str) -> bool:
handle = nickname + '@' + domain
account_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
return False
deactivated_dir = base_dir + '/deactivated'
if not os.path.isdir(deactivated_dir):
if not is_a_dir(deactivated_dir):
os.mkdir(deactivated_dir)
shutil.move(account_dir, deactivated_dir + '/' + handle)
if is_a_file(base_dir + '/wfendpoints/' + handle + '.json'):
deactivated_webfinger_dir = base_dir + '/wfdeactivated'
if not os.path.isdir(deactivated_webfinger_dir):
if not is_a_dir(deactivated_webfinger_dir):
os.mkdir(deactivated_webfinger_dir)
shutil.move(base_dir + '/wfendpoints/' + handle + '.json',
deactivated_webfinger_dir + '/' + handle + '.json')
if os.path.isdir(base_dir + '/sharefiles/' + nickname):
if is_a_dir(base_dir + '/sharefiles/' + nickname):
deactivated_sharefiles_dir = base_dir + '/sharefilesdeactivated'
if not os.path.isdir(deactivated_sharefiles_dir):
if not is_a_dir(deactivated_sharefiles_dir):
os.mkdir(deactivated_sharefiles_dir)
shutil.move(base_dir + '/sharefiles/' + nickname,
deactivated_sharefiles_dir + '/' + nickname)
refresh_newswire(base_dir)
return os.path.isdir(deactivated_dir + '/' + nickname + '@' + domain)
return is_a_dir(deactivated_dir + '/' + nickname + '@' + domain)
def activate_account2(base_dir: str, nickname: str, domain: str) -> bool:
@ -1612,9 +1610,9 @@ def activate_account2(base_dir: str, nickname: str, domain: str) -> bool:
deactivated_dir = base_dir + '/deactivated'
deactivated_account_dir = deactivated_dir + '/' + handle
if os.path.isdir(deactivated_account_dir):
if is_a_dir(deactivated_account_dir):
account_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
shutil.move(deactivated_account_dir, account_dir)
activated = True
@ -1624,8 +1622,8 @@ def activate_account2(base_dir: str, nickname: str, domain: str) -> bool:
base_dir + '/wfendpoints/' + handle + '.json')
deactivated_sharefiles_dir = base_dir + '/sharefilesdeactivated'
if os.path.isdir(deactivated_sharefiles_dir + '/' + nickname):
if not os.path.isdir(base_dir + '/sharefiles/' + nickname):
if is_a_dir(deactivated_sharefiles_dir + '/' + nickname):
if not is_a_dir(base_dir + '/sharefiles/' + nickname):
shutil.move(deactivated_sharefiles_dir + '/' + nickname,
base_dir + '/sharefiles/' + nickname)
@ -1687,7 +1685,7 @@ def person_snooze(base_dir: str, nickname: str, domain: str,
"""Temporarily ignores the given actor
"""
account_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
print('ERROR: unknown account ' + account_dir)
return
snoozed_filename = account_dir + '/snoozed.txt'
@ -1704,7 +1702,7 @@ def person_unsnooze(base_dir: str, nickname: str, domain: str,
"""Undoes a temporarily ignore of the given actor
"""
account_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
print('ERROR: unknown account ' + account_dir)
return
snoozed_filename = account_dir + '/snoozed.txt'
@ -1744,7 +1742,7 @@ def set_person_notes(base_dir: str, nickname: str, domain: str,
if handle.startswith('@'):
handle = handle[1:]
notes_dir = acct_dir(base_dir, nickname, domain) + '/notes'
if not os.path.isdir(notes_dir):
if not is_a_dir(notes_dir):
os.mkdir(notes_dir)
notes_filename = notes_dir + '/' + handle + '.txt'
if not save_string(notes, notes_filename,
@ -1784,7 +1782,7 @@ def get_person_notes_endpoint(base_dir: str, nickname: str, domain: str,
"items": []
}
dir_str = acct_dir(base_dir, nickname, domain) + '/notes'
if not os.path.isdir(dir_str):
if not is_a_dir(dir_str):
return notes_json
handle_txt = ''
if handle:

4
pgp.py
View File

@ -7,7 +7,6 @@ __email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Profile Metadata"
import os
import base64
import subprocess
from pathlib import Path
@ -39,6 +38,7 @@ from briar import get_briar_address
from cwtch import get_cwtch_address
from blog import get_blog_address
from website import get_website
from data import is_a_dir
from utils import get_attachment_property_value
@ -555,7 +555,7 @@ def has_local_pg_pkey() -> bool:
"""
home_dir = str(Path.home())
gpg_dir = home_dir + '/.gnupg'
if os.path.isdir(gpg_dir):
if is_a_dir(gpg_dir):
key_id = pgp_local_public_key()
if key_id:
return True

View File

@ -153,6 +153,7 @@ from data import prepend_string
from data import erase_file
from data import move_file
from data import is_a_file
from data import is_a_dir
def convert_post_content_to_html(message_json: {}) -> None:
@ -993,7 +994,7 @@ def delete_all_posts(base_dir: str,
try:
if is_a_file(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
elif is_a_dir(file_path):
shutil.rmtree(file_path, ignore_errors=False, onexc=None)
except OSError as ex:
print('ERROR: delete_all_posts ' + str(ex))
@ -1079,7 +1080,7 @@ def _update_hashtags_index(base_dir: str, tag: {}, new_post_id: str,
# create hashtags directory
tags_dir = base_dir + '/tags'
if not os.path.isdir(tags_dir):
if not is_a_dir(tags_dir):
os.mkdir(tags_dir)
tag_name = tag['name']
tags_filename = tags_dir + '/' + tag_name[1:] + '.txt'
@ -2247,7 +2248,7 @@ def regenerate_index_for_box(base_dir: str,
box_dir = acct_dir(base_dir, nickname, domain) + '/' + box_name
box_index_filename = box_dir + '.index'
if not os.path.isdir(box_dir):
if not is_a_dir(box_dir):
return
if is_a_file(box_index_filename):
return
@ -2738,7 +2739,7 @@ def get_mentioned_people(base_dir: str, http_prefix: str,
if '@' not in handle:
handle = handle + '@' + domain
handle_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
continue
else:
external_domain = handle.split('@')[1]
@ -5099,7 +5100,7 @@ def _expire_announce_cache_for_person(base_dir: str,
"""Expires entries within the announces cache
"""
cache_dir = base_dir + '/cache/announce/' + nickname
if not os.path.isdir(cache_dir):
if not is_a_dir(cache_dir):
print('No cached announces for ' + nickname + '@' + domain)
return 0
expired_post_count: int = 0
@ -5126,7 +5127,7 @@ def _expire_conversations_for_person(base_dir: str,
"""Expires entries within the conversation directory
"""
conv_dir = acct_dir(base_dir, nickname, domain) + '/conversation'
if not os.path.isdir(conv_dir):
if not is_a_dir(conv_dir):
print('No conversations for ' + nickname + '@' + domain)
return 0
expired_post_count: int = 0
@ -5156,7 +5157,7 @@ def _expire_posts_cache_for_person(base_dir: str,
"""Expires entries within the posts cache
"""
cache_dir = acct_dir(base_dir, nickname, domain) + '/postcache'
if not os.path.isdir(cache_dir):
if not is_a_dir(cache_dir):
print('No cached posts for ' + nickname + '@' + domain)
return 0
expired_post_count: int = 0
@ -5321,11 +5322,11 @@ def archive_posts(base_dir: str, http_prefix: str, archive_dir: str,
return
if archive_dir:
if not os.path.isdir(archive_dir):
if not is_a_dir(archive_dir):
os.mkdir(archive_dir)
if archive_dir:
if not os.path.isdir(archive_dir + '/accounts'):
if not is_a_dir(archive_dir + '/accounts'):
os.mkdir(archive_dir + '/accounts')
dir_str = data_dir(base_dir)
@ -5337,11 +5338,11 @@ def archive_posts(base_dir: str, http_prefix: str, archive_dir: str,
archive_subdir = None
if archive_dir:
archive_handle_dir = acct_handle_dir(archive_dir, handle)
if not os.path.isdir(archive_handle_dir):
if not is_a_dir(archive_handle_dir):
os.mkdir(archive_handle_dir)
if not os.path.isdir(archive_handle_dir + '/inbox'):
if not is_a_dir(archive_handle_dir + '/inbox'):
os.mkdir(archive_handle_dir + '/inbox')
if not os.path.isdir(archive_handle_dir + '/outbox'):
if not is_a_dir(archive_handle_dir + '/outbox'):
os.mkdir(archive_handle_dir + '/outbox')
archive_subdir = archive_handle_dir + '/inbox'
archive_posts_for_person(http_prefix,
@ -5562,7 +5563,7 @@ def archive_posts_for_person(http_prefix: str, nickname: str, domain: str,
if boxname not in ('inbox', 'outbox'):
return
if archive_dir:
if not os.path.isdir(archive_dir):
if not is_a_dir(archive_dir):
os.mkdir(archive_dir)
box_dir = create_person_dir(nickname, domain, base_dir, boxname)
posts_in_box = os.scandir(box_dir)
@ -6268,7 +6269,7 @@ def download_announce(session, base_dir: str, http_prefix: str,
# get the announced post
announce_cache_dir = base_dir + '/cache/announce/' + nickname
if not os.path.isdir(announce_cache_dir):
if not is_a_dir(announce_cache_dir):
os.mkdir(announce_cache_dir)
post_id = None

View File

@ -26,6 +26,7 @@ from data import save_string
from data import load_string
from data import prepend_string
from data import is_a_file
from data import is_a_dir
def get_book_link_from_content(content: str) -> str:
@ -473,15 +474,15 @@ def store_book_events(base_dir: str,
return False
dir_str = data_dir(base_dir)
reading_path = dir_str + '/reading'
if not os.path.isdir(dir_str):
if not is_a_dir(dir_str):
os.mkdir(dir_str)
if not os.path.isdir(reading_path):
if not is_a_dir(reading_path):
os.mkdir(reading_path)
books_path = reading_path + '/books'
if not os.path.isdir(books_path):
if not is_a_dir(books_path):
os.mkdir(books_path)
readers_path = reading_path + '/readers'
if not os.path.isdir(readers_path):
if not is_a_dir(readers_path):
os.mkdir(readers_path)
actor = book_dict['actor']

View File

@ -26,6 +26,7 @@ from data import load_string
from data import save_string
from data import erase_file
from data import is_a_file
from data import is_a_dir
def get_moved_accounts(base_dir: str, nickname: str, domain: str,
@ -206,7 +207,7 @@ def update_moved_actors(base_dir: str, debug: bool) -> None:
"""Updates the file containing moved actors
"""
actors_cache_dir = base_dir + '/cache/actors'
if not os.path.isdir(actors_cache_dir):
if not is_a_dir(actors_cache_dir):
if debug:
print('No cached actors')
return

View File

@ -20,6 +20,7 @@ from data import load_list
from data import save_string
from data import erase_file
from data import is_a_file
from data import is_a_dir
def _clear_role_status(base_dir: str, role: str) -> None:
@ -77,13 +78,13 @@ def _add_role(base_dir: str, nickname: str, domain: str,
if len(role_nickname) < 2:
continue
dir_str = data_dir(base_dir)
if os.path.isdir(dir_str + '/' + role_nickname + '@' + domain):
if is_a_dir(dir_str + '/' + role_nickname + '@' + domain):
text += role_nickname + '\n'
save_string(text, role_file,
'EX: _add_role, failed to write roles file1 ' + role_file)
else:
account_dir = acct_dir(base_dir, nickname, domain)
if os.path.isdir(account_dir):
if is_a_dir(account_dir):
save_string(nickname + '\n', role_file,
'EX: _add_role, failed to write roles file2 ' +
role_file)
@ -329,7 +330,7 @@ def set_roles_from_list(base_dir: str, domain: str, admin_nickname: str,
for roles_nick in roles_list:
roles_nick = roles_nick.strip()
roles_dir = acct_dir(base_dir, roles_nick, domain)
if os.path.isdir(roles_dir):
if is_a_dir(roles_dir):
text += roles_nick + '\n'
save_string(text, roles_filename,
'EX: unable to write ' + list_name + ' ' +
@ -338,7 +339,7 @@ def set_roles_from_list(base_dir: str, domain: str, admin_nickname: str,
for roles_nick in roles_list:
roles_nick = roles_nick.strip()
roles_dir = acct_dir(base_dir, roles_nick, domain)
if os.path.isdir(roles_dir):
if is_a_dir(roles_dir):
set_role(base_dir, roles_nick, domain, role_name)
else:
# nicknames on separate lines
@ -348,7 +349,7 @@ def set_roles_from_list(base_dir: str, domain: str, admin_nickname: str,
for roles_nick in roles_list:
roles_nick = roles_nick.strip()
roles_dir = acct_dir(base_dir, roles_nick, domain)
if os.path.isdir(roles_dir):
if is_a_dir(roles_dir):
text += roles_nick + '\n'
save_string(text, roles_filename,
'EX: unable to write ' + list_name + ' ' +
@ -357,5 +358,5 @@ def set_roles_from_list(base_dir: str, domain: str, admin_nickname: str,
for roles_nick in roles_list:
roles_nick = roles_nick.strip()
roles_dir = acct_dir(base_dir, roles_nick, domain)
if os.path.isdir(roles_dir):
if is_a_dir(roles_dir):
set_role(base_dir, roles_nick, domain, role_name)

View File

@ -29,6 +29,7 @@ from data import load_list
from data import erase_file
from data import move_file
from data import is_a_file
from data import is_a_dir
def _update_post_schedule(base_dir: str, handle: str, httpd,
@ -266,7 +267,7 @@ def remove_scheduled_posts(base_dir: str, nickname: str, domain: str) -> None:
schedule_index_filename)
# remove the scheduled posts
scheduled_dir = acct_dir(base_dir, nickname, domain) + '/scheduled'
if not os.path.isdir(scheduled_dir):
if not is_a_dir(scheduled_dir):
return
for scheduled_post_filename in os.listdir(scheduled_dir):
file_path = os.path.join(scheduled_dir, scheduled_post_filename)

View File

@ -23,6 +23,7 @@ from utils import get_mutuals_of_person
from data import load_string
from data import save_string
from data import is_a_file
from data import is_a_dir
def load_searchable_by_default(base_dir: str) -> {}:
@ -95,7 +96,7 @@ def _search_virtual_box_posts(base_dir: str, nickname: str, domain: str,
if box_name == 'bookmarks':
box_name = 'inbox'
path = acct_dir(base_dir, nickname, domain) + '/' + box_name
if not os.path.isdir(path):
if not is_a_dir(path):
return []
search_str = search_str.lower().strip()
@ -151,7 +152,7 @@ def search_box_posts(base_dir: str, nickname: str, domain: str,
"""
path = acct_dir(base_dir, nickname, domain) + '/' + box_name
# is this a virtual box, such as direct messages?
if not os.path.isdir(path):
if not is_a_dir(path):
if is_a_file(path + '.index'):
return _search_virtual_box_posts(base_dir, nickname, domain,
search_str, max_results, box_name)

View File

@ -64,6 +64,7 @@ from data import save_string
from data import load_string
from data import erase_file
from data import is_a_file
from data import is_a_dir
def _load_dfc_ids(base_dir: str, system_language: str,
@ -385,9 +386,9 @@ def add_share(base_dir: str,
# copy or move the image for the shared item to its destination
if image_filename:
if is_a_file(image_filename):
if not os.path.isdir(base_dir + '/sharefiles'):
if not is_a_dir(base_dir + '/sharefiles'):
os.mkdir(base_dir + '/sharefiles')
if not os.path.isdir(base_dir + '/sharefiles/' + nickname):
if not is_a_dir(base_dir + '/sharefiles/' + nickname):
os.mkdir(base_dir + '/sharefiles/' + nickname)
item_idfile = base_dir + '/sharefiles/' + nickname + '/' + item_id
formats = get_image_extensions()
@ -1733,13 +1734,13 @@ def _update_federated_shares_cache(session, shared_items_federated_domains: [],
"""
# create directories where catalogs will be stored
cache_dir = base_dir + '/cache'
if not os.path.isdir(cache_dir):
if not is_a_dir(cache_dir):
os.mkdir(cache_dir)
if shares_file_type == 'shares':
catalogs_dir = cache_dir + '/catalogs'
else:
catalogs_dir = cache_dir + '/wantedItems'
if not os.path.isdir(catalogs_dir):
if not is_a_dir(catalogs_dir):
os.mkdir(catalogs_dir)
as_header = {
@ -1820,9 +1821,9 @@ def _generate_next_shares_token_update(base_dir: str,
for this instance will be updated
"""
token_update_dir = data_dir(base_dir)
if not os.path.isdir(base_dir):
if not is_a_dir(base_dir):
os.mkdir(base_dir)
if not os.path.isdir(token_update_dir):
if not is_a_dir(token_update_dir):
os.mkdir(token_update_dir)
token_update_filename = token_update_dir + '/.tokenUpdate'
next_update_sec = None

View File

@ -248,6 +248,7 @@ from data import load_list
from data import load_string
from data import save_string
from data import erase_file
from data import is_a_dir
TEST_SERVER_GROUP_RUNNING = False
@ -329,7 +330,7 @@ def _test_http_signed_get(base_dir: str):
no_recency_check)
path = base_dir + '/.testHttpsigGET'
if os.path.isdir(path):
if is_a_dir(path):
shutil.rmtree(path, ignore_errors=False)
os.mkdir(path)
os.chdir(path)
@ -375,7 +376,7 @@ def _test_http_signed_get(base_dir: str):
assert verify_post_headers(http_prefix, public_key_pem, headers,
boxpath, getreq_method, None,
message_body_json_str, debug, no_recency_check)
if os.path.isdir(path):
if is_a_dir(path):
shutil.rmtree(path, ignore_errors=False)
@ -609,7 +610,7 @@ def _test_httpsig_base(with_digest: bool, base_dir: str):
print('test_httpsig(' + str(with_digest) + ')')
path = base_dir + '/.testHttpsigBase'
if os.path.isdir(path):
if is_a_dir(path):
shutil.rmtree(path, ignore_errors=False)
os.mkdir(path)
os.chdir(path)
@ -770,7 +771,7 @@ def create_server_alice(path: str, domain: str, port: int,
has_follows: bool, has_posts: bool,
send_threads: []):
print('Creating test server: Alice on port ' + str(port))
if os.path.isdir(path):
if is_a_dir(path):
shutil.rmtree(path, ignore_errors=False)
os.mkdir(path)
os.chdir(path)
@ -975,7 +976,7 @@ def create_server_bob(path: str, domain: str, port: int,
has_follows: bool, has_posts: bool,
send_threads: []):
print('Creating test server: Bob on port ' + str(port))
if os.path.isdir(path):
if is_a_dir(path):
shutil.rmtree(path, ignore_errors=False)
os.mkdir(path)
os.chdir(path)
@ -1180,7 +1181,7 @@ def create_server_eve(path: str, domain: str, port: int, federation_list: [],
has_follows: bool, has_posts: bool,
send_threads: []):
print('Creating test server: Eve on port ' + str(port))
if os.path.isdir(path):
if is_a_dir(path):
shutil.rmtree(path, ignore_errors=False)
os.mkdir(path)
os.chdir(path)
@ -1305,7 +1306,7 @@ def create_server_group(path: str, domain: str, port: int,
has_follows: bool, has_posts: bool,
send_threads: []):
print('Creating test server: Group on port ' + str(port))
if os.path.isdir(path):
if is_a_dir(path):
shutil.rmtree(path, ignore_errors=False)
os.mkdir(path)
os.chdir(path)
@ -1422,7 +1423,7 @@ def test_post_message_between_servers(base_dir: str) -> None:
'https://creativecommons.org/licenses/by-nc/4.0'
media_creator: str = 'Secret Squirrel'
if os.path.isdir(base_dir + '/.tests'):
if is_a_dir(base_dir + '/.tests'):
shutil.rmtree(base_dir + '/.tests', ignore_errors=False)
os.mkdir(base_dir + '/.tests')
@ -1545,7 +1546,7 @@ def test_post_message_between_servers(base_dir: str) -> None:
m_path = get_media_path()
media_path = alice_dir + '/' + m_path
for _ in range(30):
if os.path.isdir(inbox_path):
if is_a_dir(inbox_path):
if len([name for name in os.listdir(inbox_path)
if os.path.isfile(os.path.join(inbox_path, name))]) > 0:
if len([name for name in os.listdir(outbox_path)
@ -1565,7 +1566,7 @@ def test_post_message_between_servers(base_dir: str) -> None:
# check that a news account exists
news_actor_dir = data_dir(alice_dir) + '/news@' + alice_domain
print("news_actor_dir: " + news_actor_dir)
assert os.path.isdir(news_actor_dir)
assert is_a_dir(news_actor_dir)
news_actor_file = news_actor_dir + '.json'
assert os.path.isfile(news_actor_file)
news_actor_json = load_json(news_actor_file)
@ -1729,7 +1730,7 @@ def test_post_message_between_servers(base_dir: str) -> None:
outbox_message_arrived: bool = False
for _ in range(20):
time.sleep(1)
if not os.path.isdir(inbox_path):
if not is_a_dir(inbox_path):
continue
if len([name for name in os.listdir(outbox_path)
if os.path.isfile(os.path.join(outbox_path, name))]) > 0:
@ -1782,7 +1783,7 @@ def test_follow_between_servers(base_dir: str) -> None:
media_license_url = 'https://creativecommons.org/licenses/by-nc/4.0'
media_creator = 'Penfold'
if os.path.isdir(base_dir + '/.tests'):
if is_a_dir(base_dir + '/.tests'):
shutil.rmtree(base_dir + '/.tests', ignore_errors=False)
os.mkdir(base_dir + '/.tests')
@ -1947,7 +1948,7 @@ def test_follow_between_servers(base_dir: str) -> None:
alice_message_arrived: bool = False
for _ in range(20):
time.sleep(1)
if os.path.isdir(inbox_path):
if is_a_dir(inbox_path):
if len([name for name in os.listdir(inbox_path)
if os.path.isfile(os.path.join(inbox_path, name))]) > 0:
alice_message_arrived = True
@ -1992,7 +1993,7 @@ def test_shared_items_federation(base_dir: str) -> None:
media_license_url = 'https://creativecommons.org/licenses/by-nc/4.0'
media_creator = 'Dr Drokk'
if os.path.isdir(base_dir + '/.tests'):
if is_a_dir(base_dir + '/.tests'):
shutil.rmtree(base_dir + '/.tests', ignore_errors=False)
os.mkdir(base_dir + '/.tests')
@ -2084,8 +2085,8 @@ def test_shared_items_federation(base_dir: str) -> None:
print('\n\n*********************************************************')
print("Alice and Bob agree to share items catalogs")
assert os.path.isdir(alice_dir)
assert os.path.isdir(bob_dir)
assert is_a_dir(alice_dir)
assert is_a_dir(bob_dir)
set_config_param(alice_dir, 'sharedItemsFederatedDomains', bob_address)
set_config_param(bob_dir, 'sharedItemsFederatedDomains', alice_address)
@ -2154,7 +2155,7 @@ def test_shared_items_federation(base_dir: str) -> None:
print('\n\n*********************************************************')
print('Bob publishes some shared items')
if os.path.isdir(bob_dir + '/ontology'):
if is_a_dir(bob_dir + '/ontology'):
shutil.rmtree(bob_dir + '/ontology', ignore_errors=False)
os.mkdir(bob_dir + '/ontology')
copyfile(base_dir + '/img/logo.png', bob_dir + '/logo.png')
@ -2367,7 +2368,7 @@ def test_shared_items_federation(base_dir: str) -> None:
alice_message_arrived: bool = False
for _ in range(20):
time.sleep(1)
if os.path.isdir(inbox_path):
if is_a_dir(inbox_path):
if len([name for name in os.listdir(inbox_path)
if os.path.isfile(os.path.join(inbox_path, name))]) > 0:
alice_message_arrived = True
@ -2469,7 +2470,7 @@ def test_group_follow(base_dir: str) -> None:
media_license_url = 'https://creativecommons.org/licenses/by-nc/4.0'
media_creator = 'Bumble'
if os.path.isdir(base_dir + '/.tests'):
if is_a_dir(base_dir + '/.tests'):
shutil.rmtree(base_dir + '/.tests', ignore_errors=False)
os.mkdir(base_dir + '/.tests')
@ -2818,7 +2819,7 @@ def test_group_follow(base_dir: str) -> None:
for _ in range(20):
time.sleep(1)
if os.path.isdir(inbox_path):
if is_a_dir(inbox_path):
curr_posts_inbox = \
len([name for name in os.listdir(inbox_path)
if os.path.isfile(os.path.join(inbox_path, name))])
@ -2841,7 +2842,7 @@ def test_group_follow(base_dir: str) -> None:
bob_message_arrived: bool = False
for _ in range(20):
time.sleep(1)
if os.path.isdir(inbox_path_bob):
if is_a_dir(inbox_path_bob):
curr_posts_bob = \
len([name for name in os.listdir(inbox_path_bob)
if os.path.isfile(os.path.join(inbox_path_bob, name))])
@ -2902,7 +2903,7 @@ def _test_followers_of_person(base_dir: str) -> None:
http_prefix = 'https'
federation_list: list[str] = []
base_dir = curr_dir + '/.tests_followersofperson'
if os.path.isdir(base_dir):
if is_a_dir(base_dir):
shutil.rmtree(base_dir, ignore_errors=False)
os.mkdir(base_dir)
os.chdir(base_dir)
@ -2952,7 +2953,7 @@ def _test_followers_on_domain(base_dir: str) -> None:
http_prefix = 'https'
federation_list: list[str] = []
base_dir = curr_dir + '/.tests_nooffollowersOndomain'
if os.path.isdir(base_dir):
if is_a_dir(base_dir):
shutil.rmtree(base_dir, ignore_errors=False)
os.mkdir(base_dir)
os.chdir(base_dir)
@ -3019,7 +3020,7 @@ def _test_group_followers(base_dir: str) -> None:
http_prefix = 'https'
federation_list: list[str] = []
base_dir = curr_dir + '/.tests_testgroupfollowers'
if os.path.isdir(base_dir):
if is_a_dir(base_dir):
shutil.rmtree(base_dir, ignore_errors=False)
os.mkdir(base_dir)
os.chdir(base_dir)
@ -3065,7 +3066,7 @@ def _test_follows(base_dir: str) -> None:
http_prefix: str = 'https'
federation_list: list[str] = ['wild.com', 'mesh.com']
base_dir = curr_dir + '/.tests_testfollows'
if os.path.isdir(base_dir):
if is_a_dir(base_dir):
shutil.rmtree(base_dir, ignore_errors=False)
os.mkdir(base_dir)
os.chdir(base_dir)
@ -3157,7 +3158,7 @@ def _test_create_person_account(base_dir: str):
http_prefix: str = 'https'
client_to_server: bool = False
base_dir: str = curr_dir + '/.tests_createperson'
if os.path.isdir(base_dir):
if is_a_dir(base_dir):
shutil.rmtree(base_dir, ignore_errors=False)
os.mkdir(base_dir)
os.chdir(base_dir)
@ -3297,7 +3298,7 @@ def _test_authentication(base_dir: str) -> None:
password = 'SuperSecretPassword12345'
base_dir = curr_dir + '/.tests_authentication'
if os.path.isdir(base_dir):
if is_a_dir(base_dir):
shutil.rmtree(base_dir, ignore_errors=False)
os.mkdir(base_dir)
os.chdir(base_dir)
@ -3349,7 +3350,7 @@ def test_client_to_server(base_dir: str):
federation_list: list[str] = []
low_bandwidth: bool = False
if os.path.isdir(base_dir + '/.tests'):
if is_a_dir(base_dir + '/.tests'):
shutil.rmtree(base_dir + '/.tests', ignore_errors=False)
os.mkdir(base_dir + '/.tests')
@ -3490,7 +3491,7 @@ def test_client_to_server(base_dir: str):
print('send_result: ' + str(send_result))
for _ in range(30):
if os.path.isdir(outbox_path):
if is_a_dir(outbox_path):
if len([name for name in os.listdir(outbox_path)
if os.path.isfile(os.path.join(outbox_path, name))]) == 1:
break
@ -3505,7 +3506,7 @@ def test_client_to_server(base_dir: str):
print(">>> c2s post arrived in Alice's outbox\n\n\n")
for _ in range(30):
if os.path.isdir(inbox_path):
if is_a_dir(inbox_path):
if len([name for name in os.listdir(bob_inbox_path)
if os.path.isfile(os.path.join(bob_inbox_path,
name))]) == 1:
@ -3524,10 +3525,10 @@ def test_client_to_server(base_dir: str):
time.sleep(2)
calendar_path = data_dir(bob_dir) + '/bob@' + bob_domain + '/calendar'
if not os.path.isdir(calendar_path):
if not is_a_dir(calendar_path):
print('Missing calendar path: ' + calendar_path)
assert os.path.isdir(calendar_path)
assert os.path.isdir(calendar_path + '/' + str(test_date.year))
assert is_a_dir(calendar_path)
assert is_a_dir(calendar_path + '/' + str(test_date.year))
assert os.path.isfile(calendar_path + '/' + str(test_date.year) + '/' +
str(test_date.month) + '.txt')
print(">>> calendar entry created for s2s post which arrived at " +
@ -3701,7 +3702,7 @@ def test_client_to_server(base_dir: str):
True, __version__, signing_priv_key_pem,
system_language, mitm_servers)
for _ in range(20):
if os.path.isdir(outbox_path) and os.path.isdir(inbox_path):
if is_a_dir(outbox_path) and is_a_dir(inbox_path):
if len([name for name in os.listdir(outbox_path)
if os.path.isfile(os.path.join(outbox_path, name))]) == 2:
test = len([name for name in os.listdir(inbox_path)
@ -3733,7 +3734,7 @@ def test_client_to_server(base_dir: str):
True, __version__, signing_priv_key_pem,
system_language, mitm_servers)
for _ in range(20):
if os.path.isdir(outbox_path) and os.path.isdir(inbox_path):
if is_a_dir(outbox_path) and is_a_dir(inbox_path):
if len([name for name in os.listdir(outbox_path)
if os.path.isfile(os.path.join(outbox_path, name))]) == 3:
test = len([name for name in os.listdir(inbox_path)
@ -3782,7 +3783,7 @@ def test_client_to_server(base_dir: str):
signing_priv_key_pem,
system_language, mitm_servers)
for _ in range(30):
if os.path.isdir(outbox_path) and os.path.isdir(inbox_path):
if is_a_dir(outbox_path) and is_a_dir(inbox_path):
if len([name for name in os.listdir(outbox_path)
if os.path.isfile(os.path.join(outbox_path, name))]) == 4:
if len([name for name in os.listdir(inbox_path)
@ -3824,7 +3825,7 @@ def test_client_to_server(base_dir: str):
True, __version__, signing_priv_key_pem,
system_language, mitm_servers)
for _ in range(30):
if os.path.isdir(inbox_path):
if is_a_dir(inbox_path):
test = len([name for name in os.listdir(inbox_path)
if os.path.isfile(os.path.join(inbox_path, name))])
if test == bob_posts_before-1:
@ -4216,15 +4217,15 @@ def _test_addemoji(base_dir: str):
hashtags: dict = {}
base_dir_original = base_dir
path = base_dir + '/.tests'
if not os.path.isdir(path):
if not is_a_dir(path):
os.mkdir(path)
path = base_dir + '/.tests/emoji'
if os.path.isdir(path):
if is_a_dir(path):
shutil.rmtree(path, ignore_errors=False)
os.mkdir(path)
base_dir = path
path = base_dir + '/emoji'
if os.path.isdir(path):
if is_a_dir(path):
shutil.rmtree(path, ignore_errors=False)
os.mkdir(path)
copytree(base_dir_original + '/emoji', base_dir + '/emoji', False, None)
@ -6863,7 +6864,7 @@ def test_update_actor(base_dir: str):
federation_list: list[str] = []
system_language = 'en'
if os.path.isdir(base_dir + '/.tests'):
if is_a_dir(base_dir + '/.tests'):
shutil.rmtree(base_dir + '/.tests',
ignore_errors=False)
os.mkdir(base_dir + '/.tests')
@ -6972,7 +6973,7 @@ def test_update_actor(base_dir: str):
assert THR_ALICE.is_alive() is False
os.chdir(base_dir)
if os.path.isdir(base_dir + '/.tests'):
if is_a_dir(base_dir + '/.tests'):
shutil.rmtree(base_dir + '/.tests', ignore_errors=False)
@ -7780,7 +7781,7 @@ def _test_httpsig_base_new(with_digest: bool, base_dir: str,
debug = True
path = base_dir + '/.testHttpsigBaseNew'
if os.path.isdir(path):
if is_a_dir(path):
shutil.rmtree(path, ignore_errors=False)
os.mkdir(path)
os.chdir(path)
@ -8956,7 +8957,7 @@ def _test_book_link(base_dir: str):
max_cached_readers = 10
base_dir2 = base_dir + '/.testbookevents'
if os.path.isdir(base_dir2):
if is_a_dir(base_dir2):
shutil.rmtree(base_dir2, ignore_errors=False)
os.mkdir(base_dir2)
@ -9285,7 +9286,7 @@ def _test_book_link(base_dir: str):
assert books_cache['reader_list'][expected_readers - 1] == actor
assert books_cache['readers'].get(actor)
if os.path.isdir(base_dir2):
if is_a_dir(base_dir2):
shutil.rmtree(base_dir2, ignore_errors=False)
@ -9626,7 +9627,7 @@ def _test_gemini_blog(base_dir: str) -> None:
message_json, system_language,
debug, True)
assert result
assert os.path.isdir(gemini_blog_dir)
assert is_a_dir(gemini_blog_dir)
assert os.path.isfile(gemini_blog_filename)
assert text_in_file('# ' + title + '\n', gemini_blog_filename)
assert text_in_file(content, gemini_blog_filename)
@ -9658,7 +9659,7 @@ def _test_markdown_blog(base_dir: str) -> None:
message_json, system_language,
debug, True)
assert result
assert os.path.isdir(markdown_blog_dir)
assert is_a_dir(markdown_blog_dir)
assert os.path.isfile(markdown_blog_filename)
assert text_in_file(content, markdown_blog_filename)
assert text_in_file('2022-02-25', markdown_blog_filename)
@ -9688,7 +9689,7 @@ def _test_micron_blog(base_dir: str) -> None:
message_json, system_language,
debug, True)
assert result
assert os.path.isdir(micron_blog_dir)
assert is_a_dir(micron_blog_dir)
assert os.path.isfile(micron_blog_filename)
assert text_in_file(content, micron_blog_filename)
assert text_in_file('2022-02-25', micron_blog_filename)

View File

@ -33,6 +33,7 @@ from data import save_string
from data import save_flag_file
from data import erase_file
from data import is_a_file
from data import is_a_dir
def import_theme(base_dir: str, filename: str) -> bool:
@ -41,7 +42,7 @@ def import_theme(base_dir: str, filename: str) -> bool:
if not is_a_file(filename):
return False
temp_theme_dir = base_dir + '/imports/files'
if os.path.isdir(temp_theme_dir):
if is_a_dir(temp_theme_dir):
rmtree(temp_theme_dir, ignore_errors=False, onexc=None)
os.mkdir(temp_theme_dir)
unpack_archive(filename, temp_theme_dir, 'zip')
@ -86,10 +87,10 @@ def import_theme(base_dir: str, filename: str) -> bool:
new_theme_name = new_theme_name + '2'
theme_dir = base_dir + '/theme/' + new_theme_name
if not os.path.isdir(theme_dir):
if not is_a_dir(theme_dir):
os.mkdir(theme_dir)
copytree(temp_theme_dir, theme_dir, False, None)
if os.path.isdir(temp_theme_dir):
if is_a_dir(temp_theme_dir):
rmtree(temp_theme_dir, ignore_errors=False, onexc=None)
if scan_themes_for_scripts(theme_dir):
rmtree(theme_dir, ignore_errors=False, onexc=None)
@ -103,7 +104,7 @@ def export_theme(base_dir: str, theme: str) -> bool:
theme_dir = base_dir + '/theme/' + theme
if not is_a_file(theme_dir + '/theme.json'):
return False
if not os.path.isdir(base_dir + '/exports'):
if not is_a_dir(base_dir + '/exports'):
os.mkdir(base_dir + '/exports')
export_filename = base_dir + '/exports/' + theme + '.zip'
if is_a_file(export_filename):
@ -161,7 +162,7 @@ def _copy_theme_help_files(base_dir: str, theme_name: str,
if not system_language:
system_language = 'en'
theme_dir = base_dir + '/theme/' + theme_name + '/welcome'
if not os.path.isdir(theme_dir):
if not is_a_dir(theme_dir):
theme_dir = base_dir + '/defaultwelcome'
dir_str = data_dir(base_dir)
for _, _, files in os.walk(theme_dir):
@ -173,7 +174,7 @@ def _copy_theme_help_files(base_dir: str, theme_name: str,
'.md')
if dest_help_markdown_file in ('profile.md', 'final.md'):
dest_help_markdown_file = 'welcome_' + dest_help_markdown_file
if os.path.isdir(dir_str):
if is_a_dir(dir_str):
copyfile(theme_dir + '/' + help_markdown_file,
dir_str + '/' + dest_help_markdown_file)
break
@ -635,7 +636,7 @@ def _set_theme_fonts(base_dir: str, theme_name: str) -> None:
fonts_dir = base_dir + '/fonts'
theme_fonts_dir = \
base_dir + '/theme/' + theme_name_lower + '/fonts'
if not os.path.isdir(theme_fonts_dir):
if not is_a_dir(theme_fonts_dir):
return
for _, _, files in os.walk(theme_fonts_dir):
for filename in files:
@ -781,7 +782,7 @@ def set_news_avatar(base_dir: str, name: str,
if is_a_file(filename):
erase_file(filename,
'EX: set_news_avatar unable to delete ' + filename)
if os.path.isdir(base_dir + '/cache/avatars'):
if is_a_dir(base_dir + '/cache/avatars'):
copyfile(new_filename, filename)
account_dir = acct_dir(base_dir, nickname, domain)
copyfile(new_filename, account_dir + '/avatar.png')
@ -792,7 +793,7 @@ def _set_clear_cache_flag(base_dir: str) -> None:
(eg. a script in a cron job) to clear the browser cache
"""
dir_str = data_dir(base_dir)
if not os.path.isdir(dir_str):
if not is_a_dir(dir_str):
return
flag_filename = dir_str + '/.clear_cache'
save_flag_file(flag_filename,
@ -852,7 +853,7 @@ def set_theme(base_dir: str, name: str, domain: str,
news_avatar_theme_filename = \
base_dir + '/theme/' + name + '/icons/avatar_news.png'
dir_str = data_dir(base_dir)
if os.path.isdir(dir_str + '/news@' + domain):
if is_a_dir(dir_str + '/news@' + domain):
if is_a_file(news_avatar_theme_filename):
news_avatar_filename = dir_str + '/news@' + domain + '/avatar.png'
copyfile(news_avatar_theme_filename, news_avatar_filename)

View File

@ -29,6 +29,7 @@ from data import load_string
from data import append_string
from data import erase_file
from data import is_a_file
from data import is_a_dir
VALID_HASHTAG_CHARS = \
set('_0123456789' +
@ -717,7 +718,7 @@ def set_memorials(base_dir: str, domain: str, memorial_str) -> None:
for memorial_item in memorial_list:
memorial_nick = memorial_item.strip()
check_dir = acct_dir(base_dir, memorial_nick, domain)
if os.path.isdir(check_dir):
if is_a_dir(check_dir):
new_memorial_str += memorial_nick + '\n'
memorial_str = new_memorial_str
@ -817,7 +818,7 @@ def get_followers_of_person(base_dir: str,
domain: str = remove_domain_port(domain)
handle: str = nickname + '@' + domain
handle_dir: str = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
return followers
dir_str: str = data_dir(base_dir)
for subdir, dirs, _ in os.walk(dir_str):
@ -1035,10 +1036,10 @@ def create_person_dir(nickname: str, domain: str, base_dir: str,
"""
handle: str = nickname + '@' + domain
handle_dir: str = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
os.mkdir(handle_dir)
box_dir: str = acct_handle_dir(base_dir, handle) + '/' + dir_name
if not os.path.isdir(box_dir):
if not is_a_dir(box_dir):
os.mkdir(box_dir)
return box_dir
@ -1589,7 +1590,7 @@ def follow_person(base_dir: str, nickname: str, domain: str,
handle: str = nickname + '@' + domain
handle_dir: str = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
if not is_a_dir(handle_dir):
print('WARN: account for ' + handle + ' does not exist')
return False
@ -1622,7 +1623,7 @@ def follow_person(base_dir: str, nickname: str, domain: str,
unfollowed_filename)
dir_str: str = data_dir(base_dir)
if not os.path.isdir(dir_str):
if not is_a_dir(dir_str):
os.mkdir(dir_str)
handle_to_follow = follow_nickname + '@' + follow_domain
if group_account:
@ -2616,7 +2617,7 @@ def copytree(src: str, dst: str, symlinks: str, ignore: bool) -> None:
for item in os.listdir(src):
s_dir = os.path.join(src, item)
d_dir = os.path.join(dst, item)
if os.path.isdir(s_dir):
if is_a_dir(s_dir):
shutil.copytree(s_dir, d_dir, symlinks, ignore)
else:
shutil.copy2(s_dir, d_dir)
@ -2637,7 +2638,7 @@ def get_cached_post_filename(base_dir: str, nickname: str, domain: str,
"""
cached_post_dir: str = \
get_cached_post_directory(base_dir, nickname, domain)
if not os.path.isdir(cached_post_dir):
if not is_a_dir(cached_post_dir):
# print('ERROR: invalid html cache directory ' + cached_post_dir)
return None
if '@' not in cached_post_dir:
@ -2768,7 +2769,7 @@ def reject_post_id(base_dir: str, nickname: str, domain: str,
def load_translations_from_file(base_dir: str, language: str) -> ({}, str):
"""Returns the translations dictionary
"""
if not os.path.isdir(base_dir + '/translations'):
if not is_a_dir(base_dir + '/translations'):
print('ERROR: translations directory not found')
return None, None
if not language:

View File

@ -44,6 +44,7 @@ from data import load_string
from data import save_string
from data import load_line
from data import is_a_file
from data import is_a_dir
def get_hashtag_categories_feed(base_dir: str,
@ -428,7 +429,7 @@ def store_hash_tags(base_dir: str, nickname: str, domain: str,
tags_dir = base_dir + '/tags'
# add tags directory if it doesn't exist
if not os.path.isdir(tags_dir):
if not is_a_dir(tags_dir):
print('Creating tags directory')
os.mkdir(tags_dir)
@ -456,7 +457,7 @@ def store_hash_tags(base_dir: str, nickname: str, domain: str,
tag_maps_dir = base_dir + '/tagmaps'
if map_links:
# add tagmaps directory if it doesn't exist
if not os.path.isdir(tag_maps_dir):
if not is_a_dir(tag_maps_dir):
print('Creating tagmaps directory')
os.mkdir(tag_maps_dir)

View File

@ -7,11 +7,11 @@ __email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Timeline"
import os
from utils import acct_dir
from data import erase_file
from data import save_flag_file
from data import is_a_file
from data import is_a_dir
def is_minimal(base_dir: str, domain: str, nickname: str) -> bool:
@ -19,7 +19,7 @@ def is_minimal(base_dir: str, domain: str, nickname: str) -> bool:
for the given account
"""
account_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
return True
minimal_filename = account_dir + '/.notminimal'
if is_a_file(minimal_filename):
@ -32,7 +32,7 @@ def set_minimal(base_dir: str, domain: str, nickname: str,
"""Sets whether an account should display minimal buttons
"""
account_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
return
minimal_filename = account_dir + '/.notminimal'
minimal_file_exists = is_a_file(minimal_filename)

View File

@ -149,6 +149,7 @@ from data import load_list
from data import save_string
from data import save_flag_file
from data import is_a_file
from data import is_a_dir
# maximum length for display name within html posts
MAX_DISPLAY_NAME_LENGTH = 42
@ -571,7 +572,7 @@ def _save_individual_post_as_html_to_cache(base_dir: str,
return False
# create the cache directory if needed
if not os.path.isdir(html_post_cache_dir):
if not is_a_dir(html_post_cache_dir):
os.mkdir(html_post_cache_dir)
if save_string(post_html, cached_post_filename,

View File

@ -61,6 +61,7 @@ from data import load_list
from data import load_string
from data import save_string
from data import is_a_file
from data import is_a_dir
def html_search_emoji(translate: {}, base_dir: str, search_str: str,
@ -355,7 +356,7 @@ def html_search_shared_items(translate: {},
catalogs_dir = base_dir + '/cache/catalogs'
else:
catalogs_dir = base_dir + '/cache/wantedItems'
if curr_page <= page_number and os.path.isdir(catalogs_dir):
if curr_page <= page_number and is_a_dir(catalogs_dir):
for _, dirs, files in os.walk(catalogs_dir):
for fname in files:
if '#' in fname:
@ -923,7 +924,7 @@ def html_hashtag_search(nickname: str, domain: str, port: int,
# check that the directory for the nickname exists
if nickname:
account_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
nickname = None
# read the index
@ -1192,7 +1193,7 @@ def html_hashtag_search_remote(nickname: str, domain: str, port: int,
# check that the directory for the nickname exists
if nickname:
account_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
return None
# read the css
@ -1378,7 +1379,7 @@ def hashtag_search_rss(nickname: str, domain: str, port: int,
# check that the directory for the nickname exists
if nickname:
account_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
nickname = None
# read the index
@ -1490,7 +1491,7 @@ def hashtag_search_json(nickname: str, domain: str, port: int,
# check that the directory for the nickname exists
if nickname:
account_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(account_dir):
if not is_a_dir(account_dir):
nickname = None
# read the index

View File

@ -65,6 +65,7 @@ from data import save_binary
from data import load_string
from data import erase_file
from data import is_a_file
from data import is_a_dir
def minimizing_attached_images(base_dir: str, nickname: str, domain: str,
@ -542,7 +543,7 @@ def shares_timeline_json(actor: str, page_number: int, items_per_page: int,
catalogs_dir = base_dir + '/cache/catalogs'
else:
catalogs_dir = base_dir + '/cache/wantedItems'
if os.path.isdir(catalogs_dir):
if is_a_dir(catalogs_dir):
for _, dirs, files in os.walk(catalogs_dir):
for fname in files:
if '#' in fname:
@ -2123,7 +2124,7 @@ def html_show_share(base_dir: str, domain: str, nickname: str,
catalogs_dir = base_dir + '/cache/catalogs'
else:
catalogs_dir = base_dir + '/cache/wantedItems'
if not os.path.isdir(catalogs_dir):
if not is_a_dir(catalogs_dir):
return None
for _, _, files in os.walk(catalogs_dir):
for fname in files:

View File

@ -7,7 +7,6 @@ __email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Onboarding"
import os
from shutil import copyfile
from utils import data_dir
from utils import get_config_param
@ -19,6 +18,7 @@ from markdown import markdown_to_html
from data import save_flag_file
from data import load_string
from data import is_a_file
from data import is_a_dir
def is_welcome_screen_complete(base_dir: str,
@ -26,7 +26,7 @@ def is_welcome_screen_complete(base_dir: str,
"""Returns true if the welcome screen is complete for the given account
"""
account_path = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(account_path):
if not is_a_dir(account_path):
return False
complete_filename = account_path + '/.welcome_complete'
return is_a_file(complete_filename)
@ -37,7 +37,7 @@ def welcome_screen_is_complete(base_dir: str,
"""Indicates that the welcome screen has been shown for a given account
"""
account_path = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(account_path):
if not is_a_dir(account_path):
return
complete_filename = account_path + '/.welcome_complete'
save_flag_file(complete_filename,

View File

@ -30,6 +30,7 @@ from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import is_yggdrasil_url
from data import is_a_file
from data import is_a_dir
def _parse_handle(handle: str) -> (str, str, bool):
@ -160,7 +161,7 @@ def store_webfinger_endpoint(nickname: str, domain: str, port: int,
domain = get_full_domain(domain, port)
handle = nickname + '@' + domain
wf_subdir = '/wfendpoints'
if not os.path.isdir(base_dir + wf_subdir):
if not is_a_dir(base_dir + wf_subdir):
os.mkdir(base_dir + wf_subdir)
filename = base_dir + wf_subdir + '/' + handle + '.json'
save_json(wf_json, filename)
@ -625,7 +626,7 @@ def webfinger_update(base_dir: str, nickname: str, domain: str,
"""
handle = nickname + '@' + domain
wf_subdir = '/wfendpoints'
if not os.path.isdir(base_dir + wf_subdir):
if not is_a_dir(base_dir + wf_subdir):
return
filename = base_dir + wf_subdir + '/' + handle + '.json'