| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  | __filename__ = "httpheaders.py" | 
					
						
							|  |  |  | __author__ = "Bob Mottram" | 
					
						
							|  |  |  | __license__ = "AGPL3+" | 
					
						
							|  |  |  | __version__ = "1.5.0" | 
					
						
							|  |  |  | __maintainer__ = "Bob Mottram" | 
					
						
							|  |  |  | __email__ = "bob@libreserver.org" | 
					
						
							|  |  |  | __status__ = "Production" | 
					
						
							|  |  |  | __module_group__ = "Core" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import urllib.parse | 
					
						
							|  |  |  | from hashlib import md5 | 
					
						
							| 
									
										
										
										
											2024-04-10 13:32:03 +00:00
										 |  |  | from utils import string_contains | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  | from utils import get_instance_url | 
					
						
							| 
									
										
										
										
											2024-07-20 18:22:38 +00:00
										 |  |  | from utils import data_dir | 
					
						
							|  |  |  | from utils import save_json | 
					
						
							| 
									
										
										
										
											2024-11-16 12:05:31 +00:00
										 |  |  | from utils import remove_id_ending | 
					
						
							| 
									
										
										
										
											2024-11-16 13:00:29 +00:00
										 |  |  | from utils import has_object_dict | 
					
						
							|  |  |  | from utils import get_attributed_to | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def login_headers(self, file_format: str, length: int, | 
					
						
							|  |  |  |                   calling_domain: str) -> None: | 
					
						
							|  |  |  |     self.send_response(200) | 
					
						
							|  |  |  |     self.send_header('Content-type', file_format) | 
					
						
							|  |  |  |     self.send_header('Content-Length', str(length)) | 
					
						
							|  |  |  |     self.send_header('Host', calling_domain) | 
					
						
							|  |  |  |     self.send_header('WWW-Authenticate', | 
					
						
							|  |  |  |                      'title="Login to Epicyon", Basic realm="epicyon"') | 
					
						
							| 
									
										
										
										
											2024-06-04 09:42:09 +00:00
										 |  |  |     self.send_header('Sec-GPC', '1') | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  |     self.end_headers() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def logout_headers(self, file_format: str, length: int, | 
					
						
							|  |  |  |                    calling_domain: str) -> None: | 
					
						
							|  |  |  |     self.send_response(200) | 
					
						
							|  |  |  |     self.send_header('Content-type', file_format) | 
					
						
							|  |  |  |     self.send_header('Content-Length', str(length)) | 
					
						
							|  |  |  |     self.send_header('Set-Cookie', 'epicyon=; SameSite=Strict') | 
					
						
							|  |  |  |     self.send_header('Host', calling_domain) | 
					
						
							|  |  |  |     self.send_header('WWW-Authenticate', | 
					
						
							|  |  |  |                      'title="Login to Epicyon", Basic realm="epicyon"') | 
					
						
							|  |  |  |     self.end_headers() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def _quoted_redirect(redirect: str) -> str: | 
					
						
							|  |  |  |     """hashtag screen urls sometimes contain non-ascii characters which
 | 
					
						
							|  |  |  |     need to be url encoded | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     if '/tags/' not in redirect: | 
					
						
							|  |  |  |         return redirect | 
					
						
							|  |  |  |     last_str = redirect.split('/')[-1] | 
					
						
							|  |  |  |     return redirect.replace('/' + last_str, '/' + | 
					
						
							|  |  |  |                             urllib.parse.quote_plus(last_str)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def logout_redirect(self, redirect: str, calling_domain: str) -> None: | 
					
						
							|  |  |  |     if '://' not in redirect: | 
					
						
							|  |  |  |         redirect = get_instance_url(calling_domain, | 
					
						
							|  |  |  |                                     self.server.http_prefix, | 
					
						
							|  |  |  |                                     self.server.domain_full, | 
					
						
							|  |  |  |                                     self.server.onion_domain, | 
					
						
							|  |  |  |                                     self.server.i2p_domain) + \ | 
					
						
							|  |  |  |                                     redirect | 
					
						
							|  |  |  |         print('WARN: redirect was not an absolute url, changed to ' + | 
					
						
							| 
									
										
										
										
											2024-10-18 14:29:56 +00:00
										 |  |  |               redirect + ' ' + calling_domain) | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     quot_redirect = _quoted_redirect(redirect) | 
					
						
							|  |  |  |     self.send_response(303) | 
					
						
							|  |  |  |     self.send_header('Set-Cookie', 'epicyon=; SameSite=Strict') | 
					
						
							|  |  |  |     self.send_header('Location', quot_redirect) | 
					
						
							|  |  |  |     self.send_header('Host', calling_domain) | 
					
						
							|  |  |  |     self.send_header('X-AP-Instance-ID', self.server.instance_id) | 
					
						
							|  |  |  |     self.send_header('Content-Length', '0') | 
					
						
							| 
									
										
										
										
											2024-06-04 09:42:09 +00:00
										 |  |  |     self.send_header('Sec-GPC', '1') | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  |     self.end_headers() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def redirect_headers(self, redirect: str, cookie: str, | 
					
						
							|  |  |  |                      calling_domain: str, | 
					
						
							| 
									
										
										
										
											2024-04-16 13:47:21 +00:00
										 |  |  |                      code: int) -> None: | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  |     if '://' not in redirect: | 
					
						
							| 
									
										
										
										
											2024-10-18 14:29:56 +00:00
										 |  |  |         redirect = \ | 
					
						
							|  |  |  |             get_instance_url(calling_domain, | 
					
						
							|  |  |  |                              self.server.http_prefix, | 
					
						
							|  |  |  |                              self.server.domain_full, | 
					
						
							|  |  |  |                              self.server.onion_domain, | 
					
						
							|  |  |  |                              self.server.i2p_domain) + redirect | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  |         print('WARN: redirect was not an absolute url, changed to ' + | 
					
						
							|  |  |  |               redirect) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.send_response(code) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if code != 303: | 
					
						
							|  |  |  |         print('Redirect headers: ' + str(code)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if cookie: | 
					
						
							|  |  |  |         cookie_str = cookie.replace('SET:', '').strip() | 
					
						
							|  |  |  |         if 'HttpOnly;' not in cookie_str: | 
					
						
							|  |  |  |             if self.server.http_prefix == 'https': | 
					
						
							|  |  |  |                 cookie_str += '; Secure' | 
					
						
							|  |  |  |             cookie_str += '; HttpOnly; SameSite=Strict' | 
					
						
							|  |  |  |         if not cookie.startswith('SET:'): | 
					
						
							|  |  |  |             self.send_header('Cookie', cookie_str) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             self.send_header('Set-Cookie', cookie_str) | 
					
						
							|  |  |  |     quot_redirect = _quoted_redirect(redirect) | 
					
						
							|  |  |  |     self.send_header('Location', quot_redirect) | 
					
						
							|  |  |  |     self.send_header('Host', calling_domain) | 
					
						
							|  |  |  |     self.send_header('X-AP-Instance-ID', self.server.instance_id) | 
					
						
							|  |  |  |     self.send_header('Content-Length', '0') | 
					
						
							| 
									
										
										
										
											2024-06-04 09:42:09 +00:00
										 |  |  |     self.send_header('Sec-GPC', '1') | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  |     self.end_headers() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def clear_login_details(self, nickname: str, calling_domain: str) -> None: | 
					
						
							|  |  |  |     """Clears login details for the given account
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     # remove any token | 
					
						
							|  |  |  |     if self.server.tokens.get(nickname): | 
					
						
							|  |  |  |         del self.server.tokens_lookup[self.server.tokens[nickname]] | 
					
						
							|  |  |  |         del self.server.tokens[nickname] | 
					
						
							|  |  |  |     redirect_headers(self, self.server.http_prefix + '://' + | 
					
						
							|  |  |  |                      self.server.domain_full + '/login', | 
					
						
							|  |  |  |                      'epicyon=; SameSite=Strict', | 
					
						
							| 
									
										
										
										
											2024-04-16 13:47:21 +00:00
										 |  |  |                      calling_domain, 303) | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def _set_headers_base(self, file_format: str, length: int, cookie: str, | 
					
						
							|  |  |  |                       calling_domain: str, permissive: bool) -> None: | 
					
						
							|  |  |  |     self.send_response(200) | 
					
						
							|  |  |  |     self.send_header('Content-type', file_format) | 
					
						
							| 
									
										
										
										
											2024-04-10 13:32:03 +00:00
										 |  |  |     if string_contains(file_format, ('image/', 'audio/', 'video/')): | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  |         cache_control = 'public, max-age=84600, immutable' | 
					
						
							|  |  |  |         self.send_header('Cache-Control', cache_control) | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         self.send_header('Cache-Control', 'public') | 
					
						
							|  |  |  |     self.send_header('Origin', self.server.domain_full) | 
					
						
							|  |  |  |     if length > -1: | 
					
						
							|  |  |  |         self.send_header('Content-Length', str(length)) | 
					
						
							|  |  |  |     if calling_domain: | 
					
						
							|  |  |  |         self.send_header('Host', calling_domain) | 
					
						
							|  |  |  |     if permissive: | 
					
						
							|  |  |  |         self.send_header('Access-Control-Allow-Origin', '*') | 
					
						
							|  |  |  |         return | 
					
						
							| 
									
										
										
										
											2024-06-15 18:31:15 +00:00
										 |  |  |     self.send_header('X-Robots-Tag', 'noai, noimageai, GPC') | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  |     self.send_header('X-AP-Instance-ID', self.server.instance_id) | 
					
						
							|  |  |  |     self.send_header('X-Clacks-Overhead', self.server.clacks) | 
					
						
							| 
									
										
										
										
											2024-06-04 09:42:09 +00:00
										 |  |  |     self.send_header('Sec-GPC', '1') | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  |     self.send_header('User-Agent', | 
					
						
							|  |  |  |                      'Epicyon/' + __version__ + | 
					
						
							|  |  |  |                      '; +' + self.server.http_prefix + '://' + | 
					
						
							|  |  |  |                      self.server.domain_full + '/') | 
					
						
							|  |  |  |     if cookie: | 
					
						
							|  |  |  |         cookie_str = cookie | 
					
						
							|  |  |  |         if 'HttpOnly;' not in cookie_str: | 
					
						
							|  |  |  |             if self.server.http_prefix == 'https': | 
					
						
							|  |  |  |                 cookie_str += '; Secure' | 
					
						
							|  |  |  |             cookie_str += '; HttpOnly; SameSite=Strict' | 
					
						
							|  |  |  |         self.send_header('Cookie', cookie_str) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def set_headers(self, file_format: str, length: int, cookie: str, | 
					
						
							|  |  |  |                 calling_domain: str, permissive: bool) -> None: | 
					
						
							|  |  |  |     _set_headers_base(self, file_format, length, cookie, calling_domain, | 
					
						
							|  |  |  |                       permissive) | 
					
						
							|  |  |  |     self.end_headers() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-16 12:05:31 +00:00
										 |  |  | def set_html_post_headers(self, length: int, cookie: str, | 
					
						
							|  |  |  |                           calling_domain: str, permissive: bool, | 
					
						
							|  |  |  |                           post_json_object: {}) -> None: | 
					
						
							|  |  |  |     """A HTML format post representing an individual ActivityPub post
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     _set_headers_base(self, 'text/html', length, cookie, calling_domain, | 
					
						
							|  |  |  |                       permissive) | 
					
						
							|  |  |  |     if post_json_object.get('id'): | 
					
						
							|  |  |  |         # Discover activitypub version of the html post | 
					
						
							|  |  |  |         # https://swicg.github.io/activitypub-html-discovery/#http-link-header | 
					
						
							| 
									
										
										
										
											2024-11-16 13:00:29 +00:00
										 |  |  |         # link to the activitypub post | 
					
						
							| 
									
										
										
										
											2024-11-16 12:05:31 +00:00
										 |  |  |         post_id = remove_id_ending(post_json_object['id']) | 
					
						
							| 
									
										
										
										
											2024-11-16 13:00:29 +00:00
										 |  |  |         ap_post_link = \ | 
					
						
							| 
									
										
										
										
											2024-11-16 12:05:31 +00:00
										 |  |  |             '<' + post_id + '>; rel="alternate"; ' + \ | 
					
						
							|  |  |  |             'type="application/activity+json"' | 
					
						
							| 
									
										
										
										
											2024-11-16 13:00:29 +00:00
										 |  |  |         self.send_header('Link', ap_post_link) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # https://swicg.github.io/ | 
					
						
							|  |  |  |         # activitypub-html-discovery/#http-link-header-1 | 
					
						
							|  |  |  |         # link to the author's actor | 
					
						
							|  |  |  |         obj = post_json_object | 
					
						
							|  |  |  |         if has_object_dict(post_json_object): | 
					
						
							|  |  |  |             obj = post_json_object['object'] | 
					
						
							|  |  |  |         if obj.get('attributedTo'): | 
					
						
							|  |  |  |             actor = get_attributed_to(obj['attributedTo']) | 
					
						
							|  |  |  |             if actor: | 
					
						
							|  |  |  |                 ap_author_link = \ | 
					
						
							|  |  |  |                     '<' + actor + '>; rel="author"; ' + \ | 
					
						
							|  |  |  |                     'type="application/activity+json"' | 
					
						
							|  |  |  |                 self.send_header('Link', ap_author_link) | 
					
						
							| 
									
										
										
										
											2024-11-16 12:05:31 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     self.end_headers() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  | def set_headers_head(self, file_format: str, length: int, etag: str, | 
					
						
							|  |  |  |                      calling_domain: str, permissive: bool, | 
					
						
							|  |  |  |                      last_modified_time_str: str) -> None: | 
					
						
							|  |  |  |     _set_headers_base(self, file_format, length, None, calling_domain, | 
					
						
							|  |  |  |                       permissive) | 
					
						
							|  |  |  |     if etag: | 
					
						
							|  |  |  |         self.send_header('ETag', '"' + etag + '"') | 
					
						
							|  |  |  |     if last_modified_time_str: | 
					
						
							|  |  |  |         self.send_header('last-modified', | 
					
						
							|  |  |  |                          last_modified_time_str) | 
					
						
							|  |  |  |     self.end_headers() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def set_headers_etag(self, media_filename: str, file_format: str, | 
					
						
							|  |  |  |                      data, cookie: str, calling_domain: str, | 
					
						
							|  |  |  |                      permissive: bool, last_modified: str) -> None: | 
					
						
							|  |  |  |     datalen = len(data) | 
					
						
							|  |  |  |     _set_headers_base(self, file_format, datalen, cookie, calling_domain, | 
					
						
							|  |  |  |                       permissive) | 
					
						
							|  |  |  |     etag = None | 
					
						
							|  |  |  |     if os.path.isfile(media_filename + '.etag'): | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             with open(media_filename + '.etag', 'r', | 
					
						
							| 
									
										
										
										
											2024-07-16 12:20:58 +00:00
										 |  |  |                       encoding='utf-8') as fp_media: | 
					
						
							|  |  |  |                 etag = fp_media.read() | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  |         except OSError: | 
					
						
							|  |  |  |             print('EX: _set_headers_etag ' + | 
					
						
							|  |  |  |                   'unable to read ' + media_filename + '.etag') | 
					
						
							|  |  |  |     if not etag: | 
					
						
							|  |  |  |         etag = md5(data).hexdigest()  # nosec | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             with open(media_filename + '.etag', 'w+', | 
					
						
							| 
									
										
										
										
											2024-07-16 12:20:58 +00:00
										 |  |  |                       encoding='utf-8') as fp_media: | 
					
						
							|  |  |  |                 fp_media.write(etag) | 
					
						
							| 
									
										
										
										
											2024-03-01 17:10:04 +00:00
										 |  |  |         except OSError: | 
					
						
							|  |  |  |             print('EX: _set_headers_etag ' + | 
					
						
							|  |  |  |                   'unable to write ' + media_filename + '.etag') | 
					
						
							|  |  |  |     # if etag: | 
					
						
							|  |  |  |     #     self.send_header('ETag', '"' + etag + '"') | 
					
						
							|  |  |  |     if last_modified: | 
					
						
							|  |  |  |         self.send_header('last-modified', last_modified) | 
					
						
							|  |  |  |     self.send_header('accept-ranges', 'bytes') | 
					
						
							|  |  |  |     self.end_headers() | 
					
						
							| 
									
										
										
										
											2024-07-20 18:22:38 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def update_headers_catalog(base_dir: str, headers_catalog: {}, | 
					
						
							|  |  |  |                            headers: {}) -> None: | 
					
						
							|  |  |  |     """Creates a catalog of headers
 | 
					
						
							|  |  |  |     This allows us to spot anything unexpected for later investigation | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     headers_catalog_fieldname = data_dir(base_dir) + '/headers_catalog.json' | 
					
						
							|  |  |  |     changed = False | 
					
						
							|  |  |  |     for fieldname, fieldvalue in headers.items(): | 
					
						
							|  |  |  |         if fieldname in headers_catalog: | 
					
						
							|  |  |  |             continue | 
					
						
							| 
									
										
										
										
											2024-07-26 20:16:56 +00:00
										 |  |  |         if fieldname in ('cookie', 'Cookie'): | 
					
						
							| 
									
										
										
										
											2024-07-26 19:49:55 +00:00
										 |  |  |             fieldvalue = "" | 
					
						
							| 
									
										
										
										
											2024-07-20 18:22:38 +00:00
										 |  |  |         headers_catalog[fieldname] = fieldvalue | 
					
						
							|  |  |  |         changed = True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if changed: | 
					
						
							|  |  |  |         save_json(headers_catalog, headers_catalog_fieldname) | 
					
						
							| 
									
										
										
										
											2024-07-26 20:16:56 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def contains_suspicious_headers(headers: {}) -> bool: | 
					
						
							|  |  |  |     """returns true if the given headers contain something suspicious
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     if 'Shellshock' in headers or \ | 
					
						
							|  |  |  |        'shellshock' in headers or \ | 
					
						
							|  |  |  |        'think-lang' in headers or \ | 
					
						
							|  |  |  |        'Think-lang' in headers: | 
					
						
							|  |  |  |         return True | 
					
						
							| 
									
										
										
										
											2024-07-28 14:45:40 +00:00
										 |  |  |     headers_str = str(headers) | 
					
						
							| 
									
										
										
										
											2024-08-02 12:00:07 +00:00
										 |  |  |     sus_strings = ('../../', 'index.php', 'passwd') | 
					
						
							|  |  |  |     if string_contains(headers_str, sus_strings): | 
					
						
							| 
									
										
										
										
											2024-07-26 20:16:56 +00:00
										 |  |  |         return True | 
					
						
							|  |  |  |     return False |