feat: complete overhaul of AI subsystem with multi-agent litellm architecture

- Refactored AI module to use litellm, supporting Anthropic, Google, OpenAI, etc.
- Introduced 'Engineer' (execution) and 'Architect' (strategic) AI agents.
- Added real-time streaming responses and interactive chat mode via 'rich'.
- Added CLI arguments for model/key overrides (--engineer-model, --architect-model).
- Replaced 'openai' with 'litellm' in requirements.txt and setup.cfg.
- Updated nodes.run() to support an 'on_complete' callback for live node-status streaming.
- Fixed an undefined variable bug (config.profiles -> self.profiles) in configfile.py.
- Updated README and docstrings with new AI plugin tool registration API.
- Regenerated HTML documentation using pdoc3.
- Bumped version to 5.0b1 for beta release.
This commit is contained in:
2026-04-03 15:11:37 -03:00
parent b2c414485c
commit 257cb05cc1
12 changed files with 2646 additions and 1353 deletions
+9
View File
@@ -133,3 +133,12 @@ dmypy.json
#App
connpy-completion-helper
# Gemini & AI Tools
.gemini/
GEMINI.md
# Node.js (used by Gemini CLI or plugins)
node_modules/
package-lock.json
package.json
+42 -9
View File
@@ -56,7 +56,9 @@ For more detailed information, please read our [Privacy Policy](https://connpy.g
Or use fzf by installing pyfzf and running conn config --fzf true.
- Create in bulk, copy, move, export, and import nodes for easy management.
- Run automation scripts on network devices.
- Use GPT AI to help you manage your devices.
- Use AI with a multi-agent system (Engineer/Architect) to manage devices.
Supports any LLM provider via litellm (OpenAI, Anthropic, Google, etc.).
Features streaming responses, interactive chat, and extensible plugin tools.
- Add plugins with your own scripts.
- Much more!
@@ -428,15 +430,46 @@ for key in routers.result:
print(key, ' ---> ', ("pass" if routers.result[key] else "fail"))
```
### Using AI
```
The AI module uses a multi-agent architecture with an **Engineer** (fast execution) and an **Architect** (strategic reasoning). It supports any LLM provider through [litellm](https://github.com/BerriAI/litellm).
```python
import connpy
conf = connpy.configfile()
organization = 'openai-org'
api_key = "openai-key"
myia = connpy.ai(conf, organization, api_key)
input = "go to router 1 and get me the full configuration"
result = myia.ask(input, dryrun = False)
print(result)
# Uses models and API keys from config, or override them:
myai = connpy.ai(conf, engineer_model="gemini/gemini-2.5-flash", engineer_api_key="your-key")
result = myai.ask("go to router1 and show me the running configuration")
print(result["response"])
# Streaming is enabled by default for CLI, disable for programmatic use:
result = myai.ask("show interfaces on all routers", stream=False)
print(result["response"])
```
#### AI Plugin Tool Registration
Plugins can extend the AI system by registering custom tools via the `Preload` class:
```python
def _register_my_tools(ai_instance):
tool_def = {
"type": "function",
"function": {
"name": "my_custom_tool",
"description": "Does something useful.",
"parameters": {
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"]
}
}
}
ai_instance.register_ai_tool(
tool_definition=tool_def,
handler=my_handler_function,
target="engineer", # or "architect" or "both"
engineer_prompt="- My tool: does X.",
architect_prompt=" * My tool (my_custom_tool)."
)
class Preload:
def __init__(self, connapp):
connapp.ai.modify(_register_my_tools)
```
## http API
With the Connpy API you can run commands on devices using http requests
@@ -527,7 +560,7 @@ With the Connpy API you can run commands on devices using http requests
**Method**: `POST`
**Description**: This route sends to chatgpt IA a request that will parse it into an understandable output for the application and then run the request.
**Description**: This route sends a request to the AI multi-agent system which will analyze it, execute commands on devices if needed, and return the result. Supports any LLM provider configured via litellm.
#### Request Body:
+38 -7
View File
@@ -15,7 +15,8 @@ Connpy is a SSH, SFTP, Telnet, kubectl, and Docker pod connection manager and au
Or use fzf by installing pyfzf and running conn config --fzf true.
- Create in bulk, copy, move, export, and import nodes for easy management.
- Run automation scripts on network devices.
- Use GPT AI to help you manage your devices.
- Use AI with a multi-agent system (Engineer/Architect) to help you manage your devices.
Supports any LLM provider via litellm (OpenAI, Anthropic, Google, etc.).
- Add plugins with your own scripts.
- Much more!
@@ -496,12 +497,42 @@ for key in routers.result:
```
import connpy
conf = connpy.configfile()
organization = 'openai-org'
api_key = "openai-key"
myia = connpy.ai(conf, organization, api_key)
input = "go to router 1 and get me the full configuration"
result = myia.ask(input, dryrun = False)
print(result)
# Uses models and API keys from config, or override them:
myai = connpy.ai(conf, engineer_model="gemini/gemini-2.5-flash", engineer_api_key="your-key")
result = myai.ask("go to router1 and show me the running configuration")
print(result["response"])
# Streaming is enabled by default for CLI, disable for programmatic use:
result = myai.ask("show interfaces on all routers", stream=False)
print(result["response"])
```
#### AI Plugin Tool Registration
Plugins can register custom tools with the AI system using `register_ai_tool()` in their `Preload` class:
```
def _register_my_tools(ai_instance):
tool_def = {
"type": "function",
"function": {
"name": "my_custom_tool",
"description": "Does something useful.",
"parameters": {
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"]
}
}
}
ai_instance.register_ai_tool(
tool_definition=tool_def,
handler=my_handler_function,
target="engineer", # or "architect" or "both"
engineer_prompt="- My tool: does X.",
architect_prompt=" * My tool (my_custom_tool)."
)
class Preload:
def __init__(self, connapp):
connapp.ai.modify(_register_my_tools)
```
'''
from .core import node,nodes
+1 -1
View File
@@ -1,2 +1,2 @@
__version__ = "4.2"
__version__ = "5.0b1"
+814 -472
View File
File diff suppressed because it is too large Load Diff
+3 -3
View File
@@ -135,15 +135,15 @@ def main():
exit()
elif wordsnumber >= 3 and words[0] == "ai":
if wordsnumber == 3:
strings = ["--help", "--org", "--model", "--api_key"]
strings = ["--help", "--engineer-model", "--engineer-api-key", "--architect-model", "--architect-api-key", "--debug"]
else:
strings = ["--org", "--model", "--api_key"]
strings = ["--engineer-model", "--engineer-api-key", "--architect-model", "--architect-api-key", "--debug"]
elif wordsnumber == 3:
strings=[]
if words[0] == "profile":
strings=["--add", "--rm", "--del", "--edit", "--mod", "--show", "--help"]
if words[0] == "config":
strings=["--allow-uppercase", "--keepalive", "--completion", "--fzf", "--configfolder", "--openai-org", "--openai-org-api-key", "--openai-org-model","--help"]
strings=["--allow-uppercase", "--keepalive", "--completion", "--fzf", "--configfolder", "--engineer-model", "--engineer-api-key", "--architect-model", "--architect-api-key", "--help"]
if words[0] == "api":
strings=["--start", "--stop", "--restart", "--debug", "--help"]
if words[0] in ["--mod", "--edit", "-e", "--show", "-s", "--add", "-a", "--rm", "--del", "-r"]:
+54 -19
View File
@@ -155,7 +155,7 @@ class configfile:
return result
@MethodHook
def getitem(self, unique, keys = None):
def getitem(self, unique, keys = None, extract = False):
'''
Get an node or a group of nodes from configfile which can be passed to node/nodes class
@@ -169,6 +169,8 @@ class configfile:
- keys (list): In case you pass a folder as unique, you can filter
nodes inside the folder passing a list.
- extract (bool): If True, extract information from profiles.
Default False.
### Returns:
@@ -184,21 +186,35 @@ class configfile:
folder = self.connections[uniques["folder"]]
newfolder = deepcopy(folder)
newfolder.pop("type")
for node in folder.keys():
if node == "type":
for node_name in folder.keys():
if node_name == "type":
continue
if "type" in newfolder[node].keys():
if newfolder[node]["type"] == "subfolder":
newfolder.pop(node)
if "type" in newfolder[node_name].keys():
if newfolder[node_name]["type"] == "subfolder":
newfolder.pop(node_name)
else:
newfolder[node].pop("type")
if keys == None:
newfolder = {"{}{}".format(k,unique):v for k,v in newfolder.items()}
return newfolder
else:
f_newfolder = dict((k, newfolder[k]) for k in keys)
f_newfolder = {"{}{}".format(k,unique):v for k,v in f_newfolder.items()}
return f_newfolder
newfolder[node_name].pop("type")
if keys != None:
newfolder = dict((k, newfolder[k]) for k in keys)
if extract:
for node_name, node_keys in newfolder.items():
for key, value in node_keys.items():
profile = re.search("^@(.*)", str(value))
if profile:
try:
newfolder[node_name][key] = self.profiles[profile.group(1)][key]
except:
newfolder[node_name][key] = ""
elif value == '' and key == "protocol":
try:
newfolder[node_name][key] = self.profiles["default"][key]
except:
newfolder[node_name][key] = "ssh"
newfolder = {"{}{}".format(k,unique):v for k,v in newfolder.items()}
return newfolder
else:
if uniques.keys() >= {"folder", "subfolder"}:
node = self.connections[uniques["folder"]][uniques["subfolder"]][uniques["id"]]
@@ -208,10 +224,24 @@ class configfile:
node = self.connections[uniques["id"]]
newnode = deepcopy(node)
newnode.pop("type")
if extract:
for key, value in newnode.items():
profile = re.search("^@(.*)", str(value))
if profile:
try:
newnode[key] = self.profiles[profile.group(1)][key]
except:
newnode[key] = ""
elif value == '' and key == "protocol":
try:
newnode[key] = self.profiles["default"][key]
except:
newnode[key] = "ssh"
return newnode
@MethodHook
def getitems(self, uniques):
def getitems(self, uniques, extract = False):
'''
Get a group of nodes from configfile which can be passed to node/nodes class
@@ -221,6 +251,11 @@ class configfile:
from the connection manager. It can be a
list of strings.
### Optional Parameters:
- extract (bool): If True, extract information from profiles.
Default False.
### Returns:
dict: Dictionary containing information of node or multiple
@@ -237,17 +272,17 @@ class configfile:
if not self.config["case"]:
name = name.lower()
mylist = [item.lower() for item in mylist]
this = self.getitem(name, mylist)
this = self.getitem(name, mylist, extract = extract)
nodes.update(this)
elif i.startswith("@"):
if not self.config["case"]:
i = i.lower()
this = self.getitem(i)
this = self.getitem(i, extract = extract)
nodes.update(this)
else:
if not self.config["case"]:
i = i.lower()
this = self.getitem(i)
this = self.getitem(i, extract = extract)
nodes[i] = this
return nodes
@@ -360,7 +395,7 @@ class configfile:
nodes[node][key] = ""
elif value == '' and key == "protocol":
try:
nodes[node][key] = config.profiles["default"][key]
nodes[node][key] = self.profiles["default"][key]
except:
nodes[node][key] = "ssh"
return nodes
+163 -113
View File
@@ -24,7 +24,9 @@ from rich.panel import Panel
from rich.text import Text
from rich.rule import Rule
from rich.style import Style
from rich.prompt import Prompt
mdprint = Console().print
console = Console()
try:
from pyfzf.pyfzf import FzfPrompt
except:
@@ -129,9 +131,11 @@ class connapp:
# AIPARSER
aiparser = subparsers.add_parser("ai", description="Make request to an AI")
aiparser.add_argument("ask", nargs='*', help="Ask connpy AI something")
aiparser.add_argument("--model", nargs=1, help="Set the OPENAI model id")
aiparser.add_argument("--org", nargs=1, help="Set the OPENAI organization id")
aiparser.add_argument("--api_key", nargs=1, help="Set the OPENAI API key")
aiparser.add_argument("--engineer-model", nargs=1, help="Override engineer model")
aiparser.add_argument("--engineer-api-key", nargs=1, help="Override engineer api key")
aiparser.add_argument("--architect-model", nargs=1, help="Override architect model")
aiparser.add_argument("--architect-api-key", nargs=1, help="Override architect api key")
aiparser.add_argument("--debug", action="store_true", help="Show AI reasoning and tool calls")
aiparser.set_defaults(func=self._func_ai)
#RUNPARSER
runparser = subparsers.add_parser("run", description="Run scripts or commands on nodes", formatter_class=argparse.RawTextHelpFormatter)
@@ -164,9 +168,10 @@ class connapp:
configcrud.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT")
configcrud.add_argument("--completion", dest="completion", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get terminal completion configuration for conn")
configcrud.add_argument("--configfolder", dest="configfolder", nargs=1, action=self._store_type, help="Set the default location for config file", metavar="FOLDER")
configcrud.add_argument("--openai-org", dest="organization", nargs=1, action=self._store_type, help="Set openai organization", metavar="ORGANIZATION")
configcrud.add_argument("--openai-api-key", dest="api_key", nargs=1, action=self._store_type, help="Set openai api_key", metavar="API_KEY")
configcrud.add_argument("--openai-model", dest="model", nargs=1, action=self._store_type, help="Set openai model", metavar="MODEL")
configcrud.add_argument("--engineer-model", dest="engineer_model", nargs=1, action=self._store_type, help="Set engineer model", metavar="MODEL")
configcrud.add_argument("--engineer-api-key", dest="engineer_api_key", nargs=1, action=self._store_type, help="Set engineer api_key", metavar="API_KEY")
configcrud.add_argument("--architect-model", dest="architect_model", nargs=1, action=self._store_type, help="Set architect model", metavar="MODEL")
configcrud.add_argument("--architect-api-key", dest="architect_api_key", nargs=1, action=self._store_type, help="Set architect api_key", metavar="API_KEY")
configparser.set_defaults(func=self._func_others)
#Add plugins
self.plugins = Plugins()
@@ -478,9 +483,17 @@ class connapp:
def _func_others(self, args):
#Function called when using other commands
actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "organization": self._openai, "api_key": self._openai, "model": self._openai}
actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "engineer_model": self._ai_config, "engineer_api_key": self._ai_config, "architect_model": self._ai_config, "architect_api_key": self._ai_config}
return actions.get(args.command)(args)
def _ai_config(self, args):
if "ai" in self.config.config:
aiconfig = self.config.config["ai"]
else:
aiconfig = {}
aiconfig[args.command] = args.data[0]
self._change_settings("ai", aiconfig)
def _ls(self, args):
if args.data == "nodes":
attribute = "nodes_list"
@@ -647,6 +660,26 @@ class connapp:
openaikeys[args.command] = args.data[0]
self._change_settings("openai", openaikeys)
def _anthropic(self, args):
if "anthropic" in self.config.config:
anthropickeys = self.config.config["anthropic"]
else:
anthropickeys = {}
# Mapear el nombre del argumento al nombre de la clave en el config (sin el prefijo 'anthropic_')
key_name = args.command.replace("anthropic_", "")
anthropickeys[key_name] = args.data[0]
self._change_settings("anthropic", anthropickeys)
def _google(self, args):
if "google" in self.config.config:
googlekeys = self.config.config["google"]
else:
googlekeys = {}
# Mapear el nombre del argumento al nombre de la clave en el config (sin el prefijo 'google_')
key_name = args.command.replace("google_", "")
googlekeys[key_name] = args.data[0]
self._change_settings("google", googlekeys)
def _change_settings(self, name, value):
self.config.config[name] = value
@@ -844,58 +877,93 @@ class connapp:
def _func_ai(self, args):
arguments = {}
if args.model:
arguments["model"] = args.model[0]
if args.org:
arguments["org"] = args.org[0]
if args.api_key:
arguments["api_key"] = args.api_key[0]
if args.engineer_model:
arguments["engineer_model"] = args.engineer_model[0]
if args.engineer_api_key:
arguments["engineer_api_key"] = args.engineer_api_key[0]
if args.architect_model:
arguments["architect_model"] = args.architect_model[0]
if args.architect_api_key:
arguments["architect_api_key"] = args.architect_api_key[0]
self.myai = self.ai(self.config, **arguments)
if args.ask:
input = " ".join(args.ask)
request = self.myai.ask(input, dryrun = True)
if not request["app_related"]:
mdprint(Markdown(request["response"]))
print("\r")
# Single question mode
query = " ".join(args.ask)
with console.status("[bold green]Agent is thinking and analyzing...") as status:
result = self.myai.ask(query, status=status, debug=args.debug)
# Determine title and color based on responder
responder = result.get("responder", "engineer")
if responder == "architect":
title = "[bold purple]Network Architect[/bold purple]"
border_style = "purple"
else:
if request["action"] == "list_nodes":
if request["filter"]:
nodes = self.config._getallnodes(request["filter"])
else:
nodes = self.config._getallnodes()
list = "\n".join(nodes)
print(list)
else:
yaml_data = yaml.dump(request["task"])
confirmation = f"I'm going to run the following task:\n```{yaml_data}```"
mdprint(Markdown(confirmation))
question = [inquirer.Confirm("task", message="Are you sure you want to continue?")]
print("\r")
confirm = inquirer.prompt(question)
if confirm == None:
exit(7)
if confirm["task"]:
script = {}
script["name"] = "RESULT"
script["output"] = "stdout"
script["nodes"] = request["nodes"]
script["action"] = request["action"]
if "expected" in request:
script["expected"] = request["expected"]
script.update(request["args"])
self._cli_run(script)
title = "[bold blue]Network Engineer[/bold blue]"
border_style = "blue"
# Only render in panel if response wasn't already streamed
if not result.get("streamed"):
mdprint(Panel(Markdown(result["response"]), title=title, border_style=border_style, expand=False))
# Mostrar tokens consumidos
if "usage" in result:
u = result["usage"]
console.print(f"[dim]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/dim]")
print("\r")
else:
# Interactive chat mode
history = None
mdprint(Markdown("**Chatbot**: Hi! How can I help you today?\n\n---"))
mdprint(Rule(style="bold blue"))
mdprint(Markdown("**Networking Expert Agent**: Hi! I'm your assistant. I can help you diagnose issues, run commands, and manage your nodes.\nType 'exit' to quit.\n"))
mdprint(Rule(style="bold blue"))
while True:
questions = [
inquirer.Text('message', message="User", validate=self._ai_validation),
]
answers = inquirer.prompt(questions)
if answers == None:
exit(7)
response, history = self._process_input(answers["message"], history)
mdprint(Markdown(f"""**Chatbot**:\n{response}\n\n---"""))
try:
user_query = Prompt.ask("[bold cyan]User[/bold cyan]")
if not user_query.strip():
continue
if user_query.lower() in ['exit', 'quit', 'bye']:
break
# User message is already in the prompt, no need to print it again
try:
with console.status("[bold green]Agent is thinking...") as status:
result = self.myai.ask(user_query, chat_history=history, status=status, debug=args.debug)
except KeyboardInterrupt:
# La interrupción ahora se maneja dentro de myai.ask para no perder el contexto
# y generar un resumen de lo que se estaba haciendo.
continue
history = result.get("chat_history")
# Determine title and color based on responder
responder = result.get("responder", "engineer")
if responder == "architect":
title = "[bold purple]Network Architect[/bold purple]"
border_style = "purple"
else:
title = "[bold blue]Network Engineer[/bold blue]"
border_style = "blue"
# Only render in panel if response wasn't already streamed
if not result.get("streamed"):
mdprint(Panel(Markdown(result["response"]), title=title, border_style=border_style, expand=False))
# Mostrar tokens consumidos
if "usage" in result:
u = result["usage"]
console.print(f"[dim]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/dim]")
print("\r")
except KeyboardInterrupt:
break
return
@@ -905,56 +973,6 @@ class connapp:
raise inquirer.errors.ValidationError("", reason="Can't send empty messages")
return True
def _process_input(self, input, history):
response = self.myai.ask(input , chat_history = history, dryrun = True)
if not response["app_related"]:
try:
if not history:
history = []
history.extend(response["chat_history"])
except:
if not history:
history = None
return response["response"], history
else:
history = None
if response["action"] == "list_nodes":
if response["filter"]:
nodes = self.config._getallnodes(response["filter"])
else:
nodes = self.config._getallnodes()
list = "\n".join(nodes)
response = f"```{list}\n```"
else:
yaml_data = yaml.dump(response["task"])
confirmresponse = f"I'm going to run the following task:\n```{yaml_data}```\nPlease confirm"
while True:
mdprint(Markdown(f"""**Chatbot**:\n{confirmresponse}"""))
questions = [
inquirer.Text('message', message="User", validate=self._ai_validation),
]
answers = inquirer.prompt(questions)
if answers == None:
exit(7)
confirmation = self.myai.confirm(answers["message"])
if isinstance(confirmation, bool):
if not confirmation:
response = "Request cancelled"
else:
nodes = self.nodes(self.config.getitems(response["nodes"]), config = self.config)
if response["action"] == "run":
output = nodes.run(**response["args"])
response = ""
elif response["action"] == "test":
result = nodes.test(**response["args"])
yaml_result = yaml.dump(result,default_flow_style=False, indent=4)
output = nodes.output
response = f"This is the result for your test:\n```\n{yaml_result}\n```"
for k,v in output.items():
response += f"\n***{k}***:\n```\n{v}\n```\n"
break
return response, history
def _func_api(self, args):
if args.command == "stop" or args.command == "restart":
args.data = self.stop_api()
@@ -1003,6 +1021,7 @@ class connapp:
def _cli_run(self, script):
import threading as _threading
args = {}
try:
action = script["action"]
@@ -1043,33 +1062,62 @@ class connapp:
except:
columns = 80
PANEL_WIDTH = columns
header = f"{script['name'].upper()}"
# Streaming mode: print each node's panel as it completes
if action == "run" and stdout:
mdprint(Rule(header, style="bold cyan"))
print_lock = _threading.Lock()
def _on_node_complete(unique, node_output, node_status):
if node_status == 0:
status_str = "[bold green]✓ PASS[/bold green]"
border = "green"
title_line = f"[bold]{unique}[/bold] — {status_str}"
else:
status_str = f"[bold red]✗ FAIL({node_status})[/bold red]"
border = "red"
title_line = f"[bold]{unique}[/bold] — {status_str}"
stripped = node_output.strip() if node_output else ""
code_block = Text(stripped + "\n") if stripped else Text()
panel_content = Group(Text(), Text(""), code_block)
with print_lock:
mdprint(Panel(panel_content, title=title_line, width=PANEL_WIDTH, border_style=border))
nodes.run(**args, on_complete=_on_node_complete)
return
# Batch mode: wait for all nodes, then print
if action == "run":
nodes.run(**args)
header = f"{script['name'].upper()}"
elif action == "test":
nodes.test(**args)
header = f"{script['name'].upper()}"
else:
printer.error(f"Wrong action '{action}'")
exit(13)
mdprint(Rule(header, style="white"))
mdprint(Rule(header, style="bold cyan"))
for node in nodes.status:
status_str = "[✓] PASS(0)" if nodes.status[node] == 0 else f"[x] FAIL({nodes.status[node]})"
title_line = f"{node}{status_str}"
if nodes.status[node] == 0:
status_str = "[bold green]✓ PASS[/bold green]"
border = "green"
else:
status_str = f"[bold red]✗ FAIL({nodes.status[node]})[/bold red]"
border = "red"
title_line = f"[bold]{node}[/bold] — {status_str}"
test_output = Text()
if action == "test" and nodes.status[node] == 0:
results = nodes.result[node]
test_output.append("TEST RESULTS:\n")
test_output.append("TEST RESULTS:\n", style="bold cyan")
max_key_len = max(len(k) for k in results.keys())
for k, v in results.items():
status = "[✓]" if str(v).upper() == "TRUE" else "[x]"
test_output.append(f" {k.ljust(max_key_len)} {status}\n")
if str(v).upper() == "TRUE":
test_output.append(f" {k.ljust(max_key_len)} \n", style="green")
else:
test_output.append(f" {k.ljust(max_key_len)}\n", style="red")
output = nodes.output[node].strip()
code_block = Text()
@@ -1080,8 +1128,10 @@ class connapp:
highlight_words = [k for k, v in nodes.result[node].items() if str(v).upper() == "TRUE"]
code_block.highlight_words(highlight_words, style=Style(color="green", bold=True, underline=True))
panel_content = Group(test_output, Text(""), code_block)
mdprint(Panel(panel_content, title=title_line, width=PANEL_WIDTH, border_style="white"))
mdprint(Panel(panel_content, title=title_line, width=PANEL_WIDTH, border_style=border))
def _choose(self, list, name, action):
#Generates an inquirer list to pick
+18 -3
View File
@@ -710,7 +710,7 @@ class nodes:
@MethodHook
def run(self, commands, vars = None,*, folder = None, prompt = None, stdout = None, parallel = 10, timeout = None):
def run(self, commands, vars = None,*, folder = None, prompt = None, stdout = None, parallel = 10, timeout = None, on_complete = None):
'''
Run a command or list of commands on all the nodes in nodelist.
@@ -751,6 +751,11 @@ class nodes:
- timeout (int): Time in seconds for expect to wait for prompt/EOF.
default 10.
- on_complete (callable): Optional callback called when each node
finishes. Receives (unique, output, status).
Called from the node's thread so it must
be thread-safe.
###Returns:
dict: Dictionary formed by nodes unique as keys, Output of the
@@ -765,13 +770,20 @@ class nodes:
Path(folder).mkdir(parents=True, exist_ok=True)
if prompt != None:
args["prompt"] = prompt
if stdout != None:
if stdout != None and on_complete is None:
args["stdout"] = stdout
if timeout != None:
args["timeout"] = timeout
output = {}
status = {}
tasks = []
def _run_node(node_obj, node_args, callback):
"""Wrapper that runs a node and fires the callback on completion."""
node_obj.run(**node_args)
if callback:
callback(node_obj.unique, node_obj.output, node_obj.status)
for n in self.nodelist:
nodesargs[n.unique] = deepcopy(args)
if vars != None:
@@ -780,7 +792,10 @@ class nodes:
nodesargs[n.unique]["vars"].update(vars["__global__"])
if n.unique in vars.keys():
nodesargs[n.unique]["vars"].update(vars[n.unique])
tasks.append(threading.Thread(target=n.run, kwargs=nodesargs[n.unique]))
if on_complete:
tasks.append(threading.Thread(target=_run_node, args=(n, nodesargs[n.unique], on_complete)))
else:
tasks.append(threading.Thread(target=n.run, kwargs=nodesargs[n.unique]))
taskslist = list(self._splitlist(tasks, parallel))
for t in taskslist:
for i in t:
+1514 -736
View File
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -3,7 +3,7 @@ Flask_Cors>=4.0.1
google_api_python_client>=2.125.0
google_auth_oauthlib>=1.2.0
inquirer>=3.3.0
openai>=1.98.0
litellm>=1.40.0
pexpect>=4.8.0
protobuf>=5.27.2
pycryptodome>=3.18.0
+1 -1
View File
@@ -33,7 +33,7 @@ install_requires =
pyfzf
waitress
PyYAML
openai
litellm
rich
protobuf
google_api_python_client