Module connpy.services.execution_service

Classes

class ExecutionService (config=None)
Expand source code
class ExecutionService(BaseService):
    """Business logic for executing commands on nodes and running automation scripts."""

    def run_commands(
        self, 
        nodes_filter: str, 
        commands: List[str], 
        variables: Optional[Dict[str, Any]] = None,
        parallel: int = 10,
        timeout: int = 10,
        folder: Optional[str] = None,
        prompt: Optional[str] = None,
        on_node_complete: Optional[Callable] = None,
        logger: Optional[Callable] = None,
        name: Optional[str] = None
    ) -> Dict[str, str]:

        """Execute commands on a set of nodes."""
        try:
            matched_names = self.config._getallnodes(nodes_filter)
            if not matched_names:
                raise ConnpyError(f"No nodes found matching filter: {nodes_filter}")
            
            node_data = self.config.getitems(matched_names, extract=True)
            executor = Nodes(node_data, config=self.config)
            self.last_executor = executor
            
            results = executor.run(
                commands=commands,
                vars=variables,
                parallel=parallel,
                timeout=timeout,
                folder=folder,
                prompt=prompt,
                on_complete=on_node_complete,
                logger=logger
            )

            # Combine output and status for the caller
            full_results = {}
            for unique in results:
                full_results[unique] = {
                    "output": results[unique],
                    "status": executor.status.get(unique, 1)
                }

            return full_results
        except Exception as e:
            raise ConnpyError(f"Execution failed: {e}")

    def test_commands(
        self,
        nodes_filter: str,
        commands: List[str],
        expected: List[str],
        variables: Optional[Dict[str, Any]] = None,
        parallel: int = 10,
        timeout: int = 10,
        folder: Optional[str] = None,
        prompt: Optional[str] = None,
        on_node_complete: Optional[Callable] = None,
        logger: Optional[Callable] = None,
        name: Optional[str] = None
    ) -> Dict[str, Dict[str, bool]]:

        """Run commands and verify expected output on a set of nodes."""
        try:
            matched_names = self.config._getallnodes(nodes_filter)
            if not matched_names:
                raise ConnpyError(f"No nodes found matching filter: {nodes_filter}")
            
            node_data = self.config.getitems(matched_names, extract=True)
            executor = Nodes(node_data, config=self.config)
            self.last_executor = executor
            
            results = executor.test(
                commands=commands,
                expected=expected,
                vars=variables,
                parallel=parallel,
                timeout=timeout,
                folder=folder,
                prompt=prompt,
                on_complete=on_node_complete,
                logger=logger
            )
            return results
        except Exception as e:
            raise ConnpyError(f"Testing failed: {e}")

    def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) -> Dict[str, str]:
        """Run a plain-text script containing one command per line."""
        if not os.path.exists(script_path):
            raise ConnpyError(f"Script file not found: {script_path}")
            
        try:
            with open(script_path, "r") as f:
                commands = [line.strip() for line in f if line.strip()]
        except Exception as e:
            raise ConnpyError(f"Failed to read script {script_path}: {e}")
            
        return self.run_commands(nodes_filter, commands, parallel=parallel)

    def run_yaml_playbook(self, playbook_data: str, parallel: int = 10) -> Dict[str, Any]:
        """Run a structured Connpy YAML automation playbook (from path or content)."""
        playbook = None
        if playbook_data.startswith("---YAML---\n"):
            try:
                content = playbook_data[len("---YAML---\n"):]
                playbook = yaml.load(content, Loader=yaml.FullLoader)
            except Exception as e:
                raise ConnpyError(f"Failed to parse YAML content: {e}")
        else:
            if not os.path.exists(playbook_data):
                raise ConnpyError(f"Playbook file not found: {playbook_data}")
            try:
                with open(playbook_data, "r") as f:
                    playbook = yaml.load(f, Loader=yaml.FullLoader)
            except Exception as e:
                raise ConnpyError(f"Failed to load playbook {playbook_data}: {e}")
            
        # Basic validation
        if not isinstance(playbook, dict) or "nodes" not in playbook or "commands" not in playbook:
            raise ConnpyError("Invalid playbook format: missing 'nodes' or 'commands' keys.")
            
        action = playbook.get("action", "run")
        options = playbook.get("options", {})
        
        # Extract all fields similar to RunHandler.cli_run
        exec_args = {
            "nodes_filter": playbook["nodes"],
            "commands": playbook["commands"],
            "variables": playbook.get("variables"),
            "parallel": options.get("parallel", parallel),
            "timeout": playbook.get("timeout", options.get("timeout", 10)),
            "prompt": options.get("prompt"),
            "name": playbook.get("name", "Task")
        }

        # Map 'output' field to folder path if it's not stdout/null
        output_cfg = playbook.get("output")
        if output_cfg not in [None, "stdout"]:
            exec_args["folder"] = output_cfg

        if action == "run":
            return self.run_commands(**exec_args)
        elif action == "test":
            exec_args["expected"] = playbook.get("expected", [])
            return self.test_commands(**exec_args)
        else:
            raise ConnpyError(f"Unsupported playbook action: {action}")

Business logic for executing commands on nodes and running automation scripts.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) ‑> Dict[str, str]
Expand source code
def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) -> Dict[str, str]:
    """Run a plain-text script containing one command per line."""
    if not os.path.exists(script_path):
        raise ConnpyError(f"Script file not found: {script_path}")
        
    try:
        with open(script_path, "r") as f:
            commands = [line.strip() for line in f if line.strip()]
    except Exception as e:
        raise ConnpyError(f"Failed to read script {script_path}: {e}")
        
    return self.run_commands(nodes_filter, commands, parallel=parallel)

