MCP开发-04.重服务与长任务开发

本系列:00 什么是 MCP · 01 协议与心智模型 · 02 Python 框架选型 · 03 部署与调用 · 04 重任务与优化(本文) · 05 从 0 开发 · 06 进阶示例 · 07 阿里云案例


1. 问题从哪来

MCP Tool 若直接绑定 大计算(训练、批处理 ETL、大文件转换、外部 CI 流水线),同步 tools/call 会带来:

现象 原因
Host / 网关 超时 HTTP 或 LLM 侧等待上限(常见 30s~几分钟)
阻塞对话 模型与 UI 卡在单次 Tool 调用
stdio 资源争抢 单进程串行,Host 与子进程生命周期绑定
Context 膨胀 巨型 JSON 塞进 Tool 返回,挤占模型上下文

重服务不是「把现有脚本 @mcp.tool() 包一层」就够,需要在 协议层架构层 同时设计。

段末注释ETL(Extract, Transform, Load)指抽取、转换、加载数据的批处理流程。


2. 规范层解法(2025-11-25)

2.1 Tasks:call-now, fetch-later

Tasks(SEP-1686)让 Server 立即返回 taskIdworking 状态,Client 异步 tasks/get / tasks/result,必要时 tasks/cancel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
sequenceDiagram
participant C as Client
participant S as MCP_Server
participant W as Worker

C->>S: tools/call(带 task 能力)
S->>W: 提交后台作业
S-->>C: Task taskId status=working
loop 轮询
C->>S: tasks/get
S-->>C: working + progress
end
W-->>S: 完成
S-->>C: tasks/get status=completed + result

任务状态机(终态不可回退):

状态 含义
working 执行中
input_required 需 Client / 用户补参(可与 elicitation 配合)
completed 成功
failed 失败
cancelled 已请求取消

2.2 Progress

长任务应配合 Context.report_progress(或等价 API)与 Client 提供的 progressToken,复用 MCP 既有 progress 通知,而非自建轮询字段。

2.3 Cancellation

取消是 协作式的:Server 收到 tasks/cancelnotifications/cancelled 后应尽力停止(杀子进程、撤销队列 job),但不保证瞬时中断。

2.4 Structured output 与 Resource 卸载

大结果不应全文塞进 Tool 文本返回:

  • 返回 摘要 + 句柄(job ID、S3 URI、Resource URI)。
  • 完整结果挂 Resource@mcp.resource)或对象存储,Client / 模型按需 resources/read

3. 实现层模式

模式 做法 何时用
Task 装饰器 @mcp.tool(task=True)(FastMCP ≥2.14 或官方 SDK 跟进版) Host 支持 Tasks
外部队列 Celery / RQ / Redis Queue;Tool 只 submitpoll 小时级、需持久化、多 Worker
拆分 Tool submit_job + get_job_status + fetch_result Host 尚未支持 Tasks 的兼容方案
Resource 卸载 大 payload 写文件 / 存储,Tool 返回 URI 减少 token、支持重复读取
采样分离 重推理放专用 GPU 服务,MCP 只做调度 模型推理与 Agent 解耦

3.1 FastMCP Tasks 示例(需版本与 Host 支持)

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio
from mcp.server.fastmcp import Context, FastMCP

mcp = FastMCP("heavy_demo")

@mcp.tool(task=True)
async def process_dataset(path: str, ctx: Context) -> str:
"""批处理数据集(后台 Task)。"""
steps = 10
for i in range(steps):
await asyncio.sleep(0.5) # 模拟耗时
await ctx.report_progress(i + 1, steps, f"processing chunk {i + 1}/{steps}")
return f"done: {path}"

若运行环境为独立 fastmcp 包,import 可能为 from fastmcp import FastMCP;以 02 框架选型 为准。

3.2 兼容模式:三 Tool 拆分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
_JOBS: dict[str, dict] = {}

@mcp.tool()
def submit_job(input_uri: str) -> str:
"""提交批处理,返回 job_id。"""
job_id = "job-" + str(len(_JOBS) + 1)
_JOBS[job_id] = {"status": "working", "input": input_uri}
# 生产:此处 push 到 Celery 等
return job_id

@mcp.tool()
def get_job_status(job_id: str) -> str:
"""查询 job 状态。"""
return _JOBS.get(job_id, {}).get("status", "unknown")

@mcp.tool()
def fetch_job_result(job_id: str) -> str:
"""获取完成结果摘要。"""
job = _JOBS.get(job_id, {})
if job.get("status") != "completed":
return "not ready"
return job.get("result", "")

Tool 描述中应写明:「先 submit,再轮询 status,最后 fetch」。


4. 架构建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
flowchart LR
Host["MCP Host"]
MCP["MCP Server\n轻量 调度"]
Q["任务队列\nRedis / SQS"]
W1["Worker 1"]
W2["Worker 2"]
Store["对象存储 / DB"]

Host --> MCP
MCP --> Q
Q --> W1
Q --> W2
W1 --> Store
W2 --> Store
MCP --> Store
原则 说明
MCP 薄、Worker 厚 MCP 进程只做鉴权、参数校验、入队、查状态
HTTP 无状态入口 便于多副本;Session 状态放 Redis(2026 后趋向显式 handle)
避免 stdio 跑重任务 开发可 stdio,生产重任务走 HTTP + 队列
幂等 Task taskId / job_id 支持安全重试
TTL 任务结果保留时长(keepAlive / 业务策略),过期后仅留 metadata

部署选型见 03 部署方式与调用配置


5. 开发阶段优化清单

5.1 工具元数据

  • description 标明预期耗时量级(秒 / 分钟 / 小时)。
  • 使用 MCP Annotations(如 readOnlyHintdestructiveHint)帮助 Host 与用户确认风险。
  • 参数 schema 限制输入规模(如最大行数、文件大小)。

5.2 超时与取消

  • 监听 cancellation notification;子进程用进程组以便 kill
  • 外部队列配置 visibility timeoutdead letter

5.3 可观测性

  • 结构化日志:task_id / job_id 贯穿 trace。
  • 指标:队列深度、P99 任务时长、失败率。

5.4 返回设计

  • 默认返回 人类可读摘要 + 机器可读 JSON(structured content)。
  • 大于阈值的结果 只返回 Resource URI 或 signed URL。

5.5 Host 能力探测

Tasks 为较新能力,并非所有 Host 已实现。Server 可在 initialize capabilities 中声明 tasks,Client 不支持时降级为三 Tool 模式或同步短超时版本(并在描述中标注限制)。


6. 反模式(避免)

反模式 后果
同步 Tool 内 sleep(3600) 超时、无法取消、阻塞 Host
把 10MB JSON 塞进 Tool 返回 Context 爆炸、费用上升
stdio 单进程跑 GPU 训练 无法扩展、Host 重启即丢任务
无 job_id 的「.fire-and-forget」 Client 无法查询、难以排错

7. 小结

层级 手段
协议 Tasks、Progress、Cancellation
实现 task=True / 队列 / 三 Tool 兼容
数据 Resource 卸载、structured output
部署 HTTP 无状态 + Worker 池

完整代码 walkthrough:06 进阶示例-Resources与Tasks · 生产案例(阿里云 Argo + OSS/NAS):07 阿里云案例 · 协议细节:01 协议与心智模型

-------------本文结束感谢您的阅读-------------