智能体开发中MCP的技术实现
大模型MCP学习,大模型技术认证
引言
随着人工智能技术的快速发展,智能体(Agent)系统已成为AI应用的重要形态。在智能体开发过程中,如何让模型有效地获取和利用上下文信息成为关键挑战。模型上下文协议(Model Context Protocol,MCP)作为一种新兴的技术标准,为智能体与外部资源的交互提供了强大的解决方案。
MCP概述
什么是MCP
模型上下文协议(MCP)是一种开放标准,旨在规范AI模型与外部数据源、工具和服务之间的连接方式。MCP通过标准化的接口,使得大语言模型能够安全、高效地访问各种外部资源,从而增强智能体的功能和实用性。
MCP的核心特性
- 标准化接口:提供统一的API规范,简化集成过程
- 安全性:内置权限管理和数据保护机制
- 可扩展性:支持自定义资源和工具的接入
- 实时性:支持动态数据获取和实时交互
MCP在智能体开发中的应用场景
1. 数据库连接和查询
智能体可以通过MCP连接到各种数据库系统,实现数据的查询、分析和管理。
应用示例:
- 客服智能体连接CRM系统,查询客户信息
- 分析智能体连接数据仓库,生成业务报表
- 管理智能体连接库存系统,监控商品状态
2. 文件系统访问
通过MCP,智能体可以安全地访问文件系统,读取、写入和管理各种类型的文件。
应用示例:
- 文档处理智能体读取和编辑办公文档
- 代码审查智能体访问代码仓库
- 内容管理智能体处理多媒体文件
3. 外部API集成
MCP使智能体能够轻松集成第三方API服务,扩展功能范围。
应用示例:
- 天气智能体调用气象API获取实时天气信息
- 翻译智能体集成多种翻译服务
- 支付智能体连接支付网关处理交易
4. 工具和服务调用
智能体可以通过MCP调用各种外部工具和服务,实现复杂的任务自动化。
应用示例:
- 开发智能体调用代码编译器和测试工具
- 设计智能体使用图像生成和编辑工具
- 运维智能体连接监控和部署系统
更加系统的学习MCP技术
MCP的技术架构
协议层次结构
应用层 (智能体)
↓
MCP协议层
↓
传输层 (HTTP/WebSocket)
↓
资源层 (数据库/文件/API)
核心组件
- 资源发现:自动识别和注册可用资源
- 权限管理:细粒度的访问控制机制
- 会话管理:维护智能体与资源的连接状态
- 错误处理:统一的异常处理和恢复机制
MCP消息格式
MCP协议基于JSON-RPC 2.0标准,消息格式如下:
{
"jsonrpc": "2.0",
"method": "resources/list",
"params": {
"cursor": "optional_cursor"
},
"id": 1
}
响应格式:
{
"jsonrpc": "2.0",
"result": {
"resources": [
{
"uri": "file:///path/to/file",
"name": "Example File",
"description": "A sample file resource",
"mimeType": "text/plain"
}
],
"nextCursor": "next_page_cursor"
},
"id": 1
}
MCP客户端开发
客户端架构设计
MCP客户端是智能体与MCP服务器之间的桥梁,负责:
- 建立和维护与服务器的连接
- 发送请求和处理响应
- 管理会话状态
- 处理错误和重连
Python客户端实现
import asyncio
import json
import websockets
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
@dataclass
class Resource:
uri: str
name: str
description: str
mime_type: str
class MCPClient:
def __init__(self, server_url: str):
self.server_url = server_url
self.websocket = None
self.request_id = 0
self.pending_requests: Dict[int, asyncio.Future] = {}
async def connect(self):
"""建立WebSocket连接"""
self.websocket = await websockets.connect(self.server_url)
# 启动消息处理任务
asyncio.create_task(self._handle_messages())
async def disconnect(self):
"""断开连接"""
if self.websocket:
await self.websocket.close()
async def _handle_messages(self):
"""处理接收到的消息"""
async for message in self.websocket:
try:
data = json.loads(message)
if 'id' in data and data['id'] in self.pending_requests:
future = self.pending_requests.pop(data['id'])
if 'error' in data:
future.set_exception(Exception(data['error']))
else:
future.set_result(data.get('result'))
except json.JSONDecodeError:
print(f"Invalid JSON received: {message}")
async def _send_request(self, method: str, params: Dict[str, Any] = None) -> Any:
"""发送请求并等待响应"""
self.request_id += 1
request = {
"jsonrpc": "2.0",
"method": method,
"id": self.request_id
}
if params:
request["params"] = params
future = asyncio.Future()
self.pending_requests[self.request_id] = future
await self.websocket.send(json.dumps(request))
return await future
async def list_resources(self, cursor: Optional[str] = None) -> List[Resource]:
"""获取资源列表"""
params = {}
if cursor:
params["cursor"] = cursor
result = await self._send_request("resources/list", params)
resources = []
for resource_data in result.get("resources", []):
resources.append(Resource(
uri=resource_data["uri"],
name=resource_data["name"],
description=resource_data["description"],
mime_type=resource_data["mimeType"]
))
return resources
async def read_resource(self, uri: str) -> Dict[str, Any]:
"""读取资源内容"""
params = {"uri": uri}
return await self._send_request("resources/read", params)
async def call_tool(self, name: str, arguments: Dict[str, Any] = None) -> Any:
"""调用工具"""
params = {"name": name}
if arguments:
params["arguments"] = arguments
return await self._send_request("tools/call", params)
# 使用示例
async def main():
client = MCPClient("ws://localhost:8080")
await client.connect()
try:
# 获取资源列表
resources = await client.list_resources()
print(f"Found {len(resources)} resources")
# 读取第一个资源
if resources:
content = await client.read_resource(resources[0].uri)
print(f"Resource content: {content}")
# 调用工具
result = await client.call_tool("calculator", {"operation": "add", "a": 1, "b": 2})
print(f"Tool result: {result}")
finally:
await client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
JavaScript客户端实现
class MCPClient {
constructor(serverUrl) {
this.serverUrl = serverUrl;
this.websocket = null;
this.requestId = 0;
this.pendingRequests = new Map();
}
async connect() {
this.websocket = new WebSocket(this.serverUrl);
this.websocket.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.id && this.pendingRequests.has(data.id)) {
const { resolve, reject } = this.pendingRequests.get(data.id);
this.pendingRequests.delete(data.id);
if (data.error) {
reject(new Error(data.error.message));
} else {
resolve(data.result);
}
}
};
return new Promise((resolve, reject) => {
this.websocket.onopen = () => resolve();
this.websocket.onerror = (error) => reject(error);
});
}
async sendRequest(method, params = {}) {
this.requestId++;
const request = {
jsonrpc: "2.0",
method: method,
id: this.requestId,
params: params
};
return new Promise((resolve, reject) => {
this.pendingRequests.set(this.requestId, { resolve, reject });
this.websocket.send(JSON.stringify(request));
});
}
async listResources(cursor = null) {
const params = cursor ? { cursor } : {};
return await this.sendRequest("resources/list", params);
}
async readResource(uri) {
return await this.sendRequest("resources/read", { uri });
}
async callTool(name, arguments = {}) {
return await this.sendRequest("tools/call", { name, arguments });
}
}
MCP服务器开发
服务器架构设计
MCP服务器负责:
- 管理和提供资源访问
- 实现工具调用接口
- 处理客户端连接和会话
- 权限验证和安全控制
Python服务器实现
import asyncio
import json
import websockets
from typing import Dict, Any, List, Optional
from abc import ABC, abstractmethod
class Tool(ABC):
"""工具基类"""
@abstractmethod
async def call(self, arguments: Dict[str, Any]) -> Any:
pass
@property
@abstractmethod
def name(self) -> str:
pass
@property
@abstractmethod
def description(self) -> str:
pass
class CalculatorTool(Tool):
"""计算器工具示例"""
@property
def name(self) -> str:
return "calculator"
@property
def description(self) -> str:
return "Basic calculator operations"
async def call(self, arguments: Dict[str, Any]) -> Any:
operation = arguments.get("operation")
a = arguments.get("a", 0)
b = arguments.get("b", 0)
if operation == "add":
return {"result": a + b}
elif operation == "subtract":
return {"result": a - b}
elif operation == "multiply":
return {"result": a * b}
elif operation == "divide":
if b == 0:
raise ValueError("Division by zero")
return {"result": a / b}
else:
raise ValueError(f"Unknown operation: {operation}")
class FileSystemTool(Tool):
"""文件系统工具示例"""
@property
def name(self) -> str:
return "filesystem"
@property
def description(self) -> str:
return "File system operations"
async def call(self, arguments: Dict[str, Any]) -> Any:
operation = arguments.get("operation")
path = arguments.get("path", "")
if operation == "read":
try:
with open(path, 'r', encoding='utf-8') as f:
return {"content": f.read()}
except FileNotFoundError:
raise ValueError(f"File not found: {path}")
elif operation == "write":
content = arguments.get("content", "")
with open(path, 'w', encoding='utf-8') as f:
f.write(content)
return {"success": True}
else:
raise ValueError(f"Unknown operation: {operation}")
class MCPServer:
def __init__(self, host: str = "localhost", port: int = 8080):
self.host = host
self.port = port
self.tools: Dict[str, Tool] = {}
self.resources: Dict[str, Dict[str, Any]] = {}
self.connected_clients = set()
def register_tool(self, tool: Tool):
"""注册工具"""
self.tools[tool.name] = tool
def register_resource(self, uri: str, name: str, description: str,
mime_type: str, content: str):
"""注册资源"""
self.resources[uri] = {
"uri": uri,
"name": name,
"description": description,
"mimeType": mime_type,
"content": content
}
async def handle_client(self, websocket, path):
"""处理客户端连接"""
self.connected_clients.add(websocket)
try:
async for message in websocket:
try:
request = json.loads(message)
response = await self.handle_request(request)
await websocket.send(json.dumps(response))
except json.JSONDecodeError:
error_response = {
"jsonrpc": "2.0",
"error": {"code": -32700, "message": "Parse error"},
"id": None
}
await websocket.send(json.dumps(error_response))
finally:
self.connected_clients.remove(websocket)
async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""处理请求"""
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
try:
if method == "resources/list":
result = await self.list_resources(params)
elif method == "resources/read":
result = await self.read_resource(params)
elif method == "tools/list":
result = await self.list_tools(params)
elif method == "tools/call":
result = await self.call_tool(params)
else:
raise ValueError(f"Unknown method: {method}")
return {
"jsonrpc": "2.0",
"result": result,
"id": request_id
}
except Exception as e:
return {
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": str(e)
},
"id": request_id
}
async def list_resources(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""列出资源"""
resources = []
for resource in self.resources.values():
resources.append({
"uri": resource["uri"],
"name": resource["name"],
"description": resource["description"],
"mimeType": resource["mimeType"]
})
return {"resources": resources}
async def read_resource(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""读取资源"""
uri = params.get("uri")
if uri not in self.resources:
raise ValueError(f"Resource not found: {uri}")
resource = self.resources[uri]
return {
"contents": [{
"uri": uri,
"mimeType": resource["mimeType"],
"text": resource["content"]
}]
}
async def list_tools(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""列出工具"""
tools = []
for tool in self.tools.values():
tools.append({
"name": tool.name,
"description": tool.description
})
return {"tools": tools}
async def call_tool(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""调用工具"""
name = params.get("name")
arguments = params.get("arguments", {})
if name not in self.tools:
raise ValueError(f"Tool not found: {name}")
tool = self.tools[name]
result = await tool.call(arguments)
return {
"content": [
{
"type": "text",
"text": json.dumps(result)
}
]
}
async def start(self):
"""启动服务器"""
print(f"MCP Server starting on {self.host}:{self.port}")
async with websockets.serve(self.handle_client, self.host, self.port):
await asyncio.Future() # run forever
# 使用示例
async def main():
server = MCPServer()
# 注册工具
server.register_tool(CalculatorTool())
server.register_tool(FileSystemTool())
# 注册资源
server.register_resource(
uri="file:///example.txt",
name="Example Text File",
description="A sample text file",
mime_type="text/plain",
content="Hello, MCP World!"
)
await server.start()
if __name__ == "__main__":
asyncio.run(main())
Node.js服务器实现
const WebSocket = require('ws');
class MCPServer {
constructor(port = 8080) {
this.port = port;
this.tools = new Map();
this.resources = new Map();
this.wss = null;
}
registerTool(name, description, handler) {
this.tools.set(name, { name, description, handler });
}
registerResource(uri, name, description, mimeType, content) {
this.resources.set(uri, {
uri, name, description, mimeType, content
});
}
async handleRequest(request) {
const { method, params = {}, id } = request;
try {
let result;
switch (method) {
case 'resources/list':
result = await this.listResources(params);
break;
case 'resources/read':
result = await this.readResource(params);
break;
case 'tools/list':
result = await this.listTools(params);
break;
case 'tools/call':
result = await this.callTool(params);
break;
default:
throw new Error(`Unknown method: ${method}`);
}
return {
jsonrpc: "2.0",
result,
id
};
} catch (error) {
return {
jsonrpc: "2.0",
error: {
code: -32603,
message: error.message
},
id
};
}
}
async listResources(params) {
const resources = Array.from(this.resources.values()).map(resource => ({
uri: resource.uri,
name: resource.name,
description: resource.description,
mimeType: resource.mimeType
}));
return { resources };
}
async readResource(params) {
const { uri } = params;
const resource = this.resources.get(uri);
if (!resource) {
throw new Error(`Resource not found: ${uri}`);
}
return {
contents: [{
uri: resource.uri,
mimeType: resource.mimeType,
text: resource.content
}]
};
}
async listTools(params) {
const tools = Array.from(this.tools.values()).map(tool => ({
name: tool.name,
description: tool.description
}));
return { tools };
}
async callTool(params) {
const { name, arguments: args = {} } = params;
const tool = this.tools.get(name);
if (!tool) {
throw new Error(`Tool not found: ${name}`);
}
const result = await tool.handler(args);
return {
content: [{
type: "text",
text: JSON.stringify(result)
}]
};
}
start() {
this.wss = new WebSocket.Server({ port: this.port });
this.wss.on('connection', (ws) => {
console.log('Client connected');
ws.on('message', async (message) => {
try {
const request = JSON.parse(message);
const response = await this.handleRequest(request);
ws.send(JSON.stringify(response));
} catch (error) {
const errorResponse = {
jsonrpc: "2.0",
error: {
code: -32700,
message: "Parse error"
},
id: null
};
ws.send(JSON.stringify(errorResponse));
}
});
ws.on('close', () => {
console.log('Client disconnected');
});
});
console.log(`MCP Server started on port ${this.port}`);
}
}
// 使用示例
const server = new MCPServer(8080);
// 注册计算器工具
server.registerTool('calculator', 'Basic calculator operations', async (args) => {
const { operation, a, b } = args;
switch (operation) {
case 'add':
return { result: a + b };
case 'subtract':
return { result: a - b };
case 'multiply':
return { result: a * b };
case 'divide':
if (b === 0) throw new Error('Division by zero');
return { result: a / b };
default:
throw new Error(`Unknown operation: ${operation}`);
}
});
// 注册资源
server.registerResource(
'file:///example.txt',
'Example Text File',
'A sample text file',
'text/plain',
'Hello, MCP World!'
);
server.start();
高级特性和扩展
权限和安全管理
from functools import wraps
import jwt
from typing import List
class SecurityManager:
def __init__(self, secret_key: str):
self.secret_key = secret_key
self.permissions = {}
def authenticate(self, token: str) -> Dict[str, Any]:
"""验证JWT token"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
return payload
except jwt.InvalidTokenError:
raise ValueError("Invalid token")
def authorize(self, user_id: str, resource: str, action: str) -> bool:
"""检查用户权限"""
user_permissions = self.permissions.get(user_id, {})
resource_permissions = user_permissions.get(resource, [])
return action in resource_permissions
def require_permission(self, resource: str, action: str):
"""装饰器:检查权限"""
def decorator(func):
@wraps(func)
async def wrapper(self, params: Dict[str, Any], user_context: Dict[str, Any]):
user_id = user_context.get('user_id')
if not self.security_manager.authorize(user_id, resource, action):
raise ValueError(f"Permission denied: {action} on {resource}")
return await func(self, params, user_context)
return wrapper
return decorator
# 在服务器中使用安全管理器
class SecureMCPServer(MCPServer):
def __init__(self, host: str = "localhost", port: int = 8080, secret_key: str = "your-secret-key"):
super().__init__(host, port)
self.security_manager = SecurityManager(secret_key)
@require_permission("resources", "read")
async def read_resource(self, params: Dict[str, Any], user_context: Dict[str, Any]) -> Dict[str, Any]:
# 原有的读取资源逻辑
return await super().read_resource(params)
连接池和缓存
import asyncio
from typing import Dict, Any, Optional
import aioredis
from asyncio import Queue
class ConnectionPool:
def __init__(self, max_connections: int = 10):
self.max_connections = max_connections
self.pool = Queue(maxsize=max_connections)
self.active_connections = 0
async def get_connection(self):
"""获取连接"""
if self.active_connections < self.max_connections:
self.active_connections += 1
return await self._create_connection()
else:
return await self.pool.get()
async def release_connection(self, connection):
"""释放连接"""
await self.pool.put(connection)
async def _create_connection(self):
# 创建实际连接的逻辑
pass
class CacheManager:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = None
self.redis_url = redis_url
async def initialize(self):
"""初始化Redis连接"""
self.redis = await aioredis.from_url(self.redis_url)
async def get(self, key: str) -> Optional[Any]:
"""获取缓存"""
if self.redis:
value = await self.redis.get(key)
if value:
return json.loads(value)
return None
async def set(self, key: str, value: Any, expire: int = 3600):
"""设置缓存"""
if self.redis:
await self.redis.set(key, json.dumps(value), ex=expire)
async def delete(self, key: str):
"""删除缓存"""
if self.redis:
await self.redis.delete(key)
# 在服务器中使用缓存
class CachedMCPServer(MCPServer):
def __init__(self, host: str = "localhost", port: int = 8080):
super().__init__(host, port)
self.cache_manager = CacheManager()
async def read_resource(self, params: Dict[str, Any]) -> Dict[str, Any]:
uri = params.get("uri")
cache_key = f"resource:{uri}"
# 尝试从缓存获取
cached_result = await self.cache_manager.get(cache_key)
if cached_result:
return cached_result
# 从原始数据源获取
result = await super().read_resource(params)
# 存入缓存
await self.cache_manager.set(cache_key, result)
return result
批量操作和事务
class BatchProcessor:
def __init__(self, max_batch_size: int = 100):
self.max_batch_size = max_batch_size
self.batch_queue = []
self.processing = False
async def add_operation(self, operation: Dict[str, Any]):
"""添加操作到批处理队列"""
self.batch_queue.append(operation)
if len(self.batch_queue) >= self.max_batch_size:
await self.process_batch()
async def process_batch(self):
"""处理批量操作"""
if self.processing or not self.batch_queue:
return
self.processing = True
current_batch = self.batch_queue.copy()
self.batch_queue.clear()
try:
# 并行处理批量操作
tasks = []
for operation in current_batch:
task = asyncio.create_task(self._process_single_operation(operation))
tasks.append(task)
await asyncio.gather(*tasks)
finally:
self.processing = False
async def _process_single_operation(self, operation: Dict[str, Any]):
"""处理单个操作"""
# 实现具体的操作处理逻辑
pass
# 事务管理
class TransactionManager:
def __init__(self):
self.transactions = {}
async def begin_transaction(self, transaction_id: str):
"""开始事务"""
self.transactions[transaction_id] = {
"operations": [],
"status": "active"
}
async def add_operation(self, transaction_id: str, operation: Dict[str, Any]):
"""添加操作到事务"""
if transaction_id in self.transactions:
self.transactions[transaction_id]["operations"].append(operation)
async def commit_transaction(self, transaction_id: str):
"""提交事务"""
if transaction_id not in self.transactions:
raise ValueError(f"Transaction not found: {transaction_id}")
transaction = self.transactions[transaction_id]
try:
# 执行所有操作
for operation in transaction["operations"]:
await self._execute_operation(operation)
transaction["status"] = "committed"
except Exception as e:
await self.rollback_transaction(transaction_id)
raise e
async def rollback_transaction(self, transaction_id: str):
"""回滚事务"""
if transaction_id in self.transactions:
transaction = self.transactions[transaction_id]
transaction["status"] = "rolled_back"
# 执行回滚操作
for operation in reversed(transaction["operations"]):
await self._rollback_operation(operation)
async def _execute_operation(self, operation: Dict[str, Any]):
"""执行单个操作"""
pass
async def _rollback_operation(self, operation: Dict[str, Any]):
"""回滚单个操作"""
pass
实施最佳实践
1. 安全性考虑
- 身份验证:确保只有授权的智能体可以访问资源
- 数据加密:对敏感数据进行传输和存储加密
- 访问控制:实施基于角色的权限管理
- 审计日志:记录所有资源访问活动
2. 性能优化
- 连接池管理:复用连接以减少开销
- 缓存策略:缓存频繁访问的数据
- 批量操作:减少网络请求次数
- 异步处理:提高并发处理能力
3. 错误处理
- 重试机制:自动重试失败的请求
- 降级策略:在资源不可用时提供替代方案
- 监控告警:实时监控资源状态和性能
- 故障隔离:防止单点故障影响整体系统
开发工具和框架
主流MCP实现
- 官方SDK:提供多语言支持的标准实现
- 开源框架:社区维护的增强版本
- 云服务:托管式MCP服务解决方案
- 企业版本:面向企业的定制化实现
开发工具链
- MCP测试工具:验证协议实现的正确性
- 资源管理界面:可视化管理MCP资源
- 性能监控工具:实时监控MCP性能指标
- 调试工具:辅助开发和问题排查
应用案例分析
案例1:智能客服系统
某电商公司使用MCP构建了智能客服系统,智能体可以:
- 通过MCP连接订单数据库查询订单状态
- 访问产品信息系统提供商品咨询
- 调用支付API处理退款请求
- 连接物流API追踪包裹状态
效果:
- 客服响应时间减少70%
- 问题解决率提升至95%
- 客户满意度显著提升
案例2:代码审查助手
某软件公司开发了基于MCP的代码审查智能体:
- 连接Git仓库获取代码变更
- 集成静态分析工具检查代码质量
- 调用测试框架运行自动化测试
- 生成详细的审查报告
效果:
- 代码审查效率提升3倍
- 缺陷发现率提高40%
- 开发团队生产力显著提升
未来发展趋势
1. 标准化进程
- 更多厂商支持MCP标准
- 协议规范进一步完善
- 跨平台兼容性增强
2. 功能扩展
- 支持更多类型的外部资源
- 增强实时交互能力
- 改进多模态数据处理
3. 生态建设
- 丰富的第三方连接器
- 完善的开发者工具
- 活跃的社区贡献
4. 企业级特性
- 更强的安全性和合规性
- 高可用性和可扩展性
- 企业级管理和监控功能
结论
MCP作为智能体开发的重要技术标准,为智能体与外部资源的交互提供了标准化、安全、高效的解决方案。通过合理运用MCP,开发者可以构建功能强大、实用性强的智能体系统。随着技术的不断发展和生态的逐步完善,MCP将在智能体开发中发挥越来越重要的作用,推动AI应用向更加实用和智能的方向发展。
对于开发者而言,深入理解和掌握MCP技术,将有助于构建更加优秀的智能体产品,在AI应用的竞争中占据有利地位。
更多推荐


所有评论(0)