dbt 数据转换
一句话概述:dbt(data build tool)是数据转换工具,让你用写SQL的方式构建数据仓库的转换层,并自动管理依赖、测试和文档。
核心知识点表
| 概念 | 白话解释 |
|---|
| Model | 一个SQL文件就是一个Model,运行后变成数据库里的表或视图 |
| Source | 原始数据表的声明(告诉dbt原始数据在哪里) |
| Ref | 用{{ ref('model_name') }}引用其他Model,dbt自动建立依赖 |
| Seed | 把CSV文件加载到数据库的功能(适合小型参考数据) |
| Test | 数据测试,验证数据质量(不为空、唯一性等) |
| Snapshot | 快照,追踪数据随时间的变化(慢变维度) |
| Macro | Jinja宏,可复用的SQL模板 |
| Materialization | 物化方式:view(视图)、table(表)、incremental(增量) |
| Semantic Layer | 语义层,用YAML定义指标,统一口径 |
版本信息(2026年5月)
- dbt Core:1.11.8
- dbt Fusion引擎:2.0+(全新Rust重写引擎,极快)
- dbt Cloud:持续更新,支持MCP服务器
安装配置
安装dbt Core
# 创建虚拟环境
conda create -n dbt python=3.12 -y
conda activate dbt
# 根据你的数据库安装对应适配器
pip install dbt-postgres # PostgreSQL
pip install dbt-snowflake # Snowflake
pip install dbt-bigquery # BigQuery
pip install dbt-duckdb # DuckDB(学习推荐,不需要外部数据库)
# 验证安装
dbt --version # 显示版本
# 初始化项目
dbt init my_dbt_project # 交互式创建项目
cd my_dbt_project
配置数据库连接
# ~/.dbt/profiles.yml(数据库连接配置)
my_dbt_project:
target: dev # 默认使用dev环境
outputs:
dev: # 开发环境
type: duckdb # 数据库类型(学习用DuckDB最方便)
path: dev.duckdb # DuckDB文件路径
prod: # 生产环境
type: postgres
host: localhost
port: 5432
user: analyst
password: "{{ env_var('DBT_PASSWORD') }}" # 从环境变量读密码
dbname: warehouse
schema: analytics
项目结构
my_dbt_project/
├── dbt_project.yml # 项目配置文件
├── models/ # SQL模型(核心)
│ ├── staging/ # 基础层:清洗原始数据
│ ├── intermediate/ # 中间层:业务逻辑
│ └── marts/ # 输出层:面向分析的宽表
├── seeds/ # CSV种子数据
├── tests/ # 自定义测试
├── macros/ # 可复用SQL宏
├── snapshots/ # 快照
└── analyses/ # 分析SQL(不会被物化)
基本使用
定义Source
# models/staging/_sources.yml
version: 2
sources:
- name: raw_data # 源名称
description: "原始业务数据"
database: raw_db # 数据库名
schema: public # Schema名
tables:
- name: users # 原始用户表
description: "用户注册信息"
columns:
- name: id
tests: # 数据测试
- not_null # id不能为空
- unique # id必须唯一
- name: orders # 原始订单表
description: "订单信息"
编写Model
-- models/staging/stg_users.sql
-- 基础层:清洗原始用户数据
{{ config(materialized='view') }} -- 物化为视图(每次查询实时计算)
SELECT
id AS user_id, -- 重命名为统一格式
LOWER(TRIM(email)) AS email, -- 邮箱转小写并去空格
first_name || ' ' || last_name AS full_name, -- 拼接全名
created_at, -- 注册时间
CASE
WHEN status = 'A' THEN 'active' -- 状态码转可读文本
WHEN status = 'I' THEN 'inactive'
ELSE 'unknown'
END AS user_status
FROM {{ source('raw_data', 'users') }} -- 引用Source中定义的原始表
WHERE id IS NOT NULL -- 过滤掉空ID
-- models/staging/stg_orders.sql
{{ config(materialized='view') }}
SELECT
id AS order_id,
user_id,
amount,
created_at AS order_date,
status AS order_status
FROM {{ source('raw_data', 'orders') }}
-- models/marts/fct_user_orders.sql
-- 输出层:用户订单汇总宽表
{{ config(
materialized='table', -- 物化为实体表(查询更快)
schema='analytics' -- 放在analytics schema下
) }}
SELECT
u.user_id,
u.full_name,
u.email,
COUNT(o.order_id) AS total_orders, -- 订单总数
SUM(o.amount) AS total_spent, -- 总消费金额
MIN(o.order_date) AS first_order_date, -- 首次下单时间
MAX(o.order_date) AS last_order_date -- 最近下单时间
FROM {{ ref('stg_users') }} u -- 引用staging层的用户模型
LEFT JOIN {{ ref('stg_orders') }} o -- 引用staging层的订单模型
ON u.user_id = o.user_id
GROUP BY u.user_id, u.full_name, u.email
运行dbt
# 核心命令
dbt run # 运行所有模型(创建表/视图)
dbt run --select stg_users # 只运行指定模型
dbt run --select marts.* # 运行marts目录下所有模型
dbt test # 运行所有数据测试
dbt build # run + test + snapshot 一步到位
dbt seed # 加载CSV种子数据
dbt docs generate # 生成文档
dbt docs serve # 启动文档网站(含数据血缘图)
高级用法
增量模型
-- models/marts/fct_daily_events.sql
-- 增量模型:只处理新数据,不全量重建
{{ config(
materialized='incremental', -- 增量物化
unique_key='event_id', -- 去重键
incremental_strategy='merge', -- 合并策略
) }}
SELECT
event_id,
user_id,
event_type,
event_timestamp,
properties
FROM {{ source('raw_data', 'events') }}
{% if is_incremental() %}
-- 增量运行时,只处理上次运行之后的新数据
WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }})
{% endif %}
Jinja宏
-- macros/cents_to_dollars.sql
-- 自定义宏:分转元
{% macro cents_to_dollars(column_name) %}
ROUND({{ column_name }} / 100.0, 2) -- 分除以100得到元,保留2位小数
{% endmacro %}
-- 使用方式(在任何Model中):
-- SELECT {{ cents_to_dollars('amount_cents') }} AS amount_dollars
数据测试
# models/marts/_schema.yml
version: 2
models:
- name: fct_user_orders
description: "用户订单汇总表"
columns:
- name: user_id
tests:
- not_null # 不能为空
- unique # 必须唯一
- name: total_orders
tests:
- not_null
- dbt_utils.accepted_range: # 值在合理范围内
min_value: 0
inclusive: true
- name: total_spent
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
快照(慢变维度)
-- snapshots/snap_users.sql
-- 追踪用户状态随时间的变化
{% snapshot snap_users %}
{{ config(
target_schema='snapshots',
unique_key='user_id',
strategy='timestamp', -- 基于时间戳检测变化
updated_at='updated_at', -- 用这个字段判断是否有更新
) }}
SELECT * FROM {{ source('raw_data', 'users') }}
{% endsnapshot %}
常见报错与解决
| 报错信息 | 原因 | 解决方案 |
|---|
Compilation Error: source not found | Source没定义或名字写错 | 检查_sources.yml文件 |
Database Error: relation does not exist | 依赖的表还没创建 | 先运行上游模型 dbt run --select +model_name |
dbt_project.yml not found | 没在项目根目录运行 | cd到包含dbt_project.yml的目录 |
Profile not found | profiles.yml配置缺失 | 检查~/.dbt/profiles.yml存在且格式正确 |
Incremental model error | 增量模型首次运行报错 | 加--full-refresh全量运行一次 |
Test failed | 数据质量问题 | 检查数据,修复源数据或调整测试规则 |
速查表
# ===== 核心命令 =====
dbt run # 运行所有模型
dbt test # 运行所有测试
dbt build # run + test + snapshot
dbt seed # 加载CSV数据
dbt docs generate && dbt docs serve # 生成并查看文档
# ===== 模型选择语法 =====
dbt run --select model_name # 运行单个模型
dbt run --select +model_name # 运行模型及其所有上游依赖
dbt run --select model_name+ # 运行模型及其所有下游依赖
dbt run --select +model_name+ # 运行模型及所有上下游
dbt run --select tag:daily # 运行带daily标签的模型
dbt run --select path/to/models # 运行指定目录
# ===== 物化方式 =====
# view 视图,不存数据(默认)
# table 物理表,全量刷新
# incremental 增量表,只处理新数据
# ephemeral 临时CTE,不创建实体
# ===== Jinja常用语法 =====
# {{ ref('model_name') }} 引用模型
# {{ source('source', 'table') }} 引用源表
# {{ config(...) }} 模型配置
# {% if is_incremental() %} 增量判断
# {{ this }} 当前模型的表名
同类工具对比
| 特性 | dbt | SQLMesh | Dataform |
|---|
| 语言 | SQL + Jinja | SQL + Python | SQL + JS |
| 增量模型 | 支持 | 更强大 | 支持 |
| 数据血缘 | 自动生成 | 自动生成 | 自动生成 |
| 版本管理 | Git | Git | Git |
| 云服务 | dbt Cloud | Tobiko Cloud | Google BigQuery |
| 社区 | 最大(50k+ Stars) | 快速增长 | Google生态 |
| 适合场景 | 通用数据转换 | 需要更强增量能力 | GCP用户 |
面试建议:dbt是现代数据栈的核心组件。重点理解"ELT vs ETL"的区别——dbt只做T(Transform),E和L由其他工具完成。会写staging/marts分层模型、知道ref()和source()的用法、理解增量模型原理是加分项。