diff --git a/.gitignore b/.gitignore
index 6efb47d..21e1cd4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -133,3 +133,12 @@ dmypy.json
#App
connpy-completion-helper
+
+# Gemini & AI Tools
+.gemini/
+GEMINI.md
+
+# Node.js (used by Gemini CLI or plugins)
+node_modules/
+package-lock.json
+package.json
diff --git a/README.md b/README.md
index 7c55ab2..881056c 100644
--- a/README.md
+++ b/README.md
@@ -56,7 +56,9 @@ For more detailed information, please read our [Privacy Policy](https://connpy.g
Or use fzf by installing pyfzf and running conn config --fzf true.
- Create in bulk, copy, move, export, and import nodes for easy management.
- Run automation scripts on network devices.
- - Use GPT AI to help you manage your devices.
+ - Use AI with a multi-agent system (Engineer/Architect) to manage devices.
+ Supports any LLM provider via litellm (OpenAI, Anthropic, Google, etc.).
+ Features streaming responses, interactive chat, and extensible plugin tools.
- Add plugins with your own scripts.
- Much more!
@@ -428,15 +430,46 @@ for key in routers.result:
print(key, ' ---> ', ("pass" if routers.result[key] else "fail"))
```
### Using AI
-```
+The AI module uses a multi-agent architecture with an **Engineer** (fast execution) and an **Architect** (strategic reasoning). It supports any LLM provider through [litellm](https://github.com/BerriAI/litellm).
+```python
import connpy
conf = connpy.configfile()
-organization = 'openai-org'
-api_key = "openai-key"
-myia = connpy.ai(conf, organization, api_key)
-input = "go to router 1 and get me the full configuration"
-result = myia.ask(input, dryrun = False)
-print(result)
+# Uses models and API keys from config, or override them:
+myai = connpy.ai(conf, engineer_model="gemini/gemini-2.5-flash", engineer_api_key="your-key")
+result = myai.ask("go to router1 and show me the running configuration")
+print(result["response"])
+# Streaming is enabled by default for CLI, disable for programmatic use:
+result = myai.ask("show interfaces on all routers", stream=False)
+print(result["response"])
+```
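+
+The models and API keys can also be stored in the Connpy config file. A minimal sketch of the expected `ai` section (the key names match what `connpy/ai.py` reads; the values are placeholders):
+```json
+{
+    "ai": {
+        "engineer_model": "gemini/gemini-2.5-flash",
+        "architect_model": "anthropic/claude-sonnet-4-6",
+        "engineer_api_key": "your-engineer-key",
+        "architect_api_key": "your-architect-key"
+    }
+}
+```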
+
+#### AI Plugin Tool Registration
+Plugins can extend the AI system by registering custom tools via the `Preload` class:
+```python
+def _register_my_tools(ai_instance):
+ tool_def = {
+ "type": "function",
+ "function": {
+ "name": "my_custom_tool",
+ "description": "Does something useful.",
+ "parameters": {
+ "type": "object",
+ "properties": {"query": {"type": "string"}},
+ "required": ["query"]
+ }
+ }
+ }
+ ai_instance.register_ai_tool(
+ tool_definition=tool_def,
+ handler=my_handler_function,
+ target="engineer", # or "architect" or "both"
+ engineer_prompt="- My tool: does X.",
+ architect_prompt=" * My tool (my_custom_tool)."
+ )
+
+class Preload:
+ def __init__(self, connapp):
+ connapp.ai.modify(_register_my_tools)
```
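+
+The handler receives the `ai` instance followed by the tool's declared arguments and must return a string (per the `register_ai_tool` docstring). A minimal sketch — `my_handler_function` and its body are illustrative only:
+```python
+def my_handler_function(ai_instance, query):
+    # 'query' matches the parameter declared in tool_def above; the returned
+    # string is passed back to the LLM as the tool result.
+    return f"Result for: {query}"
+```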
## http API
With the Connpy API you can run commands on devices using http requests
@@ -527,7 +560,7 @@ With the Connpy API you can run commands on devices using http requests
**Method**: `POST`
-**Description**: This route sends to chatgpt IA a request that will parse it into an understandable output for the application and then run the request.
+**Description**: This route sends a request to the AI multi-agent system, which analyzes it, executes commands on devices if needed, and returns the result. It supports any LLM provider configured via litellm.
#### Request Body:
diff --git a/connpy/__init__.py b/connpy/__init__.py
index 2ce91e7..d1b297f 100644
--- a/connpy/__init__.py
+++ b/connpy/__init__.py
@@ -15,7 +15,8 @@ Connpy is a SSH, SFTP, Telnet, kubectl, and Docker pod connection manager and au
Or use fzf by installing pyfzf and running conn config --fzf true.
- Create in bulk, copy, move, export, and import nodes for easy management.
- Run automation scripts on network devices.
- - Use GPT AI to help you manage your devices.
+ - Use AI with a multi-agent system (Engineer/Architect) to help you manage your devices.
+ Supports any LLM provider via litellm (OpenAI, Anthropic, Google, etc.).
- Add plugins with your own scripts.
- Much more!
@@ -496,12 +497,42 @@ for key in routers.result:
```
import connpy
conf = connpy.configfile()
-organization = 'openai-org'
-api_key = "openai-key"
-myia = connpy.ai(conf, organization, api_key)
-input = "go to router 1 and get me the full configuration"
-result = myia.ask(input, dryrun = False)
-print(result)
+# Uses models and API keys from config, or override them:
+myai = connpy.ai(conf, engineer_model="gemini/gemini-2.5-flash", engineer_api_key="your-key")
+result = myai.ask("go to router1 and show me the running configuration")
+print(result["response"])
+# Streaming is enabled by default for CLI, disable for programmatic use:
+result = myai.ask("show interfaces on all routers", stream=False)
+print(result["response"])
+```
+
+#### AI Plugin Tool Registration
+Plugins can register custom tools with the AI system using `register_ai_tool()` in their `Preload` class:
+```
+def _register_my_tools(ai_instance):
+ tool_def = {
+ "type": "function",
+ "function": {
+ "name": "my_custom_tool",
+ "description": "Does something useful.",
+ "parameters": {
+ "type": "object",
+ "properties": {"query": {"type": "string"}},
+ "required": ["query"]
+ }
+ }
+ }
+ ai_instance.register_ai_tool(
+ tool_definition=tool_def,
+ handler=my_handler_function,
+ target="engineer", # or "architect" or "both"
+ engineer_prompt="- My tool: does X.",
+ architect_prompt=" * My tool (my_custom_tool)."
+ )
+
+class Preload:
+ def __init__(self, connapp):
+ connapp.ai.modify(_register_my_tools)
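+
+# Optional (sketch): register_ai_tool also accepts status_formatter, a callable
+# that maps the tool's arguments dict to a status string, e.g.
+# status_formatter=lambda args: f"Running my_custom_tool on {args['query']}"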
```
'''
from .core import node,nodes
diff --git a/connpy/_version.py b/connpy/_version.py
index df91a88..2d6a1e5 100644
--- a/connpy/_version.py
+++ b/connpy/_version.py
@@ -1,2 +1,2 @@
-__version__ = "4.2"
+__version__ = "5.0b1"
diff --git a/connpy/ai.py b/connpy/ai.py
index 1b4f57a..91aee29 100755
--- a/connpy/ai.py
+++ b/connpy/ai.py
@@ -1,497 +1,839 @@
-from openai import OpenAI
-import time
+import os
import json
import re
-import ast
+import datetime
from textwrap import dedent
+import litellm
+from litellm import completion, stream_chunk_builder
from .core import nodes
-from copy import deepcopy
-from .hooks import ClassHook,MethodHook
+
+# Silence litellm debug output
+litellm.suppress_debug_info = True
+litellm.set_verbose = False
+from .hooks import ClassHook, MethodHook
+from . import printer
+from rich.markdown import Markdown
+from rich.console import Console
+from rich.panel import Panel
+from rich.text import Text
+
+console = Console()
@ClassHook
class ai:
- ''' This class generates a ai object. Containts all the information and methods to make requests to openAI chatGPT to run actions on the application.
+ """Hybrid Multi-Agent System: Selective Escalation with Role Persistence."""
- ### Attributes:
+    SAFE_COMMANDS = [
+        r'^show\s+', r'^ls\s*', r'^cat\s+', r'^ip\s+route\s+show', r'^ip\s+addr\s+show',
+        r'^ip\s+link\s+show', r'^pwd$', r'^hostname$', r'^uname', r'^df\s*', r'^free\s*',
+        r'^ps\s*', r'^ping\s+', r'^traceroute\s+',
+    ]
- - model (str): Model of GPT api to use. Default is gpt-4o-mini.
-
- - temp (float): Value between 0 and 1 that control the randomness
- of generated text, with higher values increasing
- creativity. Default is 0.7.
-
- '''
-
- def __init__(self, config, org = None, api_key = None, model = None):
- '''
-
- ### Parameters:
-
- - config (obj): Pass the object created with class configfile with
- key for decryption and extra configuration if you
- are using connection manager.
-
- ### Optional Parameters:
-
- - org (str): A unique token identifying the user organization
- to interact with the API.
-
- - api_key (str): A unique authentication token required to access
- and interact with the API.
-
- - model (str): Model of GPT api to use. Default is gpt-4o-mini.
-
- - temp (float): Value between 0 and 1 that control the randomness
- of generated text, with higher values increasing
- creativity. Default is 0.7.
-
-
- '''
+ def __init__(self, config, org=None, api_key=None, engineer_model=None, architect_model=None, engineer_api_key=None, architect_api_key=None):
self.config = config
- try:
- final_api_key = api_key if api_key else self.config.config["openai"]["api_key"]
- except Exception:
- raise ValueError("Missing openai api_key")
-
- try:
- final_org = org if org else self.config.config["openai"]["organization"]
- except Exception:
- raise ValueError("Missing openai organization")
-
- self.client = OpenAI(api_key=final_api_key, organization=final_org)
- if model:
- self.model = model
- else:
- try:
- self.model = self.config.config["openai"]["model"]
- except:
- self.model = "gpt-5-nano"
- self.__prompt = {}
- self.__prompt["original_system"] = """
- You are the AI chatbot and assistant of a network connection manager and automation app called connpy. When provided with user input analyze the input and extract the following information. If user wants to chat just reply and don't call a function:
-
- - type: Given a user input, identify the type of request they want to make. The input will represent one of two options:
-
- 1. "command" - The user wants to get information from devices by running commands.
- 2. "list_nodes" - The user wants to get a list of nodes, devices, servers, or routers.
- The 'type' field should reflect whether the user input is a command or a request for a list of nodes.
-
- - filter: One or more regex patterns indicating the device or group of devices the command should be run on. The filter can have different formats, such as:
- - hostname
- - hostname@folder
- - hostname@subfolder@folder
- - partofhostname
- - @folder
- - @subfolder@folder
- - regex_pattern
-
- The filter should be extracted from the user input exactly as it was provided.
- Always preserve the exact filter pattern provided by the user, with no modifications. Do not process any regex, the application can do that.
-
- """
- self.__prompt["original_user"] = "Get the IP addresses of loopback0 for all routers from w2az1 and e1.*(prod|dev) and check if they have the ip 192.168.1.1"
- self.__prompt["original_assistant"] = {"name": "get_network_device_info", "arguments": "{\n \"type\": \"command\",\n \"filter\": [\"w2az1\",\"e1.*(prod|dev)\"]\n}"}
- self.__prompt["original_function"] = {}
- self.__prompt["original_function"]["name"] = "get_network_device_info"
- self.__prompt["original_function"]["descriptions"] = "You are the AI chatbot and assistant of a network connection manager and automation app called connpy. When provided with user input analyze the input and extract the information acording to the function, If user wants to chat just reply and don't call a function",
- self.__prompt["original_function"]["parameters"] = {}
- self.__prompt["original_function"]["parameters"]["type"] = "object"
- self.__prompt["original_function"]["parameters"]["properties"] = {}
- self.__prompt["original_function"]["parameters"]["properties"]["type"] = {}
- self.__prompt["original_function"]["parameters"]["properties"]["type"]["type"] = "string"
- self.__prompt["original_function"]["parameters"]["properties"]["type"]["description"] ="""
-Categorize the user's request based on the operation they want to perform on the nodes. The requests can be classified into the following categories:
-
- 1. "command" - This represents a request to retrieve specific information or configurations from nodes. An example would be: "go to routers in @office and get the config".
-
- 2. "list_nodes" - This is when the user wants a list of nodes. An example could be: "get me the nodes in @office".
-"""
- self.__prompt["original_function"]["parameters"]["properties"]["type"]["enum"] = ["command", "list_nodes"]
- self.__prompt["original_function"]["parameters"]["properties"]["filter"] = {}
- self.__prompt["original_function"]["parameters"]["properties"]["filter"]["type"] = "array"
- self.__prompt["original_function"]["parameters"]["properties"]["filter"]["items"] = {}
- self.__prompt["original_function"]["parameters"]["properties"]["filter"]["items"]["type"] = "string"
- self.__prompt["original_function"]["parameters"]["properties"]["filter"]["items"]["description"] = """One or more regex patterns indicating the device or group of devices the command should be run on. The filter should be extracted from the user input exactly as it was provided.
- The filter can have different formats, such as:
- - hostname
- - hostname@folder
- - hostname@subfolder@folder
- - partofhostname
- - @folder
- - @subfolder@folder
- - regex_pattern
- """
- self.__prompt["original_function"]["parameters"]["required"] = ["type", "filter"]
- self.__prompt["command_system"] = """
- For each OS listed below, provide the command(s) needed to perform the specified action, depending on the device OS (e.g., Cisco IOSXR router, Linux server).
- The application knows how to connect to devices via SSH, so you only need to provide the command(s) to run after connecting. This includes access configuration mode and commiting if required.
- If the commands needed are not for the specific OS type, just send an empty list (e.g., []).
- Note: Preserving the integrity of user-provided commands is of utmost importance. If a user has provided a specific command to run, include that command exactly as it was given, even if it's not recognized or understood. Under no circumstances should you modify or alter user-provided commands.
- """
- self.__prompt["command_user"]= """
- input: show me the full configuration for all this devices:
-
- OS:
- cisco ios:
- """
- self.__prompt["command_assistant"] = {"name": "get_commands", "arguments": "{\n \"cisco ios\": \"show running-configuration\"\n}"}
- self.__prompt["command_function"] = {}
- self.__prompt["command_function"]["name"] = "get_commands"
- self.__prompt["command_function"]["descriptions"] = """
- For each OS listed below, provide the command(s) needed to perform the specified action, depending on the device OS (e.g., Cisco IOSXR router, Linux server).
- The application knows how to connect to devices via SSH, so you only need to provide the command(s) to run after connecting. This includes access configuration mode and commiting if required.
- If the commands needed are not for the specific OS type, just send an empty list (e.g., []).
- """
- self.__prompt["command_function"]["parameters"] = {}
- self.__prompt["command_function"]["parameters"]["type"] = "object"
- self.__prompt["command_function"]["parameters"]["properties"] = {}
- self.__prompt["confirmation_system"] = """
- Please analyze the user's input and categorize it as either an affirmation or negation. Based on this analysis, respond with:
-
- 'true' if the input is an affirmation like 'do it', 'go ahead', 'sure', etc.
- 'false' if the input is a negation.
- 'none' If the input does not fit into either of these categories.
- """
- self.__prompt["confirmation_user"] = "Yes go ahead!"
- self.__prompt["confirmation_assistant"] = "True"
- self.__prompt["confirmation_function"] = {}
- self.__prompt["confirmation_function"]["name"] = "get_confirmation"
- self.__prompt["confirmation_function"]["descriptions"] = """
- Analize user request and respond:
- """
- self.__prompt["confirmation_function"]["parameters"] = {}
- self.__prompt["confirmation_function"]["parameters"]["type"] = "object"
- self.__prompt["confirmation_function"]["parameters"]["properties"] = {}
- self.__prompt["confirmation_function"]["parameters"]["properties"]["result"] = {}
- self.__prompt["confirmation_function"]["parameters"]["properties"]["result"]["description"] = """'true' if the input is an affirmation like 'do it', 'go ahead', 'sure', etc.
-'false' if the input is a negation.
-'none' If the input does not fit into either of these categories"""
- self.__prompt["confirmation_function"]["parameters"]["properties"]["result"]["type"] = "string"
- self.__prompt["confirmation_function"]["parameters"]["properties"]["result"]["enum"] = ["true", "false", "none"]
- self.__prompt["confirmation_function"]["parameters"]["properties"]["response"] = {}
- self.__prompt["confirmation_function"]["parameters"]["properties"]["response"]["description"] = "If the user don't message is not an affiramtion or negation, kindly ask the user to rephrase."
- self.__prompt["confirmation_function"]["parameters"]["properties"]["response"]["type"] = "string"
- self.__prompt["confirmation_function"]["parameters"]["required"] = ["result"]
-
- @MethodHook
- def _retry_function(self, function, max_retries, backoff_num, *args):
- #Retry openai requests
- retries = 0
- while retries < max_retries:
- try:
- myfunction = function(*args)
- break
- except:
- wait_time = backoff_num * (2 ** retries)
- time.sleep(wait_time)
- retries += 1
- continue
- if retries == max_retries:
- myfunction = False
- return myfunction
-
- @MethodHook
- def _clean_command_response(self, raw_response, node_list):
- # Parse response for command request to openAI GPT.
- info_dict = {}
- info_dict["commands"] = []
- info_dict["variables"] = {}
- info_dict["variables"]["__global__"] = {}
- for key, value in node_list.items():
- newvalue = {}
- commands = raw_response[value]
- # Ensure commands is a list
- if isinstance(commands, str):
- commands = [commands]
- # Determine the number of digits required for zero-padding
- num_commands = len(commands)
- num_digits = len(str(num_commands))
-
- for i, e in enumerate(commands, start=1):
- # Zero-pad the command number
- command_num = f"command{str(i).zfill(num_digits)}"
- newvalue[command_num] = e
- if f"{{command{i}}}" not in info_dict["commands"]:
- info_dict["commands"].append(f"{{{command_num}}}")
- info_dict["variables"]["__global__"][command_num] = ""
- info_dict["variables"][key] = newvalue
- return info_dict
-
-
- @MethodHook
- def _get_commands(self, user_input, nodes):
- #Send the request for commands for each device to openAI GPT.
- output_list = []
- command_function = deepcopy(self.__prompt["command_function"])
- node_list = {}
- for key, value in nodes.items():
- tags = value.get('tags', {})
- try:
- if os_value := tags.get('os'):
- node_list[key] = os_value
- output_list.append(f"{os_value}")
- command_function["parameters"]["properties"][os_value] = {}
- command_function["parameters"]["properties"][os_value]["type"] = "array"
- command_function["parameters"]["properties"][os_value]["description"] = f"OS: {os_value}"
- command_function["parameters"]["properties"][os_value]["items"] = {}
- command_function["parameters"]["properties"][os_value]["items"]["type"] = "string"
- except:
- pass
- output_str = "\n".join(list(set(output_list)))
- command_input = f"input: {user_input}\n\nOS:\n{output_str}"
- message = []
- message.append({"role": "system", "content": dedent(self.__prompt["command_system"]).strip()})
- message.append({"role": "user", "content": dedent(self.__prompt["command_user"]).strip()})
- message.append({"role": "assistant", "content": None, "function_call": self.__prompt["command_assistant"]})
- message.append({"role": "user", "content": command_input})
- functions = [command_function]
- response = self.client.chat.completions.create(
- model=self.model,
- messages=message,
- functions=functions,
- function_call={"name": "get_commands"},
- )
- output = {}
- msg = response.choices[0].message # Es un objeto ChatCompletionMessage
-
- # Puede que function_call sea None. Verificá primero.
- if msg.function_call and msg.function_call.arguments:
- json_result = json.loads(msg.function_call.arguments)
- output["response"] = self._clean_command_response(json_result, node_list)
- else:
- # Manejo de error o fallback, según tu lógica
- output["response"] = None
- return output
-
- @MethodHook
- def _get_filter(self, user_input, chat_history = None):
- #Send the request to identify the filter and other attributes from the user input to GPT.
- message = []
- message.append({"role": "system", "content": dedent(self.__prompt["original_system"]).strip()})
- message.append({"role": "user", "content": dedent(self.__prompt["original_user"]).strip()})
- message.append({"role": "assistant", "content": None, "function_call": self.__prompt["original_assistant"]})
- functions = [self.__prompt["original_function"]]
- if not chat_history:
- chat_history = []
- chat_history.append({"role": "user", "content": user_input})
- message.extend(chat_history)
- response = self.client.chat.completions.create(
- model=self.model,
- messages=message,
- functions=functions,
- function_call="auto",
- top_p=1
- )
- def extract_quoted_strings(text):
- pattern = r'["\'](.*?)["\']'
- matches = re.findall(pattern, text)
- return matches
- expected = extract_quoted_strings(user_input)
- output = {}
- msg = response.choices[0].message # Objeto ChatCompletionMessage
-
- if msg.content: # Si hay texto libre del modelo (caso "no app-related")
- output["app_related"] = False
- chat_history.append({"role": "assistant", "content": msg.content})
- output["response"] = msg.content
- else:
- # Si hay function_call, es app-related
- if msg.function_call and msg.function_call.arguments:
- json_result = json.loads(msg.function_call.arguments)
- output["app_related"] = True
- output["filter"] = json_result["filter"]
- output["type"] = json_result["type"]
- chat_history.append({
- "role": "assistant",
- "content": msg.content,
- "function_call": {
- "name": msg.function_call.name,
- "arguments": json.dumps(json_result)
- }
- })
- else:
- # Fallback defensivo si no hay nada
- output["app_related"] = False
- output["response"] = None
-
- output["expected"] = expected
- output["chat_history"] = chat_history
- return output
+ self.trusted_session = False # Trust mode for the entire session
- @MethodHook
- def _get_confirmation(self, user_input):
- #Send the request to identify if user is confirming or denying the task
- message = []
- message.append({"role": "user", "content": user_input})
- functions = [self.__prompt["confirmation_function"]]
- response = self.client.chat.completions.create(
- model=self.model,
- messages=message,
- functions=functions,
- function_call={"name": "get_confirmation"},
- top_p=1
- )
- msg = response.choices[0].message # Es un objeto ChatCompletionMessage
- output = {}
+        # 1. Load the generic AI configuration
+ aiconfig = self.config.config.get("ai", {})
+
+        # Models (priority: argument -> config -> default)
+ self.engineer_model = engineer_model or aiconfig.get("engineer_model") or "gemini/gemini-3.1-flash-lite-preview"
+ self.architect_model = architect_model or aiconfig.get("architect_model") or "anthropic/claude-sonnet-4-6"
+
+        # API keys (priority: argument -> config)
+ self.engineer_key = engineer_api_key or aiconfig.get("engineer_api_key")
+ self.architect_key = architect_api_key or aiconfig.get("architect_api_key")
+
+ # Validate configuration
+ if not self.engineer_key:
+ raise ValueError("Engineer API key not configured. Use 'conn config ai engineer_api_key <key>' to set it.")
+ if not self.architect_key:
+ console.print("[yellow]Warning: Architect API key not configured. Architect will be unavailable.[/yellow]")
+ console.print("[yellow]Use 'conn config ai architect_api_key <key>' to enable it.[/yellow]")
+
+        # Limits
+ self.max_history = 30
+ self.max_truncate = 50000
+ self.soft_limit_iterations = 20 # Show warning and suggest Ctrl+C
+ self.hard_limit_iterations = 50 # Force stop
- if msg.function_call and msg.function_call.arguments:
- json_result = json.loads(msg.function_call.arguments)
- if json_result["result"] == "true":
- output["result"] = True
- elif json_result["result"] == "false":
- output["result"] = False
- elif json_result["result"] == "none":
- output["result"] = json_result.get("response") # .get para evitar KeyError si falta
- else:
- output["result"] = None # O el valor que tenga sentido para tu caso
+ # External tool registry (populated by plugins via ClassHook.modify)
+ self.external_engineer_tools = [] # Tool defs for Engineer LLM
+ self.external_architect_tools = [] # Tool defs for Architect LLM
+ self.external_tool_handlers = {} # {"tool_name": handler_callable}
+ self.tool_status_formatters = {} # {"tool_name": formatter_callable}
+ self.engineer_prompt_extensions = [] # Extra text for engineer prompt
+ self.architect_prompt_extensions = [] # Extra text for architect prompt
- return output
+ # Long-term memory
+ self.memory_path = os.path.expanduser("~/.config/conn/ai_memory.md")
+ self.long_term_memory = ""
+ if os.path.exists(self.memory_path):
+ try:
+ with open(self.memory_path, "r") as f:
+ self.long_term_memory = f.read()
+ except FileNotFoundError:
+ self.long_term_memory = ""
+ except PermissionError as e:
+ console.print(f"[yellow]Warning: Cannot read AI memory file: {e}[/yellow]")
+ except Exception as e:
+ console.print(f"[yellow]Warning: Failed to load AI memory: {e}[/yellow]")
- @MethodHook
- def confirm(self, user_input, max_retries=3, backoff_num=1):
- '''
- Send the user input to openAI GPT and verify if response is afirmative or negative.
+        # Provider-agnostic base prompts
+ self._engineer_base_prompt = dedent(f"""
+ Role: TECHNICAL EXECUTION ENGINE.
+ Expertise: Universal Networking (Cisco, Nokia, Juniper, 6wind, etc.).
+
+ Rules:
+ - BE FAST: Execute tools directly to provide swift technical answers.
+ - AUTONOMY: Proactively use iterative tool calls (list_nodes, run_commands) to find the root cause.
+ - BATCH OPERATIONS: When working on multiple devices, call tools in parallel (multiple tool_calls in same response).
+ - COMPLETE MISSIONS: Execute ALL steps of a mission before reporting back. Don't stop halfway.
+ - DIAGRAM: Use ASCII art or Unicode box-drawing characters directly in your responses to visualize topologies or paths when helpful.
+ - EVIDENCE: Include 'Key Snippets' from tool outputs. Be token-efficient.
+ - NO WANDERING: Do not speculate. If stuck, report attempts.
+ - SAFETY: When you use 'run_commands' with configuration commands, the system automatically prompts the user for confirmation. Just execute - don't ask permission first.
+
+ CRITICAL - CONSULT vs ESCALATE:
+ - ALWAYS use 'consult_architect' for: Configuration planning, design decisions, complex troubleshooting.
+ Examples: "consultalo con el arquitecto", "preguntale al arquitecto", "que opina el arquitecto"
+ You stay in control and present the advice to the user.
+
+ - ONLY use 'escalate_to_architect' when user EXPLICITLY asks to TALK to the Architect:
+ Examples: "quiero hablar con el arquitecto", "pasame con el arquitecto", "que me atienda el arquitecto"
+ After escalation, you hand over control completely.
+
+ - DEFAULT: When in doubt, use 'consult_architect'. Escalation is rare.
+
+ Network Context: {self.long_term_memory if self.long_term_memory else "Empty."}
+ """).strip()
- ### Parameters:
+ self._architect_base_prompt = dedent(f"""
+ Role: STRATEGIC REASONING ENGINE.
+ Expertise: Network Architecture, Complex Troubleshooting, and Design Validation.
+
+ Rules:
+ - STRATEGY: Define technical missions for the Engineer.
+ - DIAGRAM: Use ASCII art or Unicode box-drawing characters in your responses to visualize topologies, traffic paths, or logic flows.
+ - ENGINEER CAPABILITIES: Your Engineer can:
+ * Filter nodes (list_nodes), Run CLI commands (run_commands), Get metadata (get_node_info).
+ - ANALYSIS: Review technical findings to identify patterns or design failures.
+ - MEMORY: Update long-term facts ONLY when the user explicitly requests it.
+
+ CRITICAL - EFFICIENT DELEGATION:
+ - Plan ALL tasks upfront before delegating.
+ - Delegate ONCE with a complete, detailed mission including ALL steps.
+ - Example: "List all routers matching 'border.*', then run 'show ip bgp summary' and 'show ip route' on each, then analyze the outputs."
+ - DO NOT delegate multiple times for the same goal. Batch everything into ONE mission.
+ - Wait for Engineer's complete report before responding to user.
+
+ CRITICAL - RETURNING CONTROL:
+ - When your strategic analysis is complete and no further architectural decisions are needed, use 'return_to_engineer' to hand control back.
+ - The Engineer is better suited for ongoing technical execution and troubleshooting.
+ - Only stay in control if the user explicitly needs strategic oversight for multiple interactions.
+
+ Network Context: {self.long_term_memory if self.long_term_memory else "Empty."}
+ """).strip()
- - user_input (str): User response confirming or denying.
+ @property
+ def engineer_system_prompt(self):
+ """Build engineer system prompt with plugin extensions."""
+ if self.engineer_prompt_extensions:
+ extensions = "\n".join(self.engineer_prompt_extensions)
+ return self._engineer_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}"
+ return self._engineer_base_prompt
- ### Optional Parameters:
+ @property
+ def architect_system_prompt(self):
+ """Build architect system prompt with plugin extensions."""
+ if self.architect_prompt_extensions:
+ extensions = "\n".join(self.architect_prompt_extensions)
+ return self._architect_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}"
+ return self._architect_base_prompt
- - max_retries (int): Maximum number of retries for gpt api.
- - backoff_num (int): Backoff factor for exponential wait time
- between retries.
+ def register_ai_tool(self, tool_definition, handler, target="engineer", engineer_prompt=None, architect_prompt=None, status_formatter=None):
+ """Register an external tool for the AI system.
- ### Returns:
+ Args:
+ tool_definition (dict): OpenAI-compatible tool definition.
+ handler (callable): Function(ai_instance, **tool_args) -> str.
+ target (str): 'engineer', 'architect', or 'both'.
+ engineer_prompt (str): Extra text for engineer system prompt.
+ architect_prompt (str): Extra text for architect system prompt.
+ status_formatter (callable): Function(args_dict) -> status string.
+ """
+ name = tool_definition["function"]["name"]
+ if target in ("engineer", "both"):
+ self.external_engineer_tools.append(tool_definition)
+ if target in ("architect", "both"):
+ self.external_architect_tools.append(tool_definition)
+ self.external_tool_handlers[name] = handler
+ if engineer_prompt:
+ self.engineer_prompt_extensions.append(engineer_prompt)
+ if architect_prompt:
+ self.architect_prompt_extensions.append(architect_prompt)
+ if status_formatter:
+ self.tool_status_formatters[name] = status_formatter
- bool or str: True, False or str if AI coudn't understand the response
- '''
- result = self._retry_function(self._get_confirmation, max_retries, backoff_num, user_input)
- if result:
- output = result["result"]
- else:
- output = f"{self.model} api is not responding right now, please try again later."
- return output
-
- @MethodHook
- def ask(self, user_input, dryrun = False, chat_history = None, max_retries=3, backoff_num=1):
- '''
- Send the user input to openAI GPT and parse the response to run an action in the application.
-
- ### Parameters:
-
- - user_input (str): Request to send to openAI that will be parsed
- and returned to execute on the application.
- AI understands the following tasks:
- - Run a command on a group of devices.
- - List a group of devices.
- - Test a command on a group of devices
- and verify if the output contain an
- expected value.
-
- ### Optional Parameters:
-
- - dryrun (bool): Set to true to get the arguments to use to
- run in the app. Default is false and it
- will run the actions directly.
- - chat_history (list): List in gpt api format for the chat history.
- - max_retries (int): Maximum number of retries for gpt api.
- - backoff_num (int): Backoff factor for exponential wait time
- between retries.
-
- ### Returns:
-
- dict: Dictionary formed with the following keys:
- - input: User input received
- - app_related: True if GPT detected the request to be related
- to the application.
- - dryrun: True/False
- - response: If the request is not related to the app. this
- key will contain chatGPT answer.
- - action: The action detected by the AI to run in the app.
- - filter: If it was detected by the AI, the filter used
- to get the list of nodes to work on.
- - nodes: If it's not a dryrun, the list of nodes matched by
- the filter.
- - args: A dictionary of arguments required to run command(s)
- on the nodes.
- - result: A dictionary with the output of the commands or
- the test.
- - chat_history: The chat history between user and chatbot.
- It can be used as an attribute for next request.
+ def _stream_completion(self, model, messages, tools, api_key, status=None, label="", debug=False, **kwargs):
+ """Stream a completion call, rendering styled Markdown in real-time.
+
+ Returns (response, streamed) where:
+ - response: reconstructed ModelResponse (same as non-streaming)
+ - streamed: True if text was rendered to console during streaming
+ """
+ from rich.live import Live
+
+ stream_resp = completion(model=model, messages=messages, tools=tools, api_key=api_key, stream=True, **kwargs)
+
+ chunks = []
+ full_content = ""
+ is_streaming_text = False
+ has_tool_calls = False
+ live_display = None
+
+ # Determine styling based on current brain
+ role_label = "Network Architect" if "architect" in label.lower() else "Network Engineer"
+ border = "purple" if "architect" in label.lower() else "blue"
+ title = f"[bold {border}]{role_label}[/bold {border}]"
+
+ try:
+ for chunk in stream_resp:
+ chunks.append(chunk)
+ delta = chunk.choices[0].delta
+ # Detect tool calls
+ if hasattr(delta, 'tool_calls') and delta.tool_calls:
+ has_tool_calls = True
+
+ # Stream text content with styled rendering
+ if hasattr(delta, 'content') and delta.content and not debug:
+ full_content += delta.content
-
- '''
- output = {}
- output["dryrun"] = dryrun
- output["input"] = user_input
- original = self._retry_function(self._get_filter, max_retries, backoff_num, user_input, chat_history)
- if not original:
- output["app_related"] = False
- output["response"] = f"{self.model} api is not responding right now, please try again later."
- return output
- output["app_related"] = original["app_related"]
- output["chat_history"] = original["chat_history"]
- if not output["app_related"]:
- output["response"] = original["response"]
- else:
- type = original["type"]
- if "filter" in original:
- output["filter"] = original["filter"]
- if not self.config.config["case"]:
- if isinstance(output["filter"], list):
- output["filter"] = [item.lower() for item in output["filter"]]
+ if not is_streaming_text:
+ # Stop spinner before starting live display
+ if status:
+ status.stop()
+ live_display = Live(
+ Panel(Markdown(full_content), title=title, border_style=border, expand=False),
+ console=console,
+ refresh_per_second=8,
+ transient=False
+ )
+ live_display.start()
+ is_streaming_text = True
else:
- output["filter"] = output["filter"].lower()
- if not dryrun or type == "command":
- thisnodes = self.config._getallnodesfull(output["filter"])
- output["nodes"] = list(thisnodes.keys())
- if not type == "command":
- output["action"] = "list_nodes"
+ live_display.update(
+ Panel(Markdown(full_content), title=title, border_style=border, expand=False)
+ )
+ except Exception as e:
+ if not chunks:
+ raise
+ finally:
+ if live_display:
+ # Render final state with complete content
+ try:
+ live_display.update(
+ Panel(Markdown(full_content), title=title, border_style=border, expand=False)
+ )
+ except Exception:
+ pass
+ live_display.stop()
+
+ # Rebuild complete response from chunks
+ try:
+ response = stream_chunk_builder(chunks, messages=messages)
+ except Exception:
+ # Fallback: manual reconstruction if stream_chunk_builder fails
+ full_content_rebuilt = ""
+ tool_calls_map = {}
+ for c in chunks:
+ d = c.choices[0].delta
+ if hasattr(d, 'content') and d.content:
+ full_content_rebuilt += d.content
+ if hasattr(d, 'tool_calls') and d.tool_calls:
+ for tc in d.tool_calls:
+ idx = tc.index
+ if idx not in tool_calls_map:
+ tool_calls_map[idx] = {"id": tc.id or "", "type": "function", "function": {"name": getattr(tc.function, 'name', '') or '', "arguments": getattr(tc.function, 'arguments', '') or ''}}
+ else:
+ if tc.id: tool_calls_map[idx]["id"] = tc.id
+ if tc.function:
+ if tc.function.name: tool_calls_map[idx]["function"]["name"] = tc.function.name
+ if tc.function.arguments: tool_calls_map[idx]["function"]["arguments"] += tc.function.arguments
+
+ # Build a minimal response-like object
+ class FakeFunc:
+ def __init__(self, name, arguments): self.name = name; self.arguments = arguments
+ class FakeTC:
+ def __init__(self, d): self.id = d["id"]; self.function = FakeFunc(d["function"]["name"], d["function"]["arguments"])
+ def model_dump(self, **kw): return {"id": self.id, "type": "function", "function": {"name": self.function.name, "arguments": self.function.arguments}}
+ class FakeMsg:
+ def __init__(self, content, tcs): self.content = content or None; self.tool_calls = tcs if tcs else None; self.role = "assistant"
+ def model_dump(self, **kw):
+ d = {"role": "assistant", "content": self.content}
+ if self.tool_calls: d["tool_calls"] = [tc.model_dump() for tc in self.tool_calls]
+ return d
+ class FakeChoice:
+ def __init__(self, msg): self.message = msg
+ class FakeResp:
+ def __init__(self, choice): self.choices = [choice]; self.usage = None
+
+ tcs = [FakeTC(tool_calls_map[i]) for i in sorted(tool_calls_map)] if tool_calls_map else None
+ response = FakeResp(FakeChoice(FakeMsg(full_content_rebuilt or full_content, tcs)))
+
+ # Only count as "streamed" if we rendered text AND it was the final response (no tool calls)
+ streamed = is_streaming_text and not has_tool_calls
+ return response, streamed
+
+ def _sanitize_messages(self, messages):
+ """Sanitize message list for strict providers like Gemini.
+
+ Ensures that:
+ 1. Every assistant message with tool_calls is followed by ALL its tool responses
+ 2. No user/system messages appear between tool_calls and tool responses
+ 3. Orphaned tool_calls at the end are removed
+ 4. Orphaned tool responses without a preceding tool_call are removed
+ """
+ if not messages:
+ return messages
+
+ sanitized = []
+ i = 0
+ while i < len(messages):
+ msg = messages[i]
+ role = msg.get('role', '')
+
+ if role == 'assistant' and msg.get('tool_calls'):
+ # Collect all expected tool_call_ids
+ expected_ids = set()
+ for tc in msg['tool_calls']:
+ tc_id = tc.get('id') if isinstance(tc, dict) else getattr(tc, 'id', None)
+ if tc_id:
+ expected_ids.add(tc_id)
+
+ # Look ahead for matching tool responses
+ tool_responses = []
+ j = i + 1
+ while j < len(messages):
+ next_msg = messages[j]
+ if next_msg.get('role') == 'tool':
+ tool_responses.append(next_msg)
+ j += 1
+ else:
+ break
+
+ # Only include this assistant+tools block if we have responses
+ if tool_responses:
+ sanitized.append(msg)
+ sanitized.extend(tool_responses)
+ i = j
+ else:
+ # Orphaned tool_calls with no responses - skip the assistant message
+ i += 1
+ elif role == 'tool':
+ # Orphaned tool response (no preceding assistant with tool_calls) - skip
+ i += 1
else:
- if thisnodes:
- commands = self._retry_function(self._get_commands, max_retries, backoff_num, user_input, thisnodes)
+ sanitized.append(msg)
+ i += 1
+
+ return sanitized
+
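# A minimal sketch of the guarantees above, using hypothetical messages: an
# assistant tool_call with no tool responses after it is dropped, and a tool
# response with no preceding tool_call is dropped.
history = [
    {"role": "tool", "tool_call_id": "tc0", "content": "orphan"},   # dropped: no preceding tool_call
    {"role": "user", "content": "check router1"},                   # kept
    {"role": "assistant", "tool_calls": [{"id": "tc1", "type": "function",
        "function": {"name": "list_nodes", "arguments": "{}"}}]},   # dropped: no tool responses follow
    {"role": "user", "content": "never mind"},                      # kept
]
# ai_instance._sanitize_messages(history) keeps only the two user messages.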
+ def _truncate(self, text, limit=None):
+ """Truncate text to specified limit, keeping head (60%) and tail (40%)."""
+ final_limit = limit or self.max_truncate
+ if len(text) <= final_limit: return text
+ head_limit = int(final_limit * 0.6)
+ tail_limit = int(final_limit * 0.4)
+        return (text[:head_limit] + "\n\n[... OUTPUT TRUNCATED ...]\n\n" + text[-tail_limit:])
+
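# A quick illustration of the 60/40 head/tail split used above:
limit = 100
head_limit, tail_limit = int(limit * 0.6), int(limit * 0.4)
assert (head_limit, tail_limit) == (60, 40)
# So a long output keeps its first 60 and its last 40 characters, joined by
# the "[... OUTPUT TRUNCATED ...]" marker.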
+ def manage_memory_tool(self, content, action="append"):
+ """Save or update long-term memory. Only use when user explicitly requests it."""
+ if not content or not content.strip():
+ return "Error: Cannot save empty content to memory."
+
+ try:
+ mode = "a" if action == "append" else "w"
+ os.makedirs(os.path.dirname(self.memory_path), exist_ok=True)
+ with open(self.memory_path, mode) as f:
+ timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
+ f.write(f"\n\n## {timestamp}\n{content.strip()}\n" if action == "append" else content)
+
+ # Reload memory after update
+ with open(self.memory_path, "r") as f:
+ self.long_term_memory = f.read()
+
+ return "Memory updated successfully."
+ except PermissionError as e:
+ return f"Error: Permission denied writing to memory file: {e}"
+ except Exception as e:
+ return f"Error updating memory: {str(e)}"
+
+
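# A hedged usage sketch; `myai` is assumed to be a constructed connpy.ai
# instance with `memory_path` already set via config. "append" adds a
# timestamped section, "replace" overwrites the whole memory file.
import connpy
myai = connpy.ai(connpy.configfile())   # models/keys assumed to come from config
print(myai.manage_memory_tool("router1 runs OSPF area 0"))          # append
print(myai.manage_memory_tool("Fresh notes.", action="replace"))    # overwrite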
+ def list_nodes_tool(self, filter_pattern=".*"):
+ """List nodes matching the filter pattern. Returns metadata for <=5 nodes, names only for more."""
+ try:
+ matched_names = self.config._getallnodes(filter_pattern)
+ if not matched_names: return "No nodes found."
+ if len(matched_names) <= 5:
+ matched_data = self.config.getitems(matched_names, extract=True)
+ res = {}
+ for name, data in matched_data.items():
+ os_tag = "unknown"
+ if isinstance(data, dict):
+ ts = data.get("tags")
+ if isinstance(ts, dict): os_tag = ts.get("os", "unknown")
+ res[name] = {"os": os_tag}
+ return json.dumps(res)
+ return json.dumps({"count": len(matched_names), "nodes": matched_names, "note": "Use 'get_node_info' for details."})
+ except Exception as e:
+ return f"Error listing nodes: {str(e)}"
+
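# A hedged sketch of the two return shapes (node names are hypothetical);
# `myai` is assumed to be a constructed connpy.ai instance:
import json
few = json.loads(myai.list_nodes_tool("router1"))
# -> e.g. {"router1": {"os": "ios"}} when 5 or fewer nodes match
many = json.loads(myai.list_nodes_tool(".*"))
# -> e.g. {"count": 42, "nodes": [...], "note": "Use 'get_node_info' for details."}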
+ def _is_safe_command(self, cmd):
+ """Check if a command matches safe patterns."""
+ return any(re.match(pattern, cmd.strip(), re.IGNORECASE) for pattern in self.SAFE_COMMANDS)
+
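# Illustrative only: SAFE_COMMANDS is defined elsewhere in the class and its
# exact patterns are not shown in this diff. Assuming read-only patterns such
# as r"^show\b", the check behaves like this standalone version:
import re
SAFE_PATTERNS = [r"^show\b", r"^ping\b"]   # hypothetical values
def is_safe(cmd):
    return any(re.match(p, cmd.strip(), re.IGNORECASE) for p in SAFE_PATTERNS)
assert is_safe("show ip route") and not is_safe("reload")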
+ def run_commands_tool(self, nodes_filter, commands, status=None):
+ """Execute commands on nodes matching the filter. Native interactive confirmation for unsafe commands."""
+ # Handle if commands is a JSON string
+ if isinstance(commands, str):
+ try:
+ commands = json.loads(commands)
+            except json.JSONDecodeError:
+ commands = [c.strip() for c in commands.split('\n') if c.strip()]
+
+ # Expand multi-line commands within a list (in case the AI packs them)
+ if isinstance(commands, list):
+ expanded_commands = []
+ for cmd in commands:
+ expanded_commands.extend([c.strip() for c in str(cmd).split('\n') if c.strip()])
+ commands = expanded_commands
+ else:
+ commands = [str(commands)]
+
+ # Check command safety natively
+ if not self.trusted_session:
+ unsafe_commands = [cmd for cmd in commands if not self._is_safe_command(cmd)]
+ if unsafe_commands:
+ # Stop the spinner so prompt doesn't get messed up
+ if status: status.stop()
+
+ # Show ALL commands with unsafe ones highlighted
+ formatted_cmds = []
+ for cmd in commands:
+ if cmd in unsafe_commands:
+ formatted_cmds.append(f" • [yellow]{cmd}[/yellow]")
+ else:
+ formatted_cmds.append(f" • {cmd}")
+
+ panel_content = f"Target: {nodes_filter}\nCommands:\n" + "\n".join(formatted_cmds)
+ console.print(Panel(panel_content, title="[bold yellow]⚠️ UNSAFE COMMANDS DETECTED[/bold yellow]", border_style="yellow"))
+
+ try:
+ from rich.prompt import Prompt
+ user_resp = Prompt.ask("[bold yellow]Execute? (y: yes / n: no / a: allow all this session / <text>: feedback)[/bold yellow]", default="n")
+ except KeyboardInterrupt:
+ if status: status.update("[bold blue]Engineer: Resuming...")
+ console.print("[bold red]✗ Aborted by user (Ctrl+C).[/bold red]")
+ return "Error: User cancelled execution (Ctrl+C)."
+
+ # Resume the spinner
+ if status: status.update("[bold blue]Engineer: Processing user response...")
+
+ user_resp_lower = user_resp.strip().lower()
+ if user_resp_lower in ['a', 'allow']:
+ self.trusted_session = True
+ console.print("[bold green]✓ Trust Mode Enabled. All future commands in this session will execute without confirmation.[/bold green]")
+ elif user_resp_lower in ['y', 'yes']:
+ console.print("[bold green]✓ Executing...[/bold green]")
+ elif user_resp_lower in ['n', 'no', '']:
+ console.print("[bold red]✗ Execution rejected by user.[/bold red]")
+ return "Error: User rejected execution."
else:
- output["app_related"] = False
- filterlist = ", ".join(output["filter"])
- output["response"] = f"I'm sorry, I coudn't find any device with filter{'s' if len(output['filter']) != 1 else ''}: {filterlist}."
- return output
- if not commands:
- output["app_related"] = False
- output["response"] = f"{self.model} api is not responding right now, please try again later."
- return output
- output["args"] = {}
- output["args"]["commands"] = commands["response"]["commands"]
- output["args"]["vars"] = commands["response"]["variables"]
- output["nodes"] = [item for item in output["nodes"] if output["args"]["vars"].get(item)]
- if original.get("expected"):
- output["args"]["expected"] = original["expected"]
- output["action"] = "test"
- else:
- output["action"] = "run"
- if dryrun:
- output["task"] = []
- if output["action"] == "test":
- output["task"].append({"Task": "Verify if expected value is in command(s) output"})
- output["task"].append({"Expected value to verify": output["args"]["expected"]})
- elif output["action"] == "run":
- output["task"].append({"Task": "Run command(s) on devices and return output"})
- varstocommands = deepcopy(output["args"]["vars"])
- del varstocommands["__global__"]
- output["task"].append({"Devices": varstocommands})
- if not dryrun:
- mynodes = nodes(self.config.getitems(output["nodes"]),config=self.config)
- if output["action"] == "test":
- output["result"] = mynodes.test(**output["args"])
- output["logs"] = mynodes.output
- elif output["action"] == "run":
- output["result"] = mynodes.run(**output["args"])
- return output
+ console.print(f"[bold cyan]User feedback: [/bold cyan]{user_resp}")
+ return f"User requested changes: {user_resp}. Please adjust the commands based on this feedback and try again."
+
+ try:
+ matched_names = self.config._getallnodes(nodes_filter)
+ if not matched_names: return "No nodes found matching filter."
+ thisnodes_dict = self.config.getitems(matched_names, extract=True)
+ result = nodes(thisnodes_dict, config=self.config).run(commands)
+ return self._truncate(json.dumps(result))
+ except Exception as e:
+ return f"Error executing commands: {str(e)}"
+
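# A hedged sketch of the input normalization above: a JSON string, a
# newline-joined string, or a list with embedded newlines all flatten to the
# same command list before the safety check runs.
import json
raw_inputs = [
    '["show version", "show clock"]',   # JSON string
    "show version\nshow clock",         # newline-joined string
    ["show version\nshow clock"],       # list with embedded newlines
]
for raw in raw_inputs:
    cmds = raw
    if isinstance(cmds, str):
        try:
            cmds = json.loads(cmds)
        except json.JSONDecodeError:
            cmds = [c.strip() for c in cmds.split('\n') if c.strip()]
    if isinstance(cmds, list):
        cmds = [c.strip() for item in cmds for c in str(item).split('\n') if c.strip()]
    assert cmds == ["show version", "show clock"]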
+ def get_node_info_tool(self, node_name):
+ """Get detailed metadata for a specific node. Passwords are masked."""
+ try:
+ d = self.config.getitem(node_name, extract=True)
+ if 'password' in d: d['password'] = '***'
+ return json.dumps(d)
+ except Exception as e:
+ return f"Error getting node info: {str(e)}"
+
+ def _engineer_loop(self, task, status=None, debug=False, chat_history=None):
+ """Internal loop where the Engineer executes technical tasks for the Architect."""
+        # Prompt-cache optimization for the Engineer
+ if "claude" in self.engineer_model.lower():
+ messages = [{"role": "system", "content": [{"type": "text", "text": self.engineer_system_prompt, "cache_control": {"type": "ephemeral"}}]}]
+ else:
+ messages = [{"role": "system", "content": self.engineer_system_prompt}]
+
+ if chat_history:
+ # Clean chat history from caching metadata if engineer is not Claude
+ if "claude" not in self.engineer_model.lower():
+ cleaned_history = []
+ for msg in chat_history[-5:]:
+ m = msg if isinstance(msg, dict) else msg.model_dump(exclude_none=True)
+ # Remove cache_control from system messages
+ if m.get('role') == 'system' and isinstance(m.get('content'), list):
+ m['content'] = m['content'][0]['text'] if m['content'] else ""
+ cleaned_history.append(m)
+ messages.extend(cleaned_history)
+ else:
+ messages.extend(chat_history[-5:])
+
+ messages.append({"role": "user", "content": f"MISSION: {task}"})
+
+ tools = self._get_engineer_tools()
+ usage = {"input": 0, "output": 0, "total": 0}
+ iteration = 0
+ soft_limit_warned = False
+
+ try:
+ while iteration < self.hard_limit_iterations:
+ iteration += 1
+
+ # Soft limit warning
+ if iteration == self.soft_limit_iterations and not soft_limit_warned:
+ console.print(f"[yellow]⚠ Engineer has performed {iteration} steps. This is taking longer than expected.[/yellow]")
+ console.print(f"[yellow] You can press Ctrl+C to interrupt and get a summary.[/yellow]")
+ soft_limit_warned = True
+
+ if status: status.update(f"[bold blue]Engineer: Analyzing mission... (step {iteration})")
+
+ try:
+ safe_messages = self._sanitize_messages(messages)
+ response = completion(model=self.engineer_model, messages=safe_messages, tools=tools, api_key=self.engineer_key)
+ except Exception as e:
+ return f"Engineer failed to connect: {str(e)}", usage
+
+ if hasattr(response, "usage") and response.usage:
+ usage["input"] += getattr(response.usage, "prompt_tokens", 0)
+ usage["output"] += getattr(response.usage, "completion_tokens", 0)
+ usage["total"] += getattr(response.usage, "total_tokens", 0)
+
+ resp_msg = response.choices[0].message
+ msg_dict = resp_msg.model_dump(exclude_none=True)
+ if msg_dict.get("tool_calls") and msg_dict.get("content") == "": msg_dict["content"] = None
+ messages.append(msg_dict)
+
+ if not resp_msg.tool_calls: break
+ for tc in resp_msg.tool_calls:
+ fn, args = tc.function.name, json.loads(tc.function.arguments)
+
+                    # Real-time notification of the technical task
+ if status:
+ if fn == "list_nodes": status.update(f"[bold blue]Engineer: [SEARCH] {args.get('filter_pattern','.*')}")
+ elif fn == "run_commands":
+ cmds = args.get('commands', [])
+ cmd_str = cmds[0] if cmds else ""
+ status.update(f"[bold blue]Engineer: [CMD] {cmd_str}")
+ elif fn == "get_node_info": status.update(f"[bold blue]Engineer: [INSPECT] {args.get('node_name','')}")
+ elif fn in self.tool_status_formatters: status.update(self.tool_status_formatters[fn](args))
+
+ if debug: console.print(Panel(Text(json.dumps(args, indent=2)), title=f"[bold blue]Engineer Tool: {fn}[/bold blue]", border_style="blue"))
+
+ if fn == "list_nodes": obs = self.list_nodes_tool(**args)
+ elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
+ elif fn == "get_node_info": obs = self.get_node_info_tool(**args)
+ elif fn in self.external_tool_handlers: obs = self.external_tool_handlers[fn](self, **args)
+ else: obs = f"Error: Unknown tool '{fn}'."
+
+ if debug: console.print(Panel(Text(str(obs)), title=f"[bold green]Engineer Observation: {fn}[/bold green]", border_style="green"))
+ messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs})
+
+ if iteration >= self.hard_limit_iterations:
+ console.print(f"[red]⛔ Engineer reached hard limit ({self.hard_limit_iterations} steps). Forcing stop.[/red]")
+
+ if debug and resp_msg.content:
+ console.print(Panel(Text(resp_msg.content), title="[bold blue]Engineer Final Report to Architect[/bold blue]", border_style="blue"))
+
+ return resp_msg.content, usage
+ except Exception as e:
+ return f"Engineer failed: {str(e)}", usage
+
+ def _get_engineer_tools(self):
+ """Define tools available to the Engineer."""
+ tools = [
+ {"type": "function", "function": {"name": "list_nodes", "description": "Lists available nodes in the inventory.", "parameters": {"type": "object", "properties": {"filter_pattern": {"type": "string", "description": "Regex to filter nodes (e.g. '.*', 'border.*')."}}}}},
+ {"type": "function", "function": {"name": "run_commands", "description": "Runs one or more commands on matched nodes. MANDATORY: You MUST call 'list_nodes' first to verify the target list.", "parameters": {"type": "object", "properties": {"nodes_filter": {"type": "string", "description": "Exact node name or verified filter pattern."}, "commands": {"type": "array", "items": {"type": "string"}, "description": "List of commands (e.g. ['show ip route', 'show int desc'])."}}, "required": ["nodes_filter", "commands"]}}},
+ {"type": "function", "function": {"name": "get_node_info", "description": "Gets full metadata for a specific node.", "parameters": {"type": "object", "properties": {"node_name": {"type": "string"}}, "required": ["node_name"]}}},
+ {"type": "function", "function": {"name": "consult_architect", "description": "Ask the Strategic Reasoning Engine for advice on complex design, architecture, or troubleshooting decisions. You remain in control and will present the response to the user. Use this for: configuration planning, design validation, complex troubleshooting.", "parameters": {"type": "object", "properties": {"question": {"type": "string", "description": "Strategic question or decision needed."}, "technical_summary": {"type": "string", "description": "Technical findings and context gathered so far."}}, "required": ["question", "technical_summary"]}}},
+ {"type": "function", "function": {"name": "escalate_to_architect", "description": "Transfer full control to the Strategic Reasoning Engine. Use ONLY when the user explicitly requests the Architect or when the problem requires strategic oversight beyond consultation. After escalation, the Architect takes over the conversation.", "parameters": {"type": "object", "properties": {"reason": {"type": "string", "description": "Why you're escalating (e.g. 'User requested Architect', 'Complex multi-site design needed')."}, "context": {"type": "string", "description": "Full context and findings to hand over."}}, "required": ["reason", "context"]}}}
+ ]
+ tools.extend(self.external_engineer_tools)
+ return tools
+
+ def _get_architect_tools(self):
+ """Define tools available to the Strategic Reasoning Engine."""
+ tools = [
+ {"type": "function", "function": {"name": "delegate_to_engineer", "description": "Delegates a technical mission to the Engineer.", "parameters": {"type": "object", "properties": {"task": {"type": "string", "description": "Detailed technical mission or goal."}}, "required": ["task"]}}},
+ {"type": "function", "function": {"name": "return_to_engineer", "description": "Return control to the Engineer. Use this when your strategic analysis is complete and the Engineer should handle the rest of the conversation.", "parameters": {"type": "object", "properties": {"summary": {"type": "string", "description": "Brief summary of your analysis to hand over to the Engineer."}}, "required": ["summary"]}}},
+ {"type": "function", "function": {"name": "manage_memory_tool", "description": "Saves information to long-term memory. MANDATORY: Only use this if the user explicitly asks to remember or save something.", "parameters": {"type": "object", "properties": {"content": {"type": "string"}, "action": {"type": "string", "enum": ["append", "replace"]}}, "required": ["content"]}}}
+ ]
+ tools.extend(self.external_architect_tools)
+ return tools
+
+ @MethodHook
+ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=False, stream=True):
+ if chat_history is None: chat_history = []
+ usage = {"input": 0, "output": 0, "total": 0}
+
+        # 1. Initial role selection (Sticky Brain)
+ explicit_architect = re.match(r'^(architect|arquitecto|@architect)[:\s]', user_input, re.I)
+ explicit_engineer = re.match(r'^(engineer|ingeniero|@engineer)[:\s]', user_input, re.I)
+
+ if explicit_architect:
+ current_brain = "architect"
+ elif explicit_engineer:
+ current_brain = "engineer"
+ else:
+            # Sticky Brain: detect whether the Architect was in charge in the recent history
+ is_architect_active = False
+ for msg in reversed(chat_history[-5:]):
+ tcs = msg.get('tool_calls') if isinstance(msg, dict) else getattr(msg, 'tool_calls', None)
+ if tcs:
+ for tc in tcs:
+ fn = tc.get('function', {}).get('name') if isinstance(tc, dict) else getattr(getattr(tc, 'function', None), 'name', '')
+ # Architect stays in control if delegating tasks or if Engineer escalated to them
+ # consult_architect is just Engineer asking for advice - Engineer keeps control
+ if fn in ['delegate_to_engineer', 'escalate_to_architect']:
+ is_architect_active = True; break
+ if is_architect_active: break
+ current_brain = "architect" if is_architect_active else "engineer"
+
+        # 2. Message preparation and cleanup
+ clean_input = re.sub(r'^(architect|arquitecto|engineer|ingeniero|@architect|@engineer)[:\s]+', '', user_input, flags=re.IGNORECASE).strip()
+
+ system_prompt = self.architect_system_prompt if current_brain == "architect" else self.engineer_system_prompt
+ tools = self._get_architect_tools() if current_brain == "architect" else self._get_engineer_tools()
+ model = self.architect_model if current_brain == "architect" else self.engineer_model
+ key = self.architect_key if current_brain == "architect" else self.engineer_key
+
+        # Structure optimized for prompt caching
+ if "claude" in model.lower():
+ messages = [{"role": "system", "content": [{"type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"}}]}]
+ else:
+ messages = [{"role": "system", "content": system_prompt}]
+
+        # Interleave the chat history
+ last_role = "system"
+ for msg in chat_history[-self.max_history:]:
+ m = msg if isinstance(msg, dict) else msg.model_dump(exclude_none=True)
+ role = m.get('role')
+ if role == last_role and role == 'user':
+ messages[-1]['content'] += "\n" + (m.get('content') or "")
+ continue
+ if role == 'assistant' and m.get('tool_calls') and m.get('content') == "": m['content'] = None
+ messages.append(m)
+ last_role = role
+
+ if last_role == 'user': messages[-1]['content'] += "\n" + clean_input
+ else: messages.append({"role": "user", "content": clean_input})
+
+        # 3. Execution loop
+ iteration = 0
+ soft_limit_warned = False
+ streamed_response = False
+
+ try:
+ while iteration < self.hard_limit_iterations:
+ iteration += 1
+
+ # Soft limit warning
+ if iteration == self.soft_limit_iterations and not soft_limit_warned:
+ console.print(f"[yellow]⚠ Agent has performed {iteration} steps. This is taking longer than expected.[/yellow]")
+ console.print(f"[yellow] You can press Ctrl+C to interrupt and get a summary of progress.[/yellow]")
+ soft_limit_warned = True
+
+ label = "[bold purple]Architect" if current_brain == "architect" else "[bold blue]Engineer"
+ if status: status.update(f"{label} is thinking... (step {iteration})")
+
+ streamed_response = False
+ try:
+ safe_messages = self._sanitize_messages(messages)
+ if stream and not debug:
+ response, streamed_response = self._stream_completion(
+ model=model, messages=safe_messages, tools=tools, api_key=key,
+ status=status, label=label, debug=debug, num_retries=3
+ )
+ else:
+ response = completion(model=model, messages=safe_messages, tools=tools, api_key=key, num_retries=3)
+ except Exception as e:
+ if current_brain == "architect":
+ if status: status.update("[bold orange3]Architect unavailable! Falling back to Engineer...")
+ # Preserve context when falling back - use clean_input directly
+ current_brain = "engineer"
+ model = self.engineer_model
+ tools = self._get_engineer_tools()
+ key = self.engineer_key
+ # Rebuild messages with Engineer system prompt and original user request
+ messages = [{"role": "system", "content": self.engineer_system_prompt}]
+ # Add chat history if exists (excluding system prompt)
+ if chat_history:
+                    for msg in chat_history[-self.max_history:]:
+                        m = msg if isinstance(msg, dict) else msg.model_dump(exclude_none=True)
+                        if m.get('role') != 'system':
+                            messages.append(m)
+ # Add current user request
+ messages.append({"role": "user", "content": clean_input})
+ continue
+ else:
+ return {"response": f"Error: Both engines failed. {str(e)}", "chat_history": messages[1:], "usage": usage}
+
+ if hasattr(response, "usage") and response.usage:
+ usage["input"] += getattr(response.usage, "prompt_tokens", 0)
+ usage["output"] += getattr(response.usage, "completion_tokens", 0)
+ usage["total"] += getattr(response.usage, "total_tokens", 0)
+
+ resp_msg = response.choices[0].message
+ msg_dict = resp_msg.model_dump(exclude_none=True)
+ if msg_dict.get("tool_calls") and msg_dict.get("content") == "": msg_dict["content"] = None
+ messages.append(msg_dict)
+
+ if debug and resp_msg.content:
+ console.print(Panel(Markdown(resp_msg.content), title=f"{label} Reasoning", border_style="purple" if current_brain == "architect" else "blue"))
+
+ if not resp_msg.tool_calls: break
+
+ # Track if we need to inject a user message after all tool responses
+ pending_user_message = None
+
+ for tc in resp_msg.tool_calls:
+ fn, args = tc.function.name, json.loads(tc.function.arguments)
+
+ # Validate tool access based on current brain
+ if fn in ['delegate_to_engineer'] and current_brain != "architect":
+                        obs = f"Error: Tool '{fn}' is only available to the Architect. You are the Engineer. Use 'run_commands' directly to execute configuration."
+ messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs})
+ continue
+
+ if status:
+ if fn == "delegate_to_engineer": status.update(f"[bold purple]Architect: [DELEGATING MISSION] {args.get('task','')[:40]}...")
+ elif fn == "manage_memory_tool": status.update(f"[bold purple]Architect: [UPDATING MEMORY]")
+
+ if debug: console.print(Panel(Text(json.dumps(args, indent=2)), title=f"{label} Decision: {fn}", border_style="white"))
+
+ if fn == "delegate_to_engineer":
+ obs, eng_usage = self._engineer_loop(args["task"], status=status, debug=debug, chat_history=messages[:-1])
+ usage["input"] += eng_usage["input"]; usage["output"] += eng_usage["output"]; usage["total"] += eng_usage["total"]
+ elif fn == "consult_architect":
+ if status: status.update("[bold purple]Engineer consulting Architect...")
+ try:
+ # Consultation only - Engineer stays in control
+ claude_resp = completion(
+ model=self.architect_model,
+ messages=[
+ {"role": "system", "content": self.architect_system_prompt},
+ {"role": "user", "content": f"The Engineer needs your strategic advice.\n\nTECHNICAL SUMMARY: {args['technical_summary']}\n\nQUESTION: {args['question']}\n\nProvide strategic guidance. The Engineer will continue handling the user."}
+ ],
+ api_key=self.architect_key,
+ num_retries=3
+ )
+ obs = claude_resp.choices[0].message.content
+ if debug: console.print(Panel(Markdown(obs), title="[bold purple]Architect Consultation[/bold purple]", border_style="purple"))
+ except Exception as e:
+ if status: status.update("[bold orange3]Architect unavailable! Engineer continuing alone...")
+ obs = f"Architect unavailable ({str(e)}). Proceeding with your best technical judgment."
+
+ elif fn == "escalate_to_architect":
+ if status: status.update("[bold purple]Transferring control to Architect...")
+ # Full escalation - Architect takes over
+ current_brain = "architect"
+ model = self.architect_model
+ tools = self._get_architect_tools()
+ key = self.architect_key
+ messages[0] = {"role": "system", "content": self.architect_system_prompt}
+ # Prepare handover context to inject AFTER all tool responses
+ handover_msg = f"HANDOVER FROM EXECUTION ENGINE\n\nReason: {args['reason']}\n\nContext: {args['context']}\n\nYou are now in control of this conversation."
+ pending_user_message = handover_msg
+ obs = "Control transferred to Architect. Handover context will be provided."
+ if debug: console.print(Panel(Text(handover_msg), title="[bold purple]Escalation to Architect[/bold purple]", border_style="purple"))
+
+ elif fn == "return_to_engineer":
+ if status: status.update("[bold blue]Transferring control back to Engineer...")
+ # Architect returns control to Engineer
+ current_brain = "engineer"
+ model = self.engineer_model
+ tools = self._get_engineer_tools()
+ key = self.engineer_key
+ messages[0] = {"role": "system", "content": self.engineer_system_prompt}
+ # Prepare handover context to inject AFTER all tool responses
+ handover_msg = f"HANDOVER FROM ARCHITECT\n\nSummary: {args['summary']}\n\nYou are now back in control. Continue handling the user's requests."
+ pending_user_message = handover_msg
+ obs = "Control returned to Engineer. Handover summary will be provided."
+ if debug: console.print(Panel(Text(handover_msg), title="[bold blue]Return to Engineer[/bold blue]", border_style="blue"))
+
+ elif fn == "list_nodes": obs = self.list_nodes_tool(**args)
+ elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
+ elif fn == "get_node_info": obs = self.get_node_info_tool(**args)
+ elif fn == "manage_memory_tool": obs = self.manage_memory_tool(**args)
+ elif fn in self.external_tool_handlers: obs = self.external_tool_handlers[fn](self, **args)
+ else: obs = f"Error: {fn} unknown."
+
+ messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs})
+
+ # Inject pending user message AFTER all tool responses are added
+ if pending_user_message:
+ messages.append({"role": "user", "content": pending_user_message})
+
+ if iteration >= self.hard_limit_iterations:
+ console.print(f"[red]⛔ Agent reached hard limit ({self.hard_limit_iterations} steps). Forcing stop to prevent infinite loop.[/red]")
+ # Only inject user message if we're not in the middle of tool calls
+ last_msg = messages[-1] if messages else {}
+ if last_msg.get("role") != "assistant" or not last_msg.get("tool_calls"):
+ messages.append({"role": "user", "content": "Hard iteration limit reached. Please provide a summary of your findings so far."})
+ try:
+ safe_messages = self._sanitize_messages(messages)
+ response = completion(model=model, messages=safe_messages, tools=[], api_key=key)
+ resp_msg = response.choices[0].message
+ messages.append(resp_msg.model_dump(exclude_none=True))
+ except:
+ pass
+
+ except KeyboardInterrupt:
+ if status: status.update("[bold red]Interrupted! Closing pending tasks...")
+ last_msg = messages[-1]
+ if last_msg.get("tool_calls"):
+ for tc in last_msg["tool_calls"]:
+ messages.append({"tool_call_id": tc.get("id"), "role": "tool", "name": tc.get("function", {}).get("name"), "content": "Operation cancelled by user."})
+ messages.append({"role": "user", "content": "USER INTERRUPTED. Briefly summarize what you were doing and stop."})
+ try:
+ safe_messages = self._sanitize_messages(messages)
+ response = completion(model=model, messages=safe_messages, tools=tools, api_key=key)
+ resp_msg = response.choices[0].message
+ messages.append(resp_msg.model_dump(exclude_none=True))
+ except: pass
+ finally:
+ try:
+ log_dir = self.config.defaultdir
+ os.makedirs(log_dir, exist_ok=True)
+ log_path = os.path.join(log_dir, "ai_debug.json")
+ hist = []
+ if os.path.exists(log_path):
+ try:
+ with open(log_path, "r") as f: hist = json.load(f)
+ except: hist = []
+ hist.append({"timestamp": datetime.datetime.now().isoformat(), "roles": {"strategic_engine": self.architect_model, "execution_engine": self.engineer_model}, "session": messages})
+ with open(log_path, "w") as f: json.dump(hist[-10:], f, indent=4)
+ except Exception as e:
+ if debug: console.print(f"[dim red]Debug log failed: {e}[/dim red]")
+
+ return {
+ "response": messages[-1].get("content"),
+ "chat_history": messages[1:],
+ "app_related": True,
+ "usage": usage,
+ "responder": current_brain, # "architect" or "engineer"
+ "streamed": streamed_response
+ }
+
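# A hedged usage sketch of the role selection above: an explicit
# "@architect:" prefix forces the Architect brain; the follow-up relies on
# the Sticky Brain scanning chat_history for delegate/escalate tool calls.
# Task and node names are illustrative.
import connpy
myai = connpy.ai(connpy.configfile())   # models/keys assumed to come from config
first = myai.ask("@architect: plan an OSPF migration for site A", stream=False)
follow = myai.ask("proceed with the first step",
                  chat_history=first["chat_history"], stream=False)
print(first["responder"], follow["responder"], follow["usage"]["total"])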
+ @MethodHook
+ def confirm(self, user_input): return True
This class generates an ai object. Contains all the information and methods to make requests to the configured LLM provider to run actions on the application.
-- model (str): Model of GPT api to use. Default is gpt-4o-mini.
-
-- temp (float): Value between 0 and 1 that control the randomness
- of generated text, with higher values increasing
- creativity. Default is 0.7.
-
-- config (obj): Pass the object created with class configfile with
- key for decryption and extra configuration if you
- are using connection manager.
-
-- org (str): A unique token identifying the user organization
- to interact with the API.
-
-- api_key (str): A unique authentication token required to access
- and interact with the API.
-
-- model (str): Model of GPT api to use. Default is gpt-4o-mini.
-
-- temp (float): Value between 0 and 1 that control the randomness
- of generated text, with higher values increasing
- creativity. Default is 0.7.
+Hybrid Multi-Agent System: Selective Escalation with Role Persistence.
+var SAFE_COMMANDS
+prop architect_system_prompt
+@property
+def architect_system_prompt(self):
+ """Build architect system prompt with plugin extensions."""
+ if self.architect_prompt_extensions:
+ extensions = "\n".join(self.architect_prompt_extensions)
+ return self._architect_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}"
+ return self._architect_base_prompt
+Build architect system prompt with plugin extensions.
+prop engineer_system_prompt
+@property
+def engineer_system_prompt(self):
+ """Build engineer system prompt with plugin extensions."""
+ if self.engineer_prompt_extensions:
+ extensions = "\n".join(self.engineer_prompt_extensions)
+ return self._engineer_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}"
+ return self._engineer_base_prompt
+Build engineer system prompt with plugin extensions.
-def ask(self, user_input, dryrun=False, chat_history=None, max_retries=3, backoff_num=1)
+def ask(self,
user_input,
dryrun=False,
chat_history=None,
status=None,
debug=False,
stream=True)
@MethodHook
-def ask(self, user_input, dryrun = False, chat_history = None, max_retries=3, backoff_num=1):
- '''
- Send the user input to openAI GPT and parse the response to run an action in the application.
-
- ### Parameters:
-
- - user_input (str): Request to send to openAI that will be parsed
- and returned to execute on the application.
- AI understands the following tasks:
- - Run a command on a group of devices.
- - List a group of devices.
- - Test a command on a group of devices
- and verify if the output contain an
- expected value.
-
- ### Optional Parameters:
-
- - dryrun (bool): Set to true to get the arguments to use to
- run in the app. Default is false and it
- will run the actions directly.
- - chat_history (list): List in gpt api format for the chat history.
- - max_retries (int): Maximum number of retries for gpt api.
- - backoff_num (int): Backoff factor for exponential wait time
- between retries.
-
- ### Returns:
-
- dict: Dictionary formed with the following keys:
- - input: User input received
- - app_related: True if GPT detected the request to be related
- to the application.
- - dryrun: True/False
- - response: If the request is not related to the app. this
- key will contain chatGPT answer.
- - action: The action detected by the AI to run in the app.
- - filter: If it was detected by the AI, the filter used
- to get the list of nodes to work on.
- - nodes: If it's not a dryrun, the list of nodes matched by
- the filter.
- - args: A dictionary of arguments required to run command(s)
- on the nodes.
- - result: A dictionary with the output of the commands or
- the test.
- - chat_history: The chat history between user and chatbot.
- It can be used as an attribute for next request.
-
-
-
- '''
- output = {}
- output["dryrun"] = dryrun
- output["input"] = user_input
- original = self._retry_function(self._get_filter, max_retries, backoff_num, user_input, chat_history)
- if not original:
- output["app_related"] = False
- output["response"] = f"{self.model} api is not responding right now, please try again later."
- return output
- output["app_related"] = original["app_related"]
- output["chat_history"] = original["chat_history"]
- if not output["app_related"]:
- output["response"] = original["response"]
+def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=False, stream=True):
+ if chat_history is None: chat_history = []
+ usage = {"input": 0, "output": 0, "total": 0}
+
+    # 1. Initial role selection (Sticky Brain)
+ explicit_architect = re.match(r'^(architect|arquitecto|@architect)[:\s]', user_input, re.I)
+ explicit_engineer = re.match(r'^(engineer|ingeniero|@engineer)[:\s]', user_input, re.I)
+
+ if explicit_architect:
+ current_brain = "architect"
+ elif explicit_engineer:
+ current_brain = "engineer"
else:
- type = original["type"]
- if "filter" in original:
- output["filter"] = original["filter"]
- if not self.config.config["case"]:
- if isinstance(output["filter"], list):
- output["filter"] = [item.lower() for item in output["filter"]]
+        # Sticky Brain: detect whether the Architect was in charge in the recent history
+ is_architect_active = False
+ for msg in reversed(chat_history[-5:]):
+ tcs = msg.get('tool_calls') if isinstance(msg, dict) else getattr(msg, 'tool_calls', None)
+ if tcs:
+ for tc in tcs:
+ fn = tc.get('function', {}).get('name') if isinstance(tc, dict) else getattr(getattr(tc, 'function', None), 'name', '')
+ # Architect stays in control if delegating tasks or if Engineer escalated to them
+ # consult_architect is just Engineer asking for advice - Engineer keeps control
+ if fn in ['delegate_to_engineer', 'escalate_to_architect']:
+ is_architect_active = True; break
+ if is_architect_active: break
+ current_brain = "architect" if is_architect_active else "engineer"
+
+    # 2. Message preparation and cleanup
+ clean_input = re.sub(r'^(architect|arquitecto|engineer|ingeniero|@architect|@engineer)[:\s]+', '', user_input, flags=re.IGNORECASE).strip()
+
+ system_prompt = self.architect_system_prompt if current_brain == "architect" else self.engineer_system_prompt
+ tools = self._get_architect_tools() if current_brain == "architect" else self._get_engineer_tools()
+ model = self.architect_model if current_brain == "architect" else self.engineer_model
+ key = self.architect_key if current_brain == "architect" else self.engineer_key
+
+    # Structure optimized for prompt caching
+ if "claude" in model.lower():
+ messages = [{"role": "system", "content": [{"type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"}}]}]
+ else:
+ messages = [{"role": "system", "content": system_prompt}]
+
+    # Interleave the chat history
+ last_role = "system"
+ for msg in chat_history[-self.max_history:]:
+ m = msg if isinstance(msg, dict) else msg.model_dump(exclude_none=True)
+ role = m.get('role')
+ if role == last_role and role == 'user':
+ messages[-1]['content'] += "\n" + (m.get('content') or "")
+ continue
+ if role == 'assistant' and m.get('tool_calls') and m.get('content') == "": m['content'] = None
+ messages.append(m)
+ last_role = role
+
+ if last_role == 'user': messages[-1]['content'] += "\n" + clean_input
+ else: messages.append({"role": "user", "content": clean_input})
+
+    # 3. Execution loop
+ iteration = 0
+ soft_limit_warned = False
+ streamed_response = False
+
+ try:
+ while iteration < self.hard_limit_iterations:
+ iteration += 1
+
+ # Soft limit warning
+ if iteration == self.soft_limit_iterations and not soft_limit_warned:
+ console.print(f"[yellow]⚠ Agent has performed {iteration} steps. This is taking longer than expected.[/yellow]")
+ console.print(f"[yellow] You can press Ctrl+C to interrupt and get a summary of progress.[/yellow]")
+ soft_limit_warned = True
+
+ label = "[bold purple]Architect" if current_brain == "architect" else "[bold blue]Engineer"
+ if status: status.update(f"{label} is thinking... (step {iteration})")
+
+ streamed_response = False
+ try:
+ safe_messages = self._sanitize_messages(messages)
+ if stream and not debug:
+ response, streamed_response = self._stream_completion(
+ model=model, messages=safe_messages, tools=tools, api_key=key,
+ status=status, label=label, debug=debug, num_retries=3
+ )
else:
- output["filter"] = output["filter"].lower()
- if not dryrun or type == "command":
- thisnodes = self.config._getallnodesfull(output["filter"])
- output["nodes"] = list(thisnodes.keys())
- if not type == "command":
- output["action"] = "list_nodes"
- else:
- if thisnodes:
- commands = self._retry_function(self._get_commands, max_retries, backoff_num, user_input, thisnodes)
- else:
- output["app_related"] = False
- filterlist = ", ".join(output["filter"])
- output["response"] = f"I'm sorry, I coudn't find any device with filter{'s' if len(output['filter']) != 1 else ''}: {filterlist}."
- return output
- if not commands:
- output["app_related"] = False
- output["response"] = f"{self.model} api is not responding right now, please try again later."
- return output
- output["args"] = {}
- output["args"]["commands"] = commands["response"]["commands"]
- output["args"]["vars"] = commands["response"]["variables"]
- output["nodes"] = [item for item in output["nodes"] if output["args"]["vars"].get(item)]
- if original.get("expected"):
- output["args"]["expected"] = original["expected"]
- output["action"] = "test"
- else:
- output["action"] = "run"
- if dryrun:
- output["task"] = []
- if output["action"] == "test":
- output["task"].append({"Task": "Verify if expected value is in command(s) output"})
- output["task"].append({"Expected value to verify": output["args"]["expected"]})
- elif output["action"] == "run":
- output["task"].append({"Task": "Run command(s) on devices and return output"})
- varstocommands = deepcopy(output["args"]["vars"])
- del varstocommands["__global__"]
- output["task"].append({"Devices": varstocommands})
- if not dryrun:
- mynodes = nodes(self.config.getitems(output["nodes"]),config=self.config)
- if output["action"] == "test":
- output["result"] = mynodes.test(**output["args"])
- output["logs"] = mynodes.output
- elif output["action"] == "run":
- output["result"] = mynodes.run(**output["args"])
- return output
+ response = completion(model=model, messages=safe_messages, tools=tools, api_key=key, num_retries=3)
+ except Exception as e:
+ if current_brain == "architect":
+ if status: status.update("[bold orange3]Architect unavailable! Falling back to Engineer...")
+ # Preserve context when falling back - use clean_input directly
+ current_brain = "engineer"
+ model = self.engineer_model
+ tools = self._get_engineer_tools()
+ key = self.engineer_key
+ # Rebuild messages with Engineer system prompt and original user request
+ messages = [{"role": "system", "content": self.engineer_system_prompt}]
+ # Add chat history if exists (excluding system prompt)
+ if chat_history:
+                for msg in chat_history[-self.max_history:]:
+                    m = msg if isinstance(msg, dict) else msg.model_dump(exclude_none=True)
+                    if m.get('role') != 'system':
+                        messages.append(m)
+ # Add current user request
+ messages.append({"role": "user", "content": clean_input})
+ continue
+ else:
+ return {"response": f"Error: Both engines failed. {str(e)}", "chat_history": messages[1:], "usage": usage}
+
+ if hasattr(response, "usage") and response.usage:
+ usage["input"] += getattr(response.usage, "prompt_tokens", 0)
+ usage["output"] += getattr(response.usage, "completion_tokens", 0)
+ usage["total"] += getattr(response.usage, "total_tokens", 0)
+
+ resp_msg = response.choices[0].message
+ msg_dict = resp_msg.model_dump(exclude_none=True)
+ if msg_dict.get("tool_calls") and msg_dict.get("content") == "": msg_dict["content"] = None
+ messages.append(msg_dict)
+
+ if debug and resp_msg.content:
+ console.print(Panel(Markdown(resp_msg.content), title=f"{label} Reasoning", border_style="purple" if current_brain == "architect" else "blue"))
+
+ if not resp_msg.tool_calls: break
+
+ # Track if we need to inject a user message after all tool responses
+ pending_user_message = None
+
+ for tc in resp_msg.tool_calls:
+ fn, args = tc.function.name, json.loads(tc.function.arguments)
+
+ # Validate tool access based on current brain
+ if fn in ['delegate_to_engineer'] and current_brain != "architect":
+                    obs = f"Error: Tool '{fn}' is only available to the Architect. You are the Engineer. Use 'run_commands' directly to execute configuration."
+ messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs})
+ continue
+
+ if status:
+ if fn == "delegate_to_engineer": status.update(f"[bold purple]Architect: [DELEGATING MISSION] {args.get('task','')[:40]}...")
+ elif fn == "manage_memory_tool": status.update(f"[bold purple]Architect: [UPDATING MEMORY]")
+
+ if debug: console.print(Panel(Text(json.dumps(args, indent=2)), title=f"{label} Decision: {fn}", border_style="white"))
+
+ if fn == "delegate_to_engineer":
+ obs, eng_usage = self._engineer_loop(args["task"], status=status, debug=debug, chat_history=messages[:-1])
+ usage["input"] += eng_usage["input"]; usage["output"] += eng_usage["output"]; usage["total"] += eng_usage["total"]
+ elif fn == "consult_architect":
+ if status: status.update("[bold purple]Engineer consulting Architect...")
+ try:
+ # Consultation only - Engineer stays in control
+ claude_resp = completion(
+ model=self.architect_model,
+ messages=[
+ {"role": "system", "content": self.architect_system_prompt},
+ {"role": "user", "content": f"The Engineer needs your strategic advice.\n\nTECHNICAL SUMMARY: {args['technical_summary']}\n\nQUESTION: {args['question']}\n\nProvide strategic guidance. The Engineer will continue handling the user."}
+ ],
+ api_key=self.architect_key,
+ num_retries=3
+ )
+ obs = claude_resp.choices[0].message.content
+ if debug: console.print(Panel(Markdown(obs), title="[bold purple]Architect Consultation[/bold purple]", border_style="purple"))
+ except Exception as e:
+ if status: status.update("[bold orange3]Architect unavailable! Engineer continuing alone...")
+ obs = f"Architect unavailable ({str(e)}). Proceeding with your best technical judgment."
+
+ elif fn == "escalate_to_architect":
+ if status: status.update("[bold purple]Transferring control to Architect...")
+ # Full escalation - Architect takes over
+ current_brain = "architect"
+ model = self.architect_model
+ tools = self._get_architect_tools()
+ key = self.architect_key
+ messages[0] = {"role": "system", "content": self.architect_system_prompt}
+ # Prepare handover context to inject AFTER all tool responses
+ handover_msg = f"HANDOVER FROM EXECUTION ENGINE\n\nReason: {args['reason']}\n\nContext: {args['context']}\n\nYou are now in control of this conversation."
+ pending_user_message = handover_msg
+ obs = "Control transferred to Architect. Handover context will be provided."
+ if debug: console.print(Panel(Text(handover_msg), title="[bold purple]Escalation to Architect[/bold purple]", border_style="purple"))
+
+ elif fn == "return_to_engineer":
+ if status: status.update("[bold blue]Transferring control back to Engineer...")
+ # Architect returns control to Engineer
+ current_brain = "engineer"
+ model = self.engineer_model
+ tools = self._get_engineer_tools()
+ key = self.engineer_key
+ messages[0] = {"role": "system", "content": self.engineer_system_prompt}
+ # Prepare handover context to inject AFTER all tool responses
+ handover_msg = f"HANDOVER FROM ARCHITECT\n\nSummary: {args['summary']}\n\nYou are now back in control. Continue handling the user's requests."
+ pending_user_message = handover_msg
+ obs = "Control returned to Engineer. Handover summary will be provided."
+ if debug: console.print(Panel(Text(handover_msg), title="[bold blue]Return to Engineer[/bold blue]", border_style="blue"))
+
+ elif fn == "list_nodes": obs = self.list_nodes_tool(**args)
+ elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
+ elif fn == "get_node_info": obs = self.get_node_info_tool(**args)
+ elif fn == "manage_memory_tool": obs = self.manage_memory_tool(**args)
+ elif fn in self.external_tool_handlers: obs = self.external_tool_handlers[fn](self, **args)
+ else: obs = f"Error: {fn} unknown."
+
+ messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs})
+
+ # Inject pending user message AFTER all tool responses are added
+ if pending_user_message:
+ messages.append({"role": "user", "content": pending_user_message})
+
+ if iteration >= self.hard_limit_iterations:
+ console.print(f"[red]⛔ Agent reached hard limit ({self.hard_limit_iterations} steps). Forcing stop to prevent infinite loop.[/red]")
+ # Only inject user message if we're not in the middle of tool calls
+ last_msg = messages[-1] if messages else {}
+ if last_msg.get("role") != "assistant" or not last_msg.get("tool_calls"):
+ messages.append({"role": "user", "content": "Hard iteration limit reached. Please provide a summary of your findings so far."})
+ try:
+ safe_messages = self._sanitize_messages(messages)
+ response = completion(model=model, messages=safe_messages, tools=[], api_key=key)
+ resp_msg = response.choices[0].message
+ messages.append(resp_msg.model_dump(exclude_none=True))
+ except:
+ pass
+
+ except KeyboardInterrupt:
+ if status: status.update("[bold red]Interrupted! Closing pending tasks...")
+ last_msg = messages[-1]
+ if last_msg.get("tool_calls"):
+ for tc in last_msg["tool_calls"]:
+ messages.append({"tool_call_id": tc.get("id"), "role": "tool", "name": tc.get("function", {}).get("name"), "content": "Operation cancelled by user."})
+ messages.append({"role": "user", "content": "USER INTERRUPTED. Briefly summarize what you were doing and stop."})
+ try:
+ safe_messages = self._sanitize_messages(messages)
+ response = completion(model=model, messages=safe_messages, tools=tools, api_key=key)
+ resp_msg = response.choices[0].message
+ messages.append(resp_msg.model_dump(exclude_none=True))
+ except: pass
+ finally:
+ try:
+ log_dir = self.config.defaultdir
+ os.makedirs(log_dir, exist_ok=True)
+ log_path = os.path.join(log_dir, "ai_debug.json")
+ hist = []
+ if os.path.exists(log_path):
+ try:
+ with open(log_path, "r") as f: hist = json.load(f)
+ except: hist = []
+ hist.append({"timestamp": datetime.datetime.now().isoformat(), "roles": {"strategic_engine": self.architect_model, "execution_engine": self.engineer_model}, "session": messages})
+ with open(log_path, "w") as f: json.dump(hist[-10:], f, indent=4)
+ except Exception as e:
+ if debug: console.print(f"[dim red]Debug log failed: {e}[/dim red]")
+
+ return {
+ "response": messages[-1].get("content"),
+ "chat_history": messages[1:],
+ "app_related": True,
+ "usage": usage,
+ "responder": current_brain, # "architect" or "engineer"
+ "streamed": streamed_response
+ }
Send the user input to openAI GPT and parse the response to run an action in the application.
-- user_input (str): Request to send to openAI that will be parsed
- and returned to execute on the application.
- AI understands the following tasks:
- - Run a command on a group of devices.
- - List a group of devices.
- - Test a command on a group of devices
- and verify if the output contain an
- expected value.
-
-- dryrun (bool): Set to true to get the arguments to use to
- run in the app. Default is false and it
- will run the actions directly.
-- chat_history (list): List in gpt api format for the chat history.
-- max_retries (int): Maximum number of retries for gpt api.
-- backoff_num (int): Backoff factor for exponential wait time
- between retries.
-
-dict: Dictionary formed with the following keys:
- - input: User input received
- - app_related: True if GPT detected the request to be related
- to the application.
- - dryrun: True/False
- - response: If the request is not related to the app. this
- key will contain chatGPT answer.
- - action: The action detected by the AI to run in the app.
- - filter: If it was detected by the AI, the filter used
- to get the list of nodes to work on.
- - nodes: If it's not a dryrun, the list of nodes matched by
- the filter.
- - args: A dictionary of arguments required to run command(s)
- on the nodes.
- - result: A dictionary with the output of the commands or
- the test.
- - chat_history: The chat history between user and chatbot.
- It can be used as an attribute for next request.
-
-def confirm(self, user_input, max_retries=3, backoff_num=1)
+def confirm(self, user_input)
@MethodHook
-def confirm(self, user_input, max_retries=3, backoff_num=1):
- '''
- Send the user input to openAI GPT and verify if response is afirmative or negative.
-
- ### Parameters:
-
- - user_input (str): User response confirming or denying.
-
- ### Optional Parameters:
-
- - max_retries (int): Maximum number of retries for gpt api.
- - backoff_num (int): Backoff factor for exponential wait time
- between retries.
-
- ### Returns:
-
- bool or str: True, False or str if AI coudn't understand the response
- '''
- result = self._retry_function(self._get_confirmation, max_retries, backoff_num, user_input)
- if result:
- output = result["result"]
- else:
- output = f"{self.model} api is not responding right now, please try again later."
- return output
+def confirm(self, user_input): return True
Send the user input to openAI GPT and verify if the response is affirmative or negative.
-- user_input (str): User response confirming or denying.
-
-- max_retries (int): Maximum number of retries for gpt api.
-- backoff_num (int): Backoff factor for exponential wait time
- between retries.
-
-bool or str: True, False or str if AI coudn't understand the response
-
+def get_node_info_tool(self, node_name)
+def get_node_info_tool(self, node_name):
+ """Get detailed metadata for a specific node. Passwords are masked."""
+ try:
+ d = self.config.getitem(node_name, extract=True)
+ if 'password' in d: d['password'] = '***'
+ return json.dumps(d)
+ except Exception as e:
+ return f"Error getting node info: {str(e)}"
+Get detailed metadata for a specific node. Passwords are masked.
+def list_nodes_tool(self, filter_pattern='.*')
+def list_nodes_tool(self, filter_pattern=".*"):
+ """List nodes matching the filter pattern. Returns metadata for <=5 nodes, names only for more."""
+ try:
+ matched_names = self.config._getallnodes(filter_pattern)
+ if not matched_names: return "No nodes found."
+ if len(matched_names) <= 5:
+ matched_data = self.config.getitems(matched_names, extract=True)
+ res = {}
+ for name, data in matched_data.items():
+ os_tag = "unknown"
+ if isinstance(data, dict):
+ ts = data.get("tags")
+ if isinstance(ts, dict): os_tag = ts.get("os", "unknown")
+ res[name] = {"os": os_tag}
+ return json.dumps(res)
+ return json.dumps({"count": len(matched_names), "nodes": matched_names, "note": "Use 'get_node_info' for details."})
+ except Exception as e:
+ return f"Error listing nodes: {str(e)}"
+List nodes matching the filter pattern. Returns metadata for <=5 nodes, names only for more.
+def manage_memory_tool(self, content, action='append')
+def manage_memory_tool(self, content, action="append"):
+ """Save or update long-term memory. Only use when user explicitly requests it."""
+ if not content or not content.strip():
+ return "Error: Cannot save empty content to memory."
+
+ try:
+ mode = "a" if action == "append" else "w"
+ os.makedirs(os.path.dirname(self.memory_path), exist_ok=True)
+ with open(self.memory_path, mode) as f:
+ timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
+ f.write(f"\n\n## {timestamp}\n{content.strip()}\n" if action == "append" else content)
+
+ # Reload memory after update
+ with open(self.memory_path, "r") as f:
+ self.long_term_memory = f.read()
+
+ return "Memory updated successfully."
+ except PermissionError as e:
+ return f"Error: Permission denied writing to memory file: {e}"
+ except Exception as e:
+ return f"Error updating memory: {str(e)}"
+Save or update long-term memory. Only use when user explicitly requests it.
+def register_ai_tool(self,
tool_definition,
handler,
target='engineer',
engineer_prompt=None,
architect_prompt=None,
status_formatter=None)
+def register_ai_tool(self, tool_definition, handler, target="engineer", engineer_prompt=None, architect_prompt=None, status_formatter=None):
+ """Register an external tool for the AI system.
+
+ Args:
+ tool_definition (dict): OpenAI-compatible tool definition.
+ handler (callable): Function(ai_instance, **tool_args) -> str.
+ target (str): 'engineer', 'architect', or 'both'.
+ engineer_prompt (str): Extra text for engineer system prompt.
+ architect_prompt (str): Extra text for architect system prompt.
+ status_formatter (callable): Function(args_dict) -> status string.
+ """
+ name = tool_definition["function"]["name"]
+ if target in ("engineer", "both"):
+ self.external_engineer_tools.append(tool_definition)
+ if target in ("architect", "both"):
+ self.external_architect_tools.append(tool_definition)
+ self.external_tool_handlers[name] = handler
+ if engineer_prompt:
+ self.engineer_prompt_extensions.append(engineer_prompt)
+ if architect_prompt:
+ self.architect_prompt_extensions.append(architect_prompt)
+ if status_formatter:
+ self.tool_status_formatters[name] = status_formatter
+Register an external tool for the AI system.
+tool_definition : dict
+handler : callable
+target : str
+engineer_prompt : str
+architect_prompt : str
+status_formatter : callable
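# A hedged sketch of the optional status_formatter hook: it receives the
# parsed tool arguments and returns the spinner text shown while the tool
# runs. The "ping_host" tool is hypothetical, and `ai_instance` is assumed
# to be a constructed connpy.ai object.
tool_def = {"type": "function", "function": {
    "name": "ping_host", "description": "Ping a host and report reachability.",
    "parameters": {"type": "object",
                   "properties": {"host": {"type": "string"}},
                   "required": ["host"]}}}
def ping_handler(ai_instance, host):
    return f"pinged {host}"                          # placeholder logic
def ping_status(args):
    return f"[bold blue]Engineer: [PING] {args.get('host', '')}"
ai_instance.register_ai_tool(tool_definition=tool_def, handler=ping_handler,
                             status_formatter=ping_status)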
+def run_commands_tool(self, nodes_filter, commands, status=None)
+def run_commands_tool(self, nodes_filter, commands, status=None):
+ """Execute commands on nodes matching the filter. Native interactive confirmation for unsafe commands."""
+ # Handle if commands is a JSON string
+ if isinstance(commands, str):
+ try:
+ commands = json.loads(commands)
+        except json.JSONDecodeError:
+ commands = [c.strip() for c in commands.split('\n') if c.strip()]
+
+ # Expand multi-line commands within a list (in case the AI packs them)
+ if isinstance(commands, list):
+ expanded_commands = []
+ for cmd in commands:
+ expanded_commands.extend([c.strip() for c in str(cmd).split('\n') if c.strip()])
+ commands = expanded_commands
+ else:
+ commands = [str(commands)]
+
+ # Check command safety natively
+ if not self.trusted_session:
+ unsafe_commands = [cmd for cmd in commands if not self._is_safe_command(cmd)]
+ if unsafe_commands:
+ # Stop the spinner so prompt doesn't get messed up
+ if status: status.stop()
+
+ # Show ALL commands with unsafe ones highlighted
+ formatted_cmds = []
+ for cmd in commands:
+ if cmd in unsafe_commands:
+ formatted_cmds.append(f" • [yellow]{cmd}[/yellow]")
+ else:
+ formatted_cmds.append(f" • {cmd}")
+
+ panel_content = f"Target: {nodes_filter}\nCommands:\n" + "\n".join(formatted_cmds)
+ console.print(Panel(panel_content, title="[bold yellow]⚠️ UNSAFE COMMANDS DETECTED[/bold yellow]", border_style="yellow"))
+
+ try:
+ from rich.prompt import Prompt
+ user_resp = Prompt.ask("[bold yellow]Execute? (y: yes / n: no / a: allow all this session / <text>: feedback)[/bold yellow]", default="n")
+ except KeyboardInterrupt:
+ if status: status.update("[bold blue]Engineer: Resuming...")
+ console.print("[bold red]✗ Aborted by user (Ctrl+C).[/bold red]")
+ return "Error: User cancelled execution (Ctrl+C)."
+
+ # Resume the spinner
+ if status: status.update("[bold blue]Engineer: Processing user response...")
+
+ user_resp_lower = user_resp.strip().lower()
+ if user_resp_lower in ['a', 'allow']:
+ self.trusted_session = True
+ console.print("[bold green]✓ Trust Mode Enabled. All future commands in this session will execute without confirmation.[/bold green]")
+ elif user_resp_lower in ['y', 'yes']:
+ console.print("[bold green]✓ Executing...[/bold green]")
+ elif user_resp_lower in ['n', 'no', '']:
+ console.print("[bold red]✗ Execution rejected by user.[/bold red]")
+ return "Error: User rejected execution."
+ else:
+ console.print(f"[bold cyan]User feedback: [/bold cyan]{user_resp}")
+ return f"User requested changes: {user_resp}. Please adjust the commands based on this feedback and try again."
+
+ try:
+ matched_names = self.config._getallnodes(nodes_filter)
+ if not matched_names: return "No nodes found matching filter."
+ thisnodes_dict = self.config.getitems(matched_names, extract=True)
+ result = nodes(thisnodes_dict, config=self.config).run(commands)
+ return self._truncate(json.dumps(result))
+ except Exception as e:
+ return f"Error executing commands: {str(e)}"
+Execute commands on nodes matching the filter. Native interactive confirmation for unsafe commands.
-def getitem(self, unique, keys=None)
+def getitem(self, unique, keys=None, extract=False)
@MethodHook
-def getitem(self, unique, keys = None):
+def getitem(self, unique, keys = None, extract = False):
'''
    Get a node or a group of nodes from configfile which can be passed to node/nodes class
@@ -2053,6 +2744,8 @@ def getitem(self, unique, keys = None):
- keys (list): In case you pass a folder as unique, you can filter
nodes inside the folder passing a list.
+ - extract (bool): If True, extract information from profiles.
+ Default False.
### Returns:
@@ -2068,21 +2761,35 @@ def getitem(self, unique, keys = None):
folder = self.connections[uniques["folder"]]
newfolder = deepcopy(folder)
newfolder.pop("type")
- for node in folder.keys():
- if node == "type":
+ for node_name in folder.keys():
+ if node_name == "type":
continue
- if "type" in newfolder[node].keys():
- if newfolder[node]["type"] == "subfolder":
- newfolder.pop(node)
+ if "type" in newfolder[node_name].keys():
+ if newfolder[node_name]["type"] == "subfolder":
+ newfolder.pop(node_name)
else:
- newfolder[node].pop("type")
- if keys == None:
- newfolder = {"{}{}".format(k,unique):v for k,v in newfolder.items()}
- return newfolder
- else:
- f_newfolder = dict((k, newfolder[k]) for k in keys)
- f_newfolder = {"{}{}".format(k,unique):v for k,v in f_newfolder.items()}
- return f_newfolder
+ newfolder[node_name].pop("type")
+
+ if keys != None:
+ newfolder = dict((k, newfolder[k]) for k in keys)
+
+ if extract:
+ for node_name, node_keys in newfolder.items():
+ for key, value in node_keys.items():
+ profile = re.search("^@(.*)", str(value))
+ if profile:
+ try:
+ newfolder[node_name][key] = self.profiles[profile.group(1)][key]
+                        except KeyError:
+ newfolder[node_name][key] = ""
+ elif value == '' and key == "protocol":
+ try:
+ newfolder[node_name][key] = self.profiles["default"][key]
+                        except KeyError:
+ newfolder[node_name][key] = "ssh"
+
+ newfolder = {"{}{}".format(k,unique):v for k,v in newfolder.items()}
+ return newfolder
else:
if uniques.keys() >= {"folder", "subfolder"}:
node = self.connections[uniques["folder"]][uniques["subfolder"]][uniques["id"]]
@@ -2092,6 +2799,20 @@ def getitem(self, unique, keys = None):
node = self.connections[uniques["id"]]
newnode = deepcopy(node)
newnode.pop("type")
+
+ if extract:
+ for key, value in newnode.items():
+ profile = re.search("^@(.*)", str(value))
+ if profile:
+ try:
+ newnode[key] = self.profiles[profile.group(1)][key]
+                    except KeyError:
+ newnode[key] = ""
+ elif value == '' and key == "protocol":
+ try:
+ newnode[key] = self.profiles["default"][key]
+                    except KeyError:
+ newnode[key] = "ssh"
return newnode
Get a node or a group of nodes from configfile which can be passed to node/nodes class
@@ -2103,6 +2824,8 @@ def getitem(self, unique, keys = None):
 - keys (list): In case you pass a folder as unique, you can filter
   nodes inside the folder passing a list.
+- extract (bool): If True, extract information from profiles.
+                  Default False.
 dict: Dictionary containing information of node or multiple
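For reference, a minimal usage sketch of the new `extract` flag; the node name and folder are made up for the example. With `extract=True`, values stored as `@profilename` are resolved against the saved profiles, and an empty `protocol` falls back to the `default` profile or, failing that, `ssh`:

```python
import connpy

conf = connpy.configfile()

# Raw item: values may still contain "@profile" references.
raw = conf.getitem("router1@office")

# Resolved item: profile references replaced by concrete values.
resolved = conf.getitem("router1@office", extract=True)
print(resolved["protocol"])  # "ssh" unless a profile says otherwise
```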
@@ -2110,7 +2833,7 @@ def getitem(self, unique, keys = None):
-def getitems(self, uniques)
+def getitems(self, uniques, extract=False)
@MethodHook
-def getitems(self, uniques):
+def getitems(self, uniques, extract = False):
'''
Get a group of nodes from configfile which can be passed to node/nodes class
@@ -2128,6 +2851,11 @@ def getitems(self, uniques):
from the connection manager. It can be a
list of strings.
+ ### Optional Parameters:
+
+ - extract (bool): If True, extract information from profiles.
+ Default False.
+
### Returns:
dict: Dictionary containing information of node or multiple
@@ -2144,17 +2872,17 @@ def getitems(self, uniques):
if not self.config["case"]:
name = name.lower()
mylist = [item.lower() for item in mylist]
- this = self.getitem(name, mylist)
+ this = self.getitem(name, mylist, extract = extract)
nodes.update(this)
elif i.startswith("@"):
if not self.config["case"]:
i = i.lower()
- this = self.getitem(i)
+ this = self.getitem(i, extract = extract)
nodes.update(this)
else:
if not self.config["case"]:
i = i.lower()
- this = self.getitem(i)
+ this = self.getitem(i, extract = extract)
nodes[i] = this
return nodes
+- extract (bool): If True, extract information from profiles.
+                  Default False.
+
 dict: Dictionary containing information of node or multiple
 dictionaries of multiple nodes.
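`getitems` simply forwards the flag to `getitem` for every matched unique, so profile resolution behaves the same for folders, `@folder` references, and plain node names. A short sketch with hypothetical names:

```python
import connpy

conf = connpy.configfile()

# extract=True resolves any "@profile" values in every returned entry.
mynodes = conf.getitems(["@office", "router1@lab"], extract=True)
for unique, data in mynodes.items():
    print(unique, data.get("protocol"))
```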
@@ -3268,7 +4000,7 @@ class nodes:
@MethodHook
- def run(self, commands, vars = None,*, folder = None, prompt = None, stdout = None, parallel = 10, timeout = None):
+ def run(self, commands, vars = None,*, folder = None, prompt = None, stdout = None, parallel = 10, timeout = None, on_complete = None):
'''
Run a command or list of commands on all the nodes in nodelist.
@@ -3309,6 +4041,11 @@ class nodes:
- timeout (int): Time in seconds for expect to wait for prompt/EOF.
default 10.
+ - on_complete (callable): Optional callback called when each node
+ finishes. Receives (unique, output, status).
+ Called from the node's thread so it must
+ be thread-safe.
+
###Returns:
dict: Dictionary formed by nodes unique as keys, Output of the
@@ -3323,13 +4060,20 @@ class nodes:
Path(folder).mkdir(parents=True, exist_ok=True)
if prompt != None:
args["prompt"] = prompt
- if stdout != None:
+ if stdout != None and on_complete is None:
args["stdout"] = stdout
if timeout != None:
args["timeout"] = timeout
output = {}
status = {}
tasks = []
+
+ def _run_node(node_obj, node_args, callback):
+ """Wrapper that runs a node and fires the callback on completion."""
+ node_obj.run(**node_args)
+ if callback:
+ callback(node_obj.unique, node_obj.output, node_obj.status)
+
for n in self.nodelist:
nodesargs[n.unique] = deepcopy(args)
if vars != None:
@@ -3338,7 +4082,10 @@ class nodes:
nodesargs[n.unique]["vars"].update(vars["__global__"])
if n.unique in vars.keys():
nodesargs[n.unique]["vars"].update(vars[n.unique])
- tasks.append(threading.Thread(target=n.run, kwargs=nodesargs[n.unique]))
+ if on_complete:
+ tasks.append(threading.Thread(target=_run_node, args=(n, nodesargs[n.unique], on_complete)))
+ else:
+ tasks.append(threading.Thread(target=n.run, kwargs=nodesargs[n.unique]))
taskslist = list(self._splitlist(tasks, parallel))
for t in taskslist:
for i in t:
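A short sketch of the new `on_complete` hook, with a hypothetical `@office` folder. The callback fires on each finishing node's worker thread, so any state it shares across calls should be protected by a lock; note also from the hunk above that the `stdout` argument is dropped whenever `on_complete` is set, leaving progress reporting entirely to the callback:

```python
import threading
import connpy

conf = connpy.configfile()
office = connpy.nodes(conf.getitems(["@office"]), config=conf)

lock = threading.Lock()
done = []

def progress(unique, output, status):
    # Runs in the node's own thread; guard shared state with the lock.
    with lock:
        done.append(unique)
        print(f"[{len(done)}/{len(office.nodelist)}] {unique} -> status {status}")

result = office.run("show version", on_complete=progress)
```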
@@ -3472,7 +4219,7 @@ class nodes:
 Methods
-def run(self, commands, vars=None, *, folder=None, prompt=None, stdout=None, parallel=10, timeout=None)
+def run(self, commands, vars=None, *, folder=None, prompt=None, stdout=None, parallel=10, timeout=None, on_complete=None)
@@ -3598,6 +4360,11 @@ def run(self, commands, vars = None,*, folder = None, prompt = None, stdout = No
- timeout (int): Time in seconds for expect to wait for prompt/EOF.
default 10.
+
+- on_complete (callable): Optional callback called when each node
+ finishes. Receives (unique, output, status).
+ Called from the node's thread so it must
+ be thread-safe.
Returns:
dict: Dictionary formed by nodes unique as keys, Output of the
diff --git a/requirements.txt b/requirements.txt
index ed00f25..0ecbc86 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,7 +3,7 @@ Flask_Cors>=4.0.1
google_api_python_client>=2.125.0
google_auth_oauthlib>=1.2.0
inquirer>=3.3.0
-openai>=1.98.0
+litellm>=1.40.0
pexpect>=4.8.0
protobuf>=5.27.2
pycryptodome>=3.18.0
diff --git a/setup.cfg b/setup.cfg
index 28a9aef..95d7444 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -33,7 +33,7 @@ install_requires =
pyfzf
waitress
PyYAML
- openai
+ litellm
rich
protobuf
google_api_python_client