构建基于 SSE 协议通信的 MCP Server 和 Client
在之前的系列教程中,我们编写的 MCP 服务器与 MCP 客户端是通过 **stdio(Standard Input/Output,标准输入输出)**来进行交互的。客户端通过启动服务器子进程,并利用标准输入(stdin)和标准输出(stdout)建立双向通信。这种模式导致 MCP 客户端与服务器之间存在强耦合,且每个服务器进程只能与启动它的客户端通信(1:1 的关系)。
为了解耦 MCP 客户端与服务器,本文将演示如何使用 **SSE(Server-Sent Events,服务器发送事件)**协议进行通信,使服务器能够作为一个独立运行的进程,支持多个客户端的灵活连接、使用和断开。换句话说,基于 SSE 的服务器和客户端可以是完全解耦的进程,甚至运行在不同的节点上,从而提供更高的灵活性和扩展性。
本文使用到的所有代码可以在 Github 上找到:https://github.com/cr7258/hands-on-lab/tree/main/ai/claude/mcp/sse/elasticsearch-mcp-sse
MCP 系列文章:
- 一文带你入门 MCP(模型上下文协议)
- MCP Server 开发实战:无缝对接 LLM 和 Elasticsearch
- 快速上手:实现你的第一个 MCP Client
- 构建基于 SSE 协议通信的 MCP Server 和 Client(本文)
什么是 SSE?
Server-Sent Events(SSE,服务器发送事件)是一种基于 HTTP 协议的技术,允许服务器向客户端单向、实时地推送数据。在 SSE 模式下,客户端通过创建一个 EventSource
对象与服务器建立持久连接,服务器则通过该连接持续发送数据流,而无需客户端反复发送请求。
SSE 的主要特点包括:
- 单向通信:服务器主动向客户端推送数据,客户端无法通过同一连接向服务器发送数据。
- 基于 HTTP 协议:利用现有的 HTTP 协议,无需额外的协议支持,易于实现和部署。
- 轻量级:实现简单,适用于需要实时更新的应用场景,如新闻推送、股票行情等。
- 自动重连:客户端在连接断开时会自动尝试重新连接,确保数据传输的连续性。
需要注意的是,SSE 仅支持服务器到客户端的单向通信,若应用场景需要双向实时通信,可能需要考虑使用 WebSocket 等其他技术。
环境准备
在本教程中,我们将使用 MCP Python SDK 来编写项目,使用 uv 来管理 Python 项目依赖。
uv add "mcp[cli]" elasticsearch python-dotenv uvicorn openai
实验用到的 Elasticsearch 集群可以通过 Github 中的 docker-compose.yaml
文件启动:
docker-compose up -d
浏览器输入 http://localhost:5601 访问 Kibana 界面,用户名 elastic
,密码 test123
。
在 .env
文件中设置好 Elasticsearch 的连接信息。
ELASTIC_HOST=https://localhost:9200
ELASTIC_USERNAME=elastic
ELASTIC_PASSWORD=test123
准备测试数据
在 Kibana 中打开 Management -> Dev Tools
页面, 执行以下代码创建两个索引 student
和 teacher
,分别插入几条数据:
POST /student/_doc
{
"name": "Alice",
"age": 20,
"major": "Computer Science"
}
POST /student/_doc
{
"name": "Bob",
"age": 22,
"major": "Mathematics"
}
POST /student/_doc
{
"name": "Carol",
"age": 21,
"major": "Physics"
}
POST /teacher/_doc
{
"name": "Tom",
"subject": "English",
"yearsOfExperience": 10
}
POST /teacher/_doc
{
"name": "John",
"subject": "History",
"yearsOfExperience": 7
}
POST /teacher/_doc
{
"name": "Lily",
"subject": "Mathematics",
"yearsOfExperience": 5
}
实现基于 SSE 协议的 MCP Server
MCP 服务器部分的代码与MCP Server 开发实战:无缝对接 LLM 和 Elasticsearch一文中介绍的内容相同,因此在此不再赘述,读者可参阅该文章以获取详细信息。
这里重点说明一下如何让 MCP 服务器运行在 SSE 协议之上。在本教程中,我们使用 Starlette 框架来实现 SSE 服务器,代码主要分为以下几个部分:
SSE 传输对象的初始化
在函数开始处,通过创建SseServerTransport
对象,并指定基础路径/messages/
,用于后续管理 SSE 连接和消息传递。定义 SSE 连接处理函数
- handle_sse 函数:这是一个异步请求处理函数,当客户端请求建立 SSE 连接时会被调用。
- 连接过程:
利用sse.connect_sse
方法,传入当前请求的scope
、receive
方法和_send
方法,建立一个异步上下文管理器。
管理器返回两个数据流:read_stream
用于读取客户端发送的数据,write_stream
用于向客户端发送数据。 - 启动 MCP 服务器:在成功建立连接后,调用
mcp_server.run
方法,并传入读取、写入流以及由mcp_server.create_initialization_options()
生成的初始化参数。这一过程实现了 MCP 服务器与客户端之间的实时数据交互。
Starlette 应用及路由配置
- Starlette 实例化:函数返回一个新的 Starlette 应用实例,其调试模式根据传入的
debug
参数设置。 - 路由设置:
使用Route("/sse", endpoint=handle_sse)
定义/sse
路径,当客户端访问此路径时将触发handle_sse
函数处理 SSE 连接。
使用Mount("/messages/", app=sse.handle_post_message)
将/messages/
路径挂载到sse.handle_post_message
应用上,用于处理通过 POST 请求发送的消息,实现与 SSE 长连接的消息传递功能。
- Starlette 实例化:函数返回一个新的 Starlette 应用实例,其调试模式根据传入的
def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
"""Create a Starlette application that can server the provied mcp server with SSE."""
sse = SseServerTransport("/messages/")
async def handle_sse(request: Request) -> None:
async with sse.connect_sse(
request.scope,
request.receive,
request._send,
) as (read_stream, write_stream):
await mcp_server.run(
read_stream,
write_stream,
mcp_server.create_initialization_options(),
)
return Starlette(
debug=debug,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
],
)
接下来就是创建 MCP 服务器实例,然后通过上面定义的 create_starlette_app
方法创建 Starlette 应用,最后使用 uvicorn 启动 ASGI 服务器,实现实时的 SSE 数据传输。
if __name__ == "__main__":
mcp_server = mcp._mcp_server
parser = argparse.ArgumentParser(description='Run MCP SSE-based server')
parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
parser.add_argument('--port', type=int, default=18080, help='Port to listen on')
args = parser.parse_args()
# Bind SSE request handling to MCP server
starlette_app = create_starlette_app(mcp_server, debug=True)
uvicorn.run(starlette_app, host=args.host, port=args.port)
启动 MCP 服务器
执行以下命令可以启动 MCP 服务器,默认监听在 18080 端口。
uv run server.py
# 输出
INFO: Started server process [82035]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:18080 (Press CTRL+C to quit)
实现基于 SSE 协议的 MCP Client
MCP 客户端部分的代码可以参考快速上手:实现你的第一个 MCP Client一文,这里不再重复说明。
唯一的区别就是在连接 MCP 服务器的时候通过 SSE 协议进行交互。以下是具体的代码:
async def connect_to_sse_server(self, server_url: str):
"""Connect to an MCP server running with SSE transport"""
# 创建 SSE 客户端连接上下文管理器
self._streams_context = sse_client(url=server_url)
# 异步初始化 SSE 连接,获取数据流对象
streams = await self._streams_context.__aenter__()
# 使用数据流创建 MCP 客户端会话上下文
self._session_context = ClientSession(*streams)
# 初始化客户端会话对象
self.session: ClientSession = await self._session_context.__aenter__()
# 执行 MCP 协议初始化握手
await self.session.initialize()
启动 MCP 客户端
执行以下命令启动 MCP 客户端并连接 MCP 服务器。
uv run client.py http://localhost:18080/sse
启动 MCP 客户端后,我们输入问题 Elasticsearch 集群中有哪些索引?
,可以看到 MCP 客户端成功调用了 MCP 服务器提供的 list_indices
从 Elasticsearch 集群中获取到了索引信息。
Initialized SSE client...
Listing tools...
Connected to server with tools: ['list_indices', 'get_index']
MCP Client Started!
Type your queries or 'quit' to exit.
Query: 集群中有哪些索引?
[Calling tool list_indices with args {}]
在集群中,有以下索引:
1. 索引名称: student
- 健康状况: green
- 状态: open
- UUID: gPUyqTHZQ12rSTEYX-Ho3w
- 主分片数: 1
- 副本分片数: 1
- 文档数量: 3
- 已删除的文档数量: 0
- 存储大小: 12kb
- 主分片的存储大小: 6kb
2. 索引名称: teacher
- 健康状况: green
- 状态: open
- UUID: ygkfkvjJSMaB946myzanNg
- 主分片数: 1
- 副本分片数: 1
- 文档数量: 3
- 已删除的文档数量: 0
- 存储大小: 22.6kb
- 主分片的存储大小: 6kb
总结
本文介绍了如何利用 Server-Sent Events(SSE)协议,实现 MCP 服务器与客户端之间的解耦通信。通过采用 SSE,服务器能够作为独立进程运行,支持多个客户端的灵活连接与断开,从而提升系统的灵活性和可扩展性。