跳转至

dbt 数据转换

一句话概述:dbt(data build tool)是数据转换工具,让你用写SQL的方式构建数据仓库的转换层,并自动管理依赖、测试和文档。

核心知识点表

概念白话解释
Model一个SQL文件就是一个Model,运行后变成数据库里的表或视图
Source原始数据表的声明(告诉dbt原始数据在哪里)
Ref{{ ref('model_name') }}引用其他Model,dbt自动建立依赖
Seed把CSV文件加载到数据库的功能(适合小型参考数据)
Test数据测试,验证数据质量(不为空、唯一性等)
Snapshot快照,追踪数据随时间的变化(慢变维度)
MacroJinja宏,可复用的SQL模板
Materialization物化方式:view(视图)、table(表)、incremental(增量)
Semantic Layer语义层,用YAML定义指标,统一口径

版本信息(2026年5月)

  • dbt Core:1.11.8
  • dbt Fusion引擎:2.0+(全新Rust重写引擎,极快)
  • dbt Cloud:持续更新,支持MCP服务器

安装配置

安装dbt Core

# 创建虚拟环境
conda create -n dbt python=3.12 -y
conda activate dbt

# 根据你的数据库安装对应适配器
pip install dbt-postgres    # PostgreSQL
pip install dbt-snowflake   # Snowflake
pip install dbt-bigquery    # BigQuery
pip install dbt-duckdb      # DuckDB(学习推荐,不需要外部数据库)

# 验证安装
dbt --version  # 显示版本

# 初始化项目
dbt init my_dbt_project  # 交互式创建项目
cd my_dbt_project

配置数据库连接

# ~/.dbt/profiles.yml(数据库连接配置)
my_dbt_project:
  target: dev  # 默认使用dev环境
  outputs:
    dev:  # 开发环境
      type: duckdb  # 数据库类型(学习用DuckDB最方便)
      path: dev.duckdb  # DuckDB文件路径
    prod:  # 生产环境
      type: postgres
      host: localhost
      port: 5432
      user: analyst
      password: "{{ env_var('DBT_PASSWORD') }}"  # 从环境变量读密码
      dbname: warehouse
      schema: analytics

项目结构

my_dbt_project/
├── dbt_project.yml    # 项目配置文件
├── models/            # SQL模型(核心)
│   ├── staging/       # 基础层:清洗原始数据
│   ├── intermediate/  # 中间层:业务逻辑
│   └── marts/         # 输出层:面向分析的宽表
├── seeds/             # CSV种子数据
├── tests/             # 自定义测试
├── macros/            # 可复用SQL宏
├── snapshots/         # 快照
└── analyses/          # 分析SQL(不会被物化)

基本使用

定义Source

# models/staging/_sources.yml
version: 2
sources:
  - name: raw_data  # 源名称
    description: "原始业务数据"
    database: raw_db  # 数据库名
    schema: public  # Schema名
    tables:
      - name: users  # 原始用户表
        description: "用户注册信息"
        columns:
          - name: id
            tests:  # 数据测试
              - not_null  # id不能为空
              - unique    # id必须唯一
      - name: orders  # 原始订单表
        description: "订单信息"

编写Model

-- models/staging/stg_users.sql
-- 基础层:清洗原始用户数据

{{ config(materialized='view') }}  -- 物化为视图(每次查询实时计算)

SELECT
    id AS user_id,                          -- 重命名为统一格式
    LOWER(TRIM(email)) AS email,            -- 邮箱转小写并去空格
    first_name || ' ' || last_name AS full_name,  -- 拼接全名
    created_at,                              -- 注册时间
    CASE
        WHEN status = 'A' THEN 'active'     -- 状态码转可读文本
        WHEN status = 'I' THEN 'inactive'
        ELSE 'unknown'
    END AS user_status
FROM {{ source('raw_data', 'users') }}      -- 引用Source中定义的原始表
WHERE id IS NOT NULL                         -- 过滤掉空ID
-- models/staging/stg_orders.sql
{{ config(materialized='view') }}

SELECT
    id AS order_id,
    user_id,
    amount,
    created_at AS order_date,
    status AS order_status
FROM {{ source('raw_data', 'orders') }}
-- models/marts/fct_user_orders.sql
-- 输出层:用户订单汇总宽表

{{ config(
    materialized='table',  -- 物化为实体表(查询更快)
    schema='analytics'     -- 放在analytics schema下
) }}

SELECT
    u.user_id,
    u.full_name,
    u.email,
    COUNT(o.order_id) AS total_orders,      -- 订单总数
    SUM(o.amount) AS total_spent,           -- 总消费金额
    MIN(o.order_date) AS first_order_date,  -- 首次下单时间
    MAX(o.order_date) AS last_order_date    -- 最近下单时间
