Module connpy.grpc_layer.stubs

Functions

def handle_errors(func)
Expand source code
def handle_errors(func):
    """Decorator translating ``grpc.RpcError`` into native ``ConnpyError``.

    Keeps CLI handlers agnostic of gRPC by rewriting common transport
    failures into human-readable messages that name the remote host
    (read from ``self.remote_host`` on the wrapped method's instance).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except grpc.RpcError as e:
            # Re-raise gRPC errors as native ConnpyError to keep CLI handlers agnostic.
            # details() may be None for some locally-raised errors; fall back to ""
            # so the substring checks below cannot raise TypeError.
            details = e.details() or ""

            # Identify the host if available on the instance
            instance = args[0] if args else None
            host = getattr(instance, "remote_host", "remote host")

            # Make common gRPC errors more readable
            if "failed to connect to all addresses" in details:
                simplified = f"Failed to connect to remote host at {host} (Connection refused)"
            elif "Method not found" in details:
                simplified = f"Remote server at {host} is using an incompatible version"
            elif "Deadline Exceeded" in details:
                simplified = f"Request to {host} timed out"
            else:
                simplified = details

            # Chain the original error so tracebacks keep the gRPC context.
            raise ConnpyError(simplified) from e
    return wrapper

Classes

class AIStub (channel, remote_host)
Expand source code
class AIStub:
    """Client-side wrapper for the remote gRPC AIService.

    Every public method is wrapped by ``handle_errors`` so gRPC failures
    surface as ``ConnpyError`` messages naming ``remote_host``.
    """

    def __init__(self, channel, remote_host):
        self.stub = connpy_pb2_grpc.AIServiceStub(channel)
        # Read by handle_errors to build readable error messages.
        self.remote_host = remote_host

    @handle_errors
    def ask(self, input_text, dryrun=False, chat_history=None, session_id=None, debug=False, status=None, **overrides):
        """Stream a question to the AI service and render the reply live.

        Bidirectional streaming call: outgoing requests (initial prompt,
        confirmation answers, interrupt signals) flow through ``req_queue``;
        incoming responses are pulled on a daemon thread so Ctrl-C cannot
        corrupt the gRPC iterator.  Returns the decoded final-result dict
        (``from_struct`` of the server's ``full_result``), with
        ``"streamed": True`` added when any text chunks were shown.
        ``status`` is assumed to be a rich-style status spinner — only
        ``update``/``stop``/``start`` are called on it; TODO confirm.
        """
        import queue
        from rich.prompt import Prompt
        from rich.text import Text
        from rich.live import Live
        from rich.panel import Panel
        from rich.markdown import Markdown

        req_queue = queue.Queue()

        initial_req = connpy_pb2.AskRequest(
            input_text=input_text,
            dryrun=dryrun,
            session_id=session_id or "",
            debug=debug,
            engineer_model=overrides.get("engineer_model", ""),
            engineer_api_key=overrides.get("engineer_api_key", ""),
            architect_model=overrides.get("architect_model", ""),
            architect_api_key=overrides.get("architect_api_key", ""),
            trust=overrides.get("trust", False)
        )
        if chat_history is not None:
            initial_req.chat_history.CopyFrom(to_value(chat_history))

        req_queue.put(initial_req)

        def request_generator():
            # Feeds queued requests into the gRPC stream; a None sentinel
            # (pushed in the finally block below) ends the stream.
            while True:
                req = req_queue.get()
                if req is None: break
                yield req

        responses = self.stub.ask(request_generator())

        full_content = ""
        live_display = None
        final_result = {"response": "", "chat_history": []}

        # Background thread to pull responses from gRPC into a local queue
        # This prevents KeyboardInterrupt from corrupting the gRPC iterator state
        response_queue = queue.Queue()

        def pull_responses():
            try:
                for response in responses:
                    response_queue.put(("data", response))
            except Exception as e:
                response_queue.put(("error", e))
            finally:
                # (None, None) sentinel tells the consumer loop to stop.
                response_queue.put((None, None))

        threading.Thread(target=pull_responses, daemon=True).start()

        try:
            while True:
                try:
                    # BLOCKING GET from local queue (interruptible by signal)
                    msg_type, response = response_queue.get()
                except KeyboardInterrupt:
                    # Signal interruption to the server
                    if status:
                        status.update("[error]Interrupted! Closing pending tasks...")

                    # Send the interrupt signal to the server
                    req_queue.put(connpy_pb2.AskRequest(interrupt=True))

                    # CONTINUE the loop to receive remaining data and summary from the queue
                    continue

                if msg_type is None: # Sentinel
                    break

                if msg_type == "error":
                    # Re-raise or handle gRPC error from background thread
                    if isinstance(response, grpc.RpcError):
                        raise response
                    printer.warning(f"Stream interrupted: {response}")
                    break

                if response.status_update:
                    if response.requires_confirmation:
                        # Pause all live rendering so the prompt owns the terminal.
                        if status: status.stop()
                        if live_display: live_display.stop()

                        # Show prompt and wait for answer
                        prompt_text = Text.from_ansi(response.status_update)
                        ans = Prompt.ask(prompt_text)

                        if status:
                            status.update("[ai_status]Agent: Resuming...")
                            status.start()
                        if live_display: live_display.start()

                        req_queue.put(connpy_pb2.AskRequest(confirmation_answer=ans))
                        continue

                    if status:
                        status.update(response.status_update)
                    continue

                if response.debug_message:
                    # Debug traffic is only rendered when the caller asked for it.
                    if debug:
                        printer.console.print(Text.from_ansi(response.debug_message))
                    continue

                if response.important_message:
                    printer.console.print(Text.from_ansi(response.important_message))
                    continue

                if not response.is_final:
                    full_content += response.text_chunk

                    # Lazily create the Live panel on the first chunk (skipped in debug mode).
                    if not live_display and not debug:
                        if status: status.stop()
                        live_display = Live(
                            Panel(Markdown(full_content), title="AI Assistant", expand=False),
                            console=printer.console,
                            refresh_per_second=8,
                            transient=False
                        )
                        live_display.start()
                    elif live_display:
                        live_display.update(Panel(Markdown(full_content), title="AI Assistant", expand=False))
                    continue

                if response.is_final:
                    final_result = from_struct(response.full_result)
                    responder = final_result.get("responder", "engineer")
                    alias = "architect" if responder == "architect" else "engineer"
                    role_label = "Network Architect" if responder == "architect" else "Network Engineer"
                    title = f"[bold {alias}]{role_label}[/bold {alias}]"

                    if live_display:
                        live_display.update(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
                        live_display.stop()
                    elif full_content:
                        printer.console.print(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
                    break
        except Exception as e:
            # Check if it was a gRPC error that we should let handle_errors catch
            if isinstance(e, grpc.RpcError):
                raise
            printer.warning(f"Stream interrupted: {e}")
        finally:
            # Close the outgoing request stream.
            req_queue.put(None)

        if full_content:
            final_result["streamed"] = True

        return final_result

    @handle_errors
    def confirm(self, input_text, console=None):
        """Forward a confirmation request to the server; return its value.

        ``console`` is accepted for interface compatibility but unused here.
        """
        return self.stub.confirm(connpy_pb2.StringRequest(value=input_text)).value

    @handle_errors
    def list_sessions(self):
        """Return the decoded list of AI sessions stored on the server."""
        return from_value(self.stub.list_sessions(Empty()).data)

    @handle_errors
    def delete_session(self, session_id):
        """Delete the remote AI session identified by *session_id*."""
        self.stub.delete_session(connpy_pb2.StringRequest(value=session_id))

    @handle_errors
    def configure_provider(self, provider, model=None, api_key=None):
        """Persist provider/model/API-key settings on the remote server."""
        req = connpy_pb2.ProviderRequest(provider=provider, model=model or "", api_key=api_key or "")
        self.stub.configure_provider(req)

    @handle_errors
    def load_session_data(self, session_id):
        """Return the decoded data dict for the remote session *session_id*."""
        return from_struct(self.stub.load_session_data(connpy_pb2.StringRequest(value=session_id)).data)

Methods

def ask(self,
input_text,
dryrun=False,
chat_history=None,
session_id=None,
debug=False,
status=None,
**overrides)
Expand source code
@handle_errors
def ask(self, input_text, dryrun=False, chat_history=None, session_id=None, debug=False, status=None, **overrides):
    """Stream a question to the AI service and render the reply live.

    Bidirectional streaming call: outgoing requests (initial prompt,
    confirmation answers, interrupt signals) flow through ``req_queue``;
    incoming responses are pulled on a daemon thread so Ctrl-C cannot
    corrupt the gRPC iterator.  Returns the decoded final-result dict,
    with ``"streamed": True`` added when any text chunks were shown.
    ``status`` is assumed to be a rich-style status spinner — only
    ``update``/``stop``/``start`` are called on it; TODO confirm.
    """
    import queue
    from rich.prompt import Prompt
    from rich.text import Text
    from rich.live import Live
    from rich.panel import Panel
    from rich.markdown import Markdown

    req_queue = queue.Queue()

    initial_req = connpy_pb2.AskRequest(
        input_text=input_text,
        dryrun=dryrun,
        session_id=session_id or "",
        debug=debug,
        engineer_model=overrides.get("engineer_model", ""),
        engineer_api_key=overrides.get("engineer_api_key", ""),
        architect_model=overrides.get("architect_model", ""),
        architect_api_key=overrides.get("architect_api_key", ""),
        trust=overrides.get("trust", False)
    )
    if chat_history is not None:
        initial_req.chat_history.CopyFrom(to_value(chat_history))

    req_queue.put(initial_req)

    def request_generator():
        # Feeds queued requests into the gRPC stream; a None sentinel
        # (pushed in the finally block below) ends the stream.
        while True:
            req = req_queue.get()
            if req is None: break
            yield req

    responses = self.stub.ask(request_generator())

    full_content = ""
    live_display = None
    final_result = {"response": "", "chat_history": []}

    # Background thread to pull responses from gRPC into a local queue
    # This prevents KeyboardInterrupt from corrupting the gRPC iterator state
    response_queue = queue.Queue()

    def pull_responses():
        try:
            for response in responses:
                response_queue.put(("data", response))
        except Exception as e:
            response_queue.put(("error", e))
        finally:
            # (None, None) sentinel tells the consumer loop to stop.
            response_queue.put((None, None))

    threading.Thread(target=pull_responses, daemon=True).start()

    try:
        while True:
            try:
                # BLOCKING GET from local queue (interruptible by signal)
                msg_type, response = response_queue.get()
            except KeyboardInterrupt:
                # Signal interruption to the server
                if status:
                    status.update("[error]Interrupted! Closing pending tasks...")

                # Send the interrupt signal to the server
                req_queue.put(connpy_pb2.AskRequest(interrupt=True))

                # CONTINUE the loop to receive remaining data and summary from the queue
                continue

            if msg_type is None: # Sentinel
                break

            if msg_type == "error":
                # Re-raise or handle gRPC error from background thread
                if isinstance(response, grpc.RpcError):
                    raise response
                printer.warning(f"Stream interrupted: {response}")
                break

            if response.status_update:
                if response.requires_confirmation:
                    # Pause all live rendering so the prompt owns the terminal.
                    if status: status.stop()
                    if live_display: live_display.stop()

                    # Show prompt and wait for answer
                    prompt_text = Text.from_ansi(response.status_update)
                    ans = Prompt.ask(prompt_text)

                    if status:
                        status.update("[ai_status]Agent: Resuming...")
                        status.start()
                    if live_display: live_display.start()

                    req_queue.put(connpy_pb2.AskRequest(confirmation_answer=ans))
                    continue

                if status:
                    status.update(response.status_update)
                continue

            if response.debug_message:
                # Debug traffic is only rendered when the caller asked for it.
                if debug:
                    printer.console.print(Text.from_ansi(response.debug_message))
                continue

            if response.important_message:
                printer.console.print(Text.from_ansi(response.important_message))
                continue

            if not response.is_final:
                full_content += response.text_chunk

                # Lazily create the Live panel on the first chunk (skipped in debug mode).
                if not live_display and not debug:
                    if status: status.stop()
                    live_display = Live(
                        Panel(Markdown(full_content), title="AI Assistant", expand=False),
                        console=printer.console,
                        refresh_per_second=8,
                        transient=False
                    )
                    live_display.start()
                elif live_display:
                    live_display.update(Panel(Markdown(full_content), title="AI Assistant", expand=False))
                continue

            if response.is_final:
                final_result = from_struct(response.full_result)
                responder = final_result.get("responder", "engineer")
                alias = "architect" if responder == "architect" else "engineer"
                role_label = "Network Architect" if responder == "architect" else "Network Engineer"
                title = f"[bold {alias}]{role_label}[/bold {alias}]"

                if live_display:
                    live_display.update(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
                    live_display.stop()
                elif full_content:
                    printer.console.print(Panel(Markdown(full_content), title=title, border_style=alias, expand=False))
                break
    except Exception as e:
        # Check if it was a gRPC error that we should let handle_errors catch
        if isinstance(e, grpc.RpcError):
            raise
        printer.warning(f"Stream interrupted: {e}")
    finally:
        # Close the outgoing request stream.
        req_queue.put(None)

    if full_content:
        final_result["streamed"] = True

    return final_result
def configure_provider(self, provider, model=None, api_key=None)
Expand source code
@handle_errors
def configure_provider(self, provider, model=None, api_key=None):
    """Persist provider/model/API-key settings on the remote server."""
    request = connpy_pb2.ProviderRequest(
        provider=provider,
        model=model or "",
        api_key=api_key or "",
    )
    self.stub.configure_provider(request)
def confirm(self, input_text, console=None)
Expand source code
@handle_errors
def confirm(self, input_text, console=None):
    """Forward a confirmation request to the server and return its value."""
    request = connpy_pb2.StringRequest(value=input_text)
    reply = self.stub.confirm(request)
    return reply.value
def delete_session(self, session_id)
Expand source code
@handle_errors
def delete_session(self, session_id):
    """Delete the remote AI session identified by *session_id*."""
    request = connpy_pb2.StringRequest(value=session_id)
    self.stub.delete_session(request)
def list_sessions(self)
Expand source code
@handle_errors
def list_sessions(self):
    """Return the decoded list of AI sessions stored on the server."""
    reply = self.stub.list_sessions(Empty())
    return from_value(reply.data)
def load_session_data(self, session_id)
Expand source code
@handle_errors
def load_session_data(self, session_id):
    """Return the decoded data dict for the remote session *session_id*."""
    request = connpy_pb2.StringRequest(value=session_id)
    reply = self.stub.load_session_data(request)
    return from_struct(reply.data)
class ConfigStub (channel, remote_host)
Expand source code
class ConfigStub:
    """Client-side wrapper for the remote gRPC ConfigService.

    All methods are wrapped by ``handle_errors`` so gRPC failures surface
    as ``ConnpyError`` messages naming ``remote_host``.
    """

    def __init__(self, channel, remote_host):
        self.stub = connpy_pb2_grpc.ConfigServiceStub(channel)
        self.remote_host = remote_host

    @handle_errors
    def get_settings(self):
        """Fetch the remote settings as a plain dict."""
        reply = self.stub.get_settings(Empty())
        return from_struct(reply.data)

    @handle_errors
    def update_setting(self, key, value):
        """Set a single configuration *key* to *value* on the server."""
        request = connpy_pb2.UpdateRequest(key=key, value=to_value(value))
        self.stub.update_setting(request)

    @handle_errors
    def get_default_dir(self):
        """Return the server's default configuration directory."""
        reply = self.stub.get_default_dir(Empty())
        return reply.value

    @handle_errors
    def set_config_folder(self, folder):
        """Point the server at a different configuration folder."""
        request = connpy_pb2.StringRequest(value=folder)
        self.stub.set_config_folder(request)

    @handle_errors
    def encrypt_password(self, password):
        """Encrypt *password* remotely and return the resulting string."""
        request = connpy_pb2.StringRequest(value=password)
        return self.stub.encrypt_password(request).value

Methods

def encrypt_password(self, password)
Expand source code
@handle_errors
def encrypt_password(self, password):
    """Encrypt *password* remotely and return the resulting string."""
    request = connpy_pb2.StringRequest(value=password)
    return self.stub.encrypt_password(request).value
def get_default_dir(self)
Expand source code
@handle_errors
def get_default_dir(self):
    """Return the server's default configuration directory."""
    reply = self.stub.get_default_dir(Empty())
    return reply.value
def get_settings(self)
Expand source code
@handle_errors
def get_settings(self):
    """Fetch the remote settings as a plain dict."""
    reply = self.stub.get_settings(Empty())
    return from_struct(reply.data)
def set_config_folder(self, folder)
Expand source code
@handle_errors
def set_config_folder(self, folder):
    """Point the server at a different configuration folder."""
    request = connpy_pb2.StringRequest(value=folder)
    self.stub.set_config_folder(request)
def update_setting(self, key, value)
Expand source code
@handle_errors
def update_setting(self, key, value):
    """Set a single configuration *key* to *value* on the server."""
    request = connpy_pb2.UpdateRequest(key=key, value=to_value(value))
    self.stub.update_setting(request)
class ExecutionStub (channel, remote_host)
Expand source code
class ExecutionStub:
    """Client-side wrapper for the remote gRPC ExecutionService.

    All methods are wrapped by ``handle_errors`` so gRPC failures surface
    as ``ConnpyError`` messages naming ``remote_host``.
    """

    def __init__(self, channel, remote_host):
        self.stub = connpy_pb2_grpc.ExecutionServiceStub(channel)
        self.remote_host = remote_host

    @handle_errors
    def run_commands(self, nodes_filter, commands, variables=None, parallel=10, timeout=10, folder=None, prompt=None, **kwargs):
        """Run *commands* on the matched nodes, consuming the result stream.

        Calls ``kwargs["on_node_complete"]`` (if supplied) as each node
        finishes, and returns ``{unique_id: {"output", "status"}}``.
        """
        if isinstance(nodes_filter, str):
            targets = [nodes_filter]
        else:
            targets = list(nodes_filter)
        request = connpy_pb2.RunRequest(
            nodes=targets,
            commands=commands,
            folder=folder or "",
            prompt=prompt or "",
            parallel=parallel,
            timeout=timeout,
            name=kwargs.get("name", ""),
        )
        if variables is not None:
            request.vars.CopyFrom(to_struct(variables))

        callback = kwargs.get("on_node_complete")
        results = {}
        for reply in self.stub.run_commands(request):
            if callback:
                callback(reply.unique_id, reply.output, reply.status)
            results[reply.unique_id] = {
                "output": reply.output,
                "status": reply.status,
            }
        return results

    @handle_errors
    def test_commands(self, nodes_filter, commands, expected, variables=None, parallel=10, timeout=10, prompt=None, **kwargs):
        """Run *commands* and check output against *expected* per node.

        Calls ``kwargs["on_node_complete"]`` (if supplied) per node and
        returns ``{unique_id: decoded test-result dict}``.
        """
        if isinstance(nodes_filter, str):
            targets = [nodes_filter]
        else:
            targets = list(nodes_filter)
        expectations = expected if isinstance(expected, list) else [expected]
        request = connpy_pb2.TestRequest(
            nodes=targets,
            commands=commands,
            expected=expectations,
            folder=kwargs.get("folder", ""),
            prompt=prompt or "",
            parallel=parallel,
            timeout=timeout,
            name=kwargs.get("name", ""),
        )
        if variables is not None:
            request.vars.CopyFrom(to_struct(variables))

        callback = kwargs.get("on_node_complete")
        results = {}
        for reply in self.stub.test_commands(request):
            if reply.HasField("test_result"):
                decoded = from_struct(reply.test_result)
            else:
                decoded = {}
            if callback:
                callback(reply.unique_id, reply.output, reply.status, decoded)
            results[reply.unique_id] = decoded
        return results

    @handle_errors
    def run_cli_script(self, nodes_filter, script_path, parallel=10):
        """Execute a CLI script remotely against the matched nodes."""
        request = connpy_pb2.ScriptRequest(
            param1=nodes_filter,
            param2=script_path,
            parallel=parallel,
        )
        return from_struct(self.stub.run_cli_script(request).data)

    @handle_errors
    def run_yaml_playbook(self, playbook_path, parallel=10):
        """Execute a YAML playbook remotely and return the decoded result."""
        request = connpy_pb2.ScriptRequest(param1=playbook_path, parallel=parallel)
        return from_struct(self.stub.run_yaml_playbook(request).data)

Methods

def run_cli_script(self, nodes_filter, script_path, parallel=10)
Expand source code
@handle_errors
def run_cli_script(self, nodes_filter, script_path, parallel=10):
    """Execute a CLI script remotely against the matched nodes."""
    request = connpy_pb2.ScriptRequest(
        param1=nodes_filter,
        param2=script_path,
        parallel=parallel,
    )
    return from_struct(self.stub.run_cli_script(request).data)
def run_commands(self,
nodes_filter,
commands,
variables=None,
parallel=10,
timeout=10,
folder=None,
prompt=None,
**kwargs)
Expand source code
@handle_errors
def run_commands(self, nodes_filter, commands, variables=None, parallel=10, timeout=10, folder=None, prompt=None, **kwargs):
    """Run *commands* on the matched nodes, consuming the result stream.

    Calls ``kwargs["on_node_complete"]`` (if supplied) as each node
    finishes, and returns ``{unique_id: {"output", "status"}}``.
    """
    if isinstance(nodes_filter, str):
        targets = [nodes_filter]
    else:
        targets = list(nodes_filter)
    request = connpy_pb2.RunRequest(
        nodes=targets,
        commands=commands,
        folder=folder or "",
        prompt=prompt or "",
        parallel=parallel,
        timeout=timeout,
        name=kwargs.get("name", ""),
    )
    if variables is not None:
        request.vars.CopyFrom(to_struct(variables))

    callback = kwargs.get("on_node_complete")
    results = {}
    for reply in self.stub.run_commands(request):
        if callback:
            callback(reply.unique_id, reply.output, reply.status)
        results[reply.unique_id] = {
            "output": reply.output,
            "status": reply.status,
        }
    return results
def run_yaml_playbook(self, playbook_path, parallel=10)
Expand source code
@handle_errors
def run_yaml_playbook(self, playbook_path, parallel=10):
    """Execute a YAML playbook remotely and return the decoded result."""
    request = connpy_pb2.ScriptRequest(param1=playbook_path, parallel=parallel)
    return from_struct(self.stub.run_yaml_playbook(request).data)
def test_commands(self,
nodes_filter,
commands,
expected,
variables=None,
parallel=10,
timeout=10,
prompt=None,
**kwargs)
Expand source code
@handle_errors
def test_commands(self, nodes_filter, commands, expected, variables=None, parallel=10, timeout=10, prompt=None, **kwargs):
    """Run *commands* and check output against *expected* per node.

    Calls ``kwargs["on_node_complete"]`` (if supplied) per node and
    returns ``{unique_id: decoded test-result dict}``.
    """
    if isinstance(nodes_filter, str):
        targets = [nodes_filter]
    else:
        targets = list(nodes_filter)
    expectations = expected if isinstance(expected, list) else [expected]
    request = connpy_pb2.TestRequest(
        nodes=targets,
        commands=commands,
        expected=expectations,
        folder=kwargs.get("folder", ""),
        prompt=prompt or "",
        parallel=parallel,
        timeout=timeout,
        name=kwargs.get("name", ""),
    )
    if variables is not None:
        request.vars.CopyFrom(to_struct(variables))

    callback = kwargs.get("on_node_complete")
    results = {}
    for reply in self.stub.test_commands(request):
        if reply.HasField("test_result"):
            decoded = from_struct(reply.test_result)
        else:
            decoded = {}
        if callback:
            callback(reply.unique_id, reply.output, reply.status, decoded)
        results[reply.unique_id] = decoded
    return results
class ImportExportStub (channel, remote_host)
Expand source code
class ImportExportStub:
    """Client-side wrapper for the remote gRPC ImportExportService.

    All methods are wrapped by ``handle_errors`` so gRPC failures surface
    as ``ConnpyError`` messages naming ``remote_host``.
    """

    def __init__(self, channel, remote_host):
        self.stub = connpy_pb2_grpc.ImportExportServiceStub(channel)
        self.remote_host = remote_host

    @handle_errors
    def export_to_file(self, file_path, folders=None):
        """Export the remote configuration (optionally only *folders*) to *file_path*."""
        request = connpy_pb2.ExportRequest(file_path=file_path, folders=folders or [])
        self.stub.export_to_file(request)

    @handle_errors
    def import_from_file(self, file_path):
        """Read the local file at *file_path* and push its content to the server."""
        with open(file_path, "r") as handle:
            content = handle.read()
        # Marker to tell the server this is content, not a path
        payload = f"---YAML---\n{content}"
        self.stub.import_from_file(connpy_pb2.StringRequest(value=payload))

    @handle_errors
    def set_reserved_names(self, names):
        """Register *names* as reserved identifiers on the server."""
        self.stub.set_reserved_names(connpy_pb2.ListRequest(items=names))

Methods

def export_to_file(self, file_path, folders=None)
Expand source code
@handle_errors
def export_to_file(self, file_path, folders=None):
    """Export the remote configuration (optionally only *folders*) to *file_path*."""
    request = connpy_pb2.ExportRequest(file_path=file_path, folders=folders or [])
    self.stub.export_to_file(request)
def import_from_file(self, file_path)
Expand source code
@handle_errors
def import_from_file(self, file_path):
    """Read the local file at *file_path* and push its content to the server."""
    with open(file_path, "r") as handle:
        content = handle.read()
    # Marker to tell the server this is content, not a path
    payload = f"---YAML---\n{content}"
    self.stub.import_from_file(connpy_pb2.StringRequest(value=payload))
def set_reserved_names(self, names)
Expand source code
@handle_errors
def set_reserved_names(self, names):
    """Register *names* as reserved identifiers on the server."""
    request = connpy_pb2.ListRequest(items=names)
    self.stub.set_reserved_names(request)
class NodeStub (channel, remote_host, config=None)
Expand source code
class NodeStub:
    """Client-side wrapper for the remote gRPC NodeService.

    Wraps node CRUD, inventory and interactive-terminal RPCs.  Methods are
    decorated with ``handle_errors`` so gRPC failures surface as
    ``ConnpyError``.  ``config`` is presumably the local configuration
    object — only ``_generate_nodes_cache`` is called on it; TODO confirm.
    """

    def __init__(self, channel, remote_host, config=None):
        self.stub = connpy_pb2_grpc.NodeServiceStub(channel)
        # Read by handle_errors to build readable error messages.
        self.remote_host = remote_host
        self.config = config

    @handle_errors
    def connect_node(self, unique_id, sftp=False, debug=False, logger=None):
        """Open an interactive terminal session to node *unique_id*.

        Puts the local tty into raw mode, streams stdin to the server and
        server output to stdout over a bidirectional gRPC stream.  The
        first response carries the connection status; failure is printed
        and the method returns.  ``logger`` is accepted but unused here.
        """
        import sys
        import select
        import tty
        import termios
        import os
        import threading

        def request_generator():
            # First message carries session parameters and terminal size;
            # subsequent messages forward raw stdin bytes.
            cols, rows = 80, 24
            try:
                size = os.get_terminal_size()
                cols, rows = size.columns, size.lines
            except OSError:
                pass

            yield connpy_pb2.InteractRequest(
                id=unique_id, sftp=sftp, debug=debug, cols=cols, rows=rows
            )

            while True:
                r, _, _ = select.select([sys.stdin.fileno()], [], [])
                if r:
                    try:
                        data = os.read(sys.stdin.fileno(), 1024)
                        if not data:
                            break
                        yield connpy_pb2.InteractRequest(stdin_data=data)
                    except OSError:
                        break

        # Fetch node details for the connection message
        try:
            node_details = self.get_node_details(unique_id)
            host = node_details.get("host", "unknown")
            port = str(node_details.get("port", ""))
            protocol = "sftp" if sftp else node_details.get("protocol", "ssh")
            port_str = f":{port}" if port and protocol not in ["ssm", "kubectl", "docker"] else ""
            conn_msg = f"Connected to {unique_id} at {host}{port_str} via: {protocol}"
        except Exception:
            conn_msg = f"Connected to {unique_id}"

        old_tty = termios.tcgetattr(sys.stdin)
        try:
            tty.setraw(sys.stdin.fileno())
            response_iterator = self.stub.interact_node(request_generator())

            # First response is connection status
            try:
                first_res = next(response_iterator)
                if first_res.success:
                    # Connection established on server, show success message
                    # (temporarily restore cooked mode so printing renders normally)
                    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                    printer.success(conn_msg)
                    tty.setraw(sys.stdin.fileno())
                else:
                    # Connection failed on server
                    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                    printer.error(f"Connection failed: {first_res.error_message}")
                    return
            except StopIteration:
                return

            for res in response_iterator:
                if res.stdout_data:
                    os.write(sys.stdout.fileno(), res.stdout_data)
        finally:
            # Always restore the original terminal settings.
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

    @handle_errors
    def connect_dynamic(self, connection_params, debug=False):
        """Open an interactive session from ad-hoc *connection_params*.

        Same raw-tty streaming flow as ``connect_node``, but the target is
        described by a dict serialized to JSON rather than a stored node id.
        """
        import sys
        import select
        import tty
        import termios
        import os
        import json

        params_json = json.dumps(connection_params)

        def request_generator():
            # First message carries session parameters and terminal size;
            # subsequent messages forward raw stdin bytes.
            cols, rows = 80, 24
            try:
                size = os.get_terminal_size()
                cols, rows = size.columns, size.lines
            except OSError:
                pass

            yield connpy_pb2.InteractRequest(
                id="dynamic", debug=debug, cols=cols, rows=rows,
                connection_params_json=params_json
            )

            while True:
                r, _, _ = select.select([sys.stdin.fileno()], [], [])
                if r:
                    try:
                        data = os.read(sys.stdin.fileno(), 1024)
                        if not data:
                            break
                        yield connpy_pb2.InteractRequest(stdin_data=data)
                    except OSError:
                        break

        # Prepare connection message
        try:
            node_name = connection_params.get("name", "dynamic@remote")
            host = connection_params.get("host", "dynamic")
            port = str(connection_params.get("port", ""))
            protocol = connection_params.get("protocol", "ssh")
            port_str = f":{port}" if port and protocol not in ["ssm", "kubectl", "docker"] else ""
            conn_msg = f"Connected to {node_name} at {host}{port_str} via: {protocol}"
        except Exception:
            node_name = connection_params.get("name", "dynamic@remote") if isinstance(connection_params, dict) else "dynamic@remote"
            conn_msg = f"Connected to {node_name}"

        old_tty = termios.tcgetattr(sys.stdin)
        try:
            tty.setraw(sys.stdin.fileno())
            response_iterator = self.stub.interact_node(request_generator())

            # First response is connection status
            try:
                first_res = next(response_iterator)
                if first_res.success:
                    # Connection established on server, show success message
                    # (temporarily restore cooked mode so printing renders normally)
                    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                    printer.success(conn_msg)
                    tty.setraw(sys.stdin.fileno())
                else:
                    # Connection failed on server
                    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                    printer.error(f"Connection failed: {first_res.error_message}")
                    return
            except StopIteration:
                return

            for res in response_iterator:
                if res.stdout_data:
                    os.write(sys.stdout.fileno(), res.stdout_data)
        finally:
            # Always restore the original terminal settings.
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

    @MethodHook
    @handle_errors
    def list_nodes(self, filter_str=None, format_str=None):
        """Return the (possibly filtered/formatted) node list, or [] if empty."""
        req = connpy_pb2.FilterRequest(filter_str=filter_str or "", format_str=format_str or "")
        return from_value(self.stub.list_nodes(req).data) or []

    @MethodHook
    @handle_errors
    def list_folders(self, filter_str=None):
        """Return the (possibly filtered) folder list, or [] if empty."""
        req = connpy_pb2.FilterRequest(filter_str=filter_str or "")
        return from_value(self.stub.list_folders(req).data) or []

    @handle_errors
    def get_node_details(self, unique_id):
        """Return the decoded details dict for node *unique_id*."""
        return from_struct(self.stub.get_node_details(connpy_pb2.IdRequest(id=unique_id)).data)

    @handle_errors
    def explode_unique(self, unique_id):
        """Return the server's decomposition of *unique_id* into its parts."""
        return from_value(self.stub.explode_unique(connpy_pb2.IdRequest(id=unique_id)).data)

    @handle_errors
    def validate_parent_folder(self, unique_id):
        """Ask the server to validate the parent folder of *unique_id*."""
        self.stub.validate_parent_folder(connpy_pb2.IdRequest(id=unique_id))

    @handle_errors
    def generate_cache(self, nodes=None, folders=None, profiles=None):
        """Regenerate the remote cache and sync the local fzf/text cache files."""
        # 1. Update remote cache on server
        self.stub.generate_cache(Empty())

        # 2. Update local fzf/text cache files
        # If no data provided, we fetch it all from remote to sync local files
        if nodes is None and folders is None and profiles is None:
            nodes = self.list_nodes()
            folders = self.list_folders()
            # We don't have direct access to ProfileStub here, but usually 
            # node cache is what matters for fzf. We'll fetch profiles if we can.
            # For now, let's sync what we have.

        if nodes is not None or folders is not None or profiles is not None:
            self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)

    def _trigger_local_cache_sync(self):
        """Helper to fetch remote data and update local fzf cache files after a change."""
        try:
            nodes = self.list_nodes()
            folders = self.list_folders()
            self.generate_cache(nodes=nodes, folders=folders)
        except Exception:
            # Failure to sync cache shouldn't break the main operation's success feedback
            pass

    @handle_errors
    def add_node(self, unique_id, data, is_folder=False):
        """Create a node (or folder) on the server and resync the local cache."""
        req = connpy_pb2.NodeRequest(id=unique_id, data=to_struct(data), is_folder=is_folder)
        self.stub.add_node(req)
        self._trigger_local_cache_sync()

    @handle_errors
    def update_node(self, unique_id, data):
        """Replace node *unique_id*'s data on the server and resync the local cache."""
        req = connpy_pb2.NodeRequest(id=unique_id, data=to_struct(data), is_folder=False)
        self.stub.update_node(req)
        self._trigger_local_cache_sync()

    @handle_errors
    def delete_node(self, unique_id, is_folder=False):
        """Delete a node (or folder) on the server and resync the local cache."""
        req = connpy_pb2.DeleteRequest(id=unique_id, is_folder=is_folder)
        self.stub.delete_node(req)
        self._trigger_local_cache_sync()

    @handle_errors
    def move_node(self, src_id, dst_id, copy=False):
        """Move (or copy) a node on the server and resync the local cache."""
        req = connpy_pb2.MoveRequest(src_id=src_id, dst_id=dst_id, copy=copy)
        self.stub.move_node(req)
        self._trigger_local_cache_sync()

    @handle_errors
    def bulk_add(self, ids, hosts, common_data):
        """Create many nodes sharing *common_data* and resync the local cache."""
        req = connpy_pb2.BulkRequest(ids=ids, hosts=hosts, common_data=to_struct(common_data))
        self.stub.bulk_add(req)
        self._trigger_local_cache_sync()

    @handle_errors
    def set_reserved_names(self, names):
        """Register reserved names on the server and resync the local cache."""
        self.stub.set_reserved_names(connpy_pb2.ListRequest(items=names))
        self._trigger_local_cache_sync()

    @handle_errors
    def full_replace(self, connections, profiles):
        """Replace the entire remote inventory and resync the local cache."""
        req = connpy_pb2.FullReplaceRequest(
            connections=to_struct(connections),
            profiles=to_struct(profiles)
        )
        self.stub.full_replace(req)
        self._trigger_local_cache_sync()

    @handle_errors
    def get_inventory(self):
        """Return the decoded remote inventory: {"connections", "profiles"}."""
        resp = self.stub.get_inventory(Empty())
        return {
            "connections": from_struct(resp.connections),
            "profiles": from_struct(resp.profiles)
        }

