Nextflow DSL2 入门教程¶
最后更新:2026-05 | Nextflow 最新稳定版:v25.10.4(2026-02 发布),最新版 v26.04.0(2026-04 发布) 适合人群:生信工程师面试准备、宏基因组流程开发入门
一、什么是 Nextflow?为什么生信工程师必须学?¶
白话解释¶
你写过 Shell 脚本跑生信流程吗?比如先 fastp 质控,再 bowtie2 去宿主,再 kraken2 物种分类。你大概是这样写的:
# 一个典型的 Shell 生信脚本
fastp -i sample_R1.fq.gz -I sample_R2.fq.gz -o clean_R1.fq.gz -O clean_R2.fq.gz
bowtie2 -x human_ref -1 clean_R1.fq.gz -2 clean_R2.fq.gz -S aligned.sam
kraken2 --db k2db --paired clean_R1.fq.gz clean_R2.fq.gz --output result.txt
这样写有什么问题?
| Shell 脚本的痛点 | Nextflow 怎么解决 |
|---|---|
| 跑到一半挂了,要从头来 | 断点续传(-resume),自动跳过已完成的步骤 |
| 100 个样本要手动写循环 | 自动并行,Channel 自动分发样本 |
| 换台机器要改一堆路径 | 参数化(params),路径写在配置文件里 |
| 不知道哪步出了错 | 自动日志,每个任务有独立工作目录和日志 |
| 想用 Docker 容器很麻烦 | 原生支持 Docker / Singularity / Conda |
| 在本机跑完想上集群 | 一行配置切换本地 / SLURM / PBS / 云 |
一句话总结:Nextflow 就是把你的 Shell 脚本变成"工业级流水线"的工具。
面试中为什么重要?¶
现在主流的生信流程框架有两个:Nextflow 和 Snakemake。其中 nf-core 社区(基于 Nextflow)提供了 100+ 个开箱即用的标准流程(如 nf-core/ampliseq、nf-core/taxprofiler),很多公司和研究所直接用。面试中被问"你用什么做流程管理"时,能说 Nextflow 就是加分项。
二、DSL1 vs DSL2:DSL2 是现在的标准¶
Nextflow 有两个版本的语法:
| 对比项 | DSL1(旧语法) | DSL2(新语法,现在的标准) |
|---|---|---|
| Process 定义 | 定义和调用绑在一起 | 定义和调用分开(像函数一样) |
| 代码复用 | 复制粘贴 | include 导入模块 |
| Channel 连接 | 用 from / into | 直接传参数 |
| 多次使用同一个 process | 要起别名,很麻烦 | 直接调用多次 |
| 状态 | 已弃用(v24.04 后默认 DSL2) | 默认启用 |
结论:直接学 DSL2,不要浪费时间在 DSL1 上。
从 Nextflow 24.04 开始,DSL2 已经是默认语法。从 25.04 开始引入了"严格语法(strict syntax)"模式,对代码规范要求更高。本教程全部基于 DSL2。
三、安装 Nextflow¶
前提条件¶
- Java 17 或更新版本(Nextflow 25.04 开始要求 Java 17+)
- Bash 3.2+(Linux / macOS 都自带)
方式一:Conda 安装(推荐新手)¶
# 创建一个专门的 conda 环境(避免版本冲突)
conda create -n nextflow-env -c bioconda -c conda-forge nextflow # 从 bioconda 安装 nextflow
# 激活环境
conda activate nextflow-env # 切换到 nextflow 环境
# 验证安装
nextflow -version # 查看版本号,应显示 25.10.4 或更新版本
提示:Conda 安装的版本可能不是最新的,但对入门完全够用。截至 2026-04,Bioconda 上最新为 v26.04.0。
方式二:直接安装(官方推荐)¶
# 下载 Nextflow 启动器脚本
curl -s https://get.nextflow.io | bash # 一行命令下载 nextflow 可执行文件
# 移动到 PATH 中(这样任何地方都能用)
sudo mv nextflow /usr/local/bin/ # 把 nextflow 移到系统路径
# 验证
nextflow -version # 检查版本
# 如果想安装特定版本(比如 25.10.4)
export NXF_VER=25.10.4 # 设置想要的版本号
curl -s https://get.nextflow.io | bash # 会安装指定版本
验证 Java 版本¶
java -version # 查看 Java 版本,需要 17 或更新
# 如果没有 Java 17,用 conda 安装
conda install -c conda-forge openjdk=17 # 安装 OpenJDK 17
四、核心概念白话解释¶
Nextflow 有 4 个核心概念,用"工厂"来类比最好懂:
1. Channel(数据通道)—— 类比"水管"¶
Channel 就是连接各个步骤的"水管",数据(比如 FASTQ 文件)从水管一端流入,从另一端流出给下一个步骤。
特点: - 水管里的数据是异步的(不是一次全部到达,而是来一个处理一个) - 一根水管可以同时接到多个车间(自动分叉)
2. Process(处理步骤)—— 类比"工厂车间"¶
Process 就是一个独立的处理车间。它定义了:输入什么、输出什么、里面干什么活。
// 定义一个质控车间
process FASTP_QC { // 车间名字叫 FASTP_QC
input: // 输入:接收原始数据
path reads // 一个文件路径
output: // 输出:产出清洗后的数据
path "clean_*.fastq.gz" // 清洗后的文件
script: // 具体干活的命令
"""
fastp -i ${reads} -o clean_${reads} // 用 fastp 做质控
"""
}
3. Workflow(工作流)—— 类比"生产线"¶
Workflow 把多个车间串起来,变成完整的生产线。
workflow { // 定义生产线
reads_ch = Channel.fromPath("*.fastq.gz") // 先把原料放进水管
FASTP_QC(reads_ch) // 原料送进质控车间
// 可以继续接更多车间...
}
4. Operator(操作符)—— 类比"阀门 / 分流器"¶
Operator 是装在水管上的"阀门",用来过滤、合并、拆分数据。
Channel.fromPath("*.fastq.gz") // 从水管取文件
.filter { it.name =~ /sample1/ } // 阀门:只放 sample1 相关的文件通过
.view() // 查看水管里有什么(调试用)
概念关系图¶
原始数据 → [Channel 水管] → [Process 车间1] → [Channel 水管] → [Process 车间2] → 最终结果
↑ ↑
Operator 阀门 Operator 阀门
(过滤/合并/拆分) (过滤/合并/拆分)
←———————— Workflow 生产线 ————————→
五、Hello World 示例¶
创建一个文件 hello.nf:
#!/usr/bin/env nextflow
// 文件名:hello.nf
// 功能:Nextflow Hello World 示例
// 定义一个 process,接收字符串并打印
process SAY_HELLO {
input:
val greeting // 输入:一个字符串值
output:
stdout // 输出:标准输出(打印的内容)
script:
"""
echo "Hello, ${greeting}!" // 打印问候语
"""
}
// 工作流:把数据送进 process
workflow {
// 创建一个 channel,包含 3 个名字
greetings_ch = Channel.of( // of() 创建一个包含多个值的 channel
"Nextflow",
"Bioinformatics",
"World"
)
// 把 channel 送进 SAY_HELLO process
SAY_HELLO(greetings_ch) // 调用 process(像调用函数一样)
// 查看输出
SAY_HELLO.out.view() // 打印 process 的输出结果
}
运行:
预期输出:
N E X T F L O W ~ version 25.10.4
Launching `hello.nf` [friendly_tesla] - revision: abc123
executor > local (3)
[xx/yyyyyy] process > SAY_HELLO (1) [100%] 3 of 3 ✔
Hello, Nextflow!
Hello, Bioinformatics!
Hello, World!
注意:三个问候语的打印顺序可能不同,因为 Nextflow 是并行执行的。
六、第一个生信示例:用 Nextflow 跑 fastp 质控¶
准备工作¶
创建测试数据目录¶
mkdir -p nextflow_demo/data # 创建项目目录
cd nextflow_demo # 进入项目目录
# 假设你在 data/ 目录下有一对双端测序文件:
# data/sample1_R1.fastq.gz
# data/sample1_R2.fastq.gz
编写流程文件 fastp_qc.nf¶
#!/usr/bin/env nextflow
// 文件名:fastp_qc.nf
// 功能:用 Nextflow 调用 fastp 对单样本做质控
// ===== 参数定义 =====
params.reads_r1 = "data/sample1_R1.fastq.gz" // 正向读段文件路径(默认值)
params.reads_r2 = "data/sample1_R2.fastq.gz" // 反向读段文件路径(默认值)
params.outdir = "results" // 输出目录(默认值)
// ===== Process 定义 =====
process FASTP {
// publishDir:把输出文件复制到指定目录(不然只在 work/ 下)
publishDir "${params.outdir}/fastp", mode: 'copy' // 结果复制到 results/fastp/
input:
path r1 // 输入:正向读段文件
path r2 // 输入:反向读段文件
output:
path "clean_R1.fastq.gz", emit: clean_r1 // 输出:清洗后的正向读段
path "clean_R2.fastq.gz", emit: clean_r2 // 输出:清洗后的反向读段
path "fastp_report.html", emit: report // 输出:质控报告
path "fastp_report.json", emit: json // 输出:JSON 格式报告
script:
"""
fastp \\
-i ${r1} \\
-I ${r2} \\
-o clean_R1.fastq.gz \\
-O clean_R2.fastq.gz \\
-h fastp_report.html \\
-j fastp_report.json \\
--thread 4 \\
--qualified_quality_phred 20 \\
--length_required 50
"""
// -i/-I: 输入的正向/反向读段
// -o/-O: 输出的正向/反向读段
// -h/-j: HTML 和 JSON 格式的报告
// --thread: 线程数
// --qualified_quality_phred 20: 质量值低于 20 的碱基视为低质量
// --length_required 50: 过滤掉长度小于 50bp 的读段
}
// ===== Workflow =====
workflow {
// 创建输入 channel
r1_ch = Channel.fromPath(params.reads_r1) // 把正向读段路径放进 channel
r2_ch = Channel.fromPath(params.reads_r2) // 把反向读段路径放进 channel
// 调用 FASTP process
FASTP(r1_ch, r2_ch) // 把两个 channel 送进 FASTP 车间
// 打印输出文件路径(调试用)
FASTP.out.clean_r1.view { "Clean R1: $it" } // 查看清洗后的 R1 路径
FASTP.out.report.view { "Report: $it" } // 查看报告路径
}
运行¶
# 基本运行
nextflow run fastp_qc.nf # 使用默认参数运行
# 指定自定义输入文件
nextflow run fastp_qc.nf \
--reads_r1 data/my_R1.fq.gz \
--reads_r2 data/my_R2.fq.gz \
--outdir my_results # 用自定义参数运行
# 断点续传(之前跑过的步骤自动跳过)
nextflow run fastp_qc.nf -resume # 加 -resume 跳过已完成的步骤
七、Channel 类型详解¶
Nextflow 有两种 Channel,理解它们的区别非常重要:
1. Queue Channel(队列通道)¶
- 数据像"排队"一样,用一次就消失(先进先出)
- 适合:样本文件、需要逐个处理的数据
// 创建 queue channel 的几种方式
Channel.of(1, 2, 3) // 从值列表创建
Channel.fromPath("data/*.fastq.gz") // 从文件路径创建
Channel.fromFilePairs("data/*_{R1,R2}.fastq.gz") // 从配对文件创建(常用!)
Channel.fromSRA("SRP012345") // 从 SRA 数据库创建
2. Value Channel(值通道)¶
- 数据可以被反复使用,不会消失
- 适合:参考基因组路径、数据库路径、参数值
// 创建 value channel
Channel.value("hg38") // 一个字符串值
Channel.value(file("/path/to/reference.fa")) // 一个文件路径
// 也可以用 Channel.of() 创建单值(自动变成 value channel)
ref_ch = Channel.value(file(params.reference)) // 参考基因组可以重复使用
区别对比¶
| 特性 | Queue Channel | Value Channel |
|---|---|---|
| 数据消耗 | 用一次就没了 | 可以反复用 |
| 典型用途 | 样本文件 | 参考基因组、数据库路径 |
| 创建方式 | fromPath, fromFilePairs, of(多值) | value(), of(单值) |
| 能否接多个 process | 可以(DSL2 自动 fork) | 可以 |
实际场景¶
// 场景:10 个样本都要比对到同一个参考基因组
samples_ch = Channel.fromFilePairs("data/*_{1,2}.fq.gz") // 队列:10 个样本排队
ref_ch = Channel.value(file("ref/hg38.fa")) // 值:参考基因组反复用
process ALIGN {
input:
tuple val(sample_id), path(reads) // 从队列取一个样本
path reference // 从值通道取参考基因组(每次都能取到)
script:
"""
bowtie2 -x ${reference} -1 ${reads[0]} -2 ${reads[1]} -S ${sample_id}.sam
"""
}
workflow {
ALIGN(samples_ch, ref_ch) // 10 个样本并行比对,ref_ch 被重复使用
}
八、常见报错与解决¶
报错 1:Command error: java.lang.RuntimeException: No such variable¶
原因:在 script: 块的 """...""" 中,$ 符号会被 Nextflow 当成变量插值。如果你想用 Shell 自己的变量(比如 $line),Nextflow 会报错说找不到这个变量。
解决:Shell 变量前面用 \$ 转义,或者用 shell: 代替 script:。
// 方法一:转义 Shell 变量
script:
"""
for line in \$(cat input.txt); do // 注意 \$ 转义
echo \$line
done
"""
// 方法二:用 shell 块(Nextflow 变量用 !{} 而不是 ${})
shell:
'''
for line in $(cat input.txt); do // Shell 变量正常写 $
echo !{params.prefix}_$line // Nextflow 变量用 !{}
done
'''
报错 2:Process requirement exceeds available memory¶
原因:Process 默认申请的内存 / CPU 超过了你的机器资源。
解决:在 process 中指定资源。
process MY_TASK {
cpus 2 // 限制使用 2 个 CPU
memory '4 GB' // 限制使用 4GB 内存
script:
"""
some_tool --threads ${task.cpus} // 用 task.cpus 获取分配的 CPU 数
"""
}
报错 3:Missing output file(s) expected by process¶
原因:Process 定义了 output: 要产出某些文件,但脚本实际没有生成这些文件。
解决: 1. 检查 output: 中的文件名模式和脚本中实际生成的文件名是否一致 2. 去 work/xx/yyyyyy/ 目录看看实际生成了什么文件
# 查看出错任务的工作目录
ls work/xx/yyyyyy/ # 看看里面有哪些文件
cat work/xx/yyyyyy/.command.log # 查看任务日志
cat work/xx/yyyyyy/.command.err # 查看错误信息
报错 4:Not a valid project name or Git URL¶
原因:运行 nextflow run xxx 时,Nextflow 把你的文件名当成了 GitHub 仓库名。
解决:确保文件名正确,加上 ./ 前缀。
九、速查表:核心语法速查¶
Channel 创建¶
| 语法 | 说明 | 示例 |
|---|---|---|
Channel.of(...) | 从值列表创建 | Channel.of(1, 2, 3) |
Channel.fromPath(...) | 从文件路径创建 | Channel.fromPath("data/*.fq.gz") |
Channel.fromFilePairs(...) | 从配对文件创建 | Channel.fromFilePairs("*_{R1,R2}.fq.gz") |
Channel.value(...) | 创建可重复使用的值 | Channel.value(file("ref.fa")) |
Channel.empty() | 创建空 channel | Channel.empty() |
Process 结构¶
process 名字 {
// 指令(可选)
publishDir "输出目录", mode: 'copy' // 发布结果文件
cpus 4 // CPU 数
memory '8 GB' // 内存限制
conda 'bioconda::fastp=1.3.3' // Conda 依赖
container 'biocontainers/fastp:1.3.3' // Docker 容器
input: // 输入声明
path reads // 文件类型输入
val sample_id // 值类型输入
tuple val(id), path(files) // 组合类型输入
output: // 输出声明
path "*.bam", emit: bam_files // 文件输出(带名称)
stdout emit: log_output // 标准输出
script: // 要执行的命令
"""
some_command ${reads}
"""
}
常用 Operator¶
| Operator | 说明 | 示例 |
|---|---|---|
.view() | 查看 channel 内容(调试) | ch.view() |
.map { } | 转换每个元素 | ch.map { it.toUpperCase() } |
.filter { } | 过滤元素 | ch.filter { it.size() > 100 } |
.collect() | 收集所有元素为一个列表 | ch.collect() |
.flatten() | 展平嵌套列表 | ch.flatten() |
.mix(other_ch) | 合并两个 channel | ch1.mix(ch2) |
.join(other_ch) | 按 key 合并 | ch1.join(ch2) |
.groupTuple() | 按 key 分组 | ch.groupTuple() |
.first() | 取第一个元素 | ch.first() |
.ifEmpty(value) | channel 为空时给默认值 | ch.ifEmpty("none") |
运行命令¶
| 命令 | 说明 |
|---|---|
nextflow run pipeline.nf | 运行流程 |
nextflow run pipeline.nf -resume | 断点续传 |
nextflow run pipeline.nf --param value | 传递参数 |
nextflow log | 查看运行历史 |
nextflow clean -f | 清理 work 目录 |
nextflow info | 查看系统信息 |
配置文件 nextflow.config¶
// nextflow.config —— 全局配置文件
params {
reads = "data/*_{R1,R2}.fastq.gz" // 默认输入路径
outdir = "results" // 默认输出路径
threads = 4 // 默认线程数
}
process {
cpus = 2 // 所有 process 默认 2 核
memory = '4 GB' // 所有 process 默认 4GB 内存
}
// 不同运行环境的配置
profiles {
local { // 本地运行
process.executor = 'local'
}
slurm { // SLURM 集群
process.executor = 'slurm'
process.queue = 'normal'
}
}
学习资源¶
下一篇:561_Shell脚本改写为Nextflow流程 —— 手把手教你把宏基因组分析的 Shell 脚本改写成 Nextflow DSL2 流程。