feat(copilot): stabilize concurrency and enhance terminal context management
- AI Concurrency:
  - Implemented a dedicated background event loop (ConnpyAILoop) in a separate thread for AI tasks, ensuring thread safety and event-loop affinity.
  - Added a 'run_ai_async' utility to funnel all LiteLLM calls through the dedicated loop.
  - Implemented a global 'cleanup()' for safe closure of sync/async LiteLLM sessions.
- gRPC & Remote Sessions:
  - Enhanced 'NodeServicer' to identify command blocks within the terminal buffer using prompt regex/byte tracking.
  - Added support for selective context retrieval via 'context_start_pos' in the gRPC Interact stream.
  - Synchronized remote Copilot behavior by enriching questions with session history (last 5 queries) in 'NodeStub'.
  - Optimized token usage by cleaning 'node_info' metadata before AI transmission.
- Terminal Context & Core:
  - Modified 'node.connect' to always initialize the 'mylog' (BytesIO) buffer regardless of disk-logging configuration, ensuring Copilot context availability.
  - Integrated 'ai.cleanup()' at CLI (connapp) and server (api) exit points for graceful shutdowns.
  - Suppressed LiteLLM internal streaming-coroutine warnings during task cancellation.
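For orientation, a minimal standalone sketch of the dedicated-loop pattern described above (not part of the diff; the demo coroutine is made up, while the loop/submit shape mirrors run_ai_async):

import asyncio
import threading

_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, name="ConnpyAILoop", daemon=True).start()

async def demo():
    # Stand-in for a LiteLLM call; only ever runs on the dedicated loop
    await asyncio.sleep(0.1)
    return "ok"

# run_coroutine_threadsafe returns a concurrent.futures.Future,
# so synchronous callers can simply block on .result()
future = asyncio.run_coroutine_threadsafe(demo(), _loop)
print(future.result(timeout=5))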
@@ -3,6 +3,8 @@ import sys
 import json
 import re
 import datetime
+import threading
+import asyncio
 from textwrap import dedent
 from .core import nodes
 
@@ -37,6 +39,61 @@ from rich.rule import Rule
 console = printer.console
 
 
+_ai_loop = None
+_ai_thread = None
+_ai_lock = threading.Lock()
+
+
+def _get_ai_loop():
+    global _ai_loop, _ai_thread
+    with _ai_lock:
+        if _ai_loop is None:
+            _ai_loop = asyncio.new_event_loop()
+            _ai_thread = threading.Thread(target=_ai_loop.run_forever, name="ConnpyAILoop", daemon=True)
+            _ai_thread.start()
+        return _ai_loop
+
+
+def run_ai_async(coro):
+    """Run a coroutine in the dedicated AI background loop."""
+    loop = _get_ai_loop()
+    return asyncio.run_coroutine_threadsafe(coro, loop)
+
+
+def cleanup():
+    """Safely close any global litellm sessions in the dedicated AI loop."""
+    global _ai_loop
+    if _ai_loop:
+        try:
+            future = asyncio.run_coroutine_threadsafe(_async_cleanup(), _ai_loop)
+            future.result(timeout=5)
+        except:
+            pass
+
+
+async def _async_cleanup():
+    """Internal async cleanup for litellm sessions."""
+    try:
+        import litellm
+        # 1. Close synchronous session
+        if hasattr(litellm, "client_session") and litellm.client_session:
+            try:
+                if hasattr(litellm.client_session, "close"):
+                    res = litellm.client_session.close()
+                    if asyncio.iscoroutine(res): await res
+            except: pass
+            litellm.client_session = None
+
+        # 2. Close asynchronous session
+        if hasattr(litellm, "aclient_session") and litellm.aclient_session:
+            try:
+                session = litellm.aclient_session
+                litellm.aclient_session = None
+                if hasattr(session, "close"):
+                    await session.close()
+            except: pass
+    except ImportError:
+        pass
+
+
 @ClassHook
 class ai:
     """Hybrid Multi-Agent System: Selective Escalation with Role Persistence."""
@@ -1358,6 +1415,7 @@ Node: {node_name}"""
         from litellm import acompletion
         import asyncio
         import warnings
+        import aiohttp
 
         # Suppress unawaited coroutine warnings from LiteLLM's internal streaming logic during sudden cancellation
         warnings.filterwarnings("ignore", message="coroutine '.*async_streaming.*' was never awaited", category=RuntimeWarning)
@@ -54,6 +54,8 @@ def debug_api(port=8048, config=None):
     printer.info(f"gRPC Server running in debug mode on port {port}...")
     _wait_for_termination()
     server.stop(0)
+    from .ai import cleanup
+    cleanup()
 
 def start_server(port=8048, config=None):
     try:
@@ -67,6 +69,9 @@ def start_server(port=8048, config=None):
         conf = config or configfile()
         server = serve(conf, port=port, debug=False)
         _wait_for_termination()
+        server.stop(0)
+        from .ai import cleanup
+        cleanup()
     except Exception as e:
         printer.error(f"Background API failed to start: {e}")
         os._exit(1)
@@ -470,6 +470,13 @@ class connapp:
             # Handle global Ctrl+C gracefully
             printer.warning("Operation cancelled by user.")
             sys.exit(130)
+        finally:
+            # Safely cleanup AI sessions (litellm)
+            try:
+                from .ai import cleanup
+                cleanup()
+            except ImportError:
+                pass
 
 class _store_type(argparse.Action):
     #Custom store type for cli app.
@@ -354,14 +354,17 @@ class node:
         port_str = f":{self.port}" if self.port and self.protocol not in ["ssm", "kubectl", "docker"] else ""
         logger("success", f"Connected to {self.unique} at {self.host}{port_str} via: {self.protocol}")
 
-        if 'logfile' in dir(self):
-            # Initialize self.mylog
-            if not 'mylog' in dir(self):
-                self.mylog = io.BytesIO()
-            if not async_mode:
-                self.child.logfile_read = self.mylog
-
-            # Start the _savelog thread
+        # Always initialize self.mylog to capture terminal context for the AI Copilot
+        if not hasattr(self, 'mylog'):
+            self.mylog = io.BytesIO()
+
+        if not async_mode:
+            self.child.logfile_read = self.mylog
+
+        # Only start disk-logging tasks if logfile is configured
+        if 'logfile' in dir(self):
+            if not async_mode:
+                # Start the _savelog thread (sync mode)
                 log_thread = threading.Thread(target=self._savelog)
                 log_thread.daemon = True
                 log_thread.start()
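The capture mechanism the hunk above relies on is pexpect's logfile_read hook. A minimal standalone sketch (assumes pexpect is installed; the child command is illustrative):

import io
import pexpect

child = pexpect.spawn("echo hello")  # bytes mode by default
mylog = io.BytesIO()
child.logfile_read = mylog           # mirror everything the child emits
child.expect(pexpect.EOF)

raw_bytes = mylog.getvalue()         # the same buffer the gRPC servicer reads
print(raw_bytes.decode(errors="replace"))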
@@ -207,8 +207,55 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
         import json
         import asyncio
         import os
+        import re
 
-        node_info_json = json.dumps(node_info) if node_info else ""
+        # Build context blocks like local CLI does
+        blocks = []
+        raw_bytes = n.mylog.getvalue() if hasattr(n, 'mylog') else b''
+
+        if cmd_byte_positions and len(cmd_byte_positions) >= 2 and raw_bytes:
+            default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
+            device_prompt = node_info.get("prompt", default_prompt) if isinstance(node_info, dict) else default_prompt
+            prompt_re_str = re.sub(r'(?<!\\)\$', '', device_prompt)
+            try:
+                prompt_re = re.compile(prompt_re_str)
+            except Exception:
+                prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))
+
+            for i in range(1, len(cmd_byte_positions)):
+                pos, known_cmd = cmd_byte_positions[i]
+                prev_pos = cmd_byte_positions[i-1][0]
+
+                if known_cmd:
+                    prev_chunk = raw_bytes[prev_pos:pos]
+                    prev_cleaned = n._logclean(prev_chunk.decode(errors='replace'), var=True)
+                    prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
+                    prompt_text = prev_lines[-1].strip() if prev_lines else ""
+                    preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
+                    blocks.append({"pos": pos, "preview": preview[:80], "type": "cmd"})
+                else:
+                    chunk = raw_bytes[prev_pos:pos]
+                    cleaned = n._logclean(chunk.decode(errors='replace'), var=True)
+                    lines = [l for l in cleaned.split('\n') if l.strip()]
+                    preview = lines[-1].strip() if lines else ""
+
+                    if preview:
+                        match = prompt_re.search(preview)
+                        if match:
+                            cmd_text = preview[match.end():].strip()
+                            if cmd_text:
+                                blocks.append({"pos": pos, "preview": preview[:80], "type": "cmd"})
+
+        clean_buffer = n._logclean(raw_bytes.decode(errors='replace'), var=True)
+        last_line = clean_buffer.split('\n')[-1].strip() if clean_buffer.strip() else "(prompt)"
+        blocks.append({"pos": len(raw_bytes), "preview": last_line[:80], "type": "current"})
+
+        if node_info is None:
+            node_info = {}
+        node_info["context_blocks"] = blocks
+        node_info["full_buffer"] = buffer
+
+        node_info_json = json.dumps(node_info)
         # 1. Send prompt to client
         response_queue.put(connpy_pb2.InteractResponse(
             copilot_prompt=True,
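The segmentation logic is easiest to see on synthetic data. A standalone sketch (buffer, positions, and commands are made up; the real servicer also runs n._logclean to strip ANSI noise, while this sketch decodes directly):

import re

# Synthetic buffer and (byte_pos, command) pairs standing in for the real tracker
raw_bytes = b"router1> show version\r\nIOS XE 17.3\r\nrouter1> show ip route\r\n"
cmd_byte_positions = [(0, None), (23, "show version"), (60, None)]

default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
# Strip unescaped end-of-line anchors so the prompt pattern can match mid-line
prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))

for (prev_pos, _), (pos, known_cmd) in zip(cmd_byte_positions, cmd_byte_positions[1:]):
    chunk = raw_bytes[prev_pos:pos].decode(errors="replace")
    lines = [l.strip() for l in chunk.split("\n") if l.strip()]
    preview = lines[-1] if lines else ""
    if known_cmd:                     # the client reported the command directly
        cmd = known_cmd
    else:                             # recover it from the prompt line
        m = prompt_re.search(preview)
        cmd = preview[m.end():].strip() if m else ""
    print({"pos": pos, "preview": preview[:80], "cmd": cmd})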
@@ -231,8 +278,17 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
                 os.write(child_fd, b'\x15\r')
                 return
             question = req_data["question"]
+
             context_buffer = req_data.get("context_buffer", "")
-            if not context_buffer:
+            if context_buffer.startswith('{"context_start_pos"'):
+                try:
+                    parsed = json.loads(context_buffer)
+                    start_pos = parsed["context_start_pos"]
+                    selected_raw = raw_bytes[start_pos:]
+                    context_buffer = n._logclean(selected_raw.decode(errors='replace'), var=True)
+                except Exception:
+                    context_buffer = buffer
+            elif not context_buffer:
                 context_buffer = buffer
         except asyncio.TimeoutError:
             os.write(child_fd, b'\x15\r')
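The selective-context convention rides on the existing context_buffer field: instead of cleaned text, the client may send a small JSON marker and let the server slice its own raw byte log. A sketch of the payload shape (client-side names and the offset value are illustrative; the authoritative parsing is in the hunk above):

import json

start_pos = 1024  # byte offset of the chosen context block (hypothetical value)
marker = json.dumps({"context_start_pos": start_pos})

# The servicer sniffs the prefix before attempting to parse
assert marker.startswith('{"context_start_pos"')
print(json.loads(marker)["context_start_pos"])  # -> 1024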
@@ -248,7 +304,10 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
                 copilot_stream_chunk=chunk_text
             ))
+
-        ai_task = asyncio.create_task(service.aask_copilot(context_buffer, question, node_info, chunk_callback=chunk_callback))
+        # Create a clean version of node_info for the AI to save tokens and match local CLI behavior
+        ai_node_info = {k: v for k, v in node_info.items() if k not in ("context_blocks", "full_buffer")}
+
+        ai_task = asyncio.create_task(service.aask_copilot(context_buffer, question, ai_node_info, chunk_callback=chunk_callback))
         wait_action_task = asyncio.create_task(remote_stream.copilot_queue.get())
 
         done, pending = await asyncio.wait(
@@ -345,7 +345,17 @@ class NodeStub:
                 continue
 
             active_buffer = get_active_buffer()
-            request_queue.put(connpy_pb2.InteractRequest(copilot_question=question, copilot_context_buffer=active_buffer))
+            # Enrich question with history (same as local CLI)
+            past_questions = self.copilot_history.get_strings()
+            if len(past_questions) > 1:
+                # Limit history to last 5 questions to save tokens, excluding current
+                recent_history = past_questions[-6:-1]
+                history_text = "\n".join(f"- {q}" for q in recent_history)
+                enriched_question = f"Previous questions in this session:\n{history_text}\n\nCurrent Question:\n{question}"
+            else:
+                enriched_question = question
+
+            request_queue.put(connpy_pb2.InteractRequest(copilot_question=enriched_question, copilot_context_buffer=active_buffer))
 
             from rich.live import Live
             live_text = "Thinking..."
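The enrichment is easiest to see on sample data. A standalone sketch (questions are made up; the slicing matches the diff):

# Stand-in for self.copilot_history.get_strings(); the last entry is the current question
past_questions = ["q1", "q2", "q3", "q4", "q5", "q6", "how do I clear the arp table?"]
question = past_questions[-1]

if len(past_questions) > 1:
    recent_history = past_questions[-6:-1]  # last 5, excluding the current one
    history_text = "\n".join(f"- {q}" for q in recent_history)
    enriched_question = (f"Previous questions in this session:\n{history_text}"
                         f"\n\nCurrent Question:\n{question}")
else:
    enriched_question = question

print(enriched_question)  # q2..q6 precede the current question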
@@ -800,7 +810,17 @@ class NodeStub:
                 continue
 
             active_buffer = get_active_buffer()
-            request_queue.put(connpy_pb2.InteractRequest(copilot_question=question, copilot_context_buffer=active_buffer))
+            # Enrich question with history (same as local CLI)
+            past_questions = self.copilot_history.get_strings()
+            if len(past_questions) > 1:
+                # Limit history to last 5 questions to save tokens, excluding current
+                recent_history = past_questions[-6:-1]
+                history_text = "\n".join(f"- {q}" for q in recent_history)
+                enriched_question = f"Previous questions in this session:\n{history_text}\n\nCurrent Question:\n{question}"
+            else:
+                enriched_question = question
+
+            request_queue.put(connpy_pb2.InteractRequest(copilot_question=enriched_question, copilot_context_buffer=active_buffer))
 
             from rich.live import Live
             live_text = "Thinking..."
@@ -19,15 +19,18 @@ class AIService(BaseService):
 
     def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
         """Ask the AI copilot for terminal assistance."""
-        from connpy.ai import ai
+        from connpy.ai import ai, run_ai_async
         agent = ai(self.config)
-        return agent.ask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback)
+        future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback))
+        return future.result()
 
     async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
         """Ask the AI copilot for terminal assistance asynchronously."""
-        from connpy.ai import ai
+        from connpy.ai import ai, run_ai_async
+        import asyncio
         agent = ai(self.config)
-        return await agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback)
+        future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback))
+        return await asyncio.wrap_future(future)
 
 
     def list_sessions(self):
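The sync wrapper can simply block on future.result(), but aask_copilot runs inside the caller's own event loop, so blocking there would stall it; asyncio.wrap_future bridges the cross-loop future into an awaitable instead. A focused standalone sketch (a thread-pool executor stands in for the dedicated AI loop):

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def slow():
    time.sleep(0.1)
    return "done"

async def main():
    with ThreadPoolExecutor() as pool:
        cf = pool.submit(slow)                # concurrent.futures.Future
        print(await asyncio.wrap_future(cf))  # awaited; the loop stays responsive

asyncio.run(main())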