# open-webui/backend/open_webui/utils/mcp/client.py
#
# Repository-viewer listing for this file: 117 lines, 3.8 KiB, Python
# (first blame timestamp: 2025-09-23 14:03:26 +08:00).

import asyncio
from contextlib import AsyncExitStack
from typing import Any, Optional

from mcp import ClientSession
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.client.streamable_http import streamablehttp_client
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken


class MCPClient:
    """Async client for one MCP server reached over streamable HTTP.

    Typical use is ``connect()``, then tool/resource calls, then
    ``disconnect()``. The instance is also an async context manager: leaving
    the ``async with`` block closes ``exit_stack`` and then disconnects.
    """

    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # Context managers that connect() has successfully *entered*.
        # Defined here so disconnect() is safe on a never-connected client.
        self._streams_context = None
        self._session_context = None

    async def connect(
        self, url: str, headers: Optional[dict] = None, auth: Optional[Any] = None
    ):
        """Open the transport to ``url``, start a session and initialize it.

        Args:
            url: Endpoint handed to ``streamablehttp_client``.
            headers: Optional headers forwarded to ``streamablehttp_client``.
            auth: Optional auth object forwarded to ``streamablehttp_client``
                (presumably an ``httpx.Auth`` such as ``OAuthClientProvider``
                -- TODO confirm).

        Raises:
            Whatever the transport, ``ClientSession`` or ``initialize()``
            raises. Anything opened so far is closed before re-raising.
        """
        # Reconnecting must not orphan contexts that are still open.
        if self._session_context is not None or self._streams_context is not None:
            await self.disconnect()

        # The contexts are entered by hand (not ``async with``) because they
        # have to stay open after this method returns; disconnect() exits them.
        streams_context = streamablehttp_client(url, headers=headers, auth=auth)
        read_stream, write_stream, _ = (
            await streams_context.__aenter__()  # pylint: disable=C2801
        )
        self._streams_context = streams_context
        try:
            session_context = ClientSession(read_stream, write_stream)
            session = await session_context.__aenter__()  # pylint: disable=C2801
            self._session_context = session_context
            self.session = session
            await session.initialize()
        except BaseException:
            # BaseException so that cancellation is covered as well: a
            # half-open connection must not outlive a failed connect().
            # If cleanup itself fails, that error propagates with the
            # original one attached as its ``__context__``.
            await self.disconnect()
            raise

    async def list_tool_specs(self) -> list[dict]:
        """Return the server's tools as ``name``/``description``/``parameters`` dicts.

        ``parameters`` carries each tool's ``inputSchema`` unchanged.

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self.session:
            raise RuntimeError("MCP client is not connected.")

        result = await self.session.list_tools()
        # TODO: handle outputSchema if needed
        return [
            {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema,
            }
            for tool in result.tools
        ]

    async def call_tool(
        self, function_name: str, function_args: dict
    ) -> Optional[dict]:
        """Call the tool ``function_name`` with ``function_args``.

        Returns:
            The ``content`` entry of the dumped result.
            NOTE(review): MCP tool results appear to carry a *list* of content
            blocks, so the ``Optional[dict]`` annotation and the ``{}``
            fallback are probably inaccurate -- confirm before tightening.

        Raises:
            RuntimeError: If the client is not connected.
            Exception: If no result is returned, or if the result is flagged
                ``isError`` (the exception argument is the result content).
        """
        if not self.session:
            raise RuntimeError("MCP client is not connected.")

        result = await self.session.call_tool(function_name, function_args)
        if not result:
            raise Exception("No result returned from MCP tool call.")

        result_content = result.model_dump().get("content", {})
        if result.isError:
            raise Exception(result_content)
        return result_content

    async def list_resources(self, cursor: Optional[str] = None) -> list[dict]:
        """Return the ``resources`` entry of the server's resource listing.

        Args:
            cursor: Optional cursor forwarded to ``session.list_resources``.

        Raises:
            RuntimeError: If the client is not connected.
            Exception: If the call returns no result.
        """
        if not self.session:
            raise RuntimeError("MCP client is not connected.")

        result = await self.session.list_resources(cursor=cursor)
        if not result:
            raise Exception("No result returned from MCP list_resources call.")
        return result.model_dump().get("resources", [])

    async def read_resource(self, uri: str) -> Optional[dict]:
        """Read the resource at ``uri`` and return the dumped result.

        Raises:
            RuntimeError: If the client is not connected.
            Exception: If the call returns no result.
        """
        if not self.session:
            raise RuntimeError("MCP client is not connected.")

        result = await self.session.read_resource(uri)
        if not result:
            raise Exception("No result returned from MCP read_resource call.")
        return result.model_dump()

    async def disconnect(self):
        """Close the session and then the transport, if they are open.

        Safe to call on a client that never connected or was already
        disconnected. State is cleared before anything is closed, so a
        failure while closing cannot leave stale handles behind, and the
        transport is closed even when closing the session raises.
        """
        session_context, self._session_context = self._session_context, None
        streams_context, self._streams_context = self._streams_context, None
        self.session = None

        try:
            if session_context is not None:
                await session_context.__aexit__(  # pylint: disable=C2801
                    None, None, None
                )
        finally:
            if streams_context is not None:
                await streams_context.__aexit__(  # pylint: disable=C2801
                    None, None, None
                )

    async def __aenter__(self):
        """Enter ``exit_stack`` and return this client."""
        await self.exit_stack.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Close ``exit_stack``, then disconnect (even if the stack raises)."""
        try:
            await self.exit_stack.__aexit__(exc_type, exc_value, traceback)
        finally:
            await self.disconnect()