Module connpy.tests.test_grpc_layer

Classes

class MockContext
Expand source code
class MockContext:
    def abort(self, code, details):
        raise Exception(f"gRPC Abort: {code} - {details}")

Methods

def abort(self, code, details)
Expand source code
def abort(self, code, details):
    raise Exception(f"gRPC Abort: {code} - {details}")
class TestGRPCIntegration
Expand source code
class TestGRPCIntegration:
    @pytest.fixture
    def grpc_server(self, populated_config):
        """Starts a local gRPC server for integration testing."""
        srv = grpc.server(futures.ThreadPoolExecutor(max_workers=5))
        
        # Register services
        connpy_pb2_grpc.add_NodeServiceServicer_to_server(server.NodeServicer(populated_config), srv)
        connpy_pb2_grpc.add_ProfileServiceServicer_to_server(server.ProfileServicer(populated_config), srv)
        connpy_pb2_grpc.add_ConfigServiceServicer_to_server(server.ConfigServicer(populated_config), srv)
        connpy_pb2_grpc.add_ExecutionServiceServicer_to_server(server.ExecutionServicer(populated_config), srv)
        connpy_pb2_grpc.add_ImportExportServiceServicer_to_server(server.ImportExportServicer(populated_config), srv)
        
        port = srv.add_insecure_port('127.0.0.1:0')
        srv.start()
        yield f"127.0.0.1:{port}"
        srv.stop(0)

    @pytest.fixture
    def channel(self, grpc_server):
        with grpc.insecure_channel(grpc_server) as channel:
            yield channel

    @pytest.fixture
    def node_stub(self, channel):
        return stubs.NodeStub(channel, "localhost")

    @pytest.fixture
    def profile_stub(self, channel):
        return stubs.ProfileStub(channel, "localhost")

    @pytest.fixture
    def config_stub(self, channel):
        return stubs.ConfigStub(channel, "localhost")

    def test_list_nodes_integration(self, node_stub):
        nodes = node_stub.list_nodes()
        assert "router1" in nodes
        assert "server1@office" in nodes

    def test_get_node_details_integration(self, node_stub):
        details = node_stub.get_node_details("router1")
        assert details["host"] == "10.0.0.1"

    def test_node_not_found_integration(self, node_stub):
        with pytest.raises(ConnpyError) as exc:
            node_stub.get_node_details("non-existent")
        assert "Node 'non-existent' not found." in str(exc.value)

    def test_list_profiles_integration(self, profile_stub):
        profiles = profile_stub.list_profiles()
        assert "office-user" in profiles

    def test_get_settings_integration(self, config_stub):
        settings = config_stub.get_settings()
        assert "idletime" in settings

    def test_update_setting_integration(self, config_stub):
        config_stub.update_setting("idletime", 99)
        settings = config_stub.get_settings()
        assert settings["idletime"] == 99

    def test_add_delete_node_integration(self, node_stub):
        node_stub.add_node("integration-test-node", {"host": "9.9.9.9"})
        assert "integration-test-node" in node_stub.list_nodes()
        node_stub.delete_node("integration-test-node")
        assert "integration-test-node" not in node_stub.list_nodes()

    def test_import_yaml_integration(self, channel, node_stub):
        import yaml
        from connpy.grpc_layer import stubs
        stub = stubs.ImportExportStub(channel, "localhost")
        
        # ImportExportService expects a flat dict of nodes, not a full config structure
        inventory = {
            "imported-node": {"host": "8.8.8.8", "protocol": "ssh", "type": "connection"}
        }
        yaml_content = yaml.dump(inventory)
        
        import tempfile
        with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
            f.write(yaml_content)
            temp_path = f.name
            
        try:
            stub.import_from_file(temp_path)
            # Verify the node was imported and is visible via NodeStub
            nodes = node_stub.list_nodes()
            assert "imported-node" in nodes
        finally:
            if os.path.exists(temp_path):
                os.remove(temp_path)

Methods

def channel(self, grpc_server)
Expand source code
@pytest.fixture
def channel(self, grpc_server):
    with grpc.insecure_channel(grpc_server) as channel:
        yield channel
def config_stub(self, channel)
Expand source code
@pytest.fixture
def config_stub(self, channel):
    return stubs.ConfigStub(channel, "localhost")
def grpc_server(self, populated_config)
Expand source code
@pytest.fixture
def grpc_server(self, populated_config):
    """Starts a local gRPC server for integration testing."""
    srv = grpc.server(futures.ThreadPoolExecutor(max_workers=5))
    
    # Register services
    connpy_pb2_grpc.add_NodeServiceServicer_to_server(server.NodeServicer(populated_config), srv)
    connpy_pb2_grpc.add_ProfileServiceServicer_to_server(server.ProfileServicer(populated_config), srv)
    connpy_pb2_grpc.add_ConfigServiceServicer_to_server(server.ConfigServicer(populated_config), srv)
    connpy_pb2_grpc.add_ExecutionServiceServicer_to_server(server.ExecutionServicer(populated_config), srv)
    connpy_pb2_grpc.add_ImportExportServiceServicer_to_server(server.ImportExportServicer(populated_config), srv)
    
    port = srv.add_insecure_port('127.0.0.1:0')
    srv.start()
    yield f"127.0.0.1:{port}"
    srv.stop(0)

