From a192bd19125aa28b12f09d8fc1f6874fc4fd9b8d Mon Sep 17 00:00:00 2001
From: Fede Luzzi
Date: Fri, 1 May 2026 18:55:25 -0300
Subject: [PATCH] connpy v6.0.0b4: AI Stability, Remote Sync & UI Polish (Clean
Commit)
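
A stability and polish pass over the AI agent, plus inventory import/export
fixes:

- ai.py: the rich Status spinner is paused around every debug and handover
  panel (stop before printing, restart after) so it no longer leaves ghost
  frames; token streaming is only used when a chunk_callback is provided.
- grpc_layer/stubs.py: the Live markdown display is replaced by a one-line
  spinner preview of the streamed response, and the final panel is always
  printed exactly once.
- grpc_layer/server.py: [DEBUG] messages are emitted only when the client
  requested debug mode.
- services/import_export_service.py: export and import now use a nested
  folder/subfolder structure instead of flat unique IDs.
- services/node_service.py: folder filters starting with "@" match exactly
  instead of being treated as a regex.
- hooks.py: hook names are resolved via getattr so hooks without __name__
  (e.g. functools.partial objects) no longer break error reporting.
- completion.py: plugin --add/--update now complete a file path after the
  plugin name argument.
- Docs regenerated.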
---
.gitignore | 1 +
connpy/_version.py | 2 +-
connpy/ai.py | 64 ++++-
connpy/cli/ai_handler.py | 2 -
connpy/completion.py | 4 +-
connpy/grpc_layer/server.py | 18 +-
connpy/grpc_layer/stubs.py | 49 ++--
connpy/hooks.py | 6 +-
connpy/services/import_export_service.py | 90 +++++--
connpy/services/node_service.py | 10 +-
docs/connpy/cli/ai_handler.html | 6 +-
docs/connpy/cli/help_text.html | 4 +
docs/connpy/grpc_layer/server.html | 18 +-
docs/connpy/grpc_layer/stubs.html | 98 ++++---
docs/connpy/index.html | 153 +++++++++--
.../services/import_export_service.html | 178 +++++++++----
docs/connpy/services/index.html | 242 +++++++++++++-----
docs/connpy/services/node_service.html | 64 +++--
18 files changed, 717 insertions(+), 292 deletions(-)
diff --git a/.gitignore b/.gitignore
index dbae319..d4c8802 100644
--- a/.gitignore
+++ b/.gitignore
@@ -160,3 +160,4 @@ ssm_implemmetaiton_plan.md
async_interact_plan.md
repo_consolidado_limpio.md
connpy_roadmap.md
+MULTI_USER_PLAN.md
diff --git a/connpy/_version.py b/connpy/_version.py
index f653cc0..4115f10 100644
--- a/connpy/_version.py
+++ b/connpy/_version.py
@@ -1 +1 @@
-__version__ = "6.0.0b3"
+__version__ = "6.0.0b4"
diff --git a/connpy/ai.py b/connpy/ai.py
index 436a584..424fe56 100755
--- a/connpy/ai.py
+++ b/connpy/ai.py
@@ -273,9 +273,12 @@ class ai:
if not debug and not chunk_callback:
if not is_streaming_text:
- # Stop spinner before starting live display
+ # Stop spinner definitively
if status:
- status.stop()
+ try:
+ status.stop()
+ except Exception:
+ pass
live_display = Live(
Panel(Markdown(full_content), title=title, border_style=border, expand=False),
console=self.console,
@@ -463,7 +466,7 @@ class ai:
tail_limit = int(final_limit * 0.4)
return (text[:head_limit] + f"\n\n[... OUTPUT TRUNCATED ...]\n\n" + text[-tail_limit:])
- def _print_debug_observation(self, fn, obs):
+ def _print_debug_observation(self, fn, obs, status=None):
"""Prints a tool observation in a readable way during debug mode."""
# Try to parse as JSON if it's a string
if isinstance(obs, str):
@@ -487,6 +490,7 @@ class ai:
content = Text("Empty data set")
else:
# Add a small spacer instead of a Rule for cleaner look
+ from rich.console import Group
content = Group(*elements)
elif isinstance(obs_data, list):
content = Text("\n".join(f"• {item}" for item in obs_data))
@@ -494,7 +498,18 @@ class ai:
content = Text(str(obs_data))
title = f"[bold]{fn}[/bold]"
+
+ # Stop status before printing panel to avoid ghosting
+ if status:
+ try: status.stop()
+ except: pass
+
self.console.print(Panel(content, title=title, border_style="ai_status"))
+
+ # Resume status
+ if status:
+ try: status.start()
+ except: pass
def manage_memory_tool(self, content, action="append"):
"""Save or update long-term memory. Only use when user explicitly requests it."""
@@ -695,7 +710,7 @@ class ai:
elif fn in self.tool_status_formatters: status.update(self.tool_status_formatters[fn](args))
if debug:
- self._print_debug_observation(f"Decision: {fn}", args)
+ self._print_debug_observation(f"Decision: {fn}", args, status=status)
if fn == "list_nodes": obs = self.list_nodes_tool(**args)
elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
@@ -704,7 +719,7 @@ class ai:
else: obs = f"Error: Unknown tool '{fn}'."
if debug:
- self._print_debug_observation(f"Observation: {fn}", obs)
+ self._print_debug_observation(f"Observation: {fn}", obs, status=status)
# Ensure observation is a string and truncated for the LLM
obs_str = obs if isinstance(obs, str) else json.dumps(obs)
@@ -974,7 +989,7 @@ class ai:
streamed_response = False
try:
safe_messages = self._sanitize_messages(messages)
- if stream and (not debug or chunk_callback):
+ if stream and chunk_callback:
response, streamed_response = self._stream_completion(
model=model, messages=safe_messages, tools=tools, api_key=key,
status=status, label=label, debug=debug, num_retries=3,
@@ -1017,7 +1032,13 @@ class ai:
# In CLI debug mode, only print intermediate reasoning if there are tool calls.
# If there are no tool calls, this content is the final answer and will be printed by the caller.
if resp_msg.tool_calls:
+ if status:
+ try: status.stop()
+ except: pass
self.console.print(Panel(Markdown(resp_msg.content), title=f"[{current_brain}][bold]{label} Reasoning[/bold][/{current_brain}]", border_style="architect" if current_brain == "architect" else "engineer"))
+ if status:
+ try: status.start()
+ except: pass
if not resp_msg.tool_calls: break
@@ -1038,7 +1059,7 @@ class ai:
elif fn == "manage_memory_tool": status.update(f"[architect]Architect: [UPDATING MEMORY]")
if debug:
- self._print_debug_observation(f"Decision: {fn}", args)
+ self._print_debug_observation(f"Decision: {fn}", args, status=status)
if fn == "delegate_to_engineer":
obs, eng_usage = self._engineer_loop(args["task"], status=status, debug=debug, chat_history=messages[:-1])
@@ -1057,7 +1078,14 @@ class ai:
num_retries=3
)
obs = claude_resp.choices[0].message.content
- if debug: self.console.print(Panel(Markdown(obs), title="[architect]Architect Consultation[/architect]", border_style="architect"))
+ if debug:
+ if status:
+ try: status.stop()
+ except: pass
+ self.console.print(Panel(Markdown(obs), title="[architect]Architect Consultation[/architect]", border_style="architect"))
+ if status:
+ try: status.start()
+ except: pass
except Exception as e:
if status: status.update("[unavailable]Architect unavailable! Engineer continuing alone...")
obs = f"Architect unavailable ({str(e)}). Proceeding with your best technical judgment."
@@ -1074,7 +1102,14 @@ class ai:
handover_msg = f"HANDOVER FROM EXECUTION ENGINE\n\nReason: {args['reason']}\n\nContext: {args['context']}\n\nYou are now in control of this conversation."
pending_user_message = handover_msg
obs = "Control transferred to Architect. Handover context will be provided."
- if debug: self.console.print(Panel(Text(handover_msg), title="[architect]Escalation to Architect[/architect]", border_style="architect"))
+ if debug:
+ if status:
+ try: status.stop()
+ except: pass
+ self.console.print(Panel(Text(handover_msg), title="[architect]Escalation to Architect[/architect]", border_style="architect"))
+ if status:
+ try: status.start()
+ except: pass
elif fn == "return_to_engineer":
if status: status.update("[engineer]Transferring control back to Engineer...")
@@ -1088,7 +1123,14 @@ class ai:
handover_msg = f"HANDOVER FROM ARCHITECT\n\nSummary: {args['summary']}\n\nYou are now back in control. Continue handling the user's requests."
pending_user_message = handover_msg
obs = "Control returned to Engineer. Handover summary will be provided."
- if debug: self.console.print(Panel(Text(handover_msg), title="[engineer]Return to Engineer[/engineer]", border_style="engineer"))
+ if debug:
+ if status:
+ try: status.stop()
+ except: pass
+ self.console.print(Panel(Text(handover_msg), title="[engineer]Return to Engineer[/engineer]", border_style="engineer"))
+ if status:
+ try: status.start()
+ except: pass
elif fn == "list_nodes": obs = self.list_nodes_tool(**args)
elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
@@ -1098,7 +1140,7 @@ class ai:
else: obs = f"Error: {fn} unknown."
if debug and fn not in ["delegate_to_engineer", "consult_architect", "escalate_to_architect", "return_to_engineer"]:
- self._print_debug_observation(f"Observation: {fn}", obs)
+ self._print_debug_observation(f"Observation: {fn}", obs, status=status)
# Ensure observation is a string and truncated for the LLM
obs_str = obs if isinstance(obs, str) else json.dumps(obs)
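
Note: the stop/print/start dance above is repeated at every panel in ai.py. A
minimal sketch of the pattern as a reusable context manager (the helper name
paused_status is hypothetical; the patch inlines this logic instead):

    from contextlib import contextmanager

    @contextmanager
    def paused_status(status):
        """Stop a rich Status spinner while printing, then restart it."""
        if status:
            try:
                status.stop()
            except Exception:
                pass
        try:
            yield
        finally:
            if status:
                try:
                    status.start()
                except Exception:
                    pass

    # usage: with paused_status(status): console.print(panel)
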
diff --git a/connpy/cli/ai_handler.py b/connpy/cli/ai_handler.py
index fc57b72..7c2cf1e 100644
--- a/connpy/cli/ai_handler.py
+++ b/connpy/cli/ai_handler.py
@@ -88,7 +88,6 @@ class AIHandler:
if "usage" in result:
u = result["usage"]
console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
- console.print()
def interactive_chat(self, args, session_id):
history = None
@@ -132,7 +131,6 @@ class AIHandler:
if "usage" in result:
u = result["usage"]
console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
- console.print()
except (KeyboardInterrupt, EOFError):
console.print("\n[dim]Session closed.[/dim]")
break
diff --git a/connpy/completion.py b/connpy/completion.py
index e436705..016f37c 100755
--- a/connpy/completion.py
+++ b/connpy/completion.py
@@ -234,8 +234,8 @@ def _build_tree(nodes, folders, profiles, plugins, configdir):
"-a": None, "-r": None, "-s": None, "-e": None, "-h": None,
},
"plugin": {
- "--add": lambda w: get_cwd(w, "--add"),
- "--update": lambda w: get_cwd(w, "--update"),
+ "--add": {"*": lambda w: get_cwd(w, "--add")},
+ "--update": {"*": lambda w: get_cwd(w, "--update")},
"--del": lambda w: _get_plugins("--del", configdir),
"--enable": lambda w: _get_plugins("--enable", configdir),
"--disable": lambda w: _get_plugins("--disable", configdir),
diff --git a/connpy/grpc_layer/server.py b/connpy/grpc_layer/server.py
index 955793f..ca313f6 100644
--- a/connpy/grpc_layer/server.py
+++ b/connpy/grpc_layer/server.py
@@ -78,7 +78,9 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
unique_id = first_req.id
sftp = first_req.sftp
debug = first_req.debug
- printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node request for: [bold cyan]{unique_id}[/bold cyan]")
+
+ if debug:
+ printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node request for: [bold cyan]{unique_id}[/bold cyan]")
if first_req.connection_params_json:
import json
@@ -177,7 +179,8 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
while True:
data = response_queue.get()
if data is None:
- printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node session closed for: [bold cyan]{unique_id}[/bold cyan]")
+ if debug:
+ printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node session closed for: [bold cyan]{unique_id}[/bold cyan]")
break
yield connpy_pb2.InteractResponse(stdout_data=data)
@handle_errors
@@ -388,12 +391,7 @@ class ExecutionServicer(connpy_pb2_grpc.ExecutionServiceServicer):
def _worker():
try:
- # Set task name in thread state for printer if available
- if request.name:
- printer.console.print(f"[debug][DEBUG][/debug] Executing task: [bold cyan]{request.name}[/bold cyan]")
-
- self.service.run_commands(
- nodes_filter=nodes_filter,
+            self.service.run_commands(nodes_filter=nodes_filter,
commands=list(request.commands),
folder=request.folder if request.folder else None,
prompt=request.prompt if request.prompt else None,
@@ -439,10 +437,6 @@ class ExecutionServicer(connpy_pb2_grpc.ExecutionServiceServicer):
def _worker():
try:
- # Set task name in thread state for printer if available
- if request.name:
- printer.console.print(f"[debug][DEBUG][/debug] Executing task: [bold cyan]{request.name}[/bold cyan]")
-
self.service.test_commands(
nodes_filter=nodes_filter,
commands=list(request.commands),
diff --git a/connpy/grpc_layer/stubs.py b/connpy/grpc_layer/stubs.py
index 52219b8..a594edc 100644
--- a/connpy/grpc_layer/stubs.py
+++ b/connpy/grpc_layer/stubs.py
@@ -586,7 +586,6 @@ class AIStub:
if response.status_update:
if response.requires_confirmation:
if status: status.stop()
- if live_display: live_display.stop()
# Show prompt and wait for answer
prompt_text = Text.from_ansi(response.status_update)
@@ -595,7 +594,6 @@ class AIStub:
if status:
status.update("[ai_status]Agent: Resuming...")
status.start()
- if live_display: live_display.start()
req_queue.put(connpy_pb2.AskRequest(confirmation_answer=ans))
continue
@@ -606,41 +604,52 @@ class AIStub:
if response.debug_message:
if debug:
+ if status:
+ try: status.stop()
+ except: pass
printer.console.print(Text.from_ansi(response.debug_message))
+ if status:
+ try: status.start()
+ except: pass
continue
if response.important_message:
+ if status:
+ try: status.stop()
+ except: pass
printer.console.print(Text.from_ansi(response.important_message))
+ if status:
+ try: status.start()
+ except: pass
continue
if not response.is_final:
- full_content += response.text_chunk
-
- if not live_display and not debug:
- if status: status.stop()
- live_display = Live(
- Panel(Markdown(full_content), title="AI Assistant", expand=False),
- console=printer.console,
- refresh_per_second=8,
- transient=False
- )
- live_display.start()
- elif live_display:
- live_display.update(Panel(Markdown(full_content), title="AI Assistant", expand=False))
+ if response.text_chunk:
+ full_content += response.text_chunk
+
+ if status and not debug:
+ # Update the spinner line with a preview of the response
+ preview = full_content.replace("\n", " ").strip()
+ if len(preview) > 60: preview = preview[:57] + "..."
+ status.update(f"[ai_status]{preview}")
continue
if response.is_final:
+                    # Stop the status for good so it disappears before the final panel
+ if status:
+ try: status.stop()
+ except: pass
+
final_result = from_struct(response.full_result)
responder = final_result.get("responder", "engineer")
alias = "architect" if responder == "architect" else "engineer"
role_label = "Network Architect" if responder == "architect" else "Network Engineer"
title = f"[bold {alias}]{role_label}[/bold {alias}]"
- if live_display:
- live_display.update(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
- live_display.stop()
- elif full_content:
- printer.console.print(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
+ # Always print the final Panel
+ content_to_print = full_content or final_result.get("response", "")
+ if content_to_print:
+ printer.console.print(Panel(Markdown(content_to_print), title=title, border_style=alias, expand=False))
break
except Exception as e:
# Check if it was a gRPC error that we should let handle_errors catch
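
Note: instead of a Live panel, the stub now folds the streamed text into the
spinner line. The truncation logic above, as a standalone sketch:

    def spinner_preview(full_content, limit=60):
        # Collapse the accumulated response to one line and cap its length.
        preview = full_content.replace("\n", " ").strip()
        if len(preview) > limit:
            preview = preview[:limit - 3] + "..."
        return f"[ai_status]{preview}"

    print(spinner_preview("Checking interface status...\non router1"))
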
diff --git a/connpy/hooks.py b/connpy/hooks.py
index def8668..658c407 100755
--- a/connpy/hooks.py
+++ b/connpy/hooks.py
@@ -20,7 +20,8 @@ class MethodHook:
try:
args, kwargs = hook(*args, **kwargs)
except Exception as e:
- printer.error(f"{self.func.__name__} Pre-hook {hook.__name__} raised an exception: {e}")
+ hook_name = getattr(hook, "__name__", str(hook))
+ printer.error(f"{self.func.__name__} Pre-hook {hook_name} raised an exception: {e}")
result = self.func(*args, **kwargs)
@@ -32,7 +33,8 @@ class MethodHook:
try:
result = hook(*args, **kwargs, result=result) # Pass result to hooks
except Exception as e:
- printer.error(f"{self.func.__name__} Post-hook {hook.__name__} raised an exception: {e}")
+ hook_name = getattr(hook, "__name__", str(hook))
+ printer.error(f"{self.func.__name__} Post-hook {hook_name} raised an exception: {e}")
return result
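
Note: the getattr fallback above matters because common hook registrations
such as functools.partial objects have no __name__, so the old f-string raised
its own AttributeError inside the error handler. For example:

    from functools import partial

    def audit_hook(*args, level=1, **kwargs):  # hypothetical hook
        raise ValueError("boom")

    hook = partial(audit_hook, level=2)
    # hook.__name__ would raise:
    # AttributeError: 'functools.partial' object has no attribute '__name__'
    print(getattr(hook, "__name__", str(hook)))
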
diff --git a/connpy/services/import_export_service.py b/connpy/services/import_export_service.py
index d28d087..7d16eec 100644
--- a/connpy/services/import_export_service.py
+++ b/connpy/services/import_export_service.py
@@ -1,6 +1,7 @@
from .base import BaseService
import yaml
import os
+from copy import deepcopy
from .exceptions import InvalidConfigurationError, NodeNotFoundError, ReservedNameError
from ..configfile import NoAliasDumper
@@ -23,13 +24,45 @@ class ImportExportService(BaseService):
def export_to_dict(self, folders=None):
"""Export nodes/folders to a dictionary."""
if not folders:
- return self.config._getallnodesfull(extract=False)
+ return deepcopy(self.config.connections)
else:
# Validate folders exist
for f in folders:
if f != "@" and f not in self.config._getallfolders():
raise NodeNotFoundError(f"Folder '{f}' not found.")
- return self.config._getallnodesfull(folders, extract=False)
+
+ flat = self.config._getallnodesfull(folders, extract=False)
+ nested = {}
+ for k, v in flat.items():
+ uniques = self.config._explode_unique(k)
+ if not uniques:
+ continue
+
+ if "folder" in uniques and "subfolder" in uniques:
+ f_name = uniques["folder"]
+ s_name = uniques["subfolder"]
+ i_name = uniques["id"]
+
+ if f_name not in nested:
+ nested[f_name] = {"type": "folder"}
+ if s_name not in nested[f_name]:
+ nested[f_name][s_name] = {"type": "subfolder"}
+
+ nested[f_name][s_name][i_name] = v
+
+ elif "folder" in uniques:
+ f_name = uniques["folder"]
+ i_name = uniques["id"]
+
+ if f_name not in nested:
+ nested[f_name] = {"type": "folder"}
+
+ nested[f_name][i_name] = v
+ else:
+ i_name = uniques["id"]
+ nested[i_name] = v
+
+ return nested
def import_from_file(self, file_path):
"""Import nodes/folders from a YAML file."""
@@ -48,26 +81,35 @@ class ImportExportService(BaseService):
if not isinstance(data, dict):
raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.")
- # Process imports
- for k, v in data.items():
- uniques = self.config._explode_unique(k)
-
- # Ensure folders exist
- if "folder" in uniques:
- folder_name = f"@{uniques['folder']}"
- if folder_name not in self.config._getallfolders():
- folder_uniques = self.config._explode_unique(folder_name)
- self.config._folder_add(**folder_uniques)
-
- if "subfolder" in uniques:
- sub_name = f"@{uniques['subfolder']}@{uniques['folder']}"
- if sub_name not in self.config._getallfolders():
- sub_uniques = self.config._explode_unique(sub_name)
- self.config._folder_add(**sub_uniques)
-
- # Add node/connection
- v.update(uniques)
- self._validate_node_name(k)
- self.config._connections_add(**v)
-
+ def _traverse_import(node_data, current_folder='', current_subfolder=''):
+ for k, v in node_data.items():
+ if k == "type":
+ continue
+ if isinstance(v, dict):
+ node_type = v.get("type", "connection")
+ if node_type == "folder":
+ self.config._folder_add(folder=k)
+ _traverse_import(v, current_folder=k, current_subfolder='')
+ elif node_type == "subfolder":
+ self.config._folder_add(folder=current_folder, subfolder=k)
+ _traverse_import(v, current_folder=current_folder, current_subfolder=k)
+ elif node_type == "connection":
+ unique_id = k
+ if current_subfolder:
+ unique_id = f"{k}@{current_subfolder}@{current_folder}"
+ elif current_folder:
+ unique_id = f"{k}@{current_folder}"
+ self._validate_node_name(unique_id)
+
+ kwargs = deepcopy(v)
+ kwargs['id'] = k
+ kwargs['folder'] = current_folder
+ kwargs['subfolder'] = current_subfolder
+
+ self.config._connections_add(**kwargs)
+ else:
+                    # Invalid format, skip
+ pass
+
+ _traverse_import(data)
self.config._saveconfig(self.config.file)
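
Note: the exported/imported structure is now nested rather than keyed by flat
unique IDs. An illustrative shape, with made-up hosts and names, showing what
export_to_dict produces and _traverse_import consumes:

    nested = {
        "office": {
            "type": "folder",
            "core": {
                "type": "subfolder",
                "router1": {"type": "connection", "host": "10.0.0.1"},
            },
            "fw1": {"type": "connection", "host": "10.0.0.254"},
        },
        "standalone1": {"type": "connection", "host": "192.0.2.10"},
    }
    # _traverse_import rebuilds the unique IDs from the nesting:
    #   router1@core@office, fw1@office, standalone1
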
diff --git a/connpy/services/node_service.py b/connpy/services/node_service.py
index de27be6..e828a46 100644
--- a/connpy/services/node_service.py
+++ b/connpy/services/node_service.py
@@ -67,8 +67,14 @@ class NodeService(BaseService):
case_sensitive = self.config.config.get("case", False)
if filter_str:
- flags = re.IGNORECASE if not case_sensitive else 0
- folders = [f for f in folders if re.search(filter_str, f, flags)]
+ if filter_str.startswith("@"):
+ if not case_sensitive:
+ folders = [f for f in folders if f.lower() == filter_str.lower()]
+ else:
+ folders = [f for f in folders if f == filter_str]
+ else:
+ flags = re.IGNORECASE if not case_sensitive else 0
+ folders = [f for f in folders if re.search(filter_str, f, flags)]
return folders
def get_node_details(self, unique_id):
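
Note: with the change above, a filter beginning with "@" is an exact folder
match (case-insensitive unless configured otherwise) rather than a regex, so
"@lab" no longer also matches "@lab-backup". A standalone reproduction:

    import re

    def filter_folders(folders, filter_str, case_sensitive=False):
        if filter_str.startswith("@"):
            if not case_sensitive:
                return [f for f in folders if f.lower() == filter_str.lower()]
            return [f for f in folders if f == filter_str]
        flags = 0 if case_sensitive else re.IGNORECASE
        return [f for f in folders if re.search(filter_str, f, flags)]

    folders = ["@lab", "@lab-backup", "@prod"]
    print(filter_folders(folders, "@lab"))  # ['@lab']
    print(filter_folders(folders, "lab"))   # ['@lab', '@lab-backup']
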
diff --git a/docs/connpy/cli/ai_handler.html b/docs/connpy/cli/ai_handler.html
index 32cd613..1756f97 100644
--- a/docs/connpy/cli/ai_handler.html
+++ b/docs/connpy/cli/ai_handler.html
@@ -134,7 +134,6 @@ el.replaceWith(d);
if "usage" in result:
u = result["usage"]
console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
- console.print()
def interactive_chat(self, args, session_id):
history = None
@@ -178,7 +177,6 @@ el.replaceWith(d);
if "usage" in result:
u = result["usage"]
console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
- console.print()
except (KeyboardInterrupt, EOFError):
console.print("\n[dim]Session closed.[/dim]")
break
@@ -306,7 +304,6 @@ el.replaceWith(d);
if "usage" in result:
u = result["usage"]
console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
- console.print()
except (KeyboardInterrupt, EOFError):
console.print("\n[dim]Session closed.[/dim]")
break
@@ -335,8 +332,7 @@ el.replaceWith(d);
if "usage" in result:
u = result["usage"]
- console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
- console.print()
+ console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
diff --git a/docs/connpy/cli/help_text.html b/docs/connpy/cli/help_text.html
index e892b41..1440196 100644
--- a/docs/connpy/cli/help_text.html
+++ b/docs/connpy/cli/help_text.html
@@ -117,6 +117,10 @@ Here are some important instructions and tips for configuring your new node:
- `prompt`: Replaces default app prompt to identify the end of output or where the user can start inputting commands.
- `kube_command`: Replaces the default command (`/bin/bash`) for `kubectl exec`.
- `docker_command`: Replaces the default command for `docker exec`.
+ - `region`: AWS Region used for `aws ssm start-session`.
+ - `profile`: AWS Profile used for `aws ssm start-session`.
+ - `ssh_options`: Additional SSH options injected when an SSM node is used as a jumphost (e.g., `-i ~/.ssh/key.pem`).
+ - `nc_command`: Replaces the default `nc` command used when bridging connections through Docker or Kubernetes (e.g., `ip netns exec global-vrf nc`).
"""
if type == "bashcompletion":
return '''
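
Note: the four tags documented above are read from a node's tags mapping. A
hypothetical node configuration illustrating them (all values are
placeholders, not from the patch):

    tags = {
        "region": "us-east-1",                        # aws ssm start-session
        "profile": "prod",                            # aws ssm start-session
        "ssh_options": "-i ~/.ssh/key.pem",           # extra flags as jumphost
        "nc_command": "ip netns exec global-vrf nc",  # nc used for bridging
    }
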
diff --git a/docs/connpy/grpc_layer/server.html b/docs/connpy/grpc_layer/server.html
index 5729d79..05a9d91 100644
--- a/docs/connpy/grpc_layer/server.html
+++ b/docs/connpy/grpc_layer/server.html
@@ -367,12 +367,7 @@ el.replaceWith(d);
def _worker():
try:
- # Set task name in thread state for printer if available
- if request.name:
- printer.console.print(f"[debug][DEBUG][/debug] Executing task: [bold cyan]{request.name}[/bold cyan]")
-
- self.service.run_commands(
- nodes_filter=nodes_filter,
+            self.service.run_commands(nodes_filter=nodes_filter,
commands=list(request.commands),
folder=request.folder if request.folder else None,
prompt=request.prompt if request.prompt else None,
@@ -418,10 +413,6 @@ el.replaceWith(d);
def _worker():
try:
- # Set task name in thread state for printer if available
- if request.name:
- printer.console.print(f"[debug][DEBUG][/debug] Executing task: [bold cyan]{request.name}[/bold cyan]")
-
self.service.test_commands(
nodes_filter=nodes_filter,
commands=list(request.commands),
@@ -658,7 +649,9 @@ interceptor chooses to service this RPC, or None otherwise.
unique_id = first_req.id
sftp = first_req.sftp
debug = first_req.debug
- printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node request for: [bold cyan]{unique_id}[/bold cyan]")
+
+ if debug:
+ printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node request for: [bold cyan]{unique_id}[/bold cyan]")
if first_req.connection_params_json:
import json
@@ -757,7 +750,8 @@ interceptor chooses to service this RPC, or None otherwise.
while True:
data = response_queue.get()
if data is None:
- printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node session closed for: [bold cyan]{unique_id}[/bold cyan]")
+ if debug:
+ printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node session closed for: [bold cyan]{unique_id}[/bold cyan]")
break
yield connpy_pb2.InteractResponse(stdout_data=data)
@handle_errors
diff --git a/docs/connpy/grpc_layer/stubs.html b/docs/connpy/grpc_layer/stubs.html
index 2af011c..836414e 100644
--- a/docs/connpy/grpc_layer/stubs.html
+++ b/docs/connpy/grpc_layer/stubs.html
@@ -182,7 +182,6 @@ el.replaceWith(d);
if response.status_update:
if response.requires_confirmation:
if status: status.stop()
- if live_display: live_display.stop()
# Show prompt and wait for answer
prompt_text = Text.from_ansi(response.status_update)
@@ -191,7 +190,6 @@ el.replaceWith(d);
if status:
status.update("[ai_status]Agent: Resuming...")
status.start()
- if live_display: live_display.start()
req_queue.put(connpy_pb2.AskRequest(confirmation_answer=ans))
continue
@@ -202,41 +200,52 @@ el.replaceWith(d);
if response.debug_message:
if debug:
+ if status:
+ try: status.stop()
+ except: pass
printer.console.print(Text.from_ansi(response.debug_message))
+ if status:
+ try: status.start()
+ except: pass
continue
if response.important_message:
+ if status:
+ try: status.stop()
+ except: pass
printer.console.print(Text.from_ansi(response.important_message))
+ if status:
+ try: status.start()
+ except: pass
continue
if not response.is_final:
- full_content += response.text_chunk
-
- if not live_display and not debug:
- if status: status.stop()
- live_display = Live(
- Panel(Markdown(full_content), title="AI Assistant", expand=False),
- console=printer.console,
- refresh_per_second=8,
- transient=False
- )
- live_display.start()
- elif live_display:
- live_display.update(Panel(Markdown(full_content), title="AI Assistant", expand=False))
+ if response.text_chunk:
+ full_content += response.text_chunk
+
+ if status and not debug:
+ # Update the spinner line with a preview of the response
+ preview = full_content.replace("\n", " ").strip()
+ if len(preview) > 60: preview = preview[:57] + "..."
+ status.update(f"[ai_status]{preview}")
continue
if response.is_final:
+                # Stop the status for good so it disappears before the final panel
+ if status:
+ try: status.stop()
+ except: pass
+
final_result = from_struct(response.full_result)
responder = final_result.get("responder", "engineer")
alias = "architect" if responder == "architect" else "engineer"
role_label = "Network Architect" if responder == "architect" else "Network Engineer"
title = f"[bold {alias}]{role_label}[/bold {alias}]"
- if live_display:
- live_display.update(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
- live_display.stop()
- elif full_content:
- printer.console.print(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
+ # Always print the final Panel
+ content_to_print = full_content or final_result.get("response", "")
+ if content_to_print:
+ printer.console.print(Panel(Markdown(content_to_print), title=title, border_style=alias, expand=False))
break
except Exception as e:
# Check if it was a gRPC error that we should let handle_errors catch
@@ -366,7 +375,6 @@ def ask(self, input_text, dryrun=False, chat_history=None, session_id=None, debu
if response.status_update:
if response.requires_confirmation:
if status: status.stop()
- if live_display: live_display.stop()
# Show prompt and wait for answer
prompt_text = Text.from_ansi(response.status_update)
@@ -375,7 +383,6 @@ def ask(self, input_text, dryrun=False, chat_history=None, session_id=None, debu
if status:
status.update("[ai_status]Agent: Resuming...")
status.start()
- if live_display: live_display.start()
req_queue.put(connpy_pb2.AskRequest(confirmation_answer=ans))
continue
@@ -386,41 +393,52 @@ def ask(self, input_text, dryrun=False, chat_history=None, session_id=None, debu
if response.debug_message:
if debug:
+ if status:
+ try: status.stop()
+ except: pass
printer.console.print(Text.from_ansi(response.debug_message))
+ if status:
+ try: status.start()
+ except: pass
continue
if response.important_message:
+ if status:
+ try: status.stop()
+ except: pass
printer.console.print(Text.from_ansi(response.important_message))
+ if status:
+ try: status.start()
+ except: pass
continue
if not response.is_final:
- full_content += response.text_chunk
-
- if not live_display and not debug:
- if status: status.stop()
- live_display = Live(
- Panel(Markdown(full_content), title="AI Assistant", expand=False),
- console=printer.console,
- refresh_per_second=8,
- transient=False
- )
- live_display.start()
- elif live_display:
- live_display.update(Panel(Markdown(full_content), title="AI Assistant", expand=False))
+ if response.text_chunk:
+ full_content += response.text_chunk
+
+ if status and not debug:
+ # Update the spinner line with a preview of the response
+ preview = full_content.replace("\n", " ").strip()
+ if len(preview) > 60: preview = preview[:57] + "..."
+ status.update(f"[ai_status]{preview}")
continue
if response.is_final:
+                    # Stop the status for good so it disappears before the final panel
+ if status:
+ try: status.stop()
+ except: pass
+
final_result = from_struct(response.full_result)
responder = final_result.get("responder", "engineer")
alias = "architect" if responder == "architect" else "engineer"
role_label = "Network Architect" if responder == "architect" else "Network Engineer"
title = f"[bold {alias}]{role_label}[/bold {alias}]"
- if live_display:
- live_display.update(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
- live_display.stop()
- elif full_content:
- printer.console.print(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
+ # Always print the final Panel
+ content_to_print = full_content or final_result.get("response", "")
+ if content_to_print:
+ printer.console.print(Panel(Markdown(content_to_print), title=title, border_style=alias, expand=False))
break
except Exception as e:
# Check if it was a gRPC error that we should let handle_errors catch
diff --git a/docs/connpy/index.html b/docs/connpy/index.html
index 41827af..ae26ce2 100644
--- a/docs/connpy/index.html
+++ b/docs/connpy/index.html
@@ -1182,9 +1182,12 @@ class ai:
if not debug and not chunk_callback:
if not is_streaming_text:
- # Stop spinner before starting live display
+ # Stop spinner definitively
if status:
- status.stop()
+ try:
+ status.stop()
+ except Exception:
+ pass
live_display = Live(
Panel(Markdown(full_content), title=title, border_style=border, expand=False),
console=self.console,
@@ -1372,7 +1375,7 @@ class ai:
tail_limit = int(final_limit * 0.4)
return (text[:head_limit] + f"\n\n[... OUTPUT TRUNCATED ...]\n\n" + text[-tail_limit:])
- def _print_debug_observation(self, fn, obs):
+ def _print_debug_observation(self, fn, obs, status=None):
"""Prints a tool observation in a readable way during debug mode."""
# Try to parse as JSON if it's a string
if isinstance(obs, str):
@@ -1396,6 +1399,7 @@ class ai:
content = Text("Empty data set")
else:
# Add a small spacer instead of a Rule for cleaner look
+ from rich.console import Group
content = Group(*elements)
elif isinstance(obs_data, list):
content = Text("\n".join(f"• {item}" for item in obs_data))
@@ -1403,7 +1407,18 @@ class ai:
content = Text(str(obs_data))
title = f"[bold]{fn}[/bold]"
+
+ # Stop status before printing panel to avoid ghosting
+ if status:
+ try: status.stop()
+ except: pass
+
self.console.print(Panel(content, title=title, border_style="ai_status"))
+
+ # Resume status
+ if status:
+ try: status.start()
+ except: pass
def manage_memory_tool(self, content, action="append"):
"""Save or update long-term memory. Only use when user explicitly requests it."""
@@ -1604,7 +1619,7 @@ class ai:
elif fn in self.tool_status_formatters: status.update(self.tool_status_formatters[fn](args))
if debug:
- self._print_debug_observation(f"Decision: {fn}", args)
+ self._print_debug_observation(f"Decision: {fn}", args, status=status)
if fn == "list_nodes": obs = self.list_nodes_tool(**args)
elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
@@ -1613,7 +1628,7 @@ class ai:
else: obs = f"Error: Unknown tool '{fn}'."
if debug:
- self._print_debug_observation(f"Observation: {fn}", obs)
+ self._print_debug_observation(f"Observation: {fn}", obs, status=status)
# Ensure observation is a string and truncated for the LLM
obs_str = obs if isinstance(obs, str) else json.dumps(obs)
@@ -1883,7 +1898,7 @@ class ai:
streamed_response = False
try:
safe_messages = self._sanitize_messages(messages)
- if stream and (not debug or chunk_callback):
+ if stream and chunk_callback:
response, streamed_response = self._stream_completion(
model=model, messages=safe_messages, tools=tools, api_key=key,
status=status, label=label, debug=debug, num_retries=3,
@@ -1926,7 +1941,13 @@ class ai:
# In CLI debug mode, only print intermediate reasoning if there are tool calls.
# If there are no tool calls, this content is the final answer and will be printed by the caller.
if resp_msg.tool_calls:
+ if status:
+ try: status.stop()
+ except: pass
self.console.print(Panel(Markdown(resp_msg.content), title=f"[{current_brain}][bold]{label} Reasoning[/bold][/{current_brain}]", border_style="architect" if current_brain == "architect" else "engineer"))
+ if status:
+ try: status.start()
+ except: pass
if not resp_msg.tool_calls: break
@@ -1947,7 +1968,7 @@ class ai:
elif fn == "manage_memory_tool": status.update(f"[architect]Architect: [UPDATING MEMORY]")
if debug:
- self._print_debug_observation(f"Decision: {fn}", args)
+ self._print_debug_observation(f"Decision: {fn}", args, status=status)
if fn == "delegate_to_engineer":
obs, eng_usage = self._engineer_loop(args["task"], status=status, debug=debug, chat_history=messages[:-1])
@@ -1966,7 +1987,14 @@ class ai:
num_retries=3
)
obs = claude_resp.choices[0].message.content
- if debug: self.console.print(Panel(Markdown(obs), title="[architect]Architect Consultation[/architect]", border_style="architect"))
+ if debug:
+ if status:
+ try: status.stop()
+ except: pass
+ self.console.print(Panel(Markdown(obs), title="[architect]Architect Consultation[/architect]", border_style="architect"))
+ if status:
+ try: status.start()
+ except: pass
except Exception as e:
if status: status.update("[unavailable]Architect unavailable! Engineer continuing alone...")
obs = f"Architect unavailable ({str(e)}). Proceeding with your best technical judgment."
@@ -1983,7 +2011,14 @@ class ai:
handover_msg = f"HANDOVER FROM EXECUTION ENGINE\n\nReason: {args['reason']}\n\nContext: {args['context']}\n\nYou are now in control of this conversation."
pending_user_message = handover_msg
obs = "Control transferred to Architect. Handover context will be provided."
- if debug: self.console.print(Panel(Text(handover_msg), title="[architect]Escalation to Architect[/architect]", border_style="architect"))
+ if debug:
+ if status:
+ try: status.stop()
+ except: pass
+ self.console.print(Panel(Text(handover_msg), title="[architect]Escalation to Architect[/architect]", border_style="architect"))
+ if status:
+ try: status.start()
+ except: pass
elif fn == "return_to_engineer":
if status: status.update("[engineer]Transferring control back to Engineer...")
@@ -1997,7 +2032,14 @@ class ai:
handover_msg = f"HANDOVER FROM ARCHITECT\n\nSummary: {args['summary']}\n\nYou are now back in control. Continue handling the user's requests."
pending_user_message = handover_msg
obs = "Control returned to Engineer. Handover summary will be provided."
- if debug: self.console.print(Panel(Text(handover_msg), title="[engineer]Return to Engineer[/engineer]", border_style="engineer"))
+ if debug:
+ if status:
+ try: status.stop()
+ except: pass
+ self.console.print(Panel(Text(handover_msg), title="[engineer]Return to Engineer[/engineer]", border_style="engineer"))
+ if status:
+ try: status.start()
+ except: pass
elif fn == "list_nodes": obs = self.list_nodes_tool(**args)
elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
@@ -2007,7 +2049,7 @@ class ai:
else: obs = f"Error: {fn} unknown."
if debug and fn not in ["delegate_to_engineer", "consult_architect", "escalate_to_architect", "return_to_engineer"]:
- self._print_debug_observation(f"Observation: {fn}", obs)
+ self._print_debug_observation(f"Observation: {fn}", obs, status=status)
# Ensure observation is a string and truncated for the LLM
obs_str = obs if isinstance(obs, str) else json.dumps(obs)
@@ -2229,7 +2271,7 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
streamed_response = False
try:
safe_messages = self._sanitize_messages(messages)
- if stream and (not debug or chunk_callback):
+ if stream and chunk_callback:
response, streamed_response = self._stream_completion(
model=model, messages=safe_messages, tools=tools, api_key=key,
status=status, label=label, debug=debug, num_retries=3,
@@ -2272,7 +2314,13 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
# In CLI debug mode, only print intermediate reasoning if there are tool calls.
# If there are no tool calls, this content is the final answer and will be printed by the caller.
if resp_msg.tool_calls:
+ if status:
+ try: status.stop()
+ except: pass
self.console.print(Panel(Markdown(resp_msg.content), title=f"[{current_brain}][bold]{label} Reasoning[/bold][/{current_brain}]", border_style="architect" if current_brain == "architect" else "engineer"))
+ if status:
+ try: status.start()
+ except: pass
if not resp_msg.tool_calls: break
@@ -2293,7 +2341,7 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
elif fn == "manage_memory_tool": status.update(f"[architect]Architect: [UPDATING MEMORY]")
if debug:
- self._print_debug_observation(f"Decision: {fn}", args)
+ self._print_debug_observation(f"Decision: {fn}", args, status=status)
if fn == "delegate_to_engineer":
obs, eng_usage = self._engineer_loop(args["task"], status=status, debug=debug, chat_history=messages[:-1])
@@ -2312,7 +2360,14 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
num_retries=3
)
obs = claude_resp.choices[0].message.content
- if debug: self.console.print(Panel(Markdown(obs), title="[architect]Architect Consultation[/architect]", border_style="architect"))
+ if debug:
+ if status:
+ try: status.stop()
+ except: pass
+ self.console.print(Panel(Markdown(obs), title="[architect]Architect Consultation[/architect]", border_style="architect"))
+ if status:
+ try: status.start()
+ except: pass
except Exception as e:
if status: status.update("[unavailable]Architect unavailable! Engineer continuing alone...")
obs = f"Architect unavailable ({str(e)}). Proceeding with your best technical judgment."
@@ -2329,7 +2384,14 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
handover_msg = f"HANDOVER FROM EXECUTION ENGINE\n\nReason: {args['reason']}\n\nContext: {args['context']}\n\nYou are now in control of this conversation."
pending_user_message = handover_msg
obs = "Control transferred to Architect. Handover context will be provided."
- if debug: self.console.print(Panel(Text(handover_msg), title="[architect]Escalation to Architect[/architect]", border_style="architect"))
+ if debug:
+ if status:
+ try: status.stop()
+ except: pass
+ self.console.print(Panel(Text(handover_msg), title="[architect]Escalation to Architect[/architect]", border_style="architect"))
+ if status:
+ try: status.start()
+ except: pass
elif fn == "return_to_engineer":
if status: status.update("[engineer]Transferring control back to Engineer...")
@@ -2343,7 +2405,14 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
handover_msg = f"HANDOVER FROM ARCHITECT\n\nSummary: {args['summary']}\n\nYou are now back in control. Continue handling the user's requests."
pending_user_message = handover_msg
obs = "Control returned to Engineer. Handover summary will be provided."
- if debug: self.console.print(Panel(Text(handover_msg), title="[engineer]Return to Engineer[/engineer]", border_style="engineer"))
+ if debug:
+ if status:
+ try: status.stop()
+ except: pass
+ self.console.print(Panel(Text(handover_msg), title="[engineer]Return to Engineer[/engineer]", border_style="engineer"))
+ if status:
+ try: status.start()
+ except: pass
elif fn == "list_nodes": obs = self.list_nodes_tool(**args)
elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
@@ -2353,7 +2422,7 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
else: obs = f"Error: {fn} unknown."
if debug and fn not in ["delegate_to_engineer", "consult_architect", "escalate_to_architect", "return_to_engineer"]:
- self._print_debug_observation(f"Observation: {fn}", obs)
+ self._print_debug_observation(f"Observation: {fn}", obs, status=status)
# Ensure observation is a string and truncated for the LLM
obs_str = obs if isinstance(obs, str) else json.dumps(obs)
@@ -3748,6 +3817,42 @@ class node:
else:
jumphost_cmd = jumphost_cmd + " {}".format("@".join([self.jumphost["user"],self.jumphost["host"]]))
self.jumphost = f"-o ProxyCommand=\"{jumphost_cmd}\""
+ elif self.jumphost["protocol"] == "ssm":
+ ssm_target = self.jumphost["host"]
+ ssm_cmd = f"aws ssm start-session --target {ssm_target} --document-name AWS-StartSSHSession --parameters 'portNumber=22'"
+ if isinstance(self.jumphost.get("tags"), dict):
+ if "profile" in self.jumphost["tags"]:
+ ssm_cmd += f" --profile {self.jumphost['tags']['profile']}"
+ if "region" in self.jumphost["tags"]:
+ ssm_cmd += f" --region {self.jumphost['tags']['region']}"
+ if self.jumphost["options"] != '':
+ ssm_cmd += f" {self.jumphost['options']}"
+
+ bastion_user_part = f"{self.jumphost['user']}@{ssm_target}" if self.jumphost['user'] else ssm_target
+
+ ssh_opts = ""
+ if isinstance(self.jumphost.get("tags"), dict) and "ssh_options" in self.jumphost["tags"]:
+ ssh_opts = f" {self.jumphost['tags']['ssh_options']}"
+
+ inner_ssh = f"ssh{ssh_opts} -o ProxyCommand='{ssm_cmd}' -W %h:%p {bastion_user_part}"
+ self.jumphost = f"-o ProxyCommand=\"{inner_ssh}\""
+ elif self.jumphost["protocol"] in ["kubectl", "docker"]:
+ nc_cmd = "nc"
+ if isinstance(self.jumphost.get("tags"), dict) and "nc_command" in self.jumphost["tags"]:
+ nc_cmd = self.jumphost["tags"]["nc_command"]
+
+ if self.jumphost["protocol"] == "kubectl":
+ proxy_cmd = f"kubectl exec "
+ if self.jumphost["options"] != '':
+ proxy_cmd += f"{self.jumphost['options']} "
+ proxy_cmd += f"{self.jumphost['host']} -i -- {nc_cmd} %h %p"
+ else:
+ proxy_cmd = f"docker "
+ if self.jumphost["options"] != '':
+ proxy_cmd += f"{self.jumphost['options']} "
+ proxy_cmd += f"exec -i {self.jumphost['host']} {nc_cmd} %h %p"
+
+ self.jumphost = f"-o ProxyCommand=\"{proxy_cmd}\""
else:
self.jumphost = ""
@@ -4003,8 +4108,6 @@ class node:
self.mylog.write(data)
async def keepalive_task():
- if self.idletime <= 0:
- return
while True:
await asyncio.sleep(1)
if time() - self.lastinput >= self.idletime:
@@ -4015,8 +4118,6 @@ class node:
pass
async def savelog_task():
- if not hasattr(self, 'logfile') or not hasattr(self, 'mylog'):
- return
prev_size = 0
while True:
await asyncio.sleep(5)
@@ -4035,10 +4136,12 @@ class node:
# We want to exit if either happens, so return_exceptions=False, but we need to cancel the others.
tasks = [
asyncio.create_task(ingress_task()),
- asyncio.create_task(egress_task()),
- asyncio.create_task(keepalive_task()),
- asyncio.create_task(savelog_task())
+ asyncio.create_task(egress_task())
]
+ if self.idletime > 0:
+ tasks.append(asyncio.create_task(keepalive_task()))
+ if hasattr(self, 'logfile') and hasattr(self, 'mylog'):
+ tasks.append(asyncio.create_task(savelog_task()))
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for p in pending:
p.cancel()
@@ -4404,7 +4507,7 @@ class node:
"telnet": ['[u|U]sername:', 'refused', 'supported', 'invalid|unrecognized option', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"],
"kubectl": ['[u|U]sername:', '[r|R]efused', '[E|e]rror', 'DEPRECATED', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF, "expired|invalid"],
"docker": ['[u|U]sername:', 'Cannot', '[E|e]rror', 'failed', 'not a docker command', 'unknown', 'unable to resolve', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF],
- "ssm": ['[u|U]sername:', 'Cannot', '[E|e]rror', 'failed', 'SessionManagerPlugin', 'unknown', 'unable to resolve', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF]
+ "ssm": ['[u|U]sername:', 'Cannot', '[E|e]rror', 'failed', 'SessionManagerPlugin', '[u|U]nknown', 'unable to resolve', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF]
}
error_indices = {
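
Note: for the new SSM jumphost branch above, the node ends up nesting two
ProxyCommands: an outer SSH hop to the bastion, which itself tunnels SSH over
an SSM session. A worked example with a made-up instance ID and tag values:

    ssm_cmd = ("aws ssm start-session --target i-0abc123 "
               "--document-name AWS-StartSSHSession --parameters 'portNumber=22' "
               "--profile prod --region us-east-1")
    inner_ssh = (f"ssh -i ~/.ssh/key.pem -o ProxyCommand='{ssm_cmd}' "
                 f"-W %h:%p ec2-user@i-0abc123")
    jumphost_opt = f'-o ProxyCommand="{inner_ssh}"'
    print(jumphost_opt)
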
diff --git a/docs/connpy/services/import_export_service.html b/docs/connpy/services/import_export_service.html
index 9e3fa76..58e0736 100644
--- a/docs/connpy/services/import_export_service.html
+++ b/docs/connpy/services/import_export_service.html
@@ -73,13 +73,45 @@ el.replaceWith(d);
def export_to_dict(self, folders=None):
"""Export nodes/folders to a dictionary."""
if not folders:
- return self.config._getallnodesfull(extract=False)
+ return deepcopy(self.config.connections)
else:
# Validate folders exist
for f in folders:
if f != "@" and f not in self.config._getallfolders():
raise NodeNotFoundError(f"Folder '{f}' not found.")
- return self.config._getallnodesfull(folders, extract=False)
+
+ flat = self.config._getallnodesfull(folders, extract=False)
+ nested = {}
+ for k, v in flat.items():
+ uniques = self.config._explode_unique(k)
+ if not uniques:
+ continue
+
+ if "folder" in uniques and "subfolder" in uniques:
+ f_name = uniques["folder"]
+ s_name = uniques["subfolder"]
+ i_name = uniques["id"]
+
+ if f_name not in nested:
+ nested[f_name] = {"type": "folder"}
+ if s_name not in nested[f_name]:
+ nested[f_name][s_name] = {"type": "subfolder"}
+
+ nested[f_name][s_name][i_name] = v
+
+ elif "folder" in uniques:
+ f_name = uniques["folder"]
+ i_name = uniques["id"]
+
+ if f_name not in nested:
+ nested[f_name] = {"type": "folder"}
+
+ nested[f_name][i_name] = v
+ else:
+ i_name = uniques["id"]
+ nested[i_name] = v
+
+ return nested
def import_from_file(self, file_path):
"""Import nodes/folders from a YAML file."""
@@ -98,28 +130,37 @@ el.replaceWith(d);
if not isinstance(data, dict):
raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.")
- # Process imports
- for k, v in data.items():
- uniques = self.config._explode_unique(k)
-
- # Ensure folders exist
- if "folder" in uniques:
- folder_name = f"@{uniques['folder']}"
- if folder_name not in self.config._getallfolders():
- folder_uniques = self.config._explode_unique(folder_name)
- self.config._folder_add(**folder_uniques)
-
- if "subfolder" in uniques:
- sub_name = f"@{uniques['subfolder']}@{uniques['folder']}"
- if sub_name not in self.config._getallfolders():
- sub_uniques = self.config._explode_unique(sub_name)
- self.config._folder_add(**sub_uniques)
-
- # Add node/connection
- v.update(uniques)
- self._validate_node_name(k)
- self.config._connections_add(**v)
-
+ def _traverse_import(node_data, current_folder='', current_subfolder=''):
+ for k, v in node_data.items():
+ if k == "type":
+ continue
+ if isinstance(v, dict):
+ node_type = v.get("type", "connection")
+ if node_type == "folder":
+ self.config._folder_add(folder=k)
+ _traverse_import(v, current_folder=k, current_subfolder='')
+ elif node_type == "subfolder":
+ self.config._folder_add(folder=current_folder, subfolder=k)
+ _traverse_import(v, current_folder=current_folder, current_subfolder=k)
+ elif node_type == "connection":
+ unique_id = k
+ if current_subfolder:
+ unique_id = f"{k}@{current_subfolder}@{current_folder}"
+ elif current_folder:
+ unique_id = f"{k}@{current_folder}"
+ self._validate_node_name(unique_id)
+
+ kwargs = deepcopy(v)
+ kwargs['id'] = k
+ kwargs['folder'] = current_folder
+ kwargs['subfolder'] = current_subfolder
+
+ self.config._connections_add(**kwargs)
+ else:
+                    # Invalid format, skip
+ pass
+
+ _traverse_import(data)
self.config._saveconfig(self.config.file)
Business logic for YAML/JSON inventory import and export.
@@ -146,13 +187,45 @@ el.replaceWith(d);
def export_to_dict(self, folders=None):
"""Export nodes/folders to a dictionary."""
if not folders:
- return self.config._getallnodesfull(extract=False)
+ return deepcopy(self.config.connections)
else:
# Validate folders exist
for f in folders:
if f != "@" and f not in self.config._getallfolders():
raise NodeNotFoundError(f"Folder '{f}' not found.")
- return self.config._getallnodesfull(folders, extract=False)
+
+ flat = self.config._getallnodesfull(folders, extract=False)
+ nested = {}
+ for k, v in flat.items():
+ uniques = self.config._explode_unique(k)
+ if not uniques:
+ continue
+
+ if "folder" in uniques and "subfolder" in uniques:
+ f_name = uniques["folder"]
+ s_name = uniques["subfolder"]
+ i_name = uniques["id"]
+
+ if f_name not in nested:
+ nested[f_name] = {"type": "folder"}
+ if s_name not in nested[f_name]:
+ nested[f_name][s_name] = {"type": "subfolder"}
+
+ nested[f_name][s_name][i_name] = v
+
+ elif "folder" in uniques:
+ f_name = uniques["folder"]
+ i_name = uniques["id"]
+
+ if f_name not in nested:
+ nested[f_name] = {"type": "folder"}
+
+ nested[f_name][i_name] = v
+ else:
+ i_name = uniques["id"]
+ nested[i_name] = v
+
+ return nested
Export nodes/folders to a dictionary.
@@ -191,28 +264,37 @@ el.replaceWith(d);
if not isinstance(data, dict):
raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.")
- # Process imports
- for k, v in data.items():
- uniques = self.config._explode_unique(k)
-
- # Ensure folders exist
- if "folder" in uniques:
- folder_name = f"@{uniques['folder']}"
- if folder_name not in self.config._getallfolders():
- folder_uniques = self.config._explode_unique(folder_name)
- self.config._folder_add(**folder_uniques)
-
- if "subfolder" in uniques:
- sub_name = f"@{uniques['subfolder']}@{uniques['folder']}"
- if sub_name not in self.config._getallfolders():
- sub_uniques = self.config._explode_unique(sub_name)
- self.config._folder_add(**sub_uniques)
-
- # Add node/connection
- v.update(uniques)
- self._validate_node_name(k)
- self.config._connections_add(**v)
-
+ def _traverse_import(node_data, current_folder='', current_subfolder=''):
+ for k, v in node_data.items():
+ if k == "type":
+ continue
+ if isinstance(v, dict):
+ node_type = v.get("type", "connection")
+ if node_type == "folder":
+ self.config._folder_add(folder=k)
+ _traverse_import(v, current_folder=k, current_subfolder='')
+ elif node_type == "subfolder":
+ self.config._folder_add(folder=current_folder, subfolder=k)
+ _traverse_import(v, current_folder=current_folder, current_subfolder=k)
+ elif node_type == "connection":
+ unique_id = k
+ if current_subfolder:
+ unique_id = f"{k}@{current_subfolder}@{current_folder}"
+ elif current_folder:
+ unique_id = f"{k}@{current_folder}"
+ self._validate_node_name(unique_id)
+
+ kwargs = deepcopy(v)
+ kwargs['id'] = k
+ kwargs['folder'] = current_folder
+ kwargs['subfolder'] = current_subfolder
+
+ self.config._connections_add(**kwargs)
+ else:
+                    # Invalid format, skip
+ pass
+
+ _traverse_import(data)
self.config._saveconfig(self.config.file)
Import nodes/folders from a dictionary.
diff --git a/docs/connpy/services/index.html b/docs/connpy/services/index.html
index 0cf3b3b..e77a80a 100644
--- a/docs/connpy/services/index.html
+++ b/docs/connpy/services/index.html
@@ -972,13 +972,45 @@ el.replaceWith(d);
def export_to_dict(self, folders=None):
"""Export nodes/folders to a dictionary."""
if not folders:
- return self.config._getallnodesfull(extract=False)
+ return deepcopy(self.config.connections)
else:
# Validate folders exist
for f in folders:
if f != "@" and f not in self.config._getallfolders():
raise NodeNotFoundError(f"Folder '{f}' not found.")
- return self.config._getallnodesfull(folders, extract=False)
+
+ flat = self.config._getallnodesfull(folders, extract=False)
+ nested = {}
+ for k, v in flat.items():
+ uniques = self.config._explode_unique(k)
+ if not uniques:
+ continue
+
+ if "folder" in uniques and "subfolder" in uniques:
+ f_name = uniques["folder"]
+ s_name = uniques["subfolder"]
+ i_name = uniques["id"]
+
+ if f_name not in nested:
+ nested[f_name] = {"type": "folder"}
+ if s_name not in nested[f_name]:
+ nested[f_name][s_name] = {"type": "subfolder"}
+
+ nested[f_name][s_name][i_name] = v
+
+ elif "folder" in uniques:
+ f_name = uniques["folder"]
+ i_name = uniques["id"]
+
+ if f_name not in nested:
+ nested[f_name] = {"type": "folder"}
+
+ nested[f_name][i_name] = v
+ else:
+ i_name = uniques["id"]
+ nested[i_name] = v
+
+ return nested
def import_from_file(self, file_path):
"""Import nodes/folders from a YAML file."""
@@ -997,28 +1029,37 @@ el.replaceWith(d);
if not isinstance(data, dict):
raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.")
- # Process imports
- for k, v in data.items():
- uniques = self.config._explode_unique(k)
-
- # Ensure folders exist
- if "folder" in uniques:
- folder_name = f"@{uniques['folder']}"
- if folder_name not in self.config._getallfolders():
- folder_uniques = self.config._explode_unique(folder_name)
- self.config._folder_add(**folder_uniques)
-
- if "subfolder" in uniques:
- sub_name = f"@{uniques['subfolder']}@{uniques['folder']}"
- if sub_name not in self.config._getallfolders():
- sub_uniques = self.config._explode_unique(sub_name)
- self.config._folder_add(**sub_uniques)
-
- # Add node/connection
- v.update(uniques)
- self._validate_node_name(k)
- self.config._connections_add(**v)
-
+ def _traverse_import(node_data, current_folder='', current_subfolder=''):
+ for k, v in node_data.items():
+ if k == "type":
+ continue
+ if isinstance(v, dict):
+ node_type = v.get("type", "connection")
+ if node_type == "folder":
+ self.config._folder_add(folder=k)
+ _traverse_import(v, current_folder=k, current_subfolder='')
+ elif node_type == "subfolder":
+ self.config._folder_add(folder=current_folder, subfolder=k)
+ _traverse_import(v, current_folder=current_folder, current_subfolder=k)
+ elif node_type == "connection":
+ unique_id = k
+ if current_subfolder:
+ unique_id = f"{k}@{current_subfolder}@{current_folder}"
+ elif current_folder:
+ unique_id = f"{k}@{current_folder}"
+ self._validate_node_name(unique_id)
+
+ kwargs = deepcopy(v)
+ kwargs['id'] = k
+ kwargs['folder'] = current_folder
+ kwargs['subfolder'] = current_subfolder
+
+ self.config._connections_add(**kwargs)
+ else:
+                    # Invalid format, skip
+ pass
+
+ _traverse_import(data)
self.config._saveconfig(self.config.file)
Business logic for YAML/JSON inventory import and export.
@@ -1045,13 +1086,45 @@ el.replaceWith(d);
def export_to_dict(self, folders=None):
"""Export nodes/folders to a dictionary."""
if not folders:
- return self.config._getallnodesfull(extract=False)
+ return deepcopy(self.config.connections)
else:
# Validate folders exist
for f in folders:
if f != "@" and f not in self.config._getallfolders():
raise NodeNotFoundError(f"Folder '{f}' not found.")
- return self.config._getallnodesfull(folders, extract=False)
+
+ flat = self.config._getallnodesfull(folders, extract=False)
+ nested = {}
+ for k, v in flat.items():
+ uniques = self.config._explode_unique(k)
+ if not uniques:
+ continue
+
+ if "folder" in uniques and "subfolder" in uniques:
+ f_name = uniques["folder"]
+ s_name = uniques["subfolder"]
+ i_name = uniques["id"]
+
+ if f_name not in nested:
+ nested[f_name] = {"type": "folder"}
+ if s_name not in nested[f_name]:
+ nested[f_name][s_name] = {"type": "subfolder"}
+
+ nested[f_name][s_name][i_name] = v
+
+ elif "folder" in uniques:
+ f_name = uniques["folder"]
+ i_name = uniques["id"]
+
+ if f_name not in nested:
+ nested[f_name] = {"type": "folder"}
+
+ nested[f_name][i_name] = v
+ else:
+ i_name = uniques["id"]
+ nested[i_name] = v
+
+ return nested
Export nodes/folders to a dictionary.
@@ -1090,28 +1163,37 @@ el.replaceWith(d);
if not isinstance(data, dict):
raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.")
- # Process imports
- for k, v in data.items():
- uniques = self.config._explode_unique(k)
-
- # Ensure folders exist
- if "folder" in uniques:
- folder_name = f"@{uniques['folder']}"
- if folder_name not in self.config._getallfolders():
- folder_uniques = self.config._explode_unique(folder_name)
- self.config._folder_add(**folder_uniques)
-
- if "subfolder" in uniques:
- sub_name = f"@{uniques['subfolder']}@{uniques['folder']}"
- if sub_name not in self.config._getallfolders():
- sub_uniques = self.config._explode_unique(sub_name)
- self.config._folder_add(**sub_uniques)
-
- # Add node/connection
- v.update(uniques)
- self._validate_node_name(k)
- self.config._connections_add(**v)
-
+ def _traverse_import(node_data, current_folder='', current_subfolder=''):
+ for k, v in node_data.items():
+ if k == "type":
+ continue
+ if isinstance(v, dict):
+ node_type = v.get("type", "connection")
+ if node_type == "folder":
+ self.config._folder_add(folder=k)
+ _traverse_import(v, current_folder=k, current_subfolder='')
+ elif node_type == "subfolder":
+ self.config._folder_add(folder=current_folder, subfolder=k)
+ _traverse_import(v, current_folder=current_folder, current_subfolder=k)
+ elif node_type == "connection":
+ unique_id = k
+ if current_subfolder:
+ unique_id = f"{k}@{current_subfolder}@{current_folder}"
+ elif current_folder:
+ unique_id = f"{k}@{current_folder}"
+ self._validate_node_name(unique_id)
+
+ kwargs = deepcopy(v)
+ kwargs['id'] = k
+ kwargs['folder'] = current_folder
+ kwargs['subfolder'] = current_subfolder
+
+ self.config._connections_add(**kwargs)
+ else:
+ # Invalid format: skip non-dict entries
+ pass
+
+ _traverse_import(data)
self.config._saveconfig(self.config.file)
Import nodes/folders from a dictionary.
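Note: the removed flat loop parsed `name@subfolder@folder` keys with _explode_unique; the new traversal composes the unique ID in the opposite direction, from the nesting level it finds the node at:

    # unique_id composition inside _traverse_import (hypothetical names):
    #   top level:          "jump"
    #   inside a folder:    "fw1@office"
    #   inside a subfolder: "sw1@core@office"

Together with the nested export_to_dict above, export and import are now symmetric, so an exported dictionary can be fed straight back into the importer.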
@@ -1282,8 +1364,14 @@ el.replaceWith(d);
case_sensitive = self.config.config.get("case", False)
if filter_str:
- flags = re.IGNORECASE if not case_sensitive else 0
- folders = [f for f in folders if re.search(filter_str, f, flags)]
+ if filter_str.startswith("@"):
+ if not case_sensitive:
+ folders = [f for f in folders if f.lower() == filter_str.lower()]
+ else:
+ folders = [f for f in folders if f == filter_str]
+ else:
+ flags = re.IGNORECASE if not case_sensitive else 0
+ folders = [f for f in folders if re.search(filter_str, f, flags)]
return folders
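Note: an "@"-prefixed filter is now an exact folder-name match rather than a regex, so "@prod" no longer also matches "@production". The same logic in isolation, as a runnable sketch (function name and data hypothetical):

    import re

    def filter_folders(folders, filter_str, case_sensitive=False):
        # Exact match for "@"-prefixed filters, regex search otherwise
        if filter_str.startswith("@"):
            if not case_sensitive:
                return [f for f in folders if f.lower() == filter_str.lower()]
            return [f for f in folders if f == filter_str]
        flags = 0 if case_sensitive else re.IGNORECASE
        return [f for f in folders if re.search(filter_str, f, flags)]

    print(filter_folders(["@prod", "@production"], "@prod"))  # ['@prod']
    print(filter_folders(["@prod", "@production"], "prod"))   # ['@prod', '@production']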
def get_node_details(self, unique_id):
@@ -1304,13 +1392,20 @@ el.replaceWith(d);
"""Generate and update the internal nodes cache."""
self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)
- def validate_parent_folder(self, unique_id):
+ def validate_parent_folder(self, unique_id, is_folder=False):
"""Check if parent folder exists for a given node unique ID."""
- node_folder = unique_id.partition("@")[2]
- if node_folder:
- parent_folder = f"@{node_folder}"
- if parent_folder not in self.config._getallfolders():
- raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
+ if is_folder:
+ uniques = self.config._explode_unique(unique_id)
+ if uniques and "subfolder" in uniques and "folder" in uniques:
+ parent_folder = f"@{uniques['folder']}"
+ if parent_folder not in self.config._getallfolders():
+ raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
+ else:
+ node_folder = unique_id.partition("@")[2]
+ if node_folder:
+ parent_folder = f"@{node_folder}"
+ if parent_folder not in self.config._getallfolders():
+ raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
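Note: the is_folder branch exists because partition("@") misreads subfolder IDs. With hypothetical names:

    "@core@office".partition("@")[2]   # -> "core@office"
    # Old path: parent_folder became "@core@office" -- the subfolder's own
    # ID, not its parent, so the existence check targeted the wrong entry.
    # New path: _explode_unique("@core@office") yields (per the surrounding
    # code) folder="office", subfolder="core", so the check correctly
    # targets "@office".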
def add_node(self, unique_id, data, is_folder=False):
@@ -1330,7 +1425,7 @@ el.replaceWith(d);
# Check if parent folder exists when creating a subfolder
if "subfolder" in uniques:
- self.validate_parent_folder(unique_id)
+ self.validate_parent_folder(unique_id, is_folder=True)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
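Note: a hypothetical call creating a subfolder through the corrected path (assuming an instantiated service and an existing "@office" folder):

    service.add_node("@core@office", {}, is_folder=True)  # parent check now targets "@office"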
@@ -1512,7 +1607,7 @@ el.replaceWith(d);
# Check if parent folder exists when creating a subfolder
if "subfolder" in uniques:
- self.validate_parent_folder(unique_id)
+ self.validate_parent_folder(unique_id, is_folder=True)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
@@ -1728,8 +1823,14 @@ el.replaceWith(d);
case_sensitive = self.config.config.get("case", False)
if filter_str:
- flags = re.IGNORECASE if not case_sensitive else 0
- folders = [f for f in folders if re.search(filter_str, f, flags)]
+ if filter_str.startswith("@"):
+ if not case_sensitive:
+ folders = [f for f in folders if f.lower() == filter_str.lower()]
+ else:
+ folders = [f for f in folders if f == filter_str]
+ else:
+ flags = re.IGNORECASE if not case_sensitive else 0
+ folders = [f for f in folders if re.search(filter_str, f, flags)]
return folders
Return all unique folders, filtered by regex, or by exact name when the filter starts with "@".
@@ -1858,20 +1959,27 @@ el.replaceWith(d);
Explicitly update an existing node.
-def validate_parent_folder(self, unique_id)
+def validate_parent_folder(self, unique_id, is_folder=False)
-def validate_parent_folder(self, unique_id):
+def validate_parent_folder(self, unique_id, is_folder=False):
"""Check if parent folder exists for a given node unique ID."""
- node_folder = unique_id.partition("@")[2]
- if node_folder:
- parent_folder = f"@{node_folder}"
- if parent_folder not in self.config._getallfolders():
- raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
+ if is_folder:
+ uniques = self.config._explode_unique(unique_id)
+ if uniques and "subfolder" in uniques and "folder" in uniques:
+ parent_folder = f"@{uniques['folder']}"
+ if parent_folder not in self.config._getallfolders():
+ raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
+ else:
+ node_folder = unique_id.partition("@")[2]
+ if node_folder:
+ parent_folder = f"@{node_folder}"
+ if parent_folder not in self.config._getallfolders():
+ raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
Check if parent folder exists for a given node unique ID.
diff --git a/docs/connpy/services/node_service.html b/docs/connpy/services/node_service.html
index 52e23b1..0a66d09 100644
--- a/docs/connpy/services/node_service.html
+++ b/docs/connpy/services/node_service.html
@@ -117,8 +117,14 @@ el.replaceWith(d);
case_sensitive = self.config.config.get("case", False)
if filter_str:
- flags = re.IGNORECASE if not case_sensitive else 0
- folders = [f for f in folders if re.search(filter_str, f, flags)]
+ if filter_str.startswith("@"):
+ if not case_sensitive:
+ folders = [f for f in folders if f.lower() == filter_str.lower()]
+ else:
+ folders = [f for f in folders if f == filter_str]
+ else:
+ flags = re.IGNORECASE if not case_sensitive else 0
+ folders = [f for f in folders if re.search(filter_str, f, flags)]
return folders
def get_node_details(self, unique_id):
@@ -139,13 +145,20 @@ el.replaceWith(d);
"""Generate and update the internal nodes cache."""
self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)
- def validate_parent_folder(self, unique_id):
+ def validate_parent_folder(self, unique_id, is_folder=False):
"""Check if parent folder exists for a given node unique ID."""
- node_folder = unique_id.partition("@")[2]
- if node_folder:
- parent_folder = f"@{node_folder}"
- if parent_folder not in self.config._getallfolders():
- raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
+ if is_folder:
+ uniques = self.config._explode_unique(unique_id)
+ if uniques and "subfolder" in uniques and "folder" in uniques:
+ parent_folder = f"@{uniques['folder']}"
+ if parent_folder not in self.config._getallfolders():
+ raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
+ else:
+ node_folder = unique_id.partition("@")[2]
+ if node_folder:
+ parent_folder = f"@{node_folder}"
+ if parent_folder not in self.config._getallfolders():
+ raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
def add_node(self, unique_id, data, is_folder=False):
@@ -165,7 +178,7 @@ el.replaceWith(d);
# Check if parent folder exists when creating a subfolder
if "subfolder" in uniques:
- self.validate_parent_folder(unique_id)
+ self.validate_parent_folder(unique_id, is_folder=True)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
@@ -347,7 +360,7 @@ el.replaceWith(d);
# Check if parent folder exists when creating a subfolder
if "subfolder" in uniques:
- self.validate_parent_folder(unique_id)
+ self.validate_parent_folder(unique_id, is_folder=True)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
@@ -563,8 +576,14 @@ el.replaceWith(d);
case_sensitive = self.config.config.get("case", False)
if filter_str:
- flags = re.IGNORECASE if not case_sensitive else 0
- folders = [f for f in folders if re.search(filter_str, f, flags)]
+ if filter_str.startswith("@"):
+ if not case_sensitive:
+ folders = [f for f in folders if f.lower() == filter_str.lower()]
+ else:
+ folders = [f for f in folders if f == filter_str]
+ else:
+ flags = re.IGNORECASE if not case_sensitive else 0
+ folders = [f for f in folders if re.search(filter_str, f, flags)]
return folders
Return all unique folders, filtered by regex, or by exact name when the filter starts with "@".
@@ -693,20 +712,27 @@ el.replaceWith(d);
Explicitly update an existing node.
-def validate_parent_folder(self, unique_id)
+def validate_parent_folder(self, unique_id, is_folder=False)
-def validate_parent_folder(self, unique_id):
+def validate_parent_folder(self, unique_id, is_folder=False):
"""Check if parent folder exists for a given node unique ID."""
- node_folder = unique_id.partition("@")[2]
- if node_folder:
- parent_folder = f"@{node_folder}"
- if parent_folder not in self.config._getallfolders():
- raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
+ if is_folder:
+ uniques = self.config._explode_unique(unique_id)
+ if uniques and "subfolder" in uniques and "folder" in uniques:
+ parent_folder = f"@{uniques['folder']}"
+ if parent_folder not in self.config._getallfolders():
+ raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
+ else:
+ node_folder = unique_id.partition("@")[2]
+ if node_folder:
+ parent_folder = f"@{node_folder}"
+ if parent_folder not in self.config._getallfolders():
+ raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
Check if parent folder exists for a given node unique ID.