diff --git a/src/registrar/management/commands/cat_files_into_getgov.py b/src/registrar/management/commands/cat_files_into_getgov.py index 5de44ba73..ceb7ba6b2 100644 --- a/src/registrar/management/commands/cat_files_into_getgov.py +++ b/src/registrar/management/commands/cat_files_into_getgov.py @@ -3,9 +3,12 @@ import glob import logging import os +import string from django.core.management import BaseCommand +from registrar.management.commands.utility.terminal_helper import TerminalHelper + logger = logging.getLogger(__name__) @@ -27,6 +30,7 @@ class Command(BaseCommand): def handle(self, **options): file_extension: str = options.get("file_extension").lstrip(".") directory = options.get("directory") + helper = TerminalHelper() # file_extension is always coerced as str, Truthy is OK to use here. if not file_extension or not isinstance(file_extension, str): @@ -38,29 +42,63 @@ class Command(BaseCommand): for src_file_path in matching_extensions: filename = os.path.basename(src_file_path) + exit_status = -1 do_command = True - exit_status: int desired_file_path = f"{directory}/{filename}" if os.path.exists(desired_file_path): # For linter - prompt = " Do you want to replace it? (y/n) " - replace = input(f"{desired_file_path} already exists. {prompt}") - if replace.lower() != "y": + prompt = "Do you want to replace it?" + replace = f"{desired_file_path} already exists. {prompt}" + if not helper.query_yes_no(replace): do_command = False - if do_command: - copy_from = f"../tmp/{filename}" - self.cat(copy_from, desired_file_path) - exit_status = os.system(f"cat ../tmp/{filename} > {desired_file_path}") - - if exit_status == 0: - logger.info(f"Successfully copied {filename}") - else: - logger.error(f"Failed to copy {filename}") + try: + if do_command: + copy_from = f"../tmp/{filename}" + exit_status = self.cat(copy_from, desired_file_path) + except ValueError as err: + raise err + finally: + if exit_status == 0: + logger.info(f"Successfully copied {filename}") + else: + logger.error(f"Failed to copy {filename}") def cat(self, copy_from, copy_to): """Runs the cat command to copy_from a location to copy_to a location""" - exit_status = os.system(f"cat {copy_from} > {copy_to}") + + # copy_from will be system defined + self.check_file_path(copy_from, check_directory = False) + self.check_file_path(copy_to) + + # This command can only be ran from inside cf ssh getgov-{sandbox} + # It has no utility when running locally, and to exploit this + # you would have to have ssh access anyway, which is a bigger problem. + exit_status = os.system(f"cat {copy_from} > {copy_to}") # nosec return exit_status + + def check_file_path(self, file_path: str, check_directory = True): + """Does a check on user input to ensure validity""" + if not isinstance(file_path, str): + raise ValueError("Invalid path provided") + + # Remove any initial/final whitespace + file_path = file_path.strip() + + # Check for any attempts to move up in the directory structure + if ".." in file_path and check_directory: + raise ValueError( + "Moving up in the directory structure is not allowed" + ) + + # Check for any invalid characters + valid_chars = f"/-_.() {string.ascii_letters}{string.digits}" + for char in file_path: + if char not in valid_chars: + raise ValueError( + f"Invalid character {char} in file path" + ) + + return file_path