跳转至

Nextflow 流程管理实战

1. 一句话说明

Nextflow 是数据驱动的流程管理引擎,用 Channel(数据流)连接 Process(处理步骤),天然支持大规模并行和容器化部署,是工业级生信 pipeline 的首选框架。


2. Nextflow 是什么

白话解释

想象一条工厂流水线:原材料从一端放入,经过多个加工站,最终产出成品。Nextflow 就是这条流水线的控制系统——你只需要定义每个加工站(Process)做什么,数据(Channel)会自动在站之间流动,系统负责调度、并行、容错。

与 Snakemake 的本质区别: - Snakemake 是目标驱动(从最终文件倒推需要哪些步骤) - Nextflow 是数据驱动(数据流入自动触发下游处理)

DSL2 语法

Nextflow 从 v20.07 开始默认使用 DSL2 语法(类似写函数模块化编程):

// DSL2 的核心改进:
// 1. process 定义一次,可以多次调用(像函数一样复用)
// 2. workflow 块统一编排流程逻辑
// 3. 支持 module 导入(类似 Python 的 import)

nextflow.enable.dsl = 2  // 启用 DSL2(v22.03+ 默认开启)

3. 核心概念白话版

3.1 Process(加工站)

// Process = 流程中的一个处理步骤
// 白话:工厂里的一个加工站,有进料口(input)、出料口(output)、加工程序(script)

process FASTQC {
    // 定义容器环境(可选)
    container 'biocontainers/fastqc:0.12.1'

    input:
    path reads    // 进料:接收一个文件路径

    output:
    path "*.html"  // 出料:产出 HTML 报告
    path "*.zip"   // 出料:产出 zip 数据

    script:
    // 加工程序:实际执行的命令
    """
    fastqc ${reads} --threads ${task.cpus}
    """
}

3.2 Channel(传送带)

// Channel = 数据在 process 之间流动的管道
// 白话:工厂里连接各加工站的传送带,数据放上去自动流向下一站

// 从文件创建 channel
reads_ch = Channel.fromFilePairs("data/*_{R1,R2}.fastq.gz")
// 结果:[sample_id, [R1.fastq.gz, R2.fastq.gz]] 的元组流

// 从列表创建 channel
samples_ch = Channel.of('sample1', 'sample2', 'sample3')

// 从 CSV/TSV 创建 channel(生信最常用)
Channel.fromPath("samplesheet.csv")
    .splitCsv(header: true)       // 按 CSV 解析,第一行为表头
    .map { row -> 
        [row.sample, file(row.fastq_1), file(row.fastq_2)]
    }

3.3 Workflow(总控室)

// Workflow = 把 process 串联起来的编排逻辑
// 白话:工厂总控室,决定哪个加工站接哪个,数据怎么流

workflow {
    // 创建输入 channel
    reads_ch = Channel.fromFilePairs(params.reads)

    // 调用 process,像调用函数一样
    FASTQC(reads_ch)              // 第一步:质控
    TRIMGALORE(reads_ch)          // 第二步:修剪(可以和 FASTQC 并行!)
    MAPPING(TRIMGALORE.out.reads) // 第三步:比对(依赖修剪结果)
}

3.4 Operator(数据变换器)

// Operator = 对 channel 中的数据进行变换的操作符
// 白话:传送带上的分拣/合并/过滤装置

reads_ch
    .filter { it[1].size() > 1000 }   // 过滤:只保留大于1000bp的
    .map { sample, files -> [sample, files[0]] }  // 变换:只取R1
    .combine(reference_ch)              // 组合:和参考基因组配对
    .groupTuple()                       // 分组:按sample分组
    .collect()                          // 收集:等所有数据到齐

4. 安装配置

4.1 安装 Nextflow

# 方法1:curl 一键安装(推荐,需要 Java 11+)
curl -s https://get.nextflow.io | bash    # 下载 nextflow 可执行文件
sudo mv nextflow /usr/local/bin/          # 移到系统路径

# 方法2:conda 安装
conda install -c bioconda nextflow        # bioconda 渠道安装

# 方法3:手动指定版本
export NXF_VER=26.04.0                    # 指定版本号
curl -s https://get.nextflow.io | bash    # 下载指定版本

