跳转至

Nextflow DSL2 入门教程

最后更新:2026-05 | Nextflow 最新稳定版:v25.10.4(2026-02 发布),最新版 v26.04.0(2026-04 发布) 适合人群:生信工程师面试准备、宏基因组流程开发入门


一、什么是 Nextflow?为什么生信工程师必须学?

白话解释

你写过 Shell 脚本跑生信流程吗?比如先 fastp 质控,再 bowtie2 去宿主,再 kraken2 物种分类。你大概是这样写的:

# 一个典型的 Shell 生信脚本
fastp -i sample_R1.fq.gz -I sample_R2.fq.gz -o clean_R1.fq.gz -O clean_R2.fq.gz
bowtie2 -x human_ref -1 clean_R1.fq.gz -2 clean_R2.fq.gz -S aligned.sam
kraken2 --db k2db --paired clean_R1.fq.gz clean_R2.fq.gz --output result.txt

这样写有什么问题?

Shell 脚本的痛点Nextflow 怎么解决
跑到一半挂了,要从头来断点续传-resume),自动跳过已完成的步骤
100 个样本要手动写循环自动并行,Channel 自动分发样本
换台机器要改一堆路径参数化params),路径写在配置文件里
不知道哪步出了错自动日志,每个任务有独立工作目录和日志
想用 Docker 容器很麻烦原生支持 Docker / Singularity / Conda
在本机跑完想上集群一行配置切换本地 / SLURM / PBS / 云

一句话总结:Nextflow 就是把你的 Shell 脚本变成"工业级流水线"的工具。

面试中为什么重要?

现在主流的生信流程框架有两个:NextflowSnakemake。其中 nf-core 社区(基于 Nextflow)提供了 100+ 个开箱即用的标准流程(如 nf-core/ampliseq、nf-core/taxprofiler),很多公司和研究所直接用。面试中被问"你用什么做流程管理"时,能说 Nextflow 就是加分项。


二、DSL1 vs DSL2:DSL2 是现在的标准

Nextflow 有两个版本的语法:

