CrewAI 进阶实战:从项目实战到企业级部署的完整指南
·
在前一篇的文章中,我们介绍了 CrewAI 的基本概念、核心组件和基础使用方法。本文将深入探讨 CrewAI 的高级特性,包括综合实战案例、最佳实践指南、性能优化技巧以及企业级部署方案,帮助您构建更加健壮和高效的多智能体协作系统
一、综合案例:图书生成智能体系统设计
让我们通过一个完整的图书生成智能体系统来展示 CrewAI 的强大能力。这个系统能够自动完成从书籍大纲设计到章节内容生成的全流程
1.1 项目架构设计
目录结构:
book_generator/
├── src/
│ ├── book_generator/
│ │ ├── crews/
│ │ │ ├── outline_crew/
│ │ │ │ ├── config/
│ │ │ │ │ ├── agents.yaml
│ │ │ │ │ └── tasks.yaml
│ │ │ │ └── __init__.py
│ │ │ └── write_chapter_crew/
│ │ │ ├── config/
│ │ │ │ ├── agents.yaml
│ │ │ │ └── tasks.yaml
│ │ │ └── __init__.py
│ │ ├── flows/
│ │ │ └── book_flow.py
│ │ ├── models/
│ │ │ └── chapter.py
│ │ └── __init__.py
│ └── main.py
├── pyproject.toml
└── README.md
1.2 智能体设计与实现
1、章节主题智能体
首先创建一个负责生成书籍大纲的智能体团队:
# outline_crew/__init__.py
from crewai import Agent, Crew, Process, Task, LLM
from crewai.project import CrewBase, agent, crew, task
from book_generator.models import Chapter
class OutlineCrew(CrewBase):
"""生成书籍大纲的智能体团队"""
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
def __init__(self):
super().__init__()
self.llm = LLM(
model="deepseek/deepseek-v3",
api_key="your-api-key",
api_base="https://api.deepseek.com/v1"
)
@agent
def researcher_agent(self) -> Agent:
"""研究主题并收集相关信息"""
return Agent(
role="技术研究员",
goal="深入研究指定技术主题,收集最新资料和发展趋势",
backstory="你是一位资深技术研究员,擅长快速掌握新技术并分析发展趋势",
tools=[self.search_tool],
llm=self.llm,
verbose=True
)
@agent
def outline_agent(self) -> Agent:
"""基于研究结果生成书籍大纲"""
return Agent(
role="书籍大纲设计师",
goal="根据研究资料设计逻辑清晰、结构完整的书籍大纲",
backstory="你是一位经验丰富的技术书籍作者,擅长设计系统性的知识结构",
llm=self.llm,
verbose=True
)
@task
def research_task(self) -> Task:
"""研究任务"""
return Task(
description="研究CrewAI多智能体系统的核心技术和应用场景",
expected_output="一份详细的技术研究报告,包含核心概念、架构设计和应用案例",
agent=self.researcher_agent()
)
@task
def outline_task(self) -> Task:
"""生成大纲任务"""
return Task(
description="基于研究报告设计《CrewAI实战指南》的详细大纲",
expected_output="包含章节结构、主要内容和技术要点的完整书籍大纲",
agent=self.outline_agent()
)
@crew
def crew(self) -> Crew:
"""创建并配置团队"""
return Crew(
agents=[self.researcher_agent(), self.outline_agent()],
tasks=[self.research_task(), self.outline_task()],
process=Process.sequential,
verbose=True
)
2. 内容创作智能体
接下来实现章节内容创作的智能体:
# write_chapter_crew/__init__.py
from crewai import Agent, Crew, Process, Task, LLM
from crewai.project import CrewBase, agent, crew, task
from book_generator.models import Chapter
class WriteChapterCrew(CrewBase):
"""撰写章节内容的智能体团队"""
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
def __init__(self, chapter_topic: str):
super().__init__()
self.chapter_topic = chapter_topic
self.llm = LLM(
model="deepseek/deepseek-v3",
api_key="your-api-key",
api_base="https://api.deepseek.com/v1"
)
@agent
def content_researcher(self) -> Agent:
"""研究章节相关内容"""
return Agent(
role="内容研究员",
goal=f"深入研究{self.chapter_topic}相关的技术细节和最佳实践",
backstory="你是一位专注于技术内容的研究员,擅长深入挖掘技术细节",
tools=[self.search_tool],
llm=self.llm,
verbose=True
)
@agent
def technical_writer(self) -> Agent:
"""撰写技术章节内容"""
return Agent(
role="技术作家",
goal=f"撰写关于{self.chapter_topic}的详细技术章节",
backstory="你是一位专业的技术作家,擅长将复杂技术内容转化为易懂的文章",
llm=self.llm,
verbose=True
)
@task
def research_chapter_content(self) -> Task:
"""研究章节内容"""
return Task(
description=f"深入研究{self.chapter_topic}的技术细节、代码示例和应用场景",
expected_output=f"关于{self.chapter_topic}的详细技术资料和代码示例",
agent=self.content_researcher()
)
@task
def write_chapter(self) -> Task:
"""撰写章节"""
return Task(
description=f"基于研究资料撰写{self.chapter_topic}章节,包含理论、代码示例和实践指导",
expected_output=f"完整的{self.chapter_topic}技术章节,结构清晰,代码完整可运行",
agent=self.technical_writer()
)
@crew
def crew(self) -> Crew:
"""创建写作团队"""
return Crew(
agents=[self.content_researcher(), self.technical_writer()],
tasks=[self.research_chapter_content(), self.write_chapter()],
process=Process.sequential,
verbose=True
)
1.3 工作流整合
使用 Flow 将各个智能体团队整合为完整的书籍生成流程:
# flows/book_flow.py
from crewai import Flow, current_flow
from pydantic import BaseModel
from typing import List, Optional
from book_generator.crews.outline_crew import OutlineCrew
from book_generator.crews.write_chapter_crew import WriteChapterCrew
class BookState(BaseModel):
"""书籍生成流程的状态管理"""
topic: str
outline: Optional[str] = None
chapters: List[str] = []
current_chapter: int = 0
class BookFlow(Flow):
"""书籍生成工作流"""
state: BookState
def __init__(self, topic: str):
super().__init__()
self.state = BookState(topic=topic)
@start()
def generate_outline(self):
"""生成书籍大纲"""
print(f"开始生成《{self.state.topic}》的大纲...")
# 创建大纲生成团队
outline_crew = OutlineCrew()
result = outline_crew.crew().kickoff()
self.state.outline = result
print("大纲生成完成!")
# 提取章节列表(简化处理)
self.state.chapters = self._extract_chapters_from_outline(result)
return "outline_generated"
@listener("outline_generated")
def write_chapters(self):
"""撰写章节内容"""
if self.state.current_chapter < len(self.state.chapters):
current_topic = self.state.chapters[self.state.current_chapter]
print(f"正在撰写第{self.state.current_chapter + 1}章:{current_topic}")
# 创建章节写作团队
chapter_crew = WriteChapterCrew(chapter_topic=current_topic)
chapter_content = chapter_crew.crew().kickoff()
# 保存章节内容
self._save_chapter(current_topic, chapter_content)
self.state.current_chapter += 1
return "chapter_written"
else:
return "all_chapters_completed"
@listener("chapter_written")
def continue_writing(self):
"""继续撰写下一章"""
self.write_chapters()
@listener("all_chapters_completed")
def compile_book(self):
"""整合所有章节生成完整书籍"""
print("正在整合所有章节生成完整书籍...")
book_content = self._compile_all_chapters()
self._save_complete_book(book_content)
print(f"《{self.state.topic}》生成完成!")
return "book_completed"
def _extract_chapters_from_outline(self, outline: str) -> List[str]:
"""从大纲中提取章节列表"""
# 这里实现从大纲文本中提取章节标题的逻辑
# 简化处理,实际项目中需要更复杂的解析
return [
"CrewAI基础概念",
"智能体设计模式",
"任务编排技巧",
"实战项目案例",
"性能优化策略"
]
def _save_chapter(self, topic: str, content: str):
"""保存章节内容"""
with open(f"chapters/{topic}.md", "w", encoding="utf-8") as f:
f.write(content)
def _compile_all_chapters(self) -> str:
"""整合所有章节"""
book_content = f"# {self.state.topic}\n\n"
book_content += self.state.outline + "\n\n"
for chapter in self.state.chapters:
with open(f"chapters/{chapter}.md", "r", encoding="utf-8") as f:
book_content += f.read() + "\n\n"
return book_content
def _save_complete_book(self, content: str):
"""保存完整书籍"""
with open(f"{self.state.topic}.md", "w", encoding="utf-8") as f:
f.write(content)
# 主程序入口
def main():
# 创建书籍生成流程
book_flow = BookFlow(topic="CrewAI多智能体系统开发实战指南")
# 启动流程
result = book_flow.kickoff()
print(f"书籍生成流程完成,结果:{result}")
# 生成流程图
book_flow.plot("book_generation_flow")
if __name__ == "__main__":
main()
二、实战案例
2.1 案例一:智能客户支持系统
系统架构:
# customer_support_system.py
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import SerperDevTool, GmailTool, SlackTool
class CustomerSupportSystem:
"""智能客户支持系统"""
def __init__(self):
self.llm = LLM(
model="gpt-4",
api_key="your-api-key"
)
self.search_tool = SerperDevTool()
self.email_tool = GmailTool()
self.slack_tool = SlackTool()
def create_support_agents(self):
"""创建支持团队的智能体"""
# 初级支持代理
tier1_agent = Agent(
role="初级技术支持工程师",
goal="处理常见客户问题,提供基础解决方案",
backstory="你是一位经验丰富的初级技术支持工程师,擅长处理常见技术问题",
tools=[self.search_tool, self.email_tool],
llm=self.llm,
verbose=True
)
# 高级支持代理
tier2_agent = Agent(
role="高级技术支持专家",
goal="解决复杂的技术问题,提供深度技术支持",
backstory="你是一位资深技术专家,擅长解决复杂的技术难题",
tools=[self.search_tool, self.email_tool, self.slack_tool],
llm=self.llm,
verbose=True
)
# 技术文档撰写代理
documentation_agent = Agent(
role="技术文档工程师",
goal="创建和更新技术文档,帮助客户自助解决问题",
backstory="你是一位专业的技术文档工程师,擅长创建清晰易懂的技术文档",
llm=self.llm,
verbose=True
)
return tier1_agent, tier2_agent, documentation_agent
def create_support_tasks(self, customer_query, agents):
"""创建支持任务"""
tier1_agent, tier2_agent, documentation_agent = agents
# 初步问题分析任务
initial_analysis = Task(
description=f"分析客户问题:{customer_query},判断问题类型和复杂度",
expected_output="问题分析报告,包含问题分类、严重程度和初步解决方案",
agent=tier1_agent,
priority=1
)
# 问题解决任务(使用条件路由)
problem_solving = Task(
description="基于问题分析结果,提供完整的解决方案",
expected_output="详细的解决方案,包含步骤说明和必要的技术细节",
agent=tier2_agent, # 可以根据问题复杂度动态分配
priority=2
)
# 后续跟进任务
follow_up = Task(
description="跟进客户问题解决情况,确保客户满意度",
expected_output="跟进报告,包含客户反馈和问题解决状态",
agent=tier1_agent,
priority=3
)
# 文档更新任务
update_docs = Task(
description="基于解决的问题,更新相关技术文档",
expected_output="更新后的技术文档或FAQ条目",
agent=documentation_agent,
priority=4
)
return [initial_analysis, problem_solving, follow_up, update_docs]
def run_support_system(self, customer_query):
"""运行客户支持系统"""
# 创建智能体
agents = self.create_support_agents()
# 创建任务
tasks = self.create_support_tasks(customer_query, agents)
# 创建支持团队
support_team = Crew(
agents=agents,
tasks=tasks,
process=Process.hierarchical, # 使用分层执行模式
verbose=True
)
# 执行任务
result = support_team.kickoff()
return result
2.2 案例二:市场分析与预测系统
# market_analysis_system.py
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import SerperDevTool, ExcelTool, PlotTool
import pandas as pd
class MarketAnalysisSystem:
"""智能市场分析与预测系统"""
def __init__(self):
self.llm = LLM(
model="gpt-4",
api_key="your-api-key"
)
self.search_tool = SerperDevTool()
self.excel_tool = ExcelTool()
self.plot_tool = PlotTool()
def create_market_agents(self):
"""创建市场分析团队"""
# 数据收集代理
data_collector = Agent(
role="市场数据研究员",
goal="收集和整理相关市场数据和行业报告",
backstory="你是一位专业的数据研究员,擅长收集和整理各类市场数据",
tools=[self.search_tool, self.excel_tool],
llm=self.llm,
verbose=True
)
# 数据分析代理
data_analyst = Agent(
role="市场分析师",
goal="分析市场数据,识别趋势和模式",
backstory="你是一位资深市场分析师,擅长从数据中发现商业洞察",
tools=[self.excel_tool, self.plot_tool],
llm=self.llm,
verbose=True
)
# 预测建模代理
forecaster = Agent(
role="预测建模专家",
goal="基于历史数据构建预测模型",
backstory="你是一位预测建模专家,擅长使用机器学习技术进行市场预测",
llm=self.llm,
verbose=True
)
# 报告撰写代理
report_writer = Agent(
role="商业报告撰写师",
goal="撰写专业的市场分析报告",
backstory="你是一位专业的商业报告撰写师,擅长将复杂分析转化为易懂的报告",
llm=self.llm,
verbose=True
)
return data_collector, data_analyst, forecaster, report_writer
def create_analysis_tasks(self, industry, time_period, agents):
"""创建分析任务"""
data_collector, data_analyst, forecaster, report_writer = agents
# 数据收集任务
data_collection = Task(
description=f"收集{industry}行业过去{time_period}的市场数据,包括市场规模、增长率、主要参与者等",
expected_output="完整的市场数据集,包含结构化的Excel文件和数据来源说明",
agent=data_collector,
priority=1
)
# 趋势分析任务
trend_analysis = Task(
description="分析收集的市场数据,识别关键趋势、机会和挑战",
expected_output="趋势分析报告,包含数据可视化图表和关键发现",
agent=data_analyst,
priority=2
)
# 预测建模任务
forecasting = Task(
description="基于历史数据构建预测模型,预测未来市场发展趋势",
expected_output="预测模型和未来3-5年的市场预测报告",
agent=forecaster,
priority=3
)
# 报告整合任务
report_generation = Task(
description="整合所有分析结果,撰写专业的市场分析报告",
expected_output="完整的市场分析报告,包含执行摘要、详细分析和建议",
agent=report_writer,
priority=4
)
return [data_collection, trend_analysis, forecasting, report_generation]
def run_analysis(self, industry, time_period="5年"):
"""运行市场分析"""
# 创建智能体
agents = self.create_market_agents()
# 创建任务
tasks = self.create_analysis_tasks(industry, time_period, agents)
# 创建分析团队
analysis_team = Crew(
agents=agents,
tasks=tasks,
process=Process.sequential,
verbose=True
)
# 执行分析
result = analysis_team.kickoff()
return result
三、最佳实践指南
3.1 智能体任务优先级管理:
# 任务优先级示例
high_priority_task = Task(
description="处理紧急客户投诉",
expected_output="客户问题解决方案和跟进计划",
agent=support_agent,
priority=1 # 最高优先级
)
medium_priority_task = Task(
description="更新产品文档",
expected_output="更新后的产品使用手册",
agent=documentation_agent,
priority=2 # 中等优先级
)
low_priority_task = Task(
description="分析季度用户反馈",
expected_output="用户反馈分析报告",
agent=analysis_agent,
priority=3 # 低优先级
)
3.2 性能优化策略
1、缓存策略
# 实现智能体响应缓存
import hashlib
import json
from functools import lru_cache
class CachedAgent:
"""带缓存的智能体包装器"""
def __init__(self, agent):
self.agent = agent
self.cache = {}
def generate_response(self, task_description, max_age=3600):
"""生成响应并缓存结果"""
# 创建缓存键
cache_key = self._create_cache_key(task_description)
# 检查缓存
if cache_key in self.cache:
cached_response, timestamp = self.cache[cache_key]
if (time.time() - timestamp) < max_age:
print("使用缓存响应")
return cached_response
# 生成新响应
response = self.agent.generate_response(task_description)
# 缓存响应
self.cache[cache_key] = (response, time.time())
return response
def _create_cache_key(self, task_description):
"""创建缓存键"""
return hashlib.md5(task_description.encode()).hexdigest()
2、速率限制和节流
# 实现API调用速率限制
import time
from collections import defaultdict
class RateLimitedAgent:
"""带速率限制的智能体"""
def __init__(self, agent, requests_per_minute=60):
self.agent = agent
self.rate_limit = requests_per_minute
self.request_timestamps = defaultdict(list)
def generate_response(self, task_description):
"""生成响应并应用速率限制"""
agent_id = id(self.agent)
# 清理过期的请求记录
now = time.time()
self.request_timestamps[agent_id] = [
ts for ts in self.request_timestamps[agent_id]
if now - ts < 60
]
# 检查速率限制
if len(self.request_timestamps[agent_id]) >= self.rate_limit:
wait_time = 60 - (now - self.request_timestamps[agent_id][0])
print(f"速率限制达到,等待 {wait_time:.2f} 秒")
time.sleep(wait_time)
# 记录请求时间
self.request_timestamps[agent_id].append(time.time())
# 生成响应
return self.agent.generate_response(task_description)
3、并行执行优化
# 并行任务执行示例
from crewai import Agent, Task, Crew, Process
import asyncio
class ParallelProcessingSystem:
"""并行处理系统"""
def create_parallel_agents(self):
"""创建并行工作的智能体"""
agents = []
for i in range(3):
agent = Agent(
role=f"数据处理专家_{i+1}",
goal="并行处理数据块,提高处理效率",
backstory="你是一位数据处理专家,擅长高效处理大规模数据集",
llm=self.llm,
verbose=True
)
agents.append(agent)
return agents
def create_parallel_tasks(self, data_chunks, agents):
"""创建并行任务"""
tasks = []
for i, (chunk, agent) in enumerate(zip(data_chunks, agents)):
task = Task(
description=f"处理数据块 {i+1}:{chunk[:100]}...",
expected_output=f"数据块 {i+1} 的处理结果",
agent=agent,
priority=1
)
tasks.append(task)
return tasks
def run_parallel_processing(self, large_dataset):
"""运行并行处理"""
# 数据分片
chunk_size = len(large_dataset) // 3
data_chunks = [
large_dataset[i*chunk_size : (i+1)*chunk_size]
for i in range(3)
]
# 创建智能体和任务
agents = self.create_parallel_agents()
tasks = self.create_parallel_tasks(data_chunks, agents)
# 使用并行执行模式
processing_team = Crew(
agents=agents,
tasks=tasks,
process=Process.parallel, # 并行执行
verbose=True
)
# 执行处理
results = processing_team.kickoff()
# 合并结果
combined_result = self._merge_results(results)
return combined_result
3.3 错误处理和容错机制
# 实现健壮的错误处理机制
import logging
from crewai import Agent, Task, Crew, Process
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
class FaultTolerantSystem:
"""容错系统设计"""
def create_fault_tolerant_agent(self):
"""创建容错智能体"""
return Agent(
role="容错数据处理器",
goal="处理数据并自动恢复错误",
backstory="你是一位容错系统专家,擅长处理和恢复各种错误情况",
llm=self.llm,
verbose=True
)
def execute_with_retry(self, task, max_retries=3):
"""带重试机制的任务执行"""
for attempt in range(max_retries):
try:
result = task.execute()
logger.info(f"任务执行成功 (尝试 {attempt+1}/{max_retries})")
return result
except Exception as e:
logger.error(f"任务执行失败 (尝试 {attempt+1}/{max_retries}): {str(e)}")
if attempt < max_retries - 1:
logger.info("等待后重试...")
time.sleep(2 ** attempt) # 指数退避
else:
logger.error("达到最大重试次数,任务执行失败")
raise
def create_fallback_task(self, original_task):
"""创建降级任务"""
return Task(
description=f"降级处理:{original_task.description}",
expected_output=f"简化版 {original_task.expected_output}",
agent=self.create_fault_tolerant_agent(),
priority=original_task.priority
)
def run_fault_tolerant_workflow(self, main_tasks):
"""运行容错工作流"""
results = []
for task in main_tasks:
try:
# 尝试执行主任务
result = self.execute_with_retry(task)
results.append(result)
except Exception as e:
logger.error(f"主任务失败,使用降级方案: {str(e)}")
# 执行降级任务
fallback_task = self.create_fallback_task(task)
fallback_result = self.execute_with_retry(fallback_task)
results.append(fallback_result)
return results
四、常见问题与解决方案
4.1 性能问题
问题:智能体响应时间过长
解决方案:
# performance_optimization.py
class PerformanceOptimizer:
"""性能优化器"""
def optimize_agent_response(self, agent, task, max_tokens=1000):
"""优化智能体响应时间"""
# 1. 简化提示词
simplified_prompt = self._simplify_prompt(task.description)
# 2. 限制响应长度
optimized_task = Task(
description=simplified_prompt,
expected_output=f"{task.expected_output} (限制在{max_tokens}个tokens内)",
agent=agent,
priority=task.priority
)
# 3. 使用流式响应
response = agent.generate_response_stream(optimized_task)
return response
def _simplify_prompt(self, prompt):
"""简化提示词"""
# 移除冗余信息
# 使用更简洁的表达方式
# 保留关键信息
# 示例简化逻辑
prompt = prompt.replace("请你帮我", "请")
prompt = prompt.replace("非常详细的", "详细的")
return prompt.strip()
def enable_caching(self, agent):
"""启用缓存"""
agent.enable_cache(ttl=3600) # 缓存1小时
def use_lightweight_model(self, agent):
"""切换到轻量级模型"""
original_model = agent.llm.model
agent.llm.model = "gpt-3.5-turbo" # 使用更快速的模型
return original_model # 返回原始模型以便恢复
4.2 成本控制问题
问题:API 调用成本过高
解决方案:
# cost_control.py
class CostOptimizer:
"""成本优化器"""
def __init__(self, max_daily_cost=10):
self.max_daily_cost = max_daily_cost
self.daily_spend = 0
self.model_costs = {
"gpt-4": 0.06,
"gpt-3.5-turbo": 0.002,
"deepseek-v3": 0.004
}
def select_economical_model(self, task_complexity):
"""根据任务复杂度选择经济模型"""
if task_complexity == "low" and self.daily_spend > self.max_daily_cost * 0.8:
return "gpt-3.5-turbo"
elif task_complexity == "medium":
return "deepseek-v3"
elif task_complexity == "high" and self.daily_spend < self.max_daily_cost * 0.5:
return "gpt-4"
else:
return "deepseek-v3"
def optimize_token_usage(self, prompt):
"""优化token使用"""
# 移除不必要的空格和换行
optimized_prompt = " ".join(prompt.split())
# 使用更简洁的表达方式
optimized_prompt = optimized_prompt.replace("for the purpose of", "for")
optimized_prompt = optimized_prompt.replace("in order to", "to")
return optimized_prompt
def batch_requests(self, tasks, batch_size=5):
"""批量处理请求"""
batches = [tasks[i:i+batch_size] for i in range(0, len(tasks), batch_size)]
return batches
4.3 可靠性问题
问题:智能体执行失败率高
解决方案:
# reliability_improvement.py
import time
import random
class ReliabilityEnhancer:
"""可靠性增强器"""
def execute_with_retry(self, func, max_retries=3, backoff_factor=2):
"""带重试机制的执行"""
for attempt in range(max_retries):
try:
return func()
except Exception as e:
if attempt == max_retries - 1:
raise
wait_time = backoff_factor ** attempt + random.uniform(0, 1)
print(f"重试 {attempt+1}/{max_retries},等待 {wait_time:.2f} 秒")
time.sleep(wait_time)
def add_fallback_agent(self, primary_agent, fallback_agent):
"""添加降级智能体"""
class FallbackAgent:
def __init__(self, primary, fallback):
self.primary = primary
self.fallback = fallback
def execute_task(self, task):
try:
return self.primary.execute_task(task)
except Exception as e:
print(f"主智能体失败,使用降级智能体: {e}")
return self.fallback.execute_task(task)
return FallbackAgent(primary_agent, fallback_agent)
def validate_response(self, response, validation_rules):
"""验证响应质量"""
for rule in validation_rules:
if not rule(response):
return False, f"违反规则: {rule.__name__}"
return True, "验证通过"
CrewAI 正在快速发展,随着社区的不断壮大和技术的持续进步,相信它将成为构建下一代 AI 应用的重要基础设施。现在正是开始探索和应用 CrewAI 的最佳时机!
更多推荐


所有评论(0)