基于LangGraph的油田生产分析智能体示例
LangGraph是一个基于有向循环图的AI智能体开发框架,适用于复杂协作流程如油田生产分析。该框架支持多智能体分工协作,通过动态任务分解和专业化分工提升任务性能。示例中设计了指挥官、数据提取、指标分析和质控四个智能体,采用层级指挥模式结合反思迭代机制确保分析准确性。LangGraph允许高度定制化逻辑和嵌入专业规则,具备自我纠错能力,通过循环流程自动发现并修正错误,提高报告可靠性。实现代码展示了
LangGraph衍生自LangChain,是一个流行的 AI Agent 开发框架和编排框架,用于驱动LLMs的AI应用构建,具有灵活的编排能力、逻辑可定制、开源兼容性强等特点,LangGraph 框架因其基于有向循环图 (Directed Cyclic Graph, DCG) 的架构和高度的灵活性,是设计需要自我修正和复杂协作流程(例如油田生产分析)的理想选择。多智能体协作能够通过动态任务分解和专业化分工来提高复杂任务的性能,并可能产生智能涌现。不过,LangGraph的学习难度有些大,其内部高度抽象的设计,上手成本较高,要求使用者懂得软件开发逻辑,并深入理解其原理才能灵活使用。以下是基于 LangGraph 框架设计的一个油田生产分析多智能体协作示例,采用层级指挥模式结合反思迭代机制来确保分析的准度和深度。
一、任务目标与输入
目标: 根据用户输入的特定井号和时间范围,自主完成生产数据采集、异常检测、性能评估,并输出一份包含优化建议的工程分析报告。
用户输入: 请分析xxx井过去六个月的产油量、含水率(Water Cut)变化趋势,并给出潜在的生产瓶颈和增产建议。
二、智能体分工与主要职责
在这个协作系统中,我们设置了四个具有专长(通过工具调用实现)的智能体,共同完成生产分析任务:
| 角色 | 主要职责 | Agentic 模式 |
|---|---|---|
| 指挥官智能体 (Planner) | 接收用户请求,进行任务分解,分配子任务,并最终整合所有专家的分析结果,撰写最终报告。 | 规划 (Planning) |
| 数据提取智能体 (Data Retriever) | 负责访问内部数据源(如生产数据库或历史传感器数据),执行复杂查询或进行RAG(检索增强生成),提取原始生产数据。 | 工具使用 (Tool Use) |
| 指标分析智能体 (Analyst) | 对原始数据进行清洗、计算关键指标(如产量、注采比、效率),执行异常检测算法,并用代码解释器运行模拟或生成图表。 | 工具使用 & 规划 |
| 质控智能体 (QC) | 扮演反思者的角色,审查分析智能体的输出,评估异常检测是否可靠、建议是否合理,并提供反馈以驱动迭代优化。 | 反思与迭代 (Reflection) |
三、基于 LangGraph 工作流设计(DCG 结构)
LangGraph 通过定义节点 (Nodes) 和边 (Edges) 来构建一个动态且可循环的工作流程。流程中的动态路由实现了关键的自我纠错功能,这是 Agentic 工作流的核心优势。
| 步骤/节点 (Node) | 智能体 / 动作 (Action) | 关键逻辑与输出 |
|---|---|---|
| 1. 开始 (Start) | 用户输入请求。 | 任务进入流程。 |
| 2. 任务规划 (Planning) | 指挥智能体 | 将请求分解为数据收集和分析的步骤,并生成结构化的数据提取任务清单。 |
| 3. 数据采集 (Data Retrieval) | 数据提取智能体 + 工具调用 | 调用内部 API 或 SQL 接口,获取井 XXXXX 的历史产量、含水率、压力等数据。输出原始生产数据集。 |
| 4. 数据与异常分析 (Analysis) | 指标分析智能体 + 代码解释器 | 执行数据处理脚本,计算趋势,标记异常时间点(如产量骤降或含水率突变),生成初步分析结果和图表。 |
| 5. 质量评估与反思 (qc) | 质控智能体 | 反思模式: 评估 Node 4 的分析结果。如果发现异常标记没有充分解释,或图表不清晰,则输出 Needs_Refinement,附带具体的优化建议。否则输出 QC_Pass。 |
| 6. 动态路由 (Router) | LangGraph 决策逻辑 | 根据 Node 5 的评估结果进行路由: 👉 如果结果是 Needs_Refinement: 流程回到 Node 4 (数据与异常分析),形成迭代循环,要求工程分析智能体根据批评意见修正分析或重新运行代码。 👉 如果结果是 QC_Pass: 流程进入 Node 7。 |
| 7. 报告与建议生成 (Finalize) | 指挥智能体 | 整合所有经过质量验证的分析结果和图表,撰写包含瓶颈诊断、增产措施(如酸化、压裂等)和效益预测的最终报告。 |
| 8. 结束 (End) | 交付最终报告给用户。 | 任务完成。 |
四、LangGraph的价值
- 高度定制化逻辑: 油田生产分析往往涉及高度专业化的知识和独特的业务规则。LangGraph 允许开发者在 Node 5 (质量评估) 和 Node 6 (路由) 中嵌入特定的工程逻辑和判断标准,这使其比预设死的框架更具适应性。
- 自我纠错能力: 通过 Node 5 和 Node 6 构成的循环,系统可以自动发现分析中的逻辑错误或数据缺失(例如,发现产量下降但未分析到泵故障的关联数据),并触发指标分析智能体进行第二次甚至多次迭代分析,直到质控智能体满意为止,极大地提高了最终报告的可靠性和深度。
- 专业能力集成: 流程清晰地定义了每个智能体使用哪些工具(数据库、代码解释器)来执行其专业任务,实现了专业分工,使得整体系统性能超越单一 Agent 的能力。
五、实现示例代码
'''
利用LangGraph框架实现多个智能体,包括指挥智能体、数据提取智能体、指标分析智能体和质控智能体等。
用户的初始请求进入指挥智能体,经抽取数据智能体获取数据后,再进行质量检测判断相关指标是否存在,
再进入指标分析智能体,最后通过最终解答智能体形成分析报告。
运行环境:agent
'''
import os, json, random, pandas as pd
from typing import Literal, TypedDict, List
from langchain_ollama import ChatOllama
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from io import StringIO
# ---------------- 1. 状态 ----------------
class State(TypedDict):
request: str # 指挥官收到的自然语言需求
sql: str # 数据提取智能体生成的 SQL
raw_csv: str # 提取结果(csv 文本)
qc_report: str # 质控结论
metrics: dict # 指标分析结果
final_report: str # 最终解读报告
messages: List[str] # 运行日志
# ---------------- 2. 本地模型 ----------------
llm = ChatOllama(
model="qwen2.5:14b",
# base_url=os.getenv("OLLAMA_BASE_URL","http://localhost:11434"),
base_url='http://localhost:11434',
temperature=0.4
)
# ---------------- 3. 各 Agent ----------------
# 3.1 指挥官 Agent
def commander(state: State) -> Command[Literal["extractor"]]:
"""解析需求,生成结构化提取任务"""
prompt = (f"请把下列油田生产分析需求转化为一条简洁的英文 SQL 查询语句,"
f"表名 production,字段:well_id, date, oil_rate(bbl/d), water_rate(bbl/d), pump_efficiency(%)"
f"需求:{state['request']}\n只返回 SQL,不要解释。")
sql = llm.invoke(prompt).content.strip()
return Command(goto="extractor",
update={"sql": sql, "messages": [f"[Commander] 需求解析完成,SQL:{sql}"]})
# 3.2 数据提取 Agent
def extractor(state: State) -> Command[Literal["qc"]]:
"""模拟从数据库提取数据,返回 CSV 文本"""
# 连接实际数据后,可替换为数据库或REST接口
# 这里随机生成 30 条记录做演示
dates = pd.date_range("2024-01-01", periods=30, freq='D')
df = pd.DataFrame({
"well_id": ["A-001"]*30,
"date": dates,
"oil_rate": [random.uniform(150, 250) for _ in range(30)],
"water_rate": [random.uniform(800, 1200) for _ in range(30)],
"pump_efficiency": [random.uniform(55, 70) for _ in range(30)]
})
csv = df.to_csv(index=False)
return Command(goto="qc",
update={"raw_csv": csv, "messages": [f"[Extractor] 数据提取完成,{len(df)} 条记录"]})
# 3.3 质控 Agent
def qc(state: State) -> Command[Literal["extractor", "analyzer"]]:
"""质控:空值、负值、单位异常"""
df = pd.read_csv(StringIO(state["raw_csv"]))
issues = []
if df.isnull().any().any():
issues.append("存在空值")
if (df["oil_rate"] < 0).any() or (df["water_rate"] < 0).any():
issues.append("产量为负")
if df["pump_efficiency"].max() > 100:
issues.append("泵效超 100%")
if issues:
return Command(goto="extractor",
update={"qc_report": ";".join(issues),
"messages": [f"[QC] ❌ 质控未通过:{issues},已退回提取"]})
else:
return Command(goto="analyzer",
update={"qc_report": "数据质量合格",
"messages": [f"[QC] ✅ 质控通过"]})
# 3.4 指标分析 Agent
def analyzer(state: State) -> Command[Literal["commander"]]:
"""计算关键指标"""
df = pd.read_csv(StringIO(state["raw_csv"]))
# 简单示例指标
avg_oil = df["oil_rate"].mean()
avg_water = df["water_rate"].mean()
water_cut = avg_water / (avg_oil + avg_water) * 100
avg_pump = df["pump_efficiency"].mean()
metrics = {
"平均日产油(bbl/d)": round(avg_oil, 2),
"平均日产水(bbl/d)": round(avg_water, 2),
"综合含水率(%)": round(water_cut, 2),
"平均泵效(%)": round(avg_pump, 2)
}
return Command(goto="commander",
update={"metrics": metrics,
"messages": [f"[Analyzer] 指标计算完成:{metrics}"]})
# 3.5 最终报告 Agent
def commander_finalize(state: State) -> Command[Literal[END]]:
"""生成最终解读报告"""
prompt = (f"请用中文写一段 200 字左右的油田生产分析结论,要求通俗易懂,并给出下一步措施建议。"
f"指标如下:{json.dumps(state['metrics'], ensure_ascii=False)}")
report = llm.invoke(prompt).content.strip()
# 保存报告
with open("oilfield_report.md", "w", encoding="utf-8") as f:
f.write("# 油田生产分析报告\n\n" + report)
return Command(goto=END,
update={"final_report": report,
"messages": [f"[Commander] 报告已生成并保存为 oilfield_report.md"]})
# ---------------- 4. 图构建 ----------------
workflow = StateGraph(State)
nodes = {
"commander": commander,
"extractor": extractor,
"qc": qc,
"analyzer": analyzer,
"finalize": commander_finalize
}
for name, func in nodes.items():
workflow.add_node(name, func)
# 边
workflow.add_edge(START, "commander")
workflow.add_conditional_edges("commander",
lambda s: "extractor") # 第一次必然提取
workflow.add_conditional_edges("extractor",
lambda s: "qc")
workflow.add_conditional_edges("qc",
lambda s: "analyzer" if s["qc_report"] == "数据质量合格" else "extractor")
workflow.add_conditional_edges("analyzer",
lambda s: "finalize")
workflow.add_edge("finalize", END)
app = workflow.compile()
# ---------------- 5. 运行示例 ----------------
if __name__ == "__main__":
req = "请分析 2024 年 1 月 A-001 井的生产情况,重点关注含水与泵效。"
st = app.invoke({"request": req, "messages": []})
for m in st["messages"]:
print(m)
更多推荐


所有评论(0)