5分钟上手Prefect:从脚本到生产级工作流的蜕变

【免费下载链接】prefect PrefectHQ/prefect: 是一个分布式任务调度和管理平台。适合用于自动化任务执行和 CI/CD。特点是支持多种任务执行器,可以实时监控任务状态和日志。 【免费下载链接】prefect 项目地址: https://gitcode.com/GitHub_Trending/pr/prefect

你还在为数据脚本缺乏监控而烦恼?还在手动执行ETL任务浪费时间?本文将带你5分钟内掌握Prefect工作流编排,从0到1构建可监控、可调度的自动化任务,让你的Python脚本秒变生产级数据管道。

读完本文你将获得:

  • 快速安装并运行Prefect的完整步骤
  • 用2行代码将普通函数升级为可监控工作流
  • 本地启动可视化监控界面查看任务状态
  • 学会设置定时调度实现任务自动运行
  • 掌握部署工作流的核心方法

什么是Prefect?

Prefect是一个Python工作流编排框架,能将普通脚本转换为具备监控、调度、重试和错误处理能力的生产级数据管道。无论是ETL处理、机器学习训练还是自动化报表生成,Prefect都能帮你轻松实现任务编排和管理。

官方文档:docs/v3/get-started/index.mdx

安装Prefect

Prefect要求Python 3.9或更高版本,使用pip或uv工具即可完成安装:

pip install -U prefect

或使用uv(更快的Python包管理器):

uv add prefect

构建第一个工作流

让我们从一个简单的"Hello World"开始,创建文件hello_world.py

from prefect import flow, tags

@flow(log_prints=True)
def hello(name: str = "Marvin") -> None:
    """Log a friendly greeting."""
    print(f"Hello, {name}!")

if __name__ == "__main__":
    with tags("test"):  # 标签可用于在UI中筛选流程运行
        hello()  # 使用默认参数运行
        hello("Marvin")  # 自定义参数运行
        
        # 批量运行多个任务
        crew = ["Zaphod", "Trillian", "Ford"]
        for name in crew:
            hello(name)

这个例子展示了Prefect的核心优势:仅需添加@flow装饰器,普通Python函数就变成了可监控、可追踪的工作流。

示例源码:examples/hello_world.py

启动本地监控界面

运行工作流后,启动Prefect服务器查看任务状态:

prefect server start

访问 http://localhost:4200 即可打开Prefect UI,在这里你可以看到所有工作流的执行历史、日志和状态。

Prefect UI工作池界面

实现定时调度

将工作流部署为定时任务非常简单,只需修改代码中的运行部分:

if __name__ == "__main__":
    hello.serve(
        name="first-deployment",
        cron="* * * * *",  # 每分钟运行一次
        parameters={"repos": ["PrefectHQ/prefect"]}
    )

这段代码会将工作流部署为服务,按照cron表达式定义的 schedule 自动运行。你还可以在Prefect UI中手动触发或调整调度计划。

Prefect调度界面

构建实用数据工作流

让我们创建一个更实用的例子:监控GitHub仓库星标数量的工作流github_stars.py

from prefect import flow, task
import httpx

@task(log_prints=True)
def get_stars(repo: str):
    url = f"https://api.github.com/repos/{repo}"
    count = httpx.get(url).json()["stargazers_count"]
    print(f"{repo} has {count} stars!")

@flow(name="GitHub Stars")
def github_stars(repos: list[str]):
    for repo in repos:
        get_stars(repo)

if __name__ == "__main__":
    github_stars(["PrefectHQ/Prefect"])

在这个例子中,我们使用@task装饰器定义了一个任务,使用@flow定义了工作流。Prefect会自动处理任务之间的依赖关系、重试逻辑和日志记录。

示例源码:README.md

工作流监控与管理

通过Prefect UI,你可以直观地监控工作流执行情况,包括成功/失败状态、执行时间和详细日志。

Prefect工作流执行界面

主要监控功能包括:

  • 实时查看任务执行状态
  • 检索完整执行日志
  • 设置失败告警通知
  • 查看任务依赖关系图
  • 导出执行报告

部署到生产环境

Prefect支持多种部署方式,包括:

  • 本地进程部署
  • Docker容器化部署
  • Kubernetes集群部署
  • 云服务集成(AWS/GCP/Azure)

部署配置文件:docker-compose.yml

总结与进阶

通过本文,你已经掌握了Prefect的基础使用:

  1. 安装Prefect并创建第一个工作流
  2. 使用@flow@task装饰器定义工作流组件
  3. 启动本地服务器和UI监控任务
  4. 设置定时调度实现自动运行

进阶学习资源:

现在,你已经准备好用Prefect来编排你的数据工作流了。无论是简单的脚本自动化还是复杂的ETL管道,Prefect都能帮你轻松实现生产级别的任务调度和管理。

【免费下载链接】prefect PrefectHQ/prefect: 是一个分布式任务调度和管理平台。适合用于自动化任务执行和 CI/CD。特点是支持多种任务执行器,可以实时监控任务状态和日志。 【免费下载链接】prefect 项目地址: https://gitcode.com/GitHub_Trending/pr/prefect

Logo

更多推荐