diff --git a/connpy/core.py b/connpy/core.py
index ac9be84..6a850c3 100755
--- a/connpy/core.py
+++ b/connpy/core.py
@@ -392,6 +392,9 @@ class node:
loop = asyncio.get_running_loop()
child_reader_queue = asyncio.Queue()
+ # Track command byte positions for copilot context navigation
+ cmd_byte_positions = [0]
+
def _child_read_ready():
try:
data = os.read(child_fd, 4096)
@@ -428,12 +431,16 @@ class node:
node_info["os"] = self.tags.get("os", "unknown")
# Invoke copilot (async callback handles UI)
- await copilot_handler(buffer, node_info, local_stream, child_fd)
+ await copilot_handler(buffer, node_info, local_stream, child_fd, cmd_byte_positions)
continue
# Remove any stray \x00 bytes and forward normally
clean_data = data.replace(b'\x00', b'')
if clean_data:
+ # Track command boundaries when user hits Enter
+ if hasattr(self, 'mylog') and (b'\r' in clean_data or b'\n' in clean_data):
+ cmd_byte_positions.append(self.mylog.tell())
+
try:
os.write(child_fd, clean_data)
except OSError:
@@ -551,7 +558,7 @@ class node:
from prompt_toolkit.history import InMemoryHistory
copilot_history = InMemoryHistory()
- async def handler(buffer, node_info, stream, child_fd):
+ async def handler(buffer, node_info, stream, child_fd, cmd_byte_positions=None):
import termios, tty
import asyncio
import os
@@ -601,13 +608,11 @@ class node:
console.print(Panel(
"[bold cyan]AI Terminal Copilot[/bold cyan]\n"
"[dim]Type your question. Enter to send, Escape/Ctrl+C to cancel.\n"
- "Ctrl+\u2191/\u2193 to adjust context lines. \u2191\u2193 for question history.[/dim]",
+ "Tab to change context mode. Ctrl+\u2191/\u2193 to adjust context. \u2191\u2193 for question history.[/dim]",
border_style="cyan"
))
# 2. Capturar pregunta del usuario
- total_lines = len(buffer.split('\n'))
- context_lines = [min(50, total_lines)]
cancelled = [False]
from prompt_toolkit import PromptSession
@@ -616,40 +621,135 @@ class node:
bindings = KeyBindings()
+ # Command blocks logic
+ raw_bytes = self.mylog.getvalue() if hasattr(self, 'mylog') else b''
+ blocks = []
+
+ if cmd_byte_positions and len(cmd_byte_positions) >= 2 and raw_bytes:
+ import re
+ # Extract the prompt regex for validation
+ default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
+ device_prompt = node_info.get("prompt", default_prompt) if isinstance(node_info, dict) else default_prompt
+ # Remove unescaped $ end-anchors so we can match the prompt within the line
+ prompt_re_str = re.sub(r'(?= total_lines:
- context_lines[0] = min(50, total_lines)
+ if context_mode[0] == MODE_LINES:
+ if context_lines[0] >= total_lines:
+ context_lines[0] = min(50, total_lines)
+ else:
+ context_lines[0] = min(context_lines[0] + 50, total_lines)
else:
- context_lines[0] = min(context_lines[0] + 50, total_lines)
+ if context_cmd[0] < total_cmds:
+ context_cmd[0] += 1
+ else:
+ context_cmd[0] = 1
event.app.invalidate()
@bindings.add('c-down')
def _(event):
- if context_lines[0] <= min(50, total_lines):
- context_lines[0] = total_lines
+ if context_mode[0] == MODE_LINES:
+ if context_lines[0] <= min(50, total_lines):
+ context_lines[0] = total_lines
+ else:
+ context_lines[0] = max(context_lines[0] - 50, min(50, total_lines))
else:
- context_lines[0] = max(context_lines[0] - 50, min(50, total_lines))
+ if context_cmd[0] > 1:
+ context_cmd[0] -= 1
+ else:
+ context_cmd[0] = total_cmds
event.app.invalidate()
+ @bindings.add('tab')
+ def _(event):
+ context_mode[0] = (context_mode[0] + 1) % 3
+ event.app.invalidate()
+
@bindings.add('escape')
def _(event):
cancelled[0] = True
event.app.exit(result='')
+
+ def get_current_block():
+ idx = max(0, total_cmds - context_cmd[0])
+ return idx, blocks[idx]
+ def get_active_buffer():
+ """Build the active buffer for the current selection mode."""
+ if context_mode[0] == MODE_LINES:
+ buffer_lines = buffer.split('\n')
+ return '\n'.join(buffer_lines[-context_lines[0]:])
+
+ idx, (start, preview) = get_current_block()
+ if context_mode[0] == MODE_SINGLE and idx + 1 < total_cmds:
+ end = blocks[idx + 1][0]
+ active_raw = raw_bytes[start:end]
+ else:
+ active_raw = raw_bytes[start:]
+ return preview + "\n" + self._logclean(active_raw.decode(errors='replace'), var=True)
+
def get_prompt_text():
- return HTML(f"Ask [Ctx: {context_lines[0]}/{total_lines}L]: ")
-
+ if context_mode[0] == MODE_LINES:
+ return HTML(f"Ask [Ctx: {context_lines[0]}/{total_lines}L]: ")
+
+ lines_count = len(get_active_buffer().split('\n'))
+ if context_mode[0] == MODE_SINGLE:
+ return HTML(f"Ask [Cmd {context_cmd[0]} ~{lines_count}L]: ")
+ else:
+ return HTML(f"Ask [Cmd {context_cmd[0]}\u2192END ~{lines_count}L]: ")
+
+ def get_toolbar():
+ mode_labels = {MODE_RANGE: "RANGE", MODE_SINGLE: "SINGLE", MODE_LINES: "LINES"}
+ mode_label = mode_labels[context_mode[0]]
+ if context_mode[0] == MODE_LINES:
+ return HTML(f"\u25b6 Ctrl+\u2191/\u2193 adjusts by 50 lines [Tab: {mode_label}]")
+ _, (_, preview) = get_current_block()
+ return HTML(f"\u25b6 {preview} [Tab: {mode_label}]")
+
session = PromptSession(history=copilot_history)
- question = await session.prompt_async(get_prompt_text, key_bindings=bindings)
+ question = await session.prompt_async(
+ get_prompt_text,
+ key_bindings=bindings,
+ bottom_toolbar=get_toolbar
+ )
+
if cancelled[0] or not question.strip():
console.print("\n[dim]Copilot cancelled.[/dim]")
os.write(child_fd, b'\x15\r')
return
-
- # Slice the buffer dynamically based on selected context
- buffer_lines = buffer.split('\n')
- active_buffer = '\n'.join(buffer_lines[-context_lines[0]:])
+
+ active_buffer = get_active_buffer()
# 3. Llamar al AI con spinner
from .services.ai_service import AIService