Methods

def add_node(self, unique_id, data, is_folder=False)
Expand source code
@handle_errors
def add_node(self, unique_id, data, is_folder=False):
    req = connpy_pb2.NodeRequest(id=unique_id, data=to_struct(data), is_folder=is_folder)
    self.stub.add_node(req)
    self._trigger_local_cache_sync()
def bulk_add(self, ids, hosts, common_data)
Expand source code
@handle_errors
def bulk_add(self, ids, hosts, common_data):
    req = connpy_pb2.BulkRequest(ids=ids, hosts=hosts, common_data=to_struct(common_data))
    self.stub.bulk_add(req)
    self._trigger_local_cache_sync()
def connect_dynamic(self, connection_params, debug=False)
Expand source code
@handle_errors
def connect_dynamic(self, connection_params, debug=False):
    """Open an interactive session to an ad-hoc (unsaved) node via the remote server.

    connection_params is JSON-serialized and sent in the first message of a
    bidirectional gRPC stream; afterwards local stdin bytes are streamed up
    and server stdout bytes are written straight to the local terminal. The
    local TTY is switched to raw mode for the session and restored on exit.
    """
    import sys
    import select
    import tty
    import termios
    import os
    import json

    params_json = json.dumps(connection_params)

    def request_generator():
        # First message carries session setup: terminal size plus the
        # dynamic connection parameters as JSON.
        cols, rows = 80, 24
        try:
            size = os.get_terminal_size()
            cols, rows = size.columns, size.lines
        except OSError:
            # Not attached to a real terminal; keep the 80x24 fallback.
            pass

        yield connpy_pb2.InteractRequest(
            id="dynamic", debug=debug, cols=cols, rows=rows,
            connection_params_json=params_json
        )

        # Pump local stdin to the server until EOF or a read error.
        while True:
            r, _, _ = select.select([sys.stdin.fileno()], [], [])
            if r:
                try:
                    data = os.read(sys.stdin.fileno(), 1024)
                    if not data:
                        break
                    yield connpy_pb2.InteractRequest(stdin_data=data)
                except OSError:
                    break

    # Prepare connection message
    try:
        node_name = connection_params.get("name", "dynamic@remote")
        host = connection_params.get("host", "dynamic")
        port = str(connection_params.get("port", ""))
        protocol = connection_params.get("protocol", "ssh")
        # Port suffix is suppressed for non-socket protocols.
        port_str = f":{port}" if port and protocol not in ["ssm", "kubectl", "docker"] else ""
        conn_msg = f"Connected to {node_name} at {host}{port_str} via: {protocol}"
    except Exception:
        # Message building is best-effort; fall back to a minimal message.
        node_name = connection_params.get("name", "dynamic@remote") if isinstance(connection_params, dict) else "dynamic@remote"
        conn_msg = f"Connected to {node_name}"

    old_tty = termios.tcgetattr(sys.stdin)
    try:
        tty.setraw(sys.stdin.fileno())
        response_iterator = self.stub.interact_node(request_generator())

        # First response is connection status
        try:
            first_res = next(response_iterator)
            if first_res.success:
                # Connection established on server; briefly leave raw mode so
                # the success message renders with normal line discipline.
                termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                printer.success(conn_msg)
                tty.setraw(sys.stdin.fileno())
            else:
                # Connection failed on server
                termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                printer.error(f"Connection failed: {first_res.error_message}")
                return
        except StopIteration:
            # Server closed the stream before reporting a status.
            return

        # Relay remote terminal output to the local terminal.
        for res in response_iterator:
            if res.stdout_data:
                os.write(sys.stdout.fileno(), res.stdout_data)
    finally:
        # Always restore the original TTY settings.
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
def connect_node(self, unique_id, sftp=False, debug=False, logger=None)
Expand source code
@handle_errors
def connect_node(self, unique_id, sftp=False, debug=False, logger=None):
    """Open an interactive terminal session to a saved node via the remote server.

    Local stdin is streamed up and server stdout is streamed down over a
    bidirectional gRPC stream; the local TTY is switched to raw mode for the
    session and always restored on exit.

    Args:
        unique_id: Id of the node to connect to.
        sftp: True to open an SFTP session instead of the node's protocol.
        debug: Forwarded to the server for verbose connection handling.
        logger: Unused here; kept for signature compatibility.
            # NOTE(review): presumably matches the local (non-gRPC)
            # implementation's signature — confirm against callers.
    """
    import sys
    import select
    import tty
    import termios
    import os
    # (removed unused `import threading` from the original)

    def request_generator():
        # First message carries session setup: node id, mode and terminal size.
        cols, rows = 80, 24
        try:
            size = os.get_terminal_size()
            cols, rows = size.columns, size.lines
        except OSError:
            # Not attached to a real terminal; keep the 80x24 fallback.
            pass

        yield connpy_pb2.InteractRequest(
            id=unique_id, sftp=sftp, debug=debug, cols=cols, rows=rows
        )

        # Pump local stdin to the server until EOF or a read error.
        while True:
            r, _, _ = select.select([sys.stdin.fileno()], [], [])
            if r:
                try:
                    data = os.read(sys.stdin.fileno(), 1024)
                    if not data:
                        break
                    yield connpy_pb2.InteractRequest(stdin_data=data)
                except OSError:
                    break

    # Fetch node details for the connection message
    try:
        node_details = self.get_node_details(unique_id)
        host = node_details.get("host", "unknown")
        port = str(node_details.get("port", ""))
        protocol = "sftp" if sftp else node_details.get("protocol", "ssh")
        # Port suffix is suppressed for non-socket protocols.
        port_str = f":{port}" if port and protocol not in ["ssm", "kubectl", "docker"] else ""
        conn_msg = f"Connected to {unique_id} at {host}{port_str} via: {protocol}"
    except Exception:
        # Details lookup is best-effort; fall back to a minimal message.
        conn_msg = f"Connected to {unique_id}"

    old_tty = termios.tcgetattr(sys.stdin)
    try:
        tty.setraw(sys.stdin.fileno())
        response_iterator = self.stub.interact_node(request_generator())

        # First response is connection status
        try:
            first_res = next(response_iterator)
            if first_res.success:
                # Connection established on server; briefly leave raw mode so
                # the success message renders with normal line discipline.
                termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                printer.success(conn_msg)
                tty.setraw(sys.stdin.fileno())
            else:
                # Connection failed on server
                termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                printer.error(f"Connection failed: {first_res.error_message}")
                return
        except StopIteration:
            # Server closed the stream before reporting a status.
            return

        # Relay remote terminal output to the local terminal.
        for res in response_iterator:
            if res.stdout_data:
                os.write(sys.stdout.fileno(), res.stdout_data)
    finally:
        # Always restore the original TTY settings.
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
def delete_node(self, unique_id, is_folder=False)
Expand source code
@handle_errors
def delete_node(self, unique_id, is_folder=False):
    req = connpy_pb2.DeleteRequest(id=unique_id, is_folder=is_folder)
    self.stub.delete_node(req)
    self._trigger_local_cache_sync()
