在前一篇的文章中,我们介绍了 CrewAI 的基本概念、核心组件和基础使用方法。本文将深入探讨 CrewAI 的高级特性,包括综合实战案例、最佳实践指南、性能优化技巧以及企业级部署方案,帮助您构建更加健壮和高效的多智能体协作系统

一、综合案例:图书生成智能体系统设计

让我们通过一个完整的图书生成智能体系统来展示 CrewAI 的强大能力。这个系统能够自动完成从书籍大纲设计到章节内容生成的全流程

1.1 项目架构设计

目录结构:

book_generator/
├── src/
│   ├── book_generator/
│   │   ├── crews/
│   │   │   ├── outline_crew/
│   │   │   │   ├── config/
│   │   │   │   │   ├── agents.yaml
│   │   │   │   │   └── tasks.yaml
│   │   │   │   └── __init__.py
│   │   │   └── write_chapter_crew/
│   │   │       ├── config/
│   │   │       │   ├── agents.yaml
│   │   │       │   └── tasks.yaml
│   │   │       └── __init__.py
│   │   ├── flows/
│   │   │   └── book_flow.py
│   │   ├── models/
│   │   │   └── chapter.py
│   │   └── __init__.py
│   └── main.py
├── pyproject.toml
└── README.md

1.2 智能体设计与实现

1、章节主题智能体

首先创建一个负责生成书籍大纲的智能体团队:

# outline_crew/__init__.py
from crewai import Agent, Crew, Process, Task, LLM
from crewai.project import CrewBase, agent, crew, task
from book_generator.models import Chapter

class OutlineCrew(CrewBase):
    """生成书籍大纲的智能体团队"""
    
    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"
    
    def __init__(self):
        super().__init__()
        self.llm = LLM(
            model="deepseek/deepseek-v3",
            api_key="your-api-key",
            api_base="https://api.deepseek.com/v1"
        )
    
    @agent
    def researcher_agent(self) -> Agent:
        """研究主题并收集相关信息"""
        return Agent(
            role="技术研究员",
            goal="深入研究指定技术主题,收集最新资料和发展趋势",
            backstory="你是一位资深技术研究员,擅长快速掌握新技术并分析发展趋势",
            tools=[self.search_tool],
            llm=self.llm,
            verbose=True
        )
    
    @agent
    def outline_agent(self) -> Agent:
        """基于研究结果生成书籍大纲"""
        return Agent(
            role="书籍大纲设计师",
            goal="根据研究资料设计逻辑清晰、结构完整的书籍大纲",
            backstory="你是一位经验丰富的技术书籍作者,擅长设计系统性的知识结构",
            llm=self.llm,
            verbose=True
        )
    
    @task
    def research_task(self) -> Task:
        """研究任务"""
        return Task(
            description="研究CrewAI多智能体系统的核心技术和应用场景",
            expected_output="一份详细的技术研究报告,包含核心概念、架构设计和应用案例",
            agent=self.researcher_agent()
        )
    
    @task
    def outline_task(self) -> Task:
        """生成大纲任务"""
        return Task(
            description="基于研究报告设计《CrewAI实战指南》的详细大纲",
            expected_output="包含章节结构、主要内容和技术要点的完整书籍大纲",
            agent=self.outline_agent()
        )
    
    @crew
    def crew(self) -> Crew:
        """创建并配置团队"""
        return Crew(
            agents=[self.researcher_agent(), self.outline_agent()],
            tasks=[self.research_task(), self.outline_task()],
            process=Process.sequential,
            verbose=True
        )

2. 内容创作智能体

接下来实现章节内容创作的智能体:

# write_chapter_crew/__init__.py
from crewai import Agent, Crew, Process, Task, LLM
from crewai.project import CrewBase, agent, crew, task
from book_generator.models import Chapter

