Move http head function to its own module

main
Bob Mottram 2024-08-06 18:43:47 +01:00
parent d17b3b1b2a
commit 06673b61c7
2 changed files with 143 additions and 122 deletions

124
daemon.py
View File

@ -10,13 +10,11 @@ __module_group__ = "Core"
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer, HTTPServer
import sys
import time
import datetime
import os
from socket import error as SocketError
import errno
from functools import partial
# for saving images
from hashlib import md5
from metadata import metadata_custom_emoji
from person import update_memorial_flags
from person import clear_person_qrcodes
@ -30,8 +28,6 @@ from follow import create_initial_last_seen
from threads import begin_thread
from threads import thread_with_trace
from threads import remove_dormant_threads
from media import path_is_video
from media import path_is_audio
from cwlists import load_cw_lists
from blocking import run_federated_blocks_daemon
from blocking import load_federated_blocks_endpoints
@ -52,7 +48,6 @@ from categories import update_hashtag_categories
from languages import load_default_post_languages
from utils import set_accounts_data_dir
from utils import data_dir
from utils import string_contains
from utils import check_bad_path
from utils import acct_handle_dir
from utils import load_reverse_timeline
@ -61,13 +56,10 @@ from utils import load_account_timezones
from utils import load_translations_from_file
from utils import load_bold_reading
from utils import load_hide_follows
from utils import decoded_host
from utils import get_full_domain
from utils import media_file_mime_type
from utils import set_config_param
from utils import get_config_param
from utils import load_json
from utils import is_image_file
from content import load_auto_cw_cache
from content import load_dogwhistles
from theme import scan_themes_for_scripts
@ -91,6 +83,7 @@ from importFollowing import run_import_following_watchdog
from relationships import update_moved_actors
from daemon_get import daemon_http_get
from daemon_post import daemon_http_post
from daemon_head import daemon_http_head
from httpcodes import http_200
from httpcodes import http_201
from httpcodes import http_207
@ -100,7 +93,6 @@ from httpcodes import http_304
from httpcodes import http_400
from httpcodes import write2
from httpheaders import set_headers
from httpheaders import set_headers_head
from daemon_utils import has_accept
from daemon_utils import is_authorized
@ -286,119 +278,7 @@ class PubServer(BaseHTTPRequestHandler):
self._dav_handler('delete', self.server.debug)
def do_HEAD(self):
if self.server.starting_daemon:
return
if check_bad_path(self.path):
http_400(self)
return
calling_domain = self.server.domain_full
if self.headers.get('Host'):
calling_domain = decoded_host(self.headers['Host'])
if self.server.onion_domain:
if calling_domain not in (self.server.domain,
self.server.domain_full,
self.server.onion_domain):
print('HEAD domain blocked: ' + calling_domain)
http_400(self)
return
else:
if calling_domain not in (self.server.domain,
self.server.domain_full):
print('HEAD domain blocked: ' + calling_domain)
http_400(self)
return
check_path = self.path
etag = None
file_length = -1
last_modified_time_str = None
if string_contains(self.path,
('/media/', '/accounts/avatars/',
'/accounts/headers/')):
if is_image_file(self.path) or \
path_is_video(self.path) or \
path_is_audio(self.path):
if '/media/' in self.path:
media_str = self.path.split('/media/')[1]
media_filename = \
self.server.base_dir + '/media/' + media_str
elif '/accounts/avatars/' in self.path:
avatar_file = self.path.split('/accounts/avatars/')[1]
if '/' not in avatar_file:
http_404(self, 149)
return
nickname = avatar_file.split('/')[0]
avatar_file = avatar_file.split('/')[1]
avatar_file_ext = avatar_file.split('.')[-1]
# remove any numbers, eg. avatar123.png becomes avatar.png
if avatar_file.startswith('avatar'):
avatar_file = 'avatar.' + avatar_file_ext
media_filename = \
data_dir(self.server.base_dir) + '/' + \
nickname + '@' + self.server.domain + '/' + \
avatar_file
else:
banner_file = self.path.split('/accounts/headers/')[1]
if '/' not in banner_file:
http_404(self, 150)
return
nickname = banner_file.split('/')[0]
banner_file = banner_file.split('/')[1]
banner_file_ext = banner_file.split('.')[-1]
# remove any numbers, eg. banner123.png becomes banner.png
if banner_file.startswith('banner'):
banner_file = 'banner.' + banner_file_ext
media_filename = \
data_dir(self.server.base_dir) + '/' + \
nickname + '@' + self.server.domain + '/' + \
banner_file
if os.path.isfile(media_filename):
check_path = media_filename
file_length = os.path.getsize(media_filename)
media_tm = os.path.getmtime(media_filename)
last_modified_time = \
datetime.datetime.fromtimestamp(media_tm,
datetime.timezone.utc)
time_format_str = '%a, %d %b %Y %H:%M:%S GMT'
last_modified_time_str = \
last_modified_time.strftime(time_format_str)
media_tag_filename = media_filename + '.etag'
if os.path.isfile(media_tag_filename):
try:
with open(media_tag_filename, 'r',
encoding='utf-8') as fp_efile:
etag = fp_efile.read()
except OSError:
print('EX: do_HEAD unable to read ' +
media_tag_filename)
else:
media_binary = None
try:
with open(media_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: unable to read media binary ' +
media_filename)
if media_binary:
etag = md5(media_binary).hexdigest() # nosec
try:
with open(media_tag_filename, 'w+',
encoding='utf-8') as fp_efile:
fp_efile.write(etag)
except OSError:
print('EX: do_HEAD unable to write ' +
media_tag_filename)
else:
http_404(self, 151)
return
media_file_type = media_file_mime_type(check_path)
set_headers_head(self, media_file_type, file_length,
etag, calling_domain, False,
last_modified_time_str)
daemon_http_head(self)
def do_POST(self):
daemon_http_post(self)