def explode_unique(self, unique_id)
Expand source code
@handle_errors
def explode_unique(self, unique_id):
    """Call the server's explode_unique RPC for unique_id and return the
    decoded response data."""
    return from_value(self.stub.explode_unique(connpy_pb2.IdRequest(id=unique_id)).data)
def full_replace(self, connections, profiles)
Expand source code
@handle_errors
def full_replace(self, connections, profiles):
    req = connpy_pb2.FullReplaceRequest(
        connections=to_struct(connections),
        profiles=to_struct(profiles)
    )
    self.stub.full_replace(req)
    self._trigger_local_cache_sync()
def generate_cache(self, nodes=None, folders=None, profiles=None)
Expand source code
@handle_errors
def generate_cache(self, nodes=None, folders=None, profiles=None):
    """Regenerate the server-side cache and mirror it into the local cache files.

    Args:
        nodes, folders, profiles: Optional data to write into the local
            fzf/text cache. When all three are None, nodes and folders are
            fetched from the remote server first.
    """
    # 1. Update remote cache on server
    self.stub.generate_cache(Empty())

    # 2. Update local fzf/text cache files
    # If no data provided, we fetch it all from remote to sync local files
    if nodes is None and folders is None and profiles is None:
        nodes = self.list_nodes()
        folders = self.list_folders()
        # We don't have direct access to ProfileStub here, but usually
        # node cache is what matters for fzf. We'll fetch profiles if we can.
        # For now, let's sync what we have.

    if nodes is not None or folders is not None or profiles is not None:
        self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)