Starts a local gRPC server for integration testing.

def node_stub(self, channel)
Expand source code
@pytest.fixture
def node_stub(self, channel):
    return stubs.NodeStub(channel, "localhost")
def profile_stub(self, channel)
Expand source code
@pytest.fixture
def profile_stub(self, channel):
    return stubs.ProfileStub(channel, "localhost")
def test_add_delete_node_integration(self, node_stub)
Expand source code
def test_add_delete_node_integration(self, node_stub):
    node_stub.add_node("integration-test-node", {"host": "9.9.9.9"})
    assert "integration-test-node" in node_stub.list_nodes()
    node_stub.delete_node("integration-test-node")
    assert "integration-test-node" not in node_stub.list_nodes()
def test_get_node_details_integration(self, node_stub)
Expand source code
def test_get_node_details_integration(self, node_stub):
    details = node_stub.get_node_details("router1")
    assert details["host"] == "10.0.0.1"
def test_get_settings_integration(self, config_stub)
Expand source code
def test_get_settings_integration(self, config_stub):
    settings = config_stub.get_settings()
    assert "idletime" in settings
def test_import_yaml_integration(self, channel, node_stub)
Expand source code
def test_import_yaml_integration(self, channel, node_stub):
    import yaml
    from connpy.grpc_layer import stubs
    stub = stubs.ImportExportStub(channel, "localhost")
    
    # ImportExportService expects a flat dict of nodes, not a full config structure
    inventory = {
        "imported-node": {"host": "8.8.8.8", "protocol": "ssh", "type": "connection"}
    }
    yaml_content = yaml.dump(inventory)
    
    import tempfile
    with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
        f.write(yaml_content)
        temp_path = f.name
        
    try:
        stub.import_from_file(temp_path)
        # Verify the node was imported and is visible via NodeStub
        nodes = node_stub.list_nodes()
        assert "imported-node" in nodes
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)
def test_list_nodes_integration(self, node_stub)
Expand source code
def test_list_nodes_integration(self, node_stub):
    nodes = node_stub.list_nodes()
    assert "router1" in nodes
    assert "server1@office" in nodes
def test_list_profiles_integration(self, profile_stub)
Expand source code
def test_list_profiles_integration(self, profile_stub):
    profiles = profile_stub.list_profiles()
    assert "office-user" in profiles
def test_node_not_found_integration(self, node_stub)
Expand source code
def test_node_not_found_integration(self, node_stub):
    with pytest.raises(ConnpyError) as exc:
        node_stub.get_node_details("non-existent")
    assert "Node 'non-existent' not found." in str(exc.value)
def test_update_setting_integration(self, config_stub)
Expand source code
def test_update_setting_integration(self, config_stub):
    config_stub.update_setting("idletime", 99)
    settings = config_stub.get_settings()
    assert settings["idletime"] == 99
class TestNodeServicerNaming
Expand source code
class TestNodeServicerNaming:
    @pytest.fixture
    def servicer(self, populated_config):
        return server.NodeServicer(populated_config)

    @patch("connpy.core.node")
    def test_interact_node_uses_passed_name(self, mock_node, servicer):
        # Setup request with custom name
        params = {"name": "custom-node-name@test", "host": "1.2.3.4", "protocol": "ssh"}
        request = connpy_pb2.InteractRequest(
            id="dynamic",
            connection_params_json=json.dumps(params)
        )
        
        # Mock node to allow _connect
        mock_node_instance = MagicMock()
        mock_node_instance._connect.return_value = True
        mock_node.return_value = mock_node_instance

        # We only need the first iteration of the generator to check naming
        gen = servicer.interact_node(iter([request]), MockContext())
        next(gen) # Skip the success response

        # Verify that node() was called with the custom name
        mock_node.assert_called()
        found = False
        for call in mock_node.call_args_list:
            if call.args[0] == "custom-node-name@test":
                found = True
                break
        assert found

    @patch("connpy.core.node")
    def test_interact_node_fallback_naming(self, mock_node, servicer):
        # Setup request without custom name but with host
        params = {"host": "my-instance", "protocol": "ssm"}
        request = connpy_pb2.InteractRequest(
            id="dynamic",
            connection_params_json=json.dumps(params)
        )
        
        mock_node_instance = MagicMock()
        mock_node_instance._connect.return_value = True
        mock_node.return_value = mock_node_instance

        gen = servicer.interact_node(iter([request]), MockContext())
        next(gen)

        # Verify fallback name: dynamic-{host}@remote
        found = False
        for call in mock_node.call_args_list:
            if call.args[0] == "dynamic-my-instance@remote":
                found = True
                break
        assert found

