Module connpy.services.node_service
Classes
class NodeService (config=None)-
Expand source code
class NodeService(BaseService):
    """Service for listing, creating, updating, moving and deleting nodes.

    Every operation is delegated to the configuration object available as
    ``self.config`` (presumably stored by ``BaseService.__init__`` --
    confirm against the base class).
    """

    # NOTE(review): the rendered documentation describes this class as
    # "Base class for all connpy services, providing common configuration
    # access." -- presumably text inherited from BaseService; confirm.

    def __init__(self, config=None):
        """Initialize the service by forwarding ``config`` to the base class."""
        super().__init__(config)

    def list_nodes(self, filter_str=None, format_str=None):
        """Return node IDs, optionally filtered by regex and formatted.

        Args:
            filter_str: Optional regular expression; only node IDs in which
                ``re.search`` finds a match are kept. Matching ignores case
                unless the "case" configuration setting is truthy.
            format_str: Optional ``str.format`` template rendered once per
                node. Available fields: the node's profile-resolved details,
                ``name`` and ``location``, the parts returned by
                ``_explode_unique``, and an upper-cased copy (key and value)
                of every string field.

        Returns:
            The (filtered) node IDs when ``format_str`` is empty, otherwise
            a list with one rendered string per node. A node whose template
            cannot be rendered is represented by its plain ID.
        """
        nodes = self.config._getallnodes()
        case_sensitive = self.config.config.get("case", False)

        if filter_str:
            flags = 0 if case_sensitive else re.IGNORECASE
            nodes = [n for n in nodes if re.search(filter_str, n, flags)]

        if not format_str:
            return nodes

        # Function-scoped import; presumably avoids a circular import -- confirm.
        from .profile_service import ProfileService
        profile_service = ProfileService(self.config)

        formatted_nodes = []
        for n_id in nodes:
            # Use ProfileService to resolve profiles for dynamic formatting
            details = self.config.getitem(n_id, extract=False)
            if details:
                details = profile_service.resolve_node_data(details)

            name, _, folder_path = n_id.partition("@")
            location = folder_path or "root"

            # A node without stored details must not abort the whole listing.
            context = details.copy() if details else {}
            context.update({
                "name": name,
                "NAME": name.upper(),
                "location": location,
                "LOCATION": location.upper(),
            })

            # Add exploded uniques (id, folder, subfolder)
            uniques = self.config._explode_unique(n_id)
            if uniques:
                context.update(uniques)

            # Add uppercase versions of all string fields for convenience;
            # iterate over a copy because the dict grows inside the loop.
            for k, v in list(context.items()):
                if isinstance(v, str):
                    context[k.upper()] = v.upper()

            try:
                formatted_nodes.append(format_str.format(**context))
            except (KeyError, IndexError, ValueError, AttributeError, TypeError):
                # Fallback to original string if format fails
                formatted_nodes.append(n_id)
        return formatted_nodes

    def list_folders(self, filter_str=None):
        """Return all folders, optionally narrowed by ``filter_str``.

        A filter starting with "@" is compared for equality against each
        folder; any other filter is used as a regular expression with
        ``re.search``. Both comparisons ignore case unless the "case"
        configuration setting is truthy.
        """
        folders = self.config._getallfolders()
        case_sensitive = self.config.config.get("case", False)

        if not filter_str:
            return folders

        if filter_str.startswith("@"):
            if case_sensitive:
                return [f for f in folders if f == filter_str]
            wanted = filter_str.lower()
            return [f for f in folders if f.lower() == wanted]

        flags = 0 if case_sensitive else re.IGNORECASE
        return [f for f in folders if re.search(filter_str, f, flags)]

    def get_node_details(self, unique_id):
        """Return the configuration entry stored for ``unique_id``.

        Raises:
            NodeNotFoundError: If the lookup raises ``KeyError`` or
                ``TypeError`` (kept as ``__cause__``) or yields a falsy
                value.
        """
        try:
            details = self.config.getitem(unique_id)
        except (KeyError, TypeError) as err:
            raise NodeNotFoundError(f"Node '{unique_id}' not found.") from err
        if not details:
            raise NodeNotFoundError(f"Node '{unique_id}' not found.")
        return details

    def explode_unique(self, unique_id):
        """Explode a unique ID into its parts via the configuration object."""
        return self.config._explode_unique(unique_id)

    def generate_cache(self, nodes=None, folders=None, profiles=None):
        """Forward the given values to the configuration's cache generator."""
        self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)

    def validate_parent_folder(self, unique_id, is_folder=False):
        """Check that the parent folder of ``unique_id`` exists.

        For a folder ID the check only applies when the exploded ID has both
        a "folder" and a "subfolder" part; for a node ID it applies when
        something follows the first "@".

        Raises:
            NodeNotFoundError: If the parent folder is not a known folder.
        """
        if is_folder:
            uniques = self.config._explode_unique(unique_id)
            if not (uniques and "subfolder" in uniques and "folder" in uniques):
                return
            parent_folder = f"@{uniques['folder']}"
        else:
            node_folder = unique_id.partition("@")[2]
            if not node_folder:
                return
            parent_folder = f"@{node_folder}"

        if parent_folder not in self.config._getallfolders():
            raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")

    def add_node(self, unique_id, data, is_folder=False):
        """Add a new node or folder and save the configuration.

        Args:
            unique_id: ID of the node or folder to create.
            data: Keyword arguments for ``_connections_add``. It is copied,
                never modified; an "id" taken from ``unique_id`` is added to
                the copy when missing. Ignored for folders.
            is_folder: Create a folder instead of a node.

        Raises:
            NodeAlreadyExistsError: If the node or folder already exists.
            InvalidConfigurationError: If a folder ID cannot be exploded.
            NodeNotFoundError: If the parent folder does not exist (raised
                by ``validate_parent_folder``).
        """
        if is_folder:
            if unique_id in self.config._getallfolders():
                raise NodeAlreadyExistsError(f"Folder '{unique_id}' already exists.")

            uniques = self.config._explode_unique(unique_id)
            if not uniques:
                raise InvalidConfigurationError(f"Invalid folder name '{unique_id}'.")

            # Check if parent folder exists when creating a subfolder
            if "subfolder" in uniques:
                self.validate_parent_folder(unique_id, is_folder=True)

            self.config._folder_add(**uniques)
        else:
            self._validate_node_name(unique_id)

            if unique_id in self.config._getallnodes():
                raise NodeAlreadyExistsError(f"Node '{unique_id}' already exists.")

            # Check if parent folder exists when creating a node in a folder
            self.validate_parent_folder(unique_id)

            # Work on a copy: writing "id" into the caller's dict would leak
            # this node's id into later calls that reuse the same dict.
            node_data = dict(data)
            if "id" not in node_data:
                uniques = self.config._explode_unique(unique_id)
                if uniques and "id" in uniques:
                    node_data["id"] = uniques["id"]

            self.config._connections_add(**node_data)

        self.config._saveconfig(self.config.file)

    def update_node(self, unique_id, data):
        """Update an existing node and save the configuration.

        ``data`` is copied, never modified; an "id" taken from ``unique_id``
        is added to the copy when missing.

        Raises:
            NodeNotFoundError: If ``unique_id`` is not a known node.
        """
        if unique_id not in self.config._getallnodes():
            raise NodeNotFoundError(f"Node '{unique_id}' not found.")

        node_data = dict(data)
        if "id" not in node_data:
            uniques = self.config._explode_unique(unique_id)
            if uniques and "id" in uniques:
                node_data["id"] = uniques["id"]

        # Per the original comment, _connections_add also handles updates
        # when the ID already exists.
        self.config._connections_add(**node_data)
        self.config._saveconfig(self.config.file)

    def delete_node(self, unique_id, is_folder=False):
        """Delete a node or folder and save the configuration.

        Raises:
            NodeNotFoundError: If ``unique_id`` cannot be exploded.
        """
        uniques = self.config._explode_unique(unique_id)
        if not uniques:
            if is_folder:
                raise NodeNotFoundError(f"Folder '{unique_id}' not found or invalid.")
            raise NodeNotFoundError(f"Node '{unique_id}' not found or invalid.")

        remove = self.config._folder_del if is_folder else self.config._connections_del
        remove(**uniques)
        self.config._saveconfig(self.config.file)

    def connect_node(self, unique_id, sftp=False, debug=False, logger=None):
        """Open an interactive session with a node.

        The stored entry is read without extraction, resolved through
        ``ProfileService`` and used to build a ``connpy.core.node``; with
        ``sftp`` set, the node's protocol is switched to "sftp" first.

        Raises:
            NodeNotFoundError: If no data is stored for ``unique_id``.
        """
        from connpy.core import node
        from .profile_service import ProfileService

        node_data = self.config.getitem(unique_id, extract=False)
        if not node_data:
            raise NodeNotFoundError(f"Node '{unique_id}' not found.")

        # Resolve profiles
        profile_service = ProfileService(self.config)
        resolved_data = profile_service.resolve_node_data(node_data)

        n = node(unique_id, **resolved_data, config=self.config)
        if sftp:
            n.protocol = "sftp"
        n.interact(debug=debug, logger=logger)

    def move_node(self, src_id, dst_id, copy=False):
        """Move (or, with ``copy=True``, copy) a node to ``dst_id``.

        Raises:
            NodeNotFoundError: If the source lookup fails or is empty.
            NodeAlreadyExistsError: If ``dst_id`` is already a node.
            InvalidConfigurationError: If ``dst_id`` cannot be exploded.
        """
        self._validate_node_name(dst_id)

        # Same translation of lookup failures as get_node_details.
        try:
            node_data = self.config.getitem(src_id)
        except (KeyError, TypeError) as err:
            raise NodeNotFoundError(f"Source node '{src_id}' not found.") from err
        if not node_data:
            raise NodeNotFoundError(f"Source node '{src_id}' not found.")

        if dst_id in self.config._getallnodes():
            raise NodeAlreadyExistsError(f"Destination node '{dst_id}' already exists.")

        new_uniques = self.config._explode_unique(dst_id)
        if not new_uniques:
            raise InvalidConfigurationError(f"Invalid destination format '{dst_id}'.")

        new_node_data = node_data.copy()
        new_node_data.update(new_uniques)
        self.config._connections_add(**new_node_data)

        if not copy:
            src_uniques = self.config._explode_unique(src_id)
            self.config._connections_del(**src_uniques)

        self.config._saveconfig(self.config.file)

    def bulk_add(self, ids, hosts, common_data):
        """Add several nodes that share ``common_data``.

        IDs that already exist (including repeats inside ``ids``), fail
        ``_validate_node_name`` with ``ReservedNameError`` or cannot be
        exploded are skipped. The node at position ``i`` uses ``hosts[i]``,
        or ``hosts[0]`` when ``hosts`` is shorter than ``ids``. The
        configuration is saved once, only if something was added.

        Returns:
            The number of nodes added.

        Raises:
            InvalidConfigurationError: If a node has to be added but
                ``hosts`` is empty.
        """
        count = 0
        # A set gives O(1) membership tests and lets us record new IDs.
        known = set(self.config._getallnodes())

        for i, uid in enumerate(ids):
            if uid in known:
                continue

            try:
                self._validate_node_name(uid)
            except ReservedNameError:
                # Bulk mode skips reserved names; a strict caller has to
                # validate the IDs itself.
                continue

            uniques = self.config._explode_unique(uid)
            if not uniques:
                continue

            if not hosts:
                raise InvalidConfigurationError(
                    "At least one host is required to add nodes in bulk."
                )
            host = hosts[i] if i < len(hosts) else hosts[0]

            node_data = common_data.copy()
            node_data.pop("ids", None)
            node_data.pop("location", None)
            node_data.update(uniques)
            node_data["host"] = host
            node_data["type"] = "connection"

            self.config._connections_add(**node_data)
            known.add(uid)
            count += 1

        if count > 0:
            self.config._saveconfig(self.config.file)
        return count

    def full_replace(self, connections, profiles):
        """Replace all connections and profiles, then save the configuration."""
        self.config.connections = connections
        self.config.profiles = profiles
        self.config._saveconfig(self.config.file)

    def get_inventory(self):
        """Return the configuration's connections and profiles in one dict.

        The values are the configuration's own objects; no copy is made.
        """
        return {
            "connections": self.config.connections,
            "profiles": self.config.profiles,
        }
