mirror of https://gitlab.com/bashrc2/epicyon
				
				
				
			
		
			
				
	
	
		
			152 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			152 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
| __filename__ = "daemon_head.py"
 | |
| __author__ = "Bob Mottram"
 | |
| __license__ = "AGPL3+"
 | |
| __version__ = "1.6.0"
 | |
| __maintainer__ = "Bob Mottram"
 | |
| __email__ = "bob@libreserver.org"
 | |
| __status__ = "Production"
 | |
| __module_group__ = "Daemon"
 | |
| 
 | |
| import os
 | |
| import datetime
 | |
| from hashlib import md5
 | |
| from flags import is_image_file
 | |
| from utils import media_file_mime_type
 | |
| from utils import data_dir
 | |
| from utils import string_contains
 | |
| from utils import decoded_host
 | |
| from utils import check_bad_path
 | |
| from httpcodes import http_400
 | |
| from httpcodes import http_404
 | |
| from httpheaders import set_headers_head
 | |
| from media import path_is_video
 | |
| from media import path_is_audio
 | |
| from daemon_utils import get_user_agent
 | |
| from daemon_utils import log_epicyon_instances
 | |
| 
 | |
| 
 | |
| def daemon_http_head(self) -> None:
 | |
|     """ HTTP HEAD
 | |
|     """
 | |
|     if self.server.starting_daemon:
 | |
|         return
 | |
|     if check_bad_path(self.path):
 | |
|         http_400(self)
 | |
|         return
 | |
| 
 | |
|     calling_domain = self.server.domain_full
 | |
| 
 | |
|     ua_str = get_user_agent(self)
 | |
| 
 | |
|     if ua_str:
 | |
|         if 'Epicyon/' in ua_str:
 | |
|             log_epicyon_instances(self.server.base_dir, ua_str,
 | |
|                                   self.server.known_epicyon_instances)
 | |
| 
 | |
|     if self.headers.get('Host'):
 | |
|         calling_domain = decoded_host(self.headers['Host'])
 | |
|         if self.server.onion_domain:
 | |
|             if calling_domain not in (self.server.domain,
 | |
|                                       self.server.domain_full,
 | |
|                                       self.server.onion_domain):
 | |
|                 print('HEAD domain blocked: ' + calling_domain)
 | |
|                 http_400(self)
 | |
|                 return
 | |
|         else:
 | |
|             if calling_domain not in (self.server.domain,
 | |
|                                       self.server.domain_full):
 | |
|                 print('HEAD domain blocked: ' + calling_domain)
 | |
|                 http_400(self)
 | |
|                 return
 | |
| 
 | |
|     check_path = self.path
 | |
|     etag = None
 | |
|     file_length = -1
 | |
|     last_modified_time_str = None
 | |
| 
 | |
|     if string_contains(self.path,
 | |
|                        ('/media/', '/accounts/avatars/',
 | |
|                         '/accounts/headers/')):
 | |
|         if is_image_file(self.path) or \
 | |
|            path_is_video(self.path) or \
 | |
|            path_is_audio(self.path):
 | |
|             if '/media/' in self.path:
 | |
|                 media_str = self.path.split('/media/')[1]
 | |
|                 media_filename = \
 | |
|                     self.server.base_dir + '/media/' + media_str
 | |
|             elif '/accounts/avatars/' in self.path:
 | |
|                 avatar_file = self.path.split('/accounts/avatars/')[1]
 | |
|                 if '/' not in avatar_file:
 | |
|                     http_404(self, 149)
 | |
|                     return
 | |
|                 nickname = avatar_file.split('/')[0]
 | |
|                 avatar_file = avatar_file.split('/')[1]
 | |
|                 avatar_file_ext = avatar_file.split('.')[-1]
 | |
|                 # remove any numbers, eg. avatar123.png becomes avatar.png
 | |
|                 if avatar_file.startswith('avatar'):
 | |
|                     avatar_file = 'avatar.' + avatar_file_ext
 | |
|                 media_filename = \
 | |
|                     data_dir(self.server.base_dir) + '/' + \
 | |
|                     nickname + '@' + self.server.domain + '/' + \
 | |
|                     avatar_file
 | |
|             else:
 | |
|                 banner_file = self.path.split('/accounts/headers/')[1]
 | |
|                 if '/' not in banner_file:
 | |
|                     http_404(self, 150)
 | |
|                     return
 | |
|                 nickname = banner_file.split('/')[0]
 | |
|                 banner_file = banner_file.split('/')[1]
 | |
|                 banner_file_ext = banner_file.split('.')[-1]
 | |
|                 # remove any numbers, eg. banner123.png becomes banner.png
 | |
|                 if banner_file.startswith('banner'):
 | |
|                     banner_file = 'banner.' + banner_file_ext
 | |
|                 media_filename = \
 | |
|                     data_dir(self.server.base_dir) + '/' + \
 | |
|                     nickname + '@' + self.server.domain + '/' + \
 | |
|                     banner_file
 | |
| 
 | |
|             if os.path.isfile(media_filename):
 | |
|                 check_path = media_filename
 | |
|                 file_length = os.path.getsize(media_filename)
 | |
|                 media_tm = os.path.getmtime(media_filename)
 | |
|                 last_modified_time = \
 | |
|                     datetime.datetime.fromtimestamp(media_tm,
 | |
|                                                     datetime.timezone.utc)
 | |
|                 time_format_str = '%a, %d %b %Y %H:%M:%S GMT'
 | |
|                 last_modified_time_str = \
 | |
|                     last_modified_time.strftime(time_format_str)
 | |
|                 media_tag_filename = media_filename + '.etag'
 | |
|                 if os.path.isfile(media_tag_filename):
 | |
|                     try:
 | |
|                         with open(media_tag_filename, 'r',
 | |
|                                   encoding='utf-8') as fp_efile:
 | |
|                             etag = fp_efile.read()
 | |
|                     except OSError:
 | |
|                         print('EX: do_HEAD unable to read ' +
 | |
|                               media_tag_filename)
 | |
|                 else:
 | |
|                     media_binary = None
 | |
|                     try:
 | |
|                         with open(media_filename, 'rb') as fp_av:
 | |
|                             media_binary = fp_av.read()
 | |
|                     except OSError:
 | |
|                         print('EX: unable to read media binary ' +
 | |
|                               media_filename)
 | |
|                     if media_binary:
 | |
|                         etag = md5(media_binary).hexdigest()  # nosec
 | |
|                         try:
 | |
|                             with open(media_tag_filename, 'w+',
 | |
|                                       encoding='utf-8') as fp_efile:
 | |
|                                 fp_efile.write(etag)
 | |
|                         except OSError:
 | |
|                             print('EX: do_HEAD unable to write ' +
 | |
|                                   media_tag_filename)
 | |
|             else:
 | |
|                 http_404(self, 151)
 | |
|                 return
 | |
| 
 | |
|     media_file_type = media_file_mime_type(check_path)
 | |
|     set_headers_head(self, media_file_type, file_length,
 | |
|                      etag, calling_domain, False,
 | |
|                      last_modified_time_str)
 |