Compare commits

..

2 Commits

Author SHA1 Message Date
fluzzi32 f6078b6fde update version 2026-05-05 18:25:33 -03:00
fluzzi32 37db74f47d refactor(core): stabilize gRPC streaming, plugin invocation, and CLI UX
- Implement threaded plugin execution with Queue-based streaming in PluginService
- Refactor remote logger to preserve ANSI colors and fix TTY line endings (\r\n)
- Intelligent terminal filtering: disable SSM screen-clearing filter after success
- Sanitize SSH-only flags in core.py when using SFTP protocol
- Rewrite completion tree with pre/post-node states and flag deduplication
- Update gRPC unit tests to match new streaming response structure
2026-05-05 18:24:31 -03:00
7 changed files with 160 additions and 50 deletions
+1 -1
View File
@@ -1 +1 @@
__version__ = "6.0.0b5" __version__ = "6.0.0b6"
+28
View File
@@ -184,9 +184,37 @@ def _build_tree(nodes, folders, profiles, plugins, configdir):
"folders": None, "folders": None,
} }
# --- Connect (default command) ---
# Long flags are offered; short forms (-d/-t) only used for navigation.
# Two states: before node (offer nodes + remaining long flags)
# after node (offer only remaining long flags, no more nodes)
connect_flags_long = ["--debug", "--sftp"]
connect_flags_all = ["--debug", "-d", "--sftp", "-t"]
# Post-node: only offer remaining long flags
connect_after_node = {"__exclude_used__": True}
for f in connect_flags_all:
connect_after_node[f] = connect_after_node
# Pre-node: offer nodes + remaining long flags, consume node → post-node state
connect_dict = {"__exclude_used__": True}
connect_dict["__extra__"] = lambda w: (
list(nodes) + list(folders) + (list(plugins.keys()) if plugins else [])
)
connect_dict["*"] = connect_after_node
for f in connect_flags_all:
connect_dict[f] = connect_dict
# --- Main Tree --- # --- Main Tree ---
return { return {
# Root: offer nodes + long flags; after a node go to post-node state
"__extra__": lambda w: list(nodes) + list(folders) + (list(plugins.keys()) if plugins else []), "__extra__": lambda w: list(nodes) + list(folders) + (list(plugins.keys()) if plugins else []),
"*": connect_after_node,
"--debug": connect_dict,
"-d": connect_dict,
"--sftp": connect_dict,
"-t": connect_dict,
"--add": {"profile": _profile_values}, "--add": {"profile": _profile_values},
"--del": {"profile": _profile_values, "__extra__": _nodes_folders}, "--del": {"profile": _profile_values, "__extra__": _nodes_folders},
+9 -2
View File
@@ -348,7 +348,8 @@ class node:
x.start() x.start()
if debug: if debug:
if 'mylog' in dir(self): if 'mylog' in dir(self):
print(self.mylog.getvalue().decode()) if not async_mode:
print(self.mylog.getvalue().decode())
def _teardown_interact_environment(self): def _teardown_interact_environment(self):
if 'logfile' in dir(self) and hasattr(self, 'mylog'): if 'logfile' in dir(self) and hasattr(self, 'mylog'):
@@ -760,7 +761,12 @@ class node:
elif self.protocol == "sftp": elif self.protocol == "sftp":
cmd += " -P " + self.port cmd += " -P " + self.port
if self.options: if self.options:
cmd += " " + self.options opts = self.options
if self.protocol == "sftp":
# Strip SSH-only flags that sftp doesn't support
opts = re.sub(r'(?<!\S)-[XxtTAaNf]\b', '', opts).strip()
if opts:
cmd += " " + opts
if self.jumphost: if self.jumphost:
cmd += " " + self.jumphost cmd += " " + self.jumphost
user_host = f"{self.user}@{self.host}" if self.user else self.host user_host = f"{self.user}@{self.host}" if self.user else self.host
@@ -875,6 +881,7 @@ class node:
if logger: if logger:
logger("debug", f"Command:\n{cmd}") logger("debug", f"Command:\n{cmd}")
self.mylog = io.BytesIO() self.mylog = io.BytesIO()
self.mylog.write(f"[i] [DEBUG] Command:\r\n {cmd}\r\n".encode())
child.logfile_read = self.mylog child.logfile_read = self.mylog
+33 -1
View File
@@ -139,7 +139,39 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
if sftp: if sftp:
n.protocol = "sftp" n.protocol = "sftp"
connect = n._connect(debug=debug) # Build a logger that captures debug messages as ANSI-colored bytes for the client
debug_chunks = []
if debug:
from io import StringIO
from rich.console import Console as RichConsole
from ..printer import connpy_theme
from .. import printer as _printer
def remote_logger(msg_type, message):
buf = StringIO()
c = RichConsole(file=buf, force_terminal=True, width=120, theme=connpy_theme)
if msg_type == "debug":
c.print(_printer._format_multiline("i", f"[DEBUG] {message}", style="info"))
elif msg_type == "success":
c.print(_printer._format_multiline("", message, style="success"))
elif msg_type == "error":
c.print(_printer._format_multiline("", message, style="error"))
else:
c.print(str(message))
rendered = buf.getvalue()
if rendered:
# Raw TTY needs \r\n instead of \n
rendered = rendered.replace('\n', '\r\n')
debug_chunks.append(rendered.encode())
else:
remote_logger = None
connect = n._connect(debug=debug, logger=remote_logger)
# Send debug output to client before checking result (always show the command)
for chunk in debug_chunks:
yield connpy_pb2.InteractResponse(stdout_data=chunk)
if connect != True: if connect != True:
yield connpy_pb2.InteractResponse(success=False, error_message=str(connect)) yield connpy_pb2.InteractResponse(success=False, error_message=str(connect))
return return
+48 -24
View File
@@ -86,25 +86,37 @@ class NodeStub:
old_tty = termios.tcgetattr(sys.stdin) old_tty = termios.tcgetattr(sys.stdin)
try: try:
import time
tty.setraw(sys.stdin.fileno()) tty.setraw(sys.stdin.fileno())
response_iterator = self.stub.interact_node(request_generator()) response_iterator = self.stub.interact_node(request_generator())
# First response is connection status # First phase: Wait for connection status, print early data
try: try:
first_res = next(response_iterator) for res in response_iterator:
if first_res.success: if res.stdout_data:
# Connection established on server, show success message data = res.stdout_data
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) if debug:
printer.success(conn_msg) data = data.replace(b'\x1b[H\x1b[2J', b'').replace(b'\x1bc', b'').replace(b'\x1b[3J', b'')
tty.setraw(sys.stdin.fileno()) os.write(sys.stdout.fileno(), data)
else:
# Connection failed on server if res.success:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) # Connection established on server, show success message
printer.error(f"Connection failed: {first_res.error_message}") termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
return printer.success(conn_msg)
tty.setraw(sys.stdin.fileno())
break
if res.error_message:
# Connection failed on server
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
printer.error(f"Connection failed: {res.error_message}")
return
except StopIteration: except StopIteration:
return return
# Second phase: Stream active session
# Clear screen filter is only applied before success (Phase 1).
# Once the user has a prompt, Ctrl+L must work normally.
for res in response_iterator: for res in response_iterator:
if res.stdout_data: if res.stdout_data:
os.write(sys.stdout.fileno(), res.stdout_data) os.write(sys.stdout.fileno(), res.stdout_data)
@@ -160,25 +172,37 @@ class NodeStub:
old_tty = termios.tcgetattr(sys.stdin) old_tty = termios.tcgetattr(sys.stdin)
try: try:
import time
tty.setraw(sys.stdin.fileno()) tty.setraw(sys.stdin.fileno())
response_iterator = self.stub.interact_node(request_generator()) response_iterator = self.stub.interact_node(request_generator())
# First response is connection status # First phase: Wait for connection status, print early data
try: try:
first_res = next(response_iterator) for res in response_iterator:
if first_res.success: if res.stdout_data:
# Connection established on server, show success message data = res.stdout_data
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) if debug:
printer.success(conn_msg) data = data.replace(b'\x1b[H\x1b[2J', b'').replace(b'\x1bc', b'').replace(b'\x1b[3J', b'')
tty.setraw(sys.stdin.fileno()) os.write(sys.stdout.fileno(), data)
else:
# Connection failed on server if res.success:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) # Connection established on server, show success message
printer.error(f"Connection failed: {first_res.error_message}") termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
return printer.success(conn_msg)
tty.setraw(sys.stdin.fileno())
break
if res.error_message:
# Connection failed on server
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
printer.error(f"Connection failed: {res.error_message}")
return
except StopIteration: except StopIteration:
return return
# Second phase: Stream active session
# Clear screen filter is only applied before success (Phase 1).
# Once the user has a prompt, Ctrl+L must work normally.
for res in response_iterator: for res in response_iterator:
if res.stdout_data: if res.stdout_data:
os.write(sys.stdout.fileno(), res.stdout_data) os.write(sys.stdout.fileno(), res.stdout_data)
+37 -18
View File
@@ -233,25 +233,44 @@ class PluginService(BaseService):
from rich.console import Console from rich.console import Console
from rich.console import Console from rich.console import Console
buf = io.StringIO() import queue
import threading
q = queue.Queue()
class QueueIO(io.StringIO):
def write(self, s):
q.put(s)
return len(s)
def flush(self):
pass
buf = QueueIO()
old_console = printer._get_console() old_console = printer._get_console()
old_err_console = printer._get_err_console() old_err_console = printer._get_err_console()
printer.set_thread_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True)) def run_plugin():
printer.set_thread_err_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True)) printer.set_thread_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True))
printer.set_thread_stream(buf) printer.set_thread_err_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True))
printer.set_thread_stream(buf)
try:
if hasattr(module, "Entrypoint"):
module.Entrypoint(args, parser, app)
except BaseException as e:
if not isinstance(e, SystemExit):
import traceback
printer.err_console.print(traceback.format_exc())
finally:
printer.set_thread_console(old_console)
printer.set_thread_err_console(old_err_console)
printer.set_thread_stream(None)
q.put(None)
t = threading.Thread(target=run_plugin, daemon=True)
t.start()
try: while True:
if hasattr(module, "Entrypoint"): item = q.get()
module.Entrypoint(args, parser, app) if item is None:
except BaseException as e: break
if not isinstance(e, SystemExit): yield item
import traceback
printer.err_console.print(traceback.format_exc())
finally:
printer.set_thread_console(old_console)
printer.set_thread_err_console(old_err_console)
printer.set_thread_stream(None)
for line in buf.getvalue().splitlines(keepends=True):
yield line
+4 -4
View File
@@ -78,15 +78,15 @@ class TestStubsMessageFormatting:
@patch("select.select") @patch("select.select")
def test_connect_dynamic_msg_formatting_ssm(self, mock_select, mock_read, mock_setraw, mock_getattr, mock_setattr): def test_connect_dynamic_msg_formatting_ssm(self, mock_select, mock_read, mock_setraw, mock_getattr, mock_setattr):
from connpy.grpc_layer.stubs import NodeStub from connpy.grpc_layer.stubs import NodeStub
mock_getattr.return_value = [0, 0, 0, 0, 0, 0, [0] * 32] mock_getattr.return_value = [0, 0, 0, 0, 0, 0, [0] * 32]
mock_channel = MagicMock() mock_channel = MagicMock()
stub = NodeStub(mock_channel, "localhost:8048") stub = NodeStub(mock_channel, "localhost:8048")
mock_resp = MagicMock() mock_resp = MagicMock()
mock_resp.success = True mock_resp.success = True
stub.stub.interact_node.return_value = iter([mock_resp]) mock_resp.stdout_data = b''
stub.stub.interact_node.return_value = iter([mock_resp])
with patch("connpy.printer.success") as mock_success: with patch("connpy.printer.success") as mock_success:
with patch("sys.stdin.fileno", return_value=0): with patch("sys.stdin.fileno", return_value=0):
mock_select.return_value = ([], [], []) mock_select.return_value = ([], [], [])