本文详细分析了LangChain Agent系统的架构设计,包括核心概念(Agent、Tools、AgentExecutor、Memory)、系统架构、组件实现(Agent基类、执行引擎、工具系统)、不同Agent类型(ReAct、Function Calling)、高级特性(多智能体协作、记忆系统、错误处理)及性能优化(并行执行、智能缓存)。这些模块化、可扩展的设计使LangChain Agent能够处理复杂现实任务,为构建智能应用提供强大基础。

Agent 系统概述

1. 核心概念

LangChain 的 Agent 系统基于以下核心概念:

  • Agent: 决策制定者,决定下一步采取什么行动
  • Tools: 可执行的工具集合,Agent 可以调用这些工具
  • AgentExecutor: 执行引擎,负责运行 Agent 的决策循环
  • Memory: 记忆系统,存储对话历史和中间状态

2. 系统架构图

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   User Input    │───▶│  AgentExecutor   │───▶│   Tool Results  │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                              │                          │
                              ▼                          │
                       ┌──────────────┐                  │
                       │    Agent     │                  │
                       │  (Decision   │                  │
                       │   Maker)     │                  │
                       └──────────────┘                  │
                              │                          │
                              ▼                          │
                       ┌──────────────┐                  │
                       │    Tools     │◀─────────────────┘
                       │  Collection  │
                       └──────────────┘

核心组件实现分析

1. Agent 基类设计

classBaseAgent(Runnable[AgentInput, AgentOutput]):
"""Agent 基类定义"""

    @abstractmethod
defplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any
    ) -> Union[AgentAction, AgentFinish]:
"""制定下一步行动计划"""
pass

    @property
    @abstractmethod
definput_keys(self) -> List[str]:
"""输入键列表"""
pass

    @property
    @abstractmethod
defreturn_values(self) -> List[str]:
"""返回值键列表"""
pass

2. AgentExecutor 执行引擎

classAgentExecutor(Chain):
"""Agent 执行引擎 - 核心执行循环"""

def__init__(
        self,
        agent: BaseAgent,
        tools: Sequence[BaseTool],
        max_iterations: int = 15,
        max_execution_time: Optional[float] = None,
        early_stopping_method: str = "force",
        handle_parsing_errors: bool = False,
        **kwargs
    ):
        self.agent = agent
        self.tools = tools
        self.max_iterations = max_iterations
        self.max_execution_time = max_execution_time
        self.early_stopping_method = early_stopping_method
        self.handle_parsing_errors = handle_parsing_errors

def_call(self, inputs: Dict[str, Any], run_manager: CallbackManagerForChainRun = None) -> Dict[str, Any]:
"""主执行循环"""
        name_to_tool_map = {tool.name: tool for tool in self.tools}
        color_mapping = get_color_mapping([tool.name for tool in self.tools])

        intermediate_steps: List[Tuple[AgentAction, str]] = []
        iterations = 0
        time_elapsed = 0.0
        start_time = time.time()

# 主循环
while self._should_continue(iterations, time_elapsed):
# 1. Agent 决策
            next_step_output = self._take_next_step(
                name_to_tool_map,
                color_mapping,
                inputs,
                intermediate_steps,
                run_manager=run_manager,
            )

# 2. 检查是否完成
if isinstance(next_step_output, AgentFinish):
return self._return(
                    next_step_output, intermediate_steps, run_manager=run_manager
                )

# 3. 执行工具并记录结果
            intermediate_steps.extend(next_step_output)

# 4. 更新计数器
            iterations += 1
            time_elapsed = time.time() - start_time

# 达到最大迭代次数或时间限制
return self._return_stopped_response(
            self.early_stopping_method, intermediate_steps, **inputs
        )

