Check that strings passed to system commands are safe

merge-requests/30/head
Bob Mottram 2022-07-22 11:54:57 +01:00
parent 762a5bdb3e
commit aee4048056
5 changed files with 70 additions and 28 deletions

View File

@ -16,6 +16,7 @@ import webbrowser
import urllib.parse import urllib.parse
from pathlib import Path from pathlib import Path
from random import randint from random import randint
from utils import safe_system_string
from utils import text_in_file from utils import text_in_file
from utils import disallow_announce from utils import disallow_announce
from utils import disallow_reply from utils import disallow_reply
@ -324,8 +325,10 @@ def _play_sound(sound_filename: str,
return return
if player == 'ffplay': if player == 'ffplay':
os.system('ffplay ' + sound_filename + cmd = \
' -autoexit -hide_banner -nodisp 2> /dev/null') 'ffplay ' + safe_system_string(sound_filename) + \
' -autoexit -hide_banner -nodisp 2> /dev/null'
os.system(cmd)
def _speaker_espeak(espeak, pitch: int, rate: int, srange: int, def _speaker_espeak(espeak, pitch: int, rate: int, srange: int,
@ -365,6 +368,7 @@ def _speaker_mimic3(pitch: int, rate: int, srange: int,
' --stdout' + \ ' --stdout' + \
' "' + text + '" > ' + \ ' "' + text + '" > ' + \
audio_filename + ' 2> /dev/null' audio_filename + ' 2> /dev/null'
cmd = safe_system_string(cmd)
try: try:
os.system(cmd) os.system(cmd)
except OSError as ex: except OSError as ex:
@ -388,11 +392,13 @@ def _speaker_picospeaker(pitch: int, rate: int, system_language: str,
speaker_lang = speaker_str speaker_lang = speaker_str
break break
say_text = str(say_text).replace('"', "'") say_text = str(say_text).replace('"', "'")
speaker_cmd = 'picospeaker ' + \ speaker_text = html.unescape(str(say_text))
'-l ' + speaker_lang + \ speaker_cmd = \
'picospeaker ' + \
'-l ' + safe_system_string(speaker_lang) + \
' -r ' + str(rate) + \ ' -r ' + str(rate) + \
' -p ' + str(pitch) + ' "' + \ ' -p ' + str(pitch) + ' "' + \
html.unescape(str(say_text)) + '" 2> /dev/null' safe_system_string(speaker_text) + '" 2> /dev/null'
os.system(speaker_cmd) os.system(speaker_cmd)
@ -405,19 +411,30 @@ def _desktop_notification(notification_type: str,
if notification_type == 'notify-send': if notification_type == 'notify-send':
# Ubuntu # Ubuntu
os.system('notify-send "' + title + '" "' + message + '"') cmd = \
'notify-send "' + safe_system_string(title) + \
'" "' + safe_system_string(message) + '"'
os.system(cmd)
elif notification_type == 'zenity': elif notification_type == 'zenity':
# Zenity # Zenity
os.system('zenity --notification --title "' + title + cmd = \
'" --text="' + message + '"') 'zenity --notification --title "' + safe_system_string(title) + \
'" --text="' + safe_system_string(message) + '"'
os.system(cmd)
elif notification_type == 'osascript': elif notification_type == 'osascript':
# Mac # Mac
os.system("osascript -e 'display notification \"" + cmd = \
message + "\" with title \"" + title + "\"'") "osascript -e 'display notification \"" + \
safe_system_string(message) + "\" with title \"" + \
safe_system_string(title) + "\"'"
os.system(cmd)
elif notification_type == 'New-BurntToastNotification': elif notification_type == 'New-BurntToastNotification':
# Windows # Windows
os.system("New-BurntToastNotification -Text \"" + cmd = \
title + "\", '" + message + "'") "New-BurntToastNotification -Text \"" + \
safe_system_string(title) + "\", '" + \
safe_system_string(message) + "'"
os.system(cmd)
def _text_to_speech(say_str: str, screenreader: str, def _text_to_speech(say_str: str, screenreader: str,

View File

@ -15,6 +15,7 @@ import random
from random import randint from random import randint
from hashlib import sha1 from hashlib import sha1
from auth import create_password from auth import create_password
from utils import safe_system_string
from utils import get_base_content_from_post from utils import get_base_content_from_post
from utils import get_full_domain from utils import get_full_domain
from utils import get_image_extensions from utils import get_image_extensions
@ -299,10 +300,13 @@ def _remove_meta_data(image_filename: str, output_filename: str) -> None:
return return
if os.path.isfile('/usr/bin/exiftool'): if os.path.isfile('/usr/bin/exiftool'):
print('Removing metadata from ' + output_filename + ' using exiftool') print('Removing metadata from ' + output_filename + ' using exiftool')
os.system('exiftool -all= ' + output_filename) # nosec cmd = 'exiftool -all= ' + safe_system_string(output_filename)
os.system(cmd) # nosec
elif os.path.isfile('/usr/bin/mogrify'): elif os.path.isfile('/usr/bin/mogrify'):
print('Removing metadata from ' + output_filename + ' using mogrify') print('Removing metadata from ' + output_filename + ' using mogrify')
os.system('/usr/bin/mogrify -strip ' + output_filename) # nosec cmd = \
'/usr/bin/mogrify -strip ' + safe_system_string(output_filename)
os.system(cmd) # nosec
def _spoof_meta_data(base_dir: str, nickname: str, domain: str, def _spoof_meta_data(base_dir: str, nickname: str, domain: str,
@ -339,7 +343,9 @@ def _spoof_meta_data(base_dir: str, nickname: str, domain: str,
cam_make, cam_model, cam_serial_number) = \ cam_make, cam_model, cam_serial_number) = \
spoof_geolocation(base_dir, spoof_city, curr_time_adjusted, spoof_geolocation(base_dir, spoof_city, curr_time_adjusted,
decoy_seed, None, None) decoy_seed, None, None)
if os.system('exiftool -artist=@"' + nickname + '@' + domain + '" ' + safe_handle = safe_system_string(nickname + '@' + domain)
safe_license_url = safe_system_string(content_license_url)
if os.system('exiftool -artist=@"' + safe_handle + '" ' +
'-Make="' + cam_make + '" ' + '-Make="' + cam_make + '" ' +
'-Model="' + cam_model + '" ' + '-Model="' + cam_model + '" ' +
'-Comment="' + str(cam_serial_number) + '" ' + '-Comment="' + str(cam_serial_number) + '" ' +
@ -351,7 +357,7 @@ def _spoof_meta_data(base_dir: str, nickname: str, domain: str,
'-GPSLongitude=' + str(longitude) + ' ' + '-GPSLongitude=' + str(longitude) + ' ' +
'-GPSLatitudeRef=' + latitude_ref + ' ' + '-GPSLatitudeRef=' + latitude_ref + ' ' +
'-GPSLatitude=' + str(latitude) + ' ' + '-GPSLatitude=' + str(latitude) + ' ' +
'-copyright="' + content_license_url + '" ' + '-copyright="' + safe_license_url + '" ' +
'-Comment="" ' + '-Comment="" ' +
output_filename) != 0: # nosec output_filename) != 0: # nosec
print('ERROR: exiftool failed to run') print('ERROR: exiftool failed to run')
@ -364,8 +370,9 @@ def get_music_metadata(filename: str) -> {}:
"""Returns metadata for a music file """Returns metadata for a music file
""" """
result = None result = None
safe_filename = safe_system_string(filename)
try: try:
result = subprocess.run(['exiftool', '-v3', filename], result = subprocess.run(['exiftool', '-v3', safe_filename],
stdout=subprocess.PIPE) stdout=subprocess.PIPE)
except BaseException as ex: except BaseException as ex:
print('EX: get_music_metadata failed ' + str(ex)) print('EX: get_music_metadata failed ' + str(ex))
@ -417,7 +424,8 @@ def convert_image_to_low_bandwidth(image_filename: str) -> None:
cmd = \ cmd = \
'/usr/bin/convert +noise Multiplicative ' + \ '/usr/bin/convert +noise Multiplicative ' + \
'-evaluate median 10% -dither Floyd-Steinberg ' + \ '-evaluate median 10% -dither Floyd-Steinberg ' + \
'-monochrome ' + image_filename + ' ' + low_bandwidth_filename '-monochrome ' + safe_system_string(image_filename) + \
' ' + safe_system_string(low_bandwidth_filename)
print('Low bandwidth image conversion: ' + cmd) print('Low bandwidth image conversion: ' + cmd)
subprocess.call(cmd, shell=True) subprocess.call(cmd, shell=True)
# wait for conversion to happen # wait for conversion to happen
@ -666,9 +674,11 @@ def path_is_audio(path: str) -> bool:
def get_image_dimensions(image_filename: str) -> (int, int): def get_image_dimensions(image_filename: str) -> (int, int):
"""Returns the dimensions of an image file """Returns the dimensions of an image file
""" """
safe_image_filename = safe_system_string(image_filename)
try: try:
result = subprocess.run(['identify', '-format', '"%wx%h"', result = subprocess.run(['identify', '-format', '"%wx%h"',
image_filename], stdout=subprocess.PIPE) safe_image_filename],
stdout=subprocess.PIPE)
except BaseException: except BaseException:
print('EX: get_image_dimensions unable to run identify command') print('EX: get_image_dimensions unable to run identify command')
return None, None return None, None

View File

@ -38,6 +38,7 @@ from roles import set_role
from roles import set_rolesFromList from roles import set_rolesFromList
from roles import get_actor_roles_list from roles import get_actor_roles_list
from media import process_meta_data from media import process_meta_data
from utils import safe_system_string
from utils import get_attachment_property_value from utils import get_attachment_property_value
from utils import get_nickname_from_actor from utils import get_nickname_from_actor
from utils import remove_html from utils import remove_html
@ -162,8 +163,9 @@ def set_profile_image(base_dir: str, http_prefix: str,
save_json(person_json, person_filename) save_json(person_json, person_filename)
cmd = \ cmd = \
'/usr/bin/convert ' + image_filename + ' -size ' + \ '/usr/bin/convert ' + safe_system_string(image_filename) + \
resolution + ' -quality 50 ' + profile_filename ' -size ' + resolution + ' -quality 50 ' + \
safe_system_string(profile_filename)
subprocess.call(cmd, shell=True) subprocess.call(cmd, shell=True)
process_meta_data(base_dir, nickname, domain, process_meta_data(base_dir, nickname, domain,
profile_filename, profile_filename, city, profile_filename, profile_filename, city,

19
pgp.py
View File

@ -12,6 +12,7 @@ import base64
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from person import get_actor_json from person import get_actor_json
from utils import safe_system_string
from utils import contains_pgp_public_key from utils import contains_pgp_public_key
from utils import is_pgp_encrypted from utils import is_pgp_encrypted
from utils import get_full_domain from utils import get_full_domain
@ -348,7 +349,7 @@ def _pgp_import_pub_key(recipient_pub_key: str) -> str:
""" """
# do a dry run # do a dry run
cmd_import_pub_key = \ cmd_import_pub_key = \
'echo "' + recipient_pub_key + \ 'echo "' + safe_system_string(recipient_pub_key) + \
'" | gpg --dry-run --import 2> /dev/null' '" | gpg --dry-run --import 2> /dev/null'
proc = subprocess.Popen([cmd_import_pub_key], proc = subprocess.Popen([cmd_import_pub_key],
stdout=subprocess.PIPE, shell=True) stdout=subprocess.PIPE, shell=True)
@ -358,7 +359,8 @@ def _pgp_import_pub_key(recipient_pub_key: str) -> str:
# this time for real # this time for real
cmd_import_pub_key = \ cmd_import_pub_key = \
'echo "' + recipient_pub_key + '" | gpg --import 2> /dev/null' 'echo "' + safe_system_string(recipient_pub_key) + \
'" | gpg --import 2> /dev/null'
proc = subprocess.Popen([cmd_import_pub_key], proc = subprocess.Popen([cmd_import_pub_key],
stdout=subprocess.PIPE, shell=True) stdout=subprocess.PIPE, shell=True)
(import_result, err) = proc.communicate() (import_result, err) = proc.communicate()
@ -367,7 +369,8 @@ def _pgp_import_pub_key(recipient_pub_key: str) -> str:
# get the key id # get the key id
cmd_import_pub_key = \ cmd_import_pub_key = \
'echo "' + recipient_pub_key + '" | gpg --show-keys' 'echo "' + safe_system_string(recipient_pub_key) + \
'" | gpg --show-keys'
proc = subprocess.Popen([cmd_import_pub_key], proc = subprocess.Popen([cmd_import_pub_key],
stdout=subprocess.PIPE, shell=True) stdout=subprocess.PIPE, shell=True)
(import_result, err) = proc.communicate() (import_result, err) = proc.communicate()
@ -395,8 +398,9 @@ def _pgp_encrypt(content: str, recipient_pub_key: str) -> str:
return None return None
cmd_encrypt = \ cmd_encrypt = \
'echo "' + content + '" | gpg --encrypt --armor --recipient ' + \ 'echo "' + safe_system_string(content) + \
key_id + ' 2> /dev/null' '" | gpg --encrypt --armor --recipient ' + \
safe_system_string(key_id) + ' 2> /dev/null'
proc = subprocess.Popen([cmd_encrypt], proc = subprocess.Popen([cmd_encrypt],
stdout=subprocess.PIPE, shell=True) stdout=subprocess.PIPE, shell=True)
(encrypt_result, _) = proc.communicate() (encrypt_result, _) = proc.communicate()
@ -452,7 +456,8 @@ def pgp_decrypt(domain: str, content: str, fromHandle: str,
_pgp_import_pub_key(pub_key) _pgp_import_pub_key(pub_key)
cmd_decrypt = \ cmd_decrypt = \
'echo "' + content + '" | gpg --decrypt --armor 2> /dev/null' 'echo "' + safe_system_string(content) + \
'" | gpg --decrypt --armor 2> /dev/null'
proc = subprocess.Popen([cmd_decrypt], proc = subprocess.Popen([cmd_decrypt],
stdout=subprocess.PIPE, shell=True) stdout=subprocess.PIPE, shell=True)
(decrypt_result, _) = proc.communicate() (decrypt_result, _) = proc.communicate()
@ -486,7 +491,7 @@ def pgp_local_public_key() -> str:
key_id = _pgp_local_public_key_id() key_id = _pgp_local_public_key_id()
if not key_id: if not key_id:
key_id = '' key_id = ''
cmd_str = "gpg --armor --export " + key_id cmd_str = "gpg --armor --export " + safe_system_string(key_id)
proc = subprocess.Popen([cmd_str], proc = subprocess.Popen([cmd_str],
stdout=subprocess.PIPE, shell=True) stdout=subprocess.PIPE, shell=True)
(result, err) = proc.communicate() (result, err) = proc.communicate()

View File

@ -3842,3 +3842,11 @@ def get_attachment_property_value(property_value: {}) -> (str, str):
prop_value_name = 'https://schema.org#value' prop_value_name = 'https://schema.org#value'
prop_value = property_value[prop_value_name] prop_value = property_value[prop_value_name]
return prop_value_name, prop_value return prop_value_name, prop_value
def safe_system_string(text: str) -> str:
"""Returns a safe version of a string which can be used within a
system command
"""
text = text.replace('$(', '(').replace('`', '')
return text