remoto con fixes de todo

This commit is contained in:
2026-05-08 21:53:57 -03:00
parent d62af8b037
commit a08d84139d
9 changed files with 565 additions and 180 deletions
+1 -1
View File
@@ -1 +1 @@
__version__ = "6.0.0b6"
__version__ = "6.0.0b7"
+146
View File
@@ -1351,5 +1351,151 @@ Node: {node_name}"""
"error": str(e)
}
@MethodHook
async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
    """Async terminal-copilot query.

    Streams a LiteLLM chat completion built from the live terminal buffer,
    forwarding the <guide> section to ``chunk_callback`` as it arrives,
    then parses the complete response into guide text, commands and a
    risk level.

    Parameters:
        terminal_buffer: Raw terminal context embedded in the system prompt.
        user_question: The engineer's question (sent as the user message).
        node_info: Optional dict; only the "os" and "name" keys are read
            (both default to "unknown").
        chunk_callback: Optional callable; invoked with each new fragment
            of guide text during streaming.

    Returns:
        dict with keys "commands" (list of str), "guide" (str),
        "risk_level" (lowercase str) and "error" (str or None), or
        ``None`` when the request was cancelled mid-stream.
    """
    import json  # NOTE(review): imported but not used in this method
    import re
    from litellm import acompletion
    import asyncio
    import warnings
    # Suppress unawaited coroutine warnings from LiteLLM's internal
    # streaming logic during sudden cancellation
    warnings.filterwarnings("ignore", message="coroutine '.*async_streaming.*' was never awaited", category=RuntimeWarning)
    node_info = node_info or {}
    os_info = node_info.get("os", "unknown")
    node_name = node_info.get("name", "unknown")
    # Optionally load a vendor command reference markdown file matching
    # the node's OS (e.g. OS "Cisco IOS" -> ai_references/cisco_ios.md).
    vendor_reference = ""
    if os_info and os_info != "unknown":
        try:
            os_filename = os_info.lower().replace(" ", "_")
            ref_path = os.path.join(self.config.defaultdir, "ai_references", f"{os_filename}.md")
            if os.path.exists(ref_path):
                with open(ref_path, "r") as f:
                    vendor_reference = f.read().strip()
        except Exception:
            # Best effort: a missing/unreadable reference never blocks the query.
            pass
    system_prompt = f"""Role: TERMINAL COPILOT. You assist a network engineer during a live SSH session.