def_take_next_step(
        self,
        name_to_tool_map: Dict[str, BaseTool],
        color_mapping: Dict[str, str],
        inputs: Dict[str, str],
        intermediate_steps: List[Tuple[AgentAction, str]],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
"""执行下一步"""
try:
# Agent 制定计划
            output = self.agent.plan(
                intermediate_steps,
                callbacks=run_manager.get_child() if run_manager elseNone,
                **inputs,
            )
except OutputParserException as e:
if self.handle_parsing_errors:
# 处理解析错误
return self._handle_parsing_error(e, run_manager)
else:
raise e

# 如果是完成信号,直接返回
if isinstance(output, AgentFinish):
return output

# 执行工具
        actions = [output] if isinstance(output, AgentAction) else output
        result = []

for agent_action in actions:
if run_manager:
                run_manager.on_agent_action(agent_action, color="green")

# 查找并执行工具
            tool = name_to_tool_map[agent_action.tool]
            observation = tool.run(
                agent_action.tool_input,
                verbose=self.verbose,
                color=color_mapping[agent_action.tool],
                callbacks=run_manager.get_child() if run_manager elseNone,
            )

            result.append((agent_action, observation))

return result

3. 工具系统设计

classBaseTool(Runnable[str, str]):
"""工具基类"""

    name: str
    description: str
    return_direct: bool = False

    @abstractmethod
def_run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
"""同步执行工具"""
pass

asyncdef_arun(
        self,
        query: str,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
"""异步执行工具"""
# 默认实现:在线程池中运行同步版本
        loop = asyncio.get_event_loop()
returnawait loop.run_in_executor(None, self._run, query, run_manager)

defrun(
        self,
        tool_input: Union[str, Dict],
        verbose: Optional[bool] = None,
        start_color: Optional[str] = "green",
        color: Optional[str] = "blue",
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> str:
"""运行工具的公共接口"""
# 参数验证和预处理
        parsed_input = self._parse_input(tool_input)

# 创建回调管理器
        callback_manager = CallbackManager.configure(
            callbacks, self.callbacks, verbose, None, None, **kwargs
        )

# 执行工具
try:
            tool_args, tool_kwargs = self._to_args_and_kwargs(parsed_input)
            observation = self._run(*tool_args, run_manager=callback_manager, **tool_kwargs)
except (Exception, KeyboardInterrupt) as e:
            callback_manager.on_tool_error(e)
raise e
else:
            callback_manager.on_tool_end(observation, color=color, name=self.name, **kwargs)
return observation

Agent 类型实现

1. ReAct Agent

classReActAgent(BaseAgent):
"""ReAct (Reasoning + Acting) Agent 实现"""

def__init__(
        self,
        llm: BaseLanguageModel,
        tools: Sequence[BaseTool],
        prompt: BasePromptTemplate,
        output_parser: AgentOutputParser,
    ):
        self.llm = llm
        self.tools = tools
        self.prompt = prompt
        self.output_parser = output_parser

defplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
"""ReAct 推理过程"""
# 构建提示
        full_inputs = self.get_full_inputs(intermediate_steps, **kwargs)
        prompt_value = self.prompt.format_prompt(**full_inputs)

# LLM 推理
        llm_output = self.llm.generate_prompt(
            [prompt_value], callbacks=callbacks
        ).generations[0][0].text

# 解析输出
return self.output_parser.parse(llm_output)

defget_full_inputs(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        **kwargs: Any
    ) -> Dict[str, Any]:
"""构建完整输入"""
        thoughts = self._construct_scratchpad(intermediate_steps)
return {
            **kwargs,
"agent_scratchpad": thoughts,
"tools": self._format_tools(),
"tool_names": ", ".join([tool.name for tool in self.tools]),
        }

def_construct_scratchpad(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]]
    ) -> str:
"""构建思考过程"""
        thoughts = ""
for action, observation in intermediate_steps:
            thoughts += f"Thought: {action.log}\n"
            thoughts += f"Action: {action.tool}\n"
            thoughts += f"Action Input: {action.tool_input}\n"
            thoughts += f"Observation: {observation}\n"
return thoughts

2. Function Calling Agent

classOpenAIFunctionsAgent(BaseAgent):
"""基于 OpenAI Function Calling 的 Agent"""

def__init__(
        self,
        llm: BaseLanguageModel,
        tools: Sequence[BaseTool],
        system_message: Optional[SystemMessage] = None,
    ):
        self.llm = llm
        self.tools = tools
        self.system_message = system_message or SystemMessage(
            content="You are a helpful assistant."
        )

defplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
"""使用 Function Calling 进行决策"""
# 构建消息历史
        messages = self._construct_messages(intermediate_steps, **kwargs)

# 准备函数定义
        functions = [self._tool_to_function(tool) for tool in self.tools]

# 调用 LLM
        response = self.llm.predict_messages(
            messages,
            functions=functions,
            callbacks=callbacks,
        )

# 解析响应
return self._parse_ai_message(response)

def_tool_to_function(self, tool: BaseTool) -> Dict[str, Any]:
"""将工具转换为函数定义"""
return {
"name": tool.name,
"description": tool.description,
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The input to the tool",
                    }
                },
"required": ["query"],
            },
        }

def_parse_ai_message(self, message: AIMessage) -> Union[AgentAction, AgentFinish]:
"""解析 AI 消息"""
if message.additional_kwargs.get("function_call"):
# 有函数调用
            function_call = message.additional_kwargs["function_call"]
return AgentAction(
                tool=function_call["name"],
                tool_input=json.loads(function_call["arguments"])["query"],
                log=message.content,
            )