# 验证安装
nextflow -version                          # 查看版本(当前最新 v26.04.0)

4.2 Java 环境

# Nextflow 依赖 Java 11+(推荐 Java 17)
sudo apt install openjdk-17-jre-headless   # Ubuntu 安装 Java 17
java -version                               # 验证 Java 版本

# 如果用 conda,Java 会自动装好

4.3 配置文件 nextflow.config

// nextflow.config —— 放在项目根目录,全局配置

// 执行参数
params {
    reads = "data/*_{R1,R2}.fastq.gz"   // 输入文件模式
    outdir = "results"                   // 输出目录
    genome = "GRCh38"                    // 参考基因组
}

// 进程默认资源
process {
    cpus = 4                             // 默认CPU数
    memory = '8 GB'                      // 默认内存
    time = '2h'                          // 默认时间限制

    // 针对特定 process 设置资源
    withName: 'MAPPING' {
        cpus = 16
        memory = '32 GB'
    }
}

// 执行引擎(本地/集群/云)
profiles {
    local {
        executor.name = 'local'          // 本地执行
        executor.cpus = 8                // 最多用8核
    }
    slurm {
        executor.name = 'slurm'          // SLURM 集群
        queue = 'normal'                 // 队列名
    }
    docker {
        docker.enabled = true            // 启用 Docker
    }
    singularity {
        singularity.enabled = true       // 启用 Singularity(HPC常用)
    }
}

5. 实操教程

5.1 第一个 Pipeline

#!/usr/bin/env nextflow
// 文件名:hello.nf
// 功能:最简单的 Nextflow pipeline —— 打招呼

nextflow.enable.dsl = 2   // 启用 DSL2 语法

// 定义一个 process:接收名字,输出问候语
process GREET {
    input:
    val name       // 输入:一个字符串值

    output:
    stdout         // 输出:标准输出

    script:
    """
    echo "Hello, ${name}! Welcome to Nextflow."
    """
}

// workflow 编排
workflow {
    // 创建一个包含多个名字的 channel
    names_ch = Channel.of('Nextflow', 'Bioinformatics', 'World')

    // 调用 process(3个名字会自动并行处理!)
    GREET(names_ch)

    // 打印结果
    GREET.out.view()
}
# 运行
nextflow run hello.nf             # 执行 pipeline
# 输出会显示3个并行任务的结果

nextflow run hello.nf -resume     # 断点续跑(已完成的不会重跑)

5.2 多步串联 Pipeline

#!/usr/bin/env nextflow
// 文件名:qc_pipeline.nf
// 功能:质控 → 修剪 → 再质控 的完整流程

nextflow.enable.dsl = 2

// 参数定义(命令行可覆盖)
params.reads = "data/*_{R1,R2}.fastq.gz"
params.outdir = "results"

// 步骤1:FastQC 原始数据质控
process FASTQC_RAW {
    tag "${sample_id}"               // 任务标签,日志里显示样本名
    publishDir "${params.outdir}/fastqc_raw"  // 结果发布目录

    input:
    tuple val(sample_id), path(reads)   // 输入:[样本名, [R1, R2]]

    output:
    path "*.html"                       // 输出:质控报告
    path "*.zip"

    script:
    """
    fastqc ${reads} --threads ${task.cpus}
    """
}

// 步骤2:Fastp 修剪
process FASTP {
    tag "${sample_id}"
    publishDir "${params.outdir}/trimmed"

    input:
    tuple val(sample_id), path(reads)

    output:
    tuple val(sample_id), path("*_trimmed_{R1,R2}.fastq.gz"), emit: reads  // emit 命名输出
    path "*.json", emit: report

    script:
    """
    fastp \\
        -i ${reads[0]} -I ${reads[1]} \\
        -o ${sample_id}_trimmed_R1.fastq.gz \\
        -O ${sample_id}_trimmed_R2.fastq.gz \\
        --json ${sample_id}_fastp.json \\
        --thread ${task.cpus}
    """
}

