跳转至

Zipkin 链路追踪

一句话概述:Zipkin 是 Twitter 开源的分布式链路追踪系统,收集微服务间请求的时序数据,帮你可视化一个请求经过了哪些服务、每步耗时多少,快速定位性能瓶颈。现代用法推荐用 OpenTelemetry SDK + Zipkin 作为后端存储的组合。

核心知识点

概念白话解释
Trace追踪 = 一个请求的完整路径(从发起到返回的全过程)
Span跨度 = 路径中某个服务处理请求的时间段
Trace ID追踪 ID = 全局唯一标识符,贯穿整个请求链
Span ID跨度 ID = 每个服务处理段的唯一标识
Parent Span父跨度 = 调用方的 Span(调用我的那个服务)
Tag标签 = Span 的元数据(如 http.method=GET)
Annotation注解 = 时间点标记(cs=客户端发送, sr=服务端收到, ss=服务端发送, cr=客户端收到)
Exporter导出器 = 把追踪数据发送到 Zipkin 的组件
OpenTelemetry追踪标准 = 厂商中立的追踪规范,取代各家私有 SDK

白话解释

假设用户访问你的网站下单,这个请求经过:

浏览器 → Nginx → API网关 → 用户服务 → 库存服务 → 数据库

每个环节都可能出问题,但日志只告诉你"出错了",不告诉你"慢在哪里"。

Zipkin 做的事:给每个请求打一个唯一"快递单号"(Trace ID),每个服务处理时都记录"我什么时候收到,什么时候处理完"(Span),最后在 Zipkin UI 里,你能看到:

整个请求耗时 500ms:
  API网关处理:20ms
  用户服务查询:30ms
  库存服务查询:100ms(← 慢在这里!)
    ├── Redis 缓存查询:5ms
    └── MySQL 查询:95ms(← 真正的瓶颈)

这样你就能精准定位慢在哪个服务、哪条 SQL 语句。

安装配置

方式一:Docker 启动 Zipkin(最快)

# 启动 Zipkin(内存存储,重启数据消失,适合开发测试)
docker run -d \
  --name zipkin \
  -p 9411:9411 \       # Zipkin UI 和 API 端口
  openzipkin/zipkin

# 验证启动
curl http://localhost:9411/health  # 应返回 {"status":"UP"}

# 访问 Zipkin Web UI
# 浏览器打开:http://localhost:9411

方式二:Docker Compose(带持久化存储)

# docker-compose.yml
version: '3.7'
services:
  zipkin:
    image: openzipkin/zipkin:latest
    container_name: zipkin
    ports:
      - "9411:9411"
    environment:
      # 使用 Elasticsearch 持久化存储(默认是内存)
      STORAGE_TYPE: elasticsearch
      ES_HOSTS: http://elasticsearch:9200
    depends_on:
      - elasticsearch

  elasticsearch:
    image: elasticsearch:8.12.0
    container_name: zipkin-elasticsearch
    environment:
      - discovery.type=single-node          # 单节点模式
      - xpack.security.enabled=false        # 关闭安全(测试用)
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"  # 内存限制
    ports:
      - "9200:9200"
docker-compose up -d               # 启动所有服务
docker-compose logs -f zipkin      # 查看 Zipkin 日志

方式三:使用 MySQL 持久化存储

# 启动带 MySQL 存储的 Zipkin
docker run -d \
  --name zipkin \
  -p 9411:9411 \
  -e STORAGE_TYPE=mysql \
  -e MYSQL_HOST=your-mysql-host \
  -e MYSQL_USER=zipkin \
  -e MYSQL_PASS=your_password \
  -e MYSQL_DB=zipkin \
  openzipkin/zipkin

# Zipkin MySQL Schema(首次使用需要建表)
# https://github.com/openzipkin/zipkin/blob/master/zipkin-storage/mysql-v1/src/main/resources/mysql.sql

Python 客户端安装

# 推荐:OpenTelemetry + Zipkin Exporter(现代方式)
pip install opentelemetry-sdk               # OpenTelemetry 核心
pip install opentelemetry-exporter-zipkin   # Zipkin 导出器

# 传统方式(不推荐新项目使用)
pip install py_zipkin                        # 旧版 Zipkin Python 库

# 可选:HTTP 框架自动追踪
pip install opentelemetry-instrumentation-flask     # Flask 自动追踪
pip install opentelemetry-instrumentation-fastapi   # FastAPI 自动追踪
pip install opentelemetry-instrumentation-requests  # requests 自动追踪
pip install opentelemetry-instrumentation-sqlalchemy  # SQLAlchemy 自动追踪

基本使用

方式一:OpenTelemetry + Zipkin(推荐,现代方式)

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.exporter.zipkin.json import ZipkinExporter
from opentelemetry.sdk.resources import Resource

# 1. 配置服务信息
resource = Resource.create({
    "service.name": "bioinfo-api",            # 服务名称(必填,显示在 Zipkin UI)
    "service.version": "1.0.0",               # 服务版本
    "deployment.environment": "production",    # 部署环境
})

