Apache Airflow(简称 Airflow)是一个平台,用于以编程方式编写调度监控工作流。

当将工作流定义为代码时,它们变得更易于维护版本控制测试协作

使用 Airflow 将工作流编写为有向无环图(DAGs)的任务集合。Airflow 调度器会根据指定的依赖关系,在多个工作节点上执行任务。强大的命令行工具使得对 DAG 进行复杂操作变得简单方便。功能丰富的用户界面使您能够轻松地可视化正在运行的生产环境中的流水线,监控进度,并在需要时进行故障排查

1. 基本概念

在 Airflow 中,数据管道被定义为 DAG(有向无环图),由以下主要组件构成:

DAG(Directed Acyclic Graph): 描述任务及其执行顺序。

Operator: 定义单个任务,比如运行 Python 函数、执行 Bash 命令、触发外部工具等。

Task: DAG 中的一个任务实例,由 Operator 定义。

Task Dependencies: 任务之间的依赖关系。

2. 安装和配置 Airflow

安装 Airflow

Airflow 使用 Python 环境,可以通过 pip 安装:

pip install apache-airflow

初始化数据库

初始化元数据库(Airflow 默认使用 SQLite,可以配置为其他数据库):

airflow db init

创建用户

创建一个管理员账户用于访问 Airflow Web UI:

airflow users create \
    --username admin \
    --firstname FirstName \
    --lastname LastName \
    --role Admin \
    --email admin@example.com

启动 Web Server 和 Scheduler

启动 Web 服务器:

airflow webserver

启动调度器:

airflow scheduler

默认情况下,Airflow Web UI 在 http://localhost:8080。

3. 创建一个简单的 DAG

DAG 文件结构

Airflow DAG 文件是一个 Python 脚本,通常存储在 Airflow 的 dags 文件夹中(默认路径:~/airflow/dags)。

示例 DAG

以下是一个简单的 DAG 示例,包含两个任务:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# 定义 DAG
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    default_args=default_args,
    start_date=datetime(2023, 12, 1),
    schedule_interval='@daily',  # 每天执行一次
    catchup=False,  # 是否对历史日期补跑
) as dag:

    # 定义任务
    task_1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    task_2 = BashOperator(
        task_id='echo_message',
        bash_command='echo "Hello, Airflow!"',
    )

    # 定义任务依赖
    task_1 >> task_2

4. 运行管道

1. 将 DAG 文件保存到 Airflow 的 dags 目录(例如 ~/airflow/dags/example_dag.py)。

2. 启动 Airflow 调度器和 Web 服务器:

airflow scheduler
airflow webserver

3. 打开 Airflow Web UI,激活 DAG 并手动触发运行。

5. 使用 Operators 构建任务

Airflow 提供了多种内置的 Operators,用于处理不同的任务类型:

常用 Operators

BashOperator: 运行 Bash 命令。

PythonOperator: 调用 Python 函数。

PostgresOperator: 执行 PostgreSQL 查询。

MySqlOperator: 执行 MySQL 查询。

S3Operator: 与 AWS S3 交互。

EmailOperator: 发送电子邮件。

6. 设置任务依赖

Airflow 允许通过运算符设置任务依赖:

• task_1 >> task_2:task_1 必须先完成。

• task_1 << task_2:task_2 必须先完成。

• [task_1, task_2] >> task_3:task_1 和 task_2 都必须先完成。

7. 监控和调试管道

Airflow Web UI:查看任务的状态、日志和执行历史。

任务重试:通过设置 retries 和 retry_delay 自动重试任务。

任务日志:每个任务的日志可在 Web UI 中查看。

Logo

更多推荐