mirror of https://gitlab.com/bashrc2/epicyon
				
				
				
			
		
			
				
	
	
		
			226 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			226 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
__filename__ = "daemon_post_links.py"
 | 
						|
__author__ = "Bob Mottram"
 | 
						|
__license__ = "AGPL3+"
 | 
						|
__version__ = "1.6.0"
 | 
						|
__maintainer__ = "Bob Mottram"
 | 
						|
__email__ = "bob@libreserver.org"
 | 
						|
__status__ = "Production"
 | 
						|
__module_group__ = "Daemon POST"
 | 
						|
 | 
						|
import os
 | 
						|
import errno
 | 
						|
from socket import error as SocketError
 | 
						|
from flags import is_editor
 | 
						|
from utils import data_dir
 | 
						|
from utils import dangerous_markup
 | 
						|
from utils import get_instance_url
 | 
						|
from utils import get_nickname_from_actor
 | 
						|
from utils import get_config_param
 | 
						|
from httpheaders import redirect_headers
 | 
						|
from content import extract_text_fields_in_post
 | 
						|
 | 
						|
 | 
						|
def _links_update_edited(fields: {}, links_filename: str) -> None:
 | 
						|
    """Moderators can update the links column
 | 
						|
    """
 | 
						|
    if fields.get('editedLinks'):
 | 
						|
        links_str = fields['editedLinks']
 | 
						|
        if fields.get('newColLink'):
 | 
						|
            if links_str:
 | 
						|
                if not links_str.endswith('\n'):
 | 
						|
                    links_str += '\n'
 | 
						|
            links_str += fields['newColLink'] + '\n'
 | 
						|
        try:
 | 
						|
            with open(links_filename, 'w+',
 | 
						|
                      encoding='utf-8') as fp_links:
 | 
						|
                fp_links.write(links_str)
 | 
						|
        except OSError:
 | 
						|
            print('EX: _links_update unable to write ' +
 | 
						|
                  links_filename)
 | 
						|
    else:
 | 
						|
        if fields.get('newColLink'):
 | 
						|
            # the text area is empty but there is a new link added
 | 
						|
            links_str = fields['newColLink'] + '\n'
 | 
						|
            try:
 | 
						|
                with open(links_filename, 'w+',
 | 
						|
                          encoding='utf-8') as fp_links:
 | 
						|
                    fp_links.write(links_str)
 | 
						|
            except OSError:
 | 
						|
                print('EX: _links_update unable to write ' +
 | 
						|
                      links_filename)
 | 
						|
        else:
 | 
						|
            if os.path.isfile(links_filename):
 | 
						|
                try:
 | 
						|
                    os.remove(links_filename)
 | 
						|
                except OSError:
 | 
						|
                    print('EX: _links_update unable to delete ' +
 | 
						|
                          links_filename)
 | 
						|
 | 
						|
 | 
						|
def _links_update_about(fields: {}, allow_local_network_access: bool,
 | 
						|
                        about_filename: str) -> None:
 | 
						|
    """Administrator can update the instance About screen
 | 
						|
    """
 | 
						|
    if fields.get('editedAbout'):
 | 
						|
        about_str = fields['editedAbout']
 | 
						|
        if not dangerous_markup(about_str,
 | 
						|
                                allow_local_network_access, []):
 | 
						|
            try:
 | 
						|
                with open(about_filename, 'w+',
 | 
						|
                          encoding='utf-8') as fp_about:
 | 
						|
                    fp_about.write(about_str)
 | 
						|
            except OSError:
 | 
						|
                print('EX: unable to write about ' +
 | 
						|
                      about_filename)
 | 
						|
    else:
 | 
						|
        if os.path.isfile(about_filename):
 | 
						|
            try:
 | 
						|
                os.remove(about_filename)
 | 
						|
            except OSError:
 | 
						|
                print('EX: _links_update unable to delete ' +
 | 
						|
                      about_filename)
 | 
						|
 | 
						|
 | 
						|
def _links_update_tos(fields: {}, allow_local_network_access: bool,
 | 
						|
                      tos_filename: str) -> None:
 | 
						|
    """Administrator can update the terms of service
 | 
						|
    """
 | 
						|
    if fields.get('editedTOS'):
 | 
						|
        tos_str = fields['editedTOS']
 | 
						|
        if not dangerous_markup(tos_str,
 | 
						|
                                allow_local_network_access, []):
 | 
						|
            try:
 | 
						|
                with open(tos_filename, 'w+', encoding='utf-8') as fp_tos:
 | 
						|
                    fp_tos.write(tos_str)
 | 
						|
            except OSError:
 | 
						|
                print('EX: unable to write TOS ' + tos_filename)
 | 
						|
    else:
 | 
						|
        if os.path.isfile(tos_filename):
 | 
						|
            try:
 | 
						|
                os.remove(tos_filename)
 | 
						|
            except OSError:
 | 
						|
                print('EX: _links_update unable to delete ' +
 | 
						|
                      tos_filename)
 | 
						|
 | 
						|
 | 
						|
