【值得收藏】LangChain Agent系统架构设计与实现全攻略:构建自主决策的AI系统
本文详细分析了LangChain Agent系统的架构设计,包括核心概念(Agent、Tools、AgentExecutor、Memory)、系统架构、组件实现(Agent基类、执行引擎、工具系统)、不同Agent类型(ReAct、Function Calling)、高级特性(多智能体协作、记忆系统、错误处理)及性能优化(并行执行、智能缓存)。这些模块化、可扩展的设计使LangChain Agent能够处理复杂的现实任务,为构建智能应用提供强大基础。
Agent 系统概述
1. 核心概念
LangChain 的 Agent 系统基于以下核心概念:
- Agent: 决策制定者,决定下一步采取什么行动
- Tools: 可执行的工具集合,Agent 可以调用这些工具
- AgentExecutor: 执行引擎,负责运行 Agent 的决策循环
- Memory: 记忆系统,存储对话历史和中间状态
2. 系统架构图
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   User Input    │───▶│  AgentExecutor   │───▶│  Tool Results   │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                                │                       │
                                ▼                       │
                         ┌──────────────┐               │
                         │    Agent     │               │
                         │  (Decision   │               │
                         │    Maker)    │               │
                         └──────────────┘               │
                                │                       │
                                ▼                       │
                         ┌──────────────┐               │
                         │    Tools     │◀──────────────┘
                         │  Collection  │
                         └──────────────┘
核心组件实现分析
1. Agent 基类设计
class BaseAgent(Runnable[AgentInput, AgentOutput]):
    """Abstract decision maker of an agent system.

    A concrete agent declares which input keys it consumes, which keys its
    final output carries, and how it chooses the next step from the history
    of ``(action, observation)`` pairs collected so far.
    """

    @property
    @abstractmethod
    def input_keys(self) -> List[str]:
        """Names of the input variables this agent reads."""
        ...

    @property
    @abstractmethod
    def return_values(self) -> List[str]:
        """Names of the keys present in the agent's final output."""
        ...

    @abstractmethod
    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any
    ) -> Union[AgentAction, AgentFinish]:
        """Decide what to do next.

        Args:
            intermediate_steps: actions taken so far, each paired with the
                observation string its tool produced.
            callbacks: callback handlers for this planning step.
            **kwargs: the run's inputs.

        Returns:
            An ``AgentAction`` describing the next tool call, or an
            ``AgentFinish`` when the agent is done.
        """
        ...
