Module connpy.services

Sub-modules

connpy.services.ai_service
connpy.services.base
connpy.services.config_service
connpy.services.context_service
connpy.services.exceptions
connpy.services.execution_service
connpy.services.import_export_service
connpy.services.node_service
connpy.services.plugin_service
connpy.services.profile_service
connpy.services.provider
connpy.services.sync_service
connpy.services.system_service

Classes

class AIService (config=None)
Expand source code
class AIService(BaseService):
    """Business logic for interacting with AI agents and LLM configurations."""

    def ask(self, input_text, dryrun=False, chat_history=None, status=None, debug=False, session_id=None, console=None, chunk_callback=None, confirm_handler=None, trust=False, **overrides):
        """Send a prompt to the AI agent."""
        from connpy.ai import ai
        return ai(
            self.config,
            console=console,
            confirm_handler=confirm_handler,
            trust=trust,
            **overrides,
        ).ask(
            input_text,
            dryrun,
            chat_history,
            status=status,
            debug=debug,
            session_id=session_id,
            chunk_callback=chunk_callback,
        )

    def confirm(self, input_text, console=None):
        """Ask for a safe confirmation of an action."""
        from connpy.ai import ai
        return ai(self.config, console=console).confirm(input_text)

    def list_sessions(self):
        """Return a list of all saved AI sessions."""
        from connpy.ai import ai
        return ai(self.config)._get_sessions()

    def delete_session(self, session_id):
        """Delete an AI session by ID."""
        import os
        target = os.path.join(self.config.defaultdir, "ai_sessions", f"{session_id}.json")
        if not os.path.exists(target):
            raise InvalidConfigurationError(f"Session '{session_id}' not found.")
        os.remove(target)

    def configure_provider(self, provider, model=None, api_key=None):
        """Update AI provider settings in the configuration."""
        ai_settings = self.config.config.get("ai", {})
        if model:
            ai_settings[f"{provider}_model"] = model
        if api_key:
            ai_settings[f"{provider}_api_key"] = api_key
        self.config.config["ai"] = ai_settings
        self.config._saveconfig(self.config.file)

    def load_session_data(self, session_id):
        """Load a session's raw data by ID."""
        from connpy.ai import ai
        return ai(self.config).load_session_data(session_id)

Business logic for interacting with AI agents and LLM configurations.

Initialize the service.

Args

config
An instance of configfile, or None to instantiate a new one or use the global context.

Ancestors

Methods

def ask(self,
input_text,
dryrun=False,
chat_history=None,
status=None,
debug=False,
session_id=None,
console=None,
chunk_callback=None,
confirm_handler=None,
trust=False,
**overrides)
Expand source code
def ask(self, input_text, dryrun=False, chat_history=None, status=None, debug=False, session_id=None, console=None, chunk_callback=None, confirm_handler=None, trust=False, **overrides):
    """Send a prompt to the AI agent."""
    from connpy.ai import ai
    agent_instance = ai(self.config, console=console, confirm_handler=confirm_handler, trust=trust, **overrides)
    return agent_instance.ask(
        input_text, dryrun, chat_history,
        status=status, debug=debug, session_id=session_id,
        chunk_callback=chunk_callback,
    )

Send a prompt to the AI agent.

def configure_provider(self, provider, model=None, api_key=None)
Expand source code
def configure_provider(self, provider, model=None, api_key=None):
    """Update AI provider settings in the configuration."""
    ai_block = self.config.config.get("ai", {})
    # Only overwrite the keys the caller actually supplied.
    if model:
        ai_block[f"{provider}_model"] = model
    if api_key:
        ai_block[f"{provider}_api_key"] = api_key
    self.config.config["ai"] = ai_block
    self.config._saveconfig(self.config.file)

Update AI provider settings in the configuration.

def confirm(self, input_text, console=None)
Expand source code
def confirm(self, input_text, console=None):
    """Ask for a safe confirmation of an action."""
    from connpy.ai import ai
    return ai(self.config, console=console).confirm(input_text)

Ask for a safe confirmation of an action.

def delete_session(self, session_id)
Expand source code
def delete_session(self, session_id):
    """Delete an AI session by ID."""
    import os
    session_file = os.path.join(
        self.config.defaultdir, "ai_sessions", f"{session_id}.json"
    )
    # Missing session is an error; removal only happens for an existing file.
    if not os.path.exists(session_file):
        raise InvalidConfigurationError(f"Session '{session_id}' not found.")
    os.remove(session_file)

Delete an AI session by ID.

def list_sessions(self)
Expand source code
def list_sessions(self):
    """Return a list of all saved AI sessions."""
    from connpy.ai import ai
    return ai(self.config)._get_sessions()

Return a list of all saved AI sessions.

def load_session_data(self, session_id)
Expand source code
def load_session_data(self, session_id):
    """Load a session's raw data by ID."""
    from connpy.ai import ai
    return ai(self.config).load_session_data(session_id)

Load a session's raw data by ID.

Inherited members

class ConfigService (config=None)
Expand source code
class ConfigService(BaseService):
    """Business logic for general application settings and state configuration."""

    def get_settings(self) -> Dict[str, Any]:
        """Get the global configuration settings block.

        Returns:
            A shallow copy of the configuration dict with the resolved
            ``configfolder`` path added, so the live config is not exposed.
        """
        settings = self.config.config.copy()
        settings["configfolder"] = self.config.defaultdir
        return settings

    def get_default_dir(self) -> str:
        """Get the default configuration directory."""
        return self.config.defaultdir

    def set_config_folder(self, folder_path: str):
        """Set the default location for config file by writing to ~/.config/conn/.folder

        Args:
            folder_path: An existing directory to use as the config folder.

        Raises:
            ConnpyError: If the path is not a directory or the marker file
                cannot be written.
        """
        if not os.path.isdir(folder_path):
            raise ConnpyError(f"readable_dir:{folder_path} is not a valid path")

        pathfile = os.path.join(self.config.anchor_path, ".folder")
        folder = os.path.abspath(folder_path).rstrip('/')

        try:
            with open(pathfile, "w") as f:
                f.write(str(folder))
        except Exception as e:
            # Chain the original error so the root cause stays visible.
            raise ConnpyError(f"Failed to save config folder: {e}") from e

    def update_setting(self, key, value):
        """Update a setting in the configuration file and persist it to disk."""
        self.config.config[key] = value
        self.config._saveconfig(self.config.file)

    def encrypt_password(self, password):
        """Encrypt a password using the application's configuration encryption key."""
        return self.config.encrypt(password)

    def apply_theme_from_file(self, theme_input):
        """Apply 'dark', 'light' theme or load a YAML theme file and save it to the configuration.

        Args:
            theme_input: The literal string "dark" or "light", or a path to a
                YAML file mapping style names to values.

        Returns:
            The style dict that was persisted under the "theme" setting.

        Raises:
            InvalidConfigurationError: If the file is missing, unparseable,
                not a dict, or contains no recognised style keys.
        """
        import yaml
        from ..printer import STYLES, LIGHT_THEME

        if theme_input == "dark":
            # Dark is the default theme: an empty override set.
            valid_styles = {}
            self.update_setting("theme", valid_styles)
            return valid_styles
        elif theme_input == "light":
            valid_styles = LIGHT_THEME.copy()
            self.update_setting("theme", valid_styles)
            return valid_styles

        if not os.path.exists(theme_input):
            raise InvalidConfigurationError(f"Theme file '{theme_input}' not found.")

        try:
            with open(theme_input, 'r') as f:
                user_styles = yaml.safe_load(f)
        except Exception as e:
            raise InvalidConfigurationError(f"Failed to parse theme file: {e}") from e

        if not isinstance(user_styles, dict):
            raise InvalidConfigurationError("Theme file must be a YAML dictionary.")

        # Filter for valid styles only (prevent junk in config)
        valid_styles = {k: v for k, v in user_styles.items() if k in STYLES}

        if not valid_styles:
            raise InvalidConfigurationError("No valid style keys found in theme file.")

        # Persist and return the validated styles (only keys present in STYLES).
        self.update_setting("theme", valid_styles)
        return valid_styles

Business logic for general application settings and state configuration.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def apply_theme_from_file(self, theme_input)
Expand source code
def apply_theme_from_file(self, theme_input):
    """Apply 'dark', 'light' theme or load a YAML theme file and save it to the configuration."""
    import yaml
    from ..printer import STYLES, LIGHT_THEME

    # Built-in theme names short-circuit before any file handling.
    if theme_input == "dark":
        chosen = {}
        self.update_setting("theme", chosen)
        return chosen
    elif theme_input == "light":
        chosen = LIGHT_THEME.copy()
        self.update_setting("theme", chosen)
        return chosen

    if not os.path.exists(theme_input):
        raise InvalidConfigurationError(f"Theme file '{theme_input}' not found.")

    try:
        with open(theme_input, 'r') as handle:
            loaded = yaml.safe_load(handle)
    except Exception as e:
        raise InvalidConfigurationError(f"Failed to parse theme file: {e}")

    if not isinstance(loaded, dict):
        raise InvalidConfigurationError("Theme file must be a YAML dictionary.")

    # Drop any key that is not a recognised style so junk never lands in config.
    chosen = {name: value for name, value in loaded.items() if name in STYLES}

    if not chosen:
        raise InvalidConfigurationError("No valid style keys found in theme file.")

    self.update_setting("theme", chosen)
    return chosen

Apply 'dark', 'light' theme or load a YAML theme file and save it to the configuration.

def encrypt_password(self, password)
Expand source code
def encrypt_password(self, password):
    """Encrypt a password using the application's configuration encryption key."""
    encrypted = self.config.encrypt(password)
    return encrypted

Encrypt a password using the application's configuration encryption key.

def get_default_dir(self) ‑> str
Expand source code
def get_default_dir(self) -> str:
    """Get the default configuration directory."""
    default_directory = self.config.defaultdir
    return default_directory

Get the default configuration directory.

def get_settings(self) ‑> Dict[str, Any]
Expand source code
def get_settings(self) -> Dict[str, Any]:
    """Get the global configuration settings block."""
    # Work on a copy so callers cannot mutate the live configuration.
    snapshot = dict(self.config.config)
    snapshot["configfolder"] = self.config.defaultdir
    return snapshot

Get the global configuration settings block.

def set_config_folder(self, folder_path: str)
Expand source code
def set_config_folder(self, folder_path: str):
    """Set the default location for config file by writing to ~/.config/conn/.folder"""
    if not os.path.isdir(folder_path):
        raise ConnpyError(f"readable_dir:{folder_path} is not a valid path")

    marker_file = os.path.join(self.config.anchor_path, ".folder")
    normalized = os.path.abspath(folder_path).rstrip('/')

    try:
        with open(marker_file, "w") as handle:
            handle.write(str(normalized))
    except Exception as e:
        raise ConnpyError(f"Failed to save config folder: {e}")

Set the default location for config file by writing to ~/.config/conn/.folder

def update_setting(self, key, value)
Expand source code
def update_setting(self, key, value):
    """Update a setting in the configuration file."""
    # Write the new value first, then persist the whole config file.
    config_obj = self.config
    config_obj.config[key] = value
    config_obj._saveconfig(config_obj.file)

Update a setting in the configuration file.

Inherited members

class ConnpyError (*args, **kwargs)
Expand source code
class ConnpyError(Exception):
    """Root of the exception hierarchy raised by connpy services.

    Catching this single type handles every service-layer error.
    """

Base exception for all connpy services.

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