// 步骤3:FastQC 修剪后质控
process FASTQC_TRIMMED {
    tag "${sample_id}"
    publishDir "${params.outdir}/fastqc_trimmed"

    input:
    tuple val(sample_id), path(reads)

    output:
    path "*.html"
    path "*.zip"

    script:
    """
    fastqc ${reads} --threads ${task.cpus}
    """
}

// Workflow 编排:串联三步
workflow {
    // 从文件对创建 channel
    reads_ch = Channel.fromFilePairs(params.reads)

    // 步骤串联
    FASTQC_RAW(reads_ch)          // 原始质控
    FASTP(reads_ch)               // 修剪
    FASTQC_TRIMMED(FASTP.out.reads)  // 修剪后质控(依赖上一步输出)
}
# 运行多步 pipeline
nextflow run qc_pipeline.nf --reads "data/*_{R1,R2}.fastq.gz" --outdir results

# 指定CPU资源
nextflow run qc_pipeline.nf -process.cpus 8

5.3 多样本自动并行

// Nextflow 天然支持并行!每个样本独立流入 channel,自动并行处理
// 不需要额外设置,只要 channel 里有多个元素,就会自动并行

// 方式1:fromFilePairs 自动发现多样本
Channel.fromFilePairs("data/*_{R1,R2}.fastq.gz")
// 自动产生:[sampleA, [A_R1.fq.gz, A_R2.fq.gz]]
//           [sampleB, [B_R1.fq.gz, B_R2.fq.gz]]
//           [sampleC, [C_R1.fq.gz, C_R2.fq.gz]]
// 三个样本同时进入 process,并行执行!

// 方式2:samplesheet 方式(nf-core 标准)
Channel.fromPath(params.input)
    .splitCsv(header: true, sep: ',')
    .map { row ->
        def meta = [id: row.sample, single_end: row.single_end.toBoolean()]
        def reads = row.single_end.toBoolean() ?
            [file(row.fastq_1)] :
            [file(row.fastq_1), file(row.fastq_2)]
        [meta, reads]
    }
# samplesheet.csv 示例
sample,fastq_1,fastq_2,single_end
sampleA,data/A_R1.fastq.gz,data/A_R2.fastq.gz,false
sampleB,data/B_R1.fastq.gz,data/B_R2.fastq.gz,false
sampleC,data/C_R1.fastq.gz,data/C_R2.fastq.gz,false

5.4 使用容器

// 方式1:process 级别指定容器
process FASTQC {
    container 'biocontainers/fastqc:0.12.1'  // Docker 镜像
    // 或:container 'https://depot.galaxyproject.org/singularity/fastqc:0.12.1'

    input:
    path reads
    output:
    path "*.html"
    script:
    """
    fastqc ${reads}
    """
}

// 方式2:nextflow.config 全局配置
// docker {
//     enabled = true
//     runOptions = '-u $(id -u):$(id -g)'  // 保持文件权限
// }

// 方式3:Singularity(HPC 集群推荐)
// singularity {
//     enabled = true
//     autoMounts = true        // 自动挂载路径
//     cacheDir = '/scratch/singularity_cache'  // 缓存目录
// }
# 运行时选择容器引擎
nextflow run pipeline.nf -profile docker        # 使用 Docker
nextflow run pipeline.nf -profile singularity   # 使用 Singularity

5.5 使用 nf-core 现成 Pipeline

# 安装 nf-core 工具
pip install nf-core              # 安装 nf-core CLI

# 查看可用 pipeline(目前有 149 个!)
nf-core list                     # 列出所有 pipeline

# 下载并运行 RNA-seq pipeline
nf-core download rnaseq          # 下载 pipeline + 容器
nextflow run nf-core/rnaseq \
    --input samplesheet.csv \    # 样本表
    --genome GRCh38 \            # 参考基因组
    --outdir results \           # 输出目录
    -profile singularity \       # 容器引擎
    -r 3.14.0                    # 指定版本

# 运行宏基因组相关 pipeline
nextflow run nf-core/ampliseq \  # 16S/ITS 扩增子分析
    --input samplesheet.csv \
    --FW_primer "GTGYCAGCMGCCGCGGTAA" \
    --RV_primer "GGACTACNVGGGTWTCTAAT" \
    -profile docker

