Module connpy.cli.run_handler

Classes

class RunHandler (app)
Expand source code
class RunHandler:
    def __init__(self, app):
        self.app = app
        self.print_lock = threading.Lock()

    def dispatch(self, args):
        if len(args.data) > 1:
            args.action = "noderun"
        actions = {"noderun": self.node_run, "generate": self.yaml_generate, "run": self.yaml_run}
        return actions.get(args.action)(args)

    def node_run(self, args):
        nodes_filter = args.data[0]
        commands = [" ".join(args.data[1:])]

        try:
            header_printed = False

            if hasattr(args, 'test_expected') and args.test_expected:
                # Mode: Test
                def _on_node_complete(unique, node_output, node_status, node_result):
                    nonlocal header_printed
                    with self.print_lock:
                        if not header_printed:
                            printer.console.print(Rule("OUTPUT", style="header"))
                            header_printed = True
                    printer.test_panel(unique, node_output, node_status, node_result)

                results = self.app.services.execution.test_commands(
                    nodes_filter=nodes_filter,
                    commands=commands,
                    expected=args.test_expected,
                    on_node_complete=_on_node_complete
                )
                printer.test_summary(results)
            else:
                # Mode: Normal Run
                def _on_node_complete(unique, node_output, node_status):
                    nonlocal header_printed
                    with self.print_lock:
                        if not header_printed:
                            printer.console.print(Rule("OUTPUT", style="header"))
                            header_printed = True
                    printer.node_panel(unique, node_output, node_status)

                results = self.app.services.execution.run_commands(
                    nodes_filter=nodes_filter,
                    commands=commands,
                    on_node_complete=_on_node_complete
                )
                printer.run_summary(results)

        except ConnpyError as e:
            printer.error(str(e))
            sys.exit(1)

    def yaml_generate(self, args):
        if os.path.exists(args.data[0]):
            printer.error(f"File '{args.data[0]}' already exists.")
            sys.exit(14)
        else:
            with open(args.data[0], "w") as file:
                file.write(get_instructions("generate"))
            printer.success(f"File {args.data[0]} generated successfully")
            sys.exit()

    def yaml_run(self, args):
        path = args.data[0]
        try:
            with open(path, "r") as f:
                playbook = yaml.load(f, Loader=yaml.FullLoader)

            for task in playbook.get("tasks", []):
                self.cli_run(task)

        except Exception as e:
            printer.error(f"Failed to run playbook {path}: {e}")
            sys.exit(10)

    def cli_run(self, script):
        name = script.get("name", "Task")
        try:
            action = script["action"]
            nodelist = script["nodes"]
            commands = script["commands"]
            variables = script.get("variables")
            output_cfg = script["output"]
            options = script.get("options", {})
        except KeyError as e:
            printer.error(f"[{name}] '{e.args[0]}' is mandatory in script")
            sys.exit(11)

        stdout = (output_cfg == "stdout")
        folder = output_cfg if output_cfg not in [None, "stdout"] else None
        prompt = options.get("prompt")

        try:
            header_printed = False
            if action == "run":
                # If stdout is true, we stream results as they arrive
                def _on_run_complete(unique, node_output, node_status):
                    nonlocal header_printed
                    if stdout:
                        with self.print_lock:
                            if not header_printed:
                                printer.console.print(Rule(name.upper(), style="header"))
                                header_printed = True
                        printer.node_panel(unique, node_output, node_status)

                results = self.app.services.execution.run_commands(
                    nodes_filter=nodelist,
                    commands=commands,
                    variables=variables,
                    parallel=options.get("parallel", 10),
                    timeout=options.get("timeout", 10),
                    folder=folder,
                    prompt=prompt,
                    on_node_complete=_on_run_complete
                )
                # Final Summary
                if not stdout and not folder:
                    with self.print_lock:
                        printer.console.print(Rule(name.upper(), style="header"))
                    for unique, data in results.items():
                        output = data["output"] if isinstance(data, dict) else data
                        printer.node_panel(unique, output, 0)
                
                # ALWAYS show the aggregate execution summary at the end
                printer.run_summary(results)

            elif action == "test":
                expected = script.get("expected", [])
                # Show test_panel per node ONLY if stdout is True
                def _on_test_complete(unique, node_output, node_status, node_result):
                    nonlocal header_printed
                    if stdout:
                        with self.print_lock:
                            if not header_printed:
                                printer.console.print(Rule(name.upper(), style="header"))
                                header_printed = True
                        printer.test_panel(unique, node_output, node_status, node_result)

                results = self.app.services.execution.test_commands(
                    nodes_filter=nodelist,
                    commands=commands,
                    expected=expected,
                    variables=variables,
                    parallel=options.get("parallel", 10),
                    timeout=options.get("timeout", 10),
                    folder=folder,
                    prompt=prompt,
                    on_node_complete=_on_test_complete
                )
                # ALWAYS show the aggregate summary at the end
                printer.test_summary(results)

        except ConnpyError as e:
            printer.error(str(e))

