第6章:实战项目三 - 召唤特种小队!构建自主信息调研AI智能体(下附源码)
好了,各位未来的AI架构师们,我们这次深入源码的旅程就到此告一段落了。我们一起剖析了如何用FastAPI和WebSocket构建实时反馈的后端,用LangGraph像搭乐高一样编排出一个带智能循环的多智能体协作系统,深入了解了每个Agent的代码实现,并最终领略了专业Prompt工程的艺术。希望通过这个项目,你不仅学会了一些具体的技术,更能理解和体会到构建复杂AI应用的整体思路和架构模式。这个领域
在上篇中,我们像参观一个高度机密的“作战指挥室”一样,见证了我们AI特种小队从接受任务到交付专业报告的全过程。我们被它那套 “计划 -> 研究 -> 评审 -> 修正” 的智能循环工作流给秀了一脸,对吧?
功能展示固然炫酷,但对于我们这些“Talk is cheap, show me the code”的开发者来说,心里那股想把它扒光了看个究竟的冲动,早就按捺不住了!
所以,下篇来了!别眨眼,坐稳扶好。今天,我将亲自带你潜入这个项目的源码深处,一行行地解构这个AI小队的“基因密码”。这不只是一篇教程,更是一次深入理解现代AI应用架构的实战演练。
在本篇(下篇)中,我们将一起:
- 剖析 LangGraph 的奥秘:看看它是如何优雅地编排这个复杂的多智能体协作流程的。
- 拆解每个Agent的实现:深入分析计划员、研究员、评审员和撰写员的核心代码。
- 学习专业的Prompt工程:观摩那些能让AI表现得像个领域专家的“咒语”是如何设计的。
准备好进入硬核模式了吗?Let’s dive in!
一、项目骨架:FastAPI 与 WebSocket 实时通信
一个酷炫的AI应用,也需要一个坚实的后端框架来支撑。我们的项目选择了FastAPI,原因无他:快、现代、好用!
但最关键的是,我们要实现前端实时展示AI工作状态的功能。这就轮到WebSocket登场了。它允许服务器和客户端之间建立一个持久的双向连接,服务器可以随时主动向前端“推”送消息。
让我们直接来看核心代码 backend/main.py:
# backend/main.py (核心部分)
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
import asyncio
from backend.graph import app as research_graph_app
from backend.shared.logger import logger
app = FastAPI()
# 挂载静态文件目录,让前端可以直接访问
app.mount("/static", StaticFiles(directory="frontend"), name="static")
@app.get("/")
async def get_index():
return FileResponse(path="frontend/index.html", media_type="text/html")
@app.websocket("/ws/research")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
logger.info("WebSocket连接已建立。")
research_task = None
try:
while True:
task_description = await websocket.receive_text()
logger.info(f"收到新的研究任务: {task_description}")
async def run_research():
config = {"recursion_limit": 10}
initial_state = {"task": task_description, "plan": None, "researched_data": [], "report": None}
# astream会流式返回图每一步的输出
async for event in research_graph_app.astream(initial_state, config=config):
for node, output in event.items():
if websocket.client_state.name == 'DISCONNECTED':
logger.info("连接已断开,停止发送数据。")
return
if output is None: continue
if error := output.get('error'):
await websocket.send_json({"type": "error", "data": error})
return
await websocket.send_json({"type": "log", "node": node, "data": output})
if report := output.get('report'):
await websocket.send_json({"type": "report", "data": report})
return
if research_task and not research_task.done():
research_task.cancel()
research_task = asyncio.create_task(run_research())
await research_task
except WebSocketDisconnect:
logger.info("客户端断开连接。")
if research_task:
research_task.cancel()
except Exception as e:
logger.error(f"WebSocket处理过程中发生错误: {e}")
if research_task:
research_task.cancel()
代码解读:
- 健壮的连接管理:与旧版不同,新代码为每一个研究任务都创建了一个独立的、可取消的
asyncio.Task(research_task)。这意味着,如果一个客户端断开连接(WebSocketDisconnect),或者发送了一个新的任务,服务器可以明确地取消掉上一个正在运行的任务。这避免了“僵尸进程”的产生,极大地提高了服务器的稳定性和资源利用率。 astream流式处理:这里用的是research_graph_app.astream(),它会流式地返回图中每个节点(Node)执行完毕后的完整状态(State)。- 精细化的事件广播:在循环中,代码不再是简单地广播一个字符串,而是:
- 检查每个节点的输出(
output),如果包含error字段,就发送一个"type": "error"的JSON消息并终止执行。 - 如果包含
report字段,就发送一个"type": "report"的JSON消息,标志着流程结束。 - 对于所有正常的节点输出,则发送
"type": "log"的JSON消息,其中包含了节点名称(node)和该节点的具体输出(data)。
- 检查每个节点的输出(
- 连接状态检查:在发送任何消息之前,都会检查
websocket.client_state。如果在Graph还在后台运行时客户端就断开了,这个检查可以防止服务器向一个已经关闭的连接发送数据而引发异常。
这个经过重构的后端,不仅功能更强大,而且在工程上更加健壮和完善,为我们AI小队的稳定运行提供了坚实的保障。
二、智能体的大脑与神经网络:揭秘 LangGraph
如果说FastAPI是骨架,那LangGraph就是我们AI小队的“大脑和中枢神经系统”。它定义了每个智能体(节点)是谁,以及他们之间应该如何交流和协作(边)。
让我们再次祭出上篇的作战流程图,并对照着backend/graph.py的源码,看看它是如何从一张图纸变成可执行代码的。
作战流程图回顾:
代码实现 backend/graph.py:
# backend/graph.py (核心逻辑)
import asyncio
from langgraph.graph import StateGraph, END
from .state import AgentState
from .agents import planner, researcher, writer
from .shared.logger import logger
# --- 1. 定义图的节点 (Nodes) ---
# 研究团队节点:并行执行所有研究任务
async def research_team(state: AgentState) -> dict:
logger.info("--- [研究团队] 开始并行研究所有子任务 ---")
sub_tasks = state.get('plan')
results = await asyncio.gather(
*[researcher.research_sub_task({"task": task}, state) for task in sub_tasks]
)
return {"researched_data": [r for r in results if r is not None]}
# 反思/修正计划节点
async def revise_plan_node(state: AgentState) -> dict:
logger.info("--- [反思者] 开始修正研究计划 ---")
return await planner.plan(state)
# --- 2. 定义图的条件分支 (Conditional Edges) ---
def decide_to_continue(state: AgentState) -> str:
logger.info("--- [决策者] 开始评估研究成果 ---")
if state.get("error"): return "end"
researched_data = state.get('researched_data', [])
failed_tasks = sum(1 for r in researched_data if r.get("data", "").strip().startswith("ERROR-"))
if failed_tasks >= len(state.get('plan', [])) / 2:
logger.warning(f"--- [决策者] {failed_tasks}个任务失败,失败率过高,返回重新规划 ---")
return "replan"
else:
logger.info(f"--- [决策者] 研究成功率达标,进入报告撰写阶段 ---")
return "write_report"
# --- 3. 构建图 (Build the Graph) ---
workflow = StateGraph(AgentState)
workflow.add_node("planner", planner.plan)
workflow.add_node("researcher", research_team)
workflow.add_node("writer", writer.write_report_task)
workflow.add_node("reviser", revise_plan_node)
workflow.set_entry_point("planner")
# --- 4. 定义图的边 (Edges) ---
workflow.add_edge("planner", "researcher")
workflow.add_edge("writer", END)
workflow.add_edge("reviser", "researcher")
workflow.add_conditional_edges(
"researcher",
decide_to_continue,
{
"replan": "reviser",
"write_report": "writer",
"end": END,
},
)
app = workflow.compile()
logger.success("LangGraph应用已成功编译!")
代码与图的完美对应:
- 节点 (Nodes):
workflow.add_node("planner", planner.plan)这行代码,就是在图中创建了一个名为planner的节点,它的具体逻辑由从agents模块导入的planner.plan函数定义。新增的reviser节点也同样指向了一个具体的revise_plan_node函数。 - 边 (Edges):
workflow.add_edge("planner", "researcher")连接了计划员和研究员。workflow.add_edge("reviser", "researcher")则构建了关键的修正->研究的循环。 - 条件边 (Conditional Edges):
workflow.add_conditional_edges("researcher", ...)是LangGraph的精髓。它告诉系统:当流程走到researcher节点之后,去调用decide_to_continue这个决策函数。该函数会根据研究成果,返回一个字符串——"replan"或"write_report"。LangGraph会根据这个返回值,在提供的字典中查找到对应的目标节点(reviser或writer),并将流程导向那里。
通过这种声明式的方式,我们用非常直观的代码,就将上篇那个复杂的、带智能循环的协作流程给定义了出来。这就是LangGraph的强大之处:它让我们可以像画流程图一样去构建和编排复杂的AI Agent系统。
三、揭秘特种小队成员:Agent源码剖析
我们已经知道了AI小队是如何协作的,现在是时候看看每个成员的“个人能力”了。由于篇幅限制,我们重点剖析“计划员”和“研究员”的核心逻辑。
计划员 (Planner): 带"自我修正"能力的战略家
planner的核心职责在backend/agents/planner.py中的plan函数。它远比第一版复杂,内置了“自我修正”的重试机制。
# backend/agents/planner.py (核心部分)
async def plan(state: AgentState) -> dict:
task = state.get("task")
logger.info("--- [计划员] 开始制定研究计划 (包含预搜索) ---")
# 1. 生成预搜索查询 (带Pydantic解析和错误回退)
query_parser = PydanticOutputParser(pydantic_object=SearchQuery)
# ... (链定义) ...
try:
search_query_obj = await rate_limited_ainvoke(query_gen_chain, {"task": task})
search_query = search_query_obj.query
except Exception as e:
logger.warning(f"--- [计划员-预搜索] 错误:无法生成结构化查询...将使用原始任务。")
search_query = task
# 2. 执行预搜索 (带错误处理)
try:
search_results = await intelligent_search_tool.ainvoke(search_query)
# ... (处理不同类型的搜索结果) ...
search_context = "..."
except Exception as e:
logger.error(f"--- [计划员-预搜索] 搜索失败,将不带上下文进行规划: {e} ---")
search_context = "预搜索失败,无法获取最新背景信息。"
# 3. 制定最终计划 (带Pydantic解析和重试循环)
parser = PydanticOutputParser(pydantic_object=ResearchPlan)
# ... (链定义) ...
max_retries = 3
error_message = ""
for i in range(max_retries):
try:
plan_object = await rate_limited_ainvoke(
planning_chain,
{"task": task, "context": search_context, "error": error_message, ...}
)
logger.success(f"--- [计划员] 研究计划制定完成: {plan_object.plan} ---")
return {"plan": plan_object.plan}
except (ValidationError, OutputParserException) as e:
logger.warning(f"--- [计划员] 第 {i+1}/{max_retries} 次尝试失败,正在重试... ---")
error_message = f"你上次的输出无法被解析,错误如下:\\n```\\n{e}\\n```\\n请修正你的输出。" # 关键:将错误信息传给下一次调用
if i == max_retries - 1:
logger.error(f"--- [计划员] 错误:在 {max_retries} 次尝试后,仍然无法生成有效的计划...")
return {"error": "..."}
return {"error": "计划员在多次尝试后仍未能生成有效计划。"}
代码解读:
新版的plan函数是一个非常健壮的工程实践范例:
- 强制的结构化输出:整个流程的核心是
PydanticOutputParser。我们不再信任LLM会自觉返回格式正确的列表或字符串,而是定义了SearchQuery和ResearchPlan这两个Pydantic模型,并强制LLM的输出必须符合这两个模型的规范。这从根本上保证了数据在流程中传递的稳定性和可靠性。 - 优雅的错误处理与回退:
- 在生成搜索查询时,如果LLM返回的内容无法被
query_parser解析,代码不会崩溃,而是会warning并使用原始的task作为查询词,保证流程能继续。 - 在执行
intelligent_search_tool时,任何异常都会被捕获,并使用一段明确的错误文本作为search_context,同样是为了让流程继续。
- 在生成搜索查询时,如果LLM返回的内容无法被
- 带反馈的重试循环(自我修正): 这是最精彩的部分。在生成最终计划时,如果输出不符合
ResearchPlan模型,PydanticOutputParser会抛出一个异常。我们会捕获这个异常,将详细的错误信息(e)格式化成一段“错误反馈”(error_message),然后在下一次重试时,把这个反馈作为新的输入传给LLM!这相当于在告诉LLM:“你上次做错了,这是错误日志,请看着它修改你的行为,再试一次!” 这种机制,就是“反思-修正”循环在单个Agent内部的体现,极大地提升了Agent的自主修复能力。
研究员 (Researcher): 并行工作的分析师
researcher的核心逻辑在backend/agents/researcher.py的research_sub_task函数。它实现了一个复杂的、类似MapReduce的信息处理流程,以确保研究的深度和广度。
# backend/agents/researcher.py (核心部分)
# Map函数:总结单个文档
async def _map_document(sub_task: str, document: str) -> str:
# ... (使用Pydantic解析器,判断文档是否相关) ...
# ... 如果不相关,返回特定标记 ...
# Reduce函数:合并所有总结
async def _reduce_documents(sub_task: str, summaries: List[str], direct_answers: List[str]) -> str:
# ... (使用Pydantic解析器,将多个总结合并成最终摘要) ...
async def research_sub_task(sub_task: dict, state: AgentState) -> dict:
logger.info(f"--- [研究员] 开始研究子任务: {sub_task} ---")
task_description = sub_task.get("task", "")
try:
# 1. 为子任务生成多个搜索查询 (Pydantic确保格式)
# ...
# 2. 并行执行所有搜索
search_results_list = await asyncio.gather(...)
# 3. 智能筛选和抓取URL
# - 提取直接答案和URL列表
# - 遍历URL,逐一抓取,直到找到内容长度符合要求的文档
# - 记录已处理的URL,避免重复工作
logger.info("--- [研究员-筛选与抓取阶段] 开始为每个查询寻找最佳文档... ---")
# ... (详细的筛选和抓取逻辑) ...
# 4. MapReduce总结
if not docs_to_process:
# 如果没有抓取到文档,但有直接答案,就只用直接答案总结
# 如果什么都没有,返回ERROR-标记
final_summary = "ERROR- 未能从任何搜索结果中找到符合长度要求...的文档。"
else:
# Map: 并行总结所有抓取到的文档
map_tasks = [_map_document(task_description, doc) for doc in docs_to_process]
summaries = await asyncio.gather(*map_tasks)
valid_summaries = [s for s in summaries if "与研究子任务无关" not in s]
# Reduce: 将所有有效总结和直接答案合并成最终摘要
final_summary = await _reduce_documents(task_description, valid_summaries, direct_answers)
except Exception as e:
logger.exception(f"--- [研究员] 发生顶级错误: {e} ---")
final_summary = f"ERROR- 在研究子任务期间发生错误: {e}"
logger.success(f"--- [研究员] 子任务 '{task_description}' 研究完成。---")
return {"sub_task": task_description, "data": final_summary}
代码解读:
这个研究员Agent已经进化成一个高度复杂的“信息处理流水线”:
- MapReduce架构: 这是其核心思想。
- Map:
_map_document函数负责“映射”阶段。它接收一篇长文档,用LLM将其压缩成一段精炼的摘要,并打上“是否相关”的标签。这一步将所有非结构化的网页内容,处理成了标准化的、可聚合的信息单元。 - Reduce:
_reduce_documents函数负责“归约”阶段。它收集所有来自Map阶段的“相关”摘要,以及搜索引擎可能直接给出的答案,然后再次调用LLM,将这些碎片化的信息整合成一个逻辑连贯、内容全面的最终答案。
- Map:
- 效率与质量的平衡——智能筛选: 旧版的做法是抓取所有链接,这非常耗时且可能引入大量噪音。新版
researcher的筛选与抓取阶段则聪明得多。它会为每个查询意图(比如query1,query2)寻找一篇“最佳”的支撑文档。它会按顺序抓取该查询下的URL,一旦找到一篇内容长度在1000到25000字符之间的“合格”文档,就立即停止对这个查询意图的后续抓取。这套机制确保了用于分析的原始材料既有相关性,又有足够的信息量,同时避免了在大量无用网页上浪费时间和API调用。 - 全面的并行化:
asyncio.gather被用在了两个关键地方:一是并行发起多个搜索查询,二是并行处理多篇文档的Map过程。这使得整个研究过程的效率得到了极大的提升。 - 明确的失败信号: 当研究过程在任何一个环节出现严重问题(比如完全没找到合格的文档,或LLM最终生成摘要失败),函数会返回一个以
ERROR-为前缀的字符串。这个明确的信号,是整个系统能够实现“自我修正”闭环的关键。它使得上层的决策节点(decide_to_continue)可以清晰地、程序化地判断本次子任务的成败。
四、给AI注入灵魂:专业级Prompt工程艺术
如果说代码是AI的骨骼,LangGraph是神经系统,那Prompt就是AI的灵魂。它决定了AI的性格、能力和思考方式。
在backend/prompts/prompts.py文件中,我们为每个Agent都精心设计了Prompt。我们不仅定义了角色、任务、背景等,更重要的是,我们通过PydanticOutputParser的get_format_instructions()方法,将结构化输出的要求直接注入到了Prompt中。
我们来看看WRITER_PROMPT(报告撰写员的提示词)的节选,感受一下这种结合了Pydantic格式指令的Prompt的威力:
# backend/prompts/prompts.py (WRITER_PROMPT节选)
WRITER_PROMPT = """
# 角色
你是一位在特定领域享有盛誉的首席研究分析师...
# 任务
将一系列关于不同研究子任务的分析摘要,整合成一篇逻辑严密、结构完整、洞察深刻的最终研究报告。
# 输入数据
你将收到一份JSON格式的原始研究数据,其中包含了每个子任务的摘要。
`{{context}}`
# 输出格式指令
{format_instructions}
# 限制
- 报告必须完全基于提供的“原始研究数据”进行撰写,禁止引入任何外部信息。
- 必须严格遵守上述的Markdown格式...
# 风格/语气
权威、专业、客观、富有洞察力、具有战略高度。
"""
通过这种方式,我们实现了行为引导(角色、任务、风格)和格式控制(format_instructions)的完美结合。LLM得到的指令不再是模糊的“请返回一个列表”,而是包含了Pydantic模型所有字段、类型和描述的精确定义。这极大地提升了输出的稳定性和可靠性。
此外,在像planner这样的Agent中,我们还动态地向Prompt中加入了error字段,用于实现带反馈的重试循环。这标志着Prompt不再是静态的模板,而是能够根据程序运行的上下文动态构建的、更加智能的指令。
这就是现代AI应用开发的核心思维转变:我们不再是编写精确的if-else逻辑,而是通过设计高质量、动态化、且与代码(如Pydantic模型)紧密结合的Prompt和工作流,来引导和激发AI的“智能”,让它为我们完成复杂的、过去只有人类专家才能完成的任务。
总结
好了,各位未来的AI架构师们,我们这次深入源码的旅程就到此告一段落了。
我们一起剖析了如何用FastAPI和WebSocket构建实时反馈的后端,用LangGraph像搭乐高一样编排出一个带智能循环的多智能体协作系统,深入了解了每个Agent的代码实现,并最终领略了专业Prompt工程的艺术。
希望通过这个项目,你不仅学会了一些具体的技术,更能理解和体会到构建复杂AI应用的整体思路和架构模式。
这个领域日新月异,但核心思想——将复杂任务拆解,定义好每个智能体的能力和职责,并设计一套高效的协作流程——将会在很长一段时间内保持其价值。
本项目源码已上传到本文附件中,现在,轮到你了!把项目代码跑起来,试试给它下达一些有趣的调研任务,或者尝试去修改Prompt,赋予你的AI小队全新的性格和能力。
如果有任何问题欢迎在评论区留言,阿威会尽力为您解答!
更多推荐


所有评论(0)