Move cache function into cache module

main
Bob Mottram 2024-08-31 10:31:08 +01:00
parent da66e2744d
commit 89f6e3d26d
3 changed files with 104 additions and 96 deletions

102
cache.py
View File

@ -8,9 +8,18 @@ __status__ = "Production"
__module_group__ = "Core"
import os
from session import download_image
from session import url_exists
from session import get_json
from session import get_json_valid
from utils import url_permitted
from utils import remove_html
from utils import get_url_from_post
from utils import data_dir
from utils import get_attributed_to
from utils import remove_id_ending
from utils import get_post_attachments
from utils import has_object_dict
from utils import contains_statuses
from utils import load_json
from utils import save_json
@ -18,6 +27,7 @@ from utils import get_file_case_insensitive
from utils import get_user_paths
from utils import date_utcnow
from utils import date_from_string_format
from content import remove_script
def remove_person_from_cache(base_dir: str, person_url: str,
@ -254,3 +264,95 @@ def get_person_pub_key(base_dir: str, session, person_url: str,
store_person_in_cache(base_dir, person_url, person_json,
person_cache, True)
return pub_key
def cache_svg_images(session, base_dir: str, http_prefix: str,
domain: str, domain_full: str,
onion_domain: str, i2p_domain: str,
post_json_object: {},
federation_list: [], debug: bool,
test_image_filename: str) -> bool:
"""Creates a local copy of a remote svg file
"""
if has_object_dict(post_json_object):
obj = post_json_object['object']
else:
obj = post_json_object
if not obj.get('id'):
return False
post_attachments = get_post_attachments(obj)
if not post_attachments:
return False
cached = False
post_id = remove_id_ending(obj['id']).replace('/', '--')
actor = 'unknown'
if post_attachments and obj.get('attributedTo'):
actor = get_attributed_to(obj['attributedTo'])
log_filename = data_dir(base_dir) + '/svg_scripts_log.txt'
for index in range(len(post_attachments)):
attach = post_attachments[index]
if not attach.get('mediaType'):
continue
if not attach.get('url'):
continue
url_str = get_url_from_post(attach['url'])
if url_str.endswith('.svg') or \
'svg' in attach['mediaType']:
url = remove_html(url_str)
if not url_permitted(url, federation_list):
continue
# if this is a local image then it has already been
# validated on upload
if '://' + domain in url:
continue
if onion_domain:
if '://' + onion_domain in url:
continue
if i2p_domain:
if '://' + i2p_domain in url:
continue
if '/' in url:
filename = url.split('/')[-1]
else:
filename = url
if not test_image_filename:
image_filename = \
base_dir + '/media/' + post_id + '_' + filename
if not download_image(session, url,
image_filename, debug):
continue
else:
image_filename = test_image_filename
image_data = None
try:
with open(image_filename, 'rb') as fp_svg:
image_data = fp_svg.read()
except OSError:
print('EX: unable to read svg file data')
if not image_data:
continue
image_data = image_data.decode()
cleaned_up = \
remove_script(image_data, log_filename, actor, url)
if cleaned_up != image_data:
# write the cleaned up svg image
svg_written = False
cleaned_up = cleaned_up.encode('utf-8')
try:
with open(image_filename, 'wb') as fp_im:
fp_im.write(cleaned_up)
svg_written = True
except OSError:
print('EX: unable to write cleaned up svg ' + url)
if svg_written:
# convert to list if needed
if isinstance(obj['attachment'], dict):
obj['attachment'] = [obj['attachment']]
# change the url to be the local version
obj['attachment'][index]['url'] = \
http_prefix + '://' + domain_full + '/media/' + \
post_id + '_' + filename
cached = True
else:
cached = True
return cached

View File

@ -20,7 +20,6 @@ from reaction import update_reaction_collection
from reaction import valid_emoji_content
from utils import harmless_markup
from utils import quote_toots_allowed
from utils import get_post_attachments
from utils import lines_in_file
from utils import resembles_url
from utils import get_url_from_post
@ -94,7 +93,6 @@ from categories import set_hashtag_category
from httpsig import get_digest_algorithm_from_headers
from httpsig import verify_post_headers
from session import create_session
from session import download_image
from follow import send_follow_request
from follow import follower_approval_active
from follow import is_following_actor
@ -107,6 +105,7 @@ from follow import no_of_follow_requests
from follow import get_no_of_followers
from follow import follow_approval_required
from pprint import pprint
from cache import cache_svg_images
from cache import get_actor_public_key_from_id
from cache import store_person_in_cache
from cache import get_person_pub_key
@ -166,7 +165,6 @@ from content import contains_invalid_local_links
from content import reject_twitter_summary
from content import load_dogwhistles
from content import valid_url_lengths
from content import remove_script
from threads import begin_thread
from maps import get_map_links_from_post_content
from maps import get_location_from_post
@ -175,98 +173,6 @@ from maps import geocoords_from_map_link
from reading import store_book_events
def cache_svg_images(session, base_dir: str, http_prefix: str,
domain: str, domain_full: str,
onion_domain: str, i2p_domain: str,
post_json_object: {},
federation_list: [], debug: bool,
test_image_filename: str) -> bool:
"""Creates a local copy of a remote svg file
"""
if has_object_dict(post_json_object):
obj = post_json_object['object']
else:
obj = post_json_object
if not obj.get('id'):
return False
post_attachments = get_post_attachments(obj)
if not post_attachments:
return False
cached = False
post_id = remove_id_ending(obj['id']).replace('/', '--')
actor = 'unknown'
if post_attachments and obj.get('attributedTo'):
actor = get_attributed_to(obj['attributedTo'])
log_filename = data_dir(base_dir) + '/svg_scripts_log.txt'
for index in range(len(post_attachments)):
attach = post_attachments[index]
if not attach.get('mediaType'):
continue
if not attach.get('url'):
continue
url_str = get_url_from_post(attach['url'])
if url_str.endswith('.svg') or \
'svg' in attach['mediaType']:
url = remove_html(url_str)
if not url_permitted(url, federation_list):
continue
# if this is a local image then it has already been
# validated on upload
if '://' + domain in url:
continue
if onion_domain:
if '://' + onion_domain in url:
continue
if i2p_domain:
if '://' + i2p_domain in url:
continue
if '/' in url:
filename = url.split('/')[-1]
else:
filename = url
if not test_image_filename:
image_filename = \
base_dir + '/media/' + post_id + '_' + filename
if not download_image(session, url,
image_filename, debug):
continue
else:
image_filename = test_image_filename
image_data = None
try:
with open(image_filename, 'rb') as fp_svg:
image_data = fp_svg.read()
except OSError:
print('EX: unable to read svg file data')
if not image_data:
continue
image_data = image_data.decode()
cleaned_up = \
remove_script(image_data, log_filename, actor, url)
if cleaned_up != image_data:
# write the cleaned up svg image
svg_written = False
cleaned_up = cleaned_up.encode('utf-8')
try:
with open(image_filename, 'wb') as fp_im:
fp_im.write(cleaned_up)
svg_written = True
except OSError:
print('EX: unable to write cleaned up svg ' + url)
if svg_written:
# convert to list if needed
if isinstance(obj['attachment'], dict):
obj['attachment'] = [obj['attachment']]
# change the url to be the local version
obj['attachment'][index]['url'] = \
http_prefix + '://' + domain_full + '/media/' + \
post_id + '_' + filename
cached = True
else:
cached = True
return cached
def _store_last_post_id(base_dir: str, nickname: str, domain: str,
post_json_object: {}) -> None:
"""Stores the id of the last post made by an actor

View File

@ -30,6 +30,7 @@ from httpsig import sign_post_headers
from httpsig import sign_post_headers_new
from httpsig import verify_post_headers
from httpsig import message_content_digest
from cache import cache_svg_images
from cache import store_person_in_cache
from cache import get_person_from_cache
from threads import thread_with_trace
@ -149,7 +150,6 @@ from delete import send_delete_via_server
from inbox import json_post_allows_comments
from inbox import valid_inbox
from inbox import valid_inbox_filenames
from inbox import cache_svg_images
from categories import guess_hashtag_category
from content import remove_link_trackers_from_content
from content import format_mixed_right_to_left