dbt Data Transformation

Why Learn It

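dbt (data build tool) is a core component of the modern data stack. It brings software engineering practices to data transformation:

  • SQL as transformation: define transformations as SELECT statements, and dbt handles the DDL/DML
  • Modular models: each SQL file is a "model" with explicit dependencies
  • Built-in testing: assert data quality with tests
  • Automated documentation: generate data docs and a DAG graph automatically
  • Version control: SQL code is managed in Git
  • Data lineage: automatically track where data comes from and where it goes

Anyone working in data engineering or analytics engineering should know dbt.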

Core Concepts

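| Concept | Description | Analogy |
|---|---|---|
| Model | One SQL SELECT = one table/view | Function |
| Source | Declaration of a raw data table | Data entry point |
| Ref | Dependency reference between models | import |
| Test | Data quality assertion | Unit test |
| Macro | Reusable SQL template (Jinja) | Function/template |
| Snapshot | Slowly changing dimension (SCD) tracking | Change history log |
| Seed | CSV file loaded as a table | Test data / dimension tables |
| Materialization | How a model is materialized (view/table/incremental) | Storage strategy |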

Installation and Configuration

# dbt Core (open source); install the adapter that matches your warehouse
pip install dbt-core dbt-postgres   # PostgreSQL
pip install dbt-core dbt-bigquery   # BigQuery
pip install dbt-core dbt-snowflake  # Snowflake
pip install dbt-core dbt-duckdb     # DuckDB (local development)

# Initialize a project
dbt init my_project
cd my_project

profiles.yml

# ~/.dbt/profiles.yml
my_project:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: dev.duckdb
    prod:
      type: postgres
      host: localhost
      port: 5432
      user: analytics
      password: "{{ env_var('DB_PASSWORD') }}"
      dbname: warehouse
      schema: analytics

Quick Start

Project Structure

my_project/
├── models/
│   ├── staging/          # Staging layer (cleaning)
│   │   ├── stg_orders.sql
│   │   └── _stg_models.yml
│   ├── intermediate/     # Intermediate layer
│   │   └── int_orders_pivoted.sql
│   └── marts/            # Marts layer (final tables)
│       ├── dim_customers.sql
│       └── fct_orders.sql
├── tests/
├── macros/
├── seeds/
├── dbt_project.yml
└── profiles.yml
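
The layered folders above pair naturally with per-folder materialization defaults. The following is a minimal sketch of a `dbt_project.yml` excerpt. The choice of view/ephemeral/table per layer is an assumption for illustration, not something the project requires.

```yaml
# dbt_project.yml (excerpt; the materialization choices are illustrative assumptions)
name: my_project
profile: my_project        # must match the top-level key in profiles.yml

models:
  my_project:
    staging:
      +materialized: view        # cheap to rebuild, always reflects the source
    intermediate:
      +materialized: ephemeral   # inlined as a CTE, no object created in the warehouse
    marts:
      +materialized: table       # persisted for downstream consumers
```

A `config()` block inside an individual model file overrides these folder-level defaults.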

Defining Models

-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
),

cleaned AS (
    SELECT
        id AS order_id,
        user_id AS customer_id,
        CAST(amount AS DECIMAL(10,2)) AS order_amount,
        status,
        CAST(created_at AS TIMESTAMP) AS ordered_at
    FROM source
    WHERE status != 'cancelled'
)

SELECT * FROM cleaned

-- models/marts/fct_orders.sql
WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customers AS (
    SELECT * FROM {{ ref('dim_customers') }}
)

SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    o.order_amount,
    o.ordered_at,
    DATE_TRUNC('month', o.ordered_at) AS order_month
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
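
The `source('raw', 'orders')` call in `stg_orders` only resolves if the source is declared in a YAML file. A minimal sketch of such a declaration follows. The file name and the `schema` value are assumptions; adjust them to wherever the raw tables actually live.

```yaml
# models/staging/_sources.yml (hypothetical file name)
version: 2

sources:
  - name: raw              # first argument of source('raw', ...)
    schema: raw            # assumption: raw tables live in a schema named "raw"
    tables:
      - name: orders       # second argument of source('raw', 'orders')
      - name: events
      - name: customers
```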

Defining Tests and Documentation

# models/staging/_stg_models.yml
version: 2

models:
  - name: stg_orders
    description: "Cleaned order data"
    columns:
      - name: order_id
        description: "Unique order identifier"
        tests:
          - unique
          - not_null
      - name: order_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'delivered']
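
The `dbt_utils.accepted_range` test above comes from the dbt_utils package rather than dbt itself, so the package has to be installed first. A minimal sketch of a `packages.yml` at the project root; the version range is an assumption and should be pinned to whatever release matches your dbt version.

```yaml
# packages.yml (project root; the version range is an illustrative assumption)
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]
```

Run `dbt deps` after adding or changing this file to download the package.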

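Running

dbt run                    # Run all models
dbt run --select staging   # Run only the staging directory
dbt run --select fct_orders+  # Run fct_orders and everything downstream
dbt test                   # Run all tests
dbt docs generate          # Generate documentation
dbt docs serve             # Browse the docs (including the DAG graph)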

Advanced Usage

Incremental Models

-- models/marts/fct_events.sql
{{
  config(
    materialized='incremental',
    unique_key='event_id'
  )
}}

SELECT
    event_id,
    user_id,
    event_type,
    created_at
FROM {{ source('raw', 'events') }}

{% if is_incremental() %}
  WHERE created_at > (SELECT MAX(created_at) FROM {{ this }})
{% endif %}
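
The `unique_key='event_id'` setting assumes that `event_id` really is unique and never null in the incoming data. One way to guard that assumption is a pair of tests on the model, sketched here. The file name is hypothetical.

```yaml
# models/marts/_marts_models.yml (hypothetical file name)
version: 2

models:
  - name: fct_events
    columns:
      - name: event_id
        tests:
          - unique      # duplicate keys would make incremental upserts ambiguous
          - not_null
```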

Macros (Reusable SQL)

-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
    ROUND({{ column_name }}::NUMERIC / 100, 2)
{% endmacro %}

-- Usage
SELECT
    order_id,
    {{ cents_to_dollars('amount_cents') }} AS amount_dollars
FROM orders

Snapshots (Slowly Changing Dimensions)

-- snapshots/scd_customers.sql
{% snapshot scd_customers %}
{{
    config(
      target_schema='snapshots',
      unique_key='customer_id',
      strategy='timestamp',
      updated_at='updated_at',
    )
}}

SELECT * FROM {{ source('raw', 'customers') }}

{% endsnapshot %}
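
For comparison, newer dbt releases also accept snapshots defined in YAML instead of a `{% snapshot %}` block. The sketch below assumes dbt Core 1.9 or later and mirrors the settings above; the file name is hypothetical, and only one of the two definitions should exist for a given snapshot name.

```yaml
# snapshots/scd_customers.yml (hypothetical file name; assumes dbt Core 1.9+)
snapshots:
  - name: scd_customers
    relation: source('raw', 'customers')
    config:
      schema: snapshots          # plays the role of target_schema in the SQL version
      unique_key: customer_id
      strategy: timestamp
      updated_at: updated_at
```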

FAQ

Q1: dbt Core vs dbt Cloud?

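| Aspect | dbt Core | dbt Cloud |
|---|---|---|
| Price | Free and open source | Paid SaaS |
| Scheduling | Self-managed (Airflow, etc.) | Built-in scheduler |
| IDE | VS Code/CLI | Browser-based IDE |
| Docs | Generated locally | Hosted online |
| Best for | Teams with DevOps capability | Teams that want to get started quickly |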

Q2: How does dbt relate to Airflow?

dbt handles transformation (the T in ELT); Airflow handles orchestration (deciding when dbt runs). The two are complementary.

References