# 体细胞/胚系变异检测
nextflow run nf-core/sarek \     # WGS/WES 变异检测
    --input samplesheet.csv \
    --genome GATK.GRCh38 \
    --tools mutect2,strelka \
    -profile docker

6. nf-core 社区

什么是 nf-core

nf-core 是 Nextflow 社区维护的标准化生信 pipeline 集合,目前有 149 个经过同行审阅的 pipeline

生信常用 nf-core Pipeline

Pipeline用途面试关键词
nf-core/rnaseqRNA-seq 分析全流程STAR/HISAT2 + featureCounts + DESeq2
nf-core/sarekWGS/WES 变异检测GATK4 + Mutect2 + Strelka
nf-core/ampliseq16S/ITS 扩增子DADA2 + QIIME2
nf-core/mag宏基因组组装与binningMEGAHIT + MetaBAT2 + GTDB-Tk
nf-core/taxprofiler宏基因组物种分类Kraken2 + Bracken + MetaPhlAn
nf-core/fetchngs从 SRA 下载数据SRA-tools 自动化
nf-core/chipseqChIP-seq 分析BWA + MACS2 + DiffBind
nf-core/scrnaseq单细胞 RNA-seqSTARsolo + Cellranger
nf-core/methylseq甲基化测序Bismark + MethylDackel
nf-core/nanoseqNanopore 长读长Minimap2 + NanoPlot

nf-core 标准化优势

  1. 标准化输入格式:统一的 samplesheet.csv
  2. 完善的测试:每个 PR 自动跑 CI/CD 测试
  3. 多配置支持:Docker/Singularity/Conda 一键切换
  4. 文档完善:每个参数有说明、有示例
  5. 版本管理:语义化版本,可复现

7. Nextflow vs Snakemake 详细对比

维度NextflowSnakemake
编程语言Groovy(JVM 语言)Python
设计哲学数据驱动(dataflow)目标驱动(从输出倒推)
核心抽象Process + ChannelRule + 文件依赖
依赖判断Channel 数据流传递文件时间戳
并行模型天然异步,Channel 自动调度DAG 调度,需设置 -j
语法门槛较高(需学 Groovy)较低(Python 语法)
容器支持一等公民,原生集成支持但非默认
云计算AWS Batch/Google Cloud 原生支持但配置复杂
HPC 集群SLURM/PBS/SGE 原生支持同样支持
社区生态nf-core(149 pipeline)Snakemake Catalog
工业使用制药/临床多(Seqera Platform)学术研究多
断点续跑-resume(基于任务哈希)--rerun-incomplete(基于文件)
模块复用DSL2 module + nf-core modulesWrapper + rules 导入
报告生成内置 HTML 执行报告需要额外配置
学习曲线陡峭但上限高平缓适合快速上手
适用场景大规模生产/临床/云端学术研究/中小项目
调试难度较高(异步执行)较低(顺序思维)
代表用户Broad Institute, Seqera LabsSnakemake 开发组, 各大学

选择建议

  • 选 Snakemake:学术研究、快速原型、团队熟悉 Python、中小规模项目
  • 选 Nextflow:工业生产、临床应用、云部署需求、需要 nf-core 现成流程

8. 面试怎么答

Q1:Nextflow 和 Snakemake 有什么区别?你怎么选?

"核心区别在于设计哲学:Nextflow 是数据驱动的,数据通过 Channel 流入 Process 自动触发执行;Snakemake 是目标驱动的,从最终文件倒推需要哪些 Rule。实际选择上,如果是临床/工业生产级 pipeline,或者需要云端大规模运行,我会选 Nextflow,因为它的容器集成、云计算支持更成熟,nf-core 社区有 149 个经过审阅的标准化 pipeline 可以直接用。如果是学术研究快速原型或者团队都熟悉 Python,Snakemake 上手更快。在该宏基因组项目中,我了解到 nf-core/mag 和 nf-core/taxprofiler 可以用 Nextflow 跑宏基因组的完整流程。"

Q2:解释 Nextflow 的 Channel 和 Process