Initialize the service.
Args
config: An instance of configfile (or None to instantiate a new one or use the global context).
Ancestors
Methods
def add_node(self, unique_id, data, is_folder=False)-
Expand source code
def add_node(self, unique_id, data, is_folder=False):
    """Add a new node or folder and save the configuration.

    Args:
        unique_id: ID of the node or folder to create.
        data: Keyword arguments for ``_connections_add``. It is copied,
            never modified; an "id" taken from ``unique_id`` is added to the
            copy when missing. Ignored for folders.
        is_folder: Create a folder instead of a node.

    Raises:
        NodeAlreadyExistsError: If the node or folder already exists.
        InvalidConfigurationError: If a folder ID cannot be exploded.
        NodeNotFoundError: If the parent folder does not exist (raised by
            ``validate_parent_folder``).
    """
    if is_folder:
        if unique_id in self.config._getallfolders():
            raise NodeAlreadyExistsError(f"Folder '{unique_id}' already exists.")

        uniques = self.config._explode_unique(unique_id)
        if not uniques:
            raise InvalidConfigurationError(f"Invalid folder name '{unique_id}'.")

        # Check if parent folder exists when creating a subfolder
        if "subfolder" in uniques:
            self.validate_parent_folder(unique_id, is_folder=True)

        self.config._folder_add(**uniques)
    else:
        self._validate_node_name(unique_id)

        if unique_id in self.config._getallnodes():
            raise NodeAlreadyExistsError(f"Node '{unique_id}' already exists.")

        # Check if parent folder exists when creating a node in a folder
        self.validate_parent_folder(unique_id)

        # Work on a copy: writing "id" into the caller's dict would leak
        # this node's id into later calls that reuse the same dict.
        node_data = dict(data)
        if "id" not in node_data:
            uniques = self.config._explode_unique(unique_id)
            if uniques and "id" in uniques:
                node_data["id"] = uniques["id"]

        self.config._connections_add(**node_data)

    self.config._saveconfig(self.config.file)
