跳转至

Snakemake 流程管理实战

彭文强 | 2026届 | 生信分析工程师求职

1. 一句话说明

Snakemake 是基于 Python 的生信流程管理工具,用"规则(Rule)"定义每一步分析,自动推导执行顺序、断点续跑、并行调度,把你的手写 shell pipeline 变成工程级可复现流程。


2. 为什么要学 Snakemake

手写 Shell 脚本 vs 流程管理工具

你之前的 T2D 项目用的是编号 shell 脚本(01_qc.sh → 02_trim.sh → ...),这种方式能跑通,但有几个痛点:

痛点 手写 Shell Snakemake
某步失败了 需要手动找到断点,注释掉前面已完成的步骤 自动识别哪步没完成,--rerun-incomplete 断点续跑
加了新样本 需要改脚本里的样本列表,或者全部重跑 只跑新增样本,已完成的自动跳过
并行执行 需要手写 &wait,容易写错 -j 8 自动并行,DAG 调度不会冲突
集群投递 每步都要写 sbatch 脚本 加一个 --cluster 参数就搞定
别人复现 需要读你所有脚本理解顺序 看 Snakefile 一目了然,snakemake --dag 生成流程图

Snakemake 的核心优势

  1. Python 语法:你学过 Python 就能上手,不像 Nextflow 要学 Groovy
  2. 增量计算:通过文件时间戳判断,只重跑"过期"的步骤
  3. DAG 调度:自动分析依赖关系,能并行的绝不串行
  4. 生态完善:conda 集成、容器支持、集群投递、云计算全覆盖
  5. 学术主流:发表论文时附 Snakefile 是加分项,可复现性有保障

白话总结:手写 shell 像你自己炒菜——得记住顺序、火候、原料;Snakemake 像写了一份详细菜谱交给机器人厨师——它知道先切菜再炒菜,中途停电了恢复后自动从断的地方继续。


3. 核心概念白话版

3.1 Rule(规则)—— 菜谱里的一道菜

# 一个 rule 就是流程中的"一步操作"
# 白话:菜谱里的一道菜,告诉你需要什么原料(input)、做出什么成品(output)、怎么做(shell)

rule fastp_trim:          # rule 名字,相当于菜名"番茄炒蛋"
    input:                # 原料:需要什么文件才能开始做
        "data/{sample}_R1.fastq.gz"
    output:               # 成品:做完会产出什么文件
        "results/{sample}_clean_R1.fastq.gz"
    shell:                # 做法:具体执行什么命令
        "fastp -i {input} -o {output}"

3.2 Input / Output / Shell —— 原料、成品、做法

  • input:这步需要的输入文件(如果文件不存在,Snakemake 会去找哪个 rule 能产出它)
  • output:这步会产出的文件(Snakemake 靠这个建立 rule 之间的依赖)
  • shell:要执行的 shell 命令(也可以用 run: 写 Python 代码)

关键理解:Snakemake 不是按你写 rule 的顺序执行的!它是看你"最终想要什么文件"(rule all),然后反向推导需要哪些 rule。

3.3 Wildcard(通配符)—— 批量处理的魔法

# {sample} 就是通配符,代表"任意样本名"
# 白话:就像你写了一个模板,{sample} 是空格等着被填入具体名字

rule fastp_trim:
    input:
        "data/{sample}_R1.fastq.gz"      # {sample} 可以是 SRR001, SRR002, ...
    output:
        "results/{sample}_clean_R1.fastq.gz"  # 同一个 {sample} 会被替换成同样的值
    shell:
        "fastp -i {input} -o {output}"

# 当 Snakemake 需要 results/SRR001_clean_R1.fastq.gz 时
# 它自动把 {sample} 替换成 SRR001,去找 data/SRR001_R1.fastq.gz

白话:通配符就像填空题里的下划线 ___,Snakemake 会自动帮你把所有样本名一个一个填进去。

3.4 DAG(有向无环图)—— 流程的全景图

DAG = Directed Acyclic Graph(有向无环图)

白话:就是一张"先后顺序图"
- 有向:有箭头,表示"A 要在 B 之前做"
- 无环:不能绕圈(不能 A 依赖 B,B 又依赖 A)

例如你的宏基因组流程 DAG:

    fastqc ──┐
             ├──→ multiqc
    fastp ───┘
      │
      ▼
    bowtie2(去宿主)
      │
      ▼
    kraken2(物种注释)
      │
      ▼
    bracken(丰度校正)

Snakemake 看这张图就知道:fastp 和 fastqc 互不依赖可以并行,但 bowtie2 必须等 fastp 完成。

3.5 Config 配置文件 —— 参数和路径的集中管理

# config.yaml —— 把经常变的参数集中到一个文件里
# 好处:换数据库路径、换样本列表,只改这一个文件,不用动 Snakefile

samples:                           # 样本列表
  - T2D_001
  - T2D_002
  - Control_001

host_db: "/data/db/bowtie2/hg38"  # 宿主基因组索引路径
kraken2_db: "/data/db/kraken2/k2_standard"  # Kraken2 数据库路径
threads: 8                         # 默认线程数
min_quality: 20                    # fastp 最低质量阈值

