Nextflow 模块化开发(DSL2)¶
适用版本:Nextflow >= 25.04(推荐 25.10+),nf-core/tools >= 4.0.0 前置知识:Nextflow 基础语法(process / channel / workflow) 面向人群:生信工程师面试准备、流程开发入门
一、什么是模块化?为什么要模块化?¶
白话解释¶
想象你在搭建一个宏基因组分析流程,包含质控、去宿主、物种分类三步。
不模块化(一整块泥巴): 所有代码写在一个 main.nf 里,几百行挤在一起。想改质控参数?在几百行里翻找。想把质控步骤给另一个项目用?复制粘贴一大段代码。
模块化(乐高积木): 每个分析步骤是一块"乐高积木"——fastp.nf、bowtie2.nf、kraken2.nf。想改质控?打开 fastp.nf 直接改。想复用?把这块积木拿到另一个项目里拼上就行。
模块化的好处¶
| 好处 | 说明 |
|---|---|
| 可复用 | 写一次,到处用,不用复制粘贴 |
| 好维护 | 每个文件只管一件事,改起来不怕牵一发动全身 |
| 好测试 | 可以单独测试一个模块,不用跑整个流程 |
| 好协作 | 张三写质控模块,李四写比对模块,互不干扰 |
| 社区共享 | nf-core 有 1400+ 现成模块,直接装来用 |
二、DSL2 模块系统¶
Nextflow 从 22.03 版本开始默认使用 DSL2 语法。DSL2 最大的特点就是支持模块化——你可以把 process 定义写在单独的文件里,然后用 include 导入。
2.1 模块文件结构¶
一个标准的模块化项目目录长这样:
my_pipeline/ # 项目根目录
├── main.nf # 主流程文件:组装各个模块
├── nextflow.config # 配置文件:参数、资源、容器
├── modules/ # 模块目录:每个工具一个文件
│ ├── fastp.nf # 质控模块
│ ├── bowtie2.nf # 去宿主模块
│ └── kraken2.nf # 物种分类模块
├── subworkflows/ # 子工作流目录(可选)
│ └── qc_and_dehost.nf # 质控+去宿主组合子流程
└── modules/nf-core/ # nf-core 安装的模块(自动生成)
└── fastp/
└── main.nf
核心原则:每个 process 一个文件。 就像每个函数放一个文件一样,清晰明了。
2.2 include 语法¶
include 是 DSL2 的"导入"语句,把别的文件里定义的 process 或 workflow 拿过来用。
// 基本语法:从指定文件导入一个 process
include { FASTP } from './modules/fastp' // 导入 fastp 模块
// 同时导入多个 process
include { FASTP; MULTIQC } from './modules/qc' // 从同一文件导入多个
// 给导入的 process 起别名(同一个模块用不同参数时很有用)
include { FASTP as FASTP_PE } from './modules/fastp' // 用于双端测序
include { FASTP as FASTP_SE } from './modules/fastp' // 用于单端测序
// 从子工作流导入
include { QC_AND_DEHOST } from './subworkflows/qc_and_dehost'
注意事项: - 路径用 ./ 开头表示相对路径 - 文件名可以省略 .nf 后缀(写 ./modules/fastp 等价于 ./modules/fastp.nf) - include 语句必须写在 workflow 块的外面(文件顶部)
三、实战:宏基因组流程模块化¶
下面我们把一个宏基因组分析流程拆成独立模块。流程是:fastp 质控 -> bowtie2 去宿主 -> kraken2 物种分类。
3.1 模块文件:modules/fastp.nf¶
// modules/fastp.nf
// 功能:使用 fastp 对双端测序数据做质量控制
process FASTP {
tag "$sample_id" // 标签:在日志里显示样本名
label 'process_medium' // 资源标签:在 config 里统一分配资源
publishDir "${params.outdir}/fastp", // 输出目录:结果文件复制到这里
mode: 'copy' // 复制模式(而非默认的软链接)
// 用 conda 或容器管理依赖,不用手动装软件
conda "bioconda::fastp=0.24.0" // conda 环境
container "quay.io/biocontainers/fastp:0.24.0--heae7151_0" // Docker/Singularity 镜像
input: // 输入声明
tuple val(sample_id), path(reads) // 元组:样本名 + 原始 reads 文件
output: // 输出声明
tuple val(sample_id), path("*_trimmed_R{1,2}.fastq.gz"), // 质控后的 reads
emit: trimmed_reads // 给输出起名,方便下游引用
path("${sample_id}.fastp.json"), // fastp 的 JSON 报告
emit: json_report // 报告输出通道
path("${sample_id}.fastp.html"), // fastp 的 HTML 报告
emit: html_report
script: // 执行的命令
"""
fastp \\
-i ${reads[0]} \\ # 输入:正向 reads(R1)
-I ${reads[1]} \\ # 输入:反向 reads(R2)
-o ${sample_id}_trimmed_R1.fastq.gz \\ # 输出:质控后 R1
-O ${sample_id}_trimmed_R2.fastq.gz \\ # 输出:质控后 R2
--qualified_quality_phred ${params.fastp_qualified_quality ?: 20} \\ # 碱基质量阈值,默认 20
--length_required ${params.fastp_min_length ?: 50} \\ # 最短读长,默认 50bp
--thread ${task.cpus} \\ # 使用分配的 CPU 数
--json ${sample_id}.fastp.json \\ # JSON 格式报告
--html ${sample_id}.fastp.html # HTML 格式报告
"""
}
3.2 模块文件:modules/bowtie2.nf¶
// modules/bowtie2.nf
// 功能:用 Bowtie2 将 reads 比对到宿主基因组,去除宿主污染
process BOWTIE2_DEHOST {
tag "$sample_id" // 标签:样本名
label 'process_high' // 资源标签:需要较多内存和 CPU
conda "bioconda::bowtie2=2.5.4 bioconda::samtools=1.21" // 同时装 bowtie2 和 samtools
container "quay.io/biocontainers/mulled-v2-bowtie2-samtools:latest"
input:
tuple val(sample_id), path(reads) // 质控后的 reads
path(index) // bowtie2 索引文件目录
output:
tuple val(sample_id), path("*_dehost_R{1,2}.fastq.gz"), // 去宿主后的 reads
emit: dehosted_reads
path("${sample_id}_bowtie2.log"), // 比对日志
emit: log
script:
def index_base = index[0].toString().replaceAll(/\.\d+\.bt2$/, '') // 提取索引前缀
"""
bowtie2 \\
-x ${index_base} \\ # 宿主基因组索引
-1 ${reads[0]} \\ # R1 reads
-2 ${reads[1]} \\ # R2 reads
--threads ${task.cpus} \\ # CPU 数
--very-sensitive \\ # 高灵敏度模式
--un-conc-gz ${sample_id}_dehost_R%.fastq.gz \\ # 未比上的 reads(即非宿主)
2> ${sample_id}_bowtie2.log \\ # 日志重定向
| samtools view -bS - > /dev/null # 比对结果不保留(只要未比上的)
"""
}
3.3 模块文件:modules/kraken2.nf¶
// modules/kraken2.nf
// 功能:用 Kraken2 对去宿主后的 reads 做物种分类
process KRAKEN2 {
tag "$sample_id" // 标签
label 'process_high' // 需要大内存(Kraken2 数据库很大)
conda "bioconda::kraken2=2.1.3 bioconda::krakentools=1.2"
container "quay.io/biocontainers/kraken2:2.1.3--pl5321hdcf5f25_1"
input:
tuple val(sample_id), path(reads) // 去宿主后的 reads
path(db) // Kraken2 数据库路径
output:
tuple val(sample_id), path("${sample_id}.kraken2.report"), // 分类报告
emit: report
tuple val(sample_id), path("${sample_id}.kraken2.output"), // 详细输出
emit: output
script:
"""
kraken2 \\
--db ${db} \\ # 数据库路径
--paired \\ # 双端模式
--threads ${task.cpus} \\ # CPU 数
--report ${sample_id}.kraken2.report \\ # 汇总报告
--output ${sample_id}.kraken2.output \\ # 逐条 reads 分类结果
--gzip-compressed \\ # 输入文件是 gzip 压缩的
${reads[0]} ${reads[1]} # 输入 R1 和 R2
"""
}
3.4 主流程:main.nf¶
#!/usr/bin/env nextflow
// main.nf
// 宏基因组分析主流程:质控 -> 去宿主 -> 物种分类
nextflow.enable.dsl = 2 // 启用 DSL2(25.10+ 版本默认开启)
// === 导入模块 ===
include { FASTP } from './modules/fastp' // 质控
include { BOWTIE2_DEHOST } from './modules/bowtie2' // 去宿主
include { KRAKEN2 } from './modules/kraken2' // 物种分类
// === 主工作流 ===
workflow {
// 1. 读取样本信息
// samplesheet 格式:sample_id, reads_R1, reads_R2
Channel
.fromFilePairs(params.reads, checkIfExists: true) // 自动配对 R1/R2
.set { raw_reads_ch } // 存入 channel
// 2. 质控
FASTP(raw_reads_ch) // 调用 fastp 模块
// 3. 去宿主
host_index_ch = Channel.fromPath(params.host_index) // 宿主索引文件
BOWTIE2_DEHOST( // 调用 bowtie2 模块
FASTP.out.trimmed_reads, // 上一步的输出作为输入
host_index_ch.collect() // collect() 让所有样本共用同一索引
)
// 4. 物种分类
kraken_db_ch = Channel.fromPath(params.kraken2_db) // Kraken2 数据库路径
KRAKEN2( // 调用 kraken2 模块
BOWTIE2_DEHOST.out.dehosted_reads, // 去宿主后的 reads
kraken_db_ch.collect() // 所有样本共用同一数据库
)
}
3.5 运行流程¶
# 运行主流程,指定参数
nextflow run main.nf \
--reads "data/*_R{1,2}.fastq.gz" \
--host_index "ref/human_genome/GRCh38" \
--kraken2_db "db/kraken2_standard" \
--outdir "results"
四、Sub-workflow(子工作流)¶
子工作流是把多个 process 组合在一起的"大积木块"。比如质控和去宿主经常一起用,可以打包成一个子工作流。
4.1 定义子工作流:subworkflows/qc_and_dehost.nf¶
// subworkflows/qc_and_dehost.nf
// 功能:质控 + 去宿主的组合子工作流
include { FASTP } from '../modules/fastp' // 导入质控模块
include { BOWTIE2_DEHOST } from '../modules/bowtie2' // 导入去宿主模块
workflow QC_AND_DEHOST {
take: // 子工作流的输入声明
raw_reads // 原始 reads channel
host_index // 宿主基因组索引
main: // 子工作流的主逻辑
FASTP(raw_reads) // 第一步:质控
BOWTIE2_DEHOST( // 第二步:去宿主
FASTP.out.trimmed_reads,
host_index
)
emit: // 子工作流的输出声明
clean_reads = BOWTIE2_DEHOST.out.dehosted_reads // 输出去宿主后的 reads
fastp_report = FASTP.out.json_report // 输出 fastp 报告
}
4.2 在主流程中使用子工作流¶
#!/usr/bin/env nextflow
// main.nf(使用子工作流的版本)
nextflow.enable.dsl = 2
include { QC_AND_DEHOST } from './subworkflows/qc_and_dehost' // 导入子工作流
include { KRAKEN2 } from './modules/kraken2'
workflow {
raw_reads_ch = Channel.fromFilePairs(params.reads, checkIfExists: true)
host_index_ch = Channel.fromPath(params.host_index).collect()
kraken_db_ch = Channel.fromPath(params.kraken2_db).collect()
// 一行搞定质控+去宿主
QC_AND_DEHOST(raw_reads_ch, host_index_ch)
// 物种分类
KRAKEN2(QC_AND_DEHOST.out.clean_reads, kraken_db_ch)
}
子工作流要点: - take: 声明输入(类似函数参数) - main: 写逻辑 - emit: 声明输出(类似函数返回值)
五、nf-core 模块:不要重复造轮子¶
nf-core 社区维护了 1400+ 个标准化模块和 70+ 个子工作流,涵盖绝大多数生信工具。安装即用,省时省力。
5.1 安装 nf-core/tools¶
# 用 pip 安装(推荐,当前最新版 4.0.2)
pip install nf-core # 安装 nf-core 命令行工具
# 或者用 conda
conda install -c bioconda nf-core # 通过 bioconda 安装
# 验证安装
nf-core --version # 应显示 4.0.2 或更新版本
5.2 查看可用模块¶
# 列出所有远程可用模块(1400+ 个)
nf-core modules list remote # 显示所有可安装的模块
# 按关键词搜索
nf-core modules list remote | grep fastp # 搜索 fastp 相关模块
# 列出当前项目已安装的模块
nf-core modules list local # 显示本项目已安装的模块
也可以在 nf-core 网站浏览模块列表:https://nf-co.re/modules
5.3 安装 nf-core 的 fastp 模块¶
# 初始化一个 nf-core 兼容的项目(如果还没有)
nf-core pipelines create --name my_pipeline --org mylab # 创建项目骨架
# 安装 fastp 模块
nf-core modules install fastp # 自动下载到 modules/nf-core/fastp/
# 安装完成后,目录结构自动生成:
# modules/nf-core/fastp/
# ├── main.nf # 模块主文件(process 定义)
# ├── meta.yml # 模块元数据(输入输出说明)
# └── tests/ # 测试文件
5.4 在流程中使用 nf-core 的 fastp 模块¶
// main.nf
nextflow.enable.dsl = 2
// 导入 nf-core 安装的 fastp 模块
include { FASTP } from './modules/nf-core/fastp/main'
workflow {
// nf-core 模块的输入格式通常是 [meta, reads] 的元组
// meta 是一个 map,包含样本信息
Channel
.fromFilePairs(params.reads)
.map { sample_id, reads ->
def meta = [id: sample_id, single_end: false] // 构造 meta map
[meta, reads] // 返回 [meta, reads] 元组
}
.set { input_ch }
// 调用 nf-core fastp 模块
// nf-core 模块通常需要额外参数:adapter_fasta 和 save 选项
FASTP(
input_ch, // [meta, reads]
[], // adapter_fasta(空 = 自动检测)
false, // save_trimmed_fail
false // save_merged
)
}
5.5 更新模块¶
# 更新单个模块
nf-core modules update fastp # 更新 fastp 到最新版
# 更新所有模块
nf-core modules update --all # 一次更新全部
# 查看更新前后的差异
nf-core modules update fastp --preview # 预览变更,不实际更新
六、模块测试¶
6.1 使用 nf-test 测试单个模块¶
nf-test 是 Nextflow 生态的标准测试框架,可以单独测试一个模块。
# 安装 nf-test
conda install -c bioconda nf-test # 通过 conda 安装
# 目录结构
# modules/fastp.nf
# tests/modules/fastp/
# ├── main.nf.test # 测试脚本
# └── test_data/ # 测试数据
6.2 编写测试文件¶
// tests/modules/fastp/main.nf.test
// 功能:测试 fastp 模块是否正常工作
nextflow_process {
name "Test FASTP" // 测试名称
script "modules/fastp.nf" // 被测试的模块文件
process "FASTP" // 被测试的 process 名
test("Should run fastp on paired-end reads") { // 测试用例描述
when {
params { // 测试参数
outdir = "$outputDir"
fastp_qualified_quality = 20
fastp_min_length = 50
}
process { // 模拟输入
"""
input[0] = [
'test_sample',
[
file('test_data/test_R1.fastq.gz'),
file('test_data/test_R2.fastq.gz')
]
]
"""
}
}
then {
assert process.success // 断言:进程成功完成
assert process.out.trimmed_reads // 断言:有输出 reads
assert process.out.json_report // 断言:有 JSON 报告
}
}
}
6.3 运行测试¶
# 运行特定模块的测试
nf-test test tests/modules/fastp/main.nf.test # 只测 fastp
# 运行所有测试
nf-test test # 运行所有 test 文件
# 带详细输出
nf-test test --verbose # 看到更多调试信息
6.4 快速验证模块(不用 nf-test)¶
写一个小的测试流程直接跑模块:
// test_fastp.nf — 快速测试脚本
nextflow.enable.dsl = 2
include { FASTP } from './modules/fastp'
workflow {
test_reads = Channel.of(
['test_sample', [file('test_data/test_R1.fastq.gz'),
file('test_data/test_R2.fastq.gz')]]
)
FASTP(test_reads)
}
七、常见报错¶
报错 1:Missing process or function with name 'XXX'¶
原因: include 语句写错了,或者模块文件路径不对。
解决:
// 错误:路径写错
include { FASTP } from './module/fastp' // "module" 少了个 s
// 正确
include { FASTP } from './modules/fastp' // 检查目录名是否正确
报错 2:Process 'FASTP' has been already used¶
原因: 同一个 process 在工作流里被调用了两次。DSL2 中每个 process 只能调用一次(除非用别名)。
解决:
// 用别名区分两次调用
include { FASTP as FASTP_ROUND1 } from './modules/fastp'
include { FASTP as FASTP_ROUND2 } from './modules/fastp'
workflow {
FASTP_ROUND1(raw_reads) // 第一次调用
FASTP_ROUND2(FASTP_ROUND1.out.trimmed_reads) // 第二次调用
}
报错 3:emit 名字冲突或未定义¶
原因: emit 名字和 Nextflow 保留字冲突,或者拼写错误。
解决:
// 检查 emit 名字不要用保留字(如 val, path, env, stdin, stdout)
output:
tuple val(sample_id), path("*.fastq.gz"), emit: trimmed_reads // 正确
tuple val(sample_id), path("*.fastq.gz"), emit: val // 错误!val 是保留字
报错 4:nf-core modules install 失败¶
原因: 模块名拼错了,或者不在 nf-core 仓库里。
解决:
# 搜索正确的模块名
nf-core modules list remote | grep -i "你想找的工具名"
# 确认项目已初始化为 nf-core 格式
# 检查是否有 modules.json 文件
ls modules.json
八、速查表¶
include 语法速查¶
| 写法 | 说明 |
|---|---|
include { FASTP } from './modules/fastp' | 基本导入 |
include { FASTP as FASTP_PE } from './modules/fastp' | 别名导入 |
include { FASTP; MULTIQC } from './modules/qc' | 多个导入 |
include { QC_FLOW } from './subworkflows/qc' | 导入子工作流 |
模块文件结构速查¶
process 工具名 { // 大写蛇形命名
tag "$sample_id" // 日志标签
label 'process_medium' // 资源标签
publishDir "..." // 输出目录
conda "bioconda::工具=版本" // 依赖
container "镜像地址" // 容器
input: // 输入
tuple val(id), path(files)
output: // 输出 + emit
path("*.txt"), emit: result
script: // Shell 命令
"""
command ...
"""
}
子工作流结构速查¶
workflow 子工作流名 {
take: // 输入
input_ch
main: // 逻辑
STEP1(input_ch)
STEP2(STEP1.out.xxx)
emit: // 输出
result = STEP2.out.xxx
}
nf-core 常用命令速查¶
| 命令 | 功能 |
|---|---|
nf-core modules list remote | 列出所有可用模块 |
nf-core modules list local | 列出已安装模块 |
nf-core modules install fastp | 安装模块 |
nf-core modules update fastp | 更新模块 |
nf-core modules update --all | 更新全部模块 |
nf-core modules info fastp | 查看模块文档 |
nf-core modules lint fastp | 检查模块规范 |
nf-core pipelines create | 创建新项目骨架 |
模块测试速查¶
| 命令 | 功能 |
|---|---|
nf-test test tests/modules/fastp/ | 测试单个模块 |
nf-test test | 运行全部测试 |
nf-test test --verbose | 详细输出 |
nextflow run test_xxx.nf | 快速脚本测试 |
参考资源: - Nextflow 官方模块文档:https://www.nextflow.io/docs/latest/module.html - nf-core 模块列表:https://nf-co.re/modules - nf-core/tools 文档:https://nf-co.re/docs/nf-core-tools - Nextflow 培训教程:https://training.nextflow.io/