跳转至

Apache Airflow 工作流编排

一句话概述:Apache Airflow 是用 Python 代码定义、调度和监控数据工作流的平台,是数据工程领域最主流的编排工具。

核心知识点表

概念白话解释
DAG有向无环图,就是把任务按先后顺序画出来的"流程图"
TaskDAG里的一个具体步骤,比如"下载数据""清洗数据"
Operator定义Task做什么事的模板,比如BashOperator就是跑Shell命令
Sensor一种特殊Operator,会等某个条件满足才往下走(比如等文件出现)
XCom任务之间传数据的信使,A任务的输出可以通过XCom传给B任务
Executor决定任务在哪里跑的组件(本地、Celery集群、Kubernetes等)
轻量化架构Airflow 3.x引入Task Execution Interface,简化执行架构
TaskFlow API用Python装饰器@task写任务,比传统Operator更简洁

版本信息(2026年5月)

版本线最新版本说明
3.x3.2.1当前主力版本,多团队支持、网格视图虚拟化
2.x2.11.2旧版本线,即将停止维护

安装配置

方式一:pip安装(推荐学习用)

# 创建conda虚拟环境(推荐隔离)
conda create -n airflow python=3.12 -y  # 创建Python 3.12环境
conda activate airflow  # 激活环境

# 设置Airflow的家目录(所有配置文件都放这里)
export AIRFLOW_HOME=~/airflow

# 用约束文件安装(避免依赖冲突)
pip install "apache-airflow==3.2.1" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.2.1/constraints-3.12.txt"
# 约束文件锁定了所有依赖版本,保证安装不出错

方式二:Docker安装(推荐生产用)

# 拉取官方Docker镜像
docker pull apache/airflow:3.2.1  # 下载Airflow 3.2.1镜像

# 下载官方docker-compose文件
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/3.2.1/docker-compose.yaml'

# 初始化数据库
docker compose up airflow-init  # 创建数据库表和默认用户

# 启动所有服务
docker compose up -d  # -d 后台运行

方式三:uv安装(最快)

# uv是比pip快10倍的Python包管理器
pip install uv  # 先安装uv
uv pip install apache-airflow  # 用uv安装Airflow

初始化与启动

# 初始化元数据数据库(首次运行必须)
airflow db migrate  # 创建Airflow需要的数据库表

# 创建管理员账号
airflow users create \
  --username admin \
  --password admin123 \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com

# 启动三个核心组件(分三个终端窗口)
airflow scheduler  # 终端1:调度器,负责触发任务
airflow dag-processor  # 终端2:DAG处理器,解析Python文件
airflow api-server  # 终端3:API服务器+Web界面

# 访问Web界面:http://localhost:8080

基本使用

编写第一个DAG

# ~/airflow/dags/my_first_dag.py
# DAG文件必须放在 dags/ 目录下

from datetime import datetime  # 导入日期时间模块
from airflow import DAG  # 导入DAG类
from airflow.operators.bash import BashOperator  # 导入Bash操作符
from airflow.operators.python import PythonOperator  # 导入Python操作符

# 定义DAG(工作流)
with DAG(
    dag_id="my_first_pipeline",  # DAG的唯一标识
    start_date=datetime(2026, 1, 1),  # 调度开始日期
    schedule="@daily",  # 调度频率:每天一次
    catchup=False,  # 不回填历史数据
    tags=["tutorial"],  # 标签,方便在界面上筛选
) as dag:

    # 任务1:用Bash命令打印日期
    print_date = BashOperator(
        task_id="print_date",  # 任务唯一标识
        bash_command="date",  # 要执行的Shell命令
    )

    # 任务2:用Python函数处理数据
    def process_data():
        """模拟数据处理"""
        print("正在处理数据...")  # 这里写你的数据处理逻辑
        return "数据处理完成"  # 返回值会自动存入XCom

    process = PythonOperator(
        task_id="process_data",  # 任务标识
        python_callable=process_data,  # 要调用的Python函数
    )

    # 任务3:发送通知
    notify = BashOperator(
        task_id="send_notification",
        bash_command='echo "Pipeline finished at $(date)"',
    )

    # 定义任务执行顺序(先打印日期 → 处理数据 → 发通知)
    print_date >> process >> notify

TaskFlow API写法(更Pythonic)

# ~/airflow/dags/taskflow_example.py
from datetime import datetime
from airflow.decorators import dag, task  # 导入装饰器

@dag(
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
)
def etl_pipeline():
    """ETL流水线:提取→转换→加载"""

    @task()  # 用装饰器把普通函数变成Airflow任务
    def extract():
        """提取数据"""
        data = {"users": 100, "orders": 500}  # 模拟从数据库提取
        return data  # 返回值自动通过XCom传递

    @task()
    def transform(raw_data: dict):
        """转换数据"""
        total = raw_data["users"] + raw_data["orders"]  # 计算汇总
        return {"total_records": total}

    @task()
    def load(transformed_data: dict):
        """加载数据到目标"""
        print(f"写入 {transformed_data['total_records']} 条记录")

    # 链式调用,数据自动在任务间传递
    raw = extract()  # 提取
    transformed = transform(raw)  # 转换
    load(transformed)  # 加载

etl_pipeline()  # 调用函数注册DAG

高级用法

动态任务映射(Dynamic Task Mapping)

@task()
def get_file_list():
    """获取需要处理的文件列表"""
    return ["file1.csv", "file2.csv", "file3.csv"]

@task()
def process_file(filename: str):
    """处理单个文件"""
    print(f"正在处理 {filename}")

# expand() 会为列表中的每个元素动态创建一个任务实例
# 3个文件 → 自动生成3个并行任务
process_file.expand(filename=get_file_list())

条件分支

from airflow.operators.python import BranchPythonOperator  # 导入分支操作符

def choose_branch(**context):
    """根据条件选择执行哪个分支"""
    day = context['ds']  # 获取执行日期
    if int(day[-2:]) % 2 == 0:  # 偶数日
        return 'even_day_task'  # 返回要执行的任务ID
    return 'odd_day_task'

branch = BranchPythonOperator(
    task_id='branch_check',
    python_callable=choose_branch,  # 分支判断函数
)

设置连接和变量

from airflow.models import Connection  # 导入连接模型
from airflow.models import Variable  # 导入变量模型

# 在代码中使用连接(通常在Web界面配置)
conn = Connection.get_connection_from_secrets("my_database")
# conn.host, conn.login, conn.password 等

# 使用变量存储配置
api_key = Variable.get("api_key")  # 获取变量值
config = Variable.get("config", deserialize_json=True)  # 获取JSON变量

使用Sensor等待条件

from airflow.sensors.filesystem import FileSensor  # 文件传感器

# 等待文件出现后再继续后面的任务
wait_for_file = FileSensor(
    task_id="wait_for_data",
    filepath="/data/input/daily_data.csv",  # 要等待的文件路径
    poke_interval=60,  # 每60秒检查一次
    timeout=3600,  # 最多等1小时
    mode="poke",  # poke模式:占用一个worker槽位持续检查
    # mode="reschedule"  # reschedule模式:不占槽位,到时间再检查
)

常见报错与解决

报错信息原因解决方案
DAG not foundDAG文件没放对位置或语法错误检查文件在 $AIRFLOW_HOME/dags/ 下,python your_dag.py 看有没有语法错
No module named 'xxx'依赖包没安装在Airflow环境pip install xxx 安装到Airflow所在环境
Task is in 'none' state调度器没在运行确保 airflow scheduler 在运行
Broken DAGDAG文件导入失败在Web界面看"Import Errors",修复代码错误
Permission denied文件权限问题chmod 755 dags/your_dag.py
Database is lockedSQLite并发限制生产环境改用PostgreSQL或MySQL
Executor xxx not found执行器配置错误检查 airflow.cfg 中的 executor 设置

速查表

# ===== 常用CLI命令 =====
airflow dags list              # 列出所有DAG
airflow dags trigger my_dag    # 手动触发一个DAG
airflow tasks test my_dag my_task 2026-01-01  # 测试单个任务
airflow dags pause my_dag      # 暂停DAG调度
airflow dags unpause my_dag    # 恢复DAG调度
airflow dags show my_dag       # 显示DAG结构
airflow db migrate             # 升级数据库
airflow info                   # 显示Airflow系统信息
airflow config list            # 列出所有配置

# ===== 常用调度表达式 =====
# "@once"       只运行一次
# "@hourly"     每小时
# "@daily"      每天
# "@weekly"     每周
# "@monthly"    每月
# "0 6 * * *"   每天早上6点(Cron表达式)
# "*/30 * * * *" 每30分钟

# ===== 任务依赖写法 =====
# task1 >> task2           # task1完成后执行task2
# task1 >> [task2, task3]  # task1后并行执行task2和task3
# [task1, task2] >> task3  # task1和task2都完成后执行task3

同类工具对比

特性AirflowPrefectDagster
定义方式DAG(有向无环图)Flow+Task装饰器Asset资产为中心
学习曲线中等中等
界面美观度★★★★★★★★★★★
社区规模最大(12万+GitHub Stars)中等中等
动态任务3.x支持原生支持原生支持
适合场景大型数据团队、企业级快速上手、中小团队数据质量优先
部署复杂度较高中等

面试建议:Airflow是数据工程面试高频话题。重点理解DAG概念、TaskFlow API、以及Airflow 3.x的新特性(多团队支持、取消ZooKeeper依赖等)。能手写一个简单的ETL DAG是加分项。