def get_inventory(self)
Expand source code
@handle_errors
def get_inventory(self):
    resp = self.stub.get_inventory(Empty())
    return {
        "connections": from_struct(resp.connections),
        "profiles": from_struct(resp.profiles)
    }
def get_node_details(self, unique_id)
Expand source code
@handle_errors
def get_node_details(self, unique_id):
    """Fetch a single node's data from the remote server, decoded from Struct."""
    return from_struct(self.stub.get_node_details(connpy_pb2.IdRequest(id=unique_id)).data)
def list_folders(self, filter_str=None)
Expand source code
@MethodHook
@handle_errors
def list_folders(self, filter_str=None):
    """Return the folder listing from the remote server, optionally filtered.

    Returns [] when the server reports no data.
    """
    req = connpy_pb2.FilterRequest(filter_str=filter_str or "")
    return from_value(self.stub.list_folders(req).data) or []
def list_nodes(self, filter_str=None, format_str=None)
Expand source code
@MethodHook
@handle_errors
def list_nodes(self, filter_str=None, format_str=None):
    """Return the node listing from the remote server.

    Args:
        filter_str: Optional filter forwarded to the server.
        format_str: Optional output format forwarded to the server.

    Returns [] when the server reports no data.
    """
    req = connpy_pb2.FilterRequest(filter_str=filter_str or "", format_str=format_str or "")
    return from_value(self.stub.list_nodes(req).data) or []