2. AgentExecutor 执行引擎
class AgentExecutor(Chain):
    """Agent execution engine: drives the agent's plan -> act -> observe loop.

    Each iteration asks the agent for its next step, runs the requested
    tools and records ``(action, observation)`` pairs. The loop ends when the
    agent returns an ``AgentFinish``, when a single call to a
    ``return_direct`` tool produces the answer, or when ``_should_continue``
    reports that the iteration/time budget is exhausted.
    """

    def __init__(
        self,
        agent: BaseAgent,
        tools: Sequence[BaseTool],
        max_iterations: int = 15,
        max_execution_time: Optional[float] = None,
        early_stopping_method: str = "force",
        handle_parsing_errors: bool = False,
        **kwargs,
    ):
        """Configure the executor.

        Args:
            agent: decision maker whose ``plan`` is called every iteration.
            tools: tools the agent may call, looked up by ``tool.name``.
            max_iterations: iteration budget (presumably enforced by
                ``_should_continue``).
            max_execution_time: optional wall-clock budget in seconds
                (presumably enforced by ``_should_continue``).
            early_stopping_method: passed to ``_return_stopped_response``
                when the loop stops without an ``AgentFinish``.
            handle_parsing_errors: if True, an ``OutputParserException`` from
                ``agent.plan`` is routed to ``_handle_parsing_error`` instead
                of propagating.
            **kwargs: forwarded to the ``Chain`` base class.
        """
        # Initialise the base class with the remaining options instead of
        # silently discarding them. ``self.verbose`` (read in
        # ``_take_next_step``) is not set here and presumably comes from it.
        super().__init__(**kwargs)
        self.agent = agent
        self.tools = tools
        self.max_iterations = max_iterations
        self.max_execution_time = max_execution_time
        self.early_stopping_method = early_stopping_method
        self.handle_parsing_errors = handle_parsing_errors

    def _call(
        self,
        inputs: Dict[str, Any],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        """Run the main loop and return the final outputs.

        Args:
            inputs: run inputs, forwarded to ``agent.plan`` as keyword args.
            run_manager: optional chain-level callback manager.

        Returns:
            The result of ``_return`` for the finishing step, or of
            ``_return_stopped_response`` when the loop stops early.
        """
        name_to_tool_map = {tool.name: tool for tool in self.tools}
        color_mapping = get_color_mapping([tool.name for tool in self.tools])
        intermediate_steps: List[Tuple[AgentAction, str]] = []
        iterations = 0
        time_elapsed = 0.0
        start_time = time.time()

        while self._should_continue(iterations, time_elapsed):
            # 1. Let the agent decide and run whatever tools it asked for.
            next_step_output = self._take_next_step(
                name_to_tool_map,
                color_mapping,
                inputs,
                intermediate_steps,
                run_manager=run_manager,
            )
            # 2. The agent declared that it is done.
            if isinstance(next_step_output, AgentFinish):
                return self._return(
                    next_step_output, intermediate_steps, run_manager=run_manager
                )
            # 3. Record the (action, observation) pairs for the next plan().
            intermediate_steps.extend(next_step_output)
            # 4. Honour BaseTool.return_direct: a single call to such a tool
            #    ends the run with that tool's observation as the answer.
            if len(next_step_output) == 1:
                last_action, last_observation = next_step_output[0]
                tool = name_to_tool_map.get(last_action.tool)
                if tool is not None and tool.return_direct:
                    finish = AgentFinish(
                        return_values={self.agent.return_values[0]: last_observation},
                        log="",
                    )
                    return self._return(
                        finish, intermediate_steps, run_manager=run_manager
                    )
            # 5. Advance the budget counters.
            iterations += 1
            time_elapsed = time.time() - start_time

        # Iteration or time budget exhausted without an AgentFinish.
        return self._return_stopped_response(
            self.early_stopping_method, intermediate_steps, **inputs
        )

    def _take_next_step(
        self,
        name_to_tool_map: Dict[str, BaseTool],
        color_mapping: Dict[str, str],
        inputs: Dict[str, str],
        intermediate_steps: List[Tuple[AgentAction, str]],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
        """Ask the agent for its next step and execute the requested tools.

        Returns:
            The ``AgentFinish`` if the agent is done; otherwise a list of
            ``(action, observation)`` pairs, one per requested action (or
            whatever ``_handle_parsing_error`` returns on a parse failure).

        Raises:
            OutputParserException: if the agent output cannot be parsed and
                ``handle_parsing_errors`` is False.
        """
        try:
            output = self.agent.plan(
                intermediate_steps,
                callbacks=run_manager.get_child() if run_manager else None,
                **inputs,
            )
        except OutputParserException as e:
            if self.handle_parsing_errors:
                return self._handle_parsing_error(e, run_manager)
            raise

        if isinstance(output, AgentFinish):
            return output

        # plan() may return a single action or several.
        actions = [output] if isinstance(output, AgentAction) else output
        result: List[Tuple[AgentAction, str]] = []
        for agent_action in actions:
            if run_manager:
                run_manager.on_agent_action(agent_action, color="green")
            tool = name_to_tool_map.get(agent_action.tool)
            if tool is None:
                # The model named a tool that does not exist. Report it as an
                # observation so the agent can correct itself next iteration
                # instead of the whole run aborting with a KeyError.
                observation = (
                    f"{agent_action.tool} is not a valid tool, "
                    f"try one of [{', '.join(name_to_tool_map)}]."
                )
            else:
                observation = tool.run(
                    agent_action.tool_input,
                    verbose=self.verbose,
                    color=color_mapping[agent_action.tool],
                    callbacks=run_manager.get_child() if run_manager else None,
                )
            result.append((agent_action, observation))
        return result
3. 工具系统设计
class BaseTool(Runnable[str, str]):
    """Base class for tools that an agent can invoke.

    Subclasses implement ``_run`` (and may override ``_arun``). Callers use
    ``run``, which parses the raw input, reports start/end/error events to
    the callback system, and returns the tool's string observation.
    """

    name: str  # identifier the agent uses to select this tool
    description: str  # text describing the tool to the model
    # Presumably: return this tool's observation directly as the final answer.
    return_direct: bool = False

    @abstractmethod
    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Execute the tool synchronously and return its observation."""
        pass

    async def _arun(
        self,
        query: str,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        """Execute the tool asynchronously.

        Default implementation: run the synchronous ``_run`` in the event
        loop's default executor so the loop itself is not blocked.
        """
        # Inside a coroutine a loop is always running; get_running_loop() is
        # the supported accessor (get_event_loop() is deprecated here).
        loop = asyncio.get_running_loop()
        # NOTE(review): this hands the async run_manager to _run, whose
        # signature expects the sync CallbackManagerForToolRun — confirm.
        return await loop.run_in_executor(None, self._run, query, run_manager)

    def run(
        self,
        tool_input: Union[str, Dict],
        verbose: Optional[bool] = None,
        start_color: Optional[str] = "green",
        color: Optional[str] = "blue",
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> str:
        """Public entry point: run the tool with callback reporting.

        Args:
            tool_input: raw input, a string or a dict of arguments.
            verbose: verbosity flag passed to the callback configuration.
            start_color: color attached to the tool-start event.
            color: color attached to the tool-end event.
            callbacks: extra callback handlers for this run.
            **kwargs: forwarded to the callback configuration and events.

        Returns:
            The observation string produced by ``_run``.

        Raises:
            Whatever ``_run`` raises (including ``KeyboardInterrupt``),
            after the error has been reported via ``on_tool_error``.
        """
        parsed_input = self._parse_input(tool_input)
        callback_manager = CallbackManager.configure(
            callbacks, self.callbacks, verbose, None, None, **kwargs
        )
        # Announce the run. This yields the per-run manager that owns the
        # matching error/end events and is the type ``_run`` expects.
        run_manager = callback_manager.on_tool_start(
            {"name": self.name, "description": self.description},
            tool_input if isinstance(tool_input, str) else str(tool_input),
            color=start_color,
            **kwargs,
        )
        try:
            tool_args, tool_kwargs = self._to_args_and_kwargs(parsed_input)
            observation = self._run(*tool_args, run_manager=run_manager, **tool_kwargs)
        except (Exception, KeyboardInterrupt) as e:
            run_manager.on_tool_error(e)
            raise
        run_manager.on_tool_end(observation, color=color, name=self.name, **kwargs)
        return observation
Agent 类型实现
1. ReAct Agent
class ReActAgent(BaseAgent):
    """ReAct (Reasoning + Acting) agent.

    Each planning step renders ``prompt`` with the run inputs, a text
    scratchpad of previous actions/observations and the tool listing, sends
    it to ``llm``, and lets ``output_parser`` turn the completion into the
    next ``AgentAction`` or an ``AgentFinish``.
    """

    def __init__(
        self,
        llm: BaseLanguageModel,
        tools: Sequence[BaseTool],
        prompt: BasePromptTemplate,
        output_parser: AgentOutputParser,
    ):
        """
        Args:
            llm: language model queried on every planning step.
            tools: tools listed in the prompt for the model to choose from.
            prompt: template rendered with the inputs from ``get_full_inputs``.
            output_parser: converts the raw completion into an action/finish.
        """
        self.llm = llm
        self.tools = tools
        self.prompt = prompt
        self.output_parser = output_parser

    @property
    def input_keys(self) -> List[str]:
        """Prompt variables the caller must supply.

        Required by the abstract ``BaseAgent.input_keys``. Variables that
        ``get_full_inputs`` fills in itself are excluded. Assumes ``prompt``
        exposes ``input_variables`` (as LangChain prompt templates do).
        """
        agent_filled = {"agent_scratchpad", "tools", "tool_names"}
        return [key for key in self.prompt.input_variables if key not in agent_filled]

    @property
    def return_values(self) -> List[str]:
        """Keys of the final output (required by ``BaseAgent.return_values``).

        NOTE(review): assumes ``output_parser`` stores the final answer under
        ``"output"`` — confirm against the parser in use.
        """
        return ["output"]

    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Run one ReAct reasoning step: prompt the LLM and parse its reply."""
        full_inputs = self.get_full_inputs(intermediate_steps, **kwargs)
        prompt_value = self.prompt.format_prompt(**full_inputs)
        # Take the text of the first generation for the single prompt sent.
        llm_output = self.llm.generate_prompt(
            [prompt_value], callbacks=callbacks
        ).generations[0][0].text
        return self.output_parser.parse(llm_output)

    def get_full_inputs(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        **kwargs: Any
    ) -> Dict[str, Any]:
        """Return the prompt variables: caller inputs plus agent-filled ones.

        The agent-filled keys override any caller-supplied keys of the same
        name.
        """
        thoughts = self._construct_scratchpad(intermediate_steps)
        return {
            **kwargs,
            "agent_scratchpad": thoughts,
            "tools": self._format_tools(),
            "tool_names": ", ".join([tool.name for tool in self.tools]),
        }

    def _construct_scratchpad(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]]
    ) -> str:
        """Render past steps as Thought/Action/Action Input/Observation lines."""
        return "".join(
            f"Thought: {action.log}\n"
            f"Action: {action.tool}\n"
            f"Action Input: {action.tool_input}\n"
            f"Observation: {observation}\n"
            for action, observation in intermediate_steps
        )