class ExecutionError (*args, **kwargs)
Expand source code
class ExecutionError(ConnpyError):
    """Signals that a command execution failed or returned an error result."""

Raised when an execution fails or returns error.

Ancestors

  • ConnpyError
  • builtins.Exception
  • builtins.BaseException
class ExecutionService (config=None)
Expand source code
class ExecutionService(BaseService):
    """Business logic for executing commands on nodes and running automation scripts."""

    def run_commands(
        self, 
        nodes_filter: str, 
        commands: List[str], 
        variables: Optional[Dict[str, Any]] = None,
        parallel: int = 10,
        timeout: int = 10,
        folder: Optional[str] = None,
        prompt: Optional[str] = None,
        on_node_complete: Optional[Callable] = None,
        logger: Optional[Callable] = None,
        name: Optional[str] = None
    ) -> Dict[str, Dict[str, Any]]:

        """Execute commands on a set of nodes.

        Args:
            nodes_filter: Filter expression resolved to node names via the config.
            commands: Commands to run on every matched node.
            variables: Optional template/per-node variables.
            parallel: Maximum concurrent node sessions.
            timeout: Per-command timeout in seconds.
            folder: Optional folder to write per-node output files into.
            prompt: Optional prompt override for the executor.
            on_node_complete: Optional callback fired as each node finishes.
            logger: Optional logging callable passed through to the executor.
            name: Task label accepted for playbook symmetry (not used here).

        Returns:
            Mapping of node unique-id to ``{"output": ..., "status": int}``.

        Raises:
            ConnpyError: If no nodes match the filter or execution fails.
        """
        try:
            matched_names = self.config._getallnodes(nodes_filter)
            if not matched_names:
                raise ConnpyError(f"No nodes found matching filter: {nodes_filter}")

            node_data = self.config.getitems(matched_names, extract=True)
            executor = Nodes(node_data, config=self.config)
            # Keep a handle so callers can inspect per-node state afterwards.
            self.last_executor = executor

            results = executor.run(
                commands=commands,
                vars=variables,
                parallel=parallel,
                timeout=timeout,
                folder=folder,
                prompt=prompt,
                on_complete=on_node_complete,
                logger=logger
            )

            # Combine output and status for the caller; status defaults to 1
            # (failure) when the executor recorded none for a node.
            full_results = {}
            for unique in results:
                full_results[unique] = {
                    "output": results[unique],
                    "status": executor.status.get(unique, 1)
                }

            return full_results
        except ConnpyError:
            # Don't re-wrap our own errors (e.g. the empty-filter case above)
            # into a second "Execution failed:" layer.
            raise
        except Exception as e:
            raise ConnpyError(f"Execution failed: {e}") from e

    def test_commands(
        self,
        nodes_filter: str,
        commands: List[str],
        expected: List[str],
        variables: Optional[Dict[str, Any]] = None,
        parallel: int = 10,
        timeout: int = 10,
        folder: Optional[str] = None,
        prompt: Optional[str] = None,
        on_node_complete: Optional[Callable] = None,
        logger: Optional[Callable] = None,
        name: Optional[str] = None
    ) -> Dict[str, Dict[str, bool]]:

        """Run commands and verify expected output on a set of nodes.

        Same parameters as :meth:`run_commands`, plus ``expected`` — the
        strings whose presence in each command's output is verified.

        Raises:
            ConnpyError: If no nodes match the filter or testing fails.
        """
        try:
            matched_names = self.config._getallnodes(nodes_filter)
            if not matched_names:
                raise ConnpyError(f"No nodes found matching filter: {nodes_filter}")

            node_data = self.config.getitems(matched_names, extract=True)
            executor = Nodes(node_data, config=self.config)
            self.last_executor = executor

            results = executor.test(
                commands=commands,
                expected=expected,
                vars=variables,
                parallel=parallel,
                timeout=timeout,
                folder=folder,
                prompt=prompt,
                on_complete=on_node_complete,
                logger=logger
            )
            return results
        except ConnpyError:
            # Propagate our own errors unwrapped (see run_commands).
            raise
        except Exception as e:
            raise ConnpyError(f"Testing failed: {e}") from e

    def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) -> Dict[str, Dict[str, Any]]:
        """Run a plain-text script containing one command per line.

        Blank lines are skipped; each remaining line is stripped and executed.

        Raises:
            ConnpyError: If the script file is missing or unreadable.
        """
        if not os.path.exists(script_path):
            raise ConnpyError(f"Script file not found: {script_path}")

        try:
            with open(script_path, "r") as f:
                commands = [line.strip() for line in f if line.strip()]
        except Exception as e:
            raise ConnpyError(f"Failed to read script {script_path}: {e}") from e

        return self.run_commands(nodes_filter, commands, parallel=parallel)

    def run_yaml_playbook(self, playbook_data: str, parallel: int = 10) -> Dict[str, Any]:
        """Run a structured Connpy YAML automation playbook (from path or content).

        ``playbook_data`` is either inline YAML prefixed with ``---YAML---\\n``
        or a path to a playbook file.

        Raises:
            ConnpyError: On parse failure, missing file, invalid playbook
                structure, or an unsupported ``action``.
        """
        # NOTE(security): yaml.FullLoader can construct arbitrary Python
        # objects — do not feed untrusted playbooks here; prefer
        # yaml.safe_load if playbooks ever come from untrusted sources.
        playbook = None
        if playbook_data.startswith("---YAML---\n"):
            try:
                content = playbook_data[len("---YAML---\n"):]
                playbook = yaml.load(content, Loader=yaml.FullLoader)
            except Exception as e:
                raise ConnpyError(f"Failed to parse YAML content: {e}") from e
        else:
            if not os.path.exists(playbook_data):
                raise ConnpyError(f"Playbook file not found: {playbook_data}")
            try:
                with open(playbook_data, "r") as f:
                    playbook = yaml.load(f, Loader=yaml.FullLoader)
            except Exception as e:
                raise ConnpyError(f"Failed to load playbook {playbook_data}: {e}") from e

        # Basic validation
        if not isinstance(playbook, dict) or "nodes" not in playbook or "commands" not in playbook:
            raise ConnpyError("Invalid playbook format: missing 'nodes' or 'commands' keys.")

        action = playbook.get("action", "run")
        options = playbook.get("options", {})

        # Extract all fields similar to RunHandler.cli_run
        exec_args = {
            "nodes_filter": playbook["nodes"],
            "commands": playbook["commands"],
            "variables": playbook.get("variables"),
            "parallel": options.get("parallel", parallel),
            "timeout": playbook.get("timeout", options.get("timeout", 10)),
            "prompt": options.get("prompt"),
            "name": playbook.get("name", "Task")
        }

        # Map 'output' field to folder path if it's not stdout/null
        output_cfg = playbook.get("output")
        if output_cfg not in [None, "stdout"]:
            exec_args["folder"] = output_cfg

        if action == "run":
            return self.run_commands(**exec_args)
        elif action == "test":
            exec_args["expected"] = playbook.get("expected", [])
            return self.test_commands(**exec_args)
        else:
            raise ConnpyError(f"Unsupported playbook action: {action}")

Business logic for executing commands on nodes and running automation scripts.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) ‑> Dict[str, str]
Expand source code
def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) -> Dict[str, str]:
    """Read one command per non-empty line from *script_path* and run them."""
    if not os.path.exists(script_path):
        raise ConnpyError(f"Script file not found: {script_path}")

    try:
        with open(script_path, "r") as handle:
            command_list = [ln.strip() for ln in handle if ln.strip()]
    except Exception as e:
        raise ConnpyError(f"Failed to read script {script_path}: {e}")

    return self.run_commands(nodes_filter, command_list, parallel=parallel)

Run a plain-text script containing one command per line.

def run_commands(self,
nodes_filter: str,
commands: List[str],
variables: Dict[str, Any] | None = None,
parallel: int = 10,
timeout: int = 10,
folder: str | None = None,
prompt: str | None = None,
on_node_complete: Callable | None = None,
logger: Callable | None = None,
name: str | None = None) ‑> Dict[str, str]
Expand source code
def run_commands(
    self,
    nodes_filter: str,
    commands: List[str],
    variables: Optional[Dict[str, Any]] = None,
    parallel: int = 10,
    timeout: int = 10,
    folder: Optional[str] = None,
    prompt: Optional[str] = None,
    on_node_complete: Optional[Callable] = None,
    logger: Optional[Callable] = None,
    name: Optional[str] = None
) -> Dict[str, str]:

    """Execute commands on a set of nodes."""
    try:
        targets = self.config._getallnodes(nodes_filter)
        if not targets:
            raise ConnpyError(f"No nodes found matching filter: {nodes_filter}")

        extracted = self.config.getitems(targets, extract=True)
        runner = Nodes(extracted, config=self.config)
        self.last_executor = runner

        raw_output = runner.run(
            commands=commands,
            vars=variables,
            parallel=parallel,
            timeout=timeout,
            folder=folder,
            prompt=prompt,
            on_complete=on_node_complete,
            logger=logger
        )

        # Pair each node's captured output with its exit status (default 1).
        return {
            node_id: {"output": raw_output[node_id],
                      "status": runner.status.get(node_id, 1)}
            for node_id in raw_output
        }
    except Exception as e:
        raise ConnpyError(f"Execution failed: {e}")

Execute commands on a set of nodes.

def run_yaml_playbook(self, playbook_data: str, parallel: int = 10) ‑> Dict[str, Any]
Expand source code
def run_yaml_playbook(self, playbook_data: str, parallel: int = 10) -> Dict[str, Any]:
    """Run a structured Connpy YAML automation playbook (from path or content)."""
    # Inline content carries a literal marker prefix; otherwise it's a path.
    if playbook_data.startswith("---YAML---\n"):
        try:
            inline_body = playbook_data[len("---YAML---\n"):]
            book = yaml.load(inline_body, Loader=yaml.FullLoader)
        except Exception as e:
            raise ConnpyError(f"Failed to parse YAML content: {e}")
    else:
        if not os.path.exists(playbook_data):
            raise ConnpyError(f"Playbook file not found: {playbook_data}")
        try:
            with open(playbook_data, "r") as handle:
                book = yaml.load(handle, Loader=yaml.FullLoader)
        except Exception as e:
            raise ConnpyError(f"Failed to load playbook {playbook_data}: {e}")

    # A playbook must at minimum name its targets and its commands.
    if not isinstance(book, dict) or "nodes" not in book or "commands" not in book:
        raise ConnpyError("Invalid playbook format: missing 'nodes' or 'commands' keys.")

    action = book.get("action", "run")
    options = book.get("options", {})

    # Assemble keyword arguments mirroring RunHandler.cli_run.
    call_kwargs = {
        "nodes_filter": book["nodes"],
        "commands": book["commands"],
        "variables": book.get("variables"),
        "parallel": options.get("parallel", parallel),
        "timeout": book.get("timeout", options.get("timeout", 10)),
        "prompt": options.get("prompt"),
        "name": book.get("name", "Task")
    }

    # Any 'output' value other than stdout/None is an output folder path.
    destination = book.get("output")
    if destination not in [None, "stdout"]:
        call_kwargs["folder"] = destination

    if action == "run":
        return self.run_commands(**call_kwargs)
    if action == "test":
        call_kwargs["expected"] = book.get("expected", [])
        return self.test_commands(**call_kwargs)
    raise ConnpyError(f"Unsupported playbook action: {action}")

Run a structured Connpy YAML automation playbook (from path or content).