对比项DSL1(旧语法)DSL2(新语法,现在的标准
Process 定义定义和调用绑在一起定义和调用分开(像函数一样)
代码复用复制粘贴include 导入模块
Channel 连接from / into直接传参数
多次使用同一个 process要起别名,很麻烦直接调用多次
状态已弃用(v24.04 后默认 DSL2)默认启用

结论:直接学 DSL2,不要浪费时间在 DSL1 上。

从 Nextflow 24.04 开始,DSL2 已经是默认语法。从 25.04 开始引入了"严格语法(strict syntax)"模式,对代码规范要求更高。本教程全部基于 DSL2。


三、安装 Nextflow

前提条件

  • Java 17 或更新版本(Nextflow 25.04 开始要求 Java 17+)
  • Bash 3.2+(Linux / macOS 都自带)

方式一:Conda 安装(推荐新手)

# 创建一个专门的 conda 环境(避免版本冲突)
conda create -n nextflow-env -c bioconda -c conda-forge nextflow   # 从 bioconda 安装 nextflow

# 激活环境
conda activate nextflow-env   # 切换到 nextflow 环境

# 验证安装
nextflow -version   # 查看版本号,应显示 25.10.4 或更新版本

提示:Conda 安装的版本可能不是最新的,但对入门完全够用。截至 2026-04,Bioconda 上最新为 v26.04.0。

方式二:直接安装(官方推荐)

# 下载 Nextflow 启动器脚本
curl -s https://get.nextflow.io | bash   # 一行命令下载 nextflow 可执行文件

# 移动到 PATH 中(这样任何地方都能用)
sudo mv nextflow /usr/local/bin/   # 把 nextflow 移到系统路径

# 验证
nextflow -version   # 检查版本

# 如果想安装特定版本(比如 25.10.4)
export NXF_VER=25.10.4   # 设置想要的版本号
curl -s https://get.nextflow.io | bash   # 会安装指定版本

验证 Java 版本

java -version   # 查看 Java 版本,需要 17 或更新

# 如果没有 Java 17,用 conda 安装
conda install -c conda-forge openjdk=17   # 安装 OpenJDK 17

四、核心概念白话解释

Nextflow 有 4 个核心概念,用"工厂"来类比最好懂:

1. Channel(数据通道)—— 类比"水管"

Channel 就是连接各个步骤的"水管",数据(比如 FASTQ 文件)从水管一端流入,从另一端流出给下一个步骤。

// 创建一个 channel,里面放 3 个文件路径
Channel.fromPath("data/*.fastq.gz")   // 从 data 目录找所有 .fastq.gz 文件,放进水管

特点: - 水管里的数据是异步的(不是一次全部到达,而是来一个处理一个) - 一根水管可以同时接到多个车间(自动分叉)

2. Process(处理步骤)—— 类比"工厂车间"

Process 就是一个独立的处理车间。它定义了:输入什么、输出什么、里面干什么活。

// 定义一个质控车间
process FASTP_QC {                          // 车间名字叫 FASTP_QC
    input:                                  // 输入:接收原始数据
    path reads                              // 一个文件路径

    output:                                 // 输出:产出清洗后的数据
    path "clean_*.fastq.gz"                 // 清洗后的文件

    script:                                 // 具体干活的命令
    """
    fastp -i ${reads} -o clean_${reads}     // 用 fastp 做质控
    """
}

3. Workflow(工作流)—— 类比"生产线"

Workflow 把多个车间串起来,变成完整的生产线。

workflow {                                  // 定义生产线
    reads_ch = Channel.fromPath("*.fastq.gz")   // 先把原料放进水管
    FASTP_QC(reads_ch)                      // 原料送进质控车间
    // 可以继续接更多车间...
}

4. Operator(操作符)—— 类比"阀门 / 分流器"

Operator 是装在水管上的"阀门",用来过滤、合并、拆分数据。

Channel.fromPath("*.fastq.gz")       // 从水管取文件
    .filter { it.name =~ /sample1/ } // 阀门:只放 sample1 相关的文件通过
    .view()                          // 查看水管里有什么(调试用)

概念关系图

原始数据 → [Channel 水管] → [Process 车间1] → [Channel 水管] → [Process 车间2] → 最终结果
                                    ↑                                ↑
                              Operator 阀门                    Operator 阀门
                           (过滤/合并/拆分)                (过滤/合并/拆分)

                        ←———————— Workflow 生产线 ————————→

五、Hello World 示例

创建一个文件 hello.nf

#!/usr/bin/env nextflow
// 文件名:hello.nf
// 功能:Nextflow Hello World 示例

// 定义一个 process,接收字符串并打印
process SAY_HELLO {
    input:
    val greeting                           // 输入:一个字符串值

    output:
    stdout                                 // 输出:标准输出(打印的内容)

    script:
    """
    echo "Hello, ${greeting}!"             // 打印问候语
    """
}

// 工作流:把数据送进 process
workflow {
    // 创建一个 channel,包含 3 个名字
    greetings_ch = Channel.of(             // of() 创建一个包含多个值的 channel
        "Nextflow",
        "Bioinformatics",
        "World"
    )
    // 把 channel 送进 SAY_HELLO process
    SAY_HELLO(greetings_ch)                // 调用 process(像调用函数一样)
    // 查看输出
    SAY_HELLO.out.view()                   // 打印 process 的输出结果
}

运行:

nextflow run hello.nf   # 运行 hello.nf 流程

预期输出:

N E X T F L O W  ~  version 25.10.4
Launching `hello.nf` [friendly_tesla] - revision: abc123
executor >  local (3)
[xx/yyyyyy] process > SAY_HELLO (1) [100%] 3 of 3 ✔

Hello, Nextflow!
Hello, Bioinformatics!
Hello, World!

注意:三个问候语的打印顺序可能不同,因为 Nextflow 是并行执行的。


六、第一个生信示例:用 Nextflow 跑 fastp 质控

准备工作

conda activate bioinfo                     # 激活你的生信环境(确保有 fastp)
fastp --version                            # 验证 fastp 已安装(当前最新 v1.3.3)

创建测试数据目录

mkdir -p nextflow_demo/data                # 创建项目目录
cd nextflow_demo                           # 进入项目目录
# 假设你在 data/ 目录下有一对双端测序文件:
# data/sample1_R1.fastq.gz
# data/sample1_R2.fastq.gz

编写流程文件 fastp_qc.nf

#!/usr/bin/env nextflow
// 文件名:fastp_qc.nf
// 功能:用 Nextflow 调用 fastp 对单样本做质控

// ===== 参数定义 =====
params.reads_r1 = "data/sample1_R1.fastq.gz"   // 正向读段文件路径(默认值)
params.reads_r2 = "data/sample1_R2.fastq.gz"   // 反向读段文件路径(默认值)
params.outdir   = "results"                     // 输出目录(默认值)

// ===== Process 定义 =====
process FASTP {
    // publishDir:把输出文件复制到指定目录(不然只在 work/ 下)
    publishDir "${params.outdir}/fastp", mode: 'copy'   // 结果复制到 results/fastp/

    input:
    path r1                                // 输入:正向读段文件
    path r2                                // 输入:反向读段文件

    output:
    path "clean_R1.fastq.gz", emit: clean_r1     // 输出:清洗后的正向读段
    path "clean_R2.fastq.gz", emit: clean_r2     // 输出:清洗后的反向读段
    path "fastp_report.html", emit: report       // 输出:质控报告
    path "fastp_report.json", emit: json         // 输出:JSON 格式报告

    script:
    """
    fastp \\
        -i ${r1} \\
        -I ${r2} \\
        -o clean_R1.fastq.gz \\
        -O clean_R2.fastq.gz \\
        -h fastp_report.html \\
        -j fastp_report.json \\
        --thread 4 \\
        --qualified_quality_phred 20 \\
        --length_required 50
    """
    // -i/-I: 输入的正向/反向读段
    // -o/-O: 输出的正向/反向读段
    // -h/-j: HTML 和 JSON 格式的报告
    // --thread: 线程数
    // --qualified_quality_phred 20: 质量值低于 20 的碱基视为低质量
    // --length_required 50: 过滤掉长度小于 50bp 的读段
}

// ===== Workflow =====
workflow {
    // 创建输入 channel
    r1_ch = Channel.fromPath(params.reads_r1)   // 把正向读段路径放进 channel
    r2_ch = Channel.fromPath(params.reads_r2)   // 把反向读段路径放进 channel

    // 调用 FASTP process
    FASTP(r1_ch, r2_ch)                         // 把两个 channel 送进 FASTP 车间

    // 打印输出文件路径(调试用)
    FASTP.out.clean_r1.view { "Clean R1: $it" } // 查看清洗后的 R1 路径
    FASTP.out.report.view { "Report: $it" }     // 查看报告路径
}

运行

# 基本运行
nextflow run fastp_qc.nf                       # 使用默认参数运行

# 指定自定义输入文件
nextflow run fastp_qc.nf \
    --reads_r1 data/my_R1.fq.gz \
    --reads_r2 data/my_R2.fq.gz \
    --outdir my_results                        # 用自定义参数运行

# 断点续传(之前跑过的步骤自动跳过)
nextflow run fastp_qc.nf -resume               # 加 -resume 跳过已完成的步骤

七、Channel 类型详解

Nextflow 有两种 Channel,理解它们的区别非常重要:

1. Queue Channel(队列通道)

  • 数据像"排队"一样,用一次就消失(先进先出)
  • 适合:样本文件、需要逐个处理的数据
// 创建 queue channel 的几种方式
Channel.of(1, 2, 3)                        // 从值列表创建
Channel.fromPath("data/*.fastq.gz")        // 从文件路径创建
Channel.fromFilePairs("data/*_{R1,R2}.fastq.gz")  // 从配对文件创建(常用!)
Channel.fromSRA("SRP012345")               // 从 SRA 数据库创建

2. Value Channel(值通道)

  • 数据可以被反复使用,不会消失
  • 适合:参考基因组路径、数据库路径、参数值
// 创建 value channel
Channel.value("hg38")                      // 一个字符串值
Channel.value(file("/path/to/reference.fa"))  // 一个文件路径

// 也可以用 Channel.of() 创建单值(自动变成 value channel)
ref_ch = Channel.value(file(params.reference))  // 参考基因组可以重复使用

区别对比

特性Queue ChannelValue Channel
数据消耗用一次就没了可以反复用
典型用途样本文件参考基因组、数据库路径
创建方式fromPath, fromFilePairs, of(多值)value(), of(单值)
能否接多个 process可以(DSL2 自动 fork)可以

实际场景

// 场景:10 个样本都要比对到同一个参考基因组
samples_ch = Channel.fromFilePairs("data/*_{1,2}.fq.gz")  // 队列:10 个样本排队
ref_ch     = Channel.value(file("ref/hg38.fa"))            // 值:参考基因组反复用

process ALIGN {
    input:
    tuple val(sample_id), path(reads)      // 从队列取一个样本
    path reference                          // 从值通道取参考基因组(每次都能取到)

    script:
    """
    bowtie2 -x ${reference} -1 ${reads[0]} -2 ${reads[1]} -S ${sample_id}.sam
    """
}

workflow {
    ALIGN(samples_ch, ref_ch)              // 10 个样本并行比对,ref_ch 被重复使用
}

八、常见报错与解决

报错 1:Command error: java.lang.RuntimeException: No such variable

原因:在 script: 块的 """...""" 中,$ 符号会被 Nextflow 当成变量插值。如果你想用 Shell 自己的变量(比如 $line),Nextflow 会报错说找不到这个变量。

解决:Shell 变量前面用 \$ 转义,或者用 shell: 代替 script:

// 方法一:转义 Shell 变量
script:
"""
for line in \$(cat input.txt); do    // 注意 \$ 转义
    echo \$line
done
"""

// 方法二:用 shell 块(Nextflow 变量用 !{} 而不是 ${})
shell:
'''
for line in $(cat input.txt); do     // Shell 变量正常写 $
    echo !{params.prefix}_$line      // Nextflow 变量用 !{}
done
'''

报错 2:Process requirement exceeds available memory

原因:Process 默认申请的内存 / CPU 超过了你的机器资源。

解决:在 process 中指定资源。

process MY_TASK {
    cpus 2                                 // 限制使用 2 个 CPU
    memory '4 GB'                          // 限制使用 4GB 内存

    script:
    """
    some_tool --threads ${task.cpus}       // 用 task.cpus 获取分配的 CPU 数
    """
}

报错 3:Missing output file(s) expected by process

原因:Process 定义了 output: 要产出某些文件,但脚本实际没有生成这些文件。

解决: 1. 检查 output: 中的文件名模式和脚本中实际生成的文件名是否一致 2. 去 work/xx/yyyyyy/ 目录看看实际生成了什么文件

# 查看出错任务的工作目录
ls work/xx/yyyyyy/                         # 看看里面有哪些文件
cat work/xx/yyyyyy/.command.log            # 查看任务日志
cat work/xx/yyyyyy/.command.err            # 查看错误信息

报错 4:Not a valid project name or Git URL

原因:运行 nextflow run xxx 时,Nextflow 把你的文件名当成了 GitHub 仓库名。

解决:确保文件名正确,加上 ./ 前缀。

nextflow run ./my_pipeline.nf              # 加 ./ 明确指定本地文件

九、速查表:核心语法速查

Channel 创建

语法说明示例
Channel.of(...)从值列表创建Channel.of(1, 2, 3)
Channel.fromPath(...)从文件路径创建Channel.fromPath("data/*.fq.gz")
Channel.fromFilePairs(...)从配对文件创建Channel.fromFilePairs("*_{R1,R2}.fq.gz")
Channel.value(...)创建可重复使用的值Channel.value(file("ref.fa"))
Channel.empty()创建空 channelChannel.empty()

Process 结构

process 名字 {
    // 指令(可选)
    publishDir "输出目录", mode: 'copy'    // 发布结果文件
    cpus 4                                 // CPU 数
    memory '8 GB'                          // 内存限制
    conda 'bioconda::fastp=1.3.3'          // Conda 依赖
    container 'biocontainers/fastp:1.3.3'  // Docker 容器

    input:                                 // 输入声明
    path reads                             // 文件类型输入
    val sample_id                          // 值类型输入
    tuple val(id), path(files)             // 组合类型输入

    output:                                // 输出声明
    path "*.bam", emit: bam_files          // 文件输出(带名称)
    stdout emit: log_output                // 标准输出

    script:                                // 要执行的命令
    """
    some_command ${reads}
    """
}

常用 Operator

Operator说明示例
.view()查看 channel 内容(调试)ch.view()
.map { }转换每个元素ch.map { it.toUpperCase() }
.filter { }过滤元素ch.filter { it.size() > 100 }
.collect()收集所有元素为一个列表ch.collect()
.flatten()展平嵌套列表ch.flatten()
.mix(other_ch)合并两个 channelch1.mix(ch2)
.join(other_ch)按 key 合并ch1.join(ch2)
.groupTuple()按 key 分组ch.groupTuple()
.first()取第一个元素ch.first()
.ifEmpty(value)channel 为空时给默认值ch.ifEmpty("none")

运行命令

命令说明
nextflow run pipeline.nf运行流程
nextflow run pipeline.nf -resume断点续传
nextflow run pipeline.nf --param value传递参数
nextflow log查看运行历史
nextflow clean -f清理 work 目录
nextflow info查看系统信息

配置文件 nextflow.config

// nextflow.config —— 全局配置文件
params {
    reads   = "data/*_{R1,R2}.fastq.gz"    // 默认输入路径
    outdir  = "results"                     // 默认输出路径
    threads = 4                             // 默认线程数
}

process {
    cpus   = 2                              // 所有 process 默认 2 核
    memory = '4 GB'                         // 所有 process 默认 4GB 内存
}

// 不同运行环境的配置
profiles {
    local {                                 // 本地运行
        process.executor = 'local'
    }
    slurm {                                 // SLURM 集群
        process.executor = 'slurm'
        process.queue    = 'normal'
    }
}

学习资源


下一篇561_Shell脚本改写为Nextflow流程 —— 手把手教你把宏基因组分析的 Shell 脚本改写成 Nextflow DSL2 流程。