def bulk_add(self, ids, hosts, common_data)-
Expand source code
def bulk_add(self, ids, hosts, common_data):
    """Add several nodes that share ``common_data``.

    IDs that already exist (including repeats inside ``ids``), fail
    ``_validate_node_name`` with ``ReservedNameError`` or cannot be exploded
    are skipped. The node at position ``i`` uses ``hosts[i]``, or
    ``hosts[0]`` when ``hosts`` is shorter than ``ids``. The configuration is
    saved once, only if something was added.

    Args:
        ids: Unique IDs of the nodes to create.
        hosts: Host values, matched to ``ids`` by position.
        common_data: Settings applied to every node; copied per node, with
            the "ids" and "location" keys removed from the copy.

    Returns:
        The number of nodes added.

    Raises:
        InvalidConfigurationError: If a node has to be added but ``hosts``
            is empty.
    """
    count = 0
    # A set gives O(1) membership tests and lets us record new IDs so a
    # repeated ID is not added (and counted) twice.
    known = set(self.config._getallnodes())

    for i, uid in enumerate(ids):
        if uid in known:
            continue

        try:
            self._validate_node_name(uid)
        except ReservedNameError:
            # Bulk mode skips reserved names; a strict caller has to
            # validate the IDs itself.
            continue

        uniques = self.config._explode_unique(uid)
        if not uniques:
            continue

        if not hosts:
            raise InvalidConfigurationError(
                "At least one host is required to add nodes in bulk."
            )
        host = hosts[i] if i < len(hosts) else hosts[0]

        node_data = common_data.copy()
        node_data.pop("ids", None)
        node_data.pop("location", None)
        node_data.update(uniques)
        node_data["host"] = host
        node_data["type"] = "connection"

        self.config._connections_add(**node_data)
        known.add(uid)
        count += 1

    if count > 0:
        self.config._saveconfig(self.config.file)
    return count