def test_commands(self,
nodes_filter: str,
commands: List[str],
expected: List[str],
variables: Dict[str, Any] | None = None,
parallel: int = 10,
timeout: int = 10,
folder: str | None = None,
prompt: str | None = None,
on_node_complete: Callable | None = None,
logger: Callable | None = None,
name: str | None = None) ‑> Dict[str, Dict[str, bool]]
Expand source code
def test_commands(
    self,
    nodes_filter: str,
    commands: List[str],
    expected: List[str],
    variables: Optional[Dict[str, Any]] = None,
    parallel: int = 10,
    timeout: int = 10,
    folder: Optional[str] = None,
    prompt: Optional[str] = None,
    on_node_complete: Optional[Callable] = None,
    logger: Optional[Callable] = None,
    name: Optional[str] = None
) -> Dict[str, Dict[str, bool]]:

    """Run commands and verify expected output on a set of nodes."""
    try:
        targets = self.config._getallnodes(nodes_filter)
        if not targets:
            raise ConnpyError(f"No nodes found matching filter: {nodes_filter}")

        extracted = self.config.getitems(targets, extract=True)
        verifier = Nodes(extracted, config=self.config)
        self.last_executor = verifier

        return verifier.test(
            commands=commands,
            expected=expected,
            vars=variables,
            parallel=parallel,
            timeout=timeout,
            folder=folder,
            prompt=prompt,
            on_complete=on_node_complete,
            logger=logger
        )
    except Exception as e:
        raise ConnpyError(f"Testing failed: {e}")

Run commands and verify expected output on a set of nodes.

Inherited members

class ImportExportService (config=None)
Expand source code
class ImportExportService(BaseService):
    """Business logic for YAML/JSON inventory import and export."""

    def export_to_file(self, file_path, folders=None):
        """Export nodes/folders to a YAML file.

        Args:
            file_path: Destination path; must not already exist.
            folders: Optional list of folder names to restrict the export.

        Raises:
            InvalidConfigurationError: If the file exists or cannot be written.
        """
        if os.path.exists(file_path):
            raise InvalidConfigurationError(f"File '{file_path}' already exists.")

        data = self.export_to_dict(folders)
        try:
            with open(file_path, "w") as f:
                yaml.dump(data, f, Dumper=NoAliasDumper, default_flow_style=False)
        except OSError as e:
            # Chain so the underlying OS error stays attached.
            raise InvalidConfigurationError(f"Failed to export to '{file_path}': {e}") from e

    def export_to_dict(self, folders=None):
        """Export nodes/folders to a dictionary.

        With no *folders*, returns a deep copy of the entire connection tree.
        Otherwise rebuilds a nested folder/subfolder/node structure containing
        only the requested folders.

        Raises:
            NodeNotFoundError: If a requested folder does not exist.
        """
        if not folders:
            return deepcopy(self.config.connections)

        # Validate folders exist ("@" denotes the root and is always valid).
        for f in folders:
            if f != "@" and f not in self.config._getallfolders():
                raise NodeNotFoundError(f"Folder '{f}' not found.")

        flat = self.config._getallnodesfull(folders, extract=False)
        nested = {}
        for k, v in flat.items():
            uniques = self.config._explode_unique(k)
            if not uniques:
                # Unparseable unique-id; skip it rather than corrupt the tree.
                continue

            if "folder" in uniques and "subfolder" in uniques:
                f_name = uniques["folder"]
                s_name = uniques["subfolder"]
                i_name = uniques["id"]

                if f_name not in nested:
                    nested[f_name] = {"type": "folder"}
                if s_name not in nested[f_name]:
                    nested[f_name][s_name] = {"type": "subfolder"}

                nested[f_name][s_name][i_name] = v

            elif "folder" in uniques:
                f_name = uniques["folder"]
                i_name = uniques["id"]

                if f_name not in nested:
                    nested[f_name] = {"type": "folder"}

                nested[f_name][i_name] = v
            else:
                i_name = uniques["id"]
                nested[i_name] = v

        return nested

    def import_from_file(self, file_path):
        """Import nodes/folders from a YAML file.

        Raises:
            InvalidConfigurationError: If the file is missing, unparseable,
                or the contained data fails to import.
        """
        if not os.path.exists(file_path):
            raise InvalidConfigurationError(f"File '{file_path}' does not exist.")

        # Keep parsing and importing in separate try blocks so an import
        # failure is not mislabelled as a read/parse failure.
        try:
            with open(file_path, "r") as f:
                data = yaml.load(f, Loader=yaml.FullLoader)
        except Exception as e:
            raise InvalidConfigurationError(f"Failed to read/parse import file: {e}") from e

        try:
            self.import_from_dict(data)
        except InvalidConfigurationError:
            # Validation errors already carry a precise message.
            raise
        except Exception as e:
            raise InvalidConfigurationError(f"Failed to import data: {e}") from e

    def import_from_dict(self, data):
        """Import nodes/folders from a dictionary.

        Walks the nested structure produced by :meth:`export_to_dict`,
        creating folders/subfolders and adding connections, then saves
        the configuration once at the end.

        Raises:
            InvalidConfigurationError: If *data* is not a dictionary or a
                node name fails validation.
        """
        if not isinstance(data, dict):
            raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.")

        def _traverse_import(node_data, current_folder='', current_subfolder=''):
            # Recursively create folders/subfolders and register connections.
            for k, v in node_data.items():
                if k == "type":
                    continue
                if isinstance(v, dict):
                    node_type = v.get("type", "connection")
                    if node_type == "folder":
                        self.config._folder_add(folder=k)
                        _traverse_import(v, current_folder=k, current_subfolder='')
                    elif node_type == "subfolder":
                        self.config._folder_add(folder=current_folder, subfolder=k)
                        _traverse_import(v, current_folder=current_folder, current_subfolder=k)
                    elif node_type == "connection":
                        # Build the fully-qualified unique id for validation.
                        unique_id = k
                        if current_subfolder:
                            unique_id = f"{k}@{current_subfolder}@{current_folder}"
                        elif current_folder:
                            unique_id = f"{k}@{current_folder}"
                        self._validate_node_name(unique_id)

                        kwargs = deepcopy(v)
                        kwargs['id'] = k
                        kwargs['folder'] = current_folder
                        kwargs['subfolder'] = current_subfolder

                        self.config._connections_add(**kwargs)
                else:
                    # Non-dict entries are invalid format; skip them silently.
                    pass

        _traverse_import(data)
        self.config._saveconfig(self.config.file)

Business logic for YAML/JSON inventory import and export.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def export_to_dict(self, folders=None)
Expand source code
def export_to_dict(self, folders=None):
    """Export nodes/folders to a dictionary."""
    if not folders:
        return deepcopy(self.config.connections)

    # Every requested folder must exist ("@" denotes the root).
    for candidate in folders:
        if candidate != "@" and candidate not in self.config._getallfolders():
            raise NodeNotFoundError(f"Folder '{candidate}' not found.")

    flat_nodes = self.config._getallnodesfull(folders, extract=False)
    tree = {}
    for unique_key, node_value in flat_nodes.items():
        parts = self.config._explode_unique(unique_key)
        if not parts:
            continue

        node_id = parts["id"]
        if "folder" in parts and "subfolder" in parts:
            branch = tree.setdefault(parts["folder"], {"type": "folder"})
            leaf = branch.setdefault(parts["subfolder"], {"type": "subfolder"})
            leaf[node_id] = node_value
        elif "folder" in parts:
            branch = tree.setdefault(parts["folder"], {"type": "folder"})
            branch[node_id] = node_value
        else:
            tree[node_id] = node_value

    return tree

Export nodes/folders to a dictionary.

def export_to_file(self, file_path, folders=None)
Expand source code
def export_to_file(self, file_path, folders=None):
    """Export nodes/folders to a YAML file.

    Refuses to overwrite an existing file and wraps I/O failures in
    InvalidConfigurationError.
    """
    if os.path.exists(file_path):
        raise InvalidConfigurationError(f"File '{file_path}' already exists.")
        
    data = self.export_to_dict(folders)
    try:
        with open(file_path, "w") as f:
            # NoAliasDumper keeps repeated objects as plain copies instead
            # of YAML anchors/aliases.
            yaml.dump(data, f, Dumper=NoAliasDumper, default_flow_style=False)
    except OSError as e:
        raise InvalidConfigurationError(f"Failed to export to '{file_path}': {e}")

Export nodes/folders to a YAML file.

def import_from_dict(self, data)
Expand source code
def import_from_dict(self, data):
    """Import nodes/folders from a dictionary.

    The dictionary mirrors the export format: folder/subfolder entries
    carry a "type" marker and connections are nested beneath them.

    Raises:
        InvalidConfigurationError: when *data* is not a dictionary.
    """
    if not isinstance(data, dict):
        raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.")

    def _traverse_import(node_data, current_folder='', current_subfolder=''):
        # Walk the nested structure depth-first, creating each folder or
        # subfolder before recursing into the connections it contains.
        for k, v in node_data.items():
            if k == "type":
                # "type" is a structural marker, not an inventory entry.
                continue
            if isinstance(v, dict):
                node_type = v.get("type", "connection")
                if node_type == "folder":
                    self.config._folder_add(folder=k)
                    _traverse_import(v, current_folder=k, current_subfolder='')
                elif node_type == "subfolder":
                    self.config._folder_add(folder=current_folder, subfolder=k)
                    _traverse_import(v, current_folder=current_folder, current_subfolder=k)
                elif node_type == "connection":
                    # Rebuild the unique ID from the current tree position.
                    unique_id = k
                    if current_subfolder:
                        unique_id = f"{k}@{current_subfolder}@{current_folder}"
                    elif current_folder:
                        unique_id = f"{k}@{current_folder}"
                    self._validate_node_name(unique_id)
                    
                    # Deep-copy so the caller's input dict is never mutated.
                    kwargs = deepcopy(v)
                    kwargs['id'] = k
                    kwargs['folder'] = current_folder
                    kwargs['subfolder'] = current_subfolder
                    
                    self.config._connections_add(**kwargs)
            else:
                # Invalid format skip
                pass

    _traverse_import(data)
    self.config._saveconfig(self.config.file)

Import nodes/folders from a dictionary.

def import_from_file(self, file_path)
Expand source code
def import_from_file(self, file_path):
    """Import nodes/folders from a YAML file.

    Raises:
        InvalidConfigurationError: when the file is missing or cannot be
            read, parsed, or imported.
    """
    if not os.path.exists(file_path):
        raise InvalidConfigurationError(f"File '{file_path}' does not exist.")
        
    try:
        with open(file_path, "r") as f:
            # safe_load refuses arbitrary Python object construction,
            # which FullLoader would allow on an untrusted import file.
            data = yaml.safe_load(f)
        self.import_from_dict(data)
    except Exception as e:
        raise InvalidConfigurationError(f"Failed to read/parse import file: {e}")

Import nodes/folders from a YAML file.

Inherited members

class InvalidConfigurationError (*args, **kwargs)
Expand source code
class InvalidConfigurationError(ConnpyError):
    """Raised when data or configuration input is invalid."""

Raised when data or configuration input is invalid.

Ancestors

  • ConnpyError
  • builtins.Exception
  • builtins.BaseException
class NodeAlreadyExistsError (*args, **kwargs)
Expand source code
class NodeAlreadyExistsError(ConnpyError):
    """Raised when a node or folder already exists."""

Raised when a node or folder already exists.