在 Snakefile 中这样用:

configfile: "config.yaml"          # 加载配置文件
SAMPLES = config["samples"]        # 读取样本列表

4. 环境安装

方法一:conda 安装(推荐)

# 用 mamba 安装(比 conda 快 10-50 倍)
mamba create -n snakemake_env -c conda-forge -c bioconda snakemake -y
# -n snakemake_env: 创建一个专门的环境
# -c conda-forge -c bioconda: 指定频道
# snakemake: 安装完整版(包含所有功能)

# 激活环境
conda activate snakemake_env

# 验证安装
snakemake --version
# 输出类似: 8.x.x 或 9.x.x

如果只需要核心功能(不需要云和集群插件):

# 安装精简版
mamba install -c conda-forge -c bioconda snakemake-minimal -y

方法二:pip 安装

# 先确保 Python >= 3.11(Snakemake 8+ 要求)
pip install snakemake

# 验证
snakemake --version

可视化依赖(可选,画 DAG 图需要)

# 安装 graphviz(画流程图需要)
mamba install -c conda-forge graphviz -y

# 验证
dot -V
# 输出: dot - graphviz version x.x.x

5. 实操教程:从零搭建生信 Pipeline

5.1 第一个 Snakemake 规则

先创建项目目录结构:

mkdir -p snakemake_demo/data snakemake_demo/results snakemake_demo/logs
cd snakemake_demo

# 创建一个假数据用于演示(实际项目用真实 fastq)
echo "@read1\nACGTACGT\n+\nIIIIIIII" | gzip > data/sample1_R1.fastq.gz
echo "@read1\nACGTACGT\n+\nIIIIIIII" | gzip > data/sample1_R2.fastq.gz

创建最简单的 Snakefile:

# Snakefile —— 第一个规则:用 fastp 做质控
# 文件名必须叫 Snakefile(首字母大写),放在项目根目录

# rule all: 告诉 Snakemake "我最终要什么文件"
rule all:
    input:
        "results/sample1_clean_R1.fastq.gz"  # 我要这个文件,你帮我推导怎么得到它

# rule fastp_trim: 定义 fastp 质控这一步
rule fastp_trim:
    input:
        r1 = "data/{sample}_R1.fastq.gz",    # 输入:原始正向 reads
        r2 = "data/{sample}_R2.fastq.gz"     # 输入:原始反向 reads
    output:
        r1 = "results/{sample}_clean_R1.fastq.gz",  # 输出:质控后正向
        r2 = "results/{sample}_clean_R2.fastq.gz"   # 输出:质控后反向
    shell:
        """
        fastp -i {input.r1} -I {input.r2} \
              -o {output.r1} -O {output.r2} \
              --qualified_quality_phred 20 \
              --length_required 50
        """
        # -i/-I: 输入的正向/反向 reads
        # -o/-O: 输出的正向/反向 reads
        # --qualified_quality_phred 20: 质量低于 20 的碱基标记为不合格
        # --length_required 50: 过滤掉长度低于 50bp 的 reads

运行:

# 预演模式(不实际运行,只看会执行什么)
snakemake -n
# 输出会告诉你:要执行 fastp_trim 规则

# 正式运行(-j 1 表示用 1 个核心)
snakemake -j 1
# 如果 fastp 安装好了,就会执行质控

# 再运行一次试试
snakemake -j 1
# 输出: Nothing to be done. (因为输出文件已存在且比输入新)

5.2 多规则串联(fastp → bowtie2 → samtools)

# Snakefile —— 三步串联流程
# 流程: fastp质控 → bowtie2比对 → samtools排序

SAMPLES = ["sample1"]  # 样本列表(后面会改成从 config 读取)

# === 最终目标 ===
rule all:
    input:
        expand("results/sorted/{sample}.sorted.bam", sample=SAMPLES)
        # expand() 把 {sample} 替换成列表里的每个值
        # 相当于: ["results/sorted/sample1.sorted.bam"]

# === 第1步: fastp 质控 ===
rule fastp_trim:
    input:
        r1 = "data/{sample}_R1.fastq.gz",
        r2 = "data/{sample}_R2.fastq.gz"
    output:
        r1 = "results/clean/{sample}_R1.fastq.gz",
        r2 = "results/clean/{sample}_R2.fastq.gz"
    threads: 4                            # 这步用 4 个线程
    shell:
        """
        fastp -i {input.r1} -I {input.r2} \
              -o {output.r1} -O {output.r2} \
              --thread {threads}
        """

# === 第2步: bowtie2 比对 ===
rule bowtie2_align:
    input:
        r1 = "results/clean/{sample}_R1.fastq.gz",  # 注意!这里的输入 = 上一步的输出
        r2 = "results/clean/{sample}_R2.fastq.gz"   # Snakemake 自动建立依赖
    output:
        bam = "results/aligned/{sample}.bam"
    params:
        index = "/data/db/bowtie2/hg38"   # bowtie2 索引路径
    threads: 8
    shell:
        """
        bowtie2 -x {params.index} \
                -1 {input.r1} -2 {input.r2} \
                --threads {threads} \
                | samtools view -bS - > {output.bam}
        """
        # bowtie2 输出 SAM 格式,通过管道传给 samtools 转成 BAM