class WriteChapterCrew(CrewBase):
    """撰写章节内容的智能体团队"""
    
    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"
    
    def __init__(self, chapter_topic: str):
        super().__init__()
        self.chapter_topic = chapter_topic
        self.llm = LLM(
            model="deepseek/deepseek-v3",
            api_key="your-api-key",
            api_base="https://api.deepseek.com/v1"
        )
    
    @agent
    def content_researcher(self) -> Agent:
        """研究章节相关内容"""
        return Agent(
            role="内容研究员",
            goal=f"深入研究{self.chapter_topic}相关的技术细节和最佳实践",
            backstory="你是一位专注于技术内容的研究员,擅长深入挖掘技术细节",
            tools=[self.search_tool],
            llm=self.llm,
            verbose=True
        )
    
    @agent
    def technical_writer(self) -> Agent:
        """撰写技术章节内容"""
        return Agent(
            role="技术作家",
            goal=f"撰写关于{self.chapter_topic}的详细技术章节",
            backstory="你是一位专业的技术作家,擅长将复杂技术内容转化为易懂的文章",
            llm=self.llm,
            verbose=True
        )
    
    @task
    def research_chapter_content(self) -> Task:
        """研究章节内容"""
        return Task(
            description=f"深入研究{self.chapter_topic}的技术细节、代码示例和应用场景",
            expected_output=f"关于{self.chapter_topic}的详细技术资料和代码示例",
            agent=self.content_researcher()
        )
    
    @task
    def write_chapter(self) -> Task:
        """撰写章节"""
        return Task(
            description=f"基于研究资料撰写{self.chapter_topic}章节,包含理论、代码示例和实践指导",
            expected_output=f"完整的{self.chapter_topic}技术章节,结构清晰,代码完整可运行",
            agent=self.technical_writer()
        )
    
    @crew
    def crew(self) -> Crew:
        """创建写作团队"""
        return Crew(
            agents=[self.content_researcher(), self.technical_writer()],
            tasks=[self.research_chapter_content(), self.write_chapter()],
            process=Process.sequential,
            verbose=True
        )

1.3 工作流整合

使用 Flow 将各个智能体团队整合为完整的书籍生成流程:

# flows/book_flow.py
from crewai import Flow, current_flow
from pydantic import BaseModel
from typing import List, Optional
from book_generator.crews.outline_crew import OutlineCrew
from book_generator.crews.write_chapter_crew import WriteChapterCrew

class BookState(BaseModel):
    """书籍生成流程的状态管理"""
    topic: str
    outline: Optional[str] = None
    chapters: List[str] = []
    current_chapter: int = 0

