Replace file operations with functions

main
bashrc 2026-04-27 12:56:00 +01:00
parent 54c759aea3
commit df6fed2438
5 changed files with 228 additions and 343 deletions

View File

@ -38,7 +38,7 @@ def get_style_sheet(self, base_dir: str, calling_domain: str, path: str,
tries = 0 tries = 0
while tries < 5: while tries < 5:
try: try:
css = get_css(base_dir, path) css = get_css(path)
if css: if css:
css_cache[path] = css css_cache[path] = css
break break

186
theme.py
View File

@ -28,6 +28,8 @@ from utils import language_right_to_left
from formats import get_image_extensions from formats import get_image_extensions
from content import dangerous_css from content import dangerous_css
from textmode import set_text_mode_theme from textmode import set_text_mode_theme
from data import load_string
from data import save_string
def import_theme(base_dir: str, filename: str) -> bool: def import_theme(base_dir: str, filename: str) -> bool:
@ -47,14 +49,10 @@ def import_theme(base_dir: str, filename: str) -> bool:
' missing from imported theme') ' missing from imported theme')
return False return False
new_theme_name = None new_theme_name = None
new_theme_name1 = None new_theme_name1 = \
try: load_string(temp_theme_dir + '/name.txt',
with open(temp_theme_dir + '/name.txt', 'r', 'EX: import_theme unable to read ' +
encoding='utf-8') as fp_theme: temp_theme_dir + '/name.txt')
new_theme_name1 = fp_theme.read()
except OSError:
print('EX: import_theme unable to read ' +
temp_theme_dir + '/name.txt')
if new_theme_name1: if new_theme_name1:
new_theme_name = remove_eol(new_theme_name1) new_theme_name = remove_eol(new_theme_name1)
@ -389,11 +387,8 @@ def _set_theme_from_dict(base_dir: str, name: str,
css = set_css_param(css, 'language-direction', 'rtl') css = set_css_param(css, 'language-direction', 'rtl')
filename = base_dir + '/' + filename filename = base_dir + '/' + filename
try: save_string(css, filename,
with open(filename, 'w+', encoding='utf-8') as fp_css: 'EX: _set_theme_from_dict unable to write ' + filename)
fp_css.write(css)
except OSError:
print('EX: _set_theme_from_dict unable to write ' + filename)
screen_name = ( screen_name = (
'login', 'follow', 'options', 'search', 'welcome' 'login', 'follow', 'options', 'search', 'welcome'
@ -413,21 +408,13 @@ def _set_background_format(base_dir: str,
if not os.path.isfile(css_filename): if not os.path.isfile(css_filename):
return return
css = None css = load_string(css_filename,
try: 'EX: _set_background_format 1 ' + css_filename + ' [ex]')
with open(css_filename, 'r', encoding='utf-8') as fp_css:
css = fp_css.read()
except OSError as exc:
print('EX: _set_background_format 1 ' + css_filename + ' ' + str(exc))
if css: if css:
css = css.replace('background.jpg', 'background.' + extension) css = css.replace('background.jpg', 'background.' + extension)
try: save_string(css, css_filename,
with open(css_filename, 'w+', encoding='utf-8') as fp_css2: 'EX: _set_background_format 2 ' +
fp_css2.write(css) css_filename + ' [ex]')
except OSError as exc:
print('EX: _set_background_format 2 ' +
css_filename + ' ' + str(exc))
def enable_grayscale(base_dir: str) -> None: def enable_grayscale(base_dir: str) -> None:
@ -438,28 +425,25 @@ def enable_grayscale(base_dir: str) -> None:
template_filename = base_dir + '/' + filename template_filename = base_dir + '/' + filename
if not os.path.isfile(template_filename): if not os.path.isfile(template_filename):
continue continue
try: css = load_string(template_filename,
with open(template_filename, 'r', encoding='utf-8') as fp_css: 'EX: enable_grayscale unable to read ' +
css = fp_css.read() template_filename + ' [ex]')
if 'grayscale' not in css: if css is None:
css = \ continue
css.replace('body, html {', if 'grayscale' in css:
'body, html {\n' + continue
' filter: grayscale(100%);') css = \
filename = base_dir + '/' + filename css.replace('body, html {', 'body, html {\n' +
with open(filename, 'w+', encoding='utf-8') as fp_css: ' filter: grayscale(100%);')
fp_css.write(css) filename = base_dir + '/' + filename
except OSError as ex: save_string(css, filename,
print('EX: enable_grayscale unable to read ' + 'EX: enable_grayscale unable to save ' +
template_filename + ' ' + str(ex)) filename + ' [ex]')
grayscale_filename = data_dir(base_dir) + '/.grayscale' grayscale_filename = data_dir(base_dir) + '/.grayscale'
if not os.path.isfile(grayscale_filename): if not os.path.isfile(grayscale_filename):
try: save_string(' ', grayscale_filename,
with open(grayscale_filename, 'w+', encoding='utf-8') as fp_gray: 'EX: enable_grayscale unable to write ' +
fp_gray.write(' ') grayscale_filename + ' [ex]')
except OSError as ex:
print('EX: enable_grayscale unable to write ' +
grayscale_filename + ' ' + str(ex))
def disable_grayscale(base_dir: str) -> None: def disable_grayscale(base_dir: str) -> None:
@ -470,18 +454,18 @@ def disable_grayscale(base_dir: str) -> None:
template_filename = base_dir + '/' + filename template_filename = base_dir + '/' + filename
if not os.path.isfile(template_filename): if not os.path.isfile(template_filename):
continue continue
try: css = load_string(template_filename,
with open(template_filename, 'r', encoding='utf-8') as fp_css: 'EX: disable_grayscale unable to read ' +
css = fp_css.read() template_filename + ' [ex]')
if 'grayscale' in css: if css is None:
css = \ continue
css.replace('\n filter: grayscale(100%);', '') if 'grayscale' not in css:
filename = base_dir + '/' + filename continue
with open(filename, 'w+', encoding='utf-8') as fp_css: css = css.replace('\n filter: grayscale(100%);', '')
fp_css.write(css) filename = base_dir + '/' + filename
except OSError as ex: save_string(css, filename,
print('EX: disable_grayscale unable to read ' + 'EX: disable_grayscale unable to save ' +
template_filename + ' ' + str(ex)) filename + ' [ex]')
grayscale_filename = data_dir(base_dir) + '/.grayscale' grayscale_filename = data_dir(base_dir) + '/.grayscale'
if os.path.isfile(grayscale_filename): if os.path.isfile(grayscale_filename):
try: try:
@ -500,25 +484,20 @@ def _set_dyslexic_font(base_dir: str) -> bool:
if not os.path.isfile(template_filename): if not os.path.isfile(template_filename):
continue continue
css = None css = load_string(template_filename,
try: 'EX: _set_dyslexic_font unable to read ' +
with open(template_filename, 'r', encoding='utf-8') as fp_css: template_filename)
css = fp_css.read()
except OSError:
print('EX: _set_dyslexic_font unable to read ' + template_filename)
if css: if not css:
css = \ continue
set_css_param(css, "*src", css = \
"url('./fonts/OpenDyslexic-Regular.woff2" + set_css_param(css, "*src",
"') format('woff2')") "url('./fonts/OpenDyslexic-Regular.woff2" +
css = set_css_param(css, "*font-family", "'OpenDyslexic'") "') format('woff2')")
filename = base_dir + '/' + filename css = set_css_param(css, "*font-family", "'OpenDyslexic'")
try: filename = base_dir + '/' + filename
with open(filename, 'w+', encoding='utf-8') as fp_css: save_string(css, filename,
fp_css.write(css) 'EX: _set_dyslexic_font unable to write ' + filename)
except OSError:
print('EX: _set_dyslexic_font unable to write ' + filename)
return False return False
@ -547,27 +526,22 @@ def _set_custom_font(base_dir: str):
if not os.path.isfile(template_filename): if not os.path.isfile(template_filename):
continue continue
css = None css = load_string(template_filename,
try: 'EX: _set_custom_font unable to read ' +
with open(template_filename, 'r', encoding='utf-8') as fp_css: template_filename)
css = fp_css.read()
except OSError:
print('EX: _set_custom_font unable to read ' + template_filename)
if css: if not css:
css = \ continue
set_css_param(css, "*src", css = \
"url('./fonts/custom." + set_css_param(css, "*src",
custom_font_ext + "') format('" + "url('./fonts/custom." +
custom_font_type + "')") custom_font_ext + "') format('" +
css = set_css_param(css, "*font-family", "'CustomFont'") custom_font_type + "')")
css = set_css_param(css, "header-font", "'CustomFont'") css = set_css_param(css, "*font-family", "'CustomFont'")
filename = base_dir + '/' + filename css = set_css_param(css, "header-font", "'CustomFont'")
try: filename = base_dir + '/' + filename
with open(filename, 'w+', encoding='utf-8') as fp_css: save_string(css, filename,
fp_css.write(css) 'EX: _set_custom_font unable to write ' + filename)
except OSError:
print('EX: _set_custom_font unable to write ' + filename)
def set_theme_from_designer(base_dir: str, theme_name: str, domain: str, def set_theme_from_designer(base_dir: str, theme_name: str, domain: str,
@ -832,11 +806,8 @@ def _set_clear_cache_flag(base_dir: str) -> None:
if not os.path.isdir(dir_str): if not os.path.isdir(dir_str):
return return
flag_filename = dir_str + '/.clear_cache' flag_filename = dir_str + '/.clear_cache'
try: save_string('\n', flag_filename,
with open(flag_filename, 'w+', encoding='utf-8') as fp_flag: 'EX: _set_clear_cache_flag unable to write ' + flag_filename)
fp_flag.write('\n')
except OSError:
print('EX: _set_clear_cache_flag unable to write ' + flag_filename)
def set_theme(base_dir: str, name: str, domain: str, def set_theme(base_dir: str, name: str, domain: str,
@ -932,13 +903,12 @@ def scan_themes_for_scripts(base_dir: str) -> bool:
if not fname.endswith('.svg'): if not fname.endswith('.svg'):
continue continue
svg_filename = os.path.join(subdir, fname) svg_filename = os.path.join(subdir, fname)
content = '' content = \
try: load_string(svg_filename,
with open(svg_filename, 'r', encoding='utf-8') as fp_svg: 'EX: scan_themes_for_scripts unable to read ' +
content = fp_svg.read() svg_filename)
except OSError: if not content:
print('EX: scan_themes_for_scripts unable to read ' + continue
svg_filename)
svg_dangerous = dangerous_svg(content, False) svg_dangerous = dangerous_svg(content, False)
if svg_dangerous: if svg_dangerous:
print('svg file contains script: ' + svg_filename) print('svg file contains script: ' + svg_filename)

View File

@ -12,6 +12,7 @@ import sys
import time import time
from socket import error as SocketError from socket import error as SocketError
from timeFunctions import date_utcnow from timeFunctions import date_utcnow
from data import append_string
class thread_with_trace(threading.Thread): class thread_with_trace(threading.Thread):
@ -157,14 +158,12 @@ def remove_dormant_threads(base_dir: str, threads_list: [], debug: bool,
if debug: if debug:
send_log_filename = base_dir + '/send.csv' send_log_filename = base_dir + '/send.csv'
try: text = curr_time.strftime("%Y-%m-%dT%H:%M:%SZ") + \
with open(send_log_filename, 'a+', encoding='utf-8') as fp_log: ',' + str(no_of_active_threads) + \
fp_log.write(curr_time.strftime("%Y-%m-%dT%H:%M:%SZ") + ',' + str(len(threads_list)) + '\n'
',' + str(no_of_active_threads) + append_string(text, send_log_filename,
',' + str(len(threads_list)) + '\n') 'EX: remove_dormant_threads unable to write ' +
except OSError: send_log_filename)
print('EX: remove_dormant_threads unable to write ' +
send_log_filename)
def begin_thread(thread, calling_function: str) -> bool: def begin_thread(thread, calling_function: str) -> bool:

View File

@ -14,6 +14,8 @@ from dateutil.tz import tz
from utils import acct_dir from utils import acct_dir
from utils import data_dir from utils import data_dir
from utils import has_object_dict from utils import has_object_dict
from data import load_string
from data import save_string
def convert_published_to_local_timezone(published, timezone: str) -> str: def convert_published_to_local_timezone(published, timezone: str) -> str:
@ -126,12 +128,11 @@ def get_account_timezone(base_dir: str, nickname: str, domain: str) -> str:
acct_dir(base_dir, nickname, domain) + '/timezone.txt' acct_dir(base_dir, nickname, domain) + '/timezone.txt'
if not os.path.isfile(tz_filename): if not os.path.isfile(tz_filename):
return None return None
timezone = None timezone = load_string(tz_filename,
try: 'EX: get_account_timezone unable to read ' +
with open(tz_filename, 'r', encoding='utf-8') as fp_timezone: tz_filename)
timezone = fp_timezone.read().strip() if timezone:
except OSError: timezone = timezone.strip()
print('EX: get_account_timezone unable to read ' + tz_filename)
return timezone return timezone
@ -142,12 +143,9 @@ def set_account_timezone(base_dir: str, nickname: str, domain: str,
tz_filename = \ tz_filename = \
acct_dir(base_dir, nickname, domain) + '/timezone.txt' acct_dir(base_dir, nickname, domain) + '/timezone.txt'
timezone = timezone.strip() timezone = timezone.strip()
try: save_string(timezone, tz_filename,
with open(tz_filename, 'w+', encoding='utf-8') as fp_timezone: 'EX: set_account_timezone unable to write ' +
fp_timezone.write(timezone) tz_filename)
except OSError:
print('EX: set_account_timezone unable to write ' +
tz_filename)
def load_account_timezones(base_dir: str) -> {}: def load_account_timezones(base_dir: str) -> {}:
@ -165,16 +163,15 @@ def load_account_timezones(base_dir: str) -> {}:
tz_filename = acct_directory + '/timezone.txt' tz_filename = acct_directory + '/timezone.txt'
if not os.path.isfile(tz_filename): if not os.path.isfile(tz_filename):
continue continue
timezone = None timezone = \
try: load_string(tz_filename,
with open(tz_filename, 'r', encoding='utf-8') as fp_timezone: 'EX: load_account_timezones unable to read ' +
timezone = fp_timezone.read().strip() tz_filename)
except OSError: if not timezone:
print('EX: load_account_timezones unable to read ' + continue
tz_filename) timezone = timezone.strip()
if timezone: nickname = acct.split('@')[0]
nickname = acct.split('@')[0] account_timezone[nickname] = timezone
account_timezone[nickname] = timezone
break break
return account_timezone return account_timezone

327
utils.py
View File

@ -23,6 +23,9 @@ from followingCalendar import add_person_to_calendar
from unicodetext import standardize_text from unicodetext import standardize_text
from formats import get_image_extensions from formats import get_image_extensions
from data import load_list from data import load_list
from data import save_string
from data import load_string
from data import append_string
VALID_HASHTAG_CHARS = \ VALID_HASHTAG_CHARS = \
set('_0123456789' + set('_0123456789' +
@ -145,13 +148,9 @@ def text_in_file(text: str, filename: str,
if not case_sensitive: if not case_sensitive:
text = text.lower() text = text.lower()
content = None content = \
try: load_string(filename,
with open(filename, 'r', encoding='utf-8') as fp_file: 'EX: unable to find text in missing file ' + filename)
content = fp_file.read()
except OSError:
print('EX: unable to find text in missing file ' + filename)
if content: if content:
if not case_sensitive: if not case_sensitive:
content = content.lower() content = content.lower()
@ -469,23 +468,16 @@ def set_accounts_data_dir(base_dir: str, accounts_data_path: str) -> None:
accounts_data_path_filename = base_dir + '/data_path.txt' accounts_data_path_filename = base_dir + '/data_path.txt'
if os.path.isfile(accounts_data_path_filename): if os.path.isfile(accounts_data_path_filename):
# read the existing path # read the existing path
path = None path = load_string(accounts_data_path_filename,
try: 'EX: unable to read ' +
with open(accounts_data_path_filename, 'r', accounts_data_path_filename)
encoding='utf-8') as fp_accounts: if path:
path = fp_accounts.read() if path.strip() == accounts_data_path:
except OSError: # path is already set, so avoid writing it again
print('EX: unable to read ' + accounts_data_path_filename) return
if path.strip() == accounts_data_path:
# path is already set, so avoid writing it again
return
try: save_string(accounts_data_path, accounts_data_path_filename,
with open(accounts_data_path_filename, 'w+', 'EX: unable to write ' + accounts_data_path_filename)
encoding='utf-8') as fp_accounts:
fp_accounts.write(accounts_data_path)
except OSError:
print('EX: unable to write ' + accounts_data_path_filename)
def data_dir(base_dir: str) -> str: def data_dir(base_dir: str) -> str:
@ -504,13 +496,9 @@ def data_dir(base_dir: str) -> str:
# is an alternative path set? # is an alternative path set?
accounts_data_path_filename = base_dir + '/data_path.txt' accounts_data_path_filename = base_dir + '/data_path.txt'
if os.path.isfile(accounts_data_path_filename): if os.path.isfile(accounts_data_path_filename):
path = None path = load_string(accounts_data_path_filename,
try: 'EX: unable to read ' +
with open(accounts_data_path_filename, 'r', accounts_data_path_filename)
encoding='utf-8') as fp_accounts:
path = fp_accounts.read()
except OSError:
print('EX: unable to read ' + accounts_data_path_filename)
if path: if path:
__accounts_data_path__ = path.strip() __accounts_data_path__ = path.strip()
print('Accounts data path set to ' + __accounts_data_path__) print('Accounts data path set to ' + __accounts_data_path__)
@ -536,13 +524,9 @@ def refresh_newswire(base_dir: str) -> None:
refresh_newswire_filename = data_dir(base_dir) + '/.refresh_newswire' refresh_newswire_filename = data_dir(base_dir) + '/.refresh_newswire'
if os.path.isfile(refresh_newswire_filename): if os.path.isfile(refresh_newswire_filename):
return return
try: save_string('\n', refresh_newswire_filename,
with open(refresh_newswire_filename, 'w+', 'EX: refresh_newswire unable to write ' +
encoding='utf-8') as fp_refresh: refresh_newswire_filename)
fp_refresh.write('\n')
except OSError:
print('EX: refresh_newswire unable to write ' +
refresh_newswire_filename)
def get_sha_256(msg: str): def get_sha_256(msg: str):
@ -714,12 +698,10 @@ def get_memorials(base_dir: str) -> str:
if not os.path.isfile(memorial_file): if not os.path.isfile(memorial_file):
return '' return ''
memorial_str = '' memorial_str = load_string(memorial_file,
try: 'EX: unable to read ' + memorial_file)
with open(memorial_file, 'r', encoding='utf-8') as fp_memorial: if memorial_str is None:
memorial_str = fp_memorial.read() memorial_str = ''
except OSError:
print('EX: unable to read ' + memorial_file)
return memorial_str return memorial_str
@ -738,11 +720,8 @@ def set_memorials(base_dir: str, domain: str, memorial_str) -> None:
# save the accounts # save the accounts
memorial_file = data_dir(base_dir) + '/memorial' memorial_file = data_dir(base_dir) + '/memorial'
try: save_string(memorial_str, memorial_file,
with open(memorial_file, 'w+', encoding='utf-8') as fp_memorial: 'EX: unable to write ' + memorial_file)
fp_memorial.write(memorial_str)
except OSError:
print('EX: unable to write ' + memorial_file)
def _create_config(base_dir: str) -> None: def _create_config(base_dir: str) -> None:
@ -949,14 +928,9 @@ def load_json(filename: str) -> {}:
filename = filename.replace('/Actor@', '/inbox@') filename = filename.replace('/Actor@', '/inbox@')
json_object = None json_object = None
data = None data = load_string(filename,
'EX: load_json exception ' + str(filename) + ' [ex]')
# load from file if data is None:
try:
with open(filename, 'r', encoding='utf-8') as fp_json:
data = fp_json.read()
except OSError as exc:
print('EX: load_json exception ' + str(filename) + ' ' + str(exc))
return json_object return json_object
# check that something was loaded # check that something was loaded
@ -982,19 +956,18 @@ def load_json_onionify(filename: str, domain: str, onion_domain: str,
json_object = None json_object = None
tries = 0 tries = 0
while tries < 5: while tries < 5:
try: data = load_string(filename,
with open(filename, 'r', encoding='utf-8') as fp_json: 'EX: load_json_onionify exception ' + filename)
data = fp_json.read() if data is None:
if data:
data = data.replace(domain, onion_domain)
data = data.replace('https:', 'http:')
json_object = json.loads(data)
break
except BaseException:
print('EX: load_json_onionify exception ' + str(filename))
if delay_sec > 0: if delay_sec > 0:
time.sleep(delay_sec) time.sleep(delay_sec)
tries += 1 tries += 1
continue
if data:
data = data.replace(domain, onion_domain)
data = data.replace('https:', 'http:')
json_object = json.loads(data)
break
return json_object return json_object
@ -1564,34 +1537,24 @@ def _set_default_pet_name(base_dir: str, nickname: str, domain: str,
follow_nickname + '@' + follow_domain + '\n' follow_nickname + '@' + follow_domain + '\n'
if not os.path.isfile(petnames_filename): if not os.path.isfile(petnames_filename):
# if there is no existing petnames lookup file # if there is no existing petnames lookup file
try: save_string(petname_lookup_entry, petnames_filename,
with open(petnames_filename, 'w+', 'EX: _set_default_pet_name unable to write ' +
encoding='utf-8') as fp_petnames: petnames_filename)
fp_petnames.write(petname_lookup_entry)
except OSError:
print('EX: _set_default_pet_name unable to write ' +
petnames_filename)
return return
try: petnames_str = load_string(petnames_filename,
with open(petnames_filename, 'r', encoding='utf-8') as fp_petnames: 'EX: _set_default_pet_name unable to read 1 ' +
petnames_str = fp_petnames.read() petnames_filename)
if petnames_str: if petnames_str:
petnames_list = petnames_str.split('\n') petnames_list = petnames_str.split('\n')
for pet in petnames_list: for pet in petnames_list:
if pet.startswith(follow_nickname + ' '): if pet.startswith(follow_nickname + ' '):
# petname already exists # petname already exists
return return
except OSError:
print('EX: _set_default_pet_name unable to read 1 ' +
petnames_filename)
# petname doesn't already exist # petname doesn't already exist
try: append_string(petname_lookup_entry, petnames_filename,
with open(petnames_filename, 'a+', encoding='utf-8') as fp_petnames: 'EX: _set_default_pet_name unable to read 2 ' +
fp_petnames.write(petname_lookup_entry) petnames_filename)
except OSError:
print('EX: _set_default_pet_name unable to read 2 ' +
petnames_filename)
def follow_person(base_dir: str, nickname: str, domain: str, def follow_person(base_dir: str, nickname: str, domain: str,
@ -1645,13 +1608,9 @@ def follow_person(base_dir: str, nickname: str, domain: str,
for line in lines: for line in lines:
if handle_to_follow not in line: if handle_to_follow not in line:
new_lines += line new_lines += line
try: save_string(new_lines, unfollowed_filename,
with open(unfollowed_filename, 'w+', 'EX: follow_person unable to write ' +
encoding='utf-8') as fp_unfoll: unfollowed_filename)
fp_unfoll.write(new_lines)
except OSError:
print('EX: follow_person unable to write ' +
unfollowed_filename)
dir_str = data_dir(base_dir) dir_str = data_dir(base_dir)
if not os.path.isdir(dir_str): if not os.path.isdir(dir_str):
@ -1683,11 +1642,8 @@ def follow_person(base_dir: str, nickname: str, domain: str,
' creating new following file to follow ' + ' creating new following file to follow ' +
handle_to_follow + handle_to_follow +
', filename is ' + filename) ', filename is ' + filename)
try: save_string(handle_to_follow + '\n', filename,
with open(filename, 'w+', encoding='utf-8') as fp_foll: 'EX: follow_person unable to write ' + filename)
fp_foll.write(handle_to_follow + '\n')
except OSError:
print('EX: follow_person unable to write ' + filename)
if follow_file.endswith('following.txt'): if follow_file.endswith('following.txt'):
# Default to adding new follows to the calendar. # Default to adding new follows to the calendar.
@ -1786,15 +1742,13 @@ def get_reply_interval_hours(base_dir: str, nickname: str, domain: str,
reply_interval_filename = \ reply_interval_filename = \
acct_dir(base_dir, nickname, domain) + '/.reply_interval_hours' acct_dir(base_dir, nickname, domain) + '/.reply_interval_hours'
if os.path.isfile(reply_interval_filename): if os.path.isfile(reply_interval_filename):
try: hours_str = \
with open(reply_interval_filename, 'r', load_string(reply_interval_filename,
encoding='utf-8') as fp_interval: 'EX: get_reply_interval_hours unable to read ' +
hours_str = fp_interval.read() reply_interval_filename)
if hours_str.isdigit(): if hours_str:
return int(hours_str) if hours_str.isdigit():
except OSError: return int(hours_str)
print('EX: get_reply_interval_hours unable to read ' +
reply_interval_filename)
return default_reply_interval_hrs return default_reply_interval_hrs
@ -1806,15 +1760,13 @@ def set_reply_interval_hours(base_dir: str, nickname: str, domain: str,
""" """
reply_interval_filename = \ reply_interval_filename = \
acct_dir(base_dir, nickname, domain) + '/.reply_interval_hours' acct_dir(base_dir, nickname, domain) + '/.reply_interval_hours'
try: text = str(reply_interval_hours)
with open(reply_interval_filename, 'w+', if save_string(text, reply_interval_filename,
encoding='utf-8') as fp_interval: 'EX: set_reply_interval_hours ' +
fp_interval.write(str(reply_interval_hours)) 'unable to save reply interval ' +
return True str(reply_interval_filename) + ' ' +
except OSError: str(reply_interval_hours)):
print('EX: set_reply_interval_hours unable to save reply interval ' + return True
str(reply_interval_filename) + ' ' +
str(reply_interval_hours))
return False return False
@ -1847,22 +1799,16 @@ def _remove_attachment(base_dir: str, http_prefix: str,
account_media_log_filename = account_dir + '/media_log.txt' account_media_log_filename = account_dir + '/media_log.txt'
if os.path.isfile(account_media_log_filename): if os.path.isfile(account_media_log_filename):
search_filename = media_filename.replace(base_dir, '') search_filename = media_filename.replace(base_dir, '')
media_log_text = '' media_log_text = \
try: load_string(account_media_log_filename,
with open(account_media_log_filename, 'r', 'EX: _remove unable to read media log for ' + nickname)
encoding='utf-8') as fp_log: if media_log_text is None:
media_log_text = fp_log.read() media_log_text = ''
except OSError:
print('EX: _remove unable to read media log for ' + nickname)
if search_filename + '\n' in media_log_text: if search_filename + '\n' in media_log_text:
media_log_text = media_log_text.replace(search_filename + '\n', '') media_log_text = media_log_text.replace(search_filename + '\n', '')
try: save_string(media_log_text, account_media_log_filename,
with open(account_media_log_filename, 'w+', 'EX: unable to write media log after removal for ' +
encoding='utf-8') as fp_log: nickname)
fp_log.write(media_log_text)
except OSError:
print('EX: unable to write media log after removal for ' +
nickname)
# remove the transcript # remove the transcript
if os.path.isfile(media_filename + '.vtt'): if os.path.isfile(media_filename + '.vtt'):
@ -2085,13 +2031,9 @@ def _remove_post_id_from_tag_index(tag_index_filename: str,
'unable to delete tag index ' + str(tag_index_filename)) 'unable to delete tag index ' + str(tag_index_filename))
else: else:
# write the new hashtag index without the given post in it # write the new hashtag index without the given post in it
try: save_string(newlines, tag_index_filename,
with open(tag_index_filename, 'w+', 'EX: _remove_post_id_from_tag_index unable to write ' +
encoding='utf-8') as fp_index: tag_index_filename)
fp_index.write(newlines)
except OSError:
print('EX: _remove_post_id_from_tag_index unable to write ' +
tag_index_filename)
def _delete_hashtags_on_post(base_dir: str, post_json_object: {}) -> None: def _delete_hashtags_on_post(base_dir: str, post_json_object: {}) -> None:
@ -2158,24 +2100,19 @@ def _delete_conversation_post(base_dir: str, nickname: str, domain: str,
conversation_filename = conversation_dir + '/' + conversation_id conversation_filename = conversation_dir + '/' + conversation_id
if not os.path.isfile(conversation_filename): if not os.path.isfile(conversation_filename):
return False return False
conversation_str = '' conversation_str = \
try: load_string(conversation_filename,
with open(conversation_filename, 'r', encoding='utf-8') as fp_conv: 'EX: _delete_conversation_post unable to read ' +
conversation_str = fp_conv.read() conversation_filename)
except OSError: if conversation_str is None:
print('EX: _delete_conversation_post unable to read ' + conversation_str = ''
conversation_filename)
if post_id + '\n' not in conversation_str: if post_id + '\n' not in conversation_str:
return False return False
conversation_str = conversation_str.replace(post_id + '\n', '') conversation_str = conversation_str.replace(post_id + '\n', '')
if conversation_str: if conversation_str:
try: save_string(conversation_str, conversation_filename,
with open(conversation_filename, 'w+', 'EX: _delete_conversation_post unable to write ' +
encoding='utf-8') as fp_conv: conversation_filename)
fp_conv.write(conversation_str)
except OSError:
print('EX: _delete_conversation_post unable to write ' +
conversation_filename)
else: else:
if os.path.isfile(conversation_filename + '.muted'): if os.path.isfile(conversation_filename + '.muted'):
try: try:
@ -2677,17 +2614,15 @@ def no_of_active_accounts_monthly(base_dir: str, months: int) -> bool:
dir_str + '/' + account + '/.lastUsed' dir_str + '/' + account + '/.lastUsed'
if not os.path.isfile(last_used_filename): if not os.path.isfile(last_used_filename):
continue continue
try: last_used = \
with open(last_used_filename, 'r', load_string(last_used_filename,
encoding='utf-8') as fp_last_used: 'EX: no_of_active_accounts_monthly ' +
last_used = fp_last_used.read() 'unable to read ' + last_used_filename)
if last_used.isdigit(): if last_used:
time_diff = curr_time - int(last_used) if last_used.isdigit():
if time_diff < month_seconds: time_diff = curr_time - int(last_used)
account_ctr += 1 if time_diff < month_seconds:
except OSError: account_ctr += 1
print('EX: no_of_active_accounts_monthly unable to read ' +
last_used_filename)
break break
return account_ctr return account_ctr
@ -2738,20 +2673,17 @@ def file_last_modified(filename: str) -> str:
return modified_time.strftime("%Y-%m-%dT%H:%M:%SZ") return modified_time.strftime("%Y-%m-%dT%H:%M:%SZ")
def get_css(base_dir: str, css_filename: str) -> str: def get_css(css_filename: str) -> str:
"""Retrieves the css for a given file, or from a cache """Retrieves the css for a given file, or from a cache
""" """
# does the css file exist? # does the css file exist?
if not os.path.isfile(css_filename): if not os.path.isfile(css_filename):
return None return None
try: css = load_string(css_filename,
with open(css_filename, 'r', encoding='utf-8') as fp_css: 'EX: get_css unable to read ' + css_filename)
css = fp_css.read() if css:
return css return css
except OSError:
print('EX: get_css unable to read ' + css_filename)
return None return None
@ -2831,13 +2763,9 @@ def reject_post_id(base_dir: str, nickname: str, domain: str,
if recent_posts_cache['html'].get(post_url): if recent_posts_cache['html'].get(post_url):
del recent_posts_cache['html'][post_url] del recent_posts_cache['html'][post_url]
try: save_string('\n', post_filename + '.reject',
with open(post_filename + '.reject', 'w+', 'EX: reject_post_id unable to write ' +
encoding='utf-8') as fp_reject: post_filename + '.reject')
fp_reject.write('\n')
except OSError:
print('EX: reject_post_id unable to write ' +
post_filename + '.reject')
# if the post is in the inbox index then remove it # if the post is in the inbox index then remove it
index_file = \ index_file = \
@ -3560,11 +3488,9 @@ def set_minimize_all_images(base_dir: str,
if nickname not in min_images_for_accounts: if nickname not in min_images_for_accounts:
min_images_for_accounts.append(nickname) min_images_for_accounts.append(nickname)
if not os.path.isfile(filename): if not os.path.isfile(filename):
try: save_string('\n', filename,
with open(filename, 'w+', encoding='utf-8') as fp_min: 'EX: set_minimize_all_images unable to write ' +
fp_min.write('\n') filename)
except OSError:
print('EX: unable to write ' + filename)
return return
if nickname in min_images_for_accounts: if nickname in min_images_for_accounts:
@ -3612,12 +3538,9 @@ def save_reverse_timeline(base_dir: str, reverse_sequence: []) -> None:
acct_dir(base_dir, nickname, domain) + '/.reverse_timeline' acct_dir(base_dir, nickname, domain) + '/.reverse_timeline'
if nickname in reverse_sequence: if nickname in reverse_sequence:
if not os.path.isfile(reverse_filename): if not os.path.isfile(reverse_filename):
try: save_string('\n', reverse_filename,
with open(reverse_filename, 'w+', 'EX: failed to save reverse ' +
encoding='utf-8') as fp_reverse: reverse_filename)
fp_reverse.write('\n')
except OSError:
print('EX: failed to save reverse ' + reverse_filename)
else: else:
if os.path.isfile(reverse_filename): if os.path.isfile(reverse_filename):
try: try:
@ -3858,11 +3781,10 @@ def lines_in_file(filename: str) -> int:
"""Returns the number of lines in a file """Returns the number of lines in a file
""" """
if os.path.isfile(filename): if os.path.isfile(filename):
try: text = load_string(filename,
with open(filename, 'r', encoding='utf-8') as fp_lines: 'EX: lines_in_file error reading ' + filename)
return len(fp_lines.read().split('\n')) if text:
except OSError: return len(text.split('\n'))
print('EX: lines_in_file error reading ' + filename)
return 0 return 0
@ -4177,12 +4099,9 @@ def set_premium_account(base_dir: str, nickname: str, domain: str,
return False return False
else: else:
if flag_state: if flag_state:
try: if not save_string('\n', premium_filename,
with open(premium_filename, 'w+', 'EX: unable to set premium flag ' +
encoding='utf-8') as fp_premium: premium_filename):
fp_premium.write('\n')
except OSError:
print('EX: unable to set premium flag ' + premium_filename)
return False return False
return True return True