# === 第3步: samtools 排序 ===
rule samtools_sort:
    input:
        "results/aligned/{sample}.bam"    # 输入 = 上一步的输出
    output:
        "results/sorted/{sample}.sorted.bam"
    threads: 4
    shell:
        """
        samtools sort -@ {threads} -o {output} {input}
        """
        # -@ {threads}: 使用多线程排序
        # -o {output}: 指定输出文件

串联逻辑:Snakemake 看到 rule all 要 sorted/sample1.sorted.bam,就去找谁的 output 匹配 → 找到 samtools_sort → 它的 input 需要 aligned/sample1.bam → 找到 bowtie2_align → 它的 input 需要 clean/sample1_R1.fastq.gz → 找到 fastp_trim。整条链自动串起来。

5.3 通配符处理多样本

# 关键变化:SAMPLES 列表里加多个样本,其他代码一字不改!

SAMPLES = ["T2D_001", "T2D_002", "T2D_003", "Control_001", "Control_002"]

rule all:
    input:
        expand("results/sorted/{sample}.sorted.bam", sample=SAMPLES)
        # expand() 自动展开成 5 个文件:
        # results/sorted/T2D_001.sorted.bam
        # results/sorted/T2D_002.sorted.bam
        # results/sorted/T2D_003.sorted.bam
        # results/sorted/Control_001.sorted.bam
        # results/sorted/Control_002.sorted.bam

# 下面的 rule 一字不改!{sample} 通配符自动匹配每个样本

白话:通配符 {sample} 就像一个万能插座,你只要告诉 Snakemake "我要这5个样本的结果",它自动对每个样本执行一遍完整流程。而且 5 个样本互相独立,可以并行 -j 5 同时跑。

5.4 使用 config.yaml 配置参数

创建 config.yaml

# config.yaml —— 所有可变参数集中在这里
# 换项目只需要改这个文件,不用动 Snakefile

# 样本列表
samples:
  - T2D_001
  - T2D_002
  - T2D_003
  - Control_001
  - Control_002

# 数据库路径
host_db: "/data/db/bowtie2/hg38"           # 宿主基因组 bowtie2 索引
kraken2_db: "/data/db/kraken2/k2_standard" # Kraken2 数据库

# 分析参数
fastp:
  qualified_quality_phred: 20   # 碱基质量阈值
  length_required: 50           # 最短 reads 长度
  threads: 4                    # fastp 线程数

bowtie2:
  threads: 8                    # bowtie2 线程数
  sensitivity: "--very-sensitive"  # 比对灵敏度

kraken2:
  confidence: 0.2               # 分类置信度阈值
  threads: 8                    # kraken2 线程数

在 Snakefile 中引用:

# Snakefile —— 从 config.yaml 读取所有参数

configfile: "config.yaml"                    # 加载配置文件

SAMPLES = config["samples"]                  # 读取样本列表
HOSTDB = config["host_db"]                   # 宿主数据库路径
KRAKENDB = config["kraken2_db"]              # Kraken2 数据库路径

rule all:
    input:
        expand("results/kraken2/{sample}_report.txt", sample=SAMPLES)

rule fastp_trim:
    input:
        r1 = "data/{sample}_R1.fastq.gz",
        r2 = "data/{sample}_R2.fastq.gz"
    output:
        r1 = "results/clean/{sample}_R1.fastq.gz",
        r2 = "results/clean/{sample}_R2.fastq.gz"
    params:
        quality = config["fastp"]["qualified_quality_phred"],  # 从 config 读参数
        min_len = config["fastp"]["length_required"]
    threads: config["fastp"]["threads"]                        # 从 config 读线程数
    shell:
        """
        fastp -i {input.r1} -I {input.r2} \
              -o {output.r1} -O {output.r2} \
              --qualified_quality_phred {params.quality} \
              --length_required {params.min_len} \
              --thread {threads}
        """

运行时也可以覆盖 config 参数:

# 用默认 config.yaml
snakemake -j 8

# 临时覆盖某个参数(不修改文件)
snakemake -j 8 --config samples='["T2D_001"]'

5.5 使用 conda 环境管理依赖

Snakemake 可以为每个 rule 自动创建独立 conda 环境,解决工具版本冲突:

创建 envs/fastp.yaml

# envs/fastp.yaml —— fastp 质控步骤的独立环境
# 注意:Snakemake 管理的 conda 环境 YAML 不要写 name 字段,Snakemake 会自动命名
channels:
  - conda-forge
  - bioconda
dependencies:
  - fastp=0.23.4

创建 envs/bowtie2.yaml

# envs/bowtie2.yaml —— 比对步骤的独立环境
channels:
  - conda-forge
  - bioconda
dependencies:
  - bowtie2=2.5.1
  - samtools=1.17

在 Snakefile 中指定:

rule fastp_trim:
    input:
        r1 = "data/{sample}_R1.fastq.gz",
        r2 = "data/{sample}_R2.fastq.gz"
    output:
        r1 = "results/clean/{sample}_R1.fastq.gz",
        r2 = "results/clean/{sample}_R2.fastq.gz"
    conda:
        "envs/fastp.yaml"       # 指定这步用哪个 conda 环境
    shell:
        "fastp -i {input.r1} -I {input.r2} -o {output.r1} -O {output.r2}"