def connect_node(self, unique_id, sftp=False, debug=False, logger=None)-
Expand source code
def connect_node(self, unique_id, sftp=False, debug=False, logger=None):
    """Open an interactive session with the node ``unique_id``.

    The stored entry is read without extraction, resolved through
    ``ProfileService`` and used to build a ``connpy.core.node``. With
    ``sftp`` set, the node's protocol is switched to "sftp" before
    ``interact`` is called with ``debug`` and ``logger``.

    Raises:
        NodeNotFoundError: If no data is stored for ``unique_id``.
    """
    from connpy.core import node
    from .profile_service import ProfileService

    raw_data = self.config.getitem(unique_id, extract=False)
    if not raw_data:
        raise NodeNotFoundError(f"Node '{unique_id}' not found.")

    # Profile references are resolved before the node object is built.
    session = node(
        unique_id,
        **ProfileService(self.config).resolve_node_data(raw_data),
        config=self.config,
    )
    if sftp:
        session.protocol = "sftp"
    session.interact(debug=debug, logger=logger)
def delete_node(self, unique_id, is_folder=False)-
Expand source code
def delete_node(self, unique_id, is_folder=False):
    """Remove a node, or a folder when ``is_folder`` is set, then save.

    The ID is exploded into its parts, which are passed as keyword
    arguments to ``_folder_del`` or ``_connections_del``.

    Raises:
        NodeNotFoundError: If ``unique_id`` cannot be exploded.
    """
    parts = self.config._explode_unique(unique_id)
    if not parts:
        if is_folder:
            raise NodeNotFoundError(f"Folder '{unique_id}' not found or invalid.")
        raise NodeNotFoundError(f"Node '{unique_id}' not found or invalid.")

    remove = self.config._folder_del if is_folder else self.config._connections_del
    remove(**parts)
    self.config._saveconfig(self.config.file)
