From dba7e24ddabbe93b4bef3c86096afa01a48d5242 Mon Sep 17 00:00:00 2001 From: Fede Luzzi Date: Mon, 11 May 2026 12:30:43 -0300 Subject: [PATCH] feat(copilot): stabilize concurrency and enhance terminal context management - AI Concurrency: - Implemented a dedicated background event loop (ConnpyAILoop) in a separate thread for AI tasks to ensure thread safety and event loop affinity. - Added 'run_ai_async' utility to funnel all LiteLLM calls through the dedicated loop. - Implemented global 'cleanup()' for safe closure of sync/async LiteLLM sessions. - gRPC & Remote Sessions: - Enhanced 'NodeServicer' to identify command blocks within the terminal buffer using prompt regex/byte tracking. - Added support for selective context retrieval via 'context_start_pos' in the gRPC Interact stream. - Synchronized remote Copilot behavior by enriching questions with session history (last 5 queries) in 'NodeStub'. - Optimized token usage by cleaning 'node_info' metadata before AI transmission. - Terminal Context & Core: - Modified 'node.connect' to always initialize 'mylog' (BytesIO) buffer regardless of disk logging configuration, ensuring Copilot context availability. - Integrated 'ai.cleanup()' in CLI (connapp) and Server (api) exit points for graceful shutdowns. - Suppressed LiteLLM internal streaming coroutine warnings during task cancellation. --- connpy/ai.py | 58 +++++++++++++++++++++++++++++++ connpy/api.py | 5 +++ connpy/connapp.py | 7 ++++ connpy/core.py | 15 ++++---- connpy/grpc_layer/server.py | 65 +++++++++++++++++++++++++++++++++-- connpy/grpc_layer/stubs.py | 24 +++++++++++-- connpy/services/ai_service.py | 11 +++--- 7 files changed, 170 insertions(+), 15 deletions(-) diff --git a/connpy/ai.py b/connpy/ai.py index e85b57e..98d1cbc 100755 --- a/connpy/ai.py +++ b/connpy/ai.py @@ -3,6 +3,8 @@ import sys import json import re import datetime +import threading +import asyncio from textwrap import dedent from .core import nodes @@ -37,6 +39,61 @@ from rich.rule import Rule console = printer.console +_ai_loop = None +_ai_thread = None +_ai_lock = threading.Lock() + +def _get_ai_loop(): + global _ai_loop, _ai_thread + with _ai_lock: + if _ai_loop is None: + _ai_loop = asyncio.new_event_loop() + _ai_thread = threading.Thread(target=_ai_loop.run_forever, name="ConnpyAILoop", daemon=True) + _ai_thread.start() + return _ai_loop + +def run_ai_async(coro): + """Run a coroutine in the dedicated AI background loop.""" + loop = _get_ai_loop() + return asyncio.run_coroutine_threadsafe(coro, loop) + + +def cleanup(): + """Safely close any global litellm sessions in the dedicated AI loop.""" + global _ai_loop + if _ai_loop: + try: + future = asyncio.run_coroutine_threadsafe(_async_cleanup(), _ai_loop) + future.result(timeout=5) + except: + pass + + +async def _async_cleanup(): + """Internal async cleanup for litellm sessions.""" + try: + import litellm + # 1. Close synchronous session + if hasattr(litellm, "client_session") and litellm.client_session: + try: + if hasattr(litellm.client_session, "close"): + res = litellm.client_session.close() + if asyncio.iscoroutine(res): await res + except: pass + litellm.client_session = None + + # 2. Close asynchronous session + if hasattr(litellm, "aclient_session") and litellm.aclient_session: + try: + session = litellm.aclient_session + litellm.aclient_session = None + if hasattr(session, "close"): + await session.close() + except: pass + except ImportError: + pass + + @ClassHook class ai: """Hybrid Multi-Agent System: Selective Escalation with Role Persistence.""" @@ -1358,6 +1415,7 @@ Node: {node_name}""" from litellm import acompletion import asyncio import warnings + import aiohttp # Suppress unawaited coroutine warnings from LiteLLM's internal streaming logic during sudden cancellation warnings.filterwarnings("ignore", message="coroutine '.*async_streaming.*' was never awaited", category=RuntimeWarning) diff --git a/connpy/api.py b/connpy/api.py index 7c1f809..d5d6974 100755 --- a/connpy/api.py +++ b/connpy/api.py @@ -54,6 +54,8 @@ def debug_api(port=8048, config=None): printer.info(f"gRPC Server running in debug mode on port {port}...") _wait_for_termination() server.stop(0) + from .ai import cleanup + cleanup() def start_server(port=8048, config=None): try: @@ -67,6 +69,9 @@ def start_server(port=8048, config=None): conf = config or configfile() server = serve(conf, port=port, debug=False) _wait_for_termination() + server.stop(0) + from .ai import cleanup + cleanup() except Exception as e: printer.error(f"Background API failed to start: {e}") os._exit(1) diff --git a/connpy/connapp.py b/connpy/connapp.py index 0f53132..d41a990 100755 --- a/connpy/connapp.py +++ b/connpy/connapp.py @@ -470,6 +470,13 @@ class connapp: # Handle global Ctrl+C gracefully printer.warning("Operation cancelled by user.") sys.exit(130) + finally: + # Safely cleanup AI sessions (litellm) + try: + from .ai import cleanup + cleanup() + except ImportError: + pass class _store_type(argparse.Action): #Custom store type for cli app. diff --git a/connpy/core.py b/connpy/core.py index 2781e12..68e8f21 100755 --- a/connpy/core.py +++ b/connpy/core.py @@ -354,14 +354,17 @@ class node: port_str = f":{self.port}" if self.port and self.protocol not in ["ssm", "kubectl", "docker"] else "" logger("success", f"Connected to {self.unique} at {self.host}{port_str} via: {self.protocol}") + # Always initialize self.mylog to capture terminal context for the AI Copilot + if not hasattr(self, 'mylog'): + self.mylog = io.BytesIO() + + if not async_mode: + self.child.logfile_read = self.mylog + + # Only start disk-logging tasks if logfile is configured if 'logfile' in dir(self): - # Initialize self.mylog - if not 'mylog' in dir(self): - self.mylog = io.BytesIO() if not async_mode: - self.child.logfile_read = self.mylog - - # Start the _savelog thread + # Start the _savelog thread (sync mode) log_thread = threading.Thread(target=self._savelog) log_thread.daemon = True log_thread.start() diff --git a/connpy/grpc_layer/server.py b/connpy/grpc_layer/server.py index c019be4..7b1911b 100644 --- a/connpy/grpc_layer/server.py +++ b/connpy/grpc_layer/server.py @@ -207,8 +207,55 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer): import json import asyncio import os + import re - node_info_json = json.dumps(node_info) if node_info else "" + # Build context blocks like local CLI does + blocks = [] + raw_bytes = n.mylog.getvalue() if hasattr(n, 'mylog') else b'' + + if cmd_byte_positions and len(cmd_byte_positions) >= 2 and raw_bytes: + default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$' + device_prompt = node_info.get("prompt", default_prompt) if isinstance(node_info, dict) else default_prompt + prompt_re_str = re.sub(r'(? 1: + # Limit history to last 5 questions to save tokens, excluding current + recent_history = past_questions[-6:-1] + history_text = "\n".join(f"- {q}" for q in recent_history) + enriched_question = f"Previous questions in this session:\n{history_text}\n\nCurrent Question:\n{question}" + else: + enriched_question = question + + request_queue.put(connpy_pb2.InteractRequest(copilot_question=enriched_question, copilot_context_buffer=active_buffer)) from rich.live import Live live_text = "Thinking..." @@ -800,7 +810,17 @@ class NodeStub: continue active_buffer = get_active_buffer() - request_queue.put(connpy_pb2.InteractRequest(copilot_question=question, copilot_context_buffer=active_buffer)) + # Enrich question with history (same as local CLI) + past_questions = self.copilot_history.get_strings() + if len(past_questions) > 1: + # Limit history to last 5 questions to save tokens, excluding current + recent_history = past_questions[-6:-1] + history_text = "\n".join(f"- {q}" for q in recent_history) + enriched_question = f"Previous questions in this session:\n{history_text}\n\nCurrent Question:\n{question}" + else: + enriched_question = question + + request_queue.put(connpy_pb2.InteractRequest(copilot_question=enriched_question, copilot_context_buffer=active_buffer)) from rich.live import Live live_text = "Thinking..." diff --git a/connpy/services/ai_service.py b/connpy/services/ai_service.py index 5a600ea..73c5186 100644 --- a/connpy/services/ai_service.py +++ b/connpy/services/ai_service.py @@ -19,15 +19,18 @@ class AIService(BaseService): def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None): """Ask the AI copilot for terminal assistance.""" - from connpy.ai import ai + from connpy.ai import ai, run_ai_async agent = ai(self.config) - return agent.ask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback) + future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback)) + return future.result() async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None): """Ask the AI copilot for terminal assistance asynchronously.""" - from connpy.ai import ai + from connpy.ai import ai, run_ai_async + import asyncio agent = ai(self.config) - return await agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback) + future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback)) + return await asyncio.wrap_future(future) def list_sessions(self):