Run a plain-text script containing one command per line.

def run_commands(self,
nodes_filter: str,
commands: List[str],
variables: Dict[str, Any] | None = None,
parallel: int = 10,
timeout: int = 10,
folder: str | None = None,
prompt: str | None = None,
on_node_complete: Callable | None = None,
logger: Callable | None = None,
name: str | None = None) ‑> Dict[str, str]
Expand source code
def run_commands(
    self, 
    nodes_filter: str, 
    commands: List[str], 
    variables: Optional[Dict[str, Any]] = None,
    parallel: int = 10,
    timeout: int = 10,
    folder: Optional[str] = None,
    prompt: Optional[str] = None,
    on_node_complete: Optional[Callable] = None,
    logger: Optional[Callable] = None,
    name: Optional[str] = None
) -> Dict[str, str]:

    """Execute commands on a set of nodes."""
    try:
        matched_names = self.config._getallnodes(nodes_filter)
        if not matched_names:
            raise ConnpyError(f"No nodes found matching filter: {nodes_filter}")
        
        node_data = self.config.getitems(matched_names, extract=True)
        executor = Nodes(node_data, config=self.config)
        self.last_executor = executor
        
        results = executor.run(
            commands=commands,
            vars=variables,
            parallel=parallel,
            timeout=timeout,
            folder=folder,
            prompt=prompt,
            on_complete=on_node_complete,
            logger=logger
        )

        # Combine output and status for the caller
        full_results = {}
        for unique in results:
            full_results[unique] = {
                "output": results[unique],
                "status": executor.status.get(unique, 1)
            }

        return full_results
    except Exception as e:
        raise ConnpyError(f"Execution failed: {e}")

Execute commands on a set of nodes.

def run_yaml_playbook(self, playbook_data: str, parallel: int = 10) ‑> Dict[str, Any]
Expand source code
def run_yaml_playbook(self, playbook_data: str, parallel: int = 10) -> Dict[str, Any]:
    """Run a structured Connpy YAML automation playbook (from path or content)."""
    playbook = None
    if playbook_data.startswith("---YAML---\n"):
        try:
            content = playbook_data[len("---YAML---\n"):]
            playbook = yaml.load(content, Loader=yaml.FullLoader)
        except Exception as e:
            raise ConnpyError(f"Failed to parse YAML content: {e}")
    else:
        if not os.path.exists(playbook_data):
            raise ConnpyError(f"Playbook file not found: {playbook_data}")
        try:
            with open(playbook_data, "r") as f:
                playbook = yaml.load(f, Loader=yaml.FullLoader)
        except Exception as e:
            raise ConnpyError(f"Failed to load playbook {playbook_data}: {e}")
        
    # Basic validation
    if not isinstance(playbook, dict) or "nodes" not in playbook or "commands" not in playbook:
        raise ConnpyError("Invalid playbook format: missing 'nodes' or 'commands' keys.")
        
    action = playbook.get("action", "run")
    options = playbook.get("options", {})
    
    # Extract all fields similar to RunHandler.cli_run
    exec_args = {
        "nodes_filter": playbook["nodes"],
        "commands": playbook["commands"],
        "variables": playbook.get("variables"),
        "parallel": options.get("parallel", parallel),
        "timeout": playbook.get("timeout", options.get("timeout", 10)),
        "prompt": options.get("prompt"),
        "name": playbook.get("name", "Task")
    }

    # Map 'output' field to folder path if it's not stdout/null
    output_cfg = playbook.get("output")
    if output_cfg not in [None, "stdout"]:
        exec_args["folder"] = output_cfg

    if action == "run":
        return self.run_commands(**exec_args)
    elif action == "test":
        exec_args["expected"] = playbook.get("expected", [])
        return self.test_commands(**exec_args)
    else:
        raise ConnpyError(f"Unsupported playbook action: {action}")

Run a structured Connpy YAML automation playbook (from path or content).

def test_commands(self,
nodes_filter: str,
commands: List[str],
expected: List[str],
variables: Dict[str, Any] | None = None,
parallel: int = 10,
timeout: int = 10,
folder: str | None = None,
prompt: str | None = None,
on_node_complete: Callable | None = None,
logger: Callable | None = None,
name: str | None = None) ‑> Dict[str, Dict[str, bool]]
Expand source code
def test_commands(
    self,
    nodes_filter: str,
    commands: List[str],
    expected: List[str],
    variables: Optional[Dict[str, Any]] = None,
    parallel: int = 10,
    timeout: int = 10,
    folder: Optional[str] = None,
    prompt: Optional[str] = None,
    on_node_complete: Optional[Callable] = None,
    logger: Optional[Callable] = None,
    name: Optional[str] = None
) -> Dict[str, Dict[str, bool]]:

    """Run commands and verify expected output on a set of nodes."""
    try:
        matched_names = self.config._getallnodes(nodes_filter)
        if not matched_names:
            raise ConnpyError(f"No nodes found matching filter: {nodes_filter}")
        
        node_data = self.config.getitems(matched_names, extract=True)
        executor = Nodes(node_data, config=self.config)
        self.last_executor = executor
        
        results = executor.test(
            commands=commands,
            expected=expected,
            vars=variables,
            parallel=parallel,
            timeout=timeout,
            folder=folder,
            prompt=prompt,
            on_complete=on_node_complete,
            logger=logger
        )
        return results
    except Exception as e:
        raise ConnpyError(f"Testing failed: {e}")

Run commands and verify expected output on a set of nodes.

Inherited members