def explode_unique(self, unique_id)-
Expand source code
def explode_unique(self, unique_id):
    """Split a unique ID into its parts.

    Thin wrapper: the work is done by the configuration object's
    ``_explode_unique`` and its result is returned unchanged.
    """
    parts = self.config._explode_unique(unique_id)
    return parts
def full_replace(self, connections, profiles)-
Expand source code
def full_replace(self, connections, profiles):
    """Overwrite the stored connections and profiles, then save.

    Both values are assigned to the configuration object as given before the
    configuration is written to ``config.file``.
    """
    cfg = self.config
    cfg.connections = connections
    cfg.profiles = profiles
    cfg._saveconfig(cfg.file)
def generate_cache(self, nodes=None, folders=None, profiles=None)-
Expand source code
def generate_cache(self, nodes=None, folders=None, profiles=None):
    """Regenerate the nodes cache kept by the configuration object.

    The three arguments are forwarded by keyword to
    ``_generate_nodes_cache``; nothing is returned.
    """
    cache_inputs = {"nodes": nodes, "folders": folders, "profiles": profiles}
    self.config._generate_nodes_cache(**cache_inputs)
def get_inventory(self)-
Expand source code
def get_inventory(self):
    """Return the configuration's connections and profiles in one dict.

    The values are the configuration's own objects; no copy is made.
    """
    cfg = self.config
    inventory = {}
    inventory["connections"] = cfg.connections
    inventory["profiles"] = cfg.profiles
    return inventory