Methods

def servicer(self, populated_config)
Expand source code
@pytest.fixture
def servicer(self, populated_config):
    return server.NodeServicer(populated_config)
def test_interact_node_fallback_naming(self, mock_node, servicer)
Expand source code
@patch("connpy.core.node")
def test_interact_node_fallback_naming(self, mock_node, servicer):
    # Setup request without custom name but with host
    params = {"host": "my-instance", "protocol": "ssm"}
    request = connpy_pb2.InteractRequest(
        id="dynamic",
        connection_params_json=json.dumps(params)
    )
    
    mock_node_instance = MagicMock()
    mock_node_instance._connect.return_value = True
    mock_node.return_value = mock_node_instance

    gen = servicer.interact_node(iter([request]), MockContext())
    next(gen)

    # Verify fallback name: dynamic-{host}@remote
    found = False
    for call in mock_node.call_args_list:
        if call.args[0] == "dynamic-my-instance@remote":
            found = True
            break
    assert found
def test_interact_node_uses_passed_name(self, mock_node, servicer)
Expand source code
@patch("connpy.core.node")
def test_interact_node_uses_passed_name(self, mock_node, servicer):
    # Setup request with custom name
    params = {"name": "custom-node-name@test", "host": "1.2.3.4", "protocol": "ssh"}
    request = connpy_pb2.InteractRequest(
        id="dynamic",
        connection_params_json=json.dumps(params)
    )
    
    # Mock node to allow _connect
    mock_node_instance = MagicMock()
    mock_node_instance._connect.return_value = True
    mock_node.return_value = mock_node_instance

    # We only need the first iteration of the generator to check naming
    gen = servicer.interact_node(iter([request]), MockContext())
    next(gen) # Skip the success response

    # Verify that node() was called with the custom name
    mock_node.assert_called()
    found = False
    for call in mock_node.call_args_list:
        if call.args[0] == "custom-node-name@test":
            found = True
            break
    assert found
class TestStubsMessageFormatting
Expand source code
class TestStubsMessageFormatting:
    @patch("termios.tcsetattr")
    @patch("termios.tcgetattr")
    @patch("tty.setraw")
    @patch("os.read")
    @patch("select.select")
    def test_connect_dynamic_msg_formatting_ssm(self, mock_select, mock_read, mock_setraw, mock_getattr, mock_setattr):
        from connpy.grpc_layer.stubs import NodeStub
        
        mock_getattr.return_value = [0, 0, 0, 0, 0, 0, [0] * 32]
        mock_channel = MagicMock()
        stub = NodeStub(mock_channel, "localhost:8048")
        
        mock_resp = MagicMock()
        mock_resp.success = True
        stub.stub.interact_node.return_value = iter([mock_resp])
        
        with patch("connpy.printer.success") as mock_success:
            with patch("sys.stdin.fileno", return_value=0):
                mock_select.return_value = ([], [], [])
                params = {"protocol": "ssm", "host": "i-12345", "name": "my-ssm-node@aws"}
                
                with patch("select.select", side_effect=KeyboardInterrupt):
                    try:
                        stub.connect_dynamic(params)
                    except KeyboardInterrupt:
                        pass
                
                mock_success.assert_called()
                msg = mock_success.call_args[0][0]
                assert "Connected to my-ssm-node@aws" in msg
                assert "at i-12345" in msg
                assert ":22" not in msg
                assert "via: ssm" in msg

Methods

def test_connect_dynamic_msg_formatting_ssm(self, mock_select, mock_read, mock_setraw, mock_getattr, mock_setattr)
Expand source code
@patch("termios.tcsetattr")
@patch("termios.tcgetattr")
@patch("tty.setraw")
@patch("os.read")
@patch("select.select")
def test_connect_dynamic_msg_formatting_ssm(self, mock_select, mock_read, mock_setraw, mock_getattr, mock_setattr):
    from connpy.grpc_layer.stubs import NodeStub
    
    mock_getattr.return_value = [0, 0, 0, 0, 0, 0, [0] * 32]
    mock_channel = MagicMock()
    stub = NodeStub(mock_channel, "localhost:8048")
    
    mock_resp = MagicMock()
    mock_resp.success = True
    stub.stub.interact_node.return_value = iter([mock_resp])
    
    with patch("connpy.printer.success") as mock_success:
        with patch("sys.stdin.fileno", return_value=0):
            mock_select.return_value = ([], [], [])
            params = {"protocol": "ssm", "host": "i-12345", "name": "my-ssm-node@aws"}
            
            with patch("select.select", side_effect=KeyboardInterrupt):
                try:
                    stub.connect_dynamic(params)
                except KeyboardInterrupt:
                    pass
            
            mock_success.assert_called()
            msg = mock_success.call_args[0][0]
            assert "Connected to my-ssm-node@aws" in msg
            assert "at i-12345" in msg
            assert ":22" not in msg
            assert "via: ssm" in msg