rule bowtie2_align:
    input:
        r1 = "results/clean/{sample}_R1.fastq.gz",  # 注意!这里的输入 = 上一步的输出
        r2 = "results/clean/{sample}_R2.fastq.gz"   # Snakemake 自动建立依赖
    output:
        bam = "results/aligned/{sample}.bam"
    params:
        index = "/data/db/bowtie2/hg38"   # bowtie2 索引路径
    conda:
        "envs/bowtie2.yaml"     # 这步用另一个环境
    shell:
        "bowtie2 -x {params.index} -1 {input.r1} -2 {input.r2} | samtools view -bS - > {output.bam}"

运行时加 --use-conda 参数:

# Snakemake 会自动创建和激活每个 rule 指定的 conda 环境
snakemake -j 8 --use-conda

# 第一次运行会下载并创建环境(比较慢),之后会复用缓存

好处:每步分析的工具版本完全隔离,不怕冲突,而且别人拿到你的 Snakefile + envs/ 目录就能完美复现。

5.6 生成 DAG 可视化图

# 生成 DAG 图(需要先安装 graphviz)
snakemake --dag | dot -Tpng > dag.png
# --dag: 输出 DOT 格式的 DAG 描述
# dot -Tpng: 用 graphviz 转成 PNG 图片

# 也可以生成 PDF(适合论文插图)
snakemake --dag | dot -Tpdf > dag.pdf

# 生成 SVG(适合网页展示)
snakemake --dag | dot -Tsvg > dag.svg

# 只看 rule 之间的关系(不展开每个样本)
snakemake --rulegraph | dot -Tpng > rulegraph.png
# --rulegraph: 只显示 rule 级别的依赖,不展开通配符

实际效果

                 ┌─────────────┐
                 │   rule all  │
                 └──────┬──────┘
                        │
          ┌─────────────┼─────────────┐
          ▼             ▼             ▼
   ┌────────────┐┌────────────┐┌────────────┐
   │kraken2     ││kraken2     ││kraken2     │
   │(T2D_001)  ││(T2D_002)  ││(Control_001)│
   └─────┬──────┘└─────┬──────┘└─────┬──────┘
         ▼             ▼             ▼
   ┌────────────┐┌────────────┐┌────────────┐
   │bowtie2     ││bowtie2     ││bowtie2     │
   │(T2D_001)  ││(T2D_002)  ││(Control_001)│
   └─────┬──────┘└─────┬──────┘└─────┬──────┘
         ▼             ▼             ▼
   ┌────────────┐┌────────────┐┌────────────┐
   │fastp       ││fastp       ││fastp       │
   │(T2D_001)  ││(T2D_002)  ││(Control_001)│
   └────────────┘└────────────┘└────────────┘

5.7 日志管理

# 每个 rule 加 log 字段,把运行日志写入指定文件

rule fastp_trim:
    input:
        r1 = "data/{sample}_R1.fastq.gz",
        r2 = "data/{sample}_R2.fastq.gz"
    output:
        r1 = "results/clean/{sample}_R1.fastq.gz",
        r2 = "results/clean/{sample}_R2.fastq.gz"
    log:
        "logs/fastp/{sample}.log"          # 日志按样本分开存放
    benchmark:
        "benchmarks/fastp/{sample}.txt"    # 记录运行时间和内存消耗
    shell:
        """
        fastp -i {input.r1} -I {input.r2} \
              -o {output.r1} -O {output.r2} \
              2> {log}
        """
        # 2> {log}: 把标准错误(进度信息)重定向到日志文件
        # 也可以用 &> {log} 同时捕获标准输出和标准错误

日志目录结构

logs/
├── fastp/
│   ├── T2D_001.log          # 每个样本独立日志
│   ├── T2D_002.log
│   └── Control_001.log
├── bowtie2/
│   ├── T2D_001.log
│   └── ...
└── kraken2/
    └── ...

benchmarks/                   # benchmark 记录
├── fastp/
│   ├── T2D_001.txt          # 内容:运行秒数、最大内存等
│   └── ...

benchmark 文件内容示例

s         h:m:s    max_rss   max_vms   max_uss   max_pss   io_in   io_out   mean_load
45.3241   0:00:45  1024.32   2048.64   890.12    920.45    1024    512      75.2
# s: 运行秒数
# max_rss: 最大内存占用(MB)
# mean_load: 平均 CPU 负载(%)

5.8 集群提交(--cluster / --slurm)

通用集群提交(PBS/SLURM/SGE)

# SLURM 集群投递
snakemake -j 100 \
    --cluster "sbatch -p normal -n {threads} -t 4:00:00 -o logs/slurm/{rule}_{wildcards}.out -e logs/slurm/{rule}_{wildcards}.err" \
    --latency-wait 60
# -j 100: 最多同时投递 100 个任务
# --cluster: 投递命令模板
#   -p normal: 投递到 normal 队列
#   -n {threads}: CPU 数量和 rule 里定义的 threads 一致
#   -t 4:00:00: 最长运行 4 小时
# --latency-wait 60: 等待 60 秒让文件系统同步(集群 NFS 有延迟)