Rules:
1. Answer the user's question directly based on the Terminal Context.
2. If the user asks you to analyze, parse, or extract data from the Terminal Context, DO IT directly in the <guide> section (you can use markdown tables or lists). Do NOT just give them a command to do it themselves.
3. If the user wants to execute an action, provide the required CLI commands inside a <commands> block, one command per line. If no commands are needed, leave it empty or omit the block.
4. ULTRA-CONCISE. Keep your guide to the point.
5. You MUST output your response in the following strict format:
<guide>
Your brief tactical guide in markdown. 3-4 sentences max.
</guide>
<commands>
command 1
command 2
</commands>
<risk>
low, high, or destructive
</risk>
6. Risk level: "low" for read-only/no commands, "high" for config changes, "destructive" for potentially dangerous ops.
Terminal Context:
{terminal_buffer}
Device OS: {os_info}
Node: {node_name}"""
    if vendor_reference:
        system_prompt += f"\n\nVendor Command Reference:\n{vendor_reference}"
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_question}
    ]
    try:
        # Stream the completion so the guide can be relayed incrementally.
        response = await acompletion(
            model=self.engineer_model,
            messages=messages,
            api_key=self.engineer_key,
            stream=True
        )
        full_content = ""
        streamed_guide = ""
        async for chunk in response:
            delta = chunk.choices[0].delta
            if hasattr(delta, 'content') and delta.content:
                full_content += delta.content
                if chunk_callback:
                    # Incrementally extract the text between <guide> and
                    # </guide> and push only the not-yet-streamed suffix.
                    start_idx = full_content.find("<guide>")
                    if start_idx != -1:
                        after_start = full_content[start_idx + 7:]  # 7 == len("<guide>")
                        end_idx = after_start.find("</guide>")
                        if end_idx != -1:
                            current_guide = after_start[:end_idx]
                        else:
                            current_guide = after_start
                            # Trim a partially-received closing tag so it is
                            # never echoed to the user mid-stream.
                            if current_guide.endswith("<"): current_guide = current_guide[:-1]
                            elif current_guide.endswith("</"): current_guide = current_guide[:-2]
                            elif current_guide.endswith("</g"): current_guide = current_guide[:-3]
                            elif current_guide.endswith("</gu"): current_guide = current_guide[:-4]
                            elif current_guide.endswith("</gui"): current_guide = current_guide[:-5]
                            elif current_guide.endswith("</guid"): current_guide = current_guide[:-6]
                            elif current_guide.endswith("</guide"): current_guide = current_guide[:-7]
                        new_text = current_guide[len(streamed_guide):]
                        if new_text:
                            chunk_callback(new_text)
                            streamed_guide += new_text
        # Parse the fully assembled response into its tagged sections.
        guide = ""
        commands = []
        risk_level = "low"
        guide_match = re.search(r"<guide>(.*?)</guide>", full_content, re.DOTALL)
        if guide_match:
            guide = guide_match.group(1).strip()
        cmd_match = re.search(r"<commands>(.*?)</commands>", full_content, re.DOTALL)
        if cmd_match:
            cmds_raw = cmd_match.group(1).strip()
            if cmds_raw:
                commands = [c.strip() for c in cmds_raw.split('\n') if c.strip()]
        risk_match = re.search(r"<risk>(.*?)</risk>", full_content, re.DOTALL)
        if risk_match:
            risk_level = risk_match.group(1).strip().lower()
        # Fallback: if the model ignored the tag format entirely, treat
        # the whole reply as the guide.
        if not guide and full_content and not ("<guide>" in full_content):
            guide = full_content.strip()
        return {
            "commands": commands,
            "guide": guide,
            "risk_level": risk_level,
            "error": None
        }
    except asyncio.CancelledError:
        # Client cancelled the request via gRPC or local interrupt
        if 'response' in locals():
            try:
                if hasattr(response, 'aclose'):
                    # Fire and forget the close to avoid blocking the cancel
                    asyncio.create_task(response.aclose())
                elif hasattr(response, 'close'):
                    response.close()
            except Exception:
                pass
        # Deliberately swallow the cancellation: callers test the result
        # for None/falsiness instead of handling CancelledError.
        return None
    except Exception as e:
        # Any other failure is reported in-band through the "error" key.
        return {
            "commands": [],
            "guide": "",
            "risk_level": "low",
            "error": str(e)
        }
@MethodHook
def confirm(self, user_input):
    """Auto-approval hook: accept any user input unconditionally."""
    return True
+133 -34
View File
@@ -18,7 +18,31 @@ import asyncio
import fcntl
from . import printer
from .tunnels import LocalStream
from contextlib import contextmanager
@contextmanager
def copilot_terminal_mode():
import sys, tty, termios
fd = sys.stdin.fileno()
try:
old_settings = termios.tcgetattr(fd)
# Primero pasamos a raw mode absoluto para matar ISIG, ICANON, ECHO, etc.
tty.setraw(fd)
# Luego rehabilitamos OPOST para que rich.Live se dibuje correctamente
new_settings = termios.tcgetattr(fd)
new_settings[1] = new_settings[1] | termios.OPOST
termios.tcsetattr(fd, termios.TCSANOW, new_settings)
yield
except Exception:
yield
finally:
try:
termios.tcsetattr(fd, termios.TCSANOW, old_settings)
except Exception:
pass
#functions and classes
@ClassHook
@@ -393,7 +417,8 @@ class node:
child_reader_queue = asyncio.Queue()
# Track command byte positions for copilot context navigation
cmd_byte_positions = [0]
# Each entry is (byte_position, command_text_or_None)
cmd_byte_positions = [(0, None)]
def _child_read_ready():
try:
@@ -440,7 +465,7 @@ class node:
if clean_data:
# Track command boundaries when user hits Enter
if hasattr(self, 'mylog') and (b'\r' in clean_data or b'\n' in clean_data):
cmd_byte_positions.append(self.mylog.tell())
cmd_byte_positions.append((self.mylog.tell(), None))
try:
os.write(child_fd, clean_data)
@@ -613,19 +638,16 @@ class node:
import asyncio
import os
import sys
import fcntl
flags = 0
stdin_fd = sys.stdin.fileno()
try:
# Disable LocalStream reader so it doesn't steal keystrokes from Prompt
loop = asyncio.get_running_loop()
loop.remove_reader(sys.stdin.fileno())
# Override SIGINT so asyncio doesn't kill the event loop when we press Ctrl+C
import signal
orig_sigint = signal.getsignal(signal.SIGINT)
def custom_sigint(sig, frame):
raise KeyboardInterrupt()
signal.signal(signal.SIGINT, custom_sigint)
# 1. Salir de raw mode para poder usar input() y rich
stdin_fd = sys.stdin.fileno()
@@ -635,6 +657,9 @@ class node:
import copy
new_settings = copy.deepcopy(original_settings)
new_settings[3] = new_settings[3] & ~termios.ECHOCTL
# CRITICAL: Prevent OS from translating Ctrl+C into SIGINT
# This prevents the asyncio event loop from crashing when user hits Ctrl+C
new_settings[3] = new_settings[3] & ~termios.ISIG
termios.tcsetattr(stdin_fd, termios.TCSADRAIN, new_settings)
# Remove O_NONBLOCK from stdin so Prompt.ask() works
@@ -688,18 +713,32 @@ class node:
prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))
for i in range(1, len(cmd_byte_positions)):
chunk = raw_bytes[cmd_byte_positions[i-1]:cmd_byte_positions[i]]
cleaned = self._logclean(chunk.decode(errors='replace'), var=True)
lines = [l for l in cleaned.split('\n') if l.strip()]
preview = lines[-1].strip() if lines else ""
pos, known_cmd = cmd_byte_positions[i]
prev_pos = cmd_byte_positions[i-1][0]
if preview:
match = prompt_re.search(preview)
if match:
cmd_text = preview[match.end():].strip()
# Only add if there is actual text typed (filters out empty enters and paginations)
if cmd_text:
blocks.append((cmd_byte_positions[i], preview[:80]))
if known_cmd:
# AI-injected command: we already know the command text
# Build preview from prompt (last line of previous chunk) + command
prev_chunk = raw_bytes[prev_pos:pos]
prev_cleaned = self._logclean(prev_chunk.decode(errors='replace'), var=True)
prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
prompt_text = prev_lines[-1].strip() if prev_lines else ""
preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
blocks.append((pos, preview[:80]))
else:
# User-typed command: derive from raw log chunk
chunk = raw_bytes[prev_pos:pos]
cleaned = self._logclean(chunk.decode(errors='replace'), var=True)
lines = [l for l in cleaned.split('\n') if l.strip()]
preview = lines[-1].strip() if lines else ""
if preview:
match = prompt_re.search(preview)
if match:
cmd_text = preview[match.end():].strip()
# Only add if there is actual text typed (filters out empty enters and paginations)
if cmd_text:
blocks.append((pos, preview[:80]))
# Add synthetic "current prompt" block (zero context)
last_line = buffer.split('\n')[-1].strip() if buffer.strip() else "(prompt)"
@@ -787,12 +826,23 @@ class node:
_, (_, preview) = get_current_block()
return HTML(f"<ansigray>\u25b6 {preview} [Tab: {mode_label}]</ansigray>")
session = PromptSession(history=copilot_history)
question = await session.prompt_async(
get_prompt_text,
key_bindings=bindings,
bottom_toolbar=get_toolbar
)
import threading
def preload_ai_deps():
try:
import litellm
except Exception:
pass
threading.Thread(target=preload_ai_deps, daemon=True).start()
try:
session = PromptSession(history=copilot_history)
question = await session.prompt_async(
get_prompt_text,
key_bindings=bindings,
bottom_toolbar=get_toolbar
)
except KeyboardInterrupt:
question = ""
if cancelled[0] or not question.strip() or question.strip() == "CANCEL":
console.print("\n[dim]Copilot cancelled.[/dim]")
@@ -832,8 +882,48 @@ class node:
except Exception:
live.update(Panel(Markdown(live_text), title="[bold cyan]Copilot Guide[/bold cyan]", border_style="cyan"))
with Live(panel, console=console, refresh_per_second=10) as live:
result = await asyncio.to_thread(service.ask_copilot, active_buffer, enriched_question, node_info, chunk_callback=on_chunk)
with copilot_terminal_mode(), Live(panel, console=console, refresh_per_second=10) as live:
# Launch the AI call as a task
ai_task = asyncio.create_task(service.aask_copilot(active_buffer, enriched_question, node_info, chunk_callback=on_chunk))
# Make stdin non-blocking
import fcntl
flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
cancelled = False
result = None
try:
while not ai_task.done():
try:
key = os.read(sys.stdin.fileno(), 1024)
if b'\x03' in key:
cancelled = True
ai_task.cancel()
console.print("\n[dim]Copilot cancelled via Ctrl+C.[/dim]")
break
except OSError:
pass
# Yield to event loop to allow AI task to progress
await asyncio.sleep(0.05)
if not cancelled:
result = ai_task.result()
except asyncio.CancelledError:
cancelled = True
console.print("\n[dim]Copilot cancelled.[/dim]")
except KeyboardInterrupt:
cancelled = True
ai_task.cancel()
console.print("\n[dim]Copilot cancelled via Ctrl+C.[/dim]")
finally:
# Restore stdin flags
fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags)
if cancelled or not result:
os.write(child_fd, b'\x15\r')
return
if result.get("error"):
console.print(f"[red]Error: {result['error']}[/red]")
@@ -868,10 +958,13 @@ class node:
event.app.exit(result='n')
pt_color = "ansi" + risk_style
action = await confirm_session.prompt_async(
HTML(f"<{pt_color}>Send commands? (y/n/e/number/range) [n]: </{pt_color}>"),
key_bindings=confirm_bindings
)
try:
action = await confirm_session.prompt_async(
HTML(f"<{pt_color}>Send commands? (y/n/e/number/range) [n]: </{pt_color}>"),
key_bindings=confirm_bindings
)
except KeyboardInterrupt:
action = "n"
if not action.strip():
action = "n"
@@ -883,6 +976,8 @@ class node:
os.write(child_fd, b'\x15') # Ctrl+U to clear line
await asyncio.sleep(0.1)
for cmd in commands:
if cmd_byte_positions is not None and hasattr(self, 'mylog'):
cmd_byte_positions.append((self.mylog.tell(), cmd))
os.write(child_fd, (cmd + "\n").encode())
await asyncio.sleep(0.3)
elif action_l.startswith('e'):
@@ -910,6 +1005,8 @@ class node:
await asyncio.sleep(0.1)
for cmd in edited_cmd.split('\n'):
if cmd.strip():
if cmd_byte_positions is not None and hasattr(self, 'mylog'):
cmd_byte_positions.append((self.mylog.tell(), cmd.strip()))
os.write(child_fd, (cmd.strip() + "\n").encode())
await asyncio.sleep(0.3)
else:
@@ -938,9 +1035,13 @@ class node:
os.write(child_fd, b'\x15') # Ctrl+U to clear line
await asyncio.sleep(0.1)
if len(valid_indices) == 1:
if cmd_byte_positions is not None and hasattr(self, 'mylog'):
cmd_byte_positions.append((self.mylog.tell(), commands[valid_indices[0]]))
os.write(child_fd, (commands[valid_indices[0]] + "\n").encode())
else:
for idx in valid_indices:
if cmd_byte_positions is not None and hasattr(self, 'mylog'):
cmd_byte_positions.append((self.mylog.tell(), commands[idx]))
os.write(child_fd, (commands[idx] + "\n").encode())
await asyncio.sleep(0.3)
else:
@@ -966,8 +1067,6 @@ class node:
# 6. Restaurar raw mode, O_NONBLOCK y SIGINT
tty.setraw(stdin_fd)
fcntl.fcntl(stdin_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
if 'orig_sigint' in locals():
signal.signal(signal.SIGINT, orig_sigint)
# Re-enable LocalStream reader
try:
File diff suppressed because one or more lines are too long
+1 -1
View File
@@ -3,7 +3,7 @@
import grpc
import warnings
from . import connpy_pb2 as connpy__pb2
import connpy_pb2 as connpy__pb2
from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
GRPC_GENERATED_VERSION = '1.80.0'
+35 -2
View File
@@ -217,6 +217,14 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
))
# 2. Await the question from client via the copilot_queue
import threading
def preload_ai_deps():
try:
import litellm
except Exception:
pass
threading.Thread(target=preload_ai_deps, daemon=True).start()
try:
req_data = await asyncio.wait_for(remote_stream.copilot_queue.get(), timeout=120)
if "question" not in req_data or not req_data["question"] or req_data["question"] == "CANCEL":
@@ -240,7 +248,27 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
copilot_stream_chunk=chunk_text
))
result = await asyncio.to_thread(service.ask_copilot, context_buffer, question, node_info, chunk_callback=chunk_callback)
ai_task = asyncio.create_task(service.aask_copilot(context_buffer, question, node_info, chunk_callback=chunk_callback))
wait_action_task = asyncio.create_task(remote_stream.copilot_queue.get())
done, pending = await asyncio.wait(
[ai_task, wait_action_task],
return_when=asyncio.FIRST_COMPLETED
)
if wait_action_task in done:
req_data = wait_action_task.result()
ai_task.cancel()
if req_data.get("question") == "CANCEL" or req_data.get("action") == "cancel":
os.write(child_fd, b'\x15\r')
return
return
else:
wait_action_task.cancel()
result = ai_task.result()
if not result:
os.write(child_fd, b'\x15\r')
return
# 4. Send response back to client
response_queue.put(connpy_pb2.InteractResponse(
@@ -250,10 +278,12 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
# 5. Wait for user action
try:
action_data = await asyncio.wait_for(remote_stream.copilot_queue.get(), timeout=60)
if "action" not in action_data or not action_data["action"]:
if "action" not in action_data or not action_data["action"] or action_data["action"] == "cancel":
os.write(child_fd, b'\x15\r')
return
action = action_data["action"]
except asyncio.TimeoutError:
os.write(child_fd, b'\x15\r')
return
if action == "send_all":
@@ -262,6 +292,7 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
await asyncio.sleep(0.1)
for cmd in commands:
os.write(child_fd, (cmd + "\n").encode())
response_queue.put(connpy_pb2.InteractResponse(copilot_injected_command=cmd))
await asyncio.sleep(0.3)
elif action.startswith("custom:"):
custom_cmds = action[7:]
@@ -270,6 +301,7 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
for cmd in custom_cmds.split('\n'):
if cmd.strip():
os.write(child_fd, (cmd.strip() + "\n").encode())
response_queue.put(connpy_pb2.InteractResponse(copilot_injected_command=cmd.strip()))
await asyncio.sleep(0.3)
elif action not in ('cancel', 'n', 'no'):
# Handle numbers and ranges like "1,2,4-6"
@@ -294,6 +326,7 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
await asyncio.sleep(0.1)
for idx in valid_indices:
os.write(child_fd, (commands[idx] + "\n").encode())
response_queue.put(connpy_pb2.InteractResponse(copilot_injected_command=commands[idx]))
await asyncio.sleep(0.3)
else:
os.write(child_fd, b'\x15\r')
+166 -66
View File
@@ -53,7 +53,7 @@ class NodeStub:
request_queue = queue.Queue()
client_buffer_bytes = bytearray()
cmd_byte_positions = [0]
cmd_byte_positions = [(0, None)]
pause_stdin = [False]
wake_r, wake_w = os.pipe()
@@ -101,7 +101,7 @@ class NodeStub:
if not data:
break
if b'\r' in data or b'\n' in data:
cmd_byte_positions.append(len(client_buffer_bytes))
cmd_byte_positions.append((len(client_buffer_bytes), None))
yield connpy_pb2.InteractRequest(stdin_data=data)
except OSError:
break
@@ -123,9 +123,26 @@ class NodeStub:
tty.setraw(sys.stdin.fileno())
response_iterator = self.stub.interact_node(request_generator())
import queue
response_queue = queue.Queue()
def response_consumer():
try:
for r in response_iterator:
response_queue.put(r)
except Exception:
pass
response_queue.put(None)
t_consumer = threading.Thread(target=response_consumer, daemon=True)
t_consumer.start()
# First phase: Wait for connection status, print early data
try:
for res in response_iterator:
while True:
res = response_queue.get()
if res is None:
return
if res.stdout_data:
data = res.stdout_data
if debug:
@@ -145,13 +162,16 @@ class NodeStub:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
printer.error(f"Connection failed: {res.error_message}")
return
except StopIteration:
except queue.Empty:
return
# Second phase: Stream active session
# Clear screen filter is only applied before success (Phase 1).
# Once the user has a prompt, Ctrl+L must work normally.
for res in response_iterator:
while True:
res = response_queue.get()
if res is None:
break
if res.copilot_prompt:
pause_generator()
import json
@@ -165,6 +185,7 @@ class NodeStub:
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.history import InMemoryHistory
from ..printer import connpy_theme
from ..core import copilot_terminal_mode
if not hasattr(self, 'copilot_history'):
self.copilot_history = InMemoryHistory()
@@ -200,17 +221,30 @@ class NodeStub:
prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))
for i in range(1, len(cmd_byte_positions)):
chunk = raw_bytes[cmd_byte_positions[i-1]:cmd_byte_positions[i]]
cleaned = dummy_node._logclean(chunk.decode(errors='replace'), var=True)
lines = [l for l in cleaned.split('\n') if l.strip()]
preview = lines[-1].strip() if lines else ""
pos, known_cmd = cmd_byte_positions[i]
prev_pos = cmd_byte_positions[i-1][0]
if preview:
match = prompt_re.search(preview)
if match:
cmd_text = preview[match.end():].strip()
if cmd_text:
blocks.append((cmd_byte_positions[i], preview[:80]))
if known_cmd:
# AI-injected command: we already know the command text
prev_chunk = raw_bytes[prev_pos:pos]
prev_cleaned = dummy_node._logclean(prev_chunk.decode(errors='replace'), var=True)
prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
prompt_text = prev_lines[-1].strip() if prev_lines else ""
preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
blocks.append((pos, preview[:80]))
else:
# User-typed command: derive from raw log chunk
chunk = raw_bytes[prev_pos:pos]
cleaned = dummy_node._logclean(chunk.decode(errors='replace'), var=True)
lines = [l for l in cleaned.split('\n') if l.strip()]
preview = lines[-1].strip() if lines else ""
if preview:
match = prompt_re.search(preview)
if match:
cmd_text = preview[match.end():].strip()
if cmd_text:
blocks.append((pos, preview[:80]))
clean_buffer = dummy_node._logclean(raw_bytes.decode(errors='replace'), var=True)
last_line = clean_buffer.split('\n')[-1].strip() if clean_buffer.strip() else "(prompt)"
@@ -303,19 +337,11 @@ class NodeStub:
except KeyboardInterrupt:
question = ""
# Switch back to raw mode immediately so Ctrl+C during streaming doesn't break gRPC
tty.setraw(sys.stdin.fileno())
# IMPORTANT: Enable OPOST so rich.Live renders correctly (translates \n to \r\n).
# Without this, the UI repeats the panel multiple times in raw mode.
mode = termios.tcgetattr(sys.stdin.fileno())
mode[1] = mode[1] | termios.OPOST
termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, mode)
if not question or not question.strip() or question.strip() == "CANCEL":
console.print("\n[dim]Copilot cancelled.[/dim]")
request_queue.put(connpy_pb2.InteractRequest(copilot_question="CANCEL"))
resume_generator()
tty.setraw(sys.stdin.fileno())
continue
active_buffer = get_active_buffer()
@@ -327,9 +353,30 @@ class NodeStub:
result = {}
cancelled = False
try:
with Live(panel, console=console, refresh_per_second=10) as live:
for chunk_res in response_iterator:
with copilot_terminal_mode(), Live(panel, console=console, refresh_per_second=10) as live:
# Make stdin non-blocking to check for Ctrl+C locally
import fcntl
flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
while True:
# 1. Read input for Ctrl+C
try:
key = os.read(sys.stdin.fileno(), 1024)
if b'\x03' in key:
cancelled = True
request_queue.put(connpy_pb2.InteractRequest(copilot_question="CANCEL"))
console.print("\n[dim]Copilot cancelled via Ctrl+C. Disconnecting...[/dim]")
break
except OSError:
pass
# 2. Wait for response chunk
try:
chunk_res = response_queue.get(timeout=0.1)
if chunk_res is None:
break
if chunk_res.copilot_stream_chunk:
if live_text == "Thinking...": live_text = ""
live_text += chunk_res.copilot_stream_chunk
@@ -337,13 +384,16 @@ class NodeStub:
elif chunk_res.copilot_response_json:
result = json.loads(chunk_res.copilot_response_json)
break
except KeyboardInterrupt:
cancelled = True
console.print("\n[dim]Copilot cancelled via Ctrl+C. Disconnecting...[/dim]")
break
except queue.Empty:
continue
# Restore blocking mode
fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags)
if cancelled:
break
resume_generator()
tty.setraw(sys.stdin.fileno())
continue
if result.get("error"):
console.print(f"[red]Error: {result['error']}[/red]")
@@ -424,6 +474,9 @@ class NodeStub:
tty.setraw(sys.stdin.fileno())
continue
if res.copilot_injected_command:
cmd_byte_positions.append((len(client_buffer_bytes), res.copilot_injected_command))
if res.stdout_data:
os.write(sys.stdout.fileno(), res.stdout_data)
client_buffer_bytes.extend(res.stdout_data)
@@ -445,7 +498,7 @@ class NodeStub:
params_json = json.dumps(connection_params)
request_queue = queue.Queue()
client_buffer_bytes = bytearray()
cmd_byte_positions = [0]
cmd_byte_positions = [(0, None)]
pause_stdin = [False]
wake_r, wake_w = os.pipe()
@@ -494,7 +547,7 @@ class NodeStub:
if not data:
break
if b'\r' in data or b'\n' in data:
cmd_byte_positions.append(len(client_buffer_bytes))
cmd_byte_positions.append((len(client_buffer_bytes), None))
yield connpy_pb2.InteractRequest(stdin_data=data)
except OSError:
break
@@ -517,9 +570,26 @@ class NodeStub:
tty.setraw(sys.stdin.fileno())
response_iterator = self.stub.interact_node(request_generator())
import queue
response_queue = queue.Queue()
def response_consumer():
try:
for r in response_iterator:
response_queue.put(r)
except Exception:
pass
response_queue.put(None)
t_consumer = threading.Thread(target=response_consumer, daemon=True)
t_consumer.start()
# First phase: Wait for connection status, print early data
try:
for res in response_iterator:
while True:
res = response_queue.get()
if res is None:
return
if res.stdout_data:
data = res.stdout_data
if debug:
@@ -539,13 +609,14 @@ class NodeStub:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
printer.error(f"Connection failed: {res.error_message}")
return
except StopIteration:
except queue.Empty:
return
# Second phase: Stream active session
# Clear screen filter is only applied before success (Phase 1).
# Once the user has a prompt, Ctrl+L must work normally.
for res in response_iterator:
while True:
res = response_queue.get()
if res is None:
break
if res.copilot_prompt:
pause_generator()
import json
@@ -559,6 +630,7 @@ class NodeStub:
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.history import InMemoryHistory
from ..printer import connpy_theme
from ..core import copilot_terminal_mode
if not hasattr(self, 'copilot_history'):
self.copilot_history = InMemoryHistory()
@@ -594,17 +666,30 @@ class NodeStub:
prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))
for i in range(1, len(cmd_byte_positions)):
chunk = raw_bytes[cmd_byte_positions[i-1]:cmd_byte_positions[i]]
cleaned = dummy_node._logclean(chunk.decode(errors='replace'), var=True)
lines = [l for l in cleaned.split('\n') if l.strip()]
preview = lines[-1].strip() if lines else ""
pos, known_cmd = cmd_byte_positions[i]
prev_pos = cmd_byte_positions[i-1][0]
if preview:
match = prompt_re.search(preview)
if match:
cmd_text = preview[match.end():].strip()
if cmd_text:
blocks.append((cmd_byte_positions[i], preview[:80]))
if known_cmd:
# AI-injected command: we already know the command text
prev_chunk = raw_bytes[prev_pos:pos]
prev_cleaned = dummy_node._logclean(prev_chunk.decode(errors='replace'), var=True)
prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
prompt_text = prev_lines[-1].strip() if prev_lines else ""
preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
blocks.append((pos, preview[:80]))
else:
# User-typed command: derive from raw log chunk
chunk = raw_bytes[prev_pos:pos]
cleaned = dummy_node._logclean(chunk.decode(errors='replace'), var=True)
lines = [l for l in cleaned.split('\n') if l.strip()]
preview = lines[-1].strip() if lines else ""
if preview:
match = prompt_re.search(preview)
if match:
cmd_text = preview[match.end():].strip()
if cmd_text:
blocks.append((pos, preview[:80]))
clean_buffer = dummy_node._logclean(raw_bytes.decode(errors='replace'), var=True)
last_line = clean_buffer.split('\n')[-1].strip() if clean_buffer.strip() else "(prompt)"
@@ -697,19 +782,11 @@ class NodeStub:
except KeyboardInterrupt:
question = ""
# Switch back to raw mode immediately so Ctrl+C during streaming doesn't break gRPC
tty.setraw(sys.stdin.fileno())
# IMPORTANT: Enable OPOST so rich.Live renders correctly (translates \n to \r\n).
# Without this, the UI repeats the panel multiple times in raw mode.
mode = termios.tcgetattr(sys.stdin.fileno())
mode[1] = mode[1] | termios.OPOST
termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, mode)
if not question or not question.strip() or question.strip() == "CANCEL":
console.print("\n[dim]Copilot cancelled.[/dim]")
request_queue.put(connpy_pb2.InteractRequest(copilot_question="CANCEL"))
resume_generator()
tty.setraw(sys.stdin.fileno())
continue
active_buffer = get_active_buffer()
@@ -721,9 +798,27 @@ class NodeStub:
result = {}
cancelled = False
try:
with Live(panel, console=console, refresh_per_second=10) as live:
for chunk_res in response_iterator:
with copilot_terminal_mode(), Live(panel, console=console, refresh_per_second=10) as live:
import fcntl
flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
while True:
try:
key = os.read(sys.stdin.fileno(), 1024)
if b'\x03' in key:
cancelled = True
request_queue.put(connpy_pb2.InteractRequest(copilot_question="CANCEL"))
console.print("\n[dim]Copilot cancelled via Ctrl+C. Disconnecting...[/dim]")
break
except OSError:
pass
try:
chunk_res = response_queue.get(timeout=0.1)
if chunk_res is None:
break
if chunk_res.copilot_stream_chunk:
if live_text == "Thinking...": live_text = ""
live_text += chunk_res.copilot_stream_chunk
@@ -731,13 +826,15 @@ class NodeStub:
elif chunk_res.copilot_response_json:
result = json.loads(chunk_res.copilot_response_json)
break
except KeyboardInterrupt:
cancelled = True
console.print("\n[dim]Copilot cancelled via Ctrl+C. Disconnecting...[/dim]")
break
except queue.Empty:
continue
fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags)
if cancelled:
break
resume_generator()
tty.setraw(sys.stdin.fileno())
continue
if result.get("error"):
console.print(f"[red]Error: {result['error']}[/red]")
@@ -818,6 +915,9 @@ class NodeStub:
tty.setraw(sys.stdin.fileno())
continue
if res.copilot_injected_command:
cmd_byte_positions.append((len(client_buffer_bytes), res.copilot_injected_command))
if res.stdout_data:
os.write(sys.stdout.fileno(), res.stdout_data)
client_buffer_bytes.extend(res.stdout_data)
+1
View File
@@ -106,6 +106,7 @@ message InteractResponse {
string copilot_response_json = 6;
string copilot_node_info_json = 7;
string copilot_stream_chunk = 8;
string copilot_injected_command = 9;
}
message FilterRequest {
+6
View File
@@ -23,6 +23,12 @@ class AIService(BaseService):
agent = ai(self.config)
return agent.ask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback)
async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
    """Ask the AI copilot for terminal assistance asynchronously.

    Thin async wrapper: builds an ``ai`` agent from ``self.config`` and
    awaits ``ai.aask_copilot``, passing the terminal buffer, the user's
    question, optional node info and the optional streaming callback
    straight through. Returns whatever the agent returns.
    """
    from connpy.ai import ai
    agent = ai(self.config)
    return await agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback)
def list_sessions(self):
"""Return a list of all saved AI sessions."""