Ancestors

  • ConnpyError
  • builtins.Exception
  • builtins.BaseException
class NodeNotFoundError (*args, **kwargs)
Expand source code
class NodeNotFoundError(ConnpyError):
    """Raised when a connection or folder is not found."""

Raised when a connection or folder is not found.

Ancestors

  • ConnpyError
  • builtins.Exception
  • builtins.BaseException
class NodeService (config=None)
Expand source code
class NodeService(BaseService):
    """Business logic for listing, creating, updating, deleting, moving
    and connecting to inventory nodes and folders."""

    def __init__(self, config=None):
        super().__init__(config)


    def list_nodes(self, filter_str=None, format_str=None):
        """Return a list filtered by regex match and formatted if needed."""
        nodes = self.config._getallnodes()
        case_sensitive = self.config.config.get("case", False)
        
        if filter_str:
            flags = re.IGNORECASE if not case_sensitive else 0
            nodes = [n for n in nodes if re.search(filter_str, n, flags)]
            
        if not format_str:
            return nodes
            
        from .profile_service import ProfileService
        profile_service = ProfileService(self.config)
        
        formatted_nodes = []
        for n_id in nodes:
            # Use ProfileService to resolve profiles for dynamic formatting
            details = self.config.getitem(n_id, extract=False)
            if details:
                details = profile_service.resolve_node_data(details)
                
                name = n_id.split("@")[0]
                location = n_id.partition("@")[2] or "root"
                
                # Prepare context for .format() with all details
                context = details.copy()
                context.update({
                    "name": name,
                    "NAME": name.upper(),
                    "location": location,
                    "LOCATION": location.upper(),
                })
                
                # Add exploded uniques (id, folder, subfolder)
                uniques = self.config._explode_unique(n_id)
                if uniques:
                    context.update(uniques)
                
                # Add uppercase versions of all keys for convenience
                for k, v in list(context.items()):
                    if isinstance(v, str):
                        context[k.upper()] = v.upper()
                
                try:
                    formatted_nodes.append(format_str.format(**context))
                except (KeyError, IndexError, ValueError):
                    # Fallback to original string if format fails
                    formatted_nodes.append(n_id)
        return formatted_nodes

    def list_folders(self, filter_str=None):
        """Return all unique folders, optionally filtered by regex.

        A filter starting with "@" is matched as an exact folder name;
        anything else is treated as a regular expression.
        """
        folders = self.config._getallfolders()
        case_sensitive = self.config.config.get("case", False)
        
        if filter_str:
            if filter_str.startswith("@"):
                if not case_sensitive:
                    folders = [f for f in folders if f.lower() == filter_str.lower()]
                else:
                    folders = [f for f in folders if f == filter_str]
            else:
                flags = re.IGNORECASE if not case_sensitive else 0
                folders = [f for f in folders if re.search(filter_str, f, flags)]
        return folders

    def get_node_details(self, unique_id):
        """Return full configuration dictionary for a specific node.

        Raises:
            NodeNotFoundError: when the node does not exist.
        """
        try:
            details = self.config.getitem(unique_id)
            if not details:
                raise NodeNotFoundError(f"Node '{unique_id}' not found.")
            return details
        except (KeyError, TypeError):
            raise NodeNotFoundError(f"Node '{unique_id}' not found.")

    def explode_unique(self, unique_id):
        """Explode a unique ID into a dictionary of its parts."""
        return self.config._explode_unique(unique_id)

    def generate_cache(self, nodes=None, folders=None, profiles=None):
        """Generate and update the internal nodes cache."""
        self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)

    def validate_parent_folder(self, unique_id, is_folder=False):
        """Check if parent folder exists for a given node unique ID.

        Raises:
            NodeNotFoundError: when the parent folder is missing.
        """
        if is_folder:
            # Only subfolders have a parent that must already exist.
            uniques = self.config._explode_unique(unique_id)
            if uniques and "subfolder" in uniques and "folder" in uniques:
                parent_folder = f"@{uniques['folder']}"
                if parent_folder not in self.config._getallfolders():
                    raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
        else:
            node_folder = unique_id.partition("@")[2]
            if node_folder:
                parent_folder = f"@{node_folder}"
                if parent_folder not in self.config._getallfolders():
                    raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")


    def add_node(self, unique_id, data, is_folder=False):
        """Logic for adding a new node or folder to configuration.

        Raises:
            NodeAlreadyExistsError: when the node/folder already exists.
            InvalidConfigurationError: when a folder name cannot be parsed.
            NodeNotFoundError: when the parent folder is missing.
        """
        if not is_folder:
            self._validate_node_name(unique_id)
            
        all_nodes = self.config._getallnodes()
        all_folders = self.config._getallfolders()
        
        if is_folder:
            if unique_id in all_folders:
                raise NodeAlreadyExistsError(f"Folder '{unique_id}' already exists.")
            uniques = self.config._explode_unique(unique_id)
            if not uniques:
                raise InvalidConfigurationError(f"Invalid folder name '{unique_id}'.")
            
            # Check if parent folder exists when creating a subfolder
            if "subfolder" in uniques:
                self.validate_parent_folder(unique_id, is_folder=True)
                    
            self.config._folder_add(**uniques)
            self.config._saveconfig(self.config.file)
        else:
            if unique_id in all_nodes:
                raise NodeAlreadyExistsError(f"Node '{unique_id}' already exists.")
                
            # Check if parent folder exists when creating a node in a folder
            self.validate_parent_folder(unique_id)
                    
            # Ensure 'id' is in data for config._connections_add
            if "id" not in data:
                uniques = self.config._explode_unique(unique_id)
                if uniques and "id" in uniques:
                    data["id"] = uniques["id"]
            
            self.config._connections_add(**data)
            self.config._saveconfig(self.config.file)

    def update_node(self, unique_id, data):
        """Explicitly update an existing node.

        Raises:
            NodeNotFoundError: when the node does not exist.
        """
        all_nodes = self.config._getallnodes()
        if unique_id not in all_nodes:
            raise NodeNotFoundError(f"Node '{unique_id}' not found.")
            
        # Ensure 'id' is in data for config._connections_add
        if "id" not in data:
            uniques = self.config._explode_unique(unique_id)
            if uniques:
                data["id"] = uniques["id"]
            
        # config._connections_add actually handles updates if ID exists correctly
        self.config._connections_add(**data)
        self.config._saveconfig(self.config.file)

    def delete_node(self, unique_id, is_folder=False):
        """Logic for deleting a node or folder."""
        if is_folder:
            uniques = self.config._explode_unique(unique_id)
            if not uniques:
                raise NodeNotFoundError(f"Folder '{unique_id}' not found or invalid.")
            self.config._folder_del(**uniques)
        else:
            uniques = self.config._explode_unique(unique_id)
            if not uniques:
                raise NodeNotFoundError(f"Node '{unique_id}' not found or invalid.")
            self.config._connections_del(**uniques)
            
        self.config._saveconfig(self.config.file)

    def connect_node(self, unique_id, sftp=False, debug=False, logger=None):
        """Interact with a node directly.

        Raises:
            NodeNotFoundError: when the node does not exist.
        """
        from connpy.core import node
        from .profile_service import ProfileService
        
        node_data = self.config.getitem(unique_id, extract=False)
        if not node_data:
            raise NodeNotFoundError(f"Node '{unique_id}' not found.")
            
        # Resolve profiles
        profile_service = ProfileService(self.config)
        resolved_data = profile_service.resolve_node_data(node_data)
            
        n = node(unique_id, **resolved_data, config=self.config)
        if sftp:
            n.protocol = "sftp"

        n.interact(debug=debug, logger=logger)

    def move_node(self, src_id, dst_id, copy=False):
        """Move or copy a node.

        Raises:
            NodeNotFoundError: when the source node does not exist.
            NodeAlreadyExistsError: when the destination already exists.
            InvalidConfigurationError: when dst_id cannot be parsed.
        """
        self._validate_node_name(dst_id)
        
        node_data = self.config.getitem(src_id)
        if not node_data:
            raise NodeNotFoundError(f"Source node '{src_id}' not found.")
            
        if dst_id in self.config._getallnodes():
            raise NodeAlreadyExistsError(f"Destination node '{dst_id}' already exists.")
            
        new_uniques = self.config._explode_unique(dst_id)
        if not new_uniques:
            raise InvalidConfigurationError(f"Invalid destination format '{dst_id}'.")
            
        # Add the copy before removing the original.
        new_node_data = node_data.copy()
        new_node_data.update(new_uniques)
        
        self.config._connections_add(**new_node_data)
        
        if not copy:
            src_uniques = self.config._explode_unique(src_id)
            self.config._connections_del(**src_uniques)
            
        self.config._saveconfig(self.config.file)

    def bulk_add(self, ids, hosts, common_data):
        """Add multiple nodes with shared common configuration.

        Hosts are matched to ids by position, falling back to the first
        host when fewer hosts than ids are supplied. Existing or invalid
        IDs are skipped silently. Returns the number of nodes created.
        """
        count = 0
        all_nodes = self.config._getallnodes()
        
        for i, uid in enumerate(ids):
            if uid in all_nodes:
                continue
                
            try:
                self._validate_node_name(uid)
            except ReservedNameError:
                # For bulk, we might want to just skip or log. 
                # CLI caller will handle if it wants to be strict.
                continue
                
            host = hosts[i] if i < len(hosts) else hosts[0]
            uniques = self.config._explode_unique(uid)
            if not uniques:
                continue
                
            node_data = common_data.copy()
            # "ids"/"location" are bulk-request fields, not node attributes.
            node_data.pop("ids", None)
            node_data.pop("location", None)
            node_data.update(uniques)
            node_data["host"] = host
            node_data["type"] = "connection"

            self.config._connections_add(**node_data)
            count += 1
            
        if count > 0:
            self.config._saveconfig(self.config.file)
        return count

    def full_replace(self, connections, profiles):
        """Replace all connections and profiles with new data.

        No validation is performed on the supplied structures; the whole
        inventory is overwritten in memory and persisted to disk.
        """
        self.config.connections = connections
        self.config.profiles = profiles
        self.config._saveconfig(self.config.file)

    def get_inventory(self):
        """Return a full snapshot of connections and profiles."""
        return {
            "connections": self.config.connections,
            "profiles": self.config.profiles
        }

Base class for all connpy services, providing common configuration access.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def add_node(self, unique_id, data, is_folder=False)
Expand source code
def add_node(self, unique_id, data, is_folder=False):
    """Create a new node or folder entry and persist the configuration.

    Raises:
        NodeAlreadyExistsError: when the node/folder already exists.
        InvalidConfigurationError: when a folder name cannot be parsed.
        NodeNotFoundError: when the parent folder is missing.
    """
    if is_folder:
        if unique_id in self.config._getallfolders():
            raise NodeAlreadyExistsError(f"Folder '{unique_id}' already exists.")
        parts = self.config._explode_unique(unique_id)
        if not parts:
            raise InvalidConfigurationError(f"Invalid folder name '{unique_id}'.")
        if "subfolder" in parts:
            # Subfolders require their parent folder to already exist.
            self.validate_parent_folder(unique_id, is_folder=True)
        self.config._folder_add(**parts)
    else:
        self._validate_node_name(unique_id)
        if unique_id in self.config._getallnodes():
            raise NodeAlreadyExistsError(f"Node '{unique_id}' already exists.")
        # The node's parent folder must exist before the node is added.
        self.validate_parent_folder(unique_id)
        if "id" not in data:
            # _connections_add expects an explicit 'id' key.
            parts = self.config._explode_unique(unique_id)
            if parts and "id" in parts:
                data["id"] = parts["id"]
        self.config._connections_add(**data)
    self.config._saveconfig(self.config.file)