# PBS 集群投递
snakemake -j 100 \
    --cluster "qsub -l nodes=1:ppn={threads} -l walltime=4:00:00 -o {log}" \
    --latency-wait 60

Snakemake 8+ 的 SLURM 插件(更推荐)

# 安装 SLURM 执行器插件(Snakemake 8+)
pip install snakemake-executor-plugin-slurm

# 使用插件投递
snakemake -j 100 --executor slurm \
    --default-resources slurm_partition=normal runtime=240 mem_mb=8000
# --executor slurm: 使用 SLURM 执行器
# --default-resources: 默认资源配置

在 Snakefile 中为每步指定资源

rule bowtie2_align:
    input: ...
    output: ...
    threads: 8
    resources:
        mem_mb = 16000,         # 这步需要 16GB 内存
        runtime = 120,          # 最长运行 120 分钟
        slurm_partition = "high_mem"  # 投递到大内存队列
    shell: ...

6. 完整宏基因组分析 Snakefile 模板

以下是与你 T2D 肠道菌群项目对应的完整流程:

# ============================================================
# Snakefile —— 2型糖尿病肠道宏基因组分析流程
# 对应项目: T2D 肠道菌群宏基因组分析
# 流程: FastQC → fastp质控 → Bowtie2去宿主 → Kraken2物种注释 → Bracken丰度校正
# 作者: 彭文强
# ============================================================

# === 加载配置 ===
configfile: "config.yaml"

SAMPLES = config["samples"]                  # 样本列表
HOSTDB = config["host_db"]                   # 人类基因组 bowtie2 索引
KRAKENDB = config["kraken2_db"]              # Kraken2 标准数据库
READ_LEN = config.get("read_length", 150)    # reads 长度,默认 150bp

# === 最终目标 ===
rule all:
    input:
        # FastQC 原始数据质量报告
        expand("results/fastqc/{sample}_R1_fastqc.html", sample=SAMPLES),
        # MultiQC 汇总报告
        "results/multiqc/multiqc_report.html",
        # Bracken 丰度表(最终产出)
        expand("results/bracken/{sample}_bracken_species.txt", sample=SAMPLES),
        # 汇总丰度矩阵
        "results/abundance_matrix/species_abundance.tsv"

# === Rule 1: FastQC 原始数据质控 ===
rule fastqc_raw:
    input:
        r1 = "data/raw/{sample}_R1.fastq.gz",
        r2 = "data/raw/{sample}_R2.fastq.gz"
    output:
        html1 = "results/fastqc/{sample}_R1_fastqc.html",
        html2 = "results/fastqc/{sample}_R2_fastqc.html",
        zip1 = "results/fastqc/{sample}_R1_fastqc.zip",
        zip2 = "results/fastqc/{sample}_R2_fastqc.zip"
    params:
        outdir = "results/fastqc"            # FastQC 输出目录
    threads: 2
    log:
        "logs/fastqc/{sample}.log"
    conda:
        "envs/qc.yaml"
    shell:
        """
        fastqc -t {threads} -o {params.outdir} {input.r1} {input.r2} 2> {log}
        """

# === Rule 2: fastp 质控和过滤 ===
rule fastp_trim:
    input:
        r1 = "data/raw/{sample}_R1.fastq.gz",
        r2 = "data/raw/{sample}_R2.fastq.gz"
    output:
        r1 = "results/clean/{sample}_R1.fastq.gz",
        r2 = "results/clean/{sample}_R2.fastq.gz",
        html = "results/fastp_reports/{sample}_fastp.html",
        json = "results/fastp_reports/{sample}_fastp.json"
    params:
        quality = config["fastp"]["qualified_quality_phred"],
        min_len = config["fastp"]["length_required"]
    threads: config["fastp"]["threads"]
    log:
        "logs/fastp/{sample}.log"
    benchmark:
        "benchmarks/fastp/{sample}.txt"
    conda:
        "envs/qc.yaml"
    shell:
        """
        fastp \
            -i {input.r1} -I {input.r2} \
            -o {output.r1} -O {output.r2} \
            --html {output.html} --json {output.json} \
            --qualified_quality_phred {params.quality} \
            --length_required {params.min_len} \
            --detect_adapter_for_pe \
            --thread {threads} \
            2> {log}
        """
        # --detect_adapter_for_pe: 自动检测双端接头序列

# === Rule 3: MultiQC 汇总质控报告 ===
rule multiqc:
    input:
        expand("results/fastqc/{sample}_R1_fastqc.zip", sample=SAMPLES),
        expand("results/fastp_reports/{sample}_fastp.json", sample=SAMPLES)
    output:
        "results/multiqc/multiqc_report.html"
    params:
        input_dirs = "results/fastqc results/fastp_reports",
        outdir = "results/multiqc"
    log:
        "logs/multiqc/multiqc.log"
    conda:
        "envs/qc.yaml"
    shell:
        """
        multiqc {params.input_dirs} -o {params.outdir} --force 2> {log}
        """
        # --force: 如果输出目录已存在就覆盖