141
daemon_head.py 100644
View File

@ -0,0 +1,141 @@
__filename__ = "daemon_head.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.5.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Core"
import os
import datetime
from hashlib import md5
from utils import media_file_mime_type
from utils import data_dir
from utils import is_image_file
from utils import string_contains
from utils import decoded_host
from utils import check_bad_path
from httpcodes import http_400
from httpcodes import http_404
from httpheaders import set_headers_head
from media import path_is_video
from media import path_is_audio
def daemon_http_head(self) -> None:
""" HTTP HEAD
"""
if self.server.starting_daemon:
return
if check_bad_path(self.path):
http_400(self)
return
calling_domain = self.server.domain_full
if self.headers.get('Host'):
calling_domain = decoded_host(self.headers['Host'])
if self.server.onion_domain:
if calling_domain not in (self.server.domain,
self.server.domain_full,
self.server.onion_domain):
print('HEAD domain blocked: ' + calling_domain)
http_400(self)
return
else:
if calling_domain not in (self.server.domain,
self.server.domain_full):
print('HEAD domain blocked: ' + calling_domain)
http_400(self)
return
check_path = self.path
etag = None
file_length = -1
last_modified_time_str = None
if string_contains(self.path,
('/media/', '/accounts/avatars/',
'/accounts/headers/')):
if is_image_file(self.path) or \
path_is_video(self.path) or \
path_is_audio(self.path):
if '/media/' in self.path:
media_str = self.path.split('/media/')[1]
media_filename = \
self.server.base_dir + '/media/' + media_str
elif '/accounts/avatars/' in self.path:
avatar_file = self.path.split('/accounts/avatars/')[1]
if '/' not in avatar_file:
http_404(self, 149)
return
nickname = avatar_file.split('/')[0]
avatar_file = avatar_file.split('/')[1]
avatar_file_ext = avatar_file.split('.')[-1]
# remove any numbers, eg. avatar123.png becomes avatar.png
if avatar_file.startswith('avatar'):
avatar_file = 'avatar.' + avatar_file_ext
media_filename = \
data_dir(self.server.base_dir) + '/' + \
nickname + '@' + self.server.domain + '/' + \
avatar_file
else:
banner_file = self.path.split('/accounts/headers/')[1]
if '/' not in banner_file:
http_404(self, 150)
return
nickname = banner_file.split('/')[0]
banner_file = banner_file.split('/')[1]
banner_file_ext = banner_file.split('.')[-1]
# remove any numbers, eg. banner123.png becomes banner.png
if banner_file.startswith('banner'):
banner_file = 'banner.' + banner_file_ext
media_filename = \
data_dir(self.server.base_dir) + '/' + \
nickname + '@' + self.server.domain + '/' + \
banner_file
if os.path.isfile(media_filename):
check_path = media_filename
file_length = os.path.getsize(media_filename)
media_tm = os.path.getmtime(media_filename)
last_modified_time = \
datetime.datetime.fromtimestamp(media_tm,
datetime.timezone.utc)
time_format_str = '%a, %d %b %Y %H:%M:%S GMT'
last_modified_time_str = \
last_modified_time.strftime(time_format_str)
media_tag_filename = media_filename + '.etag'
if os.path.isfile(media_tag_filename):
try:
with open(media_tag_filename, 'r',
encoding='utf-8') as fp_efile:
etag = fp_efile.read()
except OSError:
print('EX: do_HEAD unable to read ' +
media_tag_filename)
else:
media_binary = None
try:
with open(media_filename, 'rb') as fp_av:
media_binary = fp_av.read()
except OSError:
print('EX: unable to read media binary ' +
media_filename)
if media_binary:
etag = md5(media_binary).hexdigest() # nosec
try:
with open(media_tag_filename, 'w+',
encoding='utf-8') as fp_efile:
fp_efile.write(etag)
except OSError:
print('EX: do_HEAD unable to write ' +
media_tag_filename)
else:
http_404(self, 151)
return
media_file_type = media_file_mime_type(check_path)
set_headers_head(self, media_file_type, file_length,
etag, calling_domain, False,
last_modified_time_str)