mirror of https://gitlab.com/bashrc2/epicyon
Replace file operations with functions
parent
2070e73766
commit
60385ab4f9
22
data.py
22
data.py
|
|
@ -29,6 +29,26 @@ def _store_base(text: str, filename: str, exception_text: str,
|
|||
return False
|
||||
|
||||
|
||||
def save_with_err(text: str, filename: str, exception_text: str) -> bool:
|
||||
"""Saves a string to file and returns any error code
|
||||
"""
|
||||
errno: int = 0
|
||||
try:
|
||||
with open(filename, 'w+', encoding='utf-8') as fp:
|
||||
fp.write(text)
|
||||
return True, errno
|
||||
except OSError as exc:
|
||||
if '[ex]' in exception_text:
|
||||
exception_text = exception_text.replace('[ex]', str(exc))
|
||||
errno = exc.errno
|
||||
print(exception_text)
|
||||
except UnicodeEncodeError as exc:
|
||||
if '[ex]' in exception_text:
|
||||
exception_text = exception_text.replace('[ex]', str(exc))
|
||||
print(exception_text)
|
||||
return False, errno
|
||||
|
||||
|
||||
def load_string(filename: str, exception_text: str) -> str:
|
||||
"""Loads a string from file
|
||||
"""
|
||||
|
|
@ -115,7 +135,7 @@ def save_binary(text: str, filename: str, exception_text: str) -> bool:
|
|||
"""Saves a binary to file
|
||||
"""
|
||||
try:
|
||||
with open(filename, 'wb') as fp:
|
||||
with open(filename, 'wb+') as fp:
|
||||
fp.write(text)
|
||||
return True
|
||||
except OSError as exc:
|
||||
|
|
|
|||
13
tests.py
13
tests.py
|
|
@ -247,6 +247,7 @@ from blog import html_blog_post_gemini_links
|
|||
from data import load_list
|
||||
from data import load_string
|
||||
from data import save_string
|
||||
from data import save_binary
|
||||
from data import erase_file
|
||||
from data import is_a_dir
|
||||
from data import makedir
|
||||
|
|
@ -4643,8 +4644,9 @@ def _test_danger_svg(base_dir: str) -> None:
|
|||
}
|
||||
}
|
||||
|
||||
with open(svg_image_filename, 'wb+') as fp_svg:
|
||||
fp_svg.write(svg_content.encode('utf-8'))
|
||||
binary_content = svg_content.encode('utf-8')
|
||||
save_binary(binary_content, svg_image_filename,
|
||||
'EX: save failed ' + svg_image_filename)
|
||||
assert os.path.isfile(svg_image_filename)
|
||||
assert svg_content != svg_clean
|
||||
|
||||
|
|
@ -7250,11 +7252,8 @@ def _test_spoofed_geolocation() -> None:
|
|||
|
||||
kml_str += '</Document>\n'
|
||||
kml_str += '</kml>'
|
||||
try:
|
||||
with open('unittest_decoy.kml', 'w+', encoding='utf-8') as fp_kml:
|
||||
fp_kml.write(kml_str)
|
||||
except OSError:
|
||||
print('EX: unable to write unittest_decoy.kml')
|
||||
save_string(kml_str, 'unittest_decoy.kml',
|
||||
'EX: unable to write unittest_decoy.kml')
|
||||
|
||||
|
||||
def _test_skills() -> None:
|
||||
|
|
|
|||
40
utils.py
40
utils.py
|
|
@ -24,6 +24,8 @@ from unicodetext import standardize_text
|
|||
from formats import get_image_extensions
|
||||
from data import load_list
|
||||
from data import save_string
|
||||
from data import save_with_err
|
||||
from data import prepend_string
|
||||
from data import save_flag_file
|
||||
from data import load_string
|
||||
from data import append_string
|
||||
|
|
@ -911,19 +913,19 @@ def save_json(json_object: {}, filename: str) -> bool:
|
|||
return False
|
||||
|
||||
tries: int = 1
|
||||
savestr = json.dumps(json_object)
|
||||
while tries <= 5:
|
||||
try:
|
||||
with open(filename, 'w+', encoding='utf-8') as fp_json:
|
||||
fp_json.write(json.dumps(json_object))
|
||||
return True
|
||||
except OSError as exc:
|
||||
print('EX: save_json ' + str(tries) + ' ' + str(filename) +
|
||||
' ' + str(exc))
|
||||
if exc.errno == 36:
|
||||
# filename too long
|
||||
break
|
||||
time.sleep(1)
|
||||
tries += 1
|
||||
success, errno = \
|
||||
save_with_err(savestr, filename,
|
||||
'EX: save_json ' + str(tries) + ' ' +
|
||||
str(filename) + ' [ex]')
|
||||
if success:
|
||||
return True
|
||||
if errno == 36:
|
||||
# filename too long
|
||||
break
|
||||
time.sleep(1)
|
||||
tries += 1
|
||||
return False
|
||||
|
||||
|
||||
|
|
@ -1636,16 +1638,10 @@ def follow_person(base_dir: str, nickname: str, domain: str,
|
|||
print('DEBUG: follow already exists')
|
||||
return True
|
||||
# prepend to follow file
|
||||
try:
|
||||
with open(filename, 'r+', encoding='utf-8') as fp_foll:
|
||||
content: str = fp_foll.read()
|
||||
if handle_to_follow + '\n' not in content:
|
||||
fp_foll.seek(0, 0)
|
||||
fp_foll.write(handle_to_follow + '\n' + content)
|
||||
print('DEBUG: follow added')
|
||||
except OSError as ex:
|
||||
print('WARN: Failed to write entry to follow file ' +
|
||||
filename + ' ' + str(ex))
|
||||
if prepend_string(handle_to_follow, filename,
|
||||
'EX: Failed to write entry to follow file ' +
|
||||
filename + ' [ex]'):
|
||||
print('DEBUG: follow added')
|
||||
else:
|
||||
# first follow
|
||||
if debug:
|
||||
|
|
|
|||
Loading…
Reference in New Issue