class BookFlow(Flow):
    """书籍生成工作流"""
    
    state: BookState
    
    def __init__(self, topic: str):
        super().__init__()
        self.state = BookState(topic=topic)
    
    @start()
    def generate_outline(self):
        """生成书籍大纲"""
        print(f"开始生成《{self.state.topic}》的大纲...")
        
        # 创建大纲生成团队
        outline_crew = OutlineCrew()
        result = outline_crew.crew().kickoff()
        
        self.state.outline = result
        print("大纲生成完成!")
        
        # 提取章节列表(简化处理)
        self.state.chapters = self._extract_chapters_from_outline(result)
        
        return "outline_generated"
    
    @listener("outline_generated")
    def write_chapters(self):
        """撰写章节内容"""
        if self.state.current_chapter < len(self.state.chapters):
            current_topic = self.state.chapters[self.state.current_chapter]
            print(f"正在撰写第{self.state.current_chapter + 1}章:{current_topic}")
            
            # 创建章节写作团队
            chapter_crew = WriteChapterCrew(chapter_topic=current_topic)
            chapter_content = chapter_crew.crew().kickoff()
            
            # 保存章节内容
            self._save_chapter(current_topic, chapter_content)
            
            self.state.current_chapter += 1
            return "chapter_written"
        else:
            return "all_chapters_completed"
    
    @listener("chapter_written")
    def continue_writing(self):
        """继续撰写下一章"""
        self.write_chapters()
    
    @listener("all_chapters_completed")
    def compile_book(self):
        """整合所有章节生成完整书籍"""
        print("正在整合所有章节生成完整书籍...")
        book_content = self._compile_all_chapters()
        self._save_complete_book(book_content)
        print(f"《{self.state.topic}》生成完成!")
        return "book_completed"
    
    def _extract_chapters_from_outline(self, outline: str) -> List[str]:
        """从大纲中提取章节列表"""
        # 这里实现从大纲文本中提取章节标题的逻辑
        # 简化处理,实际项目中需要更复杂的解析
        return [
            "CrewAI基础概念",
            "智能体设计模式",
            "任务编排技巧",
            "实战项目案例",
            "性能优化策略"
        ]
    
    def _save_chapter(self, topic: str, content: str):
        """保存章节内容"""
        with open(f"chapters/{topic}.md", "w", encoding="utf-8") as f:
            f.write(content)
    
    def _compile_all_chapters(self) -> str:
        """整合所有章节"""
        book_content = f"# {self.state.topic}\n\n"
        book_content += self.state.outline + "\n\n"
        
        for chapter in self.state.chapters:
            with open(f"chapters/{chapter}.md", "r", encoding="utf-8") as f:
                book_content += f.read() + "\n\n"
        
        return book_content
    
    def _save_complete_book(self, content: str):
        """保存完整书籍"""
        with open(f"{self.state.topic}.md", "w", encoding="utf-8") as f:
            f.write(content)

# 主程序入口
def main():
    # 创建书籍生成流程
    book_flow = BookFlow(topic="CrewAI多智能体系统开发实战指南")
    
    # 启动流程
    result = book_flow.kickoff()
    print(f"书籍生成流程完成,结果:{result}")
    
    # 生成流程图
    book_flow.plot("book_generation_flow")

if __name__ == "__main__":
    main()

二、实战案例

2.1 案例一:智能客户支持系统

系统架构:

# customer_support_system.py
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import SerperDevTool, GmailTool, SlackTool

class CustomerSupportSystem:
    """智能客户支持系统"""
    
    def __init__(self):
        self.llm = LLM(
            model="gpt-4",
            api_key="your-api-key"
        )
        self.search_tool = SerperDevTool()
        self.email_tool = GmailTool()
        self.slack_tool = SlackTool()
    
    def create_support_agents(self):
        """创建支持团队的智能体"""
        
        # 初级支持代理
        tier1_agent = Agent(
            role="初级技术支持工程师",
            goal="处理常见客户问题,提供基础解决方案",
            backstory="你是一位经验丰富的初级技术支持工程师,擅长处理常见技术问题",
            tools=[self.search_tool, self.email_tool],
            llm=self.llm,
            verbose=True
        )
        
        # 高级支持代理
        tier2_agent = Agent(
            role="高级技术支持专家",
            goal="解决复杂的技术问题,提供深度技术支持",
            backstory="你是一位资深技术专家,擅长解决复杂的技术难题",
            tools=[self.search_tool, self.email_tool, self.slack_tool],
            llm=self.llm,
            verbose=True
        )
        
        # 技术文档撰写代理
        documentation_agent = Agent(
            role="技术文档工程师",
            goal="创建和更新技术文档,帮助客户自助解决问题",
            backstory="你是一位专业的技术文档工程师,擅长创建清晰易懂的技术文档",
            llm=self.llm,
            verbose=True
        )
        
        return tier1_agent, tier2_agent, documentation_agent
    
    def create_support_tasks(self, customer_query, agents):
        """创建支持任务"""
        tier1_agent, tier2_agent, documentation_agent = agents
        
        # 初步问题分析任务
        initial_analysis = Task(
            description=f"分析客户问题:{customer_query},判断问题类型和复杂度",
            expected_output="问题分析报告,包含问题分类、严重程度和初步解决方案",
            agent=tier1_agent,
            priority=1
        )
        
        # 问题解决任务(使用条件路由)
        problem_solving = Task(
            description="基于问题分析结果,提供完整的解决方案",
            expected_output="详细的解决方案,包含步骤说明和必要的技术细节",
            agent=tier2_agent,  # 可以根据问题复杂度动态分配
            priority=2
        )
        
        # 后续跟进任务
        follow_up = Task(
            description="跟进客户问题解决情况,确保客户满意度",
            expected_output="跟进报告,包含客户反馈和问题解决状态",
            agent=tier1_agent,
            priority=3
        )
        
        # 文档更新任务
        update_docs = Task(
            description="基于解决的问题,更新相关技术文档",
            expected_output="更新后的技术文档或FAQ条目",
            agent=documentation_agent,
            priority=4
        )
        
        return [initial_analysis, problem_solving, follow_up, update_docs]
    
    def run_support_system(self, customer_query):
        """运行客户支持系统"""
        # 创建智能体
        agents = self.create_support_agents()
        
        # 创建任务
        tasks = self.create_support_tasks(customer_query, agents)
        
        # 创建支持团队
        support_team = Crew(
            agents=agents,
            tasks=tasks,
            process=Process.hierarchical,  # 使用分层执行模式
            verbose=True
        )
        
        # 执行任务
        result = support_team.kickoff()
        return result