def move_node(self, src_id, dst_id, copy=False)
Expand source code
@handle_errors
def move_node(self, src_id, dst_id, copy=False):
    req = connpy_pb2.MoveRequest(src_id=src_id, dst_id=dst_id, copy=copy)
    self.stub.move_node(req)
    self._trigger_local_cache_sync()
def set_reserved_names(self, names)
Expand source code
@handle_errors
def set_reserved_names(self, names):
    self.stub.set_reserved_names(connpy_pb2.ListRequest(items=names))
    self._trigger_local_cache_sync()
def update_node(self, unique_id, data)
Expand source code
@handle_errors
def update_node(self, unique_id, data):
    req = connpy_pb2.NodeRequest(id=unique_id, data=to_struct(data), is_folder=False)
    self.stub.update_node(req)
    self._trigger_local_cache_sync()
def validate_parent_folder(self, unique_id)
Expand source code
@handle_errors
def validate_parent_folder(self, unique_id):
    """Ask the server to validate unique_id's parent folder; server-side
    failures surface as ConnpyError via @handle_errors."""
    self.stub.validate_parent_folder(connpy_pb2.IdRequest(id=unique_id))
class PluginStub (channel, remote_host)
Expand source code
class PluginStub:
    """gRPC client for plugin management on a remote Connpy server.

    Wraps both the PluginService and RemotePluginService stubs over a shared
    channel. remote_host is read by @handle_errors to build error messages.
    """

    def __init__(self, channel, remote_host):
        self.stub = connpy_pb2_grpc.PluginServiceStub(channel)
        self.remote_stub = remote_plugin_pb2_grpc.RemotePluginServiceStub(channel)
        self.remote_host = remote_host

    @handle_errors
    def list_plugins(self):
        """Return the plugin listing reported by the remote server."""
        return from_value(self.stub.list_plugins(Empty()).data)

    @handle_errors
    def add_plugin(self, name, source_file, update=False):
        """Upload a local plugin file to the remote server.

        Args:
            name: Plugin name to register on the server.
            source_file: Path of the local plugin file whose content is sent.
            update: True to overwrite an existing plugin.
        """
        # Read the local file content to send it to the server.
        # Explicit UTF-8: the platform default encoding (e.g. cp1252 on
        # Windows) could mis-decode plugin sources containing non-ASCII text.
        with open(source_file, "r", encoding="utf-8") as f:
            content = f.read()

        # Use source_file as a marker for "content-inside"
        marker_content = f"---CONTENT---\n{content}"
        req = connpy_pb2.PluginRequest(name=name, source_file=marker_content, update=update)
        self.stub.add_plugin(req)

    @handle_errors
    def delete_plugin(self, name):
        """Delete the named plugin on the remote server."""
        self.stub.delete_plugin(connpy_pb2.IdRequest(id=name))

    @handle_errors
    def enable_plugin(self, name):
        """Enable the named plugin on the remote server."""
        self.stub.enable_plugin(connpy_pb2.IdRequest(id=name))

    @handle_errors
    def disable_plugin(self, name):
        """Disable the named plugin on the remote server."""
        self.stub.disable_plugin(connpy_pb2.IdRequest(id=name))

    @handle_errors
    def get_plugin_source(self, name):
        """Return the source text of the named plugin from the remote server."""
        resp = self.remote_stub.get_plugin_source(remote_plugin_pb2.IdRequest(id=name))
        return resp.value

    @handle_errors
    def invoke_plugin(self, name, args_namespace):
        """Invoke a plugin remotely, yielding its streamed text output.

        Only JSON-serializable attributes of args_namespace are forwarded;
        the target function's name is passed along as __func_name__ when
        available.
        NOTE(review): since this is a generator, @handle_errors only covers
        the initial call — RpcErrors raised mid-iteration propagate raw.
        """
        import json
        args_dict = {k: v for k, v in vars(args_namespace).items()
                     if isinstance(v, (str, int, float, bool, list, type(None)))}
        if hasattr(args_namespace, "func") and hasattr(args_namespace.func, "__name__"):
            args_dict["__func_name__"] = args_namespace.func.__name__

        req = remote_plugin_pb2.PluginInvokeRequest(name=name, args_json=json.dumps(args_dict))
        for chunk in self.remote_stub.invoke_plugin(req):
            yield chunk.text