Logic for adding a new node or folder to configuration.

def bulk_add(self, ids, hosts, common_data)
Expand source code
def bulk_add(self, ids, hosts, common_data):
    """Add multiple nodes with shared common configuration.

    Args:
        ids: unique IDs of the nodes to create.
        hosts: host addresses matched to ids by position; the first host
            is reused when fewer hosts than ids are supplied.
        common_data: configuration applied to every created node.

    Returns:
        The number of nodes actually created; existing or invalid IDs
        are skipped silently.
    """
    count = 0
    all_nodes = self.config._getallnodes()
    
    for i, uid in enumerate(ids):
        if uid in all_nodes:
            # Never overwrite an existing node in bulk mode.
            continue
            
        try:
            self._validate_node_name(uid)
        except ReservedNameError:
            # For bulk, we might want to just skip or log. 
            # CLI caller will handle if it wants to be strict.
            continue
            
        host = hosts[i] if i < len(hosts) else hosts[0]
        uniques = self.config._explode_unique(uid)
        if not uniques:
            continue
            
        node_data = common_data.copy()
        # "ids"/"location" are bulk-request fields, not node attributes.
        node_data.pop("ids", None)
        node_data.pop("location", None)
        node_data.update(uniques)
        node_data["host"] = host
        node_data["type"] = "connection"

        self.config._connections_add(**node_data)
        count += 1
        
    if count > 0:
        self.config._saveconfig(self.config.file)
    return count

Add multiple nodes with shared common configuration.

def connect_node(self, unique_id, sftp=False, debug=False, logger=None)
Expand source code
def connect_node(self, unique_id, sftp=False, debug=False, logger=None):
    """Interact with a node directly.

    Args:
        unique_id: node identifier.
        sftp: when True, force the sftp protocol for the session.
        debug: passed through to node.interact.
        logger: optional logger passed through to node.interact.

    Raises:
        NodeNotFoundError: when the node does not exist.
    """
    from connpy.core import node
    from .profile_service import ProfileService
    
    node_data = self.config.getitem(unique_id, extract=False)
    if not node_data:
        raise NodeNotFoundError(f"Node '{unique_id}' not found.")
        
    # Resolve profiles
    profile_service = ProfileService(self.config)
    resolved_data = profile_service.resolve_node_data(node_data)
        
    n = node(unique_id, **resolved_data, config=self.config)
    if sftp:
        n.protocol = "sftp"

    n.interact(debug=debug, logger=logger)

Interact with a node directly.

def delete_node(self, unique_id, is_folder=False)
Expand source code
def delete_node(self, unique_id, is_folder=False):
    """Remove a node or folder from the configuration and persist it.

    Raises:
        NodeNotFoundError: when unique_id cannot be parsed.
    """
    parts = self.config._explode_unique(unique_id)
    if not parts:
        kind = "Folder" if is_folder else "Node"
        raise NodeNotFoundError(f"{kind} '{unique_id}' not found or invalid.")

    # Folders and connections live in different sections of the config.
    remove = self.config._folder_del if is_folder else self.config._connections_del
    remove(**parts)
    self.config._saveconfig(self.config.file)

Logic for deleting a node or folder.

def explode_unique(self, unique_id)
Expand source code
def explode_unique(self, unique_id):
    """Explode a unique ID into a dictionary of its parts.

    Thin wrapper over configfile._explode_unique; returns whatever that
    helper yields for the given ID.
    """
    return self.config._explode_unique(unique_id)

Explode a unique ID into a dictionary of its parts.

def full_replace(self, connections, profiles)
Expand source code
def full_replace(self, connections, profiles):
    """Replace all connections and profiles with new data.

    No validation is performed on the supplied structures; the entire
    inventory is overwritten in memory and persisted to disk.
    """
    self.config.connections = connections
    self.config.profiles = profiles
    self.config._saveconfig(self.config.file)

Replace all connections and profiles with new data.

def generate_cache(self, nodes=None, folders=None, profiles=None)
Expand source code
def generate_cache(self, nodes=None, folders=None, profiles=None):
    """Generate and update the internal nodes cache.

    Thin wrapper over configfile._generate_nodes_cache; all arguments
    are forwarded unchanged.
    """
    self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)

Generate and update the internal nodes cache.

def get_inventory(self)
Expand source code
def get_inventory(self):
    """Return a snapshot holding both connections and profiles."""
    snapshot = {}
    snapshot["connections"] = self.config.connections
    snapshot["profiles"] = self.config.profiles
    return snapshot

Return a full snapshot of connections and profiles.

def get_node_details(self, unique_id)
Expand source code
def get_node_details(self, unique_id):
    """Return the full configuration dictionary of one node.

    Raises:
        NodeNotFoundError: when the node does not exist.
    """
    try:
        details = self.config.getitem(unique_id)
    except (KeyError, TypeError):
        # Treat lookup failures the same as a missing node.
        details = None
    if not details:
        raise NodeNotFoundError(f"Node '{unique_id}' not found.")
    return details

Return full configuration dictionary for a specific node.

def list_folders(self, filter_str=None)
Expand source code
def list_folders(self, filter_str=None):
    """Return all unique folders, optionally narrowed by a filter.

    A filter starting with "@" is matched as an exact folder name;
    anything else is treated as a regular expression.
    """
    folders = self.config._getallfolders()
    if not filter_str:
        return folders

    case_sensitive = self.config.config.get("case", False)
    if filter_str.startswith("@"):
        # Exact folder-name match.
        if case_sensitive:
            return [f for f in folders if f == filter_str]
        return [f for f in folders if f.lower() == filter_str.lower()]

    flags = 0 if case_sensitive else re.IGNORECASE
    return [f for f in folders if re.search(filter_str, f, flags)]

Return all unique folders, optionally filtered by regex.

def list_nodes(self, filter_str=None, format_str=None)
Expand source code
def list_nodes(self, filter_str=None, format_str=None):
    """Return a list filtered by regex match and formatted if needed.

    Args:
        filter_str: optional regular expression applied to node IDs.
        format_str: optional str.format template; placeholders may use
            node attributes plus name/location (and uppercase variants).
    """
    nodes = self.config._getallnodes()
    case_sensitive = self.config.config.get("case", False)
    
    if filter_str:
        flags = re.IGNORECASE if not case_sensitive else 0
        nodes = [n for n in nodes if re.search(filter_str, n, flags)]
        
    if not format_str:
        return nodes
        
    from .profile_service import ProfileService
    profile_service = ProfileService(self.config)
    
    formatted_nodes = []
    for n_id in nodes:
        # Use ProfileService to resolve profiles for dynamic formatting
        details = self.config.getitem(n_id, extract=False)
        if details:
            details = profile_service.resolve_node_data(details)
            
            name = n_id.split("@")[0]
            location = n_id.partition("@")[2] or "root"
            
            # Prepare context for .format() with all details
            context = details.copy()
            context.update({
                "name": name,
                "NAME": name.upper(),
                "location": location,
                "LOCATION": location.upper(),
            })
            
            # Add exploded uniques (id, folder, subfolder)
            uniques = self.config._explode_unique(n_id)
            if uniques:
                context.update(uniques)
            
            # Add uppercase versions of all keys for convenience
            for k, v in list(context.items()):
                if isinstance(v, str):
                    context[k.upper()] = v.upper()
            
            try:
                formatted_nodes.append(format_str.format(**context))
            except (KeyError, IndexError, ValueError):
                # Fallback to original string if format fails
                formatted_nodes.append(n_id)
    return formatted_nodes

Return a list filtered by regex match and formatted if needed.

def move_node(self, src_id, dst_id, copy=False)
Expand source code
def move_node(self, src_id, dst_id, copy=False):
    """Move or copy a node.

    Args:
        src_id: unique ID of the existing node.
        dst_id: unique ID of the destination; must not exist yet.
        copy: when True, keep the source node instead of deleting it.

    Raises:
        NodeNotFoundError: when the source node does not exist.
        NodeAlreadyExistsError: when the destination already exists.
        InvalidConfigurationError: when dst_id cannot be parsed.
    """
    self._validate_node_name(dst_id)
    
    node_data = self.config.getitem(src_id)
    if not node_data:
        raise NodeNotFoundError(f"Source node '{src_id}' not found.")
        
    if dst_id in self.config._getallnodes():
        raise NodeAlreadyExistsError(f"Destination node '{dst_id}' already exists.")
        
    new_uniques = self.config._explode_unique(dst_id)
    if not new_uniques:
        raise InvalidConfigurationError(f"Invalid destination format '{dst_id}'.")
        
    # The destination is created before the source is removed.
    new_node_data = node_data.copy()
    new_node_data.update(new_uniques)
    
    self.config._connections_add(**new_node_data)
    
    if not copy:
        src_uniques = self.config._explode_unique(src_id)
        self.config._connections_del(**src_uniques)
        
    self.config._saveconfig(self.config.file)

Move or copy a node.

def update_node(self, unique_id, data)
Expand source code
def update_node(self, unique_id, data):
    """Overwrite an existing node's configuration and persist the change.

    Raises:
        NodeNotFoundError: when the node does not exist.
    """
    if unique_id not in self.config._getallnodes():
        raise NodeNotFoundError(f"Node '{unique_id}' not found.")

    if "id" not in data:
        # _connections_add expects an explicit 'id' key.
        parts = self.config._explode_unique(unique_id)
        if parts:
            data["id"] = parts["id"]

    # _connections_add overwrites entries whose ID already exists.
    self.config._connections_add(**data)
    self.config._saveconfig(self.config.file)

Explicitly update an existing node.

def validate_parent_folder(self, unique_id, is_folder=False)
Expand source code
def validate_parent_folder(self, unique_id, is_folder=False):
    """Raise NodeNotFoundError when the parent folder of unique_id is missing."""
    if is_folder:
        parts = self.config._explode_unique(unique_id)
        # Only subfolders have a parent that must already exist.
        if not parts or "subfolder" not in parts or "folder" not in parts:
            return
        parent_folder = f"@{parts['folder']}"
    else:
        location = unique_id.partition("@")[2]
        if not location:
            # Root-level nodes have no parent to validate.
            return
        parent_folder = f"@{location}"

    if parent_folder not in self.config._getallfolders():
        raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")

Check if parent folder exists for a given node unique ID.

Inherited members