2. Function Calling Agent
class OpenAIFunctionsAgent(BaseAgent):
    """Agent that decides via OpenAI function calling.

    Every tool is advertised to the model as a function taking a single
    string ``query``. A reply containing a function call becomes an
    ``AgentAction``; a plain reply becomes the ``AgentFinish``.
    """

    def __init__(
        self,
        llm: BaseLanguageModel,
        tools: Sequence[BaseTool],
        system_message: Optional[SystemMessage] = None,
    ):
        """
        Args:
            llm: chat model that supports the ``functions`` argument.
            tools: tools exposed to the model as callable functions.
            system_message: system prompt; defaults to a generic assistant
                message.
        """
        self.llm = llm
        self.tools = tools
        self.system_message = system_message or SystemMessage(
            content="You are a helpful assistant."
        )

    @property
    def input_keys(self) -> List[str]:
        """Inputs read by this agent (required by ``BaseAgent.input_keys``).

        NOTE(review): presumably ``_construct_messages`` reads ``"input"``
        (the key ``MultiAgentSystem`` passes) — confirm.
        """
        return ["input"]

    @property
    def return_values(self) -> List[str]:
        """Keys of the final output; ``_parse_ai_message`` emits ``"output"``."""
        return ["output"]

    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Ask the model, with all tools offered as functions, what to do next."""
        messages = self._construct_messages(intermediate_steps, **kwargs)
        functions = [self._tool_to_function(tool) for tool in self.tools]
        response = self.llm.predict_messages(
            messages,
            functions=functions,
            callbacks=callbacks,
        )
        return self._parse_ai_message(response)

    def _tool_to_function(self, tool: BaseTool) -> Dict[str, Any]:
        """Describe ``tool`` as a function schema with one required string ``query``."""
        return {
            "name": tool.name,
            "description": tool.description,
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The input to the tool",
                    }
                },
                "required": ["query"],
            },
        }

    def _parse_ai_message(self, message: AIMessage) -> Union[AgentAction, AgentFinish]:
        """Convert the model reply into the next action or the final answer.

        Raises:
            OutputParserException: if the function call lacks a name, its
                arguments are not valid JSON, or they have no ``"query"``.
                Raising this type lets ``AgentExecutor`` apply its
                ``handle_parsing_errors`` policy.
        """
        function_call = message.additional_kwargs.get("function_call")
        if not function_call:
            # No function call: the model answered directly, so we are done.
            return AgentFinish(
                return_values={"output": message.content},
                log=message.content,
            )
        try:
            tool_name = function_call["name"]
            tool_input = json.loads(function_call["arguments"])["query"]
        except (json.JSONDecodeError, KeyError, TypeError) as exc:
            raise OutputParserException(
                f"Could not parse function call: {function_call!r}"
            ) from exc
        return AgentAction(
            tool=tool_name,
            tool_input=tool_input,
            log=message.content,
        )
高级特性实现
1. 多智能体协作
class MultiAgentSystem:
    """Coordinate several named agents working on parts of one task."""

    def __init__(self, agents: Dict[str, BaseAgent]):
        """
        Args:
            agents: mapping from agent name to agent instance.
        """
        self.agents = agents
        # Inter-agent channel and shared scratch space; not used by the
        # methods of this class shown here.
        self.message_queue = asyncio.Queue()
        self.shared_memory = {}

    async def run_collaborative_task(self, task: str) -> str:
        """Split ``task`` into subtasks, run them concurrently, merge the results.

        ``_decompose_task`` is expected to return a mapping of agent name to
        subtask; each subtask is run by the named agent.

        Raises:
            KeyError: if the decomposition names an unregistered agent. This
                is raised before any agent coroutine is created.
        """
        subtasks = await self._decompose_task(task)
        # Resolve every agent before creating any coroutine: failing midway
        # would leave the coroutines already created never awaited
        # (RuntimeWarning) and hide which names were wrong.
        unknown = [name for name in subtasks if name not in self.agents]
        if unknown:
            raise KeyError(f"Unknown agent(s) in task decomposition: {unknown}")
        coroutines = [
            self._run_agent_task(self.agents[name], subtask)
            for name, subtask in subtasks.items()
        ]
        # Results come back in the same order as ``subtasks`` was iterated.
        results = await asyncio.gather(*coroutines)
        return await self._integrate_results(results)

    async def _run_agent_task(self, agent: BaseAgent, task: str) -> str:
        """Run one agent on its subtask via ``ainvoke``.

        NOTE(review): annotated ``-> str`` but returns whatever
        ``agent.ainvoke`` returns — confirm the result type.
        """
        # Hook for richer collaboration logic between agents.
        return await agent.ainvoke({"input": task})
2. 记忆系统
class AgentMemory:
    """Two-tier interaction memory for an agent.

    Recent interactions are kept in ``short_term_memory`` (a list, oldest
    first). Once it grows past ``short_term_capacity``, the oldest entries
    are passed to ``_store_in_long_term_memory``; retrieval combines a
    similarity search over ``long_term_memory`` with the latest entries.
    """

    def __init__(
        self,
        memory_type: str = "buffer",
        long_term_memory: Optional["VectorStore"] = None,
        short_term_capacity: int = 10,
    ):
        """
        Args:
            memory_type: label for the memory strategy; stored, not otherwise
                used by this class.
            long_term_memory: vector store backing the long-term tier.
                Defaults to ``VectorStore()`` as before.
            short_term_capacity: number of most recent interactions kept in
                short-term memory before older ones are moved out.

        Raises:
            ValueError: if ``short_term_capacity`` is negative.
        """
        if short_term_capacity < 0:
            raise ValueError("short_term_capacity must be >= 0")
        self.memory_type = memory_type
        self.short_term_capacity = short_term_capacity
        self.short_term_memory = []
        self.long_term_memory = (
            long_term_memory if long_term_memory is not None else VectorStore()
        )
        self.working_memory = {}

    def add_interaction(self, input_text: str, output_text: str):
        """Record one input/output exchange with a (naive, local) timestamp."""
        interaction = {
            "input": input_text,
            "output": output_text,
            "timestamp": datetime.now(),
        }
        self.short_term_memory.append(interaction)
        # Spill the oldest entries to long-term memory once over capacity.
        while len(self.short_term_memory) > self.short_term_capacity:
            self._store_in_long_term_memory(self.short_term_memory.pop(0))

    def retrieve_relevant_memories(
        self, query: str, k: int = 5, recent_k: int = 3
    ) -> List[Dict]:
        """Return up to ``k`` similar long-term memories plus the ``recent_k`` latest ones.

        NOTE(review): the long-term part is whatever
        ``similarity_search`` returns (possibly document objects rather
        than dicts) — confirm the element type callers expect.
        """
        relevant_memories = self.long_term_memory.similarity_search(query, k=k)
        # Guard recent_k <= 0: the slice [-0:] would return the whole list.
        recent_memories = self.short_term_memory[-recent_k:] if recent_k > 0 else []
        return relevant_memories + recent_memories
3. 错误处理与恢复
class RobustAgentExecutor(AgentExecutor):
    """Agent executor that retries or skips steps after recoverable errors.

    Every step taken by the main loop goes through
    ``_take_next_step_with_recovery``. A failure is classified with
    ``_classify_error`` and mapped to a recovery action: ``"retry"`` (run the
    step again, at most 3 attempts), ``"skip"`` (record no step for this
    iteration) or ``"abort"`` (re-raise).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Error category (as returned by _classify_error) -> recovery hook.
        # The hooks are deliberately not named ``_handle_parsing_error``: the
        # base executor calls that name as ``_handle_parsing_error(e,
        # run_manager)`` and uses its return value as the step output, so a
        # one-argument hook returning "retry" under that name would break it.
        self.error_recovery_strategies = {
            "parsing_error": self._recover_from_parsing_error,
            "tool_error": self._recover_from_tool_error,
            "timeout_error": self._recover_from_timeout_error,
        }

    def _take_next_step(self, *args, **kwargs):
        """Route every step of the main loop through error recovery."""
        return self._take_next_step_with_recovery(*args, **kwargs)

    def _take_next_step_with_recovery(self, *args, **kwargs):
        """Run the base step, applying the recovery strategy on failure.

        Returns:
            The base step's output, or ``[]`` when a failure is skipped.

        Raises:
            The original exception when it has no strategy or the strategy
            aborts (or returns an unrecognized action); ``Exception`` chained
            to the last error once all retries are used up.
        """
        max_retries = 3
        retry_count = 0
        last_error = None
        while retry_count < max_retries:
            try:
                # Call the base implementation directly: self._take_next_step
                # is this class's wrapper and would recurse.
                return super()._take_next_step(*args, **kwargs)
            except Exception as e:
                strategy = self.error_recovery_strategies.get(self._classify_error(e))
                if strategy is None:
                    raise
                recovery_action = strategy(e)
                if recovery_action == "retry":
                    retry_count += 1
                    last_error = e
                    continue
                if recovery_action == "skip":
                    return []
                # "abort" and any unrecognized action re-raise, so the loop can
                # never spin without advancing retry_count.
                raise
        raise Exception(f"Max retries ({max_retries}) exceeded") from last_error

    def _recover_from_parsing_error(self, error: Exception) -> str:
        """Recovery for output-parsing errors: retry the step."""
        # A more lenient parser could be tried here.
        return "retry"

    def _recover_from_tool_error(self, error: Exception) -> str:
        """Recovery for tool failures: skip the step."""
        # A fallback tool could be tried here.
        return "skip"

    def _recover_from_timeout_error(self, error: Exception) -> str:
        """Recovery for timeouts: retry the step."""
        # The timeout could be increased before retrying.
        return "retry"