Methods

def add_plugin(self, name, source_file, update=False)
Expand source code
@handle_errors
def add_plugin(self, name, source_file, update=False):
    # Read the local file content to send it to the server
    with open(source_file, "r") as f:
        content = f.read()
    
    # Use source_file as a marker for "content-inside"
    marker_content = f"---CONTENT---\n{content}"
    req = connpy_pb2.PluginRequest(name=name, source_file=marker_content, update=update)
    self.stub.add_plugin(req)
def delete_plugin(self, name)
Expand source code
@handle_errors
def delete_plugin(self, name):
    self.stub.delete_plugin(connpy_pb2.IdRequest(id=name))
def disable_plugin(self, name)
Expand source code
@handle_errors
def disable_plugin(self, name):
    self.stub.disable_plugin(connpy_pb2.IdRequest(id=name))
def enable_plugin(self, name)
Expand source code
@handle_errors
def enable_plugin(self, name):
    self.stub.enable_plugin(connpy_pb2.IdRequest(id=name))
def get_plugin_source(self, name)
Expand source code
@handle_errors
def get_plugin_source(self, name):
    resp = self.remote_stub.get_plugin_source(remote_plugin_pb2.IdRequest(id=name))
    return resp.value
def invoke_plugin(self, name, args_namespace)
Expand source code
@handle_errors
def invoke_plugin(self, name, args_namespace):
    import json
    args_dict = {k: v for k, v in vars(args_namespace).items()
                 if isinstance(v, (str, int, float, bool, list, type(None)))}
    if hasattr(args_namespace, "func") and hasattr(args_namespace.func, "__name__"):
        args_dict["__func_name__"] = args_namespace.func.__name__
        
    req = remote_plugin_pb2.PluginInvokeRequest(name=name, args_json=json.dumps(args_dict))
    for chunk in self.remote_stub.invoke_plugin(req):
        yield chunk.text