else:
# 没有函数调用,表示完成
return AgentFinish(
                return_values={"output": message.content},
                log=message.content,
            )

高级特性实现

1. 多智能体协作

classMultiAgentSystem:
"""多智能体协作系统"""

def__init__(self, agents: Dict[str, BaseAgent]):
        self.agents = agents
        self.message_queue = asyncio.Queue()
        self.shared_memory = {}

asyncdefrun_collaborative_task(self, task: str) -> str:
"""运行协作任务"""
# 任务分解
        subtasks = await self._decompose_task(task)

# 分配任务给不同的 Agent
        tasks = []
for agent_name, subtask in subtasks.items():
            agent = self.agents[agent_name]
            tasks.append(self._run_agent_task(agent, subtask))

# 等待所有任务完成
        results = await asyncio.gather(*tasks)

# 整合结果
returnawait self._integrate_results(results)

asyncdef_run_agent_task(self, agent: BaseAgent, task: str) -> str:
"""运行单个 Agent 任务"""
# 这里可以实现更复杂的协作逻辑
returnawait agent.ainvoke({"input": task})

2. 记忆系统

classAgentMemory:
"""Agent 记忆系统"""

def__init__(self, memory_type: str = "buffer"):
        self.memory_type = memory_type
        self.short_term_memory = []
        self.long_term_memory = VectorStore()  # 向量数据库
        self.working_memory = {}

defadd_interaction(self, input_text: str, output_text: str):
"""添加交互记录"""
        interaction = {
"input": input_text,
"output": output_text,
"timestamp": datetime.now(),
        }

# 添加到短期记忆
        self.short_term_memory.append(interaction)

# 如果短期记忆过长,转移到长期记忆
if len(self.short_term_memory) > 10:
            old_interaction = self.short_term_memory.pop(0)
            self._store_in_long_term_memory(old_interaction)

defretrieve_relevant_memories(self, query: str, k: int = 5) -> List[Dict]:
"""检索相关记忆"""
# 从长期记忆中检索
        relevant_memories = self.long_term_memory.similarity_search(query, k=k)

# 结合短期记忆
        recent_memories = self.short_term_memory[-3:]  # 最近3条

return relevant_memories + recent_memories

3. 错误处理与恢复

classRobustAgentExecutor(AgentExecutor):
"""具有错误处理能力的 Agent 执行器"""

def__init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.error_recovery_strategies = {
"parsing_error": self._handle_parsing_error,
"tool_error": self._handle_tool_error,
"timeout_error": self._handle_timeout_error,
        }

def_take_next_step_with_recovery(self, *args, **kwargs):
"""带错误恢复的步骤执行"""
        max_retries = 3
        retry_count = 0

while retry_count < max_retries:
try:
return self._take_next_step(*args, **kwargs)
except Exception as e:
                error_type = self._classify_error(e)

if error_type in self.error_recovery_strategies:
                    recovery_action = self.error_recovery_strategies[error_type](e)
if recovery_action == "retry":
                        retry_count += 1
continue
elif recovery_action == "skip":
return []
elif recovery_action == "abort":
raise e
else:
raise e

raise Exception(f"Max retries ({max_retries}) exceeded")

def_handle_parsing_error(self, error: Exception) -> str:
"""处理解析错误"""
# 可以尝试使用更宽松的解析器
return"retry"

def_handle_tool_error(self, error: Exception) -> str:
"""处理工具错误"""
# 可以尝试使用备用工具
return"skip"

def_handle_timeout_error(self, error: Exception) -> str:
"""处理超时错误"""
# 可以增加超时时间重试
return"retry"

性能优化策略

1. 并行工具执行

classParallelToolExecutor:
"""并行工具执行器"""

asyncdefexecute_tools_parallel(
        self,
        actions: List[AgentAction],
        tools: Dict[str, BaseTool]
    ) -> List[Tuple[AgentAction, str]]:
"""并行执行多个工具"""
        tasks = []
for action in actions:
            tool = tools[action.tool]
            task = self._execute_tool_async(tool, action)
            tasks.append(task)

        results = await asyncio.gather(*tasks, return_exceptions=True)

# 处理结果和异常
        final_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
                observation = f"Tool execution failed: {str(result)}"
else:
                observation = result

            final_results.append((actions[i], observation))

return final_results

asyncdef_execute_tool_async(self, tool: BaseTool, action: AgentAction) -> str:
"""异步执行单个工具"""
returnawait tool.arun(action.tool_input)

2. 智能缓存

classAgentCache:
"""Agent 执行结果缓存"""

def__init__(self, cache_size: int = 1000):
        self.cache = {}
        self.cache_size = cache_size
        self.access_times = {}