2.2 案例二:市场分析与预测系统

# market_analysis_system.py
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import SerperDevTool, ExcelTool, PlotTool
import pandas as pd

class MarketAnalysisSystem:
    """智能市场分析与预测系统"""
    
    def __init__(self):
        self.llm = LLM(
            model="gpt-4",
            api_key="your-api-key"
        )
        self.search_tool = SerperDevTool()
        self.excel_tool = ExcelTool()
        self.plot_tool = PlotTool()
    
    def create_market_agents(self):
        """创建市场分析团队"""
        
        # 数据收集代理
        data_collector = Agent(
            role="市场数据研究员",
            goal="收集和整理相关市场数据和行业报告",
            backstory="你是一位专业的数据研究员,擅长收集和整理各类市场数据",
            tools=[self.search_tool, self.excel_tool],
            llm=self.llm,
            verbose=True
        )
        
        # 数据分析代理
        data_analyst = Agent(
            role="市场分析师",
            goal="分析市场数据,识别趋势和模式",
            backstory="你是一位资深市场分析师,擅长从数据中发现商业洞察",
            tools=[self.excel_tool, self.plot_tool],
            llm=self.llm,
            verbose=True
        )
        
        # 预测建模代理
        forecaster = Agent(
            role="预测建模专家",
            goal="基于历史数据构建预测模型",
            backstory="你是一位预测建模专家,擅长使用机器学习技术进行市场预测",
            llm=self.llm,
            verbose=True
        )
        
        # 报告撰写代理
        report_writer = Agent(
            role="商业报告撰写师",
            goal="撰写专业的市场分析报告",
            backstory="你是一位专业的商业报告撰写师,擅长将复杂分析转化为易懂的报告",
            llm=self.llm,
            verbose=True
        )
        
        return data_collector, data_analyst, forecaster, report_writer
    
    def create_analysis_tasks(self, industry, time_period, agents):
        """创建分析任务"""
        data_collector, data_analyst, forecaster, report_writer = agents
        
        # 数据收集任务
        data_collection = Task(
            description=f"收集{industry}行业过去{time_period}的市场数据,包括市场规模、增长率、主要参与者等",
            expected_output="完整的市场数据集,包含结构化的Excel文件和数据来源说明",
            agent=data_collector,
            priority=1
        )
        
        # 趋势分析任务
        trend_analysis = Task(
            description="分析收集的市场数据,识别关键趋势、机会和挑战",
            expected_output="趋势分析报告,包含数据可视化图表和关键发现",
            agent=data_analyst,
            priority=2
        )
        
        # 预测建模任务
        forecasting = Task(
            description="基于历史数据构建预测模型,预测未来市场发展趋势",
            expected_output="预测模型和未来3-5年的市场预测报告",
            agent=forecaster,
            priority=3
        )
        
        # 报告整合任务
        report_generation = Task(
            description="整合所有分析结果,撰写专业的市场分析报告",
            expected_output="完整的市场分析报告,包含执行摘要、详细分析和建议",
            agent=report_writer,
            priority=4
        )
        
        return [data_collection, trend_analysis, forecasting, report_generation]
    
    def run_analysis(self, industry, time_period="5年"):
        """运行市场分析"""
        # 创建智能体
        agents = self.create_market_agents()
        
        # 创建任务
        tasks = self.create_analysis_tasks(industry, time_period, agents)
        
        # 创建分析团队
        analysis_team = Crew(
            agents=agents,
            tasks=tasks,
            process=Process.sequential,
            verbose=True
        )
        
        # 执行分析
        result = analysis_team.kickoff()
        return result