def list_plugins(self)
Expand source code
@handle_errors
def list_plugins(self):
    return from_value(self.stub.list_plugins(Empty()).data)
class ProfileStub (channel, remote_host, node_stub=None)
Expand source code
class ProfileStub:
    """gRPC client for profile management on a remote Connpy server.

    remote_host is read by @handle_errors to build error messages; node_stub,
    when provided, is used to trigger a local cache resync after changes.
    """

    def __init__(self, channel, remote_host, node_stub=None):
        self.stub = connpy_pb2_grpc.ProfileServiceStub(channel)
        self.remote_host = remote_host
        self.node_stub = node_stub

    @handle_errors
    def list_profiles(self, filter_str=None):
        """Return the profile listing from the remote server ([] when empty)."""
        req = connpy_pb2.FilterRequest(filter_str=filter_str or "")
        return from_value(self.stub.list_profiles(req).data) or []

    @handle_errors
    def get_profile(self, name, resolve=True):
        """Fetch a single profile by name; resolve is forwarded to the server."""
        req = connpy_pb2.ProfileRequest(name=name, resolve=resolve)
        return from_struct(self.stub.get_profile(req).data)

    @handle_errors
    def add_profile(self, name, data):
        """Create a profile on the remote server, then resync the local cache."""
        req = connpy_pb2.NodeRequest(id=name, data=to_struct(data))
        self.stub.add_profile(req)
        if self.node_stub:
            self.node_stub._trigger_local_cache_sync()

    @handle_errors
    def resolve_node_data(self, node_data):
        """Send node_data to the server's resolve_node_data RPC and return the
        decoded result.
        # NOTE(review): presumably expands profile references in the node
        # data — confirm against the server implementation.
        """
        req = connpy_pb2.StructRequest(data=to_struct(node_data))
        return from_struct(self.stub.resolve_node_data(req).data)

    @handle_errors
    def delete_profile(self, name):
        """Delete a profile on the remote server, then resync the local cache."""
        req = connpy_pb2.IdRequest(id=name)
        self.stub.delete_profile(req)
        if self.node_stub:
            self.node_stub._trigger_local_cache_sync()

    @handle_errors
    def update_profile(self, name, data):
        """Update a profile on the remote server, then resync the local cache."""
        req = connpy_pb2.NodeRequest(id=name, data=to_struct(data))
        self.stub.update_profile(req)
        if self.node_stub:
            self.node_stub._trigger_local_cache_sync()

