Module connpy.mcp_client
Classes
class MCPClientManager (config=None)-
Expand source code
class MCPClientManager: """Manages MCP SSE client connections for connpy.""" _instance = None _lock = threading.Lock() def __new__(cls, *args, **kwargs): with cls._lock: if cls._instance is None: cls._instance = super(MCPClientManager, cls).__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self, config=None): if self._initialized: return self.config = config self.sessions: Dict[str, Dict[str, Any]] = {} # name -> {session, stack} self.tool_cache: Dict[str, List[Dict[str, Any]]] = {} self._connecting: Dict[str, asyncio.Future] = {} self._initialized = True async def get_tools_for_llm(self, os_filter: Optional[str] = None) -> List[Dict[str, Any]]: """ Fetches tools from enabled MCP servers that match the OS filter. """ if not MCP_AVAILABLE: return [] all_llm_tools = [] try: mcp_config = self.config.config.get("ai", {}).get("mcp_servers", {}) except Exception: return [] async def _fetch(name, cfg): if not cfg.get("enabled", True): return [] # Filter by OS if specified in config (primarily used for copilot strict matching) auto_os = cfg.get("auto_load_on_os") if os_filter is not None and auto_os and os_filter.lower() != auto_os.lower(): return [] try: session = await self._ensure_connected(name, cfg) if session: if name in self.tool_cache: return self.tool_cache[name] llm_tools = await self._fetch_tools_as_openai(name, session) self.tool_cache[name] = llm_tools return llm_tools except Exception: pass return [] tasks = [ _fetch(name, cfg) for name, cfg in mcp_config.items() ] if tasks: results = await asyncio.gather(*tasks) for tools in results: all_llm_tools.extend(tools) return all_llm_tools async def _ensure_connected(self, name: str, cfg: Dict[str, Any]) -> Optional[Any]: if not MCP_AVAILABLE: return None if name in self.sessions and self.sessions[name].get("session"): return self.sessions[name]["session"] url = cfg.get("url") if not url: return None if name in self._connecting: try: return await asyncio.wait_for(asyncio.shield(self._connecting[name]), timeout=10.0) except Exception: return None loop = asyncio.get_running_loop() fut = loop.create_future() self._connecting[name] = fut try: from contextlib import AsyncExitStack stack = AsyncExitStack() async def _do_connect(): read, write = await stack.enter_async_context(sse_client(url)) session = await stack.enter_async_context(ClientSession(read, write)) await session.initialize() return session session = await asyncio.wait_for(_do_connect(), timeout=15.0) self.sessions[name] = {"session": session, "stack": stack} fut.set_result(session) return session except Exception: fut.set_result(None) return None finally: if name in self._connecting: del self._connecting[name] async def _fetch_tools_as_openai(self, server_name: str, session: Any) -> List[Dict[str, Any]]: try: result = await asyncio.wait_for(session.list_tools(), timeout=5.0) openai_tools = [] for tool in result.tools: # Use mcp_ prefix to ensure valid function name for LiteLLM/Gemini prefixed_name = f"mcp_{server_name}__{tool.name}" openai_tools.append({ "type": "function", "function": { "name": prefixed_name, "description": f"[{server_name}] {tool.description}", "parameters": tool.inputSchema } }) return openai_tools except Exception: return [] async def call_tool(self, full_tool_name: str, arguments: Dict[str, Any]) -> Any: """Calls an MCP tool and returns text result.""" if not MCP_AVAILABLE: return "Error: MCP SDK is not installed." if "__" not in full_tool_name: return f"Error: Tool {full_tool_name} is not a valid MCP tool." clean_name = full_tool_name[4:] if full_tool_name.startswith("mcp_") else full_tool_name server_name, tool_name = clean_name.split("__", 1) if server_name not in self.sessions: return f"Error: MCP server {server_name} is not connected." session = self.sessions[server_name]["session"] try: result = await asyncio.wait_for(session.call_tool(tool_name, arguments), timeout=60.0) text_outputs = [content.text for content in result.content if hasattr(content, "text")] return "\n".join(text_outputs) if text_outputs else str(result) except Exception as e: return f"Error calling tool {tool_name} on {server_name}: {str(e)}" async def shutdown(self): """Close all SSE connections.""" for name, data in self.sessions.items(): stack = data.get("stack") if stack: await stack.aclose() self.sessions = {}Manages MCP SSE client connections for connpy.
Methods
async def call_tool(self, full_tool_name: str, arguments: Dict[str, Any]) ‑> Any-
Expand source code
async def call_tool(self, full_tool_name: str, arguments: Dict[str, Any]) -> Any: """Calls an MCP tool and returns text result.""" if not MCP_AVAILABLE: return "Error: MCP SDK is not installed." if "__" not in full_tool_name: return f"Error: Tool {full_tool_name} is not a valid MCP tool." clean_name = full_tool_name[4:] if full_tool_name.startswith("mcp_") else full_tool_name server_name, tool_name = clean_name.split("__", 1) if server_name not in self.sessions: return f"Error: MCP server {server_name} is not connected." session = self.sessions[server_name]["session"] try: result = await asyncio.wait_for(session.call_tool(tool_name, arguments), timeout=60.0) text_outputs = [content.text for content in result.content if hasattr(content, "text")] return "\n".join(text_outputs) if text_outputs else str(result) except Exception as e: return f"Error calling tool {tool_name} on {server_name}: {str(e)}"Calls an MCP tool and returns text result.
async def get_tools_for_llm(self, os_filter: str | None = None) ‑> List[Dict[str, Any]]-
Expand source code
async def get_tools_for_llm(self, os_filter: Optional[str] = None) -> List[Dict[str, Any]]: """ Fetches tools from enabled MCP servers that match the OS filter. """ if not MCP_AVAILABLE: return [] all_llm_tools = [] try: mcp_config = self.config.config.get("ai", {}).get("mcp_servers", {}) except Exception: return [] async def _fetch(name, cfg): if not cfg.get("enabled", True): return [] # Filter by OS if specified in config (primarily used for copilot strict matching) auto_os = cfg.get("auto_load_on_os") if os_filter is not None and auto_os and os_filter.lower() != auto_os.lower(): return [] try: session = await self._ensure_connected(name, cfg) if session: if name in self.tool_cache: return self.tool_cache[name] llm_tools = await self._fetch_tools_as_openai(name, session) self.tool_cache[name] = llm_tools return llm_tools except Exception: pass return [] tasks = [ _fetch(name, cfg) for name, cfg in mcp_config.items() ] if tasks: results = await asyncio.gather(*tasks) for tools in results: all_llm_tools.extend(tools) return all_llm_toolsFetches tools from enabled MCP servers that match the OS filter.
async def shutdown(self)-
Expand source code
async def shutdown(self): """Close all SSE connections.""" for name, data in self.sessions.items(): stack = data.get("stack") if stack: await stack.aclose() self.sessions = {}Close all SSE connections.