性能优化策略
1. 并行工具执行
class ParallelToolExecutor:
    """Execute several agent actions concurrently and collect observations."""

    async def execute_tools_parallel(
        self,
        actions: List[AgentAction],
        tools: Dict[str, BaseTool]
    ) -> List[Tuple[AgentAction, str]]:
        """Run every action's tool concurrently.

        Failures never raise. A failing tool yields the observation
        ``"Tool execution failed: <error>"``, and an action naming an unknown
        tool yields a "not a valid tool" observation.

        Returns:
            ``(action, observation)`` pairs in the same order as ``actions``.
        """
        observations = [None] * len(actions)
        # Resolve tool names before creating any coroutine: a KeyError midway
        # would abort the whole batch and leave earlier coroutines un-awaited.
        pending = []  # (index into actions, coroutine)
        for index, action in enumerate(actions):
            tool = tools.get(action.tool)
            if tool is None:
                observations[index] = (
                    f"{action.tool} is not a valid tool, "
                    f"try one of [{', '.join(tools)}]."
                )
            else:
                pending.append((index, self._execute_tool_async(tool, action)))

        results = await asyncio.gather(
            *(coro for _, coro in pending), return_exceptions=True
        )
        for (index, _), result in zip(pending, results):
            # BaseException also covers a cancelled tool (CancelledError),
            # which is not an Exception subclass.
            if isinstance(result, BaseException):
                observations[index] = f"Tool execution failed: {str(result)}"
            else:
                observations[index] = result
        return list(zip(actions, observations))

    async def _execute_tool_async(self, tool: BaseTool, action: AgentAction) -> str:
        """Run one tool asynchronously on the action's input."""
        return await tool.arun(action.tool_input)
