Module connpy.cli.run_handler
Classes
class RunHandler (app)-
Expand source code
class RunHandler: def __init__(self, app): self.app = app def dispatch(self, args): if len(args.data) > 1: args.action = "noderun" actions = {"noderun": self.node_run, "generate": self.yaml_generate, "run": self.yaml_run} return actions.get(args.action)(args) def node_run(self, args): nodes_filter = args.data[0] commands = [" ".join(args.data[1:])] try: header_printed = False # Inline execution with streaming results def _on_node_complete(unique, node_output, node_status): nonlocal header_printed if not header_printed: printer.console.print(Rule("OUTPUT", style="header")) header_printed = True printer.node_panel(unique, node_output, node_status) self.app.services.execution.run_commands( nodes_filter=nodes_filter, commands=commands, on_node_complete=_on_node_complete ) except ConnpyError as e: printer.error(str(e)) sys.exit(1) def yaml_generate(self, args): if os.path.exists(args.data[0]): printer.error(f"File '{args.data[0]}' already exists.") sys.exit(14) else: with open(args.data[0], "w") as file: file.write(get_instructions("generate")) printer.success(f"File {args.data[0]} generated successfully") sys.exit() def yaml_run(self, args): path = args.data[0] try: with open(path, "r") as f: playbook = yaml.load(f, Loader=yaml.FullLoader) for task in playbook.get("tasks", []): self.cli_run(task) except Exception as e: printer.error(f"Failed to run playbook {path}: {e}") sys.exit(10) def cli_run(self, script): try: action = script["action"] nodelist = script["nodes"] commands = script["commands"] variables = script.get("variables") output_cfg = script["output"] name = script.get("name", "Task") options = script.get("options", {}) except KeyError as e: printer.error(f"'{e.args[0]}' is mandatory in script") sys.exit(11) stdout = (output_cfg == "stdout") folder = output_cfg if output_cfg not in [None, "stdout"] else None prompt = options.get("prompt") printer.header(name.upper()) try: if action == "run": # If stdout is true, we stream results as they arrive on_complete = printer.node_panel if stdout else None results = self.app.services.execution.run_commands( nodes_filter=nodelist, commands=commands, variables=variables, parallel=options.get("parallel", 10), timeout=options.get("timeout", 10), folder=folder, prompt=prompt, on_node_complete=on_complete ) # If not streaming, we could print a summary table here if needed if not stdout: for unique, output in results.items(): printer.node_panel(unique, output, 0) elif action == "test": expected = script.get("expected", []) on_complete = printer.test_panel if stdout else None results = self.app.services.execution.test_commands( nodes_filter=nodelist, commands=commands, expected=expected, variables=variables, parallel=options.get("parallel", 10), timeout=options.get("timeout", 10), prompt=prompt, on_node_complete=on_complete ) if not stdout: printer.test_summary(results) except ConnpyError as e: printer.error(str(e))Methods
def cli_run(self, script)-
Expand source code
def cli_run(self, script): try: action = script["action"] nodelist = script["nodes"] commands = script["commands"] variables = script.get("variables") output_cfg = script["output"] name = script.get("name", "Task") options = script.get("options", {}) except KeyError as e: printer.error(f"'{e.args[0]}' is mandatory in script") sys.exit(11) stdout = (output_cfg == "stdout") folder = output_cfg if output_cfg not in [None, "stdout"] else None prompt = options.get("prompt") printer.header(name.upper()) try: if action == "run": # If stdout is true, we stream results as they arrive on_complete = printer.node_panel if stdout else None results = self.app.services.execution.run_commands( nodes_filter=nodelist, commands=commands, variables=variables, parallel=options.get("parallel", 10), timeout=options.get("timeout", 10), folder=folder, prompt=prompt, on_node_complete=on_complete ) # If not streaming, we could print a summary table here if needed if not stdout: for unique, output in results.items(): printer.node_panel(unique, output, 0) elif action == "test": expected = script.get("expected", []) on_complete = printer.test_panel if stdout else None results = self.app.services.execution.test_commands( nodes_filter=nodelist, commands=commands, expected=expected, variables=variables, parallel=options.get("parallel", 10), timeout=options.get("timeout", 10), prompt=prompt, on_node_complete=on_complete ) if not stdout: printer.test_summary(results) except ConnpyError as e: printer.error(str(e)) def dispatch(self, args)-
Expand source code
def dispatch(self, args): if len(args.data) > 1: args.action = "noderun" actions = {"noderun": self.node_run, "generate": self.yaml_generate, "run": self.yaml_run} return actions.get(args.action)(args) def node_run(self, args)-
Expand source code
def node_run(self, args): nodes_filter = args.data[0] commands = [" ".join(args.data[1:])] try: header_printed = False # Inline execution with streaming results def _on_node_complete(unique, node_output, node_status): nonlocal header_printed if not header_printed: printer.console.print(Rule("OUTPUT", style="header")) header_printed = True printer.node_panel(unique, node_output, node_status) self.app.services.execution.run_commands( nodes_filter=nodes_filter, commands=commands, on_node_complete=_on_node_complete ) except ConnpyError as e: printer.error(str(e)) sys.exit(1) def yaml_generate(self, args)-
Expand source code
def yaml_generate(self, args): if os.path.exists(args.data[0]): printer.error(f"File '{args.data[0]}' already exists.") sys.exit(14) else: with open(args.data[0], "w") as file: file.write(get_instructions("generate")) printer.success(f"File {args.data[0]} generated successfully") sys.exit() def yaml_run(self, args)-
Expand source code
def yaml_run(self, args): path = args.data[0] try: with open(path, "r") as f: playbook = yaml.load(f, Loader=yaml.FullLoader) for task in playbook.get("tasks", []): self.cli_run(task) except Exception as e: printer.error(f"Failed to run playbook {path}: {e}") sys.exit(10)