FROM {{ ref('stg_users') }} u               -- 引用staging层的用户模型
LEFT JOIN {{ ref('stg_orders') }} o         -- 引用staging层的订单模型
    ON u.user_id = o.user_id
GROUP BY u.user_id, u.full_name, u.email

运行dbt

# 核心命令
dbt run              # 运行所有模型(创建表/视图)
dbt run --select stg_users  # 只运行指定模型
dbt run --select marts.*    # 运行marts目录下所有模型
dbt test             # 运行所有数据测试
dbt build            # run + test + snapshot 一步到位
dbt seed             # 加载CSV种子数据
dbt docs generate    # 生成文档
dbt docs serve       # 启动文档网站(含数据血缘图)

高级用法

增量模型

-- models/marts/fct_daily_events.sql
-- 增量模型:只处理新数据,不全量重建

{{ config(
    materialized='incremental',          -- 增量物化
    unique_key='event_id',               -- 去重键
    incremental_strategy='merge',        -- 合并策略
) }}

SELECT
    event_id,
    user_id,
    event_type,
    event_timestamp,
    properties
FROM {{ source('raw_data', 'events') }}

{% if is_incremental() %}
    -- 增量运行时,只处理上次运行之后的新数据
    WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }})
{% endif %}

Jinja宏

-- macros/cents_to_dollars.sql
-- 自定义宏:分转元

{% macro cents_to_dollars(column_name) %}
    ROUND({{ column_name }} / 100.0, 2)  -- 分除以100得到元,保留2位小数
{% endmacro %}

-- 使用方式(在任何Model中):
-- SELECT {{ cents_to_dollars('amount_cents') }} AS amount_dollars

数据测试

# models/marts/_schema.yml
version: 2
models:
  - name: fct_user_orders
    description: "用户订单汇总表"
    columns:
      - name: user_id
        tests:
          - not_null  # 不能为空
          - unique    # 必须唯一
      - name: total_orders
        tests:
          - not_null
          - dbt_utils.accepted_range:  # 值在合理范围内
              min_value: 0
              inclusive: true
      - name: total_spent
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0

快照(慢变维度)

-- snapshots/snap_users.sql
-- 追踪用户状态随时间的变化

{% snapshot snap_users %}

{{ config(
    target_schema='snapshots',
    unique_key='user_id',
    strategy='timestamp',           -- 基于时间戳检测变化
    updated_at='updated_at',        -- 用这个字段判断是否有更新
) }}

SELECT * FROM {{ source('raw_data', 'users') }}

{% endsnapshot %}

常见报错与解决

报错信息原因解决方案
Compilation Error: source not foundSource没定义或名字写错检查_sources.yml文件
Database Error: relation does not exist依赖的表还没创建先运行上游模型 dbt run --select +model_name
dbt_project.yml not found没在项目根目录运行cd到包含dbt_project.yml的目录
Profile not foundprofiles.yml配置缺失检查~/.dbt/profiles.yml存在且格式正确
Incremental model error增量模型首次运行报错--full-refresh全量运行一次
Test failed数据质量问题检查数据,修复源数据或调整测试规则

速查表

# ===== 核心命令 =====
dbt run                    # 运行所有模型
dbt test                   # 运行所有测试
dbt build                  # run + test + snapshot
dbt seed                   # 加载CSV数据
dbt docs generate && dbt docs serve  # 生成并查看文档

# ===== 模型选择语法 =====
dbt run --select model_name       # 运行单个模型
dbt run --select +model_name      # 运行模型及其所有上游依赖
dbt run --select model_name+      # 运行模型及其所有下游依赖
dbt run --select +model_name+     # 运行模型及所有上下游
dbt run --select tag:daily        # 运行带daily标签的模型
dbt run --select path/to/models   # 运行指定目录

# ===== 物化方式 =====
# view         视图,不存数据(默认)
# table        物理表,全量刷新
# incremental  增量表,只处理新数据
# ephemeral    临时CTE,不创建实体

# ===== Jinja常用语法 =====
# {{ ref('model_name') }}          引用模型
# {{ source('source', 'table') }}  引用源表
# {{ config(...) }}                模型配置
# {% if is_incremental() %}        增量判断
# {{ this }}                       当前模型的表名

同类工具对比

特性dbtSQLMeshDataform
语言SQL + JinjaSQL + PythonSQL + JS
增量模型支持更强大支持
数据血缘自动生成自动生成自动生成
版本管理GitGitGit
云服务dbt CloudTobiko CloudGoogle BigQuery
社区最大(50k+ Stars)快速增长Google生态
适合场景通用数据转换需要更强增量能力GCP用户

面试建议:dbt是现代数据栈的核心组件。重点理解"ELT vs ETL"的区别——dbt只做T(Transform),E和L由其他工具完成。会写staging/marts分层模型、知道ref()和source()的用法、理解增量模型原理是加分项。