connpy v6.0.0b4: AI Stability, Remote Sync & UI Polish (Clean Commit)

This commit is contained in:
2026-05-01 18:55:25 -03:00
parent c81f6e049f
commit a192bd1912
18 changed files with 717 additions and 292 deletions
+1
View File
@@ -160,3 +160,4 @@ ssm_implemmetaiton_plan.md
async_interact_plan.md
repo_consolidado_limpio.md
connpy_roadmap.md
MULTI_USER_PLAN.md
+1 -1
View File
@@ -1 +1 @@
__version__ = "6.0.0b3"
__version__ = "6.0.0b4"
+53 -11
View File
@@ -273,9 +273,12 @@ class ai:
if not debug and not chunk_callback:
if not is_streaming_text:
# Stop spinner before starting live display
# Stop spinner definitively
if status:
status.stop()
try:
status.stop()
except Exception:
pass
live_display = Live(
Panel(Markdown(full_content), title=title, border_style=border, expand=False),
console=self.console,
@@ -463,7 +466,7 @@ class ai:
tail_limit = int(final_limit * 0.4)
return (text[:head_limit] + f"\n\n[... OUTPUT TRUNCATED ...]\n\n" + text[-tail_limit:])
def _print_debug_observation(self, fn, obs):
def _print_debug_observation(self, fn, obs, status=None):
"""Prints a tool observation in a readable way during debug mode."""
# Try to parse as JSON if it's a string
if isinstance(obs, str):
@@ -487,6 +490,7 @@ class ai:
content = Text("Empty data set")
else:
# Add a small spacer instead of a Rule for cleaner look
from rich.console import Group
content = Group(*elements)
elif isinstance(obs_data, list):
content = Text("\n".join(f"{item}" for item in obs_data))
@@ -494,7 +498,18 @@ class ai:
content = Text(str(obs_data))
title = f"[bold]{fn}[/bold]"
# Stop status before printing panel to avoid ghosting
if status:
try: status.stop()
except: pass
self.console.print(Panel(content, title=title, border_style="ai_status"))
# Resume status
if status:
try: status.start()
except: pass
def manage_memory_tool(self, content, action="append"):
"""Save or update long-term memory. Only use when user explicitly requests it."""
@@ -695,7 +710,7 @@ class ai:
elif fn in self.tool_status_formatters: status.update(self.tool_status_formatters[fn](args))
if debug:
self._print_debug_observation(f"Decision: {fn}", args)
self._print_debug_observation(f"Decision: {fn}", args, status=status)
if fn == "list_nodes": obs = self.list_nodes_tool(**args)
elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
@@ -704,7 +719,7 @@ class ai:
else: obs = f"Error: Unknown tool '{fn}'."
if debug:
self._print_debug_observation(f"Observation: {fn}", obs)
self._print_debug_observation(f"Observation: {fn}", obs, status=status)
# Ensure observation is a string and truncated for the LLM
obs_str = obs if isinstance(obs, str) else json.dumps(obs)
@@ -974,7 +989,7 @@ class ai:
streamed_response = False
try:
safe_messages = self._sanitize_messages(messages)
if stream and (not debug or chunk_callback):
if stream and chunk_callback:
response, streamed_response = self._stream_completion(
model=model, messages=safe_messages, tools=tools, api_key=key,
status=status, label=label, debug=debug, num_retries=3,
@@ -1017,7 +1032,13 @@ class ai:
# In CLI debug mode, only print intermediate reasoning if there are tool calls.
# If there are no tool calls, this content is the final answer and will be printed by the caller.
if resp_msg.tool_calls:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Markdown(resp_msg.content), title=f"[{current_brain}][bold]{label} Reasoning[/bold][/{current_brain}]", border_style="architect" if current_brain == "architect" else "engineer"))
if status:
try: status.start()
except: pass
if not resp_msg.tool_calls: break
@@ -1038,7 +1059,7 @@ class ai:
elif fn == "manage_memory_tool": status.update(f"[architect]Architect: [UPDATING MEMORY]")
if debug:
self._print_debug_observation(f"Decision: {fn}", args)
self._print_debug_observation(f"Decision: {fn}", args, status=status)
if fn == "delegate_to_engineer":
obs, eng_usage = self._engineer_loop(args["task"], status=status, debug=debug, chat_history=messages[:-1])
@@ -1057,7 +1078,14 @@ class ai:
num_retries=3
)
obs = claude_resp.choices[0].message.content
if debug: self.console.print(Panel(Markdown(obs), title="[architect]Architect Consultation[/architect]", border_style="architect"))
if debug:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Markdown(obs), title="[architect]Architect Consultation[/architect]", border_style="architect"))
if status:
try: status.start()
except: pass
except Exception as e:
if status: status.update("[unavailable]Architect unavailable! Engineer continuing alone...")
obs = f"Architect unavailable ({str(e)}). Proceeding with your best technical judgment."
@@ -1074,7 +1102,14 @@ class ai:
handover_msg = f"HANDOVER FROM EXECUTION ENGINE\n\nReason: {args['reason']}\n\nContext: {args['context']}\n\nYou are now in control of this conversation."
pending_user_message = handover_msg
obs = "Control transferred to Architect. Handover context will be provided."
if debug: self.console.print(Panel(Text(handover_msg), title="[architect]Escalation to Architect[/architect]", border_style="architect"))
if debug:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Text(handover_msg), title="[architect]Escalation to Architect[/architect]", border_style="architect"))
if status:
try: status.start()
except: pass
elif fn == "return_to_engineer":
if status: status.update("[engineer]Transferring control back to Engineer...")
@@ -1088,7 +1123,14 @@ class ai:
handover_msg = f"HANDOVER FROM ARCHITECT\n\nSummary: {args['summary']}\n\nYou are now back in control. Continue handling the user's requests."
pending_user_message = handover_msg
obs = "Control returned to Engineer. Handover summary will be provided."
if debug: self.console.print(Panel(Text(handover_msg), title="[engineer]Return to Engineer[/engineer]", border_style="engineer"))
if debug:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Text(handover_msg), title="[engineer]Return to Engineer[/engineer]", border_style="engineer"))
if status:
try: status.start()
except: pass
elif fn == "list_nodes": obs = self.list_nodes_tool(**args)
elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
@@ -1098,7 +1140,7 @@ class ai:
else: obs = f"Error: {fn} unknown."
if debug and fn not in ["delegate_to_engineer", "consult_architect", "escalate_to_architect", "return_to_engineer"]:
self._print_debug_observation(f"Observation: {fn}", obs)
self._print_debug_observation(f"Observation: {fn}", obs, status=status)
# Ensure observation is a string and truncated for the LLM
obs_str = obs if isinstance(obs, str) else json.dumps(obs)
-2
View File
@@ -88,7 +88,6 @@ class AIHandler:
if "usage" in result:
u = result["usage"]
console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
console.print()
def interactive_chat(self, args, session_id):
history = None
@@ -132,7 +131,6 @@ class AIHandler:
if "usage" in result:
u = result["usage"]
console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
console.print()
except (KeyboardInterrupt, EOFError):
console.print("\n[dim]Session closed.[/dim]")
break
+2 -2
View File
@@ -234,8 +234,8 @@ def _build_tree(nodes, folders, profiles, plugins, configdir):
"-a": None, "-r": None, "-s": None, "-e": None, "-h": None,
},
"plugin": {
"--add": lambda w: get_cwd(w, "--add"),
"--update": lambda w: get_cwd(w, "--update"),
"--add": {"*": lambda w: get_cwd(w, "--add")},
"--update": {"*": lambda w: get_cwd(w, "--update")},
"--del": lambda w: _get_plugins("--del", configdir),
"--enable": lambda w: _get_plugins("--enable", configdir),
"--disable": lambda w: _get_plugins("--disable", configdir),
+6 -12
View File
@@ -78,7 +78,9 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
unique_id = first_req.id
sftp = first_req.sftp
debug = first_req.debug
printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node request for: [bold cyan]{unique_id}[/bold cyan]")
if debug:
printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node request for: [bold cyan]{unique_id}[/bold cyan]")
if first_req.connection_params_json:
import json
@@ -177,7 +179,8 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
while True:
data = response_queue.get()
if data is None:
printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node session closed for: [bold cyan]{unique_id}[/bold cyan]")
if debug:
printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node session closed for: [bold cyan]{unique_id}[/bold cyan]")
break
yield connpy_pb2.InteractResponse(stdout_data=data)
@handle_errors
@@ -388,12 +391,7 @@ class ExecutionServicer(connpy_pb2_grpc.ExecutionServiceServicer):
def _worker():
try:
# Set task name in thread state for printer if available
if request.name:
printer.console.print(f"[debug][DEBUG][/debug] Executing task: [bold cyan]{request.name}[/bold cyan]")
self.service.run_commands(
nodes_filter=nodes_filter,
self.service.run_commands( nodes_filter=nodes_filter,
commands=list(request.commands),
folder=request.folder if request.folder else None,
prompt=request.prompt if request.prompt else None,
@@ -439,10 +437,6 @@ class ExecutionServicer(connpy_pb2_grpc.ExecutionServiceServicer):
def _worker():
try:
# Set task name in thread state for printer if available
if request.name:
printer.console.print(f"[debug][DEBUG][/debug] Executing task: [bold cyan]{request.name}[/bold cyan]")
self.service.test_commands(
nodes_filter=nodes_filter,
commands=list(request.commands),
+29 -20
View File
@@ -586,7 +586,6 @@ class AIStub:
if response.status_update:
if response.requires_confirmation:
if status: status.stop()
if live_display: live_display.stop()
# Show prompt and wait for answer
prompt_text = Text.from_ansi(response.status_update)
@@ -595,7 +594,6 @@ class AIStub:
if status:
status.update("[ai_status]Agent: Resuming...")
status.start()
if live_display: live_display.start()
req_queue.put(connpy_pb2.AskRequest(confirmation_answer=ans))
continue
@@ -606,41 +604,52 @@ class AIStub:
if response.debug_message:
if debug:
if status:
try: status.stop()
except: pass
printer.console.print(Text.from_ansi(response.debug_message))
if status:
try: status.start()
except: pass
continue
if response.important_message:
if status:
try: status.stop()
except: pass
printer.console.print(Text.from_ansi(response.important_message))
if status:
try: status.start()
except: pass
continue
if not response.is_final:
full_content += response.text_chunk
if not live_display and not debug:
if status: status.stop()
live_display = Live(
Panel(Markdown(full_content), title="AI Assistant", expand=False),
console=printer.console,
refresh_per_second=8,
transient=False
)
live_display.start()
elif live_display:
live_display.update(Panel(Markdown(full_content), title="AI Assistant", expand=False))
if response.text_chunk:
full_content += response.text_chunk
if status and not debug:
# Update the spinner line with a preview of the response
preview = full_content.replace("\n", " ").strip()
if len(preview) > 60: preview = preview[:57] + "..."
status.update(f"[ai_status]{preview}")
continue
if response.is_final:
# Final stop for status to ensure it disappears before the panel
if status:
try: status.stop()
except: pass
final_result = from_struct(response.full_result)
responder = final_result.get("responder", "engineer")
alias = "architect" if responder == "architect" else "engineer"
role_label = "Network Architect" if responder == "architect" else "Network Engineer"
title = f"[bold {alias}]{role_label}[/bold {alias}]"
if live_display:
live_display.update(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
live_display.stop()
elif full_content:
printer.console.print(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
# Always print the final Panel
content_to_print = full_content or final_result.get("response", "")
if content_to_print:
printer.console.print(Panel(Markdown(content_to_print), title=title, border_style=alias, expand=False))
break
except Exception as e:
# Check if it was a gRPC error that we should let handle_errors catch
+4 -2
View File
@@ -20,7 +20,8 @@ class MethodHook:
try:
args, kwargs = hook(*args, **kwargs)
except Exception as e:
printer.error(f"{self.func.__name__} Pre-hook {hook.__name__} raised an exception: {e}")
hook_name = getattr(hook, "__name__", str(hook))
printer.error(f"{self.func.__name__} Pre-hook {hook_name} raised an exception: {e}")
result = self.func(*args, **kwargs)
@@ -32,7 +33,8 @@ class MethodHook:
try:
result = hook(*args, **kwargs, result=result) # Pass result to hooks
except Exception as e:
printer.error(f"{self.func.__name__} Post-hook {hook.__name__} raised an exception: {e}")
hook_name = getattr(hook, "__name__", str(hook))
printer.error(f"{self.func.__name__} Post-hook {hook_name} raised an exception: {e}")
return result
+66 -24
View File
@@ -1,6 +1,7 @@
from .base import BaseService
import yaml
import os
from copy import deepcopy
from .exceptions import InvalidConfigurationError, NodeNotFoundError, ReservedNameError
from ..configfile import NoAliasDumper
@@ -23,13 +24,45 @@ class ImportExportService(BaseService):
def export_to_dict(self, folders=None):
"""Export nodes/folders to a dictionary."""
if not folders:
return self.config._getallnodesfull(extract=False)
return deepcopy(self.config.connections)
else:
# Validate folders exist
for f in folders:
if f != "@" and f not in self.config._getallfolders():
raise NodeNotFoundError(f"Folder '{f}' not found.")
return self.config._getallnodesfull(folders, extract=False)
flat = self.config._getallnodesfull(folders, extract=False)
nested = {}
for k, v in flat.items():
uniques = self.config._explode_unique(k)
if not uniques:
continue
if "folder" in uniques and "subfolder" in uniques:
f_name = uniques["folder"]
s_name = uniques["subfolder"]
i_name = uniques["id"]
if f_name not in nested:
nested[f_name] = {"type": "folder"}
if s_name not in nested[f_name]:
nested[f_name][s_name] = {"type": "subfolder"}
nested[f_name][s_name][i_name] = v
elif "folder" in uniques:
f_name = uniques["folder"]
i_name = uniques["id"]
if f_name not in nested:
nested[f_name] = {"type": "folder"}
nested[f_name][i_name] = v
else:
i_name = uniques["id"]
nested[i_name] = v
return nested
def import_from_file(self, file_path):
"""Import nodes/folders from a YAML file."""
@@ -48,26 +81,35 @@ class ImportExportService(BaseService):
if not isinstance(data, dict):
raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.")
# Process imports
for k, v in data.items():
uniques = self.config._explode_unique(k)
# Ensure folders exist
if "folder" in uniques:
folder_name = f"@{uniques['folder']}"
if folder_name not in self.config._getallfolders():
folder_uniques = self.config._explode_unique(folder_name)
self.config._folder_add(**folder_uniques)
if "subfolder" in uniques:
sub_name = f"@{uniques['subfolder']}@{uniques['folder']}"
if sub_name not in self.config._getallfolders():
sub_uniques = self.config._explode_unique(sub_name)
self.config._folder_add(**sub_uniques)
# Add node/connection
v.update(uniques)
self._validate_node_name(k)
self.config._connections_add(**v)
def _traverse_import(node_data, current_folder='', current_subfolder=''):
for k, v in node_data.items():
if k == "type":
continue
if isinstance(v, dict):
node_type = v.get("type", "connection")
if node_type == "folder":
self.config._folder_add(folder=k)
_traverse_import(v, current_folder=k, current_subfolder='')
elif node_type == "subfolder":
self.config._folder_add(folder=current_folder, subfolder=k)
_traverse_import(v, current_folder=current_folder, current_subfolder=k)
elif node_type == "connection":
unique_id = k
if current_subfolder:
unique_id = f"{k}@{current_subfolder}@{current_folder}"
elif current_folder:
unique_id = f"{k}@{current_folder}"
self._validate_node_name(unique_id)
kwargs = deepcopy(v)
kwargs['id'] = k
kwargs['folder'] = current_folder
kwargs['subfolder'] = current_subfolder
self.config._connections_add(**kwargs)
else:
# Invalid format skip
pass
_traverse_import(data)
self.config._saveconfig(self.config.file)
+8 -2
View File
@@ -67,8 +67,14 @@ class NodeService(BaseService):
case_sensitive = self.config.config.get("case", False)
if filter_str:
flags = re.IGNORECASE if not case_sensitive else 0
folders = [f for f in folders if re.search(filter_str, f, flags)]
if filter_str.startswith("@"):
if not case_sensitive:
folders = [f for f in folders if f.lower() == filter_str.lower()]
else:
folders = [f for f in folders if f == filter_str]
else:
flags = re.IGNORECASE if not case_sensitive else 0
folders = [f for f in folders if re.search(filter_str, f, flags)]
return folders
def get_node_details(self, unique_id):
+1 -5
View File
@@ -134,7 +134,6 @@ el.replaceWith(d);
if "usage" in result:
u = result["usage"]
console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
console.print()
def interactive_chat(self, args, session_id):
history = None
@@ -178,7 +177,6 @@ el.replaceWith(d);
if "usage" in result:
u = result["usage"]
console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
console.print()
except (KeyboardInterrupt, EOFError):
console.print("\n[dim]Session closed.[/dim]")
break
@@ -306,7 +304,6 @@ el.replaceWith(d);
if "usage" in result:
u = result["usage"]
console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
console.print()
except (KeyboardInterrupt, EOFError):
console.print("\n[dim]Session closed.[/dim]")
break
@@ -335,8 +332,7 @@ el.replaceWith(d);
if "usage" in result:
u = result["usage"]
console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
console.print()
console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")
</details>
<div class="desc"></div>
</dd>
+4
View File
@@ -117,6 +117,10 @@ Here are some important instructions and tips for configuring your new node:
- `prompt`: Replaces default app prompt to identify the end of output or where the user can start inputting commands.
- `kube_command`: Replaces the default command (`/bin/bash`) for `kubectl exec`.
- `docker_command`: Replaces the default command for `docker exec`.
- `region`: AWS Region used for `aws ssm start-session`.
- `profile`: AWS Profile used for `aws ssm start-session`.
- `ssh_options`: Additional SSH options injected when an SSM node is used as a jumphost (e.g., `-i ~/.ssh/key.pem`).
- `nc_command`: Replaces the default `nc` command used when bridging connections through Docker or Kubernetes (e.g., `ip netns exec global-vrf nc`).
"""
if type == "bashcompletion":
return '''
+6 -12
View File
@@ -367,12 +367,7 @@ el.replaceWith(d);
def _worker():
try:
# Set task name in thread state for printer if available
if request.name:
printer.console.print(f&#34;[debug][DEBUG][/debug] Executing task: [bold cyan]{request.name}[/bold cyan]&#34;)
self.service.run_commands(
nodes_filter=nodes_filter,
self.service.run_commands( nodes_filter=nodes_filter,
commands=list(request.commands),
folder=request.folder if request.folder else None,
prompt=request.prompt if request.prompt else None,
@@ -418,10 +413,6 @@ el.replaceWith(d);
def _worker():
try:
# Set task name in thread state for printer if available
if request.name:
printer.console.print(f&#34;[debug][DEBUG][/debug] Executing task: [bold cyan]{request.name}[/bold cyan]&#34;)
self.service.test_commands(
nodes_filter=nodes_filter,
commands=list(request.commands),
@@ -658,7 +649,9 @@ interceptor chooses to service this RPC, or None otherwise.</p></div>
unique_id = first_req.id
sftp = first_req.sftp
debug = first_req.debug
printer.console.print(f&#34;[debug][DEBUG][/debug] gRPC interact_node request for: [bold cyan]{unique_id}[/bold cyan]&#34;)
if debug:
printer.console.print(f&#34;[debug][DEBUG][/debug] gRPC interact_node request for: [bold cyan]{unique_id}[/bold cyan]&#34;)
if first_req.connection_params_json:
import json
@@ -757,7 +750,8 @@ interceptor chooses to service this RPC, or None otherwise.</p></div>
while True:
data = response_queue.get()
if data is None:
printer.console.print(f&#34;[debug][DEBUG][/debug] gRPC interact_node session closed for: [bold cyan]{unique_id}[/bold cyan]&#34;)
if debug:
printer.console.print(f&#34;[debug][DEBUG][/debug] gRPC interact_node session closed for: [bold cyan]{unique_id}[/bold cyan]&#34;)
break
yield connpy_pb2.InteractResponse(stdout_data=data)
@handle_errors
+58 -40
View File
@@ -182,7 +182,6 @@ el.replaceWith(d);
if response.status_update:
if response.requires_confirmation:
if status: status.stop()
if live_display: live_display.stop()
# Show prompt and wait for answer
prompt_text = Text.from_ansi(response.status_update)
@@ -191,7 +190,6 @@ el.replaceWith(d);
if status:
status.update(&#34;[ai_status]Agent: Resuming...&#34;)
status.start()
if live_display: live_display.start()
req_queue.put(connpy_pb2.AskRequest(confirmation_answer=ans))
continue
@@ -202,41 +200,52 @@ el.replaceWith(d);
if response.debug_message:
if debug:
if status:
try: status.stop()
except: pass
printer.console.print(Text.from_ansi(response.debug_message))
if status:
try: status.start()
except: pass
continue
if response.important_message:
if status:
try: status.stop()
except: pass
printer.console.print(Text.from_ansi(response.important_message))
if status:
try: status.start()
except: pass
continue
if not response.is_final:
full_content += response.text_chunk
if not live_display and not debug:
if status: status.stop()
live_display = Live(
Panel(Markdown(full_content), title=&#34;AI Assistant&#34;, expand=False),
console=printer.console,
refresh_per_second=8,
transient=False
)
live_display.start()
elif live_display:
live_display.update(Panel(Markdown(full_content), title=&#34;AI Assistant&#34;, expand=False))
if response.text_chunk:
full_content += response.text_chunk
if status and not debug:
# Update the spinner line with a preview of the response
preview = full_content.replace(&#34;\n&#34;, &#34; &#34;).strip()
if len(preview) &gt; 60: preview = preview[:57] + &#34;...&#34;
status.update(f&#34;[ai_status]{preview}&#34;)
continue
if response.is_final:
# Final stop for status to ensure it disappears before the panel
if status:
try: status.stop()
except: pass
final_result = from_struct(response.full_result)
responder = final_result.get(&#34;responder&#34;, &#34;engineer&#34;)
alias = &#34;architect&#34; if responder == &#34;architect&#34; else &#34;engineer&#34;
role_label = &#34;Network Architect&#34; if responder == &#34;architect&#34; else &#34;Network Engineer&#34;
title = f&#34;[bold {alias}]{role_label}[/bold {alias}]&#34;
if live_display:
live_display.update(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
live_display.stop()
elif full_content:
printer.console.print(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
# Always print the final Panel
content_to_print = full_content or final_result.get(&#34;response&#34;, &#34;&#34;)
if content_to_print:
printer.console.print(Panel(Markdown(content_to_print), title=title, border_style=alias, expand=False))
break
except Exception as e:
# Check if it was a gRPC error that we should let handle_errors catch
@@ -366,7 +375,6 @@ def ask(self, input_text, dryrun=False, chat_history=None, session_id=None, debu
if response.status_update:
if response.requires_confirmation:
if status: status.stop()
if live_display: live_display.stop()
# Show prompt and wait for answer
prompt_text = Text.from_ansi(response.status_update)
@@ -375,7 +383,6 @@ def ask(self, input_text, dryrun=False, chat_history=None, session_id=None, debu
if status:
status.update(&#34;[ai_status]Agent: Resuming...&#34;)
status.start()
if live_display: live_display.start()
req_queue.put(connpy_pb2.AskRequest(confirmation_answer=ans))
continue
@@ -386,41 +393,52 @@ def ask(self, input_text, dryrun=False, chat_history=None, session_id=None, debu
if response.debug_message:
if debug:
if status:
try: status.stop()
except: pass
printer.console.print(Text.from_ansi(response.debug_message))
if status:
try: status.start()
except: pass
continue
if response.important_message:
if status:
try: status.stop()
except: pass
printer.console.print(Text.from_ansi(response.important_message))
if status:
try: status.start()
except: pass
continue
if not response.is_final:
full_content += response.text_chunk
if not live_display and not debug:
if status: status.stop()
live_display = Live(
Panel(Markdown(full_content), title=&#34;AI Assistant&#34;, expand=False),
console=printer.console,
refresh_per_second=8,
transient=False
)
live_display.start()
elif live_display:
live_display.update(Panel(Markdown(full_content), title=&#34;AI Assistant&#34;, expand=False))
if response.text_chunk:
full_content += response.text_chunk
if status and not debug:
# Update the spinner line with a preview of the response
preview = full_content.replace(&#34;\n&#34;, &#34; &#34;).strip()
if len(preview) &gt; 60: preview = preview[:57] + &#34;...&#34;
status.update(f&#34;[ai_status]{preview}&#34;)
continue
if response.is_final:
# Final stop for status to ensure it disappears before the panel
if status:
try: status.stop()
except: pass
final_result = from_struct(response.full_result)
responder = final_result.get(&#34;responder&#34;, &#34;engineer&#34;)
alias = &#34;architect&#34; if responder == &#34;architect&#34; else &#34;engineer&#34;
role_label = &#34;Network Architect&#34; if responder == &#34;architect&#34; else &#34;Network Engineer&#34;
title = f&#34;[bold {alias}]{role_label}[/bold {alias}]&#34;
if live_display:
live_display.update(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
live_display.stop()
elif full_content:
printer.console.print(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
# Always print the final Panel
content_to_print = full_content or final_result.get(&#34;response&#34;, &#34;&#34;)
if content_to_print:
printer.console.print(Panel(Markdown(content_to_print), title=title, border_style=alias, expand=False))
break
except Exception as e:
# Check if it was a gRPC error that we should let handle_errors catch
+128 -25
View File
@@ -1182,9 +1182,12 @@ class ai:
if not debug and not chunk_callback:
if not is_streaming_text:
# Stop spinner before starting live display
# Stop spinner definitively
if status:
status.stop()
try:
status.stop()
except Exception:
pass
live_display = Live(
Panel(Markdown(full_content), title=title, border_style=border, expand=False),
console=self.console,
@@ -1372,7 +1375,7 @@ class ai:
tail_limit = int(final_limit * 0.4)
return (text[:head_limit] + f&#34;\n\n[... OUTPUT TRUNCATED ...]\n\n&#34; + text[-tail_limit:])
def _print_debug_observation(self, fn, obs):
def _print_debug_observation(self, fn, obs, status=None):
&#34;&#34;&#34;Prints a tool observation in a readable way during debug mode.&#34;&#34;&#34;
# Try to parse as JSON if it&#39;s a string
if isinstance(obs, str):
@@ -1396,6 +1399,7 @@ class ai:
content = Text(&#34;Empty data set&#34;)
else:
# Add a small spacer instead of a Rule for cleaner look
from rich.console import Group
content = Group(*elements)
elif isinstance(obs_data, list):
content = Text(&#34;\n&#34;.join(f&#34;• {item}&#34; for item in obs_data))
@@ -1403,7 +1407,18 @@ class ai:
content = Text(str(obs_data))
title = f&#34;[bold]{fn}[/bold]&#34;
# Stop status before printing panel to avoid ghosting
if status:
try: status.stop()
except: pass
self.console.print(Panel(content, title=title, border_style=&#34;ai_status&#34;))
# Resume status
if status:
try: status.start()
except: pass
def manage_memory_tool(self, content, action=&#34;append&#34;):
&#34;&#34;&#34;Save or update long-term memory. Only use when user explicitly requests it.&#34;&#34;&#34;
@@ -1604,7 +1619,7 @@ class ai:
elif fn in self.tool_status_formatters: status.update(self.tool_status_formatters[fn](args))
if debug:
self._print_debug_observation(f&#34;Decision: {fn}&#34;, args)
self._print_debug_observation(f&#34;Decision: {fn}&#34;, args, status=status)
if fn == &#34;list_nodes&#34;: obs = self.list_nodes_tool(**args)
elif fn == &#34;run_commands&#34;: obs = self.run_commands_tool(**args, status=status)
@@ -1613,7 +1628,7 @@ class ai:
else: obs = f&#34;Error: Unknown tool &#39;{fn}&#39;.&#34;
if debug:
self._print_debug_observation(f&#34;Observation: {fn}&#34;, obs)
self._print_debug_observation(f&#34;Observation: {fn}&#34;, obs, status=status)
# Ensure observation is a string and truncated for the LLM
obs_str = obs if isinstance(obs, str) else json.dumps(obs)
@@ -1883,7 +1898,7 @@ class ai:
streamed_response = False
try:
safe_messages = self._sanitize_messages(messages)
if stream and (not debug or chunk_callback):
if stream and chunk_callback:
response, streamed_response = self._stream_completion(
model=model, messages=safe_messages, tools=tools, api_key=key,
status=status, label=label, debug=debug, num_retries=3,
@@ -1926,7 +1941,13 @@ class ai:
# In CLI debug mode, only print intermediate reasoning if there are tool calls.
# If there are no tool calls, this content is the final answer and will be printed by the caller.
if resp_msg.tool_calls:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Markdown(resp_msg.content), title=f&#34;[{current_brain}][bold]{label} Reasoning[/bold][/{current_brain}]&#34;, border_style=&#34;architect&#34; if current_brain == &#34;architect&#34; else &#34;engineer&#34;))
if status:
try: status.start()
except: pass
if not resp_msg.tool_calls: break
@@ -1947,7 +1968,7 @@ class ai:
elif fn == &#34;manage_memory_tool&#34;: status.update(f&#34;[architect]Architect: [UPDATING MEMORY]&#34;)
if debug:
self._print_debug_observation(f&#34;Decision: {fn}&#34;, args)
self._print_debug_observation(f&#34;Decision: {fn}&#34;, args, status=status)
if fn == &#34;delegate_to_engineer&#34;:
obs, eng_usage = self._engineer_loop(args[&#34;task&#34;], status=status, debug=debug, chat_history=messages[:-1])
@@ -1966,7 +1987,14 @@ class ai:
num_retries=3
)
obs = claude_resp.choices[0].message.content
if debug: self.console.print(Panel(Markdown(obs), title=&#34;[architect]Architect Consultation[/architect]&#34;, border_style=&#34;architect&#34;))
if debug:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Markdown(obs), title=&#34;[architect]Architect Consultation[/architect]&#34;, border_style=&#34;architect&#34;))
if status:
try: status.start()
except: pass
except Exception as e:
if status: status.update(&#34;[unavailable]Architect unavailable! Engineer continuing alone...&#34;)
obs = f&#34;Architect unavailable ({str(e)}). Proceeding with your best technical judgment.&#34;
@@ -1983,7 +2011,14 @@ class ai:
handover_msg = f&#34;HANDOVER FROM EXECUTION ENGINE\n\nReason: {args[&#39;reason&#39;]}\n\nContext: {args[&#39;context&#39;]}\n\nYou are now in control of this conversation.&#34;
pending_user_message = handover_msg
obs = &#34;Control transferred to Architect. Handover context will be provided.&#34;
if debug: self.console.print(Panel(Text(handover_msg), title=&#34;[architect]Escalation to Architect[/architect]&#34;, border_style=&#34;architect&#34;))
if debug:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Text(handover_msg), title=&#34;[architect]Escalation to Architect[/architect]&#34;, border_style=&#34;architect&#34;))
if status:
try: status.start()
except: pass
elif fn == &#34;return_to_engineer&#34;:
if status: status.update(&#34;[engineer]Transferring control back to Engineer...&#34;)
@@ -1997,7 +2032,14 @@ class ai:
handover_msg = f&#34;HANDOVER FROM ARCHITECT\n\nSummary: {args[&#39;summary&#39;]}\n\nYou are now back in control. Continue handling the user&#39;s requests.&#34;
pending_user_message = handover_msg
obs = &#34;Control returned to Engineer. Handover summary will be provided.&#34;
if debug: self.console.print(Panel(Text(handover_msg), title=&#34;[engineer]Return to Engineer[/engineer]&#34;, border_style=&#34;engineer&#34;))
if debug:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Text(handover_msg), title=&#34;[engineer]Return to Engineer[/engineer]&#34;, border_style=&#34;engineer&#34;))
if status:
try: status.start()
except: pass
elif fn == &#34;list_nodes&#34;: obs = self.list_nodes_tool(**args)
elif fn == &#34;run_commands&#34;: obs = self.run_commands_tool(**args, status=status)
@@ -2007,7 +2049,7 @@ class ai:
else: obs = f&#34;Error: {fn} unknown.&#34;
if debug and fn not in [&#34;delegate_to_engineer&#34;, &#34;consult_architect&#34;, &#34;escalate_to_architect&#34;, &#34;return_to_engineer&#34;]:
self._print_debug_observation(f&#34;Observation: {fn}&#34;, obs)
self._print_debug_observation(f&#34;Observation: {fn}&#34;, obs, status=status)
# Ensure observation is a string and truncated for the LLM
obs_str = obs if isinstance(obs, str) else json.dumps(obs)
@@ -2229,7 +2271,7 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
streamed_response = False
try:
safe_messages = self._sanitize_messages(messages)
if stream and (not debug or chunk_callback):
if stream and chunk_callback:
response, streamed_response = self._stream_completion(
model=model, messages=safe_messages, tools=tools, api_key=key,
status=status, label=label, debug=debug, num_retries=3,
@@ -2272,7 +2314,13 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
# In CLI debug mode, only print intermediate reasoning if there are tool calls.
# If there are no tool calls, this content is the final answer and will be printed by the caller.
if resp_msg.tool_calls:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Markdown(resp_msg.content), title=f&#34;[{current_brain}][bold]{label} Reasoning[/bold][/{current_brain}]&#34;, border_style=&#34;architect&#34; if current_brain == &#34;architect&#34; else &#34;engineer&#34;))
if status:
try: status.start()
except: pass
if not resp_msg.tool_calls: break
@@ -2293,7 +2341,7 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
elif fn == &#34;manage_memory_tool&#34;: status.update(f&#34;[architect]Architect: [UPDATING MEMORY]&#34;)
if debug:
self._print_debug_observation(f&#34;Decision: {fn}&#34;, args)
self._print_debug_observation(f&#34;Decision: {fn}&#34;, args, status=status)
if fn == &#34;delegate_to_engineer&#34;:
obs, eng_usage = self._engineer_loop(args[&#34;task&#34;], status=status, debug=debug, chat_history=messages[:-1])
@@ -2312,7 +2360,14 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
num_retries=3
)
obs = claude_resp.choices[0].message.content
if debug: self.console.print(Panel(Markdown(obs), title=&#34;[architect]Architect Consultation[/architect]&#34;, border_style=&#34;architect&#34;))
if debug:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Markdown(obs), title=&#34;[architect]Architect Consultation[/architect]&#34;, border_style=&#34;architect&#34;))
if status:
try: status.start()
except: pass
except Exception as e:
if status: status.update(&#34;[unavailable]Architect unavailable! Engineer continuing alone...&#34;)
obs = f&#34;Architect unavailable ({str(e)}). Proceeding with your best technical judgment.&#34;
@@ -2329,7 +2384,14 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
handover_msg = f&#34;HANDOVER FROM EXECUTION ENGINE\n\nReason: {args[&#39;reason&#39;]}\n\nContext: {args[&#39;context&#39;]}\n\nYou are now in control of this conversation.&#34;
pending_user_message = handover_msg
obs = &#34;Control transferred to Architect. Handover context will be provided.&#34;
if debug: self.console.print(Panel(Text(handover_msg), title=&#34;[architect]Escalation to Architect[/architect]&#34;, border_style=&#34;architect&#34;))
if debug:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Text(handover_msg), title=&#34;[architect]Escalation to Architect[/architect]&#34;, border_style=&#34;architect&#34;))
if status:
try: status.start()
except: pass
elif fn == &#34;return_to_engineer&#34;:
if status: status.update(&#34;[engineer]Transferring control back to Engineer...&#34;)
@@ -2343,7 +2405,14 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
handover_msg = f&#34;HANDOVER FROM ARCHITECT\n\nSummary: {args[&#39;summary&#39;]}\n\nYou are now back in control. Continue handling the user&#39;s requests.&#34;
pending_user_message = handover_msg
obs = &#34;Control returned to Engineer. Handover summary will be provided.&#34;
if debug: self.console.print(Panel(Text(handover_msg), title=&#34;[engineer]Return to Engineer[/engineer]&#34;, border_style=&#34;engineer&#34;))
if debug:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Text(handover_msg), title=&#34;[engineer]Return to Engineer[/engineer]&#34;, border_style=&#34;engineer&#34;))
if status:
try: status.start()
except: pass
elif fn == &#34;list_nodes&#34;: obs = self.list_nodes_tool(**args)
elif fn == &#34;run_commands&#34;: obs = self.run_commands_tool(**args, status=status)
@@ -2353,7 +2422,7 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
else: obs = f&#34;Error: {fn} unknown.&#34;
if debug and fn not in [&#34;delegate_to_engineer&#34;, &#34;consult_architect&#34;, &#34;escalate_to_architect&#34;, &#34;return_to_engineer&#34;]:
self._print_debug_observation(f&#34;Observation: {fn}&#34;, obs)
self._print_debug_observation(f&#34;Observation: {fn}&#34;, obs, status=status)
# Ensure observation is a string and truncated for the LLM
obs_str = obs if isinstance(obs, str) else json.dumps(obs)
@@ -3748,6 +3817,42 @@ class node:
else:
jumphost_cmd = jumphost_cmd + &#34; {}&#34;.format(&#34;@&#34;.join([self.jumphost[&#34;user&#34;],self.jumphost[&#34;host&#34;]]))
self.jumphost = f&#34;-o ProxyCommand=\&#34;{jumphost_cmd}\&#34;&#34;
elif self.jumphost[&#34;protocol&#34;] == &#34;ssm&#34;:
ssm_target = self.jumphost[&#34;host&#34;]
ssm_cmd = f&#34;aws ssm start-session --target {ssm_target} --document-name AWS-StartSSHSession --parameters &#39;portNumber=22&#39;&#34;
if isinstance(self.jumphost.get(&#34;tags&#34;), dict):
if &#34;profile&#34; in self.jumphost[&#34;tags&#34;]:
ssm_cmd += f&#34; --profile {self.jumphost[&#39;tags&#39;][&#39;profile&#39;]}&#34;
if &#34;region&#34; in self.jumphost[&#34;tags&#34;]:
ssm_cmd += f&#34; --region {self.jumphost[&#39;tags&#39;][&#39;region&#39;]}&#34;
if self.jumphost[&#34;options&#34;] != &#39;&#39;:
ssm_cmd += f&#34; {self.jumphost[&#39;options&#39;]}&#34;
bastion_user_part = f&#34;{self.jumphost[&#39;user&#39;]}@{ssm_target}&#34; if self.jumphost[&#39;user&#39;] else ssm_target
ssh_opts = &#34;&#34;
if isinstance(self.jumphost.get(&#34;tags&#34;), dict) and &#34;ssh_options&#34; in self.jumphost[&#34;tags&#34;]:
ssh_opts = f&#34; {self.jumphost[&#39;tags&#39;][&#39;ssh_options&#39;]}&#34;
inner_ssh = f&#34;ssh{ssh_opts} -o ProxyCommand=&#39;{ssm_cmd}&#39; -W %h:%p {bastion_user_part}&#34;
self.jumphost = f&#34;-o ProxyCommand=\&#34;{inner_ssh}\&#34;&#34;
elif self.jumphost[&#34;protocol&#34;] in [&#34;kubectl&#34;, &#34;docker&#34;]:
nc_cmd = &#34;nc&#34;
if isinstance(self.jumphost.get(&#34;tags&#34;), dict) and &#34;nc_command&#34; in self.jumphost[&#34;tags&#34;]:
nc_cmd = self.jumphost[&#34;tags&#34;][&#34;nc_command&#34;]
if self.jumphost[&#34;protocol&#34;] == &#34;kubectl&#34;:
proxy_cmd = f&#34;kubectl exec &#34;
if self.jumphost[&#34;options&#34;] != &#39;&#39;:
proxy_cmd += f&#34;{self.jumphost[&#39;options&#39;]} &#34;
proxy_cmd += f&#34;{self.jumphost[&#39;host&#39;]} -i -- {nc_cmd} %h %p&#34;
else:
proxy_cmd = f&#34;docker &#34;
if self.jumphost[&#34;options&#34;] != &#39;&#39;:
proxy_cmd += f&#34;{self.jumphost[&#39;options&#39;]} &#34;
proxy_cmd += f&#34;exec -i {self.jumphost[&#39;host&#39;]} {nc_cmd} %h %p&#34;
self.jumphost = f&#34;-o ProxyCommand=\&#34;{proxy_cmd}\&#34;&#34;
else:
self.jumphost = &#34;&#34;
@@ -4003,8 +4108,6 @@ class node:
self.mylog.write(data)
async def keepalive_task():
if self.idletime &lt;= 0:
return
while True:
await asyncio.sleep(1)
if time() - self.lastinput &gt;= self.idletime:
@@ -4015,8 +4118,6 @@ class node:
pass
async def savelog_task():
if not hasattr(self, &#39;logfile&#39;) or not hasattr(self, &#39;mylog&#39;):
return
prev_size = 0
while True:
await asyncio.sleep(5)
@@ -4035,10 +4136,12 @@ class node:
# We want to exit if either happens, so return_exceptions=False, but we need to cancel the others.
tasks = [
asyncio.create_task(ingress_task()),
asyncio.create_task(egress_task()),
asyncio.create_task(keepalive_task()),
asyncio.create_task(savelog_task())
asyncio.create_task(egress_task())
]
if self.idletime &gt; 0:
tasks.append(asyncio.create_task(keepalive_task()))
if hasattr(self, &#39;logfile&#39;) and hasattr(self, &#39;mylog&#39;):
tasks.append(asyncio.create_task(savelog_task()))
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for p in pending:
p.cancel()
@@ -4404,7 +4507,7 @@ class node:
&#34;telnet&#34;: [&#39;[u|U]sername:&#39;, &#39;refused&#39;, &#39;supported&#39;, &#39;invalid|unrecognized option&#39;, &#39;ssh-keygen.*\&#34;&#39;, &#39;timeout|timed.out&#39;, &#39;unavailable&#39;, &#39;closed&#39;, password_prompt, prompt, &#39;suspend&#39;, pexpect.EOF, pexpect.TIMEOUT, &#34;No route to host&#34;, &#34;resolve hostname&#34;, &#34;no matching&#34;, &#34;[b|B]ad (owner|permissions)&#34;],
&#34;kubectl&#34;: [&#39;[u|U]sername:&#39;, &#39;[r|R]efused&#39;, &#39;[E|e]rror&#39;, &#39;DEPRECATED&#39;, pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF, &#34;expired|invalid&#34;],
&#34;docker&#34;: [&#39;[u|U]sername:&#39;, &#39;Cannot&#39;, &#39;[E|e]rror&#39;, &#39;failed&#39;, &#39;not a docker command&#39;, &#39;unknown&#39;, &#39;unable to resolve&#39;, pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF],
&#34;ssm&#34;: [&#39;[u|U]sername:&#39;, &#39;Cannot&#39;, &#39;[E|e]rror&#39;, &#39;failed&#39;, &#39;SessionManagerPlugin&#39;, &#39;unknown&#39;, &#39;unable to resolve&#39;, pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF]
&#34;ssm&#34;: [&#39;[u|U]sername:&#39;, &#39;Cannot&#39;, &#39;[E|e]rror&#39;, &#39;failed&#39;, &#39;SessionManagerPlugin&#39;, &#39;[u|U]nknown&#39;, &#39;unable to resolve&#39;, pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF]
}
error_indices = {
+130 -48
View File
@@ -73,13 +73,45 @@ el.replaceWith(d);
def export_to_dict(self, folders=None):
&#34;&#34;&#34;Export nodes/folders to a dictionary.&#34;&#34;&#34;
if not folders:
return self.config._getallnodesfull(extract=False)
return deepcopy(self.config.connections)
else:
# Validate folders exist
for f in folders:
if f != &#34;@&#34; and f not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{f}&#39; not found.&#34;)
return self.config._getallnodesfull(folders, extract=False)
flat = self.config._getallnodesfull(folders, extract=False)
nested = {}
for k, v in flat.items():
uniques = self.config._explode_unique(k)
if not uniques:
continue
if &#34;folder&#34; in uniques and &#34;subfolder&#34; in uniques:
f_name = uniques[&#34;folder&#34;]
s_name = uniques[&#34;subfolder&#34;]
i_name = uniques[&#34;id&#34;]
if f_name not in nested:
nested[f_name] = {&#34;type&#34;: &#34;folder&#34;}
if s_name not in nested[f_name]:
nested[f_name][s_name] = {&#34;type&#34;: &#34;subfolder&#34;}
nested[f_name][s_name][i_name] = v
elif &#34;folder&#34; in uniques:
f_name = uniques[&#34;folder&#34;]
i_name = uniques[&#34;id&#34;]
if f_name not in nested:
nested[f_name] = {&#34;type&#34;: &#34;folder&#34;}
nested[f_name][i_name] = v
else:
i_name = uniques[&#34;id&#34;]
nested[i_name] = v
return nested
def import_from_file(self, file_path):
&#34;&#34;&#34;Import nodes/folders from a YAML file.&#34;&#34;&#34;
@@ -98,28 +130,37 @@ el.replaceWith(d);
if not isinstance(data, dict):
raise InvalidConfigurationError(&#34;Invalid import data format: expected a dictionary of nodes.&#34;)
# Process imports
for k, v in data.items():
uniques = self.config._explode_unique(k)
# Ensure folders exist
if &#34;folder&#34; in uniques:
folder_name = f&#34;@{uniques[&#39;folder&#39;]}&#34;
if folder_name not in self.config._getallfolders():
folder_uniques = self.config._explode_unique(folder_name)
self.config._folder_add(**folder_uniques)
if &#34;subfolder&#34; in uniques:
sub_name = f&#34;@{uniques[&#39;subfolder&#39;]}@{uniques[&#39;folder&#39;]}&#34;
if sub_name not in self.config._getallfolders():
sub_uniques = self.config._explode_unique(sub_name)
self.config._folder_add(**sub_uniques)
# Add node/connection
v.update(uniques)
self._validate_node_name(k)
self.config._connections_add(**v)
def _traverse_import(node_data, current_folder=&#39;&#39;, current_subfolder=&#39;&#39;):
for k, v in node_data.items():
if k == &#34;type&#34;:
continue
if isinstance(v, dict):
node_type = v.get(&#34;type&#34;, &#34;connection&#34;)
if node_type == &#34;folder&#34;:
self.config._folder_add(folder=k)
_traverse_import(v, current_folder=k, current_subfolder=&#39;&#39;)
elif node_type == &#34;subfolder&#34;:
self.config._folder_add(folder=current_folder, subfolder=k)
_traverse_import(v, current_folder=current_folder, current_subfolder=k)
elif node_type == &#34;connection&#34;:
unique_id = k
if current_subfolder:
unique_id = f&#34;{k}@{current_subfolder}@{current_folder}&#34;
elif current_folder:
unique_id = f&#34;{k}@{current_folder}&#34;
self._validate_node_name(unique_id)
kwargs = deepcopy(v)
kwargs[&#39;id&#39;] = k
kwargs[&#39;folder&#39;] = current_folder
kwargs[&#39;subfolder&#39;] = current_subfolder
self.config._connections_add(**kwargs)
else:
# Invalid format skip
pass
_traverse_import(data)
self.config._saveconfig(self.config.file)</code></pre>
</details>
<div class="desc"><p>Business logic for YAML/JSON inventory import and export.</p>
@@ -146,13 +187,45 @@ el.replaceWith(d);
<pre><code class="python">def export_to_dict(self, folders=None):
&#34;&#34;&#34;Export nodes/folders to a dictionary.&#34;&#34;&#34;
if not folders:
return self.config._getallnodesfull(extract=False)
return deepcopy(self.config.connections)
else:
# Validate folders exist
for f in folders:
if f != &#34;@&#34; and f not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{f}&#39; not found.&#34;)
return self.config._getallnodesfull(folders, extract=False)</code></pre>
flat = self.config._getallnodesfull(folders, extract=False)
nested = {}
for k, v in flat.items():
uniques = self.config._explode_unique(k)
if not uniques:
continue
if &#34;folder&#34; in uniques and &#34;subfolder&#34; in uniques:
f_name = uniques[&#34;folder&#34;]
s_name = uniques[&#34;subfolder&#34;]
i_name = uniques[&#34;id&#34;]
if f_name not in nested:
nested[f_name] = {&#34;type&#34;: &#34;folder&#34;}
if s_name not in nested[f_name]:
nested[f_name][s_name] = {&#34;type&#34;: &#34;subfolder&#34;}
nested[f_name][s_name][i_name] = v
elif &#34;folder&#34; in uniques:
f_name = uniques[&#34;folder&#34;]
i_name = uniques[&#34;id&#34;]
if f_name not in nested:
nested[f_name] = {&#34;type&#34;: &#34;folder&#34;}
nested[f_name][i_name] = v
else:
i_name = uniques[&#34;id&#34;]
nested[i_name] = v
return nested</code></pre>
</details>
<div class="desc"><p>Export nodes/folders to a dictionary.</p></div>
</dd>
@@ -191,28 +264,37 @@ el.replaceWith(d);
if not isinstance(data, dict):
raise InvalidConfigurationError(&#34;Invalid import data format: expected a dictionary of nodes.&#34;)
# Process imports
for k, v in data.items():
uniques = self.config._explode_unique(k)
# Ensure folders exist
if &#34;folder&#34; in uniques:
folder_name = f&#34;@{uniques[&#39;folder&#39;]}&#34;
if folder_name not in self.config._getallfolders():
folder_uniques = self.config._explode_unique(folder_name)
self.config._folder_add(**folder_uniques)
if &#34;subfolder&#34; in uniques:
sub_name = f&#34;@{uniques[&#39;subfolder&#39;]}@{uniques[&#39;folder&#39;]}&#34;
if sub_name not in self.config._getallfolders():
sub_uniques = self.config._explode_unique(sub_name)
self.config._folder_add(**sub_uniques)
# Add node/connection
v.update(uniques)
self._validate_node_name(k)
self.config._connections_add(**v)
def _traverse_import(node_data, current_folder=&#39;&#39;, current_subfolder=&#39;&#39;):
for k, v in node_data.items():
if k == &#34;type&#34;:
continue
if isinstance(v, dict):
node_type = v.get(&#34;type&#34;, &#34;connection&#34;)
if node_type == &#34;folder&#34;:
self.config._folder_add(folder=k)
_traverse_import(v, current_folder=k, current_subfolder=&#39;&#39;)
elif node_type == &#34;subfolder&#34;:
self.config._folder_add(folder=current_folder, subfolder=k)
_traverse_import(v, current_folder=current_folder, current_subfolder=k)
elif node_type == &#34;connection&#34;:
unique_id = k
if current_subfolder:
unique_id = f&#34;{k}@{current_subfolder}@{current_folder}&#34;
elif current_folder:
unique_id = f&#34;{k}@{current_folder}&#34;
self._validate_node_name(unique_id)
kwargs = deepcopy(v)
kwargs[&#39;id&#39;] = k
kwargs[&#39;folder&#39;] = current_folder
kwargs[&#39;subfolder&#39;] = current_subfolder
self.config._connections_add(**kwargs)
else:
# Invalid format skip
pass
_traverse_import(data)
self.config._saveconfig(self.config.file)</code></pre>
</details>
<div class="desc"><p>Import nodes/folders from a dictionary.</p></div>
+175 -67
View File
@@ -972,13 +972,45 @@ el.replaceWith(d);
def export_to_dict(self, folders=None):
&#34;&#34;&#34;Export nodes/folders to a dictionary.&#34;&#34;&#34;
if not folders:
return self.config._getallnodesfull(extract=False)
return deepcopy(self.config.connections)
else:
# Validate folders exist
for f in folders:
if f != &#34;@&#34; and f not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{f}&#39; not found.&#34;)
return self.config._getallnodesfull(folders, extract=False)
flat = self.config._getallnodesfull(folders, extract=False)
nested = {}
for k, v in flat.items():
uniques = self.config._explode_unique(k)
if not uniques:
continue
if &#34;folder&#34; in uniques and &#34;subfolder&#34; in uniques:
f_name = uniques[&#34;folder&#34;]
s_name = uniques[&#34;subfolder&#34;]
i_name = uniques[&#34;id&#34;]
if f_name not in nested:
nested[f_name] = {&#34;type&#34;: &#34;folder&#34;}
if s_name not in nested[f_name]:
nested[f_name][s_name] = {&#34;type&#34;: &#34;subfolder&#34;}
nested[f_name][s_name][i_name] = v
elif &#34;folder&#34; in uniques:
f_name = uniques[&#34;folder&#34;]
i_name = uniques[&#34;id&#34;]
if f_name not in nested:
nested[f_name] = {&#34;type&#34;: &#34;folder&#34;}
nested[f_name][i_name] = v
else:
i_name = uniques[&#34;id&#34;]
nested[i_name] = v
return nested
def import_from_file(self, file_path):
&#34;&#34;&#34;Import nodes/folders from a YAML file.&#34;&#34;&#34;
@@ -997,28 +1029,37 @@ el.replaceWith(d);
if not isinstance(data, dict):
raise InvalidConfigurationError(&#34;Invalid import data format: expected a dictionary of nodes.&#34;)
# Process imports
for k, v in data.items():
uniques = self.config._explode_unique(k)
# Ensure folders exist
if &#34;folder&#34; in uniques:
folder_name = f&#34;@{uniques[&#39;folder&#39;]}&#34;
if folder_name not in self.config._getallfolders():
folder_uniques = self.config._explode_unique(folder_name)
self.config._folder_add(**folder_uniques)
if &#34;subfolder&#34; in uniques:
sub_name = f&#34;@{uniques[&#39;subfolder&#39;]}@{uniques[&#39;folder&#39;]}&#34;
if sub_name not in self.config._getallfolders():
sub_uniques = self.config._explode_unique(sub_name)
self.config._folder_add(**sub_uniques)
# Add node/connection
v.update(uniques)
self._validate_node_name(k)
self.config._connections_add(**v)
def _traverse_import(node_data, current_folder=&#39;&#39;, current_subfolder=&#39;&#39;):
for k, v in node_data.items():
if k == &#34;type&#34;:
continue
if isinstance(v, dict):
node_type = v.get(&#34;type&#34;, &#34;connection&#34;)
if node_type == &#34;folder&#34;:
self.config._folder_add(folder=k)
_traverse_import(v, current_folder=k, current_subfolder=&#39;&#39;)
elif node_type == &#34;subfolder&#34;:
self.config._folder_add(folder=current_folder, subfolder=k)
_traverse_import(v, current_folder=current_folder, current_subfolder=k)
elif node_type == &#34;connection&#34;:
unique_id = k
if current_subfolder:
unique_id = f&#34;{k}@{current_subfolder}@{current_folder}&#34;
elif current_folder:
unique_id = f&#34;{k}@{current_folder}&#34;
self._validate_node_name(unique_id)
kwargs = deepcopy(v)
kwargs[&#39;id&#39;] = k
kwargs[&#39;folder&#39;] = current_folder
kwargs[&#39;subfolder&#39;] = current_subfolder
self.config._connections_add(**kwargs)
else:
# Invalid format skip
pass
_traverse_import(data)
self.config._saveconfig(self.config.file)</code></pre>
</details>
<div class="desc"><p>Business logic for YAML/JSON inventory import and export.</p>
@@ -1045,13 +1086,45 @@ el.replaceWith(d);
<pre><code class="python">def export_to_dict(self, folders=None):
&#34;&#34;&#34;Export nodes/folders to a dictionary.&#34;&#34;&#34;
if not folders:
return self.config._getallnodesfull(extract=False)
return deepcopy(self.config.connections)
else:
# Validate folders exist
for f in folders:
if f != &#34;@&#34; and f not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{f}&#39; not found.&#34;)
return self.config._getallnodesfull(folders, extract=False)</code></pre>
flat = self.config._getallnodesfull(folders, extract=False)
nested = {}
for k, v in flat.items():
uniques = self.config._explode_unique(k)
if not uniques:
continue
if &#34;folder&#34; in uniques and &#34;subfolder&#34; in uniques:
f_name = uniques[&#34;folder&#34;]
s_name = uniques[&#34;subfolder&#34;]
i_name = uniques[&#34;id&#34;]
if f_name not in nested:
nested[f_name] = {&#34;type&#34;: &#34;folder&#34;}
if s_name not in nested[f_name]:
nested[f_name][s_name] = {&#34;type&#34;: &#34;subfolder&#34;}
nested[f_name][s_name][i_name] = v
elif &#34;folder&#34; in uniques:
f_name = uniques[&#34;folder&#34;]
i_name = uniques[&#34;id&#34;]
if f_name not in nested:
nested[f_name] = {&#34;type&#34;: &#34;folder&#34;}
nested[f_name][i_name] = v
else:
i_name = uniques[&#34;id&#34;]
nested[i_name] = v
return nested</code></pre>
</details>
<div class="desc"><p>Export nodes/folders to a dictionary.</p></div>
</dd>
@@ -1090,28 +1163,37 @@ el.replaceWith(d);
if not isinstance(data, dict):
raise InvalidConfigurationError(&#34;Invalid import data format: expected a dictionary of nodes.&#34;)
# Process imports
for k, v in data.items():
uniques = self.config._explode_unique(k)
# Ensure folders exist
if &#34;folder&#34; in uniques:
folder_name = f&#34;@{uniques[&#39;folder&#39;]}&#34;
if folder_name not in self.config._getallfolders():
folder_uniques = self.config._explode_unique(folder_name)
self.config._folder_add(**folder_uniques)
if &#34;subfolder&#34; in uniques:
sub_name = f&#34;@{uniques[&#39;subfolder&#39;]}@{uniques[&#39;folder&#39;]}&#34;
if sub_name not in self.config._getallfolders():
sub_uniques = self.config._explode_unique(sub_name)
self.config._folder_add(**sub_uniques)
# Add node/connection
v.update(uniques)
self._validate_node_name(k)
self.config._connections_add(**v)
def _traverse_import(node_data, current_folder=&#39;&#39;, current_subfolder=&#39;&#39;):
for k, v in node_data.items():
if k == &#34;type&#34;:
continue
if isinstance(v, dict):
node_type = v.get(&#34;type&#34;, &#34;connection&#34;)
if node_type == &#34;folder&#34;:
self.config._folder_add(folder=k)
_traverse_import(v, current_folder=k, current_subfolder=&#39;&#39;)
elif node_type == &#34;subfolder&#34;:
self.config._folder_add(folder=current_folder, subfolder=k)
_traverse_import(v, current_folder=current_folder, current_subfolder=k)
elif node_type == &#34;connection&#34;:
unique_id = k
if current_subfolder:
unique_id = f&#34;{k}@{current_subfolder}@{current_folder}&#34;
elif current_folder:
unique_id = f&#34;{k}@{current_folder}&#34;
self._validate_node_name(unique_id)
kwargs = deepcopy(v)
kwargs[&#39;id&#39;] = k
kwargs[&#39;folder&#39;] = current_folder
kwargs[&#39;subfolder&#39;] = current_subfolder
self.config._connections_add(**kwargs)
else:
# Invalid format skip
pass
_traverse_import(data)
self.config._saveconfig(self.config.file)</code></pre>
</details>
<div class="desc"><p>Import nodes/folders from a dictionary.</p></div>
@@ -1282,8 +1364,14 @@ el.replaceWith(d);
case_sensitive = self.config.config.get(&#34;case&#34;, False)
if filter_str:
flags = re.IGNORECASE if not case_sensitive else 0
folders = [f for f in folders if re.search(filter_str, f, flags)]
if filter_str.startswith(&#34;@&#34;):
if not case_sensitive:
folders = [f for f in folders if f.lower() == filter_str.lower()]
else:
folders = [f for f in folders if f == filter_str]
else:
flags = re.IGNORECASE if not case_sensitive else 0
folders = [f for f in folders if re.search(filter_str, f, flags)]
return folders
def get_node_details(self, unique_id):
@@ -1304,13 +1392,20 @@ el.replaceWith(d);
&#34;&#34;&#34;Generate and update the internal nodes cache.&#34;&#34;&#34;
self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)
def validate_parent_folder(self, unique_id):
def validate_parent_folder(self, unique_id, is_folder=False):
&#34;&#34;&#34;Check if parent folder exists for a given node unique ID.&#34;&#34;&#34;
node_folder = unique_id.partition(&#34;@&#34;)[2]
if node_folder:
parent_folder = f&#34;@{node_folder}&#34;
if parent_folder not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{parent_folder}&#39; not found.&#34;)
if is_folder:
uniques = self.config._explode_unique(unique_id)
if uniques and &#34;subfolder&#34; in uniques and &#34;folder&#34; in uniques:
parent_folder = f&#34;@{uniques[&#39;folder&#39;]}&#34;
if parent_folder not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{parent_folder}&#39; not found.&#34;)
else:
node_folder = unique_id.partition(&#34;@&#34;)[2]
if node_folder:
parent_folder = f&#34;@{node_folder}&#34;
if parent_folder not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{parent_folder}&#39; not found.&#34;)
def add_node(self, unique_id, data, is_folder=False):
@@ -1330,7 +1425,7 @@ el.replaceWith(d);
# Check if parent folder exists when creating a subfolder
if &#34;subfolder&#34; in uniques:
self.validate_parent_folder(unique_id)
self.validate_parent_folder(unique_id, is_folder=True)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
@@ -1512,7 +1607,7 @@ el.replaceWith(d);
# Check if parent folder exists when creating a subfolder
if &#34;subfolder&#34; in uniques:
self.validate_parent_folder(unique_id)
self.validate_parent_folder(unique_id, is_folder=True)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
@@ -1728,8 +1823,14 @@ el.replaceWith(d);
case_sensitive = self.config.config.get(&#34;case&#34;, False)
if filter_str:
flags = re.IGNORECASE if not case_sensitive else 0
folders = [f for f in folders if re.search(filter_str, f, flags)]
if filter_str.startswith(&#34;@&#34;):
if not case_sensitive:
folders = [f for f in folders if f.lower() == filter_str.lower()]
else:
folders = [f for f in folders if f == filter_str]
else:
flags = re.IGNORECASE if not case_sensitive else 0
folders = [f for f in folders if re.search(filter_str, f, flags)]
return folders</code></pre>
</details>
<div class="desc"><p>Return all unique folders, optionally filtered by regex.</p></div>
@@ -1858,20 +1959,27 @@ el.replaceWith(d);
<div class="desc"><p>Explicitly update an existing node.</p></div>
</dd>
<dt id="connpy.services.NodeService.validate_parent_folder"><code class="name flex">
<span>def <span class="ident">validate_parent_folder</span></span>(<span>self, unique_id)</span>
<span>def <span class="ident">validate_parent_folder</span></span>(<span>self, unique_id, is_folder=False)</span>
</code></dt>
<dd>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def validate_parent_folder(self, unique_id):
<pre><code class="python">def validate_parent_folder(self, unique_id, is_folder=False):
&#34;&#34;&#34;Check if parent folder exists for a given node unique ID.&#34;&#34;&#34;
node_folder = unique_id.partition(&#34;@&#34;)[2]
if node_folder:
parent_folder = f&#34;@{node_folder}&#34;
if parent_folder not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{parent_folder}&#39; not found.&#34;)</code></pre>
if is_folder:
uniques = self.config._explode_unique(unique_id)
if uniques and &#34;subfolder&#34; in uniques and &#34;folder&#34; in uniques:
parent_folder = f&#34;@{uniques[&#39;folder&#39;]}&#34;
if parent_folder not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{parent_folder}&#39; not found.&#34;)
else:
node_folder = unique_id.partition(&#34;@&#34;)[2]
if node_folder:
parent_folder = f&#34;@{node_folder}&#34;
if parent_folder not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{parent_folder}&#39; not found.&#34;)</code></pre>
</details>
<div class="desc"><p>Check if parent folder exists for a given node unique ID.</p></div>
</dd>
+45 -19
View File
@@ -117,8 +117,14 @@ el.replaceWith(d);
case_sensitive = self.config.config.get(&#34;case&#34;, False)
if filter_str:
flags = re.IGNORECASE if not case_sensitive else 0
folders = [f for f in folders if re.search(filter_str, f, flags)]
if filter_str.startswith(&#34;@&#34;):
if not case_sensitive:
folders = [f for f in folders if f.lower() == filter_str.lower()]
else:
folders = [f for f in folders if f == filter_str]
else:
flags = re.IGNORECASE if not case_sensitive else 0
folders = [f for f in folders if re.search(filter_str, f, flags)]
return folders
def get_node_details(self, unique_id):
@@ -139,13 +145,20 @@ el.replaceWith(d);
&#34;&#34;&#34;Generate and update the internal nodes cache.&#34;&#34;&#34;
self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)
def validate_parent_folder(self, unique_id):
def validate_parent_folder(self, unique_id, is_folder=False):
&#34;&#34;&#34;Check if parent folder exists for a given node unique ID.&#34;&#34;&#34;
node_folder = unique_id.partition(&#34;@&#34;)[2]
if node_folder:
parent_folder = f&#34;@{node_folder}&#34;
if parent_folder not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{parent_folder}&#39; not found.&#34;)
if is_folder:
uniques = self.config._explode_unique(unique_id)
if uniques and &#34;subfolder&#34; in uniques and &#34;folder&#34; in uniques:
parent_folder = f&#34;@{uniques[&#39;folder&#39;]}&#34;
if parent_folder not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{parent_folder}&#39; not found.&#34;)
else:
node_folder = unique_id.partition(&#34;@&#34;)[2]
if node_folder:
parent_folder = f&#34;@{node_folder}&#34;
if parent_folder not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{parent_folder}&#39; not found.&#34;)
def add_node(self, unique_id, data, is_folder=False):
@@ -165,7 +178,7 @@ el.replaceWith(d);
# Check if parent folder exists when creating a subfolder
if &#34;subfolder&#34; in uniques:
self.validate_parent_folder(unique_id)
self.validate_parent_folder(unique_id, is_folder=True)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
@@ -347,7 +360,7 @@ el.replaceWith(d);
# Check if parent folder exists when creating a subfolder
if &#34;subfolder&#34; in uniques:
self.validate_parent_folder(unique_id)
self.validate_parent_folder(unique_id, is_folder=True)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
@@ -563,8 +576,14 @@ el.replaceWith(d);
case_sensitive = self.config.config.get(&#34;case&#34;, False)
if filter_str:
flags = re.IGNORECASE if not case_sensitive else 0
folders = [f for f in folders if re.search(filter_str, f, flags)]
if filter_str.startswith(&#34;@&#34;):
if not case_sensitive:
folders = [f for f in folders if f.lower() == filter_str.lower()]
else:
folders = [f for f in folders if f == filter_str]
else:
flags = re.IGNORECASE if not case_sensitive else 0
folders = [f for f in folders if re.search(filter_str, f, flags)]
return folders</code></pre>
</details>
<div class="desc"><p>Return all unique folders, optionally filtered by regex.</p></div>
@@ -693,20 +712,27 @@ el.replaceWith(d);
<div class="desc"><p>Explicitly update an existing node.</p></div>
</dd>
<dt id="connpy.services.node_service.NodeService.validate_parent_folder"><code class="name flex">
<span>def <span class="ident">validate_parent_folder</span></span>(<span>self, unique_id)</span>
<span>def <span class="ident">validate_parent_folder</span></span>(<span>self, unique_id, is_folder=False)</span>
</code></dt>
<dd>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def validate_parent_folder(self, unique_id):
<pre><code class="python">def validate_parent_folder(self, unique_id, is_folder=False):
&#34;&#34;&#34;Check if parent folder exists for a given node unique ID.&#34;&#34;&#34;
node_folder = unique_id.partition(&#34;@&#34;)[2]
if node_folder:
parent_folder = f&#34;@{node_folder}&#34;
if parent_folder not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{parent_folder}&#39; not found.&#34;)</code></pre>
if is_folder:
uniques = self.config._explode_unique(unique_id)
if uniques and &#34;subfolder&#34; in uniques and &#34;folder&#34; in uniques:
parent_folder = f&#34;@{uniques[&#39;folder&#39;]}&#34;
if parent_folder not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{parent_folder}&#39; not found.&#34;)
else:
node_folder = unique_id.partition(&#34;@&#34;)[2]
if node_folder:
parent_folder = f&#34;@{node_folder}&#34;
if parent_folder not in self.config._getallfolders():
raise NodeNotFoundError(f&#34;Folder &#39;{parent_folder}&#39; not found.&#34;)</code></pre>
</details>
<div class="desc"><p>Check if parent folder exists for a given node unique ID.</p></div>
</dd>