三、最佳实践指南

3.1 智能体任务优先级管理:

# 任务优先级示例
high_priority_task = Task(
    description="处理紧急客户投诉",
    expected_output="客户问题解决方案和跟进计划",
    agent=support_agent,
    priority=1  # 最高优先级
)

medium_priority_task = Task(
    description="更新产品文档",
    expected_output="更新后的产品使用手册",
    agent=documentation_agent,
    priority=2  # 中等优先级
)

low_priority_task = Task(
    description="分析季度用户反馈",
    expected_output="用户反馈分析报告",
    agent=analysis_agent,
    priority=3  # 低优先级
)

3.2 性能优化策略

1、缓存策略

# 实现智能体响应缓存
import hashlib
import json
from functools import lru_cache

class CachedAgent:
    """带缓存的智能体包装器"""
    
    def __init__(self, agent):
        self.agent = agent
        self.cache = {}
    
    def generate_response(self, task_description, max_age=3600):
        """生成响应并缓存结果"""
        # 创建缓存键
        cache_key = self._create_cache_key(task_description)
        
        # 检查缓存
        if cache_key in self.cache:
            cached_response, timestamp = self.cache[cache_key]
            if (time.time() - timestamp) < max_age:
                print("使用缓存响应")
                return cached_response
        
        # 生成新响应
        response = self.agent.generate_response(task_description)
        
        # 缓存响应
        self.cache[cache_key] = (response, time.time())
        
        return response
    
    def _create_cache_key(self, task_description):
        """创建缓存键"""
        return hashlib.md5(task_description.encode()).hexdigest()

2、速率限制和节流

# 实现API调用速率限制
import time
from collections import defaultdict

class RateLimitedAgent:
    """带速率限制的智能体"""
    
    def __init__(self, agent, requests_per_minute=60):
        self.agent = agent
        self.rate_limit = requests_per_minute
        self.request_timestamps = defaultdict(list)
    
    def generate_response(self, task_description):
        """生成响应并应用速率限制"""
        agent_id = id(self.agent)
        
        # 清理过期的请求记录
        now = time.time()
        self.request_timestamps[agent_id] = [
            ts for ts in self.request_timestamps[agent_id]
            if now - ts < 60
        ]
        
        # 检查速率限制
        if len(self.request_timestamps[agent_id]) >= self.rate_limit:
            wait_time = 60 - (now - self.request_timestamps[agent_id][0])
            print(f"速率限制达到,等待 {wait_time:.2f} 秒")
            time.sleep(wait_time)
        
        # 记录请求时间
        self.request_timestamps[agent_id].append(time.time())
        
        # 生成响应
        return self.agent.generate_response(task_description)

3、并行执行优化

# 并行任务执行示例
from crewai import Agent, Task, Crew, Process
import asyncio