def _links_update_sepcification(fields: {},
 | 
						|
                                specification_filename: str) -> None:
 | 
						|
    """Administrator can update the ActivityPub specification
 | 
						|
    """
 | 
						|
    if fields.get('editedSpecification'):
 | 
						|
        specification_str = fields['editedSpecification']
 | 
						|
        try:
 | 
						|
            with open(specification_filename, 'w+',
 | 
						|
                      encoding='utf-8') as fp_specification:
 | 
						|
                fp_specification.write(specification_str)
 | 
						|
        except OSError:
 | 
						|
            print('EX: unable to write specification ' +
 | 
						|
                  specification_filename)
 | 
						|
    else:
 | 
						|
        if os.path.isfile(specification_filename):
 | 
						|
            try:
 | 
						|
                os.remove(specification_filename)
 | 
						|
            except OSError:
 | 
						|
                print('EX: _links_update_specification unable to delete ' +
 | 
						|
                      specification_filename)
 | 
						|
 | 
						|
 | 
						|
def links_update(self, calling_domain: str, cookie: str,
 | 
						|
                 path: str, base_dir: str, debug: bool,
 | 
						|
                 default_timeline: str,
 | 
						|
                 allow_local_network_access: bool,
 | 
						|
                 http_prefix: str, domain_full: str,
 | 
						|
                 onion_domain: str, i2p_domain: str,
 | 
						|
                 max_post_length: int) -> None:
 | 
						|
    """Updates the left links column of the timeline
 | 
						|
    """
 | 
						|
    users_path = path.replace('/linksdata', '')
 | 
						|
    users_path = users_path.replace('/editlinks', '')
 | 
						|
    actor_str = \
 | 
						|
        get_instance_url(calling_domain,
 | 
						|
                         http_prefix,
 | 
						|
                         domain_full,
 | 
						|
                         onion_domain,
 | 
						|
                         i2p_domain) + \
 | 
						|
        users_path
 | 
						|
 | 
						|
    boundary = None
 | 
						|
    if ' boundary=' in self.headers['Content-type']:
 | 
						|
        boundary = self.headers['Content-type'].split('boundary=')[1]
 | 
						|
        if ';' in boundary:
 | 
						|
            boundary = boundary.split(';')[0]
 | 
						|
 | 
						|
    # get the nickname
 | 
						|
    nickname = get_nickname_from_actor(actor_str)
 | 
						|
    editor = None
 | 
						|
    if nickname:
 | 
						|
        editor = is_editor(base_dir, nickname)
 | 
						|
    if not nickname or not editor:
 | 
						|
        if not nickname:
 | 
						|
            print('WARN: nickname not found in ' + actor_str)
 | 
						|
        else:
 | 
						|
            print('WARN: nickname is not a moderator' + actor_str)
 | 
						|
        redirect_headers(self, actor_str, cookie, calling_domain, 303)
 | 
						|
        self.server.postreq_busy = False
 | 
						|
        return
 | 
						|
 | 
						|
    if self.headers.get('Content-length'):
 | 
						|
        length = int(self.headers['Content-length'])
 | 
						|
 | 
						|
        # check that the POST isn't too large
 | 
						|
        if length > max_post_length:
 | 
						|
            print('Maximum links data length exceeded ' + str(length))
 | 
						|
            redirect_headers(self, actor_str, cookie, calling_domain, 303)
 | 
						|
            self.server.postreq_busy = False
 | 
						|
            return
 | 
						|
 | 
						|
    try:
 | 
						|
        # read the bytes of the http form POST
 | 
						|
        post_bytes = self.rfile.read(length)
 | 
						|
    except SocketError as ex:
 | 
						|
        if ex.errno == errno.ECONNRESET:
 | 
						|
            print('EX: connection was reset while ' +
 | 
						|
                  'reading bytes from http form POST')
 | 
						|
        else:
 | 
						|
            print('EX: error while reading bytes ' +
 | 
						|
                  'from http form POST')
 | 
						|
        self.send_response(400)
 | 
						|
        self.end_headers()
 | 
						|
        self.server.postreq_busy = False
 | 
						|
        return
 | 
						|
    except ValueError as ex:
 | 
						|
        print('EX: failed to read bytes for POST, ' + str(ex))
 | 
						|
        self.send_response(400)
 | 
						|
        self.end_headers()
 | 
						|
        self.server.postreq_busy = False
 | 
						|
        return
 | 
						|
 | 
						|
    links_filename = data_dir(base_dir) + '/links.txt'
 | 
						|
    about_filename = data_dir(base_dir) + '/about.md'
 | 
						|
    tos_filename = data_dir(base_dir) + '/tos.md'
 | 
						|
    specification_filename = data_dir(base_dir) + '/activitypub.md'
 | 
						|
 | 
						|
    if not boundary:
 | 
						|
        if b'--LYNX' in post_bytes:
 | 
						|
            boundary = '--LYNX'
 | 
						|
 | 
						|
    if boundary:
 | 
						|
        # extract all of the text fields into a dict
 | 
						|
        fields = \
 | 
						|
            extract_text_fields_in_post(post_bytes, boundary, debug, None)
 | 
						|
 | 
						|
        _links_update_edited(fields, links_filename)
 | 
						|
 | 
						|
        # administrator can do a few extra things other than editing links
 | 
						|
        admin_nickname = get_config_param(base_dir, 'admin')
 | 
						|
        if nickname == admin_nickname:
 | 
						|
            _links_update_about(fields, allow_local_network_access,
 | 
						|
                                about_filename)
 | 
						|
            _links_update_tos(fields, allow_local_network_access, tos_filename)
 | 
						|
            _links_update_sepcification(fields, specification_filename)
 | 
						|
 | 
						|
    # redirect back to the default timeline
 | 
						|
    redirect_headers(self, actor_str + '/' + default_timeline,
 | 
						|
                     cookie, calling_domain, 303)
 | 
						|
    self.server.postreq_busy = False
 |