2. 智能缓存
class AgentCache:
    """Bounded LRU cache for agent execution results, keyed by a state hash."""

    def __init__(self, cache_size: int = 1000):
        """
        Args:
            cache_size: maximum number of entries; a value <= 0 disables
                caching.
        """
        from collections import OrderedDict

        # The key order of ``cache`` is the recency order: the first key is
        # the least recently used one, so eviction is O(1).
        self.cache = OrderedDict()
        self.cache_size = cache_size
        # Last access time per key; kept up to date for callers that inspect
        # it, but eviction relies on the ordering of ``cache``.
        self.access_times = {}

    def get_cached_result(self, state_hash: str) -> Optional[Any]:
        """Return the cached result and mark it most recently used.

        Returns ``None`` on a miss; a cached ``None`` is indistinguishable
        from a miss.
        """
        if state_hash not in self.cache:
            return None
        self.cache.move_to_end(state_hash)
        self.access_times[state_hash] = time.time()
        return self.cache[state_hash]

    def cache_result(self, state_hash: str, result: Any):
        """Store ``result``, evicting the least recently used entries if full.

        Overwriting an existing key never evicts another entry.
        """
        if self.cache_size <= 0:
            return  # caching disabled
        if state_hash in self.cache:
            self.cache.move_to_end(state_hash)
        else:
            while len(self.cache) >= self.cache_size:
                evicted_key, _ = self.cache.popitem(last=False)
                self.access_times.pop(evicted_key, None)
        self.cache[state_hash] = result
        self.access_times[state_hash] = time.time()

    def _compute_state_hash(self, inputs: Dict, intermediate_steps: List) -> str:
        """Hash the inputs plus (tool, tool_input, observation) of each step.

        Keys are sorted so equal states hash equally; every value must be
        JSON-serializable.
        """
        state_str = json.dumps({
            "inputs": inputs,
            "steps": [(step[0].tool, step[0].tool_input, step[1])
                      for step in intermediate_steps]
        }, sort_keys=True)
        return hashlib.md5(state_str.encode()).hexdigest()