Methods

def cli_run(self, script)
Expand source code
def cli_run(self, script):
    name = script.get("name", "Task")
    try:
        action = script["action"]
        nodelist = script["nodes"]
        commands = script["commands"]
        variables = script.get("variables")
        output_cfg = script["output"]
        options = script.get("options", {})
    except KeyError as e:
        printer.error(f"[{name}] '{e.args[0]}' is mandatory in script")
        sys.exit(11)

    stdout = (output_cfg == "stdout")
    folder = output_cfg if output_cfg not in [None, "stdout"] else None
    prompt = options.get("prompt")

    try:
        header_printed = False
        if action == "run":
            # If stdout is true, we stream results as they arrive
            def _on_run_complete(unique, node_output, node_status):
                nonlocal header_printed
                if stdout:
                    with self.print_lock:
                        if not header_printed:
                            printer.console.print(Rule(name.upper(), style="header"))
                            header_printed = True
                    printer.node_panel(unique, node_output, node_status)

            results = self.app.services.execution.run_commands(
                nodes_filter=nodelist,
                commands=commands,
                variables=variables,
                parallel=options.get("parallel", 10),
                timeout=options.get("timeout", 10),
                folder=folder,
                prompt=prompt,
                on_node_complete=_on_run_complete
            )
            # Final Summary
            if not stdout and not folder:
                with self.print_lock:
                    printer.console.print(Rule(name.upper(), style="header"))
                for unique, data in results.items():
                    output = data["output"] if isinstance(data, dict) else data
                    printer.node_panel(unique, output, 0)
            
            # ALWAYS show the aggregate execution summary at the end
            printer.run_summary(results)

        elif action == "test":
            expected = script.get("expected", [])
            # Show test_panel per node ONLY if stdout is True
            def _on_test_complete(unique, node_output, node_status, node_result):
                nonlocal header_printed
                if stdout:
                    with self.print_lock:
                        if not header_printed:
                            printer.console.print(Rule(name.upper(), style="header"))
                            header_printed = True
                    printer.test_panel(unique, node_output, node_status, node_result)

            results = self.app.services.execution.test_commands(
                nodes_filter=nodelist,
                commands=commands,
                expected=expected,
                variables=variables,
                parallel=options.get("parallel", 10),
                timeout=options.get("timeout", 10),
                folder=folder,
                prompt=prompt,
                on_node_complete=_on_test_complete
            )
            # ALWAYS show the aggregate summary at the end
            printer.test_summary(results)

    except ConnpyError as e:
        printer.error(str(e))
def dispatch(self, args)
Expand source code
def dispatch(self, args):
    if len(args.data) > 1:
        args.action = "noderun"
    actions = {"noderun": self.node_run, "generate": self.yaml_generate, "run": self.yaml_run}
    return actions.get(args.action)(args)
def node_run(self, args)
Expand source code
def node_run(self, args):
    nodes_filter = args.data[0]
    commands = [" ".join(args.data[1:])]

    try:
        header_printed = False

        if hasattr(args, 'test_expected') and args.test_expected:
            # Mode: Test
            def _on_node_complete(unique, node_output, node_status, node_result):
                nonlocal header_printed
                with self.print_lock:
                    if not header_printed:
                        printer.console.print(Rule("OUTPUT", style="header"))
                        header_printed = True
                printer.test_panel(unique, node_output, node_status, node_result)

            results = self.app.services.execution.test_commands(
                nodes_filter=nodes_filter,
                commands=commands,
                expected=args.test_expected,
                on_node_complete=_on_node_complete
            )
            printer.test_summary(results)
        else:
            # Mode: Normal Run
            def _on_node_complete(unique, node_output, node_status):
                nonlocal header_printed
                with self.print_lock:
                    if not header_printed:
                        printer.console.print(Rule("OUTPUT", style="header"))
                        header_printed = True
                printer.node_panel(unique, node_output, node_status)

            results = self.app.services.execution.run_commands(
                nodes_filter=nodes_filter,
                commands=commands,
                on_node_complete=_on_node_complete
            )
            printer.run_summary(results)

    except ConnpyError as e:
        printer.error(str(e))
        sys.exit(1)
def yaml_generate(self, args)
Expand source code
def yaml_generate(self, args):
    if os.path.exists(args.data[0]):
        printer.error(f"File '{args.data[0]}' already exists.")
        sys.exit(14)
    else:
        with open(args.data[0], "w") as file:
            file.write(get_instructions("generate"))
        printer.success(f"File {args.data[0]} generated successfully")
        sys.exit()
def yaml_run(self, args)
Expand source code
def yaml_run(self, args):
    path = args.data[0]
    try:
        with open(path, "r") as f:
            playbook = yaml.load(f, Loader=yaml.FullLoader)

        for task in playbook.get("tasks", []):
            self.cli_run(task)

    except Exception as e:
        printer.error(f"Failed to run playbook {path}: {e}")
        sys.exit(10)