class PluginService (config=None)
Expand source code
class PluginService(BaseService):
    """Business logic for enabling, disabling, and listing plugins."""

    def list_plugins(self):
        """List user-defined plugins with their enabled state and file hash.

        Returns a dict keyed by plugin name. Enabled plugins also carry
        an MD5 hash of their file content (a cheap change detector, not
        a security measure).
        """
        import os
        import hashlib

        # User plugins directory; the previously computed core plugins
        # path was never used here, so it has been removed.
        plugin_dir = os.path.join(self.config.defaultdir, "plugins")

        all_plugin_info = {}

        def get_hash(path):
            # Best effort: an unreadable file simply yields an empty hash.
            try:
                with open(path, "rb") as f:
                    return hashlib.md5(f.read()).hexdigest()
            except Exception:
                return ""

        if os.path.exists(plugin_dir):
            for f in os.listdir(plugin_dir):
                if f.endswith(".py"):
                    name = f[:-3]
                    path = os.path.join(plugin_dir, f)
                    all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)}
                elif f.endswith(".py.bkp"):
                    # Disabled plugins are stored with a .bkp suffix.
                    name = f[:-7]
                    all_plugin_info[name] = {"enabled": False}

        return all_plugin_info

    def add_plugin(self, name, source_file, update=False):
        """Add or update a plugin from a local file.

        Validates the plugin name and verifies the script before saving.

        Raises:
            InvalidConfigurationError: on an invalid name or script.
        """
        # The previously present os/shutil imports were unused here;
        # file handling is delegated to _save_plugin_file.
        from connpy.plugins import Plugins

        if not name.isalpha() or not name.islower() or len(name) > 15:
            raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.")

        p_manager = Plugins()
        # Check for bad script
        error = p_manager.verify_script(source_file)
        if error:
            raise InvalidConfigurationError(f"Invalid plugin script: {error}")

        self._save_plugin_file(name, source_file, update, is_path=True)

    def add_plugin_from_bytes(self, name, content, update=False):
        """Add or update a plugin from bytes (gRPC).

        The content is written to a temporary file so the standard script
        verification can run on it before it is saved.

        Raises:
            InvalidConfigurationError: on an invalid name or script.
        """
        import tempfile
        import os
        
        if not name.isalpha() or not name.islower() or len(name) > 15:
            raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.")

        # Write to temp file to verify script
        with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as tmp:
            tmp.write(content)
            tmp_path = tmp.name

        try:
            from connpy.plugins import Plugins
            p_manager = Plugins()
            error = p_manager.verify_script(tmp_path)
            if error:
                raise InvalidConfigurationError(f"Invalid plugin script: {error}")
            
            self._save_plugin_file(name, tmp_path, update, is_path=True)
        finally:
            # Always remove the temporary file, even when verification fails.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    def _save_plugin_file(self, name, source, update=False, is_path=True):
        """Persist a plugin into the user plugins directory.

        Args:
            name: plugin name (stored as <name>.py).
            source: a filesystem path when is_path is True, otherwise raw
                bytes to write.
            update: when False, refuse to overwrite an existing plugin
                (enabled or disabled).
            is_path: selects between copying a file and writing bytes.

        Raises:
            InvalidConfigurationError: when the plugin already exists or
                the file cannot be written.
        """
        import os
        import shutil
        
        plugin_dir = os.path.join(self.config.defaultdir, "plugins")
        os.makedirs(plugin_dir, exist_ok=True)
        
        target_file = os.path.join(plugin_dir, f"{name}.py")
        backup_file = f"{target_file}.bkp"

        if not update and (os.path.exists(target_file) or os.path.exists(backup_file)):
            raise InvalidConfigurationError(f"Plugin '{name}' already exists.")

        try:
            if is_path:
                # copy2 preserves file metadata (timestamps, permissions).
                shutil.copy2(source, target_file)
            else:
                with open(target_file, "wb") as f:
                    f.write(source)
        except OSError as e:
            raise InvalidConfigurationError(f"Failed to save plugin file: {e}")

    def delete_plugin(self, name):
        """Remove a plugin file permanently (enabled and/or disabled copy)."""
        import os
        base = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")

        removed = False
        # A plugin may exist as an active .py file, a disabled .py.bkp
        # backup, or both — remove every copy found.
        for candidate in (base, f"{base}.bkp"):
            if not os.path.exists(candidate):
                continue
            try:
                os.remove(candidate)
            except OSError as e:
                raise InvalidConfigurationError(f"Failed to delete plugin file '{candidate}': {e}")
            removed = True

        if not removed:
            raise InvalidConfigurationError(f"Plugin '{name}' not found.")

    def enable_plugin(self, name):
        """Activate a plugin by renaming its backup file.

        Returns False when the plugin is already enabled, True on success.
        """
        import os
        active = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
        backup = f"{active}.bkp"

        if os.path.exists(active):
            return False # Already enabled

        if not os.path.exists(backup):
            raise InvalidConfigurationError(f"Plugin '{name}' not found.")

        try:
            os.rename(backup, active)
        except OSError as e:
            raise InvalidConfigurationError(f"Failed to enable plugin '{name}': {e}")
        return True

    def disable_plugin(self, name):
        """Deactivate a plugin by renaming it to a backup file.

        Returns False when the plugin is already disabled, True on success.
        """
        import os
        active = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
        backup = f"{active}.bkp"

        if os.path.exists(backup):
            return False # Already disabled

        if not os.path.exists(active):
            raise InvalidConfigurationError(f"Plugin '{name}' not found or is a core plugin.")

        try:
            os.rename(active, backup)
        except OSError as e:
            raise InvalidConfigurationError(f"Failed to disable plugin '{name}': {e}")
        return True

    def get_plugin_source(self, name):
        """Return the source code of a plugin as a string.

        User plugins (in the config's ``plugins`` directory) take
        precedence over bundled core plugins of the same name.

        Raises:
            InvalidConfigurationError: if no plugin named *name* exists.
        """
        import os
        from ..services.exceptions import InvalidConfigurationError

        plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
        # os.path.join instead of string concatenation for portability.
        core_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins", f"{name}.py")

        if os.path.exists(plugin_file):
            target = plugin_file
        elif os.path.exists(core_path):
            target = core_path
        else:
            raise InvalidConfigurationError(f"Plugin '{name}' not found")

        with open(target, "r") as f:
            return f.read()

    def invoke_plugin(self, name, args_dict):
        """Run a plugin's Entrypoint with a mocked CLI app and yield its output.

        The plugin is resolved from the user plugins directory first, then
        from the bundled core plugins. All printer/rich output produced by
        the plugin is captured in an in-memory buffer and yielded back line
        by line (this is a generator), so callers can stream it.

        Args:
            name: Plugin name (file stem without ``.py``).
            args_dict: Parsed CLI arguments as a plain dict. The optional
                ``__func_name__`` key selects a module-level function of the
                plugin to attach as ``args.func``.

        Raises:
            InvalidConfigurationError: if the plugin cannot be found.
        """
        import io
        from argparse import Namespace
        from ..services.exceptions import InvalidConfigurationError
        from connpy.plugins import Plugins
        class MockApp:
            # Minimal stand-in for the CLI app object plugins expect.
            is_mock = True
            def __init__(self, config):
                from ..core import node, nodes
                from ..ai import ai
                from ..services.provider import ServiceProvider
                
                self.config = config
                self.node = node
                self.nodes = nodes
                self.ai = ai
                
                self.services = ServiceProvider(config, mode="local")
                
                # Get settings for CLI behavior
                settings = self.services.config_svc.get_settings()
                self.case = settings.get("case", False)
                self.fzf = settings.get("fzf", False)
                
                # Best effort: plugins can still run without inventory data.
                try:
                    self.nodes_list = self.services.nodes.list_nodes()
                    self.folders = self.services.nodes.list_folders()
                    self.profiles = self.services.profiles.list_profiles()
                except Exception:
                    self.nodes_list = []
                    self.folders = []
                    self.profiles = []
        
        args = Namespace(**args_dict)
        
        p_manager = Plugins()
        import os
        plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
        core_path = os.path.dirname(os.path.realpath(__file__)) + f"/../core_plugins/{name}.py"
        
        # User plugins take precedence over bundled core plugins.
        if os.path.exists(plugin_file):
            target = plugin_file
        elif os.path.exists(core_path):
            target = core_path
        else:
            raise InvalidConfigurationError(f"Plugin '{name}' not found")
            
        module = p_manager._import_from_path(target)
        parser = module.Parser().parser if hasattr(module, "Parser") else None
        
        # Optionally bind a caller-selected module-level function.
        if "__func_name__" in args_dict and hasattr(module, args_dict["__func_name__"]):
            args.func = getattr(module, args_dict["__func_name__"])
        
        app = MockApp(self.config)
        
        from .. import printer
        from rich.console import Console
        
        # Redirect all printer output for this thread into a buffer.
        buf = io.StringIO()
        old_console = printer._get_console()
        old_err_console = printer._get_err_console()
        
        printer.set_thread_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True))
        printer.set_thread_err_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True))
        printer.set_thread_stream(buf)
        
        try:
            if hasattr(module, "Entrypoint"):
                module.Entrypoint(args, parser, app)
        except BaseException as e:
            # SystemExit is a normal way for argparse/plugins to finish;
            # anything else is reported into the captured output.
            if not isinstance(e, SystemExit):
                import traceback
                printer.err_console.print(traceback.format_exc())
        finally:
            # Always restore the previous console bindings.
            printer.set_thread_console(old_console)
            printer.set_thread_err_console(old_err_console)
            printer.set_thread_stream(None)
            
        for line in buf.getvalue().splitlines(keepends=True):
            yield line

Business logic for enabling, disabling, and listing plugins.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def add_plugin(self, name, source_file, update=False)
Expand source code
def add_plugin(self, name, source_file, update=False):
    """Add or update a plugin from a local file.

    Args:
        name: Plugin name; lowercase letters only, max 15 characters.
        source_file: Path to the plugin source file on disk.
        update: If True, allow overwriting an existing plugin.

    Raises:
        InvalidConfigurationError: on invalid name or invalid script.
    """
    import os
    # NOTE(review): shutil is imported but unused here — possibly left
    # over from an earlier copy-based implementation; confirm before removing.
    import shutil
    from connpy.plugins import Plugins

    # Enforce the naming convention before touching the filesystem.
    if not name.isalpha() or not name.islower() or len(name) > 15:
        raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.")

    p_manager = Plugins()
    # Check for bad script
    error = p_manager.verify_script(source_file)
    if error:
        raise InvalidConfigurationError(f"Invalid plugin script: {error}")

    self._save_plugin_file(name, source_file, update, is_path=True)

Add or update a plugin from a local file.

def add_plugin_from_bytes(self, name, content, update=False)
Expand source code
def add_plugin_from_bytes(self, name, content, update=False):
    """Add or update a plugin from bytes (gRPC).

    Args:
        name: Plugin name; lowercase letters only, max 15 characters.
        content: Raw plugin source as bytes.
        update: If True, allow overwriting an existing plugin.

    Raises:
        InvalidConfigurationError: on invalid name or invalid script.
    """
    import tempfile
    import os
    
    if not name.isalpha() or not name.islower() or len(name) > 15:
        raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.")

    # Write to temp file to verify script
    # delete=False so the file survives closing the context manager; it is
    # removed manually in the finally block below.
    with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as tmp:
        tmp.write(content)
        tmp_path = tmp.name

    try:
        from connpy.plugins import Plugins
        p_manager = Plugins()
        error = p_manager.verify_script(tmp_path)
        if error:
            raise InvalidConfigurationError(f"Invalid plugin script: {error}")
        
        self._save_plugin_file(name, tmp_path, update, is_path=True)
    finally:
        # Always clean up the temporary file, even when verification fails.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

Add or update a plugin from bytes (gRPC).

def delete_plugin(self, name)
Expand source code
def delete_plugin(self, name):
    """Remove a plugin file permanently."""
    import os
    plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
    disabled_file = f"{plugin_file}.bkp"

    deleted = False
    for f in [plugin_file, disabled_file]:
        if os.path.exists(f):
            try:
                os.remove(f)
                deleted = True
            except OSError as e:
                raise InvalidConfigurationError(f"Failed to delete plugin file '{f}': {e}")
    
    if not deleted:
        raise InvalidConfigurationError(f"Plugin '{name}' not found.")

Remove a plugin file permanently.