def get_node_details(self, unique_id)-
Expand source code
def get_node_details(self, unique_id):
    """Return the configuration entry stored for ``unique_id``.

    Args:
        unique_id: ID of the node to look up.

    Returns:
        Whatever ``config.getitem(unique_id)`` returns, when it is truthy.

    Raises:
        NodeNotFoundError: If the lookup raises ``KeyError`` or
            ``TypeError`` (kept as ``__cause__``) or yields a falsy value.
    """
    # Only the lookup itself is guarded, so the handler cannot swallow
    # anything else.
    try:
        details = self.config.getitem(unique_id)
    except (KeyError, TypeError) as err:
        raise NodeNotFoundError(f"Node '{unique_id}' not found.") from err
    if not details:
        raise NodeNotFoundError(f"Node '{unique_id}' not found.")
    return details
def list_folders(self, filter_str=None)-
Expand source code
def list_folders(self, filter_str=None):
    """List the known folders, optionally narrowed by ``filter_str``.

    A filter starting with "@" is compared for equality against each
    folder; any other filter is used as a regular expression with
    ``re.search``. Both comparisons ignore case unless the "case"
    configuration setting is truthy. Without a filter the folder collection
    is returned as provided by the configuration.
    """
    folders = self.config._getallfolders()
    case_sensitive = self.config.config.get("case", False)

    if not filter_str:
        return folders

    if filter_str.startswith("@"):
        # Exact folder lookup rather than a pattern search.
        if case_sensitive:
            return [f for f in folders if f == filter_str]
        return [f for f in folders if f.lower() == filter_str.lower()]

    flags = 0 if case_sensitive else re.IGNORECASE
    return [f for f in folders if re.search(filter_str, f, flags)]
def list_nodes(self, filter_str=None, format_str=None)-
Expand source code
def list_nodes(self, filter_str=None, format_str=None):
    """Return node IDs, optionally filtered by regex and formatted.

    Args:
        filter_str: Optional regular expression; only node IDs in which
            ``re.search`` finds a match are kept. Matching ignores case
            unless the "case" configuration setting is truthy.
        format_str: Optional ``str.format`` template rendered once per node.
            Available fields: the node's profile-resolved details, ``name``
            and ``location``, the parts returned by ``_explode_unique``, and
            an upper-cased copy (key and value) of every string field.

    Returns:
        The (filtered) node IDs when ``format_str`` is empty, otherwise a
        list with one rendered string per node. A node whose template cannot
        be rendered is represented by its plain ID.
    """
    nodes = self.config._getallnodes()
    case_sensitive = self.config.config.get("case", False)

    if filter_str:
        flags = 0 if case_sensitive else re.IGNORECASE
        nodes = [n for n in nodes if re.search(filter_str, n, flags)]

    if not format_str:
        return nodes

    # Function-scoped import; presumably avoids a circular import -- confirm.
    from .profile_service import ProfileService
    profile_service = ProfileService(self.config)

    formatted_nodes = []
    for n_id in nodes:
        # Use ProfileService to resolve profiles for dynamic formatting
        details = self.config.getitem(n_id, extract=False)
        if details:
            details = profile_service.resolve_node_data(details)

        # "name@folder..." -> name before the first "@", location after it.
        name, _, folder_path = n_id.partition("@")
        location = folder_path or "root"

        # A node without stored details must not abort the whole listing.
        context = details.copy() if details else {}
        context.update({
            "name": name,
            "NAME": name.upper(),
            "location": location,
            "LOCATION": location.upper(),
        })

        # Add exploded uniques (id, folder, subfolder)
        uniques = self.config._explode_unique(n_id)
        if uniques:
            context.update(uniques)

        # Add uppercase versions of all string fields for convenience;
        # iterate over a copy because the dict grows inside the loop.
        for k, v in list(context.items()):
            if isinstance(v, str):
                context[k.upper()] = v.upper()

        try:
            formatted_nodes.append(format_str.format(**context))
        except (KeyError, IndexError, ValueError, AttributeError, TypeError):
            # Fallback to original string if format fails
            formatted_nodes.append(n_id)
    return formatted_nodes
