Module connpy.tests.test_ai_copilot
Functions
def mock_acompletion()-
Expand source code
@pytest.fixture
def mock_acompletion():
    """Swap ``litellm.acompletion`` for a mock and yield that mock to the test.

    The patch is active only while the fixture is in use; leaving the ``with``
    block on teardown restores the original attribute.
    """
    # NOTE(review): the patch target is the ``litellm`` module attribute, so
    # this presumably relies on aask_copilot looking ``acompletion`` up through
    # ``litellm`` at call time -- verify against connpy.ai.
    with patch('litellm.acompletion') as patched_acompletion:
        yield patched_acompletion
Expand source code
def test_aask_copilot_fallback(mock_acompletion):
    """An untagged, plain-text streamed reply is returned as the guide.

    The mocked stream carries a single chunk of free text. The test expects
    that text back verbatim as ``guide``, ``risk_level == "low"`` and no error.
    """
    agent = ai(DummyConfig())

    # Bare-bones objects exposing ``chunk.choices[0].delta.content``.
    class _Delta:
        def __init__(self, content):
            self.content = content

    class _Choice:
        def __init__(self, content):
            self.delta = _Delta(content)

    class _Chunk:
        def __init__(self, content):
            self.choices = [_Choice(content)]

    # The replacement is a coroutine function: awaiting it hands back the
    # async iterator of chunks.
    async def fake_acompletion(*args, **kwargs):
        only_chunk = _Chunk("Here is some text response instead of tool call.")
        return MockAsyncIterator([only_chunk])

    mock_acompletion.side_effect = fake_acompletion

    async def ask():
        return await agent.aask_copilot("Router#", "What do I do?")

    outcome = asyncio.run(ask())

    # Surface the reported error on stdout before the assertion trips.
    if outcome["error"]:
        print(f"ERROR OCCURRED: {outcome['error']}")

    assert outcome["error"] is None
    assert outcome["guide"] == "Here is some text response instead of tool call."
    assert outcome["risk_level"] == "low"
Expand source code
def test_aask_copilot_tool_call(mock_acompletion):
    """A tagged streamed reply is split into guide, commands and risk level.

    The mocked stream delivers the ``<guide>``, ``<commands>`` and ``<risk>``
    sections as three separate chunks. The test expects the guide text, the
    two commands as a list, ``risk_level == "low"`` and no error.
    """
    agent = ai(DummyConfig())

    # Bare-bones objects exposing ``chunk.choices[0].delta.content``.
    class _Delta:
        def __init__(self, content):
            self.content = content

    class _Choice:
        def __init__(self, content):
            self.delta = _Delta(content)

    class _Chunk:
        def __init__(self, content):
            self.choices = [_Choice(content)]

    streamed_parts = (
        "<guide>Check the interfaces and running config.</guide>",
        "<commands>\nshow ip int br\nshow run\n</commands>",
        "<risk>low</risk>",
    )

    # acompletion is awaited and returns an async iterator
    async def fake_acompletion(*args, **kwargs):
        return MockAsyncIterator([_Chunk(part) for part in streamed_parts])

    mock_acompletion.side_effect = fake_acompletion

    async def ask():
        return await agent.aask_copilot("Router#", "What do I do?")

    outcome = asyncio.run(ask())

    # Surface the reported error on stdout before the assertion trips.
    if outcome["error"]:
        print(f"ERROR OCCURRED: {outcome['error']}")

    assert outcome["error"] is None
    assert outcome["guide"] == "Check the interfaces and running config."
    assert outcome["risk_level"] == "low"
    assert outcome["commands"] == ["show ip int br", "show run"]
Expand source code
def test_ingress_task_interception():
    """A NUL byte in the input stream hands the session log to the handler.

    NOTE(review): the ingress loop exercised below is defined inside this test
    rather than imported, so only this local copy of the logic is verified --
    confirm whether the real ingress task should be driven instead.
    """

    async def scenario():
        target = node("test_node", "1.2.3.4")
        target.mylog = MagicMock()
        target.mylog.getvalue.return_value = b"Some session log"
        target.unique = "test_node"
        target.host = "1.2.3.4"
        target.tags = {"os": "cisco_ios"}

        class ScriptedStream:
            """Serves a fixed list of byte chunks, then empty bytes."""

            def __init__(self):
                self.data = [b"a", b"b", b"\x00", b"c", b""]

            async def read(self):
                return self.data.pop(0) if self.data else b""

            def setup(self, resize_callback):
                pass

        stream = ScriptedStream()
        handler_calls = []

        async def on_copilot(buffer, node_info, s, child_fd):
            handler_calls.append(True)
            assert buffer == "Some session log"
            assert node_info["os"] == "cisco_ios"

        target.child = MagicMock()
        target.child.child_fd = 123
        target.child.after = b""
        target.child.buffer = b""

        async def ingress():
            while True:
                chunk = await stream.read()
                if not chunk:
                    # An empty read ends the loop.
                    return
                if not (on_copilot and b'\x00' in chunk):
                    continue
                # NUL byte seen: decode the log and describe the node.
                session_log = target.mylog.getvalue().decode()
                info = {
                    "name": getattr(target, 'unique', 'unknown'),
                    "host": getattr(target, 'host', 'unknown'),
                }
                tags = getattr(target, 'tags', None)
                if isinstance(tags, dict):
                    info["os"] = tags.get("os", "unknown")
                await on_copilot(session_log, info, stream, target.child.child_fd)

        await ingress()
        assert handler_calls

    asyncio.run(scenario())
Expand source code
def test_logclean_ansi():
    """``_logclean`` output must contain no ESC character.

    The input holds two ANSI control sequences between the prompt and the
    command text; the cleaned string is only checked for the absence of the
    escape byte.
    """
    device = node("test_node", "1.2.3.4")
    raw_output = "Router#\x1b[K\x1b[m show ip"
    cleaned = device._logclean(raw_output, var=True)
    assert "\x1b" not in cleaned
Classes
class DummyConfig-
Expand source code
class DummyConfig:
    """Minimal configuration stand-in passed to ``ai`` by the copilot tests.

    Every instance gets its own ``config`` dict with a single ``"ai"`` section
    (engineer API key and model) and a ``defaultdir`` of ``"/tmp"``.
    """

    def __init__(self):
        engineer_settings = {
            "engineer_api_key": "test_key",
            "engineer_model": "test_model",
        }
        self.config = {"ai": engineer_settings}
        self.defaultdir = "/tmp"
Expand source code
class MockAsyncIterator:
    """Async iterator that replays a fixed sequence of items in order.

    Args:
        items: Iterable of values to hand out, one per ``__anext__`` call.
            It is copied on construction, so the caller's container is left
            untouched and any iterable (list, tuple, generator) is accepted.

    Once every item has been returned, ``__anext__`` raises
    ``StopAsyncIteration``, which ends an ``async for`` loop.
    """

    def __init__(self, items):
        # Copy instead of aliasing: ``__anext__`` consumes ``self.items``, and
        # draining the caller's own list would be a surprising side effect.
        self.items = list(items)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self.items:
            raise StopAsyncIteration
        return self.items.pop(0)