Module connpy.services
Sub-modules
connpy.services.ai_serviceconnpy.services.baseconnpy.services.config_serviceconnpy.services.context_serviceconnpy.services.exceptionsconnpy.services.execution_serviceconnpy.services.import_export_serviceconnpy.services.node_serviceconnpy.services.plugin_serviceconnpy.services.profile_serviceconnpy.services.providerconnpy.services.sync_serviceconnpy.services.system_service
Classes
class AIService (config=None)-
Expand source code
class AIService(BaseService): """Business logic for interacting with AI agents and LLM configurations.""" def ask(self, input_text, dryrun=False, chat_history=None, status=None, debug=False, session_id=None, console=None, chunk_callback=None, confirm_handler=None, trust=False, **overrides): """Send a prompt to the AI agent.""" from connpy.ai import ai agent = ai(self.config, console=console, confirm_handler=confirm_handler, trust=trust, **overrides) return agent.ask(input_text, dryrun, chat_history, status=status, debug=debug, session_id=session_id, chunk_callback=chunk_callback) def confirm(self, input_text, console=None): """Ask for a safe confirmation of an action.""" from connpy.ai import ai agent = ai(self.config, console=console) return agent.confirm(input_text) def list_sessions(self): """Return a list of all saved AI sessions.""" from connpy.ai import ai agent = ai(self.config) return agent._get_sessions() def delete_session(self, session_id): """Delete an AI session by ID.""" import os sessions_dir = os.path.join(self.config.defaultdir, "ai_sessions") path = os.path.join(sessions_dir, f"{session_id}.json") if os.path.exists(path): os.remove(path) else: raise InvalidConfigurationError(f"Session '{session_id}' not found.") def configure_provider(self, provider, model=None, api_key=None): """Update AI provider settings in the configuration.""" settings = self.config.config.get("ai", {}) if model: settings[f"{provider}_model"] = model if api_key: settings[f"{provider}_api_key"] = api_key self.config.config["ai"] = settings self.config._saveconfig(self.config.file) def load_session_data(self, session_id): """Load a session's raw data by ID.""" from connpy.ai import ai agent = ai(self.config) return agent.load_session_data(session_id)Business logic for interacting with AI agents and LLM configurations.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def ask(self,
input_text,
dryrun=False,
chat_history=None,
status=None,
debug=False,
session_id=None,
console=None,
chunk_callback=None,
confirm_handler=None,
trust=False,
**overrides)-
Expand source code
def ask(self, input_text, dryrun=False, chat_history=None, status=None, debug=False, session_id=None, console=None, chunk_callback=None, confirm_handler=None, trust=False, **overrides): """Send a prompt to the AI agent.""" from connpy.ai import ai agent = ai(self.config, console=console, confirm_handler=confirm_handler, trust=trust, **overrides) return agent.ask(input_text, dryrun, chat_history, status=status, debug=debug, session_id=session_id, chunk_callback=chunk_callback)Send a prompt to the AI agent.
def configure_provider(self, provider, model=None, api_key=None)-
Expand source code
def configure_provider(self, provider, model=None, api_key=None): """Update AI provider settings in the configuration.""" settings = self.config.config.get("ai", {}) if model: settings[f"{provider}_model"] = model if api_key: settings[f"{provider}_api_key"] = api_key self.config.config["ai"] = settings self.config._saveconfig(self.config.file)Update AI provider settings in the configuration.
def confirm(self, input_text, console=None)-
Expand source code
def confirm(self, input_text, console=None): """Ask for a safe confirmation of an action.""" from connpy.ai import ai agent = ai(self.config, console=console) return agent.confirm(input_text)Ask for a safe confirmation of an action.
def delete_session(self, session_id)-
Expand source code
def delete_session(self, session_id): """Delete an AI session by ID.""" import os sessions_dir = os.path.join(self.config.defaultdir, "ai_sessions") path = os.path.join(sessions_dir, f"{session_id}.json") if os.path.exists(path): os.remove(path) else: raise InvalidConfigurationError(f"Session '{session_id}' not found.")Delete an AI session by ID.
def list_sessions(self)-
Expand source code
def list_sessions(self): """Return a list of all saved AI sessions.""" from connpy.ai import ai agent = ai(self.config) return agent._get_sessions()Return a list of all saved AI sessions.
def load_session_data(self, session_id)-
Expand source code
def load_session_data(self, session_id): """Load a session's raw data by ID.""" from connpy.ai import ai agent = ai(self.config) return agent.load_session_data(session_id)Load a session's raw data by ID.
Inherited members
class ConfigService (config=None)-
Expand source code
class ConfigService(BaseService): """Business logic for general application settings and state configuration.""" def get_settings(self) -> Dict[str, Any]: """Get the global configuration settings block.""" settings = self.config.config.copy() settings["configfolder"] = self.config.defaultdir return settings def get_default_dir(self) -> str: """Get the default configuration directory.""" return self.config.defaultdir def set_config_folder(self, folder_path: str): """Set the default location for config file by writing to ~/.config/conn/.folder""" if not os.path.isdir(folder_path): raise ConnpyError(f"readable_dir:{folder_path} is not a valid path") pathfile = os.path.join(self.config.anchor_path, ".folder") folder = os.path.abspath(folder_path).rstrip('/') try: with open(pathfile, "w") as f: f.write(str(folder)) except Exception as e: raise ConnpyError(f"Failed to save config folder: {e}") def update_setting(self, key, value): """Update a setting in the configuration file.""" self.config.config[key] = value self.config._saveconfig(self.config.file) def encrypt_password(self, password): """Encrypt a password using the application's configuration encryption key.""" return self.config.encrypt(password) def apply_theme_from_file(self, theme_input): """Apply 'dark', 'light' theme or load a YAML theme file and save it to the configuration.""" import yaml from ..printer import STYLES, LIGHT_THEME if theme_input == "dark": valid_styles = {} self.update_setting("theme", valid_styles) return valid_styles elif theme_input == "light": valid_styles = LIGHT_THEME.copy() self.update_setting("theme", valid_styles) return valid_styles if not os.path.exists(theme_input): raise InvalidConfigurationError(f"Theme file '{theme_input}' not found.") try: with open(theme_input, 'r') as f: user_styles = yaml.safe_load(f) except Exception as e: raise InvalidConfigurationError(f"Failed to parse theme file: {e}") if not isinstance(user_styles, dict): raise InvalidConfigurationError("Theme file must be a YAML dictionary.") # Filter for valid styles only (prevent junk in config) valid_styles = {k: v for k, v in user_styles.items() if k in STYLES} if not valid_styles: raise InvalidConfigurationError("No valid style keys found in theme file.") # Persist and return merged styles self.update_setting("theme", valid_styles) return valid_stylesBusiness logic for general application settings and state configuration.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def apply_theme_from_file(self, theme_input)-
Expand source code
def apply_theme_from_file(self, theme_input): """Apply 'dark', 'light' theme or load a YAML theme file and save it to the configuration.""" import yaml from ..printer import STYLES, LIGHT_THEME if theme_input == "dark": valid_styles = {} self.update_setting("theme", valid_styles) return valid_styles elif theme_input == "light": valid_styles = LIGHT_THEME.copy() self.update_setting("theme", valid_styles) return valid_styles if not os.path.exists(theme_input): raise InvalidConfigurationError(f"Theme file '{theme_input}' not found.") try: with open(theme_input, 'r') as f: user_styles = yaml.safe_load(f) except Exception as e: raise InvalidConfigurationError(f"Failed to parse theme file: {e}") if not isinstance(user_styles, dict): raise InvalidConfigurationError("Theme file must be a YAML dictionary.") # Filter for valid styles only (prevent junk in config) valid_styles = {k: v for k, v in user_styles.items() if k in STYLES} if not valid_styles: raise InvalidConfigurationError("No valid style keys found in theme file.") # Persist and return merged styles self.update_setting("theme", valid_styles) return valid_stylesApply 'dark', 'light' theme or load a YAML theme file and save it to the configuration.
def encrypt_password(self, password)-
Expand source code
def encrypt_password(self, password): """Encrypt a password using the application's configuration encryption key.""" return self.config.encrypt(password)Encrypt a password using the application's configuration encryption key.
def get_default_dir(self) ‑> str-
Expand source code
def get_default_dir(self) -> str: """Get the default configuration directory.""" return self.config.defaultdirGet the default configuration directory.
def get_settings(self) ‑> Dict[str, Any]-
Expand source code
def get_settings(self) -> Dict[str, Any]: """Get the global configuration settings block.""" settings = self.config.config.copy() settings["configfolder"] = self.config.defaultdir return settingsGet the global configuration settings block.
def set_config_folder(self, folder_path: str)-
Expand source code
def set_config_folder(self, folder_path: str): """Set the default location for config file by writing to ~/.config/conn/.folder""" if not os.path.isdir(folder_path): raise ConnpyError(f"readable_dir:{folder_path} is not a valid path") pathfile = os.path.join(self.config.anchor_path, ".folder") folder = os.path.abspath(folder_path).rstrip('/') try: with open(pathfile, "w") as f: f.write(str(folder)) except Exception as e: raise ConnpyError(f"Failed to save config folder: {e}")Set the default location for config file by writing to ~/.config/conn/.folder
def update_setting(self, key, value)-
Expand source code
def update_setting(self, key, value): """Update a setting in the configuration file.""" self.config.config[key] = value self.config._saveconfig(self.config.file)Update a setting in the configuration file.
Inherited members
class ConnpyError (*args, **kwargs)-
Expand source code
class ConnpyError(Exception): """Base exception for all connpy services.""" passBase exception for all connpy services.
Ancestors
- builtins.Exception
- builtins.BaseException
Subclasses
class ExecutionError (*args, **kwargs)-
Expand source code
class ExecutionError(ConnpyError): """Raised when an execution fails or returns error.""" passRaised when an execution fails or returns error.
Ancestors
- ConnpyError
- builtins.Exception
- builtins.BaseException
class ExecutionService (config=None)-
Expand source code
class ExecutionService(BaseService): """Business logic for executing commands on nodes and running automation scripts.""" def run_commands( self, nodes_filter: str, commands: List[str], variables: Optional[Dict[str, Any]] = None, parallel: int = 10, timeout: int = 10, folder: Optional[str] = None, prompt: Optional[str] = None, on_node_complete: Optional[Callable] = None, logger: Optional[Callable] = None, name: Optional[str] = None ) -> Dict[str, str]: """Execute commands on a set of nodes.""" try: matched_names = self.config._getallnodes(nodes_filter) if not matched_names: raise ConnpyError(f"No nodes found matching filter: {nodes_filter}") node_data = self.config.getitems(matched_names, extract=True) executor = Nodes(node_data, config=self.config) self.last_executor = executor results = executor.run( commands=commands, vars=variables, parallel=parallel, timeout=timeout, folder=folder, prompt=prompt, on_complete=on_node_complete, logger=logger ) # Combine output and status for the caller full_results = {} for unique in results: full_results[unique] = { "output": results[unique], "status": executor.status.get(unique, 1) } return full_results except Exception as e: raise ConnpyError(f"Execution failed: {e}") def test_commands( self, nodes_filter: str, commands: List[str], expected: List[str], variables: Optional[Dict[str, Any]] = None, parallel: int = 10, timeout: int = 10, folder: Optional[str] = None, prompt: Optional[str] = None, on_node_complete: Optional[Callable] = None, logger: Optional[Callable] = None, name: Optional[str] = None ) -> Dict[str, Dict[str, bool]]: """Run commands and verify expected output on a set of nodes.""" try: matched_names = self.config._getallnodes(nodes_filter) if not matched_names: raise ConnpyError(f"No nodes found matching filter: {nodes_filter}") node_data = self.config.getitems(matched_names, extract=True) executor = Nodes(node_data, config=self.config) self.last_executor = executor results = executor.test( commands=commands, expected=expected, vars=variables, parallel=parallel, timeout=timeout, folder=folder, prompt=prompt, on_complete=on_node_complete, logger=logger ) return results except Exception as e: raise ConnpyError(f"Testing failed: {e}") def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) -> Dict[str, str]: """Run a plain-text script containing one command per line.""" if not os.path.exists(script_path): raise ConnpyError(f"Script file not found: {script_path}") try: with open(script_path, "r") as f: commands = [line.strip() for line in f if line.strip()] except Exception as e: raise ConnpyError(f"Failed to read script {script_path}: {e}") return self.run_commands(nodes_filter, commands, parallel=parallel) def run_yaml_playbook(self, playbook_data: str, parallel: int = 10) -> Dict[str, Any]: """Run a structured Connpy YAML automation playbook (from path or content).""" playbook = None if playbook_data.startswith("---YAML---\n"): try: content = playbook_data[len("---YAML---\n"):] playbook = yaml.load(content, Loader=yaml.FullLoader) except Exception as e: raise ConnpyError(f"Failed to parse YAML content: {e}") else: if not os.path.exists(playbook_data): raise ConnpyError(f"Playbook file not found: {playbook_data}") try: with open(playbook_data, "r") as f: playbook = yaml.load(f, Loader=yaml.FullLoader) except Exception as e: raise ConnpyError(f"Failed to load playbook {playbook_data}: {e}") # Basic validation if not isinstance(playbook, dict) or "nodes" not in playbook or "commands" not in playbook: raise ConnpyError("Invalid playbook format: missing 'nodes' or 'commands' keys.") action = playbook.get("action", "run") options = playbook.get("options", {}) # Extract all fields similar to RunHandler.cli_run exec_args = { "nodes_filter": playbook["nodes"], "commands": playbook["commands"], "variables": playbook.get("variables"), "parallel": options.get("parallel", parallel), "timeout": playbook.get("timeout", options.get("timeout", 10)), "prompt": options.get("prompt"), "name": playbook.get("name", "Task") } # Map 'output' field to folder path if it's not stdout/null output_cfg = playbook.get("output") if output_cfg not in [None, "stdout"]: exec_args["folder"] = output_cfg if action == "run": return self.run_commands(**exec_args) elif action == "test": exec_args["expected"] = playbook.get("expected", []) return self.test_commands(**exec_args) else: raise ConnpyError(f"Unsupported playbook action: {action}")Business logic for executing commands on nodes and running automation scripts.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) ‑> Dict[str, str]-
Expand source code
def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) -> Dict[str, str]: """Run a plain-text script containing one command per line.""" if not os.path.exists(script_path): raise ConnpyError(f"Script file not found: {script_path}") try: with open(script_path, "r") as f: commands = [line.strip() for line in f if line.strip()] except Exception as e: raise ConnpyError(f"Failed to read script {script_path}: {e}") return self.run_commands(nodes_filter, commands, parallel=parallel)Run a plain-text script containing one command per line.
def run_commands(self,
nodes_filter: str,
commands: List[str],
variables: Dict[str, Any] | None = None,
parallel: int = 10,
timeout: int = 10,
folder: str | None = None,
prompt: str | None = None,
on_node_complete: Callable | None = None,
logger: Callable | None = None,
name: str | None = None) ‑> Dict[str, str]-
Expand source code
def run_commands( self, nodes_filter: str, commands: List[str], variables: Optional[Dict[str, Any]] = None, parallel: int = 10, timeout: int = 10, folder: Optional[str] = None, prompt: Optional[str] = None, on_node_complete: Optional[Callable] = None, logger: Optional[Callable] = None, name: Optional[str] = None ) -> Dict[str, str]: """Execute commands on a set of nodes.""" try: matched_names = self.config._getallnodes(nodes_filter) if not matched_names: raise ConnpyError(f"No nodes found matching filter: {nodes_filter}") node_data = self.config.getitems(matched_names, extract=True) executor = Nodes(node_data, config=self.config) self.last_executor = executor results = executor.run( commands=commands, vars=variables, parallel=parallel, timeout=timeout, folder=folder, prompt=prompt, on_complete=on_node_complete, logger=logger ) # Combine output and status for the caller full_results = {} for unique in results: full_results[unique] = { "output": results[unique], "status": executor.status.get(unique, 1) } return full_results except Exception as e: raise ConnpyError(f"Execution failed: {e}")Execute commands on a set of nodes.
def run_yaml_playbook(self, playbook_data: str, parallel: int = 10) ‑> Dict[str, Any]-
Expand source code
def run_yaml_playbook(self, playbook_data: str, parallel: int = 10) -> Dict[str, Any]: """Run a structured Connpy YAML automation playbook (from path or content).""" playbook = None if playbook_data.startswith("---YAML---\n"): try: content = playbook_data[len("---YAML---\n"):] playbook = yaml.load(content, Loader=yaml.FullLoader) except Exception as e: raise ConnpyError(f"Failed to parse YAML content: {e}") else: if not os.path.exists(playbook_data): raise ConnpyError(f"Playbook file not found: {playbook_data}") try: with open(playbook_data, "r") as f: playbook = yaml.load(f, Loader=yaml.FullLoader) except Exception as e: raise ConnpyError(f"Failed to load playbook {playbook_data}: {e}") # Basic validation if not isinstance(playbook, dict) or "nodes" not in playbook or "commands" not in playbook: raise ConnpyError("Invalid playbook format: missing 'nodes' or 'commands' keys.") action = playbook.get("action", "run") options = playbook.get("options", {}) # Extract all fields similar to RunHandler.cli_run exec_args = { "nodes_filter": playbook["nodes"], "commands": playbook["commands"], "variables": playbook.get("variables"), "parallel": options.get("parallel", parallel), "timeout": playbook.get("timeout", options.get("timeout", 10)), "prompt": options.get("prompt"), "name": playbook.get("name", "Task") } # Map 'output' field to folder path if it's not stdout/null output_cfg = playbook.get("output") if output_cfg not in [None, "stdout"]: exec_args["folder"] = output_cfg if action == "run": return self.run_commands(**exec_args) elif action == "test": exec_args["expected"] = playbook.get("expected", []) return self.test_commands(**exec_args) else: raise ConnpyError(f"Unsupported playbook action: {action}")Run a structured Connpy YAML automation playbook (from path or content).
def test_commands(self,
nodes_filter: str,
commands: List[str],
expected: List[str],
variables: Dict[str, Any] | None = None,
parallel: int = 10,
timeout: int = 10,
folder: str | None = None,
prompt: str | None = None,
on_node_complete: Callable | None = None,
logger: Callable | None = None,
name: str | None = None) ‑> Dict[str, Dict[str, bool]]-
Expand source code
def test_commands( self, nodes_filter: str, commands: List[str], expected: List[str], variables: Optional[Dict[str, Any]] = None, parallel: int = 10, timeout: int = 10, folder: Optional[str] = None, prompt: Optional[str] = None, on_node_complete: Optional[Callable] = None, logger: Optional[Callable] = None, name: Optional[str] = None ) -> Dict[str, Dict[str, bool]]: """Run commands and verify expected output on a set of nodes.""" try: matched_names = self.config._getallnodes(nodes_filter) if not matched_names: raise ConnpyError(f"No nodes found matching filter: {nodes_filter}") node_data = self.config.getitems(matched_names, extract=True) executor = Nodes(node_data, config=self.config) self.last_executor = executor results = executor.test( commands=commands, expected=expected, vars=variables, parallel=parallel, timeout=timeout, folder=folder, prompt=prompt, on_complete=on_node_complete, logger=logger ) return results except Exception as e: raise ConnpyError(f"Testing failed: {e}")Run commands and verify expected output on a set of nodes.
Inherited members
class ImportExportService (config=None)-
Expand source code
class ImportExportService(BaseService): """Business logic for YAML/JSON inventory import and export.""" def export_to_file(self, file_path, folders=None): """Export nodes/folders to a YAML file.""" if os.path.exists(file_path): raise InvalidConfigurationError(f"File '{file_path}' already exists.") data = self.export_to_dict(folders) try: with open(file_path, "w") as f: yaml.dump(data, f, Dumper=NoAliasDumper, default_flow_style=False) except OSError as e: raise InvalidConfigurationError(f"Failed to export to '{file_path}': {e}") def export_to_dict(self, folders=None): """Export nodes/folders to a dictionary.""" if not folders: return deepcopy(self.config.connections) else: # Validate folders exist for f in folders: if f != "@" and f not in self.config._getallfolders(): raise NodeNotFoundError(f"Folder '{f}' not found.") flat = self.config._getallnodesfull(folders, extract=False) nested = {} for k, v in flat.items(): uniques = self.config._explode_unique(k) if not uniques: continue if "folder" in uniques and "subfolder" in uniques: f_name = uniques["folder"] s_name = uniques["subfolder"] i_name = uniques["id"] if f_name not in nested: nested[f_name] = {"type": "folder"} if s_name not in nested[f_name]: nested[f_name][s_name] = {"type": "subfolder"} nested[f_name][s_name][i_name] = v elif "folder" in uniques: f_name = uniques["folder"] i_name = uniques["id"] if f_name not in nested: nested[f_name] = {"type": "folder"} nested[f_name][i_name] = v else: i_name = uniques["id"] nested[i_name] = v return nested def import_from_file(self, file_path): """Import nodes/folders from a YAML file.""" if not os.path.exists(file_path): raise InvalidConfigurationError(f"File '{file_path}' does not exist.") try: with open(file_path, "r") as f: data = yaml.load(f, Loader=yaml.FullLoader) self.import_from_dict(data) except Exception as e: raise InvalidConfigurationError(f"Failed to read/parse import file: {e}") def import_from_dict(self, data): """Import nodes/folders from a dictionary.""" if not isinstance(data, dict): raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.") def _traverse_import(node_data, current_folder='', current_subfolder=''): for k, v in node_data.items(): if k == "type": continue if isinstance(v, dict): node_type = v.get("type", "connection") if node_type == "folder": self.config._folder_add(folder=k) _traverse_import(v, current_folder=k, current_subfolder='') elif node_type == "subfolder": self.config._folder_add(folder=current_folder, subfolder=k) _traverse_import(v, current_folder=current_folder, current_subfolder=k) elif node_type == "connection": unique_id = k if current_subfolder: unique_id = f"{k}@{current_subfolder}@{current_folder}" elif current_folder: unique_id = f"{k}@{current_folder}" self._validate_node_name(unique_id) kwargs = deepcopy(v) kwargs['id'] = k kwargs['folder'] = current_folder kwargs['subfolder'] = current_subfolder self.config._connections_add(**kwargs) else: # Invalid format skip pass _traverse_import(data) self.config._saveconfig(self.config.file)Business logic for YAML/JSON inventory import and export.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def export_to_dict(self, folders=None)-
Expand source code
def export_to_dict(self, folders=None): """Export nodes/folders to a dictionary.""" if not folders: return deepcopy(self.config.connections) else: # Validate folders exist for f in folders: if f != "@" and f not in self.config._getallfolders(): raise NodeNotFoundError(f"Folder '{f}' not found.") flat = self.config._getallnodesfull(folders, extract=False) nested = {} for k, v in flat.items(): uniques = self.config._explode_unique(k) if not uniques: continue if "folder" in uniques and "subfolder" in uniques: f_name = uniques["folder"] s_name = uniques["subfolder"] i_name = uniques["id"] if f_name not in nested: nested[f_name] = {"type": "folder"} if s_name not in nested[f_name]: nested[f_name][s_name] = {"type": "subfolder"} nested[f_name][s_name][i_name] = v elif "folder" in uniques: f_name = uniques["folder"] i_name = uniques["id"] if f_name not in nested: nested[f_name] = {"type": "folder"} nested[f_name][i_name] = v else: i_name = uniques["id"] nested[i_name] = v return nestedExport nodes/folders to a dictionary.
def export_to_file(self, file_path, folders=None)-
Expand source code
def export_to_file(self, file_path, folders=None): """Export nodes/folders to a YAML file.""" if os.path.exists(file_path): raise InvalidConfigurationError(f"File '{file_path}' already exists.") data = self.export_to_dict(folders) try: with open(file_path, "w") as f: yaml.dump(data, f, Dumper=NoAliasDumper, default_flow_style=False) except OSError as e: raise InvalidConfigurationError(f"Failed to export to '{file_path}': {e}")Export nodes/folders to a YAML file.
def import_from_dict(self, data)-
Expand source code
def import_from_dict(self, data): """Import nodes/folders from a dictionary.""" if not isinstance(data, dict): raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.") def _traverse_import(node_data, current_folder='', current_subfolder=''): for k, v in node_data.items(): if k == "type": continue if isinstance(v, dict): node_type = v.get("type", "connection") if node_type == "folder": self.config._folder_add(folder=k) _traverse_import(v, current_folder=k, current_subfolder='') elif node_type == "subfolder": self.config._folder_add(folder=current_folder, subfolder=k) _traverse_import(v, current_folder=current_folder, current_subfolder=k) elif node_type == "connection": unique_id = k if current_subfolder: unique_id = f"{k}@{current_subfolder}@{current_folder}" elif current_folder: unique_id = f"{k}@{current_folder}" self._validate_node_name(unique_id) kwargs = deepcopy(v) kwargs['id'] = k kwargs['folder'] = current_folder kwargs['subfolder'] = current_subfolder self.config._connections_add(**kwargs) else: # Invalid format skip pass _traverse_import(data) self.config._saveconfig(self.config.file)Import nodes/folders from a dictionary.
def import_from_file(self, file_path)-
Expand source code
def import_from_file(self, file_path): """Import nodes/folders from a YAML file.""" if not os.path.exists(file_path): raise InvalidConfigurationError(f"File '{file_path}' does not exist.") try: with open(file_path, "r") as f: data = yaml.load(f, Loader=yaml.FullLoader) self.import_from_dict(data) except Exception as e: raise InvalidConfigurationError(f"Failed to read/parse import file: {e}")Import nodes/folders from a YAML file.
Inherited members
class InvalidConfigurationError (*args, **kwargs)-
Expand source code
class InvalidConfigurationError(ConnpyError): """Raised when data or configuration input is invalid.""" passRaised when data or configuration input is invalid.
Ancestors
- ConnpyError
- builtins.Exception
- builtins.BaseException
class NodeAlreadyExistsError (*args, **kwargs)-
Expand source code
class NodeAlreadyExistsError(ConnpyError): """Raised when a node or folder already exists.""" passRaised when a node or folder already exists.
Ancestors
- ConnpyError
- builtins.Exception
- builtins.BaseException
class NodeNotFoundError (*args, **kwargs)-
Expand source code
class NodeNotFoundError(ConnpyError): """Raised when a connection or folder is not found.""" passRaised when a connection or folder is not found.
Ancestors
- ConnpyError
- builtins.Exception
- builtins.BaseException
class NodeService (config=None)-
Expand source code
class NodeService(BaseService): def __init__(self, config=None): super().__init__(config) def list_nodes(self, filter_str=None, format_str=None): """Return a listed filtered by regex match and formatted if needed.""" nodes = self.config._getallnodes() case_sensitive = self.config.config.get("case", False) if filter_str: flags = re.IGNORECASE if not case_sensitive else 0 nodes = [n for n in nodes if re.search(filter_str, n, flags)] if not format_str: return nodes from .profile_service import ProfileService profile_service = ProfileService(self.config) formatted_nodes = [] for n_id in nodes: # Use ProfileService to resolve profiles for dynamic formatting details = self.config.getitem(n_id, extract=False) if details: details = profile_service.resolve_node_data(details) name = n_id.split("@")[0] location = n_id.partition("@")[2] or "root" # Prepare context for .format() with all details context = details.copy() context.update({ "name": name, "NAME": name.upper(), "location": location, "LOCATION": location.upper(), }) # Add exploded uniques (id, folder, subfolder) uniques = self.config._explode_unique(n_id) if uniques: context.update(uniques) # Add uppercase versions of all keys for convenience for k, v in list(context.items()): if isinstance(v, str): context[k.upper()] = v.upper() try: formatted_nodes.append(format_str.format(**context)) except (KeyError, IndexError, ValueError): # Fallback to original string if format fails formatted_nodes.append(n_id) return formatted_nodes def list_folders(self, filter_str=None): """Return all unique folders, optionally filtered by regex.""" folders = self.config._getallfolders() case_sensitive = self.config.config.get("case", False) if filter_str: if filter_str.startswith("@"): if not case_sensitive: folders = [f for f in folders if f.lower() == filter_str.lower()] else: folders = [f for f in folders if f == filter_str] else: flags = re.IGNORECASE if not case_sensitive else 0 folders = [f for f in folders if re.search(filter_str, f, flags)] return folders def get_node_details(self, unique_id): """Return full configuration dictionary for a specific node.""" try: details = self.config.getitem(unique_id) if not details: raise NodeNotFoundError(f"Node '{unique_id}' not found.") return details except (KeyError, TypeError): raise NodeNotFoundError(f"Node '{unique_id}' not found.") def explode_unique(self, unique_id): """Explode a unique ID into a dictionary of its parts.""" return self.config._explode_unique(unique_id) def generate_cache(self, nodes=None, folders=None, profiles=None): """Generate and update the internal nodes cache.""" self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles) def validate_parent_folder(self, unique_id, is_folder=False): """Check if parent folder exists for a given node unique ID.""" if is_folder: uniques = self.config._explode_unique(unique_id) if uniques and "subfolder" in uniques and "folder" in uniques: parent_folder = f"@{uniques['folder']}" if parent_folder not in self.config._getallfolders(): raise NodeNotFoundError(f"Folder '{parent_folder}' not found.") else: node_folder = unique_id.partition("@")[2] if node_folder: parent_folder = f"@{node_folder}" if parent_folder not in self.config._getallfolders(): raise NodeNotFoundError(f"Folder '{parent_folder}' not found.") def add_node(self, unique_id, data, is_folder=False): """Logic for adding a new node or folder to configuration.""" if not is_folder: self._validate_node_name(unique_id) all_nodes = self.config._getallnodes() all_folders = self.config._getallfolders() if is_folder: if unique_id in all_folders: raise NodeAlreadyExistsError(f"Folder '{unique_id}' already exists.") uniques = self.config._explode_unique(unique_id) if not uniques: raise InvalidConfigurationError(f"Invalid folder name '{unique_id}'.") # Check if parent folder exists when creating a subfolder if "subfolder" in uniques: self.validate_parent_folder(unique_id, is_folder=True) self.config._folder_add(**uniques) self.config._saveconfig(self.config.file) else: if unique_id in all_nodes: raise NodeAlreadyExistsError(f"Node '{unique_id}' already exists.") # Check if parent folder exists when creating a node in a folder self.validate_parent_folder(unique_id) # Ensure 'id' is in data for config._connections_add if "id" not in data: uniques = self.config._explode_unique(unique_id) if uniques and "id" in uniques: data["id"] = uniques["id"] self.config._connections_add(**data) self.config._saveconfig(self.config.file) def update_node(self, unique_id, data): """Explicitly update an existing node.""" all_nodes = self.config._getallnodes() if unique_id not in all_nodes: raise NodeNotFoundError(f"Node '{unique_id}' not found.") # Ensure 'id' is in data for config._connections_add if "id" not in data: uniques = self.config._explode_unique(unique_id) if uniques: data["id"] = uniques["id"] # config._connections_add actually handles updates if ID exists correctly self.config._connections_add(**data) self.config._saveconfig(self.config.file) def delete_node(self, unique_id, is_folder=False): """Logic for deleting a node or folder.""" if is_folder: uniques = self.config._explode_unique(unique_id) if not uniques: raise NodeNotFoundError(f"Folder '{unique_id}' not found or invalid.") self.config._folder_del(**uniques) else: uniques = self.config._explode_unique(unique_id) if not uniques: raise NodeNotFoundError(f"Node '{unique_id}' not found or invalid.") self.config._connections_del(**uniques) self.config._saveconfig(self.config.file) def connect_node(self, unique_id, sftp=False, debug=False, logger=None): """Interact with a node directly.""" from connpy.core import node from .profile_service import ProfileService node_data = self.config.getitem(unique_id, extract=False) if not node_data: raise NodeNotFoundError(f"Node '{unique_id}' not found.") # Resolve profiles profile_service = ProfileService(self.config) resolved_data = profile_service.resolve_node_data(node_data) n = node(unique_id, **resolved_data, config=self.config) if sftp: n.protocol = "sftp" n.interact(debug=debug, logger=logger) def move_node(self, src_id, dst_id, copy=False): """Move or copy a node.""" self._validate_node_name(dst_id) node_data = self.config.getitem(src_id) if not node_data: raise NodeNotFoundError(f"Source node '{src_id}' not found.") if dst_id in self.config._getallnodes(): raise NodeAlreadyExistsError(f"Destination node '{dst_id}' already exists.") new_uniques = self.config._explode_unique(dst_id) if not new_uniques: raise InvalidConfigurationError(f"Invalid destination format '{dst_id}'.") new_node_data = node_data.copy() new_node_data.update(new_uniques) self.config._connections_add(**new_node_data) if not copy: src_uniques = self.config._explode_unique(src_id) self.config._connections_del(**src_uniques) self.config._saveconfig(self.config.file) def bulk_add(self, ids, hosts, common_data): """Add multiple nodes with shared common configuration.""" count = 0 all_nodes = self.config._getallnodes() for i, uid in enumerate(ids): if uid in all_nodes: continue try: self._validate_node_name(uid) except ReservedNameError: # For bulk, we might want to just skip or log. # CLI caller will handle if it wants to be strict. continue host = hosts[i] if i < len(hosts) else hosts[0] uniques = self.config._explode_unique(uid) if not uniques: continue node_data = common_data.copy() node_data.pop("ids", None) node_data.pop("location", None) node_data.update(uniques) node_data["host"] = host node_data["type"] = "connection" self.config._connections_add(**node_data) count += 1 if count > 0: self.config._saveconfig(self.config.file) return count def full_replace(self, connections, profiles): """Replace all connections and profiles with new data.""" self.config.connections = connections self.config.profiles = profiles self.config._saveconfig(self.config.file) def get_inventory(self): """Return a full snapshot of connections and profiles.""" return { "connections": self.config.connections, "profiles": self.config.profiles }Base class for all connpy services, providing common configuration access.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def add_node(self, unique_id, data, is_folder=False)-
Expand source code
def add_node(self, unique_id, data, is_folder=False): """Logic for adding a new node or folder to configuration.""" if not is_folder: self._validate_node_name(unique_id) all_nodes = self.config._getallnodes() all_folders = self.config._getallfolders() if is_folder: if unique_id in all_folders: raise NodeAlreadyExistsError(f"Folder '{unique_id}' already exists.") uniques = self.config._explode_unique(unique_id) if not uniques: raise InvalidConfigurationError(f"Invalid folder name '{unique_id}'.") # Check if parent folder exists when creating a subfolder if "subfolder" in uniques: self.validate_parent_folder(unique_id, is_folder=True) self.config._folder_add(**uniques) self.config._saveconfig(self.config.file) else: if unique_id in all_nodes: raise NodeAlreadyExistsError(f"Node '{unique_id}' already exists.") # Check if parent folder exists when creating a node in a folder self.validate_parent_folder(unique_id) # Ensure 'id' is in data for config._connections_add if "id" not in data: uniques = self.config._explode_unique(unique_id) if uniques and "id" in uniques: data["id"] = uniques["id"] self.config._connections_add(**data) self.config._saveconfig(self.config.file)Logic for adding a new node or folder to configuration.
def bulk_add(self, ids, hosts, common_data)-
Expand source code
def bulk_add(self, ids, hosts, common_data): """Add multiple nodes with shared common configuration.""" count = 0 all_nodes = self.config._getallnodes() for i, uid in enumerate(ids): if uid in all_nodes: continue try: self._validate_node_name(uid) except ReservedNameError: # For bulk, we might want to just skip or log. # CLI caller will handle if it wants to be strict. continue host = hosts[i] if i < len(hosts) else hosts[0] uniques = self.config._explode_unique(uid) if not uniques: continue node_data = common_data.copy() node_data.pop("ids", None) node_data.pop("location", None) node_data.update(uniques) node_data["host"] = host node_data["type"] = "connection" self.config._connections_add(**node_data) count += 1 if count > 0: self.config._saveconfig(self.config.file) return countAdd multiple nodes with shared common configuration.
def connect_node(self, unique_id, sftp=False, debug=False, logger=None)-
Expand source code
def connect_node(self, unique_id, sftp=False, debug=False, logger=None): """Interact with a node directly.""" from connpy.core import node from .profile_service import ProfileService node_data = self.config.getitem(unique_id, extract=False) if not node_data: raise NodeNotFoundError(f"Node '{unique_id}' not found.") # Resolve profiles profile_service = ProfileService(self.config) resolved_data = profile_service.resolve_node_data(node_data) n = node(unique_id, **resolved_data, config=self.config) if sftp: n.protocol = "sftp" n.interact(debug=debug, logger=logger)Interact with a node directly.
def delete_node(self, unique_id, is_folder=False)-
Expand source code
def delete_node(self, unique_id, is_folder=False): """Logic for deleting a node or folder.""" if is_folder: uniques = self.config._explode_unique(unique_id) if not uniques: raise NodeNotFoundError(f"Folder '{unique_id}' not found or invalid.") self.config._folder_del(**uniques) else: uniques = self.config._explode_unique(unique_id) if not uniques: raise NodeNotFoundError(f"Node '{unique_id}' not found or invalid.") self.config._connections_del(**uniques) self.config._saveconfig(self.config.file)Logic for deleting a node or folder.
def explode_unique(self, unique_id)-
Expand source code
def explode_unique(self, unique_id): """Explode a unique ID into a dictionary of its parts.""" return self.config._explode_unique(unique_id)Explode a unique ID into a dictionary of its parts.
def full_replace(self, connections, profiles)-
Expand source code
def full_replace(self, connections, profiles): """Replace all connections and profiles with new data.""" self.config.connections = connections self.config.profiles = profiles self.config._saveconfig(self.config.file)Replace all connections and profiles with new data.
def generate_cache(self, nodes=None, folders=None, profiles=None)-
Expand source code
def generate_cache(self, nodes=None, folders=None, profiles=None): """Generate and update the internal nodes cache.""" self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)Generate and update the internal nodes cache.
def get_inventory(self)-
Expand source code
def get_inventory(self): """Return a full snapshot of connections and profiles.""" return { "connections": self.config.connections, "profiles": self.config.profiles }Return a full snapshot of connections and profiles.
def get_node_details(self, unique_id)-
Expand source code
def get_node_details(self, unique_id): """Return full configuration dictionary for a specific node.""" try: details = self.config.getitem(unique_id) if not details: raise NodeNotFoundError(f"Node '{unique_id}' not found.") return details except (KeyError, TypeError): raise NodeNotFoundError(f"Node '{unique_id}' not found.")Return full configuration dictionary for a specific node.
def list_folders(self, filter_str=None)-
Expand source code
def list_folders(self, filter_str=None): """Return all unique folders, optionally filtered by regex.""" folders = self.config._getallfolders() case_sensitive = self.config.config.get("case", False) if filter_str: if filter_str.startswith("@"): if not case_sensitive: folders = [f for f in folders if f.lower() == filter_str.lower()] else: folders = [f for f in folders if f == filter_str] else: flags = re.IGNORECASE if not case_sensitive else 0 folders = [f for f in folders if re.search(filter_str, f, flags)] return foldersReturn all unique folders, optionally filtered by regex.
def list_nodes(self, filter_str=None, format_str=None)-
Expand source code
def list_nodes(self, filter_str=None, format_str=None): """Return a listed filtered by regex match and formatted if needed.""" nodes = self.config._getallnodes() case_sensitive = self.config.config.get("case", False) if filter_str: flags = re.IGNORECASE if not case_sensitive else 0 nodes = [n for n in nodes if re.search(filter_str, n, flags)] if not format_str: return nodes from .profile_service import ProfileService profile_service = ProfileService(self.config) formatted_nodes = [] for n_id in nodes: # Use ProfileService to resolve profiles for dynamic formatting details = self.config.getitem(n_id, extract=False) if details: details = profile_service.resolve_node_data(details) name = n_id.split("@")[0] location = n_id.partition("@")[2] or "root" # Prepare context for .format() with all details context = details.copy() context.update({ "name": name, "NAME": name.upper(), "location": location, "LOCATION": location.upper(), }) # Add exploded uniques (id, folder, subfolder) uniques = self.config._explode_unique(n_id) if uniques: context.update(uniques) # Add uppercase versions of all keys for convenience for k, v in list(context.items()): if isinstance(v, str): context[k.upper()] = v.upper() try: formatted_nodes.append(format_str.format(**context)) except (KeyError, IndexError, ValueError): # Fallback to original string if format fails formatted_nodes.append(n_id) return formatted_nodesReturn a listed filtered by regex match and formatted if needed.
def move_node(self, src_id, dst_id, copy=False)-
Expand source code
def move_node(self, src_id, dst_id, copy=False): """Move or copy a node.""" self._validate_node_name(dst_id) node_data = self.config.getitem(src_id) if not node_data: raise NodeNotFoundError(f"Source node '{src_id}' not found.") if dst_id in self.config._getallnodes(): raise NodeAlreadyExistsError(f"Destination node '{dst_id}' already exists.") new_uniques = self.config._explode_unique(dst_id) if not new_uniques: raise InvalidConfigurationError(f"Invalid destination format '{dst_id}'.") new_node_data = node_data.copy() new_node_data.update(new_uniques) self.config._connections_add(**new_node_data) if not copy: src_uniques = self.config._explode_unique(src_id) self.config._connections_del(**src_uniques) self.config._saveconfig(self.config.file)Move or copy a node.
def update_node(self, unique_id, data)-
Expand source code
def update_node(self, unique_id, data): """Explicitly update an existing node.""" all_nodes = self.config._getallnodes() if unique_id not in all_nodes: raise NodeNotFoundError(f"Node '{unique_id}' not found.") # Ensure 'id' is in data for config._connections_add if "id" not in data: uniques = self.config._explode_unique(unique_id) if uniques: data["id"] = uniques["id"] # config._connections_add actually handles updates if ID exists correctly self.config._connections_add(**data) self.config._saveconfig(self.config.file)Explicitly update an existing node.
def validate_parent_folder(self, unique_id, is_folder=False)-
Expand source code
def validate_parent_folder(self, unique_id, is_folder=False): """Check if parent folder exists for a given node unique ID.""" if is_folder: uniques = self.config._explode_unique(unique_id) if uniques and "subfolder" in uniques and "folder" in uniques: parent_folder = f"@{uniques['folder']}" if parent_folder not in self.config._getallfolders(): raise NodeNotFoundError(f"Folder '{parent_folder}' not found.") else: node_folder = unique_id.partition("@")[2] if node_folder: parent_folder = f"@{node_folder}" if parent_folder not in self.config._getallfolders(): raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")Check if parent folder exists for a given node unique ID.
Inherited members
class PluginService (config=None)-
Expand source code
class PluginService(BaseService): """Business logic for enabling, disabling, and listing plugins.""" def list_plugins(self): """List all core and user-defined plugins with their status and hash.""" import os import hashlib # Check for user plugins directory plugin_dir = os.path.join(self.config.defaultdir, "plugins") # Check for core plugins directory core_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins") all_plugin_info = {} def get_hash(path): try: with open(path, "rb") as f: return hashlib.md5(f.read()).hexdigest() except Exception: return "" # User plugins if os.path.exists(plugin_dir): for f in os.listdir(plugin_dir): if f.endswith(".py"): name = f[:-3] path = os.path.join(plugin_dir, f) all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)} elif f.endswith(".py.bkp"): name = f[:-7] all_plugin_info[name] = {"enabled": False} return all_plugin_info def add_plugin(self, name, source_file, update=False): """Add or update a plugin from a local file.""" import os import shutil from connpy.plugins import Plugins if not name.isalpha() or not name.islower() or len(name) > 15: raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.") p_manager = Plugins() # Check for bad script error = p_manager.verify_script(source_file) if error: raise InvalidConfigurationError(f"Invalid plugin script: {error}") self._save_plugin_file(name, source_file, update, is_path=True) def add_plugin_from_bytes(self, name, content, update=False): """Add or update a plugin from bytes (gRPC).""" import tempfile import os if not name.isalpha() or not name.islower() or len(name) > 15: raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.") # Write to temp file to verify script with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as tmp: tmp.write(content) tmp_path = tmp.name try: from connpy.plugins import Plugins p_manager = Plugins() error = p_manager.verify_script(tmp_path) if error: raise InvalidConfigurationError(f"Invalid plugin script: {error}") self._save_plugin_file(name, tmp_path, update, is_path=True) finally: if os.path.exists(tmp_path): os.remove(tmp_path) def _save_plugin_file(self, name, source, update=False, is_path=True): import os import shutil plugin_dir = os.path.join(self.config.defaultdir, "plugins") os.makedirs(plugin_dir, exist_ok=True) target_file = os.path.join(plugin_dir, f"{name}.py") backup_file = f"{target_file}.bkp" if not update and (os.path.exists(target_file) or os.path.exists(backup_file)): raise InvalidConfigurationError(f"Plugin '{name}' already exists.") try: if is_path: shutil.copy2(source, target_file) else: with open(target_file, "wb") as f: f.write(source) except OSError as e: raise InvalidConfigurationError(f"Failed to save plugin file: {e}") def delete_plugin(self, name): """Remove a plugin file permanently.""" import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") disabled_file = f"{plugin_file}.bkp" deleted = False for f in [plugin_file, disabled_file]: if os.path.exists(f): try: os.remove(f) deleted = True except OSError as e: raise InvalidConfigurationError(f"Failed to delete plugin file '{f}': {e}") if not deleted: raise InvalidConfigurationError(f"Plugin '{name}' not found.") def enable_plugin(self, name): """Activate a plugin by renaming its backup file.""" import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") disabled_file = f"{plugin_file}.bkp" if os.path.exists(plugin_file): return False # Already enabled if not os.path.exists(disabled_file): raise InvalidConfigurationError(f"Plugin '{name}' not found.") try: os.rename(disabled_file, plugin_file) return True except OSError as e: raise InvalidConfigurationError(f"Failed to enable plugin '{name}': {e}") def disable_plugin(self, name): """Deactivate a plugin by renaming it to a backup file.""" import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") disabled_file = f"{plugin_file}.bkp" if os.path.exists(disabled_file): return False # Already disabled if not os.path.exists(plugin_file): raise InvalidConfigurationError(f"Plugin '{name}' not found or is a core plugin.") try: os.rename(plugin_file, disabled_file) return True except OSError as e: raise InvalidConfigurationError(f"Failed to disable plugin '{name}': {e}") def get_plugin_source(self, name): import os from ..services.exceptions import InvalidConfigurationError plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") core_path = os.path.dirname(os.path.realpath(__file__)) + f"/../core_plugins/{name}.py" if os.path.exists(plugin_file): target = plugin_file elif os.path.exists(core_path): target = core_path else: raise InvalidConfigurationError(f"Plugin '{name}' not found") with open(target, "r") as f: return f.read() def invoke_plugin(self, name, args_dict): import sys, io from argparse import Namespace from ..services.exceptions import InvalidConfigurationError from connpy.plugins import Plugins class MockApp: is_mock = True def __init__(self, config): from ..core import node, nodes from ..ai import ai from ..services.provider import ServiceProvider self.config = config self.node = node self.nodes = nodes self.ai = ai self.services = ServiceProvider(config, mode="local") # Get settings for CLI behavior settings = self.services.config_svc.get_settings() self.case = settings.get("case", False) self.fzf = settings.get("fzf", False) try: self.nodes_list = self.services.nodes.list_nodes() self.folders = self.services.nodes.list_folders() self.profiles = self.services.profiles.list_profiles() except Exception: self.nodes_list = [] self.folders = [] self.profiles = [] args = Namespace(**args_dict) p_manager = Plugins() import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") core_path = os.path.dirname(os.path.realpath(__file__)) + f"/../core_plugins/{name}.py" if os.path.exists(plugin_file): target = plugin_file elif os.path.exists(core_path): target = core_path else: raise InvalidConfigurationError(f"Plugin '{name}' not found") module = p_manager._import_from_path(target) parser = module.Parser().parser if hasattr(module, "Parser") else None if "__func_name__" in args_dict and hasattr(module, args_dict["__func_name__"]): args.func = getattr(module, args_dict["__func_name__"]) app = MockApp(self.config) from .. import printer from rich.console import Console from rich.console import Console buf = io.StringIO() old_console = printer._get_console() old_err_console = printer._get_err_console() printer.set_thread_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True)) printer.set_thread_err_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True)) printer.set_thread_stream(buf) try: if hasattr(module, "Entrypoint"): module.Entrypoint(args, parser, app) except BaseException as e: if not isinstance(e, SystemExit): import traceback printer.err_console.print(traceback.format_exc()) finally: printer.set_thread_console(old_console) printer.set_thread_err_console(old_err_console) printer.set_thread_stream(None) for line in buf.getvalue().splitlines(keepends=True): yield lineBusiness logic for enabling, disabling, and listing plugins.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def add_plugin(self, name, source_file, update=False)-
Expand source code
def add_plugin(self, name, source_file, update=False): """Add or update a plugin from a local file.""" import os import shutil from connpy.plugins import Plugins if not name.isalpha() or not name.islower() or len(name) > 15: raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.") p_manager = Plugins() # Check for bad script error = p_manager.verify_script(source_file) if error: raise InvalidConfigurationError(f"Invalid plugin script: {error}") self._save_plugin_file(name, source_file, update, is_path=True)Add or update a plugin from a local file.
def add_plugin_from_bytes(self, name, content, update=False)-
Expand source code
def add_plugin_from_bytes(self, name, content, update=False): """Add or update a plugin from bytes (gRPC).""" import tempfile import os if not name.isalpha() or not name.islower() or len(name) > 15: raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.") # Write to temp file to verify script with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as tmp: tmp.write(content) tmp_path = tmp.name try: from connpy.plugins import Plugins p_manager = Plugins() error = p_manager.verify_script(tmp_path) if error: raise InvalidConfigurationError(f"Invalid plugin script: {error}") self._save_plugin_file(name, tmp_path, update, is_path=True) finally: if os.path.exists(tmp_path): os.remove(tmp_path)Add or update a plugin from bytes (gRPC).
def delete_plugin(self, name)-
Expand source code
def delete_plugin(self, name): """Remove a plugin file permanently.""" import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") disabled_file = f"{plugin_file}.bkp" deleted = False for f in [plugin_file, disabled_file]: if os.path.exists(f): try: os.remove(f) deleted = True except OSError as e: raise InvalidConfigurationError(f"Failed to delete plugin file '{f}': {e}") if not deleted: raise InvalidConfigurationError(f"Plugin '{name}' not found.")Remove a plugin file permanently.
def disable_plugin(self, name)-
Expand source code
def disable_plugin(self, name): """Deactivate a plugin by renaming it to a backup file.""" import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") disabled_file = f"{plugin_file}.bkp" if os.path.exists(disabled_file): return False # Already disabled if not os.path.exists(plugin_file): raise InvalidConfigurationError(f"Plugin '{name}' not found or is a core plugin.") try: os.rename(plugin_file, disabled_file) return True except OSError as e: raise InvalidConfigurationError(f"Failed to disable plugin '{name}': {e}")Deactivate a plugin by renaming it to a backup file.
def enable_plugin(self, name)-
Expand source code
def enable_plugin(self, name): """Activate a plugin by renaming its backup file.""" import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") disabled_file = f"{plugin_file}.bkp" if os.path.exists(plugin_file): return False # Already enabled if not os.path.exists(disabled_file): raise InvalidConfigurationError(f"Plugin '{name}' not found.") try: os.rename(disabled_file, plugin_file) return True except OSError as e: raise InvalidConfigurationError(f"Failed to enable plugin '{name}': {e}")Activate a plugin by renaming its backup file.
def get_plugin_source(self, name)-
Expand source code
def get_plugin_source(self, name): import os from ..services.exceptions import InvalidConfigurationError plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") core_path = os.path.dirname(os.path.realpath(__file__)) + f"/../core_plugins/{name}.py" if os.path.exists(plugin_file): target = plugin_file elif os.path.exists(core_path): target = core_path else: raise InvalidConfigurationError(f"Plugin '{name}' not found") with open(target, "r") as f: return f.read() def invoke_plugin(self, name, args_dict)-
Expand source code
def invoke_plugin(self, name, args_dict): import sys, io from argparse import Namespace from ..services.exceptions import InvalidConfigurationError from connpy.plugins import Plugins class MockApp: is_mock = True def __init__(self, config): from ..core import node, nodes from ..ai import ai from ..services.provider import ServiceProvider self.config = config self.node = node self.nodes = nodes self.ai = ai self.services = ServiceProvider(config, mode="local") # Get settings for CLI behavior settings = self.services.config_svc.get_settings() self.case = settings.get("case", False) self.fzf = settings.get("fzf", False) try: self.nodes_list = self.services.nodes.list_nodes() self.folders = self.services.nodes.list_folders() self.profiles = self.services.profiles.list_profiles() except Exception: self.nodes_list = [] self.folders = [] self.profiles = [] args = Namespace(**args_dict) p_manager = Plugins() import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") core_path = os.path.dirname(os.path.realpath(__file__)) + f"/../core_plugins/{name}.py" if os.path.exists(plugin_file): target = plugin_file elif os.path.exists(core_path): target = core_path else: raise InvalidConfigurationError(f"Plugin '{name}' not found") module = p_manager._import_from_path(target) parser = module.Parser().parser if hasattr(module, "Parser") else None if "__func_name__" in args_dict and hasattr(module, args_dict["__func_name__"]): args.func = getattr(module, args_dict["__func_name__"]) app = MockApp(self.config) from .. import printer from rich.console import Console from rich.console import Console buf = io.StringIO() old_console = printer._get_console() old_err_console = printer._get_err_console() printer.set_thread_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True)) printer.set_thread_err_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True)) printer.set_thread_stream(buf) try: if hasattr(module, "Entrypoint"): module.Entrypoint(args, parser, app) except BaseException as e: if not isinstance(e, SystemExit): import traceback printer.err_console.print(traceback.format_exc()) finally: printer.set_thread_console(old_console) printer.set_thread_err_console(old_err_console) printer.set_thread_stream(None) for line in buf.getvalue().splitlines(keepends=True): yield line def list_plugins(self)-
Expand source code
def list_plugins(self): """List all core and user-defined plugins with their status and hash.""" import os import hashlib # Check for user plugins directory plugin_dir = os.path.join(self.config.defaultdir, "plugins") # Check for core plugins directory core_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins") all_plugin_info = {} def get_hash(path): try: with open(path, "rb") as f: return hashlib.md5(f.read()).hexdigest() except Exception: return "" # User plugins if os.path.exists(plugin_dir): for f in os.listdir(plugin_dir): if f.endswith(".py"): name = f[:-3] path = os.path.join(plugin_dir, f) all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)} elif f.endswith(".py.bkp"): name = f[:-7] all_plugin_info[name] = {"enabled": False} return all_plugin_infoList all core and user-defined plugins with their status and hash.
Inherited members
class ProfileAlreadyExistsError (*args, **kwargs)-
Expand source code
class ProfileAlreadyExistsError(ConnpyError): """Raised when a profile with the same name already exists.""" passRaised when a profile with the same name already exists.
Ancestors
- ConnpyError
- builtins.Exception
- builtins.BaseException
class ProfileNotFoundError (*args, **kwargs)-
Expand source code
class ProfileNotFoundError(ConnpyError): """Raised when a profile is not found.""" passRaised when a profile is not found.
Ancestors
- ConnpyError
- builtins.Exception
- builtins.BaseException
class ProfileService (config=None)-
Expand source code
class ProfileService(BaseService): """Business logic for node profiles management.""" def list_profiles(self, filter_str=None): """List all profile names, optionally filtered.""" profiles = list(self.config.profiles.keys()) case_sensitive = self.config.config.get("case", False) if filter_str: if not case_sensitive: f_str = filter_str.lower() return [p for p in profiles if f_str in p.lower()] else: return [p for p in profiles if filter_str in p] return profiles def get_profile(self, name, resolve=True): """Get the profile dictionary, optionally resolved.""" profile = self.config.profiles.get(name) if not profile: raise ProfileNotFoundError(f"Profile '{name}' not found.") if resolve: return self.resolve_node_data(profile) return profile def add_profile(self, name, data): """Add a new profile.""" if name in self.config.profiles: raise ProfileAlreadyExistsError(f"Profile '{name}' already exists.") # Filter data to match _profiles_add signature and ensure id is passed allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"} filtered_data = {k: v for k, v in data.items() if k in allowed_keys} self.config._profiles_add(id=name, **filtered_data) self.config._saveconfig(self.config.file) def resolve_node_data(self, node_data): """Resolve profile references (@profile) in node data and handle inheritance.""" resolved = node_data.copy() # 1. Identify all referenced profiles to support inheritance referenced_profiles = [] for value in resolved.values(): if isinstance(value, str) and value.startswith("@"): referenced_profiles.append(value[1:]) elif isinstance(value, list): for item in value: if isinstance(item, str) and item.startswith("@"): referenced_profiles.append(item[1:]) # 2. Resolve explicit references for key, value in resolved.items(): if isinstance(value, str) and value.startswith("@"): profile_name = value[1:] try: profile = self.get_profile(profile_name, resolve=True) resolved[key] = profile.get(key, "") except ProfileNotFoundError: resolved[key] = "" elif isinstance(value, list): resolved_list = [] for item in value: if isinstance(item, str) and item.startswith("@"): profile_name = item[1:] try: profile = self.get_profile(profile_name, resolve=True) if "password" in profile: resolved_list.append(profile["password"]) except ProfileNotFoundError: pass else: resolved_list.append(item) resolved[key] = resolved_list # 3. Inheritance: Fill empty keys from the first referenced profile if referenced_profiles: base_profile_name = referenced_profiles[0] try: base_profile = self.get_profile(base_profile_name, resolve=True) for key, value in base_profile.items(): # Fill if key is missing or empty if key not in resolved or resolved[key] == "" or resolved[key] == [] or resolved[key] is None: resolved[key] = value except ProfileNotFoundError: pass # 4. Handle default protocol if resolved.get("protocol") == "" or resolved.get("protocol") is None: try: default_profile = self.get_profile("default", resolve=True) resolved["protocol"] = default_profile.get("protocol", "ssh") except ProfileNotFoundError: resolved["protocol"] = "ssh" return resolved def delete_profile(self, name): """Delete an existing profile, with safety checks.""" if name not in self.config.profiles: raise ProfileNotFoundError(f"Profile '{name}' not found.") if name == "default": raise InvalidConfigurationError("Cannot delete the 'default' profile.") used_by = self.config._profileused(name) if used_by: # We return the list of nodes using it so the UI can inform the user raise InvalidConfigurationError(f"Profile '{name}' is used by nodes: {', '.join(used_by)}") self.config._profiles_del(id=name) self.config._saveconfig(self.config.file) def update_profile(self, name, data): """Update an existing profile.""" if name not in self.config.profiles: raise ProfileNotFoundError(f"Profile '{name}' not found.") # Merge with existing data existing = self.get_profile(name, resolve=False) updated_data = existing.copy() updated_data.update(data) # Filter data to match _profiles_add signature allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"} filtered_data = {k: v for k, v in updated_data.items() if k in allowed_keys} self.config._profiles_add(id=name, **filtered_data) self.config._saveconfig(self.config.file)Business logic for node profiles management.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def add_profile(self, name, data)-
Expand source code
def add_profile(self, name, data): """Add a new profile.""" if name in self.config.profiles: raise ProfileAlreadyExistsError(f"Profile '{name}' already exists.") # Filter data to match _profiles_add signature and ensure id is passed allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"} filtered_data = {k: v for k, v in data.items() if k in allowed_keys} self.config._profiles_add(id=name, **filtered_data) self.config._saveconfig(self.config.file)Add a new profile.
def delete_profile(self, name)-
Expand source code
def delete_profile(self, name): """Delete an existing profile, with safety checks.""" if name not in self.config.profiles: raise ProfileNotFoundError(f"Profile '{name}' not found.") if name == "default": raise InvalidConfigurationError("Cannot delete the 'default' profile.") used_by = self.config._profileused(name) if used_by: # We return the list of nodes using it so the UI can inform the user raise InvalidConfigurationError(f"Profile '{name}' is used by nodes: {', '.join(used_by)}") self.config._profiles_del(id=name) self.config._saveconfig(self.config.file)Delete an existing profile, with safety checks.
def get_profile(self, name, resolve=True)-
Expand source code
def get_profile(self, name, resolve=True): """Get the profile dictionary, optionally resolved.""" profile = self.config.profiles.get(name) if not profile: raise ProfileNotFoundError(f"Profile '{name}' not found.") if resolve: return self.resolve_node_data(profile) return profileGet the profile dictionary, optionally resolved.
def list_profiles(self, filter_str=None)-
Expand source code
def list_profiles(self, filter_str=None): """List all profile names, optionally filtered.""" profiles = list(self.config.profiles.keys()) case_sensitive = self.config.config.get("case", False) if filter_str: if not case_sensitive: f_str = filter_str.lower() return [p for p in profiles if f_str in p.lower()] else: return [p for p in profiles if filter_str in p] return profilesList all profile names, optionally filtered.
def resolve_node_data(self, node_data)-
Expand source code
def resolve_node_data(self, node_data): """Resolve profile references (@profile) in node data and handle inheritance.""" resolved = node_data.copy() # 1. Identify all referenced profiles to support inheritance referenced_profiles = [] for value in resolved.values(): if isinstance(value, str) and value.startswith("@"): referenced_profiles.append(value[1:]) elif isinstance(value, list): for item in value: if isinstance(item, str) and item.startswith("@"): referenced_profiles.append(item[1:]) # 2. Resolve explicit references for key, value in resolved.items(): if isinstance(value, str) and value.startswith("@"): profile_name = value[1:] try: profile = self.get_profile(profile_name, resolve=True) resolved[key] = profile.get(key, "") except ProfileNotFoundError: resolved[key] = "" elif isinstance(value, list): resolved_list = [] for item in value: if isinstance(item, str) and item.startswith("@"): profile_name = item[1:] try: profile = self.get_profile(profile_name, resolve=True) if "password" in profile: resolved_list.append(profile["password"]) except ProfileNotFoundError: pass else: resolved_list.append(item) resolved[key] = resolved_list # 3. Inheritance: Fill empty keys from the first referenced profile if referenced_profiles: base_profile_name = referenced_profiles[0] try: base_profile = self.get_profile(base_profile_name, resolve=True) for key, value in base_profile.items(): # Fill if key is missing or empty if key not in resolved or resolved[key] == "" or resolved[key] == [] or resolved[key] is None: resolved[key] = value except ProfileNotFoundError: pass # 4. Handle default protocol if resolved.get("protocol") == "" or resolved.get("protocol") is None: try: default_profile = self.get_profile("default", resolve=True) resolved["protocol"] = default_profile.get("protocol", "ssh") except ProfileNotFoundError: resolved["protocol"] = "ssh" return resolvedResolve profile references (@profile) in node data and handle inheritance.
def update_profile(self, name, data)-
Expand source code
def update_profile(self, name, data): """Update an existing profile.""" if name not in self.config.profiles: raise ProfileNotFoundError(f"Profile '{name}' not found.") # Merge with existing data existing = self.get_profile(name, resolve=False) updated_data = existing.copy() updated_data.update(data) # Filter data to match _profiles_add signature allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"} filtered_data = {k: v for k, v in updated_data.items() if k in allowed_keys} self.config._profiles_add(id=name, **filtered_data) self.config._saveconfig(self.config.file)Update an existing profile.
Inherited members
class SystemService (config=None)-
Expand source code
class SystemService(BaseService): """Business logic for application lifecycle (API, processes).""" def start_api(self, port=None): """Start the Connpy REST API.""" from connpy.api import start_api try: start_api(port, config=self.config) except Exception as e: raise ConnpyError(f"Failed to start API: {e}") def debug_api(self, port=None): """Start the Connpy REST API in debug mode.""" from connpy.api import debug_api try: debug_api(port, config=self.config) except Exception as e: raise ConnpyError(f"Failed to start API in debug mode: {e}") def stop_api(self): """Stop the Connpy REST API.""" try: import os import signal pids = ["/run/connpy.pid", "/tmp/connpy.pid"] stopped = False for pid_file in pids: if os.path.exists(pid_file): try: with open(pid_file, "r") as f: # Read only the first line (PID) line = f.readline().strip() if not line: continue pid = int(line) os.kill(pid, signal.SIGTERM) # Remove the PID file after successful kill os.remove(pid_file) stopped = True except (ValueError, OSError, ProcessLookupError): # If process is already dead, just remove the stale PID file try: os.remove(pid_file) except OSError: pass continue return stopped except Exception as e: raise ConnpyError(f"Failed to stop API: {e}") def restart_api(self, port=None): """Restart the Connpy REST API, maintaining the current port if none provided.""" if port is None: status = self.get_api_status() if status["running"] and status.get("port"): port = status["port"] self.stop_api() import time time.sleep(1) self.start_api(port) def get_api_status(self): """Check if the API is currently running.""" import os pids = ["/run/connpy.pid", "/tmp/connpy.pid"] for pid_file in pids: if os.path.exists(pid_file): try: with open(pid_file, "r") as f: pid_line = f.readline().strip() port_line = f.readline().strip() if not pid_line: continue pid = int(pid_line) port = int(port_line) if port_line else None # Signal 0 checks for process existence without killing it os.kill(pid, 0) return {"running": True, "pid": pid, "port": port, "pid_file": pid_file} except (ValueError, OSError, ProcessLookupError): continue return {"running": False}Business logic for application lifecycle (API, processes).
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def debug_api(self, port=None)-
Expand source code
def debug_api(self, port=None): """Start the Connpy REST API in debug mode.""" from connpy.api import debug_api try: debug_api(port, config=self.config) except Exception as e: raise ConnpyError(f"Failed to start API in debug mode: {e}")Start the Connpy REST API in debug mode.
def get_api_status(self)-
Expand source code
def get_api_status(self): """Check if the API is currently running.""" import os pids = ["/run/connpy.pid", "/tmp/connpy.pid"] for pid_file in pids: if os.path.exists(pid_file): try: with open(pid_file, "r") as f: pid_line = f.readline().strip() port_line = f.readline().strip() if not pid_line: continue pid = int(pid_line) port = int(port_line) if port_line else None # Signal 0 checks for process existence without killing it os.kill(pid, 0) return {"running": True, "pid": pid, "port": port, "pid_file": pid_file} except (ValueError, OSError, ProcessLookupError): continue return {"running": False}Check if the API is currently running.
def restart_api(self, port=None)-
Expand source code
def restart_api(self, port=None): """Restart the Connpy REST API, maintaining the current port if none provided.""" if port is None: status = self.get_api_status() if status["running"] and status.get("port"): port = status["port"] self.stop_api() import time time.sleep(1) self.start_api(port)Restart the Connpy REST API, maintaining the current port if none provided.
def start_api(self, port=None)-
Expand source code
def start_api(self, port=None): """Start the Connpy REST API.""" from connpy.api import start_api try: start_api(port, config=self.config) except Exception as e: raise ConnpyError(f"Failed to start API: {e}")Start the Connpy REST API.
def stop_api(self)-
Expand source code
def stop_api(self): """Stop the Connpy REST API.""" try: import os import signal pids = ["/run/connpy.pid", "/tmp/connpy.pid"] stopped = False for pid_file in pids: if os.path.exists(pid_file): try: with open(pid_file, "r") as f: # Read only the first line (PID) line = f.readline().strip() if not line: continue pid = int(line) os.kill(pid, signal.SIGTERM) # Remove the PID file after successful kill os.remove(pid_file) stopped = True except (ValueError, OSError, ProcessLookupError): # If process is already dead, just remove the stale PID file try: os.remove(pid_file) except OSError: pass continue return stopped except Exception as e: raise ConnpyError(f"Failed to stop API: {e}")Stop the Connpy REST API.
Inherited members