defget_cached_result(self, state_hash: str) -> Optional[Any]:
"""获取缓存结果"""
if state_hash in self.cache:
            self.access_times[state_hash] = time.time()
return self.cache[state_hash]
returnNone

defcache_result(self, state_hash: str, result: Any):
"""缓存结果"""
if len(self.cache) >= self.cache_size:
# 移除最久未访问的缓存
            oldest_key = min(self.access_times.keys(),
                           key=lambda k: self.access_times[k])
del self.cache[oldest_key]
del self.access_times[oldest_key]

        self.cache[state_hash] = result
        self.access_times[state_hash] = time.time()

def_compute_state_hash(self, inputs: Dict, intermediate_steps: List) -> str:
"""计算状态哈希"""
        state_str = json.dumps({
"inputs": inputs,
"steps": [(step[0].tool, step[0].tool_input, step[1])
for step in intermediate_steps]
        }, sort_keys=True)
return hashlib.md5(state_str.encode()).hexdigest()

总结

LangChain Agent 系统的架构设计体现了以下关键特点:

  1. 模块化设计:Agent、Tools、Executor 各司其职
  2. 可扩展性:支持多种 Agent 类型和自定义工具
  3. 错误处理:完善的错误恢复机制
  4. 性能优化:并行执行、智能缓存等优化策略
  5. 协作能力:支持多智能体协作
  6. 记忆系统:短期和长期记忆的结合

这些设计使得 LangChain Agent 系统能够处理复杂的现实世界任务,为构建智能应用提供了强大的基础。

读者福利大放送:如果你对大模型感兴趣,想更加深入的学习大模型**,那么这份精心整理的大模型学习资料,绝对能帮你少走弯路、快速入门**

如果你是零基础小白,别担心——大模型入门真的没那么难,你完全可以学得会

👉 不用你懂任何算法和数学知识,公式推导、复杂原理这些都不用操心;
👉 也不挑电脑配置,普通家用电脑完全能 hold 住,不用额外花钱升级设备;
👉 更不用你提前学 Python 之类的编程语言,零基础照样能上手。

你要做的特别简单:跟着我的讲解走,照着教程里的步骤一步步操作就行。

包括:大模型学习线路汇总、学习阶段,大模型实战案例,大模型学习视频,人工智能、机器学习、大模型书籍PDF。带你从零基础系统性的学好大模型!

现在这份资料免费分享给大家,有需要的小伙伴,直接VX扫描下方二维码就能领取啦😝↓↓↓
在这里插入图片描述

为什么要学习大模型?

数据显示,2023 年我国大模型相关人才缺口已突破百万,这一数字直接暴露了人才培养体系的严重滞后与供给不足。而随着人工智能技术的飞速迭代,产业对专业人才的需求将呈爆发式增长,据预测,到 2025 年这一缺口将急剧扩大至 400 万!!
在这里插入图片描述

大模型学习路线汇总

整体的学习路线分成L1到L4四个阶段,一步步带你从入门到进阶,从理论到实战,跟着学习路线一步步打卡,小白也能轻松学会!
在这里插入图片描述

大模型实战项目&配套源码

光学理论可不够,这套学习资料还包含了丰富的实战案例,让你在实战中检验成果巩固所学知识
在这里插入图片描述

大模型学习必看书籍PDF

我精选了一系列大模型技术的书籍和学习文档(电子版),它们由领域内的顶尖专家撰写,内容全面、深入、详尽,为你学习大模型提供坚实的理论基础。
在这里插入图片描述

大模型超全面试题汇总

在面试过程中可能遇到的问题,我都给大家汇总好了,能让你们在面试中游刃有余
在这里插入图片描述

这些资料真的有用吗?

这份资料由我和鲁为民博士(北京清华大学学士和美国加州理工学院博士)共同整理,现任上海殷泊信息科技CEO,其创立的MoPaaS云平台获Forrester全球’强劲表现者’认证,服务航天科工、国家电网等1000+企业,以第一作者在IEEE Transactions发表论文50+篇,获NASA JPL火星探测系统强化学习专利等35项中美专利。本套AI大模型课程由清华大学-加州理工双料博士、吴文俊人工智能奖得主鲁为民教授领衔研发。

资料内容涵盖了从入门到进阶的各类视频教程和实战项目,无论你是小白还是有些技术基础的技术人员,这份资料都绝对能帮助你提升薪资待遇,转行大模型岗位。
在这里插入图片描述
👉获取方式

😝有需要的小伙伴,可以保存图片到VX扫描下方二维码免费领取【保证100%免费】
在这里插入图片描述
相信我,这套大模型系统教程将会是全网最齐全 最适合零基础的!!

Logo

更多推荐