5分钟上手Prefect:从脚本到生产级工作流的蜕变
你还在为数据脚本缺乏监控而烦恼?还在手动执行ETL任务浪费时间?本文将带你5分钟内掌握Prefect工作流编排,从0到1构建可监控、可调度的自动化任务,让你的Python脚本秒变生产级数据管道。读完本文你将获得:- 快速安装并运行Prefect的完整步骤- 用2行代码将普通函数升级为可监控工作流- 本地启动可视化监控界面查看任务状态- 学会设置定时调度实现任务自动运行- 掌握部署工作...
5分钟上手Prefect:从脚本到生产级工作流的蜕变
你还在为数据脚本缺乏监控而烦恼?还在手动执行ETL任务浪费时间?本文将带你5分钟内掌握Prefect工作流编排,从0到1构建可监控、可调度的自动化任务,让你的Python脚本秒变生产级数据管道。
读完本文你将获得:
- 快速安装并运行Prefect的完整步骤
- 用2行代码将普通函数升级为可监控工作流
- 本地启动可视化监控界面查看任务状态
- 学会设置定时调度实现任务自动运行
- 掌握部署工作流的核心方法
什么是Prefect?
Prefect是一个Python工作流编排框架,能将普通脚本转换为具备监控、调度、重试和错误处理能力的生产级数据管道。无论是ETL处理、机器学习训练还是自动化报表生成,Prefect都能帮你轻松实现任务编排和管理。
官方文档:docs/v3/get-started/index.mdx
安装Prefect
Prefect要求Python 3.9或更高版本,使用pip或uv工具即可完成安装:
pip install -U prefect
或使用uv(更快的Python包管理器):
uv add prefect
构建第一个工作流
让我们从一个简单的"Hello World"开始,创建文件hello_world.py:
from prefect import flow, tags
@flow(log_prints=True)
def hello(name: str = "Marvin") -> None:
"""Log a friendly greeting."""
print(f"Hello, {name}!")
if __name__ == "__main__":
with tags("test"): # 标签可用于在UI中筛选流程运行
hello() # 使用默认参数运行
hello("Marvin") # 自定义参数运行
# 批量运行多个任务
crew = ["Zaphod", "Trillian", "Ford"]
for name in crew:
hello(name)
这个例子展示了Prefect的核心优势:仅需添加@flow装饰器,普通Python函数就变成了可监控、可追踪的工作流。
启动本地监控界面
运行工作流后,启动Prefect服务器查看任务状态:
prefect server start
访问 http://localhost:4200 即可打开Prefect UI,在这里你可以看到所有工作流的执行历史、日志和状态。
实现定时调度
将工作流部署为定时任务非常简单,只需修改代码中的运行部分:
if __name__ == "__main__":
hello.serve(
name="first-deployment",
cron="* * * * *", # 每分钟运行一次
parameters={"repos": ["PrefectHQ/prefect"]}
)
这段代码会将工作流部署为服务,按照cron表达式定义的 schedule 自动运行。你还可以在Prefect UI中手动触发或调整调度计划。
构建实用数据工作流
让我们创建一个更实用的例子:监控GitHub仓库星标数量的工作流github_stars.py:
from prefect import flow, task
import httpx
@task(log_prints=True)
def get_stars(repo: str):
url = f"https://api.github.com/repos/{repo}"
count = httpx.get(url).json()["stargazers_count"]
print(f"{repo} has {count} stars!")
@flow(name="GitHub Stars")
def github_stars(repos: list[str]):
for repo in repos:
get_stars(repo)
if __name__ == "__main__":
github_stars(["PrefectHQ/Prefect"])
在这个例子中,我们使用@task装饰器定义了一个任务,使用@flow定义了工作流。Prefect会自动处理任务之间的依赖关系、重试逻辑和日志记录。
示例源码:README.md
工作流监控与管理
通过Prefect UI,你可以直观地监控工作流执行情况,包括成功/失败状态、执行时间和详细日志。
主要监控功能包括:
- 实时查看任务执行状态
- 检索完整执行日志
- 设置失败告警通知
- 查看任务依赖关系图
- 导出执行报告
部署到生产环境
Prefect支持多种部署方式,包括:
- 本地进程部署
- Docker容器化部署
- Kubernetes集群部署
- 云服务集成(AWS/GCP/Azure)
部署配置文件:docker-compose.yml
总结与进阶
通过本文,你已经掌握了Prefect的基础使用:
- 安装Prefect并创建第一个工作流
- 使用
@flow和@task装饰器定义工作流组件 - 启动本地服务器和UI监控任务
- 设置定时调度实现自动运行
进阶学习资源:
现在,你已经准备好用Prefect来编排你的数据工作流了。无论是简单的脚本自动化还是复杂的ETL管道,Prefect都能帮你轻松实现生产级别的任务调度和管理。
更多推荐





所有评论(0)