# === Rule 4: Bowtie2 去宿主(去除人类 reads) ===
rule bowtie2_remove_host:
    input:
        r1 = "results/clean/{sample}_R1.fastq.gz",
        r2 = "results/clean/{sample}_R2.fastq.gz"
    output:
        r1 = "results/host_removed/{sample}_R1.fastq.gz",
        r2 = "results/host_removed/{sample}_R2.fastq.gz"
    params:
        host_db = HOSTDB,
        # 临时 SAM 文件(不保留)
        tmp_prefix = "results/host_removed/{sample}"
    threads: config["bowtie2"]["threads"]
    log:
        "logs/bowtie2/{sample}.log"
    benchmark:
        "benchmarks/bowtie2/{sample}.txt"
    conda:
        "envs/align.yaml"
    shell:
        """
        bowtie2 -x {params.host_db} \
            -1 {input.r1} -2 {input.r2} \
            {config[bowtie2][sensitivity]} \
            --threads {threads} \
            --un-conc-gz {params.tmp_prefix}_R%.fastq.gz \
            -S /dev/null \
            2> {log}
        """
        # --un-conc-gz: 输出未比对到宿主的 reads(即微生物 reads)
        # -S /dev/null: 不保存 SAM 文件(我们只要未比对上的)
        # R%: bowtie2 会自动替换成 R1 和 R2

# === Rule 5: Kraken2 物种分类注释 ===
rule kraken2_classify:
    input:
        r1 = "results/host_removed/{sample}_R1.fastq.gz",
        r2 = "results/host_removed/{sample}_R2.fastq.gz"
    output:
        report = "results/kraken2/{sample}_report.txt",
        output = "results/kraken2/{sample}_output.txt"
    params:
        db = KRAKENDB,
        confidence = config["kraken2"]["confidence"]
    threads: config["kraken2"]["threads"]
    log:
        "logs/kraken2/{sample}.log"
    benchmark:
        "benchmarks/kraken2/{sample}.txt"
    conda:
        "envs/taxonomy.yaml"
    shell:
        """
        kraken2 --db {params.db} \
            --paired {input.r1} {input.r2} \
            --threads {threads} \
            --report {output.report} \
            --output {output.output} \
            --confidence {params.confidence} \
            2> {log}
        """
        # --confidence 0.2: 分类置信度阈值,提高准确性

# === Rule 6: Bracken 丰度重估计 ===
rule bracken_abundance:
    input:
        report = "results/kraken2/{sample}_report.txt"
    output:
        bracken = "results/bracken/{sample}_bracken_species.txt",
        report = "results/bracken/{sample}_bracken_report.txt"
    params:
        db = KRAKENDB,
        read_len = READ_LEN,
        level = "S",                         # S = Species 种级别
        threshold = 10                       # 最少 10 条 reads 才算
    log:
        "logs/bracken/{sample}.log"
    conda:
        "envs/taxonomy.yaml"
    shell:
        """
        bracken -d {params.db} \
            -i {input.report} \
            -o {output.bracken} \
            -w {output.report} \
            -r {params.read_len} \
            -l {params.level} \
            -t {params.threshold} \
            2> {log}
        """
        # -r 150: reads 长度(和测序策略对应)
        # -l S: 在种(Species)级别估计丰度
        # -t 10: 少于 10 条 reads 的物种不报告

# === Rule 7: 合并所有样本的丰度矩阵 ===
rule merge_abundance:
    input:
        expand("results/bracken/{sample}_bracken_species.txt", sample=SAMPLES)
    output:
        "results/abundance_matrix/species_abundance.tsv"
    log:
        "logs/merge_abundance.log"
    run:
        # 这里用 Python 代码代替 shell 命令
        import pandas as pd

        dfs = []
        for f in input:
            sample_name = f.split("/")[-1].replace("_bracken_species.txt", "")
            df = pd.read_csv(f, sep="\t")
            df = df[["name", "new_est_reads"]].rename(
                columns={"new_est_reads": sample_name}
            )
            dfs.append(df)

        # 合并所有样本
        merged = dfs[0]
        for df in dfs[1:]:
            merged = merged.merge(df, on="name", how="outer")
        merged = merged.fillna(0)

        # 保存
        merged.to_csv(output[0], sep="\t", index=False)
        print(f"合并完成: {len(merged)} 个物种, {len(SAMPLES)} 个样本")

配套的 config.yaml

# config.yaml —— T2D 肠道宏基因组分析配置

samples:
  - T2D_001
  - T2D_002
  - T2D_003
  - T2D_004
  - T2D_005
  - Control_001
  - Control_002
  - Control_003
  - Control_004
  - Control_005

# 数据库路径
host_db: "/data/db/bowtie2/GRCh38"
kraken2_db: "/data/db/kraken2/k2_standard_20230605"
read_length: 150

# fastp 参数
fastp:
  qualified_quality_phred: 20
  length_required: 50
  threads: 4

# bowtie2 参数
bowtie2:
  threads: 8
  sensitivity: "--very-sensitive"

# kraken2 参数
kraken2:
  confidence: 0.2
  threads: 8

7. Snakemake vs Nextflow 对比

