Module connpy.cli.ai_handler
Classes
class AIHandler (app)-
Expand source code
class AIHandler: def __init__(self, app): self.app = app def dispatch(self, args): if args.list_sessions: sessions = self.app.services.ai.list_sessions() if not sessions: printer.info("No saved AI sessions found.") return columns = ["ID", "Title", "Created At", "Model"] rows = [[s["id"], s["title"], s["created_at"], s["model"]] for s in sessions] printer.table("AI Persisted Sessions", columns, rows) return if args.delete_session: try: self.app.services.ai.delete_session(args.delete_session[0]) printer.success(f"Session {args.delete_session[0]} deleted.") except Exception as e: printer.error(str(e)) return if args.mcp is not None: return self.configure_mcp(args) # Determinar session_id para retomar session_id = None if args.resume: sessions = self.app.services.ai.list_sessions() session_id = sessions[0]["id"] if sessions else None if not session_id: printer.warning("No previous session found to resume.") elif args.session: session_id = args.session[0] # Configurar argumentos adicionales para el servicio de AI # Prioridad: CLI Args > Configuración Local settings = self.app.services.config_svc.get_settings().get("ai", {}) arguments = {} for key in ["engineer_model", "engineer_api_key", "architect_model", "architect_api_key"]: cli_val = getattr(args, key, None) if cli_val: arguments[key] = cli_val[0] elif settings.get(key): arguments[key] = settings.get(key) # Check keys only if running in local mode (not remote) if getattr(self.app.services, "mode", "local") == "local": if not arguments.get("engineer_api_key"): printer.error("Engineer API key not configured. The chat cannot start.") printer.info("Use 'connpy config --engineer-api-key <key>' to set it.") sys.exit(1) if not arguments.get("architect_api_key"): printer.warning("Architect API key not configured. Architect will be unavailable.") printer.info("Use 'connpy config --architect-api-key <key>' to enable it.") # El resto de la interacción el CLI la maneja con el agente subyacente self.app.myai = self.app.services.ai self.ai_overrides = arguments if args.ask: self.single_question(args, session_id) else: self.interactive_chat(args, session_id) def single_question(self, args, session_id): query = " ".join(args.ask) with console.status("[ai_status]Agent is thinking and analyzing...") as status: result = self.app.myai.ask(query, status=status, debug=args.debug, session_id=session_id, trust=args.trust, **self.ai_overrides) responder = result.get("responder", "engineer") border = "architect" if responder == "architect" else "engineer" title = "[architect][bold]Network Architect[/bold][/architect]" if responder == "architect" else "[engineer][bold]Network Engineer[/bold][/engineer]" if not result.get("streamed"): mdprint(Panel(Markdown(result["response"]), title=title, border_style=border, expand=False)) if "usage" in result: u = result["usage"] console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]") def interactive_chat(self, args, session_id): history = None if session_id: session_data = self.app.myai.load_session_data(session_id) if session_data: history = session_data.get("history", []) mdprint(Rule(title=f"[header] Resuming Session: {session_data.get('title')} [/header]", style="border")) if history: mdprint(f"[debug]Analyzing {len(history)} previous messages...[/debug]\n") else: printer.error(f"Could not load session {session_id}. Starting clean.") if not history: mdprint(Rule(style="engineer")) mdprint(Markdown("**Networking Expert Agent**: Hi! I'm your assistant. I can help you diagnose issues, run commands, and manage your nodes.\nType 'exit' to quit.\n")) mdprint(Rule(style="engineer")) while True: try: user_query = Prompt.ask("[user_prompt]User[/user_prompt]") if not user_query.strip(): continue if user_query.lower() in ['exit', 'quit', 'bye', 'cancel']: break with console.status("[ai_status]Agent is thinking...") as status: result = self.app.myai.ask(user_query, chat_history=history, status=status, debug=args.debug, trust=args.trust, **self.ai_overrides) new_history = result.get("chat_history") if new_history is not None: history = new_history responder = result.get("responder", "engineer") border = "architect" if responder == "architect" else "engineer" title = "[architect][bold]Network Architect[/bold][/architect]" if responder == "architect" else "[engineer][bold]Network Engineer[/bold][/engineer]" if not result.get("streamed"): response_text = result.get("response", "") if response_text: mdprint(Panel(Markdown(response_text), title=title, border_style=border, expand=False)) if "usage" in result: u = result["usage"] console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]") except (KeyboardInterrupt, EOFError): console.print("\n[dim]Session closed.[/dim]") break def configure_mcp(self, args): """Handle MCP server configuration via CLI tokens or interactive wizard.""" mcp_args = args.mcp # 1. Non-interactive CLI Mode (if arguments are provided) if mcp_args: action = mcp_args[0].lower() if action == "list": settings = self.app.services.config_svc.get_settings() mcp_servers = settings.get("ai", {}).get("mcp_servers", {}) if not mcp_servers: printer.info("No MCP servers configured.") else: columns = ["Name", "URL", "Enabled", "Auto-load OS"] rows = [] for name, cfg in mcp_servers.items(): rows.append([ name, cfg.get("url", ""), "[green]Yes[/green]" if cfg.get("enabled", True) else "[red]No[/red]", cfg.get("auto_load_on_os", "Any") ]) printer.table("Configured MCP Servers", columns, rows) return elif action == "add": if len(mcp_args) < 3: printer.error("Usage: connpy ai --mcp add <name> <url> [os_filter]") return name, url = mcp_args[1], mcp_args[2] os_filter = mcp_args[3] if len(mcp_args) > 3 else None try: self.app.services.ai.configure_mcp(name, url=url, auto_load_on_os=os_filter) printer.success(f"MCP server '{name}' added/updated.") except Exception as e: printer.error(str(e)) return elif action == "remove": if len(mcp_args) < 2: printer.error("Usage: connpy ai --mcp remove <name>") return name = mcp_args[1] try: self.app.services.ai.configure_mcp(name, remove=True) printer.success(f"MCP server '{name}' removed.") except Exception as e: printer.error(str(e)) return elif action in ["enable", "disable"]: if len(mcp_args) < 2: printer.error(f"Usage: connpy ai --mcp {action} <name>") return name = mcp_args[1] enabled = (action == "enable") try: self.app.services.ai.configure_mcp(name, enabled=enabled) printer.success(f"MCP server '{name}' {'enabled' if enabled else 'disabled'}.") except Exception as e: printer.error(str(e)) return else: printer.error(f"Unknown MCP action: {action}") printer.info("Available actions: list, add, remove, enable, disable") return # 2. Interactive Wizard Mode (if no arguments provided) # Import forms dynamically to avoid circular dependencies if any if not hasattr(self.app, "cli_forms"): from .forms import Forms self.app.cli_forms = Forms(self.app) settings = self.app.services.config_svc.get_settings() mcp_servers = settings.get("ai", {}).get("mcp_servers", {}) result = self.app.cli_forms.mcp_wizard(mcp_servers) if not result: return action = result["action"] try: if action == "list": # Recursive call to the non-interactive list logic args.mcp = ["list"] return self.configure_mcp(args) elif action == "add": self.app.services.ai.configure_mcp( result["name"], url=result["url"], enabled=result["enabled"], auto_load_on_os=result["os"] ) printer.success(f"MCP server '{result['name']}' saved.") elif action == "update": # Used for toggle self.app.services.ai.configure_mcp( result["name"], enabled=result["enabled"] ) printer.success(f"MCP server '{result['name']}' updated.") elif action == "remove": self.app.services.ai.configure_mcp(result["name"], remove=True) printer.success(f"MCP server '{result['name']}' removed.") except Exception as e: printer.error(str(e))Methods
def configure_mcp(self, args)-
Expand source code
def configure_mcp(self, args): """Handle MCP server configuration via CLI tokens or interactive wizard.""" mcp_args = args.mcp # 1. Non-interactive CLI Mode (if arguments are provided) if mcp_args: action = mcp_args[0].lower() if action == "list": settings = self.app.services.config_svc.get_settings() mcp_servers = settings.get("ai", {}).get("mcp_servers", {}) if not mcp_servers: printer.info("No MCP servers configured.") else: columns = ["Name", "URL", "Enabled", "Auto-load OS"] rows = [] for name, cfg in mcp_servers.items(): rows.append([ name, cfg.get("url", ""), "[green]Yes[/green]" if cfg.get("enabled", True) else "[red]No[/red]", cfg.get("auto_load_on_os", "Any") ]) printer.table("Configured MCP Servers", columns, rows) return elif action == "add": if len(mcp_args) < 3: printer.error("Usage: connpy ai --mcp add <name> <url> [os_filter]") return name, url = mcp_args[1], mcp_args[2] os_filter = mcp_args[3] if len(mcp_args) > 3 else None try: self.app.services.ai.configure_mcp(name, url=url, auto_load_on_os=os_filter) printer.success(f"MCP server '{name}' added/updated.") except Exception as e: printer.error(str(e)) return elif action == "remove": if len(mcp_args) < 2: printer.error("Usage: connpy ai --mcp remove <name>") return name = mcp_args[1] try: self.app.services.ai.configure_mcp(name, remove=True) printer.success(f"MCP server '{name}' removed.") except Exception as e: printer.error(str(e)) return elif action in ["enable", "disable"]: if len(mcp_args) < 2: printer.error(f"Usage: connpy ai --mcp {action} <name>") return name = mcp_args[1] enabled = (action == "enable") try: self.app.services.ai.configure_mcp(name, enabled=enabled) printer.success(f"MCP server '{name}' {'enabled' if enabled else 'disabled'}.") except Exception as e: printer.error(str(e)) return else: printer.error(f"Unknown MCP action: {action}") printer.info("Available actions: list, add, remove, enable, disable") return # 2. Interactive Wizard Mode (if no arguments provided) # Import forms dynamically to avoid circular dependencies if any if not hasattr(self.app, "cli_forms"): from .forms import Forms self.app.cli_forms = Forms(self.app) settings = self.app.services.config_svc.get_settings() mcp_servers = settings.get("ai", {}).get("mcp_servers", {}) result = self.app.cli_forms.mcp_wizard(mcp_servers) if not result: return action = result["action"] try: if action == "list": # Recursive call to the non-interactive list logic args.mcp = ["list"] return self.configure_mcp(args) elif action == "add": self.app.services.ai.configure_mcp( result["name"], url=result["url"], enabled=result["enabled"], auto_load_on_os=result["os"] ) printer.success(f"MCP server '{result['name']}' saved.") elif action == "update": # Used for toggle self.app.services.ai.configure_mcp( result["name"], enabled=result["enabled"] ) printer.success(f"MCP server '{result['name']}' updated.") elif action == "remove": self.app.services.ai.configure_mcp(result["name"], remove=True) printer.success(f"MCP server '{result['name']}' removed.") except Exception as e: printer.error(str(e))Handle MCP server configuration via CLI tokens or interactive wizard.
def dispatch(self, args)-
Expand source code
def dispatch(self, args): if args.list_sessions: sessions = self.app.services.ai.list_sessions() if not sessions: printer.info("No saved AI sessions found.") return columns = ["ID", "Title", "Created At", "Model"] rows = [[s["id"], s["title"], s["created_at"], s["model"]] for s in sessions] printer.table("AI Persisted Sessions", columns, rows) return if args.delete_session: try: self.app.services.ai.delete_session(args.delete_session[0]) printer.success(f"Session {args.delete_session[0]} deleted.") except Exception as e: printer.error(str(e)) return if args.mcp is not None: return self.configure_mcp(args) # Determinar session_id para retomar session_id = None if args.resume: sessions = self.app.services.ai.list_sessions() session_id = sessions[0]["id"] if sessions else None if not session_id: printer.warning("No previous session found to resume.") elif args.session: session_id = args.session[0] # Configurar argumentos adicionales para el servicio de AI # Prioridad: CLI Args > Configuración Local settings = self.app.services.config_svc.get_settings().get("ai", {}) arguments = {} for key in ["engineer_model", "engineer_api_key", "architect_model", "architect_api_key"]: cli_val = getattr(args, key, None) if cli_val: arguments[key] = cli_val[0] elif settings.get(key): arguments[key] = settings.get(key) # Check keys only if running in local mode (not remote) if getattr(self.app.services, "mode", "local") == "local": if not arguments.get("engineer_api_key"): printer.error("Engineer API key not configured. The chat cannot start.") printer.info("Use 'connpy config --engineer-api-key <key>' to set it.") sys.exit(1) if not arguments.get("architect_api_key"): printer.warning("Architect API key not configured. Architect will be unavailable.") printer.info("Use 'connpy config --architect-api-key <key>' to enable it.") # El resto de la interacción el CLI la maneja con el agente subyacente self.app.myai = self.app.services.ai self.ai_overrides = arguments if args.ask: self.single_question(args, session_id) else: self.interactive_chat(args, session_id) def interactive_chat(self, args, session_id)-
Expand source code
def interactive_chat(self, args, session_id): history = None if session_id: session_data = self.app.myai.load_session_data(session_id) if session_data: history = session_data.get("history", []) mdprint(Rule(title=f"[header] Resuming Session: {session_data.get('title')} [/header]", style="border")) if history: mdprint(f"[debug]Analyzing {len(history)} previous messages...[/debug]\n") else: printer.error(f"Could not load session {session_id}. Starting clean.") if not history: mdprint(Rule(style="engineer")) mdprint(Markdown("**Networking Expert Agent**: Hi! I'm your assistant. I can help you diagnose issues, run commands, and manage your nodes.\nType 'exit' to quit.\n")) mdprint(Rule(style="engineer")) while True: try: user_query = Prompt.ask("[user_prompt]User[/user_prompt]") if not user_query.strip(): continue if user_query.lower() in ['exit', 'quit', 'bye', 'cancel']: break with console.status("[ai_status]Agent is thinking...") as status: result = self.app.myai.ask(user_query, chat_history=history, status=status, debug=args.debug, trust=args.trust, **self.ai_overrides) new_history = result.get("chat_history") if new_history is not None: history = new_history responder = result.get("responder", "engineer") border = "architect" if responder == "architect" else "engineer" title = "[architect][bold]Network Architect[/bold][/architect]" if responder == "architect" else "[engineer][bold]Network Engineer[/bold][/engineer]" if not result.get("streamed"): response_text = result.get("response", "") if response_text: mdprint(Panel(Markdown(response_text), title=title, border_style=border, expand=False)) if "usage" in result: u = result["usage"] console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]") except (KeyboardInterrupt, EOFError): console.print("\n[dim]Session closed.[/dim]") break def single_question(self, args, session_id)-
Expand source code
def single_question(self, args, session_id): query = " ".join(args.ask) with console.status("[ai_status]Agent is thinking and analyzing...") as status: result = self.app.myai.ask(query, status=status, debug=args.debug, session_id=session_id, trust=args.trust, **self.ai_overrides) responder = result.get("responder", "engineer") border = "architect" if responder == "architect" else "engineer" title = "[architect][bold]Network Architect[/bold][/architect]" if responder == "architect" else "[engineer][bold]Network Engineer[/bold][/engineer]" if not result.get("streamed"): mdprint(Panel(Markdown(result["response"]), title=title, border_style=border, expand=False)) if "usage" in result: u = result["usage"] console.print(f"[debug]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/debug]")