def move_node(self, src_id, dst_id, copy=False)-
Expand source code
def move_node(self, src_id, dst_id, copy=False):
    """Move (or, with ``copy=True``, copy) a node to ``dst_id``.

    The source entry is copied, overlaid with the exploded parts of
    ``dst_id`` and added as a new connection; unless ``copy`` is set the
    source connection is then deleted. The configuration is saved at the
    end.

    Raises:
        NodeNotFoundError: If the source lookup raises ``KeyError`` or
            ``TypeError`` (kept as ``__cause__``) or yields a falsy value.
        NodeAlreadyExistsError: If ``dst_id`` is already a node.
        InvalidConfigurationError: If ``dst_id`` cannot be exploded.
    """
    self._validate_node_name(dst_id)

    # Same translation of lookup failures as get_node_details.
    try:
        node_data = self.config.getitem(src_id)
    except (KeyError, TypeError) as err:
        raise NodeNotFoundError(f"Source node '{src_id}' not found.") from err
    if not node_data:
        raise NodeNotFoundError(f"Source node '{src_id}' not found.")

    if dst_id in self.config._getallnodes():
        raise NodeAlreadyExistsError(f"Destination node '{dst_id}' already exists.")

    new_uniques = self.config._explode_unique(dst_id)
    if not new_uniques:
        raise InvalidConfigurationError(f"Invalid destination format '{dst_id}'.")

    # Shallow copy so the stored source entry is not modified.
    new_node_data = node_data.copy()
    new_node_data.update(new_uniques)
    self.config._connections_add(**new_node_data)

    if not copy:
        src_uniques = self.config._explode_unique(src_id)
        self.config._connections_del(**src_uniques)

    self.config._saveconfig(self.config.file)
def update_node(self, unique_id, data)-
Expand source code
def update_node(self, unique_id, data):
    """Update an existing node and save the configuration.

    Args:
        unique_id: ID of the node to update; must already exist.
        data: Keyword arguments for ``_connections_add``. It is copied,
            never modified; an "id" taken from ``unique_id`` is added to the
            copy when missing.

    Raises:
        NodeNotFoundError: If ``unique_id`` is not a known node.
    """
    if unique_id not in self.config._getallnodes():
        raise NodeNotFoundError(f"Node '{unique_id}' not found.")

    # Work on a copy so the caller's dict is left untouched; the "id" guard
    # mirrors the one used when adding a node.
    node_data = dict(data)
    if "id" not in node_data:
        uniques = self.config._explode_unique(unique_id)
        if uniques and "id" in uniques:
            node_data["id"] = uniques["id"]

    # Per the original comment, _connections_add also handles updates when
    # the ID already exists.
    self.config._connections_add(**node_data)
    self.config._saveconfig(self.config.file)
def validate_parent_folder(self, unique_id, is_folder=False)-
Expand source code
def validate_parent_folder(self, unique_id, is_folder=False):
    """Ensure the parent folder of ``unique_id`` exists.

    For a folder ID the check only applies when the exploded ID has both a
    "folder" and a "subfolder" part, and the parent is "@<folder>". For a
    node ID the parent is "@" plus everything after the first "@"; an ID
    without that part needs no check.

    Raises:
        NodeNotFoundError: If the parent folder is not a known folder.
    """
    if is_folder:
        parts = self.config._explode_unique(unique_id)
        if not (parts and "subfolder" in parts and "folder" in parts):
            return
        parent_folder = f"@{parts['folder']}"
    else:
        node_folder = unique_id.partition("@")[2]
        if not node_folder:
            return
        parent_folder = f"@{node_folder}"

    known_folders = self.config._getallfolders()
    if parent_folder not in known_folders:
        raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
Inherited members