对比维度 Snakemake Nextflow
底层语言 Python Groovy(JVM 语言)
学习曲线 低(Python 基础即可) 较高(需学 Groovy + Channel 概念)
定义方式 基于文件的 Rule(声明式) 基于数据流的 Process + Channel
执行逻辑 从目标文件反向推导 DAG 从输入 Channel 正向推送数据
增量计算 天然支持(文件时间戳) 需要 -resume 参数 + 工作目录缓存
社区流程 Snakemake Workflow Catalog nf-core(100+ 预构建流程,更丰富)
集群支持 --cluster / 插件 原生 executor(更优雅)
云计算 基础支持 原生支持 AWS/GCP/Azure(强项)
容器集成 conda / singularity / docker docker / singularity / conda
调试 容易(Python 报错,--debug 较难(Groovy 堆栈、Channel 死锁)
适用场景 学术研究、个人项目、HPC 生产环境、企业级、大规模数据
版本 v9+(插件化架构) DSL2(模块化)

选择建议

  • 选 Snakemake:你 Python 基础好、主要在 HPC 跑、做学术项目、想快速上手
  • 选 Nextflow:公司已有 nf-core 流程、需要云上运行、面试的公司用 Nextflow
  • 面试答法:两个都了解,Snakemake 动手写过完整流程,Nextflow 知道 nf-core 生态,按团队技术栈选择

8. 常见报错与解决

报错 1: MissingInputException

MissingInputException in rule fastp_trim:
Missing input files for rule fastp_trim:
    data/raw/T2D_001_R1.fastq.gz

原因:Snakemake 找不到输入文件 解决

# 1. 检查文件是否存在
ls data/raw/T2D_001_R1.fastq.gz

# 2. 检查文件名是否和 Snakefile 里定义的模式匹配
# 常见问题:文件名是 T2D-001 但通配符期望 T2D_001

# 3. 检查路径是否正确(相对路径 vs 绝对路径)

报错 2: AmbiguousRuleException

AmbiguousRuleException:
Rules fastp_trim and fastp_trim_v2 are ambiguous for the file results/clean/T2D_001_R1.fastq.gz

原因:两个 rule 都能产出同一个文件,Snakemake 不知道用哪个 解决

# 方案1: 给 rule 加优先级
ruleorder: fastp_trim > fastp_trim_v2

# 方案2: 删除重复的 rule
# 方案3: 让两个 rule 的 output 路径不同

报错 3: IncompleteFilesException

IncompleteFilesException:
The files below seem to be incomplete. If you are sure that certain files are not incomplete, mark them as complete with
    snakemake --cleanup-metadata <filenames>

原因:上次运行中断,输出文件不完整 解决

# 方案1: 重跑未完成的任务
snakemake --rerun-incomplete -j 8

# 方案2: 手动删除不完整的文件后重跑
rm results/clean/T2D_001_R1.fastq.gz
snakemake -j 8

# 方案3: 标记文件为已完成(确认文件没问题时)
snakemake --cleanup-metadata results/clean/T2D_001_R1.fastq.gz

报错 4: WorkflowError: No rule to produce target

WorkflowError:
Target rules may not contain wildcards. (rules all)

原因:rule all 里直接使用了通配符 {sample} 而不是 expand() 解决

# 错误写法
rule all:
    input:
        "results/{sample}_report.txt"       # 错!rule all 不能有通配符

# 正确写法
rule all:
    input:
        expand("results/{sample}_report.txt", sample=SAMPLES)  # 用 expand 展开

报错 5: ChildIOException / 磁盘空间不足

ChildIOException:
Error in rule bowtie2_align:
    OSError: [Errno 28] No space left on device

原因:磁盘空间满了(比对会生成大量中间文件) 解决

# 1. 使用 temp() 标记中间文件,Snakemake 自动删除
rule bowtie2_align:
    output:
        bam = temp("results/aligned/{sample}.bam")  # temp() 标记为临时文件
        # 当下游 rule 用完这个文件后,Snakemake 自动删除它

# 2. 检查磁盘空间
# df -h /data

# 3. 手动清理
# snakemake --delete-temp-output

报错 6: CreateCondaEnvironmentException

CreateCondaEnvironmentException:
Could not create conda environment from envs/fastp.yaml

原因:conda 环境创建失败(通常是依赖冲突或网络问题) 解决

# 1. 检查 yaml 文件语法是否正确
cat envs/fastp.yaml

# 2. 手动测试能否创建环境
conda env create -f envs/fastp.yaml

# 3. 如果是网络问题,配置镜像源
# 编辑 ~/.condarc 添加清华镜像

# 4. 如果是版本冲突,放宽版本约束
# fastp=0.23.4 → fastp>=0.23  (只约束主版本)

报错 7: 集群任务超时 / 内存不足

slurmstepd: error: Job 12345 exceeded memory limit (16384 > 8192)
CANCELLED by slurm_script

原因:rule 实际使用的内存超过了请求的资源 解决

# 在 rule 里增加资源声明
rule kraken2_classify:
    resources:
        mem_mb = 32000,      # 请求 32GB 内存(Kraken2 数据库很大)
        runtime = 240        # 请求 4 小时
    threads: 8
    ...


9. 速查表

常用命令

命令 作用
snakemake -n 预演模式(不执行,只看会做什么)
snakemake -j 8 正式运行,最多 8 个核心并行
snakemake -j 8 -k 某步失败后继续跑其他不依赖它的步骤
snakemake --rerun-incomplete -j 8 断点续跑(重跑未完成的)
snakemake --forceall -j 8 强制全部重跑
snakemake --forcerun rule_name -j 8 强制重跑指定 rule
snakemake --until rule_name -j 8 只跑到指定 rule
snakemake --dag \| dot -Tpng > dag.png 生成完整 DAG 图
snakemake --rulegraph \| dot -Tpng > rg.png 生成 rule 关系图
snakemake --summary 查看所有输出文件状态
snakemake --list 列出所有 rule
snakemake --lint 检查 Snakefile 语法和最佳实践
snakemake --delete-all-output 删除所有输出文件
snakemake --use-conda -j 8 自动创建并使用 conda 环境
snakemake --configfile alt.yaml -j 8 指定配置文件
snakemake --report report.html 生成运行报告

语法速查

# === 1. Rule 基本结构 ===
rule rule_name:
    input:   "输入文件"
    output:  "输出文件"
    params:  extra = "额外参数"
    threads: 4
    log:     "日志文件"
    benchmark: "性能记录文件"
    conda:   "envs/env.yaml"
    resources: mem_mb = 8000
    shell:   "命令 {input} {output} {params.extra} {threads}"

# === 2. 通配符 ===
"{sample}"                    # 在 input/output 中直接用
"{wildcards.sample}"          # 在 shell 中引用通配符的值

# === 3. expand() 展开 ===
expand("{sample}_{read}.fq", sample=["A","B"], read=["R1","R2"])
# 结果: ["A_R1.fq", "A_R2.fq", "B_R1.fq", "B_R2.fq"]

# === 4. temp() 临时文件 ===
output: temp("intermediate.bam")   # 下游用完后自动删除

# === 5. protected() 保护文件 ===
output: protected("final.bam")     # 防止被误删

# === 6. directory() 输出目录 ===
output: directory("results/fastqc/")  # 输出是整个目录

# === 7. config 引用 ===
configfile: "config.yaml"
config["key"]                 # 读取配置值
config["nested"]["key"]       # 读取嵌套配置

# === 8. 条件逻辑 ===
rule all:
    input:
        "required.txt",
        "optional.txt" if config.get("run_optional") else []

# === 9. Python 函数作为 input ===
def get_input(wildcards):
    return f"data/{wildcards.sample}_R1.fastq.gz"

rule example:
    input: get_input
    ...

# === 10. run 块(Python 代码替代 shell) ===
rule merge:
    input: expand("results/{s}.txt", s=SAMPLES)
    output: "merged.txt"
    run:
        with open(output[0], "w") as out:
            for f in input:
                out.write(open(f).read())

10. 延伸学习资源

资源 地址 说明
Snakemake 官方文档 https://snakemake.readthedocs.io 最权威的参考
Snakemake 官方教程 https://snakemake.readthedocs.io/en/stable/tutorial/tutorial.html 官方入门教程
Snakemake Workflow Catalog https://snakemake.github.io/snakemake-workflow-catalog/ 社区共享流程
Snakemake Wrappers https://snakemake-wrappers.readthedocs.io 预写好的 rule 包装器
知识库 09_流程管理工具 本仓库 knowledge_base/ Conda/Snakemake/Git 基础
Snakemake GitHub https://github.com/snakemake/snakemake 源码和 Issues
Bioinformatics Workbook https://bioinformaticsworkbook.org 生信实操教程

推荐学习路径

第1天: 读完本文档,写出第一个 rule,理解 input/output/shell
第2天: 多规则串联 + 通配符处理多样本
第3天: 加入 config.yaml + log + benchmark
第4天: 尝试 --use-conda + DAG 可视化
第5天: 把你的 T2D 项目 shell 脚本改写成 Snakefile

面试话术

"我之前的项目用编号 shell 脚本管理流程,能跑通但不方便断点续跑和扩展。后来学了 Snakemake,把流程改写成 Snakefile,好处是:1)加新样本只改 config.yaml 不用动代码;2)-j 8 自动并行不怕写错;3)中途失败 --rerun-incomplete 自动续跑;4)--dag 一键生成流程图放到论文里。"


自我审核清单

  • [x] 一句话说明 ✓
  • [x] 为什么要学(shell vs 流程工具对比) ✓
  • [x] 核心概念白话版(Rule/input/output/wildcard/DAG/config) ✓
  • [x] 环境安装(conda + pip + graphviz) ✓
  • [x] 实操教程 7 个子模块全覆盖 ✓
  • [x] 完整宏基因组 Snakefile 模板(对应 T2D 项目) ✓
  • [x] Snakemake vs Nextflow 对比表 ✓
  • [x] 常见报错 7 个(超过要求的 6 个) ✓
  • [x] 速查表(命令 + 语法) ✓
  • [x] 延伸学习资源 + 学习路径 ✓
  • [x] 所有代码有中文注释 ✓
  • [x] 白话解释所有概念 ✓
  • [x] 字数约 4500 字 ✓