总结
LangChain Agent 系统的架构设计体现了以下关键特点:
- 模块化设计:Agent、Tools、Executor 各司其职
- 可扩展性:支持多种 Agent 类型和自定义工具
- 错误处理:完善的错误恢复机制
- 性能优化:并行执行、智能缓存等优化策略
- 协作能力:支持多智能体协作
- 记忆系统:短期和长期记忆的结合
这些设计使得 LangChain Agent 系统能够处理复杂的现实世界任务,为构建智能应用提供了强大的基础。
读者福利大放送:如果你对大模型感兴趣,想更加深入地学习大模型,那么这份精心整理的大模型学习资料,绝对能帮你少走弯路、快速入门。
如果你是零基础小白,别担心——大模型入门真的没那么难,你完全可以学得会!
👉 不用你懂任何算法和数学知识,公式推导、复杂原理这些都不用操心;
👉 也不挑电脑配置,普通家用电脑完全能 hold 住,不用额外花钱升级设备;
👉 更不用你提前学 Python 之类的编程语言,零基础照样能上手。
你要做的特别简单:跟着我的讲解走,照着教程里的步骤一步步操作就行。
包括:大模型学习路线汇总、学习阶段、大模型实战案例、大模型学习视频,以及人工智能、机器学习、大模型书籍PDF,带你从零基础系统性地学好大模型!
现在这份资料免费分享给大家,有需要的小伙伴,直接VX扫描下方二维码就能领取啦😝↓↓↓
为什么要学习大模型?
数据显示,2023 年我国大模型相关人才缺口已突破百万,这一数字直接暴露了人才培养体系的严重滞后与供给不足。而随着人工智能技术的飞速迭代,产业对专业人才的需求将呈爆发式增长,据预测,到 2025 年这一缺口将急剧扩大至 400 万!!
大模型学习路线汇总
整体的学习路线分成L1到L4四个阶段,一步步带你从入门到进阶,从理论到实战,跟着学习路线一步步打卡,小白也能轻松学会!
大模型实战项目&配套源码
只学理论可不够,这套学习资料还包含了丰富的实战案例,让你在实战中检验成果、巩固所学知识。
大模型学习必看书籍PDF
我精选了一系列大模型技术的书籍和学习文档(电子版),它们由领域内的顶尖专家撰写,内容全面、深入、详尽,为你学习大模型提供坚实的理论基础。
大模型超全面试题汇总
在面试过程中可能遇到的问题,我都给大家汇总好了,能让你们在面试中游刃有余
这些资料真的有用吗?
这份资料由我和鲁为民博士(清华大学学士、美国加州理工学院博士)共同整理。鲁博士现任上海殷泊信息科技CEO,其创立的MoPaaS云平台获Forrester全球“强劲表现者”认证,服务航天科工、国家电网等1000+企业;他以第一作者身份在IEEE Transactions发表论文50+篇,获NASA JPL火星探测系统强化学习专利等35项中美专利。本套AI大模型课程由清华大学-加州理工双料博士、吴文俊人工智能奖得主鲁为民教授领衔研发。
资料内容涵盖了从入门到进阶的各类视频教程和实战项目,无论你是小白还是有些技术基础的技术人员,这份资料都绝对能帮助你提升薪资待遇,转行大模型岗位。
👉获取方式:
😝有需要的小伙伴,可以保存图片到VX扫描下方二维码免费领取【保证100%免费】
相信我,这套大模型系统教程将会是全网最齐全、最适合零基础的!!
更多推荐


所有评论(0)