def disable_plugin(self, name)
Expand source code
def disable_plugin(self, name):
    """Deactivate a plugin by renaming it to a backup file."""
    import os
    plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
    disabled_file = f"{plugin_file}.bkp"
    
    if os.path.exists(disabled_file):
        return False # Already disabled
        
    if not os.path.exists(plugin_file):
        raise InvalidConfigurationError(f"Plugin '{name}' not found or is a core plugin.")
        
    try:
        os.rename(plugin_file, disabled_file)
        return True
    except OSError as e:
        raise InvalidConfigurationError(f"Failed to disable plugin '{name}': {e}")

Deactivate a plugin by renaming it to a backup file.

def enable_plugin(self, name)
Expand source code
def enable_plugin(self, name):
    """Activate a plugin by renaming its backup file."""
    import os
    plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
    disabled_file = f"{plugin_file}.bkp"
    
    if os.path.exists(plugin_file):
        return False # Already enabled
        
    if not os.path.exists(disabled_file):
        raise InvalidConfigurationError(f"Plugin '{name}' not found.")
        
    try:
        os.rename(disabled_file, plugin_file)
        return True
    except OSError as e:
        raise InvalidConfigurationError(f"Failed to enable plugin '{name}': {e}")

Activate a plugin by renaming its backup file.

def get_plugin_source(self, name)
Expand source code
def get_plugin_source(self, name):
    import os
    from ..services.exceptions import InvalidConfigurationError
    
    plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
    core_path = os.path.dirname(os.path.realpath(__file__)) + f"/../core_plugins/{name}.py"
    
    if os.path.exists(plugin_file):
        target = plugin_file
    elif os.path.exists(core_path):
        target = core_path
    else:
        raise InvalidConfigurationError(f"Plugin '{name}' not found")
    
    with open(target, "r") as f:
        return f.read()
def invoke_plugin(self, name, args_dict)
Expand source code
def invoke_plugin(self, name, args_dict):
    import sys, io
    from argparse import Namespace
    from ..services.exceptions import InvalidConfigurationError
    from connpy.plugins import Plugins
    class MockApp:
        is_mock = True
        def __init__(self, config):
            from ..core import node, nodes
            from ..ai import ai
            from ..services.provider import ServiceProvider
            
            self.config = config
            self.node = node
            self.nodes = nodes
            self.ai = ai
            
            self.services = ServiceProvider(config, mode="local")
            
            # Get settings for CLI behavior
            settings = self.services.config_svc.get_settings()
            self.case = settings.get("case", False)
            self.fzf = settings.get("fzf", False)
            
            try:
                self.nodes_list = self.services.nodes.list_nodes()
                self.folders = self.services.nodes.list_folders()
                self.profiles = self.services.profiles.list_profiles()
            except Exception:
                self.nodes_list = []
                self.folders = []
                self.profiles = []
    
    args = Namespace(**args_dict)
    
    p_manager = Plugins()
    import os
    plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
    core_path = os.path.dirname(os.path.realpath(__file__)) + f"/../core_plugins/{name}.py"
    
    if os.path.exists(plugin_file):
        target = plugin_file
    elif os.path.exists(core_path):
        target = core_path
    else:
        raise InvalidConfigurationError(f"Plugin '{name}' not found")
        
    module = p_manager._import_from_path(target)
    parser = module.Parser().parser if hasattr(module, "Parser") else None
    
    if "__func_name__" in args_dict and hasattr(module, args_dict["__func_name__"]):
        args.func = getattr(module, args_dict["__func_name__"])
    
    app = MockApp(self.config)
    
    from .. import printer
    from rich.console import Console
    
    from rich.console import Console
    buf = io.StringIO()
    old_console = printer._get_console()
    old_err_console = printer._get_err_console()
    
    printer.set_thread_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True))
    printer.set_thread_err_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True))
    printer.set_thread_stream(buf)
    
    try:
        if hasattr(module, "Entrypoint"):
            module.Entrypoint(args, parser, app)
    except BaseException as e:
        if not isinstance(e, SystemExit):
            import traceback
            printer.err_console.print(traceback.format_exc())
    finally:
        printer.set_thread_console(old_console)
        printer.set_thread_err_console(old_err_console)
        printer.set_thread_stream(None)
        
    for line in buf.getvalue().splitlines(keepends=True):
        yield line
def list_plugins(self)
Expand source code
def list_plugins(self):
    """List user-defined plugins with their enabled status and content hash.

    Returns:
        dict: Maps plugin name to an info dict. Enabled plugins get
        ``{"enabled": True, "hash": <md5 hex of file>}``; disabled
        (``.py.bkp``) plugins get ``{"enabled": False}``.

    NOTE(review): despite the previous docstring, core plugins were never
    included in the result (the core path was computed but unused); this
    preserves that behavior and removes the dead code.
    """
    import os
    import hashlib
    
    # User plugins directory inside the configuration folder.
    plugin_dir = os.path.join(self.config.defaultdir, "plugins")
    
    all_plugin_info = {}

    def get_hash(path):
        # Best-effort md5 of the file contents; empty string on failure.
        try:
            with open(path, "rb") as f:
                return hashlib.md5(f.read()).hexdigest()
        except Exception:
            return ""

    # User plugins
    if os.path.exists(plugin_dir):
        for entry in os.listdir(plugin_dir):
            if entry.endswith(".py"):
                name = entry[:-3]
                path = os.path.join(plugin_dir, entry)
                all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)}
            elif entry.endswith(".py.bkp"):
                name = entry[:-7]
                all_plugin_info[name] = {"enabled": False}

    return all_plugin_info

List user-defined plugins with their enabled status and content hash.

Inherited members

class ProfileAlreadyExistsError (*args, **kwargs)
Expand source code
class ProfileAlreadyExistsError(ConnpyError):
    """Raised when a profile with the same name already exists."""
    # No added behavior — exists so callers can catch this case precisely.
    pass

Raised when a profile with the same name already exists.

Ancestors

  • ConnpyError
  • builtins.Exception
  • builtins.BaseException
class ProfileNotFoundError (*args, **kwargs)
Expand source code
class ProfileNotFoundError(ConnpyError):
    """Raised when a profile is not found."""
    # No added behavior — exists so callers can catch this case precisely.
    pass

Raised when a profile is not found.

Ancestors

  • ConnpyError
  • builtins.Exception
  • builtins.BaseException
class ProfileService (config=None)
Expand source code
class ProfileService(BaseService):
    """Business logic for node profiles management.

    Profiles are stored in ``self.config.profiles`` (a dict of name ->
    profile data). Node fields may reference a profile as ``"@name"``;
    resolution is handled by :meth:`resolve_node_data`.
    """

    def list_profiles(self, filter_str=None):
        """List all profile names, optionally filtered.

        Args:
            filter_str: Substring to match against names. Matching is
                case-insensitive unless the ``case`` config flag is set.
        """
        profiles = list(self.config.profiles.keys())
        case_sensitive = self.config.config.get("case", False)
        
        if filter_str:
            if not case_sensitive:
                f_str = filter_str.lower()
                return [p for p in profiles if f_str in p.lower()]
            else:
                return [p for p in profiles if filter_str in p]
        return profiles

    def get_profile(self, name, resolve=True):
        """Get the profile dictionary, optionally resolved.

        Args:
            name: Profile name.
            resolve: When True, expand ``@profile`` references via
                :meth:`resolve_node_data`.

        Raises:
            ProfileNotFoundError: if the profile does not exist.
        """
        profile = self.config.profiles.get(name)
        if not profile:
            raise ProfileNotFoundError(f"Profile '{name}' not found.")
        
        if resolve:
            return self.resolve_node_data(profile)
        return profile

    def add_profile(self, name, data):
        """Add a new profile.

        Raises:
            ProfileAlreadyExistsError: if a profile with this name exists.
        """
        if name in self.config.profiles:
            raise ProfileAlreadyExistsError(f"Profile '{name}' already exists.")
            
        # Filter data to match _profiles_add signature and ensure id is passed
        allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"}
        filtered_data = {k: v for k, v in data.items() if k in allowed_keys}
        
        self.config._profiles_add(id=name, **filtered_data)
        self.config._saveconfig(self.config.file)

    def resolve_node_data(self, node_data):
        """Resolve profile references (@profile) in node data and handle inheritance.

        Resolution order:
          1. collect every referenced profile name;
          2. replace ``@profile`` values per-key (and per-item in lists);
          3. fill any still-empty keys from the FIRST referenced profile;
          4. fall back to the ``default`` profile's protocol (or ``ssh``).

        Missing profiles never raise here — references resolve to ``""``
        or are skipped, keeping resolution best-effort.
        """
        resolved = node_data.copy()
        
        # 1. Identify all referenced profiles to support inheritance
        referenced_profiles = []
        for value in resolved.values():
            if isinstance(value, str) and value.startswith("@"):
                referenced_profiles.append(value[1:])
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, str) and item.startswith("@"):
                        referenced_profiles.append(item[1:])
        
        # 2. Resolve explicit references
        for key, value in resolved.items():
            if isinstance(value, str) and value.startswith("@"):
                profile_name = value[1:]
                try:
                    profile = self.get_profile(profile_name, resolve=True)
                    # Take the SAME key from the referenced profile.
                    resolved[key] = profile.get(key, "")
                except ProfileNotFoundError:
                    resolved[key] = ""
            elif isinstance(value, list):
                resolved_list = []
                for item in value:
                    if isinstance(item, str) and item.startswith("@"):
                        profile_name = item[1:]
                        try:
                            profile = self.get_profile(profile_name, resolve=True)
                            # NOTE(review): inside lists only the profile's
                            # password is taken; a referenced profile with
                            # no password contributes nothing to the list.
                            if "password" in profile:
                                resolved_list.append(profile["password"])
                        except ProfileNotFoundError:
                            pass
                    else:
                        resolved_list.append(item)
                resolved[key] = resolved_list
        
        # 3. Inheritance: Fill empty keys from the first referenced profile
        if referenced_profiles:
            base_profile_name = referenced_profiles[0]
            try:
                base_profile = self.get_profile(base_profile_name, resolve=True)
                for key, value in base_profile.items():
                    # Fill if key is missing or empty
                    if key not in resolved or resolved[key] == "" or resolved[key] == [] or resolved[key] is None:
                        resolved[key] = value
            except ProfileNotFoundError:
                pass

        # 4. Handle default protocol
        if resolved.get("protocol") == "" or resolved.get("protocol") is None:
            try:
                default_profile = self.get_profile("default", resolve=True)
                resolved["protocol"] = default_profile.get("protocol", "ssh")
            except ProfileNotFoundError:
                resolved["protocol"] = "ssh"
                
        return resolved

    def delete_profile(self, name):
        """Delete an existing profile, with safety checks.

        Raises:
            ProfileNotFoundError: if the profile does not exist.
            InvalidConfigurationError: for the 'default' profile or when
                the profile is still referenced by nodes.
        """
        if name not in self.config.profiles:
            raise ProfileNotFoundError(f"Profile '{name}' not found.")
            
        if name == "default":
            raise InvalidConfigurationError("Cannot delete the 'default' profile.")
            
        used_by = self.config._profileused(name)
        if used_by:
            # We return the list of nodes using it so the UI can inform the user
            raise InvalidConfigurationError(f"Profile '{name}' is used by nodes: {', '.join(used_by)}")
            
        self.config._profiles_del(id=name)
        self.config._saveconfig(self.config.file)

    def update_profile(self, name, data):
        """Update an existing profile (shallow merge of *data* over it).

        Raises:
            ProfileNotFoundError: if the profile does not exist.
        """
        if name not in self.config.profiles:
            raise ProfileNotFoundError(f"Profile '{name}' not found.")
            
        # Merge with existing data
        existing = self.get_profile(name, resolve=False)
        updated_data = existing.copy()
        updated_data.update(data)
        
        # Filter data to match _profiles_add signature
        allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"}
        filtered_data = {k: v for k, v in updated_data.items() if k in allowed_keys}
        
        self.config._profiles_add(id=name, **filtered_data)
        self.config._saveconfig(self.config.file)

