dbt 数据转换¶
为什么要学¶
dbt (data build tool) 是现代数据栈的核心组件,让数据转换变得工程化:
- SQL 即转换:用 SELECT 语句定义转换,dbt 处理 DDL/DML
- 模型化:每个 SQL 文件是一个"模型",有明确的依赖关系
- 测试内置:对数据质量进行断言测试
- 文档自动化:自动生成数据文档和 DAG 图
- 版本控制:SQL 代码在 Git 中管理
- 数据血缘:自动追踪数据从哪来、到哪去
任何做数据工程/分析工程的人都应该掌握 dbt。
核心概念¶
| 概念 | 说明 | 类比 |
|---|---|---|
| Model | 一个SQL SELECT = 一张表/视图 | 函数 |
| Source | 原始数据表声明 | 数据入口 |
| Ref | 模型间的依赖引用 | import |
| Test | 数据质量断言 | 单元测试 |
| Macro | 可复用的SQL模板(Jinja) | 函数/模板 |
| Snapshot | 慢变维度(SCD)追踪 | 历史变更记录 |
| Seed | CSV文件加载为表 | 测试数据/维度表 |
| Materialization | 物化方式(view/table/incremental) | 存储策略 |
安装配置¶
# dbt Core (开源)
pip install dbt-core dbt-postgres # PostgreSQL
pip install dbt-core dbt-bigquery # BigQuery
pip install dbt-core dbt-snowflake # Snowflake
pip install dbt-core dbt-duckdb # DuckDB(本地开发)
# 初始化项目
dbt init my_project
cd my_project
profiles.yml¶
# ~/.dbt/profiles.yml
my_project:
target: dev
outputs:
dev:
type: duckdb
path: dev.duckdb
prod:
type: postgres
host: localhost
port: 5432
user: analytics
password: "{{ env_var('DB_PASSWORD') }}"
dbname: warehouse
schema: analytics
快速上手¶
项目结构¶
my_project/
├── models/
│ ├── staging/ # 清洗层
│ │ ├── stg_orders.sql
│ │ └── _stg_models.yml
│ ├── intermediate/ # 中间层
│ │ └── int_orders_pivoted.sql
│ └── marts/ # 集市层(最终表)
│ ├── dim_customers.sql
│ └── fct_orders.sql
├── tests/
├── macros/
├── seeds/
├── dbt_project.yml
└── profiles.yml
定义模型¶
-- models/staging/stg_orders.sql
WITH source AS (
SELECT * FROM {{ source('raw', 'orders') }}
),
cleaned AS (
SELECT
id AS order_id,
user_id AS customer_id,
CAST(amount AS DECIMAL(10,2)) AS order_amount,
status,
CAST(created_at AS TIMESTAMP) AS ordered_at
FROM source
WHERE status != 'cancelled'
)
SELECT * FROM cleaned
-- models/marts/fct_orders.sql
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
),
customers AS (
SELECT * FROM {{ ref('dim_customers') }}
)
SELECT
o.order_id,
o.customer_id,
c.customer_name,
o.order_amount,
o.ordered_at,
DATE_TRUNC('month', o.ordered_at) AS order_month
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
定义测试和文档¶
# models/staging/_stg_models.yml
version: 2
models:
- name: stg_orders
description: "清洗后的订单数据"
columns:
- name: order_id
description: "订单唯一标识"
tests:
- unique
- not_null
- name: order_amount
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
- name: status
tests:
- accepted_values:
values: ['pending', 'shipped', 'delivered']
运行¶
dbt run # 运行所有模型
dbt run --select staging # 只运行staging目录
dbt run --select fct_orders+ # 运行fct_orders及其下游
dbt test # 运行所有测试
dbt docs generate # 生成文档
dbt docs serve # 查看文档(含DAG图)
进阶用法¶
增量模型¶
-- models/marts/fct_events.sql
{{
config(
materialized='incremental',
unique_key='event_id'
)
}}
SELECT
event_id,
user_id,
event_type,
created_at
FROM {{ source('raw', 'events') }}
{% if is_incremental() %}
WHERE created_at > (SELECT MAX(created_at) FROM {{ this }})
{% endif %}
Macro(可复用SQL)¶
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
ROUND({{ column_name }}::NUMERIC / 100, 2)
{% endmacro %}
-- 使用
SELECT
order_id,
{{ cents_to_dollars('amount_cents') }} AS amount_dollars
FROM orders
Snapshot(慢变维度)¶
-- snapshots/scd_customers.sql
{% snapshot scd_customers %}
{{
config(
target_schema='snapshots',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at',
)
}}
SELECT * FROM {{ source('raw', 'customers') }}
{% endsnapshot %}
常见问题¶
Q1: dbt Core vs dbt Cloud?¶
| 方面 | dbt Core | dbt Cloud |
|---|---|---|
| 价格 | 免费开源 | 付费SaaS |
| 调度 | 需要自己搭(Airflow等) | 内置调度器 |
| IDE | VS Code/CLI | 浏览器IDE |
| 文档 | 本地生成 | 在线托管 |
| 适合 | 有DevOps能力的团队 | 快速上手的团队 |
Q2: 和 Airflow 的关系?¶
dbt 做"转换"(T in ELT),Airflow 做"编排"(何时运行dbt)。两者互补。
参考资源¶
- dbt 官方文档 - 完整文档
- dbt GitHub - 源代码
- dbt Learn - 免费课程
- dbt Community - 社区论坛