class ParallelProcessingSystem:
    """并行处理系统"""
    
    def create_parallel_agents(self):
        """创建并行工作的智能体"""
        agents = []
        for i in range(3):
            agent = Agent(
                role=f"数据处理专家_{i+1}",
                goal="并行处理数据块,提高处理效率",
                backstory="你是一位数据处理专家,擅长高效处理大规模数据集",
                llm=self.llm,
                verbose=True
            )
            agents.append(agent)
        return agents
    
    def create_parallel_tasks(self, data_chunks, agents):
        """创建并行任务"""
        tasks = []
        for i, (chunk, agent) in enumerate(zip(data_chunks, agents)):
            task = Task(
                description=f"处理数据块 {i+1}:{chunk[:100]}...",
                expected_output=f"数据块 {i+1} 的处理结果",
                agent=agent,
                priority=1
            )
            tasks.append(task)
        return tasks
    
    def run_parallel_processing(self, large_dataset):
        """运行并行处理"""
        # 数据分片
        chunk_size = len(large_dataset) // 3
        data_chunks = [
            large_dataset[i*chunk_size : (i+1)*chunk_size]
            for i in range(3)
        ]
        
        # 创建智能体和任务
        agents = self.create_parallel_agents()
        tasks = self.create_parallel_tasks(data_chunks, agents)
        
        # 使用并行执行模式
        processing_team = Crew(
            agents=agents,
            tasks=tasks,
            process=Process.parallel,  # 并行执行
            verbose=True
        )
        
        # 执行处理
        results = processing_team.kickoff()
        
        # 合并结果
        combined_result = self._merge_results(results)
        return combined_result

3.3 错误处理和容错机制

# 实现健壮的错误处理机制
import logging
from crewai import Agent, Task, Crew, Process

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

class FaultTolerantSystem:
    """容错系统设计"""
    
    def create_fault_tolerant_agent(self):
        """创建容错智能体"""
        return Agent(
            role="容错数据处理器",
            goal="处理数据并自动恢复错误",
            backstory="你是一位容错系统专家,擅长处理和恢复各种错误情况",
            llm=self.llm,
            verbose=True
        )
    
    def execute_with_retry(self, task, max_retries=3):
        """带重试机制的任务执行"""
        for attempt in range(max_retries):
            try:
                result = task.execute()
                logger.info(f"任务执行成功 (尝试 {attempt+1}/{max_retries})")
                return result
            except Exception as e:
                logger.error(f"任务执行失败 (尝试 {attempt+1}/{max_retries}): {str(e)}")
                if attempt < max_retries - 1:
                    logger.info("等待后重试...")
                    time.sleep(2 ** attempt)  # 指数退避
                else:
                    logger.error("达到最大重试次数,任务执行失败")
                    raise
    
    def create_fallback_task(self, original_task):
        """创建降级任务"""
        return Task(
            description=f"降级处理:{original_task.description}",
            expected_output=f"简化版 {original_task.expected_output}",
            agent=self.create_fault_tolerant_agent(),
            priority=original_task.priority
        )
    
    def run_fault_tolerant_workflow(self, main_tasks):
        """运行容错工作流"""
        results = []
        
        for task in main_tasks:
            try:
                # 尝试执行主任务
                result = self.execute_with_retry(task)
                results.append(result)
            except Exception as e:
                logger.error(f"主任务失败,使用降级方案: {str(e)}")
                # 执行降级任务
                fallback_task = self.create_fallback_task(task)
                fallback_result = self.execute_with_retry(fallback_task)
                results.append(fallback_result)
        
        return results

四、常见问题与解决方案

4.1 性能问题

问题:智能体响应时间过长

解决方案:

# performance_optimization.py
class PerformanceOptimizer:
    """性能优化器"""
    
    def optimize_agent_response(self, agent, task, max_tokens=1000):
        """优化智能体响应时间"""
        
        # 1. 简化提示词
        simplified_prompt = self._simplify_prompt(task.description)
        
        # 2. 限制响应长度
        optimized_task = Task(
            description=simplified_prompt,
            expected_output=f"{task.expected_output} (限制在{max_tokens}个tokens内)",
            agent=agent,
            priority=task.priority
        )
        
        # 3. 使用流式响应
        response = agent.generate_response_stream(optimized_task)
        
        return response
    
    def _simplify_prompt(self, prompt):
        """简化提示词"""
        # 移除冗余信息
        # 使用更简洁的表达方式
        # 保留关键信息
        
        # 示例简化逻辑
        prompt = prompt.replace("请你帮我", "请")
        prompt = prompt.replace("非常详细的", "详细的")
        return prompt.strip()
    
    def enable_caching(self, agent):
        """启用缓存"""
        agent.enable_cache(ttl=3600)  # 缓存1小时
    
    def use_lightweight_model(self, agent):
        """切换到轻量级模型"""
        original_model = agent.llm.model
        agent.llm.model = "gpt-3.5-turbo"  # 使用更快速的模型
        
        return original_model  # 返回原始模型以便恢复

4.2 成本控制问题

问题:API 调用成本过高

解决方案:

# cost_control.py
class CostOptimizer:
    """成本优化器"""
    
    def __init__(self, max_daily_cost=10):
        self.max_daily_cost = max_daily_cost
        self.daily_spend = 0
        self.model_costs = {
            "gpt-4": 0.06,
            "gpt-3.5-turbo": 0.002,
            "deepseek-v3": 0.004
        }
    
    def select_economical_model(self, task_complexity):
        """根据任务复杂度选择经济模型"""
        if task_complexity == "low" and self.daily_spend > self.max_daily_cost * 0.8:
            return "gpt-3.5-turbo"
        elif task_complexity == "medium":
            return "deepseek-v3"
        elif task_complexity == "high" and self.daily_spend < self.max_daily_cost * 0.5:
            return "gpt-4"
        else:
            return "deepseek-v3"
    
    def optimize_token_usage(self, prompt):
        """优化token使用"""
        # 移除不必要的空格和换行
        optimized_prompt = " ".join(prompt.split())
        
        # 使用更简洁的表达方式
        optimized_prompt = optimized_prompt.replace("for the purpose of", "for")
        optimized_prompt = optimized_prompt.replace("in order to", "to")
        
        return optimized_prompt
    
    def batch_requests(self, tasks, batch_size=5):
        """批量处理请求"""
        batches = [tasks[i:i+batch_size] for i in range(0, len(tasks), batch_size)]
        return batches

4.3 可靠性问题

问题:智能体执行失败率高

解决方案:

# reliability_improvement.py
import time
import random

class ReliabilityEnhancer:
    """可靠性增强器"""
    
    def execute_with_retry(self, func, max_retries=3, backoff_factor=2):
        """带重试机制的执行"""
        for attempt in range(max_retries):
            try:
                return func()
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                wait_time = backoff_factor ** attempt + random.uniform(0, 1)
                print(f"重试 {attempt+1}/{max_retries},等待 {wait_time:.2f} 秒")
                time.sleep(wait_time)
    
    def add_fallback_agent(self, primary_agent, fallback_agent):
        """添加降级智能体"""
        class FallbackAgent:
            def __init__(self, primary, fallback):
                self.primary = primary
                self.fallback = fallback
            
            def execute_task(self, task):
                try:
                    return self.primary.execute_task(task)
                except Exception as e:
                    print(f"主智能体失败,使用降级智能体: {e}")
                    return self.fallback.execute_task(task)
        
        return FallbackAgent(primary_agent, fallback_agent)
    
    def validate_response(self, response, validation_rules):
        """验证响应质量"""
        for rule in validation_rules:
            if not rule(response):
                return False, f"违反规则: {rule.__name__}"
        return True, "验证通过"

CrewAI 正在快速发展,随着社区的不断壮大和技术的持续进步,相信它将成为构建下一代 AI 应用的重要基础设施。现在正是开始探索和应用 CrewAI 的最佳时机!

Logo

更多推荐