How DeerFlow Uses LangGraph: Imported Components and Core Feature Implementation

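Overview

DeerFlow builds a complex multi-agent collaboration system on top of LangGraph and makes full use of LangGraph's core features, including state management, workflow orchestration, and checkpointing. This document examines which LangGraph components DeerFlow's multi-agent implementation uses and how its core features are implemented. At its core, the project is a LangGraph-based multi-agent collaboration system. Besides the agents' core functions, it includes the workflow, the tool system, memory, and the interrupt mechanism: the constraint and support modules that keep the agents working in an orderly way. The components it draws on include StateGraph, MessagesState, Command, interrupt, MemorySaver, and tools.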

Project repository: https://github.com/bytedance/deer-flow
LangGraph documentation (Chinese translation): https://github.langchain.ac.cn/langgraph/agents/agents/


1. Core LangGraph Architecture

1.1 Building the StateGraph

Source file: src/graph/builder.py

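from langgraph.graph import StateGraph, START, END

from src.graph.types import State
from src.graph.nodes import (
    coordinator_node,
    planner_node,
    research_team_node,
    researcher_node,
    coder_node,
    human_feedback_node,
    reporter_node,
)


def _build_base_graph():
    """Build the base state graph and return the (uncompiled) builder."""
    builder = StateGraph(State)

    # Entry point: every run starts at the coordinator
    builder.add_edge(START, "coordinator")

    # Register the agent nodes (excerpt)
    builder.add_node("coordinator", coordinator_node)
    builder.add_node("planner", planner_node)
    builder.add_node("research_team", research_team_node)
    builder.add_node("researcher", researcher_node)
    builder.add_node("coder", coder_node)
    builder.add_node("human_feedback", human_feedback_node)
    builder.add_node("reporter", reporter_node)

    # Conditional edge: after research_team, the routing function
    # (continue_to_running_research_team, defined in this module, see 2.2)
    # picks planner, researcher, or coder
    builder.add_conditional_edges(
        "research_team",
        continue_to_running_research_team,
        ["planner", "researcher", "coder"],
    )
    builder.add_edge("reporter", END)

    # Return the builder so callers can compile it with or without a checkpointer
    return builder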

1.2 Extending MessagesState

Source file: src/graph/types.py

from langgraph.graph import MessagesState

from src.prompts.planner_model import Plan
from src.rag import Resource


class State(MessagesState):
    """Extends MessagesState with business-specific fields."""

    # Runtime variables
    locale: str = "en-US"
    research_topic: str = ""
    observations: list[str] = []
    resources: list[Resource] = []
    plan_iterations: int = 0
    current_plan: Plan | str | None = None
    final_report: str = ""

    # Clarification state
    enable_clarification: bool = False
    clarification_rounds: int = 0
    clarification_history: list[str] = []
    is_clarification_complete: bool = False
    clarified_question: str = ""
    max_clarification_rounds: int = 3

    # Workflow control
    goto: str = "planner"
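`MessagesState` declares `messages` with LangGraph's `add_messages` reducer, so messages returned by a node are appended to the history. Plain fields such as `current_plan` or `plan_iterations` have no reducer and are simply overwritten by the latest value. The sketch below models that difference in plain Python. `apply_update` and `append_messages` are hypothetical names, and the real `add_messages` also replaces messages that share an ID, which this simplified version ignores.

```python
from typing import Any, Callable, Dict, List

Reducer = Callable[[Any, Any], Any]


def append_messages(current: List[Any], new: List[Any]) -> List[Any]:
    """Simplified stand-in for add_messages: append only (no ID-based replacement)."""
    return list(current or []) + list(new)


def apply_update(state: Dict[str, Any], update: Dict[str, Any],
                 reducers: Dict[str, Reducer]) -> Dict[str, Any]:
    """Merge a node's partial update into the state.

    Keys with a reducer are combined with the old value; all other keys are overwritten.
    """
    merged = dict(state)
    for key, value in update.items():
        if key in reducers:
            merged[key] = reducers[key](merged.get(key), value)
        else:
            merged[key] = value
    return merged


reducers = {"messages": append_messages}
state = {"messages": ["user: research X"], "plan_iterations": 0}
state = apply_update(state, {"messages": ["planner: draft plan"], "plan_iterations": 1}, reducers)
state = apply_update(state, {"messages": ["planner: revised plan"], "plan_iterations": 2}, reducers)
print(state)
```

This is why nodes in DeerFlow return only the new messages in `update={"messages": [...]}` rather than the whole history.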

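2. Workflow Control

2.1 Using the Command Object

Source file: src/graph/nodes.py

By returning a LangGraph Command object, a node can update the state and name the next node in a single return value. This gives precise control over workflow transitions: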

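import json
from typing import Literal

from langchain_core.messages import AIMessage
from langchain_core.runnables import RunnableConfig
from langgraph.types import Command

from src.config.configuration import Configuration
from src.graph.types import State
from src.prompts.planner_model import Plan
from src.utils.json_utils import repair_json_output


def planner_node(
    state: State, config: RunnableConfig
) -> Command[Literal["human_feedback", "reporter"]]:
    configurable = Configuration.from_runnable_config(config)
    plan_iterations = state.get("plan_iterations", 0)

    # Stop planning once the iteration limit is reached
    if plan_iterations >= configurable.max_plan_iterations:
        return Command(goto="reporter")

    # ... call the planner LLM; `full_response` is its JSON text (omitted) ...
    curr_plan = json.loads(repair_json_output(full_response))  # error handling: see 9.1

    # Enough context already: store the validated plan and go straight to the reporter
    if curr_plan.get("has_enough_context"):
        new_plan = Plan.model_validate(curr_plan)
        return Command(
            update={
                "messages": [AIMessage(content=full_response, name="planner")],
                "current_plan": new_plan,
            },
            goto="reporter",
        )

    # Otherwise hand the draft plan to a human for review
    return Command(
        update={
            "messages": [AIMessage(content=full_response, name="planner")],
            "current_plan": full_response,
        },
        goto="human_feedback",
    )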
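LangGraph handles `Command` internally. The toy runner below only illustrates the control flow described in this section: a node returns an update and a `goto`, the runner merges the update and jumps to the named node, and a step cap (analogous to `recursion_limit`) stops runaway loops. `ToyCommand` and `run` are hypothetical names, not LangGraph APIs, and the toy merges updates without reducers.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict

END = "__end__"


@dataclass
class ToyCommand:
    """Toy stand-in for langgraph.types.Command: a state update plus the next node."""
    update: Dict[str, Any] = field(default_factory=dict)
    goto: str = END


def run(nodes: Dict[str, Callable[[Dict[str, Any]], ToyCommand]],
        start: str, state: Dict[str, Any], step_limit: int = 25) -> Dict[str, Any]:
    """Execute nodes by following each returned goto until END.

    step_limit plays the role of recursion_limit: exceeding it raises an error.
    """
    current, steps = start, 0
    while current != END:
        steps += 1
        if steps > step_limit:
            raise RuntimeError(f"step limit {step_limit} reached at node {current!r}")
        cmd = nodes[current](state)
        state = {**state, **cmd.update}  # no reducers in this toy: last value wins
        current = cmd.goto
    return state


def planner(state):
    iterations = state.get("plan_iterations", 0)
    if iterations >= state["max_plan_iterations"]:
        return ToyCommand(goto="reporter")
    return ToyCommand(update={"plan_iterations": iterations + 1}, goto="planner")


def reporter(state):
    return ToyCommand(update={"final_report": f"report after {state['plan_iterations']} plans"})


final = run({"planner": planner, "reporter": reporter}, "planner", {"max_plan_iterations": 2})
print(final["final_report"])  # report after 2 plans
```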

2.2 Conditional Routing

Source file: src/graph/builder.py

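from src.prompts.planner_model import StepType


def continue_to_running_research_team(state: State):
    """Routing function: pick the next agent from the plan's unfinished steps."""
    current_plan = state.get("current_plan")

    # No plan or no steps: go back to the planner
    if not current_plan or not current_plan.steps:
        return "planner"

    # Every step already has a result: go back to the planner
    if all(step.execution_res for step in current_plan.steps):
        return "planner"

    # Send the first unfinished step to the agent that matches its type
    for step in current_plan.steps:
        if not step.execution_res:
            if step.step_type == StepType.RESEARCH:
                return "researcher"
            if step.step_type == StepType.PROCESSING:
                return "coder"

    return "planner"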
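To see the routing rule in action without the rest of DeerFlow, the sketch below copies the function and substitutes standard-library dataclasses for the Pydantic `Plan`/`Step` models from section 8.1. The stand-in classes carry only the fields the router reads.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional


class StepType(str, Enum):
    RESEARCH = "research"
    PROCESSING = "processing"


@dataclass
class Step:  # stand-in for the Pydantic Step model
    title: str
    step_type: StepType
    execution_res: Optional[str] = None


@dataclass
class Plan:  # stand-in for the Pydantic Plan model
    steps: List[Step] = field(default_factory=list)


def continue_to_running_research_team(state: dict) -> str:
    """Same rule as above: the first unfinished step decides the agent."""
    current_plan = state.get("current_plan")
    if not current_plan or not current_plan.steps:
        return "planner"
    if all(step.execution_res for step in current_plan.steps):
        return "planner"
    for step in current_plan.steps:
        if not step.execution_res:
            if step.step_type == StepType.RESEARCH:
                return "researcher"
            if step.step_type == StepType.PROCESSING:
                return "coder"
    return "planner"


plan = Plan(steps=[
    Step("Collect data", StepType.RESEARCH, execution_res="done"),
    Step("Analyze data", StepType.PROCESSING),
])
print(continue_to_running_research_team({"current_plan": plan}))  # coder
```

The function always inspects the first unfinished step. Calling it repeatedly therefore walks the plan in order, and once every step has a result, control returns to the planner.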

3. Interrupts and Human-in-the-Loop

3.1 The interrupt Mechanism

Source file: src/graph/nodes.py

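import logging

from langchain_core.messages import HumanMessage
from langgraph.types import Command, interrupt

logger = logging.getLogger(__name__)


def human_feedback_node(state: State):
    """Human-in-the-loop node: pause so a person can review the plan."""
    auto_accepted_plan = state.get("auto_accepted_plan", False)

    if not auto_accepted_plan:
        # Pause the workflow and wait for human input
        feedback = interrupt("Please review the plan.")

        # Handle the different kinds of feedback
        if feedback and str(feedback).upper().startswith("[EDIT_PLAN]"):
            return Command(
                update={"messages": [HumanMessage(content=feedback, name="feedback")]},
                goto="planner",
            )
        elif feedback and str(feedback).upper().startswith("[ACCEPTED]"):
            logger.info("The user accepted the plan.")
    # ... the accepted / auto-accepted path continues here (omitted in this excerpt)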
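An important detail of LangGraph's `interrupt()` is how resumption works. The run is resumed by invoking the graph again on the same `thread_id` with `Command(resume=...)`. The interrupted node is then executed again from its first line, and this time `interrupt()` returns the resume value instead of pausing, so any code before the `interrupt()` call runs twice. The toy below models only that behavior in plain Python. `Pause` and `ToyRunner` are hypothetical names, and the real mechanism also relies on a checkpointer to save the paused state.

```python
from typing import Any, Callable, Dict, Tuple


class Pause(Exception):
    """Raised by the toy interrupt() when no resume value is available yet."""

    def __init__(self, value: Any):
        super().__init__(value)
        self.value = value


class ToyRunner:
    """Toy model of interrupt/resume: on resume the node re-runs from its first line."""

    def __init__(self, node: Callable[[Dict[str, Any], Callable[[Any], Any]], Any]):
        self.node = node
        self.starts = 0  # how many times the node body has started
        self._resume_value: Any = None
        self._has_resume = False

    def _interrupt(self, value: Any) -> Any:
        if self._has_resume:
            self._has_resume = False
            return self._resume_value  # resumed: interrupt() returns the human input
        raise Pause(value)             # first pass: pause and surface the prompt

    def invoke(self, state: Dict[str, Any]) -> Tuple[str, Any]:
        self.starts += 1
        try:
            return ("done", self.node(state, self._interrupt))
        except Pause as pause:
            return ("interrupted", pause.value)

    def resume(self, state: Dict[str, Any], value: Any) -> Tuple[str, Any]:
        self._resume_value, self._has_resume = value, True
        return self.invoke(state)


def human_feedback(state, interrupt):
    feedback = interrupt("Please review the plan.")
    if str(feedback).upper().startswith("[EDIT_PLAN]"):
        return "planner"
    if str(feedback).upper().startswith("[ACCEPTED]"):
        return "accepted"
    raise TypeError(f"Unsupported feedback: {feedback!r}")


runner = ToyRunner(human_feedback)
print(runner.invoke({}))                # ('interrupted', 'Please review the plan.')
print(runner.resume({}, "[ACCEPTED]"))  # ('done', 'accepted')
print(runner.starts)                    # 2
```

Because of this re-execution, side effects placed before `interrupt()` (logging, API calls) should be safe to repeat.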

3.2 Interrupt Handling for Clarification

Source file: src/graph/nodes.py

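def coordinator_node(state: State, config: RunnableConfig):
    """Coordinator node: clarification handling (excerpt)."""
    # Prepared earlier in the node (omitted): `response` is the LLM reply
    # (see 6.1) and `state_messages` is the updated message list
    clarification_rounds = state.get("clarification_rounds", 0)
    max_clarification_rounds = state.get("max_clarification_rounds", 3)

    # Clarification mode: a text reply without a tool call is a question for the user.
    # interrupt() is not called here: the run ends (goto "__end__") and the question
    # is carried in the "__interrupt__" entry of the update
    if not response.tool_calls and response.content:
        if clarification_rounds < max_clarification_rounds:
            return Command(
                update={
                    "messages": state_messages,
                    "clarification_rounds": clarification_rounds + 1,
                    "is_clarification_complete": False,
                    "__interrupt__": [("coordinator", response.content)],
                },
                goto="__end__",
            )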
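Unlike 3.1, this branch ends the current run (`goto="__end__"`), so the user's answer has to arrive through a new invocation of the graph. The round counter is what bounds the back-and-forth. The sketch below isolates that counting logic. `coordinator_decision` is a hypothetical helper, and sending the exhausted case straight to `"planner"` is an assumption made for illustration.

```python
from typing import Any, Dict, List, Tuple


def coordinator_decision(state: Dict[str, Any], reply_text: str,
                         tool_calls: List[Dict[str, Any]]) -> Tuple[Dict[str, Any], str]:
    """Return (state_update, goto) for one coordinator turn (hypothetical helper)."""
    rounds = state.get("clarification_rounds", 0)
    max_rounds = state.get("max_clarification_rounds", 3)
    # Same condition as the excerpt: plain text, no tool call, rounds left
    if not tool_calls and reply_text and rounds < max_rounds:
        return ({"clarification_rounds": rounds + 1,
                 "is_clarification_complete": False}, "__end__")
    # Handoff requested or budget exhausted (assumed here: continue to the planner)
    return ({"is_clarification_complete": True}, "planner")


state = {"clarification_rounds": 0, "max_clarification_rounds": 2}
for question in ["Which market?", "Which year?", "Which currency?"]:
    update, goto = coordinator_decision(state, question, tool_calls=[])
    state = {**state, **update}  # the next user turn starts a new run from this state
    print(goto, state["clarification_rounds"])
# __end__ 1
# __end__ 2
# planner 2
```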

4. Checkpointing and Persistence

4.1 In-Memory Checkpointing

Source file: src/graph/builder.py

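from langgraph.checkpoint.memory import MemorySaver


def build_graph_with_memory():
    """Build the workflow graph with an in-memory checkpointer."""
    # MemorySaver keeps checkpoints in process memory; they are lost on restart
    memory = MemorySaver()
    builder = _build_base_graph()
    return builder.compile(checkpointer=memory)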
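When a graph is compiled with a checkpointer, LangGraph saves a checkpoint of the state at every step under the `thread_id` passed in `config["configurable"]`. That is what lets a later call on the same thread continue where it stopped, and it is what `interrupt()` needs in order to resume. Below is a toy model of that bookkeeping. `ToyCheckpointer` is a hypothetical class and much simpler than `MemorySaver`, which also tracks checkpoint IDs and metadata.

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional


class ToyCheckpointer:
    """Per-thread state history kept in process memory (nothing survives a restart)."""

    def __init__(self) -> None:
        self._history: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    def put(self, config: Dict[str, Any], state: Dict[str, Any]) -> None:
        thread_id = config["configurable"]["thread_id"]
        self._history[thread_id].append(dict(state))  # shallow copy per step

    def get_latest(self, config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        history = self._history.get(config["configurable"]["thread_id"])
        return history[-1] if history else None


saver = ToyCheckpointer()
a = {"configurable": {"thread_id": "a"}}
b = {"configurable": {"thread_id": "b"}}
saver.put(a, {"plan_iterations": 0})
saver.put(a, {"plan_iterations": 1})
saver.put(b, {"plan_iterations": 0})
print(saver.get_latest(a), saver.get_latest(b))  # {'plan_iterations': 1} {'plan_iterations': 0}
```

Threads are fully isolated, which is why a fixed `thread_id` such as `"default"` (5.1) makes every run share one history.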

4.2 Custom Checkpoint Management

Source file: src/graph/checkpoint.py

from langgraph.store.memory import InMemoryStore


class ChatStreamManager:
    """Chat stream manager with MongoDB and PostgreSQL persistence."""

    def __init__(self, checkpoint_saver: bool = False, db_uri: str | None = None):
        self.store = InMemoryStore()  # in-memory store for transient chunks
        self.checkpoint_saver = checkpoint_saver

        # Connect only when persistence is enabled and a URI was provided
        if checkpoint_saver and db_uri:
            if db_uri.startswith("mongodb://"):
                self._init_mongodb()
            elif db_uri.startswith("postgresql://"):
                self._init_postgresql()

    def process_stream_message(self, thread_id: str, message: str, finish_reason: str) -> bool:
        """Buffer one streamed chunk; persist the conversation when it finishes."""
        store_namespace = ("messages", thread_id)

        # Read the cursor (index of the last chunk) or start from 0
        cursor = self.store.get(store_namespace, "cursor")
        current_index = 0 if cursor is None else cursor.value.get("index", 0) + 1

        # Store the chunk under a unique key and advance the cursor
        self.store.put(store_namespace, f"chunk_{current_index}", message)
        self.store.put(store_namespace, "cursor", {"index": current_index})

        # Persist once the conversation is complete
        if finish_reason in ("stop", "interrupt"):
            return self._persist_complete_conversation(thread_id, store_namespace, current_index)
        return True  # chunk buffered

    # _init_mongodb, _init_postgresql and _persist_complete_conversation are omitted
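The cursor keeps a running index per thread. Each chunk therefore gets a unique key, and the full message can be reassembled in order when the stream finishes. The dict-based sketch below shows just that idea, with no LangGraph store involved. `ChunkBuffer` is a hypothetical class, and it returns the reassembled list instead of writing to MongoDB or PostgreSQL.

```python
from typing import Any, Dict, List, Optional, Tuple


class ChunkBuffer:
    """Toy version of the cursor scheme: chunks live under ("messages", thread_id)."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], Dict[str, Any]] = {}

    def add(self, thread_id: str, message: str, finish_reason: str) -> Optional[List[str]]:
        ns = ("messages", thread_id)
        items = self._data.setdefault(ns, {})
        cursor = items.get("cursor")
        index = 0 if cursor is None else cursor["index"] + 1
        items[f"chunk_{index}"] = message
        items["cursor"] = {"index": index}
        if finish_reason in ("stop", "interrupt"):
            # Reassemble chunks 0..index in order (the real code persists them here)
            return [items[f"chunk_{i}"] for i in range(index + 1)]
        return None


buf = ChunkBuffer()
buf.add("t1", "Hel", "")
buf.add("t1", "lo", "")
result = buf.add("t1", "!", "stop")
print(result)  # ['Hel', 'lo', '!']
```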

4.3 Database Persistence

Source file: src/graph/checkpoint.py

import uuid
from datetime import datetime
from typing import List


# Method of ChatStreamManager (shown unindented here)
def _persist_to_mongodb(self, thread_id: str, messages: List[str]) -> bool:
    """Persist a conversation to MongoDB, one document per thread_id."""
    collection = self.mongo_db.chat_streams

    existing_document = collection.find_one({"thread_id": thread_id})
    current_timestamp = datetime.now()

    if existing_document:
        # Update the existing conversation
        update_result = collection.update_one(
            {"thread_id": thread_id},
            {"$set": {"messages": messages, "ts": current_timestamp}},
        )
        return update_result.modified_count > 0
    else:
        # Create a new conversation document
        new_document = {
            "thread_id": thread_id,
            "messages": messages,
            "ts": current_timestamp,
            "id": uuid.uuid4().hex,
        }
        insert_result = collection.insert_one(new_document)
        return insert_result.inserted_id is not None

5. Streaming

5.1 Asynchronous Streaming Execution

Source file: src/workflow.py

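from src.config.configuration import get_recursion_limit
from src.graph import build_graph

graph = build_graph()  # compiled graph, also exported for LangGraph Studio (7.1)


async def run_agent_workflow_async(
    user_input: str,
    max_plan_iterations: int = 1,  # illustrative defaults
    max_step_num: int = 3,
    **kwargs,
):
    """Run the agent workflow asynchronously, printing messages as they stream."""
    initial_state = {
        # add_messages turns this dict into a HumanMessage
        "messages": [{"role": "user", "content": user_input}],
        # Console run: skip the human plan review (there is no way to resume an interrupt here)
        "auto_accepted_plan": True,
    }
    config = {
        "configurable": {
            "thread_id": "default",
            "max_plan_iterations": max_plan_iterations,
            "max_step_num": max_step_num,
        },
        "recursion_limit": get_recursion_limit(default=100),
    }

    final_state = None
    async for s in graph.astream(
        input=initial_state,
        config=config,
        stream_mode="values",  # each item is the full state after a step
    ):
        final_state = s
        if isinstance(s, dict) and "messages" in s:
            message = s["messages"][-1]
            message.pretty_print()
    return final_state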
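With `stream_mode="values"`, each item is the complete state after a step, not a delta. As a result, `s["messages"][-1]` is printed again whenever a step adds no message. A common remedy is to remember how many messages have already been shown. The sketch below simulates the stream with a plain async generator (`fake_values_stream` is hypothetical). In LangGraph, `stream_mode="updates"` is the alternative: it yields only what each node returned.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


async def fake_values_stream() -> AsyncIterator[Dict[str, Any]]:
    """Stand-in for graph.astream(..., stream_mode="values"): full state after each step."""
    messages: List[str] = []
    for step in ["user: question", "coordinator: hand off", None, "planner: plan"]:
        if step is not None:  # a step that writes no message still yields a state
            messages = messages + [step]
        yield {"messages": list(messages)}


async def print_new_messages() -> List[str]:
    printed: List[str] = []
    seen = 0
    async for state in fake_values_stream():
        new = state["messages"][seen:]  # only messages added since the last state
        seen = len(state["messages"])
        for message in new:
            print(message)
            printed.append(message)
    return printed


printed = asyncio.run(print_new_messages())
```

Without the `seen` counter, the third state would print "coordinator: hand off" a second time.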

5.2 Streaming Message Handling

Source file: src/graph/checkpoint.py

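def chat_stream_message(thread_id: str, message: str, finish_reason: str) -> bool:
    """Module-level wrapper around the default ChatStreamManager."""
    # get_bool_env (env-flag helper) and _default_manager (a ChatStreamManager
    # instance) are defined or imported elsewhere in the module (not shown)
    checkpoint_saver = get_bool_env("LANGGRAPH_CHECKPOINT_SAVER", False)
    if checkpoint_saver:
        return _default_manager.process_stream_message(thread_id, message, finish_reason)
    # Persistence disabled: nothing is stored
    return False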

6. Tool Integration

6.1 Tool Binding

Source file: src/graph/nodes.py

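from typing import Annotated

from langchain_core.tools import tool

from src.config.agents import AGENT_LLM_MAP
from src.llms.llm import get_llm_by_type


@tool
def handoff_to_planner(
    research_topic: Annotated[str, "The topic of the research task to be handed off."],
    locale: Annotated[str, "The user's detected language locale."],
):
    """Hand off to the planner agent to create a plan."""
    # Intentionally returns nothing: the LLM calls this tool only to signal a handoff
    return


# Using the tools inside a node
def coordinator_node(state: State, config: RunnableConfig):
    # `messages`: the coordinator prompt built from `state` (omitted)
    # handoff_after_clarification is defined alongside handoff_to_planner (not shown)
    tools = [handoff_to_planner, handoff_after_clarification]
    response = (
        get_llm_by_type(AGENT_LLM_MAP["coordinator"])
        .bind_tools(tools)
        .invoke(messages)
    )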
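`handoff_to_planner` returns nothing, so the coordinator does not need the tool's result. It binds the tool so the model can signal a handoff, and then reads the arguments from `response.tool_calls`. In `langchain_core`, each entry of `AIMessage.tool_calls` is a dict with `name`, `args` and `id`. The sketch below shows that reading step. `parse_handoff` is a hypothetical helper, and the `"en-US"` fallback is an assumption chosen to match the `State.locale` default.

```python
from typing import Any, Dict, List, Optional


def parse_handoff(tool_calls: List[Dict[str, Any]]) -> Optional[Dict[str, str]]:
    """Return the handoff arguments if the LLM called handoff_to_planner, else None."""
    for call in tool_calls:
        if call.get("name") == "handoff_to_planner":
            args = call.get("args", {})
            return {
                "research_topic": args.get("research_topic", ""),
                "locale": args.get("locale", "en-US"),
            }
    return None


# Same shape as AIMessage.tool_calls entries: name / args / id
calls = [{"name": "handoff_to_planner",
          "args": {"research_topic": "EV battery prices", "locale": "en-US"},
          "id": "call_1"}]
print(parse_handoff(calls))
print(parse_handoff([]))  # None: the reply was plain text (e.g. a clarifying question)
```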

6.2 Dynamic MCP Tool Integration

Source file: src/graph/nodes.py

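from langchain_mcp_adapters.client import MultiServerMCPClient


async def _setup_and_execute_agent_step(state, config, agent_type, default_tools):
    """Attach the enabled MCP tools to an agent at run time."""
    # Built earlier in this function from the run's MCP settings (omitted):
    #   mcp_servers:   server name -> connection config
    #   enabled_tools: tool name   -> server name
    if mcp_servers:
        client = MultiServerMCPClient(mcp_servers)
        loaded_tools = default_tools[:]  # copy so the shared default list is not mutated
        all_tools = await client.get_tools()

        # Loop variable renamed so it does not shadow the @tool decorator imported in this module
        for mcp_tool in all_tools:
            if mcp_tool.name in enabled_tools:
                # Tag the description with the MCP server that provides the tool
                mcp_tool.description = (
                    f"Powered by '{enabled_tools[mcp_tool.name]}'.\n{mcp_tool.description}"
                )
                loaded_tools.append(mcp_tool)

        # pre_model_hook is the context-compression hook (see 10.1)
        agent = create_agent(agent_type, agent_type, loaded_tools, agent_type, pre_model_hook)
        # ... the agent is then executed for the current step (omitted)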
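The loop above does two things. It filters the tools returned by the MCP servers against the `enabled_tools` map, and it prefixes each kept tool's description with its server name so the LLM can see where a tool comes from. The self-contained sketch below covers just that selection step. `FakeTool` and `select_mcp_tools` are hypothetical, standing in for LangChain tool objects and the inline loop.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FakeTool:
    """Minimal stand-in for a LangChain tool: only name and description matter here."""
    name: str
    description: str


def select_mcp_tools(default_tools: List[FakeTool], mcp_tools: List[FakeTool],
                     enabled_tools: Dict[str, str]) -> List[FakeTool]:
    """Keep only enabled MCP tools and tag each description with its server name.

    enabled_tools maps tool name -> server name. The default list is copied,
    so the caller's list is not mutated.
    """
    loaded = default_tools[:]
    for mcp_tool in mcp_tools:
        if mcp_tool.name in enabled_tools:
            mcp_tool.description = (
                f"Powered by '{enabled_tools[mcp_tool.name]}'.\n{mcp_tool.description}"
            )
            loaded.append(mcp_tool)
    return loaded


defaults = [FakeTool("web_search", "Search the web")]
fetched = [FakeTool("get_stock", "Quote a ticker"), FakeTool("delete_all", "Dangerous")]
tools = select_mcp_tools(defaults, fetched, {"get_stock": "finance-server"})
print([t.name for t in tools])  # ['web_search', 'get_stock']
print(len(defaults))            # 1: the default list was not modified
```

Tools the server exposes but the configuration does not enable (here `delete_all`) never reach the agent.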

7. Configuration and Debugging

7.1 LangGraph Studio Configuration

Source file: langgraph.json

{
  "dockerfile_lines": [],
  "graphs": {
    "deep_research": "./src/workflow.py:graph",
    "podcast_generation": "./src/podcast/graph/builder.py:workflow",
    "ppt_generation": "./src/ppt/graph/builder.py:workflow"
  },
  "python_version": "3.12",
  "env": "./.env",
  "dependencies": ["."]
}

7.2 Graph Visualization

Source file: src/workflow.py

if __name__ == "__main__":
    # Print a Mermaid diagram of the graph (xray=True also expands subgraphs)
    print(graph.get_graph(xray=True).draw_mermaid())

7.3 Recursion Limit Configuration

Source file: src/config/configuration.py

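import os


def get_recursion_limit(default: int = 100) -> int:
    """Read the recursion limit from AGENT_RECURSION_LIMIT, falling back to `default`."""
    try:
        limit = int(os.getenv("AGENT_RECURSION_LIMIT", str(default)))
        # Zero or negative values fall back to the default
        return limit if limit > 0 else default
    except ValueError:
        # Non-integer values fall back to the default
        return default


# Usage in the run config
config = {
    "recursion_limit": get_recursion_limit(default=100),
}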

8. Structured Output

8.1 Pydantic Model Definitions

Source file: src/prompts/planner_model.py

from enum import Enum
from typing import List, Optional

from pydantic import BaseModel, Field

class StepType(str, Enum):
    RESEARCH = "research"
    PROCESSING = "processing"

class Step(BaseModel):
    need_search: bool = Field(..., description="Must be explicitly set for each step")
    title: str
    description: str = Field(..., description="Specify exactly what data to collect")
    step_type: StepType = Field(..., description="Indicates the nature of the step")
    execution_res: Optional[str] = Field(default=None, description="The Step execution result")

class Plan(BaseModel):
    locale: str = Field(..., description="e.g. 'en-US' or 'zh-CN', based on the user's language")
    has_enough_context: bool
    thought: str = Field(default="", description="Thinking process for the plan")
    title: str
    steps: List[Step] = Field(default_factory=list, description="Research & Processing steps")

8.2 Integrating Structured Output

Source file: src/graph/nodes.py

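def planner_node(state: State, config: RunnableConfig):
    # `configurable` and `messages` are prepared earlier in the node (omitted)
    # Structured output is used only with the "basic" model and deep thinking off
    if AGENT_LLM_MAP["planner"] == "basic" and not configurable.enable_deep_thinking:
        llm = get_llm_by_type("basic").with_structured_output(
            Plan,
            method="json_mode",
        )
        response = llm.invoke(messages)  # a Plan instance
        full_response = response.model_dump_json(indent=4, exclude_none=True)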

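9. Error Handling and Fault Tolerance

9.1 JSON Repair

Source files: src/utils/json_utils.py (defines repair_json_output); the usage below is from src/graph/nodes.py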

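import json
import logging

from src.utils.json_utils import repair_json_output

logger = logging.getLogger(__name__)


def planner_node(state: State, config: RunnableConfig):
    # `full_response` and `plan_iterations` come from earlier in the node (omitted)
    try:
        curr_plan = json.loads(repair_json_output(full_response))
    except json.JSONDecodeError:
        logger.warning("Planner response is not valid JSON")
        # Go to the reporter if at least one planning round already happened; otherwise end
        if plan_iterations > 0:
            return Command(goto="reporter")
        else:
            return Command(goto="__end__")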
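The implementation of `repair_json_output` is not shown here. To illustrate the idea, the sketch below handles two common failure modes of LLM JSON output before calling `json.loads`: a surrounding Markdown code fence, and extra prose around the object. `repair_json_sketch` and `parse_plan` are hypothetical, not DeerFlow's code, and a production repair step would need to handle more cases (for example, truncated output).

```python
import json
import re
from typing import Any, Optional

FENCE = "`" * 3  # a Markdown code fence, built with * 3 so this snippet can sit inside one


def repair_json_sketch(text: str) -> str:
    """Simplified stand-in for repair_json_output (hypothetical)."""
    text = text.strip()
    # Drop a leading fence (optionally tagged "json") and a trailing fence
    text = re.sub(r"^" + FENCE + r"(?:json)?\s*", "", text)
    text = re.sub(r"\s*" + FENCE + r"$", "", text)
    # Keep only the outermost {...} so prose before or after the object is ignored
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        text = text[start:end + 1]
    return text


def parse_plan(raw: str) -> Optional[Any]:
    """Return the parsed plan, or None when even the repaired text is not JSON."""
    try:
        return json.loads(repair_json_sketch(raw))
    except json.JSONDecodeError:
        return None


llm_output = FENCE + 'json\n{"has_enough_context": true, "title": "Demo", "steps": []}\n' + FENCE
print(parse_plan(llm_output))
print(parse_plan("not json"))  # None
```

Returning `None` leaves the fallback decision (reporter or end) to the caller, as in the planner excerpt above.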

9.2 Exception Handling

Source file: src/workflow.py

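async def run_agent_workflow_async(user_input: str, **kwargs):
    # ... initial_state and config as in 5.1 ...
    async for s in graph.astream(input=initial_state, config=config, stream_mode="values"):
        try:
            final_state = s
            # Process the streamed output (printing, etc.)
        except Exception as e:
            # Guards only the per-chunk processing; errors raised by the graph
            # itself propagate out of the async for loop
            logger.error(f"Error while processing stream output: {e}")
            print(f"Error while processing output: {str(e)}")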
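In the excerpt, the `try` sits inside the loop, so it only protects the processing of each yielded state. An exception raised while the graph itself is running surfaces from the `async for` and is not caught there. The sketch below shows both layers, with a hypothetical `flaky_stream` in place of `graph.astream`. The outer handler is an addition for illustration and does not come from the excerpt.

```python
import asyncio
from typing import AsyncIterator, List


async def flaky_stream() -> AsyncIterator[dict]:
    """Stand-in for graph.astream: yields two states, then a 'node' fails."""
    yield {"messages": ["a"]}
    yield {"messages": ["a", "b"]}
    raise RuntimeError("node failed")


async def consume() -> List[str]:
    log: List[str] = []
    try:
        async for state in flaky_stream():
            try:
                log.append(f"got {len(state['messages'])} messages")  # per-chunk processing
            except Exception as exc:  # only guards the processing of this chunk
                log.append(f"processing error: {exc}")
    except RuntimeError as exc:  # errors raised by the stream itself land here
        log.append(f"workflow error: {exc}")
    return log


print(asyncio.run(consume()))
# ['got 1 messages', 'got 2 messages', 'workflow error: node failed']
```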

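10. Performance Optimization

10.1 Context Compression

Source files: src/utils/context_manager.py (defines ContextManager); the usage below is from src/graph/nodes.py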

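from functools import partial

from src.utils.context_manager import ContextManager

# Attach context compression when creating the agent
llm_token_limit = get_llm_token_limit_by_type(AGENT_LLM_MAP[agent_type])
# partial() binds no extra arguments here, so this is equivalent to passing the bound method
pre_model_hook = partial(ContextManager(llm_token_limit, 3).compress_messages)

agent = create_agent(
    agent_type, agent_type, loaded_tools, agent_type, pre_model_hook
)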
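`ContextManager`'s internals are not part of this excerpt. Conceptually, a pre-model hook like this runs before the model is called and trims the message history to fit the model's token limit. Below is a generic sketch of one such policy: keep a fixed number of leading messages (e.g. system prompt and task) plus the newest messages that still fit. `compress_messages` here is a hypothetical function, and the word-count token estimate is a simplification. This is not a claim about how DeerFlow's `ContextManager` works or what its second argument (`3`) means.

```python
from typing import List


def estimate_tokens(text: str) -> int:
    """Crude token estimate for the sketch: one token per whitespace-separated word."""
    return len(text.split())


def compress_messages(messages: List[str], token_limit: int, keep_prefix: int) -> List[str]:
    """Keep the first `keep_prefix` messages, then as many of the most recent ones as fit."""
    prefix = messages[:keep_prefix]
    budget = token_limit - sum(estimate_tokens(m) for m in prefix)
    tail: List[str] = []
    # Walk backwards from the newest message until the budget runs out
    for message in reversed(messages[keep_prefix:]):
        cost = estimate_tokens(message)
        if cost > budget:
            break
        tail.append(message)
        budget -= cost
    return prefix + list(reversed(tail))


history = ["system prompt", "user task", "step one result", "step two result here", "latest note"]
result = compress_messages(history, token_limit=8, keep_prefix=2)
print(result)  # ['system prompt', 'user task', 'latest note']
```

Keeping the prefix protects the instructions and the original task, which the agent needs on every call, while older intermediate results are the first to be dropped.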

10.2 Recursion Limit Control

Source file: src/graph/nodes.py

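import os

# Per-step agent recursion limit, read from AGENT_RECURSION_LIMIT (default 25 here).
# Unlike get_recursion_limit in 7.3, a non-integer value makes int() raise ValueError.
recursion_limit = int(os.getenv("AGENT_RECURSION_LIMIT", "25"))

# Inside the async agent-step function:
result = await agent.ainvoke(
    input=agent_input,
    config={"recursion_limit": recursion_limit}
)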

Summary

DeerFlow relies on the following core capabilities of LangGraph and its surrounding ecosystem:

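  1. StateGraph state management - orchestration of complex workflows
  2. MessagesState extension - business-specific state definition
  3. Command control - precise flow control
  4. Conditional routing - decision branching
  5. Interrupt mechanism - human-in-the-loop support
  6. Checkpointing - state persistence
  7. Streaming - real-time responses
  8. Tool integration - MCP protocol support
  9. Structured output - Pydantic model integration
  10. Error handling - fault tolerance and recovery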

This LangGraph-based architecture lets DeerFlow build a complex multi-agent collaboration system while remaining maintainable and extensible.