# 2. 创建追踪提供器
tracer_provider = TracerProvider(resource=resource)

# 3. 配置 Zipkin 导出器
zipkin_exporter = ZipkinExporter(
    endpoint="http://localhost:9411/api/v2/spans",  # Zipkin API 端点
    # max_tag_value_length=256,     # 标签值最大长度
    # timeout=5,                    # 超时秒数
)

# 4. 添加批量处理器(异步发送,不影响业务性能)
tracer_provider.add_span_processor(
    BatchSpanProcessor(zipkin_exporter)       # 批量发送,不阻塞业务
    # 开发时可用 SimpleSpanProcessor 立即发送,方便调试
    # SimpleSpanProcessor(zipkin_exporter)
)

# 5. 设置为全局追踪提供器
trace.set_tracer_provider(tracer_provider)

# 6. 获取追踪器
tracer = trace.get_tracer(__name__)

手动创建 Span(追踪代码块)

from opentelemetry import trace
import time

tracer = trace.get_tracer(__name__)   # 获取追踪器(沿用全局配置)

def process_sample(sample_id: str):
    """处理生信样本(带链路追踪)"""

    # 创建根 Span(顶层追踪)
    with tracer.start_as_current_span("process_sample") as span:

        # 添加标签(key-value 元数据,可在 Zipkin UI 中搜索和过滤)
        span.set_attribute("sample.id", sample_id)        # 样本 ID
        span.set_attribute("pipeline.version", "2.0")    # 流水线版本
        span.set_attribute("http.method", "POST")         # HTTP 方法

        # 嵌套 Span:QC 质控步骤
        with tracer.start_as_current_span("quality_control") as qc_span:
            qc_span.set_attribute("tool", "fastp")       # 使用的工具
            time.sleep(0.1)                               # 模拟处理耗时
            qc_span.set_attribute("reads_passed", 95000)  # 处理结果

        # 嵌套 Span:比对步骤
        with tracer.start_as_current_span("alignment") as align_span:
            align_span.set_attribute("tool", "bwa-mem2") # 比对工具
            align_span.set_attribute("reference", "hg38") # 参考基因组
            time.sleep(0.3)                               # 比对通常最慢

        # 嵌套 Span:数据库写入
        with tracer.start_as_current_span("save_results") as db_span:
            db_span.set_attribute("db.system", "postgresql")
            db_span.set_attribute("db.operation", "INSERT")
            time.sleep(0.05)

        return {"sample_id": sample_id, "status": "completed"}


# 记录异常信息(错误会在 Zipkin UI 中标红)
def risky_operation():
    with tracer.start_as_current_span("risky_operation") as span:
        try:
            # 可能出错的操作
            result = 1 / 0
        except Exception as e:
            span.record_exception(e)                      # 记录异常堆栈
            span.set_status(trace.StatusCode.ERROR, str(e))  # 标记为错误
            raise

Flask 自动追踪(零代码追踪 HTTP 请求)

from flask import Flask
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

app = Flask(__name__)

# 自动追踪 Flask 的所有 HTTP 请求(零代码修改)
FlaskInstrumentor().instrument_app(app)

# 自动追踪所有 requests 库的 HTTP 调用
RequestsInstrumentor().instrument()

@app.route('/api/process/<sample_id>')
def process(sample_id):
    # 这个请求会自动创建 Span,无需手动添加代码
    import requests

    # 调用下游服务时,会自动注入追踪 header(traceparent)
    # 让 Zipkin 能关联上下游的 Span
    response = requests.get(f"http://upstream-service/data/{sample_id}")
    return {"status": "ok", "data": response.json()}

if __name__ == '__main__':
    app.run(debug=True)

FastAPI 自动追踪

from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

app = FastAPI()
FastAPIInstrumentor.instrument_app(app)  # 自动追踪所有路由

@app.get("/api/health")
async def health():
    return {"status": "healthy"}

SQLAlchemy 自动追踪数据库查询

from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:pass@localhost/mydb")

# 自动追踪所有 SQL 查询(慢查询会在 Zipkin 中显示)
SQLAlchemyInstrumentor().instrument(engine=engine)

高级用法

上下文传播:跨服务追踪

# 分布式追踪的核心:HTTP Header 传播追踪上下文
# Zipkin B3 格式 Header:X-B3-TraceId、X-B3-SpanId、X-B3-Sampled
# W3C Trace Context(推荐):traceparent、tracestate

from opentelemetry import trace, propagate
from opentelemetry.propagators.b3 import B3MultiFormat
import requests

def call_downstream_service(url: str):
    """调用下游服务,自动传播追踪上下文"""

    headers = {}

    # 将当前追踪上下文注入到 HTTP Header
    propagate.inject(headers)   # 自动添加 traceparent / X-B3-* header

    # 发送请求(下游服务会从 header 中提取上下文,创建子 Span)
    response = requests.get(url, headers=headers)
    return response.json()