Business logic for node profiles management.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def add_profile(self, name, data)
Expand source code
def add_profile(self, name, data):
    """Add a new profile."""
    if name in self.config.profiles:
        raise ProfileAlreadyExistsError(f"Profile '{name}' already exists.")
        
    # Filter data to match _profiles_add signature and ensure id is passed
    allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"}
    filtered_data = {k: v for k, v in data.items() if k in allowed_keys}
    
    self.config._profiles_add(id=name, **filtered_data)
    self.config._saveconfig(self.config.file)

Add a new profile.

def delete_profile(self, name)
Expand source code
def delete_profile(self, name):
    """Delete an existing profile, with safety checks."""
    if name not in self.config.profiles:
        raise ProfileNotFoundError(f"Profile '{name}' not found.")
        
    if name == "default":
        raise InvalidConfigurationError("Cannot delete the 'default' profile.")
        
    used_by = self.config._profileused(name)
    if used_by:
        # We return the list of nodes using it so the UI can inform the user
        raise InvalidConfigurationError(f"Profile '{name}' is used by nodes: {', '.join(used_by)}")
        
    self.config._profiles_del(id=name)
    self.config._saveconfig(self.config.file)

Delete an existing profile, with safety checks.

def get_profile(self, name, resolve=True)
Expand source code
def get_profile(self, name, resolve=True):
    """Get the profile dictionary, optionally resolved."""
    profile = self.config.profiles.get(name)
    if not profile:
        raise ProfileNotFoundError(f"Profile '{name}' not found.")
    
    if resolve:
        return self.resolve_node_data(profile)
    return profile

Get the profile dictionary, optionally resolved.

def list_profiles(self, filter_str=None)
Expand source code
def list_profiles(self, filter_str=None):
    """List all profile names, optionally filtered."""
    profiles = list(self.config.profiles.keys())
    case_sensitive = self.config.config.get("case", False)
    
    if filter_str:
        if not case_sensitive:
            f_str = filter_str.lower()
            return [p for p in profiles if f_str in p.lower()]
        else:
            return [p for p in profiles if filter_str in p]
    return profiles

List all profile names, optionally filtered.

def resolve_node_data(self, node_data)
Expand source code
def resolve_node_data(self, node_data):
    """Resolve profile references (@profile) in node data and handle inheritance."""
    resolved = node_data.copy()
    
    # 1. Identify all referenced profiles to support inheritance
    referenced_profiles = []
    for value in resolved.values():
        if isinstance(value, str) and value.startswith("@"):
            referenced_profiles.append(value[1:])
        elif isinstance(value, list):
            for item in value:
                if isinstance(item, str) and item.startswith("@"):
                    referenced_profiles.append(item[1:])
    
    # 2. Resolve explicit references
    for key, value in resolved.items():
        if isinstance(value, str) and value.startswith("@"):
            profile_name = value[1:]
            try:
                profile = self.get_profile(profile_name, resolve=True)
                resolved[key] = profile.get(key, "")
            except ProfileNotFoundError:
                resolved[key] = ""
        elif isinstance(value, list):
            resolved_list = []
            for item in value:
                if isinstance(item, str) and item.startswith("@"):
                    profile_name = item[1:]
                    try:
                        profile = self.get_profile(profile_name, resolve=True)
                        if "password" in profile:
                            resolved_list.append(profile["password"])
                    except ProfileNotFoundError:
                        pass
                else:
                    resolved_list.append(item)
            resolved[key] = resolved_list
    
    # 3. Inheritance: Fill empty keys from the first referenced profile
    if referenced_profiles:
        base_profile_name = referenced_profiles[0]
        try:
            base_profile = self.get_profile(base_profile_name, resolve=True)
            for key, value in base_profile.items():
                # Fill if key is missing or empty
                if key not in resolved or resolved[key] == "" or resolved[key] == [] or resolved[key] is None:
                    resolved[key] = value
        except ProfileNotFoundError:
            pass

    # 4. Handle default protocol
    if resolved.get("protocol") == "" or resolved.get("protocol") is None:
        try:
            default_profile = self.get_profile("default", resolve=True)
            resolved["protocol"] = default_profile.get("protocol", "ssh")
        except ProfileNotFoundError:
            resolved["protocol"] = "ssh"
            
    return resolved

Resolve profile references (@profile) in node data and handle inheritance.

def update_profile(self, name, data)
Expand source code
def update_profile(self, name, data):
    """Update an existing profile."""
    if name not in self.config.profiles:
        raise ProfileNotFoundError(f"Profile '{name}' not found.")
        
    # Merge with existing data
    existing = self.get_profile(name, resolve=False)
    updated_data = existing.copy()
    updated_data.update(data)
    
    # Filter data to match _profiles_add signature
    allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"}
    filtered_data = {k: v for k, v in updated_data.items() if k in allowed_keys}
    
    self.config._profiles_add(id=name, **filtered_data)
    self.config._saveconfig(self.config.file)

Update an existing profile.

Inherited members

class SystemService (config=None)
Expand source code
class SystemService(BaseService):
    """Business logic for application lifecycle (API, processes).

    PID files are looked up in ``/run/connpy.pid`` then ``/tmp/connpy.pid``.
    Their format (as read by get_api_status) is: first line the PID,
    optional second line the port.
    """

    def start_api(self, port=None):
        """Start the Connpy REST API.

        Raises:
            ConnpyError: if the API fails to start.
        """
        from connpy.api import start_api
        try:
            start_api(port, config=self.config)
        except Exception as e:
            raise ConnpyError(f"Failed to start API: {e}")

    def debug_api(self, port=None):
        """Start the Connpy REST API in debug mode.

        Raises:
            ConnpyError: if the API fails to start.
        """
        from connpy.api import debug_api
        try:
            debug_api(port, config=self.config)
        except Exception as e:
            raise ConnpyError(f"Failed to start API in debug mode: {e}")


    def stop_api(self):
        """Stop the Connpy REST API.

        Returns:
            bool: True if at least one running process was signalled.

        Raises:
            ConnpyError: on unexpected failure.
        """
        try:
            import os
            import signal
            
            pids = ["/run/connpy.pid", "/tmp/connpy.pid"]
            stopped = False
            for pid_file in pids:
                if os.path.exists(pid_file):
                    try:
                        with open(pid_file, "r") as f:
                            # Read only the first line (PID)
                            line = f.readline().strip()
                            if not line:
                                continue
                            pid = int(line)
                        os.kill(pid, signal.SIGTERM)
                        # Remove the PID file after successful kill
                        os.remove(pid_file)
                        stopped = True
                    except (ValueError, OSError, ProcessLookupError):
                        # If process is already dead, just remove the stale PID file
                        try:
                            os.remove(pid_file)
                        except OSError:
                            pass
                        continue
            return stopped
        except Exception as e:
            raise ConnpyError(f"Failed to stop API: {e}")

    def restart_api(self, port=None):
        """Restart the Connpy REST API, maintaining the current port if none provided."""
        if port is None:
            # Reuse the port recorded in the PID file when possible.
            status = self.get_api_status()
            if status["running"] and status.get("port"):
                port = status["port"]
        
        self.stop_api()
        import time
        # Brief pause to let the old process release the port.
        time.sleep(1)
        self.start_api(port)

    def get_api_status(self):
        """Check if the API is currently running.

        Returns:
            dict: ``{"running": True, "pid", "port", "pid_file"}`` when a
            live process is found, else ``{"running": False}``.
        """
        import os
        pids = ["/run/connpy.pid", "/tmp/connpy.pid"]
        for pid_file in pids:
            if os.path.exists(pid_file):
                try:
                    with open(pid_file, "r") as f:
                        pid_line = f.readline().strip()
                        port_line = f.readline().strip()
                        if not pid_line:
                            continue
                        pid = int(pid_line)
                        port = int(port_line) if port_line else None
                    # Signal 0 checks for process existence without killing it
                    os.kill(pid, 0)
                    return {"running": True, "pid": pid, "port": port, "pid_file": pid_file}
                except (ValueError, OSError, ProcessLookupError):
                    continue
        return {"running": False}

Business logic for application lifecycle (API, processes).

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def debug_api(self, port=None)
Expand source code
def debug_api(self, port=None):
    """Start the Connpy REST API in debug mode."""
    from connpy.api import debug_api
    try:
        debug_api(port, config=self.config)
    except Exception as e:
        raise ConnpyError(f"Failed to start API in debug mode: {e}")

Start the Connpy REST API in debug mode.

def get_api_status(self)
Expand source code
def get_api_status(self):
    """Check if the API is currently running."""
    import os
    pids = ["/run/connpy.pid", "/tmp/connpy.pid"]
    for pid_file in pids:
        if os.path.exists(pid_file):
            try:
                with open(pid_file, "r") as f:
                    pid_line = f.readline().strip()
                    port_line = f.readline().strip()
                    if not pid_line:
                        continue
                    pid = int(pid_line)
                    port = int(port_line) if port_line else None
                # Signal 0 checks for process existence without killing it
                os.kill(pid, 0)
                return {"running": True, "pid": pid, "port": port, "pid_file": pid_file}
            except (ValueError, OSError, ProcessLookupError):
                continue
    return {"running": False}

Check if the API is currently running.

def restart_api(self, port=None)
Expand source code
def restart_api(self, port=None):
    """Restart the Connpy REST API, maintaining the current port if none provided."""
    if port is None:
        status = self.get_api_status()
        if status["running"] and status.get("port"):
            port = status["port"]
    
    self.stop_api()
    import time
    time.sleep(1)
    self.start_api(port)

Restart the Connpy REST API, maintaining the current port if none provided.

def start_api(self, port=None)
Expand source code
def start_api(self, port=None):
    """Start the Connpy REST API."""
    from connpy.api import start_api
    try:
        start_api(port, config=self.config)
    except Exception as e:
        raise ConnpyError(f"Failed to start API: {e}")

Start the Connpy REST API.

def stop_api(self)
Expand source code
def stop_api(self):
    """Stop the Connpy REST API."""
    try:
        import os
        import signal
        
        pids = ["/run/connpy.pid", "/tmp/connpy.pid"]
        stopped = False
        for pid_file in pids:
            if os.path.exists(pid_file):
                try:
                    with open(pid_file, "r") as f:
                        # Read only the first line (PID)
                        line = f.readline().strip()
                        if not line:
                            continue
                        pid = int(line)
                    os.kill(pid, signal.SIGTERM)
                    # Remove the PID file after successful kill
                    os.remove(pid_file)
                    stopped = True
                except (ValueError, OSError, ProcessLookupError):
                    # If process is already dead, just remove the stale PID file
                    try:
                        os.remove(pid_file)
                    except OSError:
                        pass
                    continue
        return stopped
    except Exception as e:
        raise ConnpyError(f"Failed to stop API: {e}")

Stop the Connpy REST API.

Inherited members