DeerFlow多智能体项目分析-LangGraph引用及核心功能实现源码分析
DeerFlow基于LangGraph构建了一个复杂的多智能体协作系统,充分利用了LangGraph的状态管理、工作流编排、检查点机制等核心功能。本文档深入分析项目中LangGraph的技术应用和实现细节。
DeerFlow中LangGraph的引用和核心功能实现细节
概述
DeerFlow基于LangGraph构建了一个复杂的多智能体协作系统,充分利用了LangGraph的状态管理、工作流编排、检查点机制等核心功能。本文档深入分析DeerFlow多智能体实现中LangGraph的引用和核心功能实现细节。本项目的核心是基于LangGraph构建多智能体协作系统,包括智能体的核心功能、工作流、工具体系、记忆、打断机制等帮助智能体更合理工作的约束和辅助模块;引用的模块包括StateGraph、MessagesState、Command、interrupt、MemorySaver、tools等。
项目地址:https://github.com/bytedance/deer-flow
LangGraph中文在线文档:https://github.langchain.ac.cn/langgraph/agents/agents/
DeerFlow中LangGraph技术点和功能分析
概述
DeerFlow基于LangGraph构建了一个复杂的多智能体协作系统,充分利用了LangGraph的状态管理、工作流编排、检查点机制等核心功能。本文档深入分析项目中LangGraph的技术应用和实现细节。
1. LangGraph核心架构引用
1.1 StateGraph状态图构建
源码文件路径: src/graph/builder.py
from langgraph.graph import StateGraph, START, END
def _build_base_graph():
"""构建基础状态图"""
builder = StateGraph(State)
# 添加节点
builder.add_edge(START, "coordinator")
builder.add_node("coordinator", coordinator_node)
builder.add_node("planner", planner_node)
builder.add_node("researcher", researcher_node)
builder.add_node("coder", coder_node)
builder.add_node("reporter", reporter_node)
# 添加条件边
builder.add_conditional_edges(
"research_team",
continue_to_running_research_team,
["planner", "researcher", "coder"],
)
return builder.compile()
1.2 MessagesState扩展
源码文件路径: src/graph/types.py
from langgraph.graph import MessagesState
class State(MessagesState):
"""扩展MessagesState,添加业务特定字段"""
# 运行时变量
locale: str = "en-US"
research_topic: str = ""
observations: list[str] = []
resources: list[Resource] = []
plan_iterations: int = 0
current_plan: Plan | str = None
final_report: str = ""
# 澄清功能状态
enable_clarification: bool = False
clarification_rounds: int = 0
clarification_history: list[str] = []
is_clarification_complete: bool = False
clarified_question: str = ""
max_clarification_rounds: int = 3
# 工作流控制
goto: str = "planner"
2. 工作流控制机制
2.1 Command对象使用
源码文件路径: src/graph/nodes.py
LangGraph的Command对象用于精确控制工作流转换:
from langgraph.types import Command
def planner_node(state: State, config: RunnableConfig):
# 检查计划迭代限制
if plan_iterations >= configurable.max_plan_iterations:
return Command(goto="reporter")
# 根据计划内容决定下一步
if curr_plan.get("has_enough_context"):
return Command(
update={
"messages": [AIMessage(content=full_response, name="planner")],
"current_plan": new_plan,
},
goto="reporter",
)
# 需要人工反馈
return Command(
update={
"messages": [AIMessage(content=full_response, name="planner")],
"current_plan": full_response,
},
goto="human_feedback",
)
2.2 条件路由实现
源码文件路径: src/graph/builder.py
def continue_to_running_research_team(state: State):
"""智能路由决策函数"""
current_plan = state.get("current_plan")
if not current_plan or not current_plan.steps:
return "planner"
# 检查所有步骤是否完成
if all(step.execution_res for step in current_plan.steps):
return "planner"
# 根据步骤类型路由到相应智能体
for step in current_plan.steps:
if not step.execution_res:
if step.step_type == StepType.RESEARCH:
return "researcher"
if step.step_type == StepType.PROCESSING:
return "coder"
return "planner"
3. 中断和人机交互
3.1 interrupt机制
源码文件路径: src/graph/nodes.py
from langgraph.types import interrupt
def human_feedback_node(state):
"""人机协作节点"""
auto_accepted_plan = state.get("auto_accepted_plan", False)
if not auto_accepted_plan:
# 中断工作流等待人工输入
feedback = interrupt("请审核计划。")
# 处理不同类型的反馈
if feedback and str(feedback).upper().startswith("[EDIT_PLAN]"):
return Command(
update={"messages": [HumanMessage(content=feedback, name="feedback")]},
goto="planner",
)
elif feedback and str(feedback).upper().startswith("[ACCEPTED]"):
logger.info("用户接受了计划。")
3.2 澄清功能的中断处理
源码文件路径: src/graph/nodes.py
def coordinator_node(state: State, config: RunnableConfig):
"""协调器节点处理澄清中断"""
# 澄清模式下的中断处理
if not response.tool_calls and response.content:
if clarification_rounds < max_clarification_rounds:
return Command(
update={
"messages": state_messages,
"clarification_rounds": clarification_rounds + 1,
"is_clarification_complete": False,
"__interrupt__": [("coordinator", response.content)],
},
goto="__end__",
)
4. 检查点和持久化
4.1 内存检查点
源码文件路径: src/graph/builder.py
from langgraph.checkpoint.memory import MemorySaver
def build_graph_with_memory():
"""构建带内存的工作流图"""
memory = MemorySaver()
builder = _build_base_graph()
return builder.compile(checkpointer=memory)
4.2 自定义检查点管理
源码文件路径: src/graph/checkpoint.py
from langgraph.store.memory import InMemoryStore
class ChatStreamManager:
"""聊天流管理器,支持MongoDB和PostgreSQL持久化"""
def __init__(self, checkpoint_saver: bool = False, db_uri: str = None):
self.store = InMemoryStore() # 内存存储用于临时数据
self.checkpoint_saver = checkpoint_saver
if checkpoint_saver:
if db_uri.startswith("mongodb://"):
self._init_mongodb()
elif db_uri.startswith("postgresql://"):
self._init_postgresql()
def process_stream_message(self, thread_id: str, message: str, finish_reason: str):
"""处理流式消息"""
store_namespace = ("messages", thread_id)
# 获取或初始化消息游标
cursor = self.store.get(store_namespace, "cursor")
current_index = 0 if cursor is None else cursor.value.get("index", 0) + 1
# 存储消息块
self.store.put(store_namespace, f"chunk_{current_index}", message)
self.store.put(store_namespace, "cursor", {"index": current_index})
# 对话完成时持久化
if finish_reason in ("stop", "interrupt"):
return self._persist_complete_conversation(thread_id, store_namespace, current_index)
4.3 数据库持久化
源码文件路径: src/graph/checkpoint.py
def _persist_to_mongodb(self, thread_id: str, messages: List[str]) -> bool:
"""持久化到MongoDB"""
collection = self.mongo_db.chat_streams
existing_document = collection.find_one({"thread_id": thread_id})
current_timestamp = datetime.now()
if existing_document:
# 更新现有对话
update_result = collection.update_one(
{"thread_id": thread_id},
{"$set": {"messages": messages, "ts": current_timestamp}},
)
return update_result.modified_count > 0
else:
# 创建新对话
new_document = {
"thread_id": thread_id,
"messages": messages,
"ts": current_timestamp,
"id": uuid.uuid4().hex,
}
insert_result = collection.insert_one(new_document)
return insert_result.inserted_id is not None
5. 流式处理
5.1 异步流式执行
源码文件路径: src/workflow.py
async def run_agent_workflow_async(user_input: str, **kwargs):
"""异步运行智能体工作流"""
config = {
"configurable": {
"thread_id": "default",
"max_plan_iterations": max_plan_iterations,
"max_step_num": max_step_num,
},
"recursion_limit": get_recursion_limit(default=100),
}
final_state = None
async for s in graph.astream(
input=initial_state,
config=config,
stream_mode="values"
):
final_state = s
if isinstance(s, dict) and "messages" in s:
message = s["messages"][-1]
message.pretty_print()
5.2 流式消息处理
源码文件路径: src/graph/checkpoint.py
def chat_stream_message(thread_id: str, message: str, finish_reason: str) -> bool:
"""流式消息处理包装函数"""
checkpoint_saver = get_bool_env("LANGGRAPH_CHECKPOINT_SAVER", False)
if checkpoint_saver:
return _default_manager.process_stream_message(thread_id, message, finish_reason)
return False
6. 工具集成
6.1 LangGraph工具绑定
源码文件路径: src/graph/nodes.py
from langchain_core.tools import tool
@tool
def handoff_to_planner(
research_topic: Annotated[str, "要移交的研究任务主题"],
locale: Annotated[str, "用户检测到的语言区域设置"],
):
"""移交给规划器智能体进行计划制定"""
return
# 在节点中使用工具
def coordinator_node(state: State, config: RunnableConfig):
tools = [handoff_to_planner, handoff_after_clarification]
response = (
get_llm_by_type(AGENT_LLM_MAP["coordinator"])
.bind_tools(tools)
.invoke(messages)
)
6.2 MCP工具动态集成
源码文件路径: src/graph/nodes.py
from langchain_mcp_adapters.client import MultiServerMCPClient
async def _setup_and_execute_agent_step(state, config, agent_type, default_tools):
"""动态设置MCP工具"""
if mcp_servers:
client = MultiServerMCPClient(mcp_servers)
loaded_tools = default_tools[:]
all_tools = await client.get_tools()
for tool in all_tools:
if tool.name in enabled_tools:
tool.description = f"由'{enabled_tools[tool.name]}'提供支持。\n{tool.description}"
loaded_tools.append(tool)
agent = create_agent(agent_type, agent_type, loaded_tools, agent_type, pre_model_hook)
7. 配置和调试
7.1 LangGraph Studio配置
源码文件路径: langgraph.json
{
"dockerfile_lines": [],
"graphs": {
"deep_research": "./src/workflow.py:graph",
"podcast_generation": "./src/podcast/graph/builder.py:workflow",
"ppt_generation": "./src/ppt/graph/builder.py:workflow"
},
"python_version": "3.12",
"env": "./.env",
"dependencies": ["."]
}
7.2 图可视化
源码文件路径: src/workflow.py
if __name__ == "__main__":
# 生成Mermaid图表用于可视化
print(graph.get_graph(xray=True).draw_mermaid())
7.3 递归限制配置
源码文件路径: src/config/configuration.py
def get_recursion_limit(default: int = 100) -> int:
"""获取递归限制配置"""
try:
limit = int(os.getenv("AGENT_RECURSION_LIMIT", str(default)))
return limit if limit > 0 else default
except ValueError:
return default
# 在配置中使用
config = {
"recursion_limit": get_recursion_limit(default=100),
}
8. 结构化输出
8.1 Pydantic模型定义
源码文件路径: src/prompts/planner_model.py
from pydantic import BaseModel, Field
from enum import Enum
class StepType(str, Enum):
RESEARCH = "research"
PROCESSING = "processing"
class Step(BaseModel):
need_search: bool = Field(..., description="Must be explicitly set for each step")
title: str
description: str = Field(..., description="Specify exactly what data to collect")
step_type: StepType = Field(..., description="Indicates the nature of the step")
execution_res: Optional[str] = Field(default=None, description="The Step execution result")
class Plan(BaseModel):
locale: str = Field(..., description="e.g. 'en-US' or 'zh-CN', based on the user's language")
has_enough_context: bool
thought: str = Field(default="", description="Thinking process for the plan")
title: str
steps: List[Step] = Field(default_factory=list, description="Research & Processing steps")
8.2 结构化输出集成
源码文件路径: src/graph/nodes.py
def planner_node(state: State, config: RunnableConfig):
if AGENT_LLM_MAP["planner"] == "basic" and not configurable.enable_deep_thinking:
llm = get_llm_by_type("basic").with_structured_output(
Plan,
method="json_mode",
)
response = llm.invoke(messages)
full_response = response.model_dump_json(indent=4, exclude_none=True)
9. 错误处理和容错
9.1 JSON修复机制
源码文件路径: src/utils/json_utils.py
from src.utils.json_utils import repair_json_output
def planner_node(state: State, config: RunnableConfig):
try:
curr_plan = json.loads(repair_json_output(full_response))
except json.JSONDecodeError:
logger.warning("规划器响应不是有效的JSON")
if plan_iterations > 0:
return Command(goto="reporter")
else:
return Command(goto="__end__")
9.2 异常处理
源码文件路径: src/workflow.py
async def run_agent_workflow_async(user_input: str, **kwargs):
async for s in graph.astream(input=initial_state, config=config, stream_mode="values"):
try:
final_state = s
# 处理流式输出
except Exception as e:
logger.error(f"处理流式输出时出错: {e}")
print(f"处理输出时出错: {str(e)}")
10. 性能优化
10.1 上下文压缩
源码文件路径: src/utils/context_manager.py
from src.utils.context_manager import ContextManager
# 在智能体创建时使用上下文压缩
llm_token_limit = get_llm_token_limit_by_type(AGENT_LLM_MAP[agent_type])
pre_model_hook = partial(ContextManager(llm_token_limit, 3).compress_messages)
agent = create_agent(
agent_type, agent_type, loaded_tools, agent_type, pre_model_hook
)
10.2 递归限制控制
源码文件路径: src/graph/nodes.py
# 动态配置递归限制
recursion_limit = int(os.getenv("AGENT_RECURSION_LIMIT", "25"))
result = await agent.ainvoke(
input=agent_input,
config={"recursion_limit": recursion_limit}
)
总结
DeerFlow充分利用了LangGraph的以下核心功能:
核心技术点:
- StateGraph状态管理 - 复杂工作流编排
- MessagesState扩展 - 业务状态定义
- Command控制 - 精确的流程控制
- 条件路由 - 智能决策分支
- 中断机制 - 人机交互支持
- 检查点系统 - 状态持久化
- 流式处理 - 实时响应能力
- 工具集成 - MCP协议支持
- 结构化输出 - Pydantic模型集成
- 错误处理 - 容错和恢复机制
这种基于LangGraph的架构使得DeerFlow能够构建复杂的多智能体协作系统,同时保持良好的可维护性和扩展性。
更多推荐

所有评论(0)