def receive_and_continue(request_headers: dict):
    """从 HTTP Header 中提取追踪上下文,继续追踪链路"""

    # 从入站请求的 Header 中提取上下文
    context = propagate.extract(request_headers)

    # 在已有上下文中创建新 Span(作为上游的子 Span)
    with tracer.start_as_current_span(
        "process-request",
        context=context      # 继承上游的追踪上下文
    ) as span:
        span.set_attribute("service", "downstream")
        # 处理请求...

采样策略

from opentelemetry.sdk.trace.sampling import (
    TraceIdRatioBased,    # 按比例采样
    AlwaysOn,             # 全量采样(100%)
    AlwaysOff,            # 不采样(0%)
    ParentBased,          # 根据父 Span 决定
)

# 采样 10% 的请求(高流量生产环境常用)
sampler = TraceIdRatioBased(0.1)

# 采样 100%(开发测试用)
sampler = AlwaysOn()

# 生产推荐:根据父 Span 决定(父 Span 采样则子 Span 也采样)
sampler = ParentBased(root=TraceIdRatioBased(0.1))

tracer_provider = TracerProvider(
    resource=resource,
    sampler=sampler         # 传入采样器
)

Zipkin UI 使用指南

1. 打开 http://localhost:9411

2. 搜索追踪记录:
   - Service Name:选择你的服务名称
   - Span Name:选择特定操作(如 process_sample)
   - Start Time:设置时间范围
   - Duration:筛选耗时超过某值的请求(查慢请求用这个)
   - Tags:按标签筛选(如 sample.id=12345)

3. 点击某个 Trace:
   - 看到瀑布图(Gantt 图),直观显示每个 Span 的时序
   - 点击某个 Span 查看详细信息(Tags、Log、Annotations)
   - 红色 Span = 有错误,重点关注

4. 常用技巧:
   - 找最慢请求:设置 Min Duration 过滤
   - 找错误请求:用 error=true 标签过滤
   - 跨服务定位:看同一 Trace 中哪个服务的 Span 最长

常见报错

报错信息原因解决方案
Connection refused 9411Zipkin 未启动docker start zipkin 或检查端口
No traces found in UI追踪数据未发送成功检查 endpoint URL,查看应用日志
Failed to export spansZipkin 端点不可达确认 Zipkin 地址配置正确
Span context not propagated未注入追踪 Header确保使用 propagate.inject(headers)
Data not persisted after restart使用默认内存存储配置 MySQL/Elasticsearch 持久化
Too many spans采样率过高,数据量太大降低采样率(如改为 10%)
opentelemetry.exporter.zipkin deprecatedZipkin 导出器即将废弃仅影响 OTel Spec,目前仍可使用

速查表

# ===== Zipkin 服务管理 =====
docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin   # 启动
docker stop zipkin                                             # 停止
docker start zipkin                                            # 重启
curl http://localhost:9411/health                             # 健康检查
curl http://localhost:9411/api/v2/services                   # 列出服务名

# ===== Zipkin API =====
# GET  /api/v2/services              列出所有服务名
# GET  /api/v2/spans?serviceName=xxx 列出服务的 span 名
# GET  /api/v2/traces                查询 trace 列表
# POST /api/v2/spans                 上报 span 数据(JSON 格式)
# GET  /api/v2/trace/{traceId}       获取特定 trace

# ===== Python 快速参考 =====
# pip install opentelemetry-sdk opentelemetry-exporter-zipkin
# from opentelemetry.exporter.zipkin.json import ZipkinExporter
# exporter = ZipkinExporter(endpoint="http://localhost:9411/api/v2/spans")

# ===== 常用标签(Tag)规范 =====
# http.method       HTTP 方法(GET/POST/PUT)
# http.url          请求 URL
# http.status_code  响应状态码
# db.system         数据库类型(postgresql/mysql)
# db.statement      SQL 语句
# error             是否错误(true/false)

# ===== 存储后端对比 =====
# 内存(默认):重启丢失,适合开发
# MySQL:适合中小规模(< 100 万 traces/天)
# Cassandra:适合大规模分布式
# Elasticsearch:适合大规模 + 全文搜索,最推荐

同类工具对比

特性ZipkinJaegerTempo (Grafana)OpenTelemetry
定位追踪后端追踪后端追踪后端追踪标准+SDK
开源
UI简洁完整需 Grafana
存储内存/MySQL/ES内存/Cassandra/ES对象存储-
协议B3/W3CJaeger/W3COTLPOTLP
社区成熟活跃快速增长业界标准
易用性简单中等需配套 Grafana需配后端
适合简单场景中大型系统Grafana 用户标准化追踪

推荐组合方案: - 快速上手:Zipkin(Docker 一键启动) - 生产推荐:OpenTelemetry SDK + Jaeger(功能更全,UI 更好) - 已用 Grafana:OpenTelemetry SDK + Grafana Tempo(无缝集成) - 2026 年最佳实践:OpenTelemetry Collector → Jaeger/Tempo(灵活切换后端)


参考:OpenZipkin 官网 | OpenTelemetry Python Zipkin 导出器 | OpenTelemetry 规范 | 更新于 2026 年