为什么AI Agent需要MCP协议?
先说个真实场景:我想让AI Agent帮我查快递、读文件、自动发邮件,于是写了三个独立脚本分别调用快递API、文件系统和SMTP服务。每个脚本都能工作,但问题来了——每个脚本的调用方式都不一样:快递API要OAuth认证,文件系统是本地路径,邮件要SMTP连接。如果我想换AI模型或者加新工具,几乎要重写一遍。
MCP(Model Context Protocol)就是为了解决这个问题而生的。它定义了一套标准化的工具调用协议——不管你用的是Claude、GPT还是本地模型,不管工具是API、本地程序还是云服务,都用同一套接口规范。
这个协议最近在AI圈火得很快,因为它第一次让"AI Agent调用各种工具"这件事变得真正可复用、可扩展。本文从实战角度,带你从零搭一个基于MCP的工具调用系统。
MCP协议核心概念
MCP的设计哲学就三句话:工具是资源、调用是请求、结果是结构化数据。具体拆解:
- Host(主机):AI应用本身,比如你的Agent或者助手
- Client(客户端):Host里的MCP客户端,负责和Server通信
- Server(服务端):每个外部工具(API、本地服务)对应一个MCP Server
- Transport(传输层):目前主流是stdio(标准输入输出)和HTTP+SSE两种
整个通信流程是:Host发送请求 → Client路由到对应Server → Server执行工具 → 返回JSON结果 → Client解析 → Host拿到结构化数据。
实战:搭建一个本地MCP Server
场景设定
我们搭一个文件管理MCP Server,让AI Agent可以:读取指定路径文件、搜索文件内容、获取文件元信息。之所以选文件管理,是因为这个场景足够通用,代码量适中,改一改就能接其他API。
项目结构
mcp_file_server/
├── src/
│ ├── index.ts # Server入口
│ ├── tools/
│ │ ├── read_file.ts # 读取文件
│ │ ├── search_file.ts # 搜索文件
│ │ └── file_info.ts # 文件信息
│ └── types.ts # 类型定义
├── package.json
└── tsconfig.json
核心代码实现
// src/types.ts - 类型定义
export interface ToolDefinition {
name: string;
description: string;
inputSchema: {
type: "object";
properties: Record<string, any>;
required?: string[];
};
}
export interface ToolCallRequest {
name: string;
arguments: Record<string, any>;
}
export interface ToolCallResult {
content: Array<{
type: "text" | "image";
text?: string;
data?: string;
mimeType?: string;
}>;
isError?: boolean;
}
// src/tools/read_file.ts
import * as fs from "fs/promises";
import * as path from "path";
import { ToolDefinition, ToolCallResult } from "../types";
export const read_file_tool: ToolDefinition = {
name: "read_file",
description: "读取指定路径的文本文件内容,支持UTF-8编码。返回文件全文或指定行范围。",
inputSchema: {
type: "object",
properties: {
path: {
type: "string",
description: "文件绝对路径"
},
startLine: {
type: "number",
description: "起始行号(可选,默认从第1行开始)"
},
endLine: {
type: "number",
description: "结束行号(可选,默认到文件末尾)"
}
},
required: ["path"]
}
};
export async function read_file(args: {
path: string;
startLine?: number;
endLine?: number;
}): Promise<ToolCallResult> {
try {
const content = await fs.readFile(args.path, "utf-8");
const lines = content.split("\n");
const start = (args.startLine || 1) - 1;
const end = args.endLine || lines.length;
const selected = lines.slice(start, end).join("\n");
return {
content: [{
type: "text",
text: "文件: " + args.path + "\n行数: " + lines.length + "\n---\n" + selected
}]
};
} catch (error: any) {
return {
content: [{ type: "text", text: "读取失败: " + error.message }],
isError: true
};
}
}
// src/tools/search_file.ts
import * as fs from "fs/promises";
import * as path from "path";
import { ToolDefinition, ToolCallResult } from "../types";
export const search_file_tool: ToolDefinition = {
name: "search_file",
description: "递归搜索指定目录下包含关键词的文件,返回匹配的行内容",
inputSchema: {
type: "object",
properties: {
directory: {
type: "string",
description: "搜索目录路径"
},
keyword: {
type: "string",
description: "搜索关键词"
},
extension: {
type: "string",
description: "文件扩展名过滤(如 .txt, .md)"
}
},
required: ["directory", "keyword"]
}
};
async function searchInFile(
filepath: string,
keyword: string
): Promise<string | null> {
try {
const content = await fs.readFile(filepath, "utf-8");
const lines = content.split("\n");
const matches = lines
.map((line, idx) => ({ line, num: idx + 1 }))
.filter(item => item.line.includes(keyword));
if (matches.length > 0) {
return matches
.slice(0, 10)
.map(m => " " + m.num + ": " + m.line.trim())
.join("\n");
}
return null;
} catch {
return null;
}
}
export async function search_file(args: {
directory: string;
keyword: string;
extension?: string;
}): Promise<ToolCallResult> {
const results: string[] = [];
async function walk(dir: string) {
const entries = await fs.readdir(dir, { withFileTypes: true });
for (const entry of entries) {
if (entry.name.startsWith(".")) continue;
const fullPath = path.join(dir, entry.name);
if (entry.isDirectory()) {
await walk(fullPath);
} else if (entry.isFile()) {
if (args.extension && !entry.name.endsWith(args.extension)) continue;
const match = await searchInFile(fullPath, args.keyword);
if (match) {
results.push(fullPath + ":\n" + match + "\n");
}
}
}
}
await walk(args.directory);
if (results.length === 0) {
return {
content: [{ type: "text", text: "未找到匹配结果" }],
isError: false
};
}
return {
content: [{
type: "text",
text: "找到 " + results.length + " 个匹配文件:\n---\n" + results.join("\n---\n")
}]
};
}
// src/index.ts - Server主入口
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema
} from "@modelcontextprotocol/sdk/types.js";
import { read_file_tool, read_file } from "./tools/read_file";
import { search_file_tool, search_file } from "./tools/search_file";
import { file_info_tool, file_info } from "./tools/file_info";
const server = new Server(
{ name: "file-manager-mcp", version: "1.0.0" },
{ capabilities: { tools: {} } }
);
// 注册所有工具
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [read_file_tool, search_file_tool, file_info_tool]
};
});
// 处理工具调用
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
switch (name) {
case "read_file":
return await read_file(args);
case "search_file":
return await search_file(args);
case "file_info":
return await file_info(args);
default:
return {
content: [{ type: "text", text: "未知工具: " + name }],
isError: true
};
}
});
// 启动stdio传输
const transport = new StdioServerTransport();
server.connect(transport);
console.error("文件管理MCP Server已启动");
让AI Agent调用MCP Server
使用Python SDK连接
# mcp_client.py
from mcp import ClientSession, StdioServerParameters
import asyncio
class MCPFileClient:
def __init__(self):
self.session = None
async def connect(self):
params = StdioServerParameters(
command="node",
args=["/path/to/mcp_file_server/dist/index.js"]
)
async with ClientSession(params) as session:
await session.initialize()
self.session = session
return session
async def read_file(self, path: str, startLine: int = None, endLine: int = None):
result = await self.session.call_tool("read_file", {
"path": path,
**( {"startLine": startLine} if startLine else {}),
**( {"endLine": endLine} if endLine else {})
})
return result.content[0].text
async def search(self, directory: str, keyword: str, extension: str = None):
result = await self.session.call_tool("search_file", {
"directory": directory,
"keyword": keyword,
**( {"extension": extension} if extension else {})
})
return result.content[0].text
# 使用示例
async def main():
client = MCPFileClient()
await client.connect()
# 让AI决定调用什么工具
ai_decision = "帮我读取 config.json 的前20行"
if "读取" in ai_decision and "config" in ai_decision:
content = await client.read_file("/project/config.json", 1, 20)
print(content)
asyncio.run(main())
集成到OpenClaw Agent
如果你用OpenClaw,可以直接在Agent配置里挂载MCP Server:
{
"mcpServers": {
"fileManager": {
"command": "node",
"args": ["/path/to/mcp_file_server/dist/index.js"],
"enabled": true
},
"webSearch": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
}
}
}
配置好后,Agent就能直接用自然语言调用这些工具,比如"帮我搜索项目里所有包含API密钥的文件"。
MCP生态现状和选型建议
目前(2026年初)MCP生态还处于快速发展期,主流工具大概分这几类:
| 类型 | 代表项目 | 成熟度 | 适用场景 |
|---|---|---|---|
| 官方SDK | @modelcontextprotocol/sdk | 生产级 | TypeScript/JavaScript开发MCP Server |
| 文件系统 | server-filesystem | 生产级 | 让AI读写本地文件 |
| 浏览器控制 | server-playwright | Beta | AI自动化操作网页 |
| GitHub集成 | server-github | 生产级 | 代码管理和PR操作 |
| 数据库 | server-postgres, server-sqlite | Beta | AI查询和分析数据 |
| Slack/Discord | server-slack, server-discord | Beta | AI操作群聊和消息 |
踩坑总结和最佳实践
我在这套方案上踩过几个坑,记下来帮你避雷:
- stdio模式慎用于长时间运行的服务:stdio模式下,如果Server卡住(比如死循环),Client会一直等待。建议用HTTP+SSE模式,支持超时控制。
- 工具返回的文本长度要控制:大文件全文返回会让AI上下文爆炸。一定要加行数限制或文件大小判断。
- 安全隔离别忘了:MCP Server以Agent权限运行,如果接文件系统的Server,不要让AI能读敏感目录(如.ssh、.npmrc)。
- 错误处理要友好:工具执行失败时,返回的isError字段和友好的错误信息比抛异常更重要。
进阶:从工具调用到工作流编排
单个工具调用是MCP的起点,真正的价值在于组合多个工具形成工作流。比如:
# 工作流示例:自动化代码审查
async function code_review_flow(repo_path: string):
# 1. 搜索所有修改的文件
changed = await search(repo_path, "TODO|FIXME|BUG")
# 2. 读取关键文件
files = parse_changed_files(changed)
reviews = [await read_file(f) for f in files]
# 3. 调用LLM分析
analysis = await llm.analyze(reviews)
# 4. 生成报告
await write_report(analysis)
这种工具编排能力才是MCP的真正魅力——它让AI Agent从"能做什么"进化到"能规划做什么"。
相关文章推荐:AI Agent定时任务自动执行实战 | OpenClaw Agent实战部署 | 大模型RAG知识库搭建教程
版权声明
本文仅代表个人观点。
本文系AI辅助作者原创,未经许可,转载请保留原文链接。

发表评论