From c81f6e049f18d18a5a4224b43c3a3fc09e2fb06b Mon Sep 17 00:00:00 2001 From: Fede Luzzi Date: Thu, 30 Apr 2026 19:25:17 -0300 Subject: [PATCH] feat: advanced jumphost capabilities and core bug fixes - **Core/Jumphost**: Implement native jumphost support for `ssm`, `kubectl`, and `docker` protocols via transparent ProxyCommands. - **Core/Tags**: Introduce `ssh_options` tag to pass custom arguments (e.g., `-i key.pem`) inside nested SSM SSH tunnels. - **Core/Tags**: Introduce `nc_command` tag to override default `nc` behavior in Docker/Kube tunnels (crucial for IOS-XRd VRF netns isolation). - **Core/Async**: Fix a critical bug where interactive terminal sessions disconnected immediately due to premature `asyncio.FIRST_COMPLETED` task resolution when logs or idletime were disabled. - **Services/Node**: Fix logic error in `validate_parent_folder` that prevented the creation of new nested subfolders (CLI was incorrectly checking for the subfolder's own existence instead of the parent's). - **CLI/Help**: Update documentation to include the new `region`, `profile`, `ssh_options`, and `nc_command` well-known tags. --- connpy/_version.py | 2 +- connpy/cli/help_text.py | 4 +++ connpy/core.py | 50 +++++++++++++++++++++++++++------ connpy/services/node_service.py | 21 +++++++++----- 4 files changed, 61 insertions(+), 16 deletions(-) diff --git a/connpy/_version.py b/connpy/_version.py index 23ed2cb..f653cc0 100644 --- a/connpy/_version.py +++ b/connpy/_version.py @@ -1 +1 @@ -__version__ = "6.0.0b2" +__version__ = "6.0.0b3" diff --git a/connpy/cli/help_text.py b/connpy/cli/help_text.py index 3096bac..2a53a1f 100644 --- a/connpy/cli/help_text.py +++ b/connpy/cli/help_text.py @@ -55,6 +55,10 @@ Here are some important instructions and tips for configuring your new node: - `prompt`: Replaces default app prompt to identify the end of output or where the user can start inputting commands. - `kube_command`: Replaces the default command (`/bin/bash`) for `kubectl exec`. - `docker_command`: Replaces the default command for `docker exec`. + - `region`: AWS Region used for `aws ssm start-session`. + - `profile`: AWS Profile used for `aws ssm start-session`. + - `ssh_options`: Additional SSH options injected when an SSM node is used as a jumphost (e.g., `-i ~/.ssh/key.pem`). + - `nc_command`: Replaces the default `nc` command used when bridging connections through Docker or Kubernetes (e.g., `ip netns exec global-vrf nc`). """ if type == "bashcompletion": return ''' diff --git a/connpy/core.py b/connpy/core.py index ef76328..6ccbc42 100755 --- a/connpy/core.py +++ b/connpy/core.py @@ -146,6 +146,42 @@ class node: else: jumphost_cmd = jumphost_cmd + " {}".format("@".join([self.jumphost["user"],self.jumphost["host"]])) self.jumphost = f"-o ProxyCommand=\"{jumphost_cmd}\"" + elif self.jumphost["protocol"] == "ssm": + ssm_target = self.jumphost["host"] + ssm_cmd = f"aws ssm start-session --target {ssm_target} --document-name AWS-StartSSHSession --parameters 'portNumber=22'" + if isinstance(self.jumphost.get("tags"), dict): + if "profile" in self.jumphost["tags"]: + ssm_cmd += f" --profile {self.jumphost['tags']['profile']}" + if "region" in self.jumphost["tags"]: + ssm_cmd += f" --region {self.jumphost['tags']['region']}" + if self.jumphost["options"] != '': + ssm_cmd += f" {self.jumphost['options']}" + + bastion_user_part = f"{self.jumphost['user']}@{ssm_target}" if self.jumphost['user'] else ssm_target + + ssh_opts = "" + if isinstance(self.jumphost.get("tags"), dict) and "ssh_options" in self.jumphost["tags"]: + ssh_opts = f" {self.jumphost['tags']['ssh_options']}" + + inner_ssh = f"ssh{ssh_opts} -o ProxyCommand='{ssm_cmd}' -W %h:%p {bastion_user_part}" + self.jumphost = f"-o ProxyCommand=\"{inner_ssh}\"" + elif self.jumphost["protocol"] in ["kubectl", "docker"]: + nc_cmd = "nc" + if isinstance(self.jumphost.get("tags"), dict) and "nc_command" in self.jumphost["tags"]: + nc_cmd = self.jumphost["tags"]["nc_command"] + + if self.jumphost["protocol"] == "kubectl": + proxy_cmd = f"kubectl exec " + if self.jumphost["options"] != '': + proxy_cmd += f"{self.jumphost['options']} " + proxy_cmd += f"{self.jumphost['host']} -i -- {nc_cmd} %h %p" + else: + proxy_cmd = f"docker " + if self.jumphost["options"] != '': + proxy_cmd += f"{self.jumphost['options']} " + proxy_cmd += f"exec -i {self.jumphost['host']} {nc_cmd} %h %p" + + self.jumphost = f"-o ProxyCommand=\"{proxy_cmd}\"" else: self.jumphost = "" @@ -401,8 +437,6 @@ class node: self.mylog.write(data) async def keepalive_task(): - if self.idletime <= 0: - return while True: await asyncio.sleep(1) if time() - self.lastinput >= self.idletime: @@ -413,8 +447,6 @@ class node: pass async def savelog_task(): - if not hasattr(self, 'logfile') or not hasattr(self, 'mylog'): - return prev_size = 0 while True: await asyncio.sleep(5) @@ -433,10 +465,12 @@ class node: # We want to exit if either happens, so return_exceptions=False, but we need to cancel the others. tasks = [ asyncio.create_task(ingress_task()), - asyncio.create_task(egress_task()), - asyncio.create_task(keepalive_task()), - asyncio.create_task(savelog_task()) + asyncio.create_task(egress_task()) ] + if self.idletime > 0: + tasks.append(asyncio.create_task(keepalive_task())) + if hasattr(self, 'logfile') and hasattr(self, 'mylog'): + tasks.append(asyncio.create_task(savelog_task())) done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) for p in pending: p.cancel() @@ -802,7 +836,7 @@ class node: "telnet": ['[u|U]sername:', 'refused', 'supported', 'invalid|unrecognized option', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"], "kubectl": ['[u|U]sername:', '[r|R]efused', '[E|e]rror', 'DEPRECATED', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF, "expired|invalid"], "docker": ['[u|U]sername:', 'Cannot', '[E|e]rror', 'failed', 'not a docker command', 'unknown', 'unable to resolve', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF], - "ssm": ['[u|U]sername:', 'Cannot', '[E|e]rror', 'failed', 'SessionManagerPlugin', 'unknown', 'unable to resolve', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF] + "ssm": ['[u|U]sername:', 'Cannot', '[E|e]rror', 'failed', 'SessionManagerPlugin', '[u|U]nknown', 'unable to resolve', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF] } error_indices = { diff --git a/connpy/services/node_service.py b/connpy/services/node_service.py index 2a04c03..de27be6 100644 --- a/connpy/services/node_service.py +++ b/connpy/services/node_service.py @@ -89,13 +89,20 @@ class NodeService(BaseService): """Generate and update the internal nodes cache.""" self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles) - def validate_parent_folder(self, unique_id): + def validate_parent_folder(self, unique_id, is_folder=False): """Check if parent folder exists for a given node unique ID.""" - node_folder = unique_id.partition("@")[2] - if node_folder: - parent_folder = f"@{node_folder}" - if parent_folder not in self.config._getallfolders(): - raise NodeNotFoundError(f"Folder '{parent_folder}' not found.") + if is_folder: + uniques = self.config._explode_unique(unique_id) + if uniques and "subfolder" in uniques and "folder" in uniques: + parent_folder = f"@{uniques['folder']}" + if parent_folder not in self.config._getallfolders(): + raise NodeNotFoundError(f"Folder '{parent_folder}' not found.") + else: + node_folder = unique_id.partition("@")[2] + if node_folder: + parent_folder = f"@{node_folder}" + if parent_folder not in self.config._getallfolders(): + raise NodeNotFoundError(f"Folder '{parent_folder}' not found.") def add_node(self, unique_id, data, is_folder=False): @@ -115,7 +122,7 @@ class NodeService(BaseService): # Check if parent folder exists when creating a subfolder if "subfolder" in uniques: - self.validate_parent_folder(unique_id) + self.validate_parent_folder(unique_id, is_folder=True) self.config._folder_add(**uniques) self.config._saveconfig(self.config.file)