"Process 是流程中的一个独立计算步骤,定义了输入、输出和执行脚本,类似工厂里的一个加工站。Channel 是连接 Process 之间的数据管道,数据放入 Channel 后会自动流向下游的 Process。这种设计的好处是天然支持并行——只要 Channel 里有多个数据元素,下游的 Process 就会自动并行处理它们,不需要额外配置。"

Q3:nf-core 是什么?有什么优势?

"nf-core 是一个社区驱动的标准化 Nextflow pipeline 集合,目前有 149 个 pipeline,覆盖 RNA-seq、变异检测、宏基因组等主流分析。它的优势在于:统一的 samplesheet 输入格式、完善的 CI/CD 测试、Docker/Singularity/Conda 多种运行方式、详细的文档和版本管理。对于生信工程师来说,很多分析不需要从头写,直接用 nf-core 的标准流程即可,既节省时间又保证可复现性。"

Q4:Nextflow 怎么实现断点续跑?

"Nextflow 通过 -resume 参数实现断点续跑。它的原理是对每个任务计算哈希值(基于输入数据、脚本内容、容器版本等),如果哈希值不变说明该任务不需要重跑,直接使用缓存的结果。相比 Snakemake 基于文件时间戳的机制,Nextflow 的哈希机制更准确,不会因为文件被 touch 而误判。"

Q5:如何在 HPC 集群上运行 Nextflow?

"在 nextflow.config 中配置 executor 为对应的调度器即可,比如 SLURM。可以设置队列、资源需求(CPU/内存/时间),Nextflow 会自动将每个 Process 作为独立作业提交。配合 Singularity 容器使用,可以在集群上实现完全可复现的分析。还可以用 profiles 机制,开发时用 local,上线时切到 slurm,一行参数切换。"


9. 速查表

# ===== 运行命令 =====
nextflow run main.nf                    # 运行 pipeline
nextflow run main.nf -resume            # 断点续跑
nextflow run main.nf -profile docker    # 指定 profile
nextflow run main.nf --reads "*.fq.gz"  # 传递参数(双横线)
nextflow run main.nf -with-report       # 生成 HTML 报告
nextflow run main.nf -with-timeline     # 生成时间线图
nextflow run main.nf -with-dag flow.png # 生成 DAG 流程图
nextflow run main.nf -bg                # 后台运行

# ===== nf-core =====
nf-core list                            # 列出所有 pipeline
nf-core launch rnaseq                   # 交互式配置参数
nf-core download rnaseq                 # 下载 pipeline + 容器
nextflow run nf-core/rnaseq -r 3.14.0   # 运行指定版本

# ===== 管理命令 =====
nextflow log                            # 查看运行历史
nextflow clean -f                       # 清理缓存
nextflow info                           # 系统信息
nextflow pull nf-core/rnaseq            # 更新 pipeline

# ===== 常用 Channel 工厂 =====
Channel.of(1, 2, 3)                     # 从值创建
Channel.fromPath("*.fastq.gz")          # 从文件路径
Channel.fromFilePairs("*_{R1,R2}.fq.gz") # 从文件对
Channel.fromSRA("PRJNA123456")          # 从 SRA 项目号

# ===== 常用 Operator =====
.map { }                                # 变换元素
.filter { }                             # 过滤
.collect()                              # 收集为列表
.groupTuple()                           # 按key分组
.combine(other_ch)                      # 笛卡尔积组合
.join(other_ch)                         # 按key合并(类似SQL JOIN)
.mix(other_ch)                          # 合并多个 channel
.flatten()                              # 展平嵌套
.first()                                # 取第一个
.count()                                # 计数

10. 延伸资源

资源说明
Nextflow 官方文档最权威的参考
nf-core149 个标准化 pipeline
Nextflow Training官方培训教程
Seqera Platform企业级管理平台(原 Nextflow Tower)
nf-core/mag宏基因组 MAGs pipeline
nf-core/taxprofiler宏基因组物种分类
Awesome Nextflow社区资源汇总

本文与 11_Snakemake流程管理实战.md 互补:Snakemake 篇侧重 Python 语法和学术场景,本篇侧重 Nextflow 的数据驱动模型、容器化和 nf-core 工业级应用。两篇对照阅读效果最佳。