Methods

def add_profile(self, name, data)
Expand source code
@handle_errors
def add_profile(self, name, data):
    req = connpy_pb2.NodeRequest(id=name, data=to_struct(data))
    self.stub.add_profile(req)
    if self.node_stub:
        self.node_stub._trigger_local_cache_sync()
def delete_profile(self, name)
Expand source code
@handle_errors
def delete_profile(self, name):
    req = connpy_pb2.IdRequest(id=name)
    self.stub.delete_profile(req)
    if self.node_stub:
        self.node_stub._trigger_local_cache_sync()
def get_profile(self, name, resolve=True)
Expand source code
@handle_errors
def get_profile(self, name, resolve=True):
    req = connpy_pb2.ProfileRequest(name=name, resolve=resolve)
    return from_struct(self.stub.get_profile(req).data)
def list_profiles(self, filter_str=None)
Expand source code
@handle_errors
def list_profiles(self, filter_str=None):
    req = connpy_pb2.FilterRequest(filter_str=filter_str or "")
    return from_value(self.stub.list_profiles(req).data) or []
def resolve_node_data(self, node_data)
Expand source code
@handle_errors
def resolve_node_data(self, node_data):
    req = connpy_pb2.StructRequest(data=to_struct(node_data))
    return from_struct(self.stub.resolve_node_data(req).data)
def update_profile(self, name, data)
Expand source code
@handle_errors
def update_profile(self, name, data):
    req = connpy_pb2.NodeRequest(id=name, data=to_struct(data))
    self.stub.update_profile(req)
    if self.node_stub:
        self.node_stub._trigger_local_cache_sync()
class SystemStub (channel, remote_host)
Expand source code
class SystemStub:
    """gRPC client for controlling the API service on a remote Connpy server.

    remote_host is read by @handle_errors to build error messages.
    """

    def __init__(self, channel, remote_host):
        self.stub = connpy_pb2_grpc.SystemServiceStub(channel)
        self.remote_host = remote_host

    @staticmethod
    def _port_request(port):
        # Build the IntRequest shared by the port-taking RPCs, falling back
        # to the default API port 8048 when port is None (or falsy).
        return connpy_pb2.IntRequest(value=port or 8048)

    @handle_errors
    def start_api(self, port=None):
        """Start the API on the remote host (default port 8048)."""
        self.stub.start_api(self._port_request(port))

    @handle_errors
    def debug_api(self, port=None):
        """Start the API in debug mode on the remote host (default port 8048)."""
        self.stub.debug_api(self._port_request(port))

    @handle_errors
    def stop_api(self):
        """Stop the API on the remote host."""
        self.stub.stop_api(Empty())

    @handle_errors
    def restart_api(self, port=None):
        """Restart the API on the remote host (default port 8048)."""
        self.stub.restart_api(self._port_request(port))

    @handle_errors
    def get_api_status(self):
        """Return the API status value reported by the remote server."""
        return self.stub.get_api_status(Empty()).value

Methods

def debug_api(self, port=None)
Expand source code
@handle_errors
def debug_api(self, port=None):
    self.stub.debug_api(connpy_pb2.IntRequest(value=port or 8048))
def get_api_status(self)
Expand source code
@handle_errors
def get_api_status(self):
    return self.stub.get_api_status(Empty()).value
def restart_api(self, port=None)
Expand source code
@handle_errors
def restart_api(self, port=None):
    self.stub.restart_api(connpy_pb2.IntRequest(value=port or 8048))
def start_api(self, port=None)
Expand source code
@handle_errors
def start_api(self, port=None):
    self.stub.start_api(connpy_pb2.IntRequest(value=port or 8048))
def stop_api(self)
Expand source code
@handle_errors
def stop_api(self):
    self.stub.stop_api(Empty())