Apache Airflow 工作流编排
一句话概述:Apache Airflow 是用 Python 代码定义、调度和监控数据工作流的平台,是数据工程领域最主流的编排工具。
核心知识点表
| 概念 | 白话解释 |
|---|
| DAG | 有向无环图,就是把任务按先后顺序画出来的"流程图" |
| Task | DAG里的一个具体步骤,比如"下载数据""清洗数据" |
| Operator | 定义Task做什么事的模板,比如BashOperator就是跑Shell命令 |
| Sensor | 一种特殊Operator,会等某个条件满足才往下走(比如等文件出现) |
| XCom | 任务之间传数据的信使,A任务的输出可以通过XCom传给B任务 |
| Executor | 决定任务在哪里跑的组件(本地、Celery集群、Kubernetes等) |
| 轻量化架构 | Airflow 3.x引入Task Execution Interface,简化执行架构 |
| TaskFlow API | 用Python装饰器@task写任务,比传统Operator更简洁 |
版本信息(2026年5月)
| 版本线 | 最新版本 | 说明 |
|---|
| 3.x | 3.2.1 | 当前主力版本,多团队支持、网格视图虚拟化 |
| 2.x | 2.11.2 | 旧版本线,即将停止维护 |
安装配置
方式一:pip安装(推荐学习用)
# 创建conda虚拟环境(推荐隔离)
conda create -n airflow python=3.12 -y # 创建Python 3.12环境
conda activate airflow # 激活环境
# 设置Airflow的家目录(所有配置文件都放这里)
export AIRFLOW_HOME=~/airflow
# 用约束文件安装(避免依赖冲突)
pip install "apache-airflow==3.2.1" \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.2.1/constraints-3.12.txt"
# 约束文件锁定了所有依赖版本,保证安装不出错
方式二:Docker安装(推荐生产用)
# 拉取官方Docker镜像
docker pull apache/airflow:3.2.1 # 下载Airflow 3.2.1镜像
# 下载官方docker-compose文件
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/3.2.1/docker-compose.yaml'
# 初始化数据库
docker compose up airflow-init # 创建数据库表和默认用户
# 启动所有服务
docker compose up -d # -d 后台运行
方式三:uv安装(最快)
# uv是比pip快10倍的Python包管理器
pip install uv # 先安装uv
uv pip install apache-airflow # 用uv安装Airflow
初始化与启动
# 初始化元数据数据库(首次运行必须)
airflow db migrate # 创建Airflow需要的数据库表
# 创建管理员账号
airflow users create \
--username admin \
--password admin123 \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
# 启动三个核心组件(分三个终端窗口)
airflow scheduler # 终端1:调度器,负责触发任务
airflow dag-processor # 终端2:DAG处理器,解析Python文件
airflow api-server # 终端3:API服务器+Web界面
# 访问Web界面:http://localhost:8080
基本使用
编写第一个DAG
# ~/airflow/dags/my_first_dag.py
# DAG文件必须放在 dags/ 目录下
from datetime import datetime # 导入日期时间模块
from airflow import DAG # 导入DAG类
from airflow.operators.bash import BashOperator # 导入Bash操作符
from airflow.operators.python import PythonOperator # 导入Python操作符
# 定义DAG(工作流)
with DAG(
dag_id="my_first_pipeline", # DAG的唯一标识
start_date=datetime(2026, 1, 1), # 调度开始日期
schedule="@daily", # 调度频率:每天一次
catchup=False, # 不回填历史数据
tags=["tutorial"], # 标签,方便在界面上筛选
) as dag:
# 任务1:用Bash命令打印日期
print_date = BashOperator(
task_id="print_date", # 任务唯一标识
bash_command="date", # 要执行的Shell命令
)
# 任务2:用Python函数处理数据
def process_data():
"""模拟数据处理"""
print("正在处理数据...") # 这里写你的数据处理逻辑
return "数据处理完成" # 返回值会自动存入XCom
process = PythonOperator(
task_id="process_data", # 任务标识
python_callable=process_data, # 要调用的Python函数
)
# 任务3:发送通知
notify = BashOperator(
task_id="send_notification",
bash_command='echo "Pipeline finished at $(date)"',
)
# 定义任务执行顺序(先打印日期 → 处理数据 → 发通知)
print_date >> process >> notify
TaskFlow API写法(更Pythonic)
# ~/airflow/dags/taskflow_example.py
from datetime import datetime
from airflow.decorators import dag, task # 导入装饰器
@dag(
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
)
def etl_pipeline():
"""ETL流水线:提取→转换→加载"""
@task() # 用装饰器把普通函数变成Airflow任务
def extract():
"""提取数据"""
data = {"users": 100, "orders": 500} # 模拟从数据库提取
return data # 返回值自动通过XCom传递
@task()
def transform(raw_data: dict):
"""转换数据"""
total = raw_data["users"] + raw_data["orders"] # 计算汇总
return {"total_records": total}
@task()
def load(transformed_data: dict):
"""加载数据到目标"""
print(f"写入 {transformed_data['total_records']} 条记录")
# 链式调用,数据自动在任务间传递
raw = extract() # 提取
transformed = transform(raw) # 转换
load(transformed) # 加载
etl_pipeline() # 调用函数注册DAG
高级用法
动态任务映射(Dynamic Task Mapping)
@task()
def get_file_list():
"""获取需要处理的文件列表"""
return ["file1.csv", "file2.csv", "file3.csv"]
@task()
def process_file(filename: str):
"""处理单个文件"""
print(f"正在处理 {filename}")
# expand() 会为列表中的每个元素动态创建一个任务实例
# 3个文件 → 自动生成3个并行任务
process_file.expand(filename=get_file_list())
条件分支
from airflow.operators.python import BranchPythonOperator # 导入分支操作符
def choose_branch(**context):
"""根据条件选择执行哪个分支"""
day = context['ds'] # 获取执行日期
if int(day[-2:]) % 2 == 0: # 偶数日
return 'even_day_task' # 返回要执行的任务ID
return 'odd_day_task'
branch = BranchPythonOperator(
task_id='branch_check',
python_callable=choose_branch, # 分支判断函数
)
设置连接和变量
from airflow.models import Connection # 导入连接模型
from airflow.models import Variable # 导入变量模型
# 在代码中使用连接(通常在Web界面配置)
conn = Connection.get_connection_from_secrets("my_database")
# conn.host, conn.login, conn.password 等
# 使用变量存储配置
api_key = Variable.get("api_key") # 获取变量值
config = Variable.get("config", deserialize_json=True) # 获取JSON变量
使用Sensor等待条件
from airflow.sensors.filesystem import FileSensor # 文件传感器
# 等待文件出现后再继续后面的任务
wait_for_file = FileSensor(
task_id="wait_for_data",
filepath="/data/input/daily_data.csv", # 要等待的文件路径
poke_interval=60, # 每60秒检查一次
timeout=3600, # 最多等1小时
mode="poke", # poke模式:占用一个worker槽位持续检查
# mode="reschedule" # reschedule模式:不占槽位,到时间再检查
)
常见报错与解决
| 报错信息 | 原因 | 解决方案 |
|---|
DAG not found | DAG文件没放对位置或语法错误 | 检查文件在 $AIRFLOW_HOME/dags/ 下,python your_dag.py 看有没有语法错 |
No module named 'xxx' | 依赖包没安装在Airflow环境 | pip install xxx 安装到Airflow所在环境 |
Task is in 'none' state | 调度器没在运行 | 确保 airflow scheduler 在运行 |
Broken DAG | DAG文件导入失败 | 在Web界面看"Import Errors",修复代码错误 |
Permission denied | 文件权限问题 | chmod 755 dags/your_dag.py |
Database is locked | SQLite并发限制 | 生产环境改用PostgreSQL或MySQL |
Executor xxx not found | 执行器配置错误 | 检查 airflow.cfg 中的 executor 设置 |
速查表
# ===== 常用CLI命令 =====
airflow dags list # 列出所有DAG
airflow dags trigger my_dag # 手动触发一个DAG
airflow tasks test my_dag my_task 2026-01-01 # 测试单个任务
airflow dags pause my_dag # 暂停DAG调度
airflow dags unpause my_dag # 恢复DAG调度
airflow dags show my_dag # 显示DAG结构
airflow db migrate # 升级数据库
airflow info # 显示Airflow系统信息
airflow config list # 列出所有配置
# ===== 常用调度表达式 =====
# "@once" 只运行一次
# "@hourly" 每小时
# "@daily" 每天
# "@weekly" 每周
# "@monthly" 每月
# "0 6 * * *" 每天早上6点(Cron表达式)
# "*/30 * * * *" 每30分钟
# ===== 任务依赖写法 =====
# task1 >> task2 # task1完成后执行task2
# task1 >> [task2, task3] # task1后并行执行task2和task3
# [task1, task2] >> task3 # task1和task2都完成后执行task3
同类工具对比
| 特性 | Airflow | Prefect | Dagster |
|---|
| 定义方式 | DAG(有向无环图) | Flow+Task装饰器 | Asset资产为中心 |
| 学习曲线 | 中等 | 低 | 中等 |
| 界面美观度 | ★★★ | ★★★★ | ★★★★ |
| 社区规模 | 最大(12万+GitHub Stars) | 中等 | 中等 |
| 动态任务 | 3.x支持 | 原生支持 | 原生支持 |
| 适合场景 | 大型数据团队、企业级 | 快速上手、中小团队 | 数据质量优先 |
| 部署复杂度 | 较高 | 低 | 中等 |
面试建议:Airflow是数据工程面试高频话题。重点理解DAG概念、TaskFlow API、以及Airflow 3.x的新特性(多团队支持、取消ZooKeeper依赖等)。能手写一个简单的ETL DAG是加分项。