跳转至

Nextflow 模块化开发(DSL2)

适用版本:Nextflow >= 25.04(推荐 25.10+),nf-core/tools >= 4.0.0 前置知识:Nextflow 基础语法(process / channel / workflow) 面向人群:生信工程师面试准备、流程开发入门


一、什么是模块化?为什么要模块化?

白话解释

想象你在搭建一个宏基因组分析流程,包含质控、去宿主、物种分类三步。

不模块化(一整块泥巴): 所有代码写在一个 main.nf 里,几百行挤在一起。想改质控参数?在几百行里翻找。想把质控步骤给另一个项目用?复制粘贴一大段代码。

模块化(乐高积木): 每个分析步骤是一块"乐高积木"——fastp.nfbowtie2.nfkraken2.nf。想改质控?打开 fastp.nf 直接改。想复用?把这块积木拿到另一个项目里拼上就行。

模块化的好处

好处说明
可复用写一次,到处用,不用复制粘贴
好维护每个文件只管一件事,改起来不怕牵一发动全身
好测试可以单独测试一个模块,不用跑整个流程
好协作张三写质控模块,李四写比对模块,互不干扰
社区共享nf-core 有 1400+ 现成模块,直接装来用

二、DSL2 模块系统

Nextflow 从 22.03 版本开始默认使用 DSL2 语法。DSL2 最大的特点就是支持模块化——你可以把 process 定义写在单独的文件里,然后用 include 导入。

2.1 模块文件结构

一个标准的模块化项目目录长这样:

my_pipeline/                  # 项目根目录
├── main.nf                   # 主流程文件:组装各个模块
├── nextflow.config           # 配置文件:参数、资源、容器
├── modules/                  # 模块目录:每个工具一个文件
│   ├── fastp.nf              # 质控模块
│   ├── bowtie2.nf            # 去宿主模块
│   └── kraken2.nf            # 物种分类模块
├── subworkflows/             # 子工作流目录(可选)
│   └── qc_and_dehost.nf      # 质控+去宿主组合子流程
└── modules/nf-core/          # nf-core 安装的模块(自动生成)
    └── fastp/
        └── main.nf

核心原则:每个 process 一个文件。 就像每个函数放一个文件一样,清晰明了。

2.2 include 语法

include 是 DSL2 的"导入"语句,把别的文件里定义的 process 或 workflow 拿过来用。

// 基本语法:从指定文件导入一个 process
include { FASTP } from './modules/fastp'          // 导入 fastp 模块

// 同时导入多个 process
include { FASTP; MULTIQC } from './modules/qc'    // 从同一文件导入多个

// 给导入的 process 起别名(同一个模块用不同参数时很有用)
include { FASTP as FASTP_PE } from './modules/fastp'   // 用于双端测序
include { FASTP as FASTP_SE } from './modules/fastp'   // 用于单端测序

// 从子工作流导入
include { QC_AND_DEHOST } from './subworkflows/qc_and_dehost'

注意事项: - 路径用 ./ 开头表示相对路径 - 文件名可以省略 .nf 后缀(写 ./modules/fastp 等价于 ./modules/fastp.nf) - include 语句必须写在 workflow 块的外面(文件顶部)


三、实战:宏基因组流程模块化

下面我们把一个宏基因组分析流程拆成独立模块。流程是:fastp 质控 -> bowtie2 去宿主 -> kraken2 物种分类。

3.1 模块文件:modules/fastp.nf

// modules/fastp.nf
// 功能:使用 fastp 对双端测序数据做质量控制

process FASTP {
    tag "$sample_id"                              // 标签:在日志里显示样本名
    label 'process_medium'                        // 资源标签:在 config 里统一分配资源
    publishDir "${params.outdir}/fastp",           // 输出目录:结果文件复制到这里
        mode: 'copy'                               // 复制模式(而非默认的软链接)

    // 用 conda 或容器管理依赖,不用手动装软件
    conda "bioconda::fastp=0.24.0"                 // conda 环境
    container "quay.io/biocontainers/fastp:0.24.0--heae7151_0"  // Docker/Singularity 镜像

    input:                                         // 输入声明
    tuple val(sample_id), path(reads)              // 元组:样本名 + 原始 reads 文件

    output:                                        // 输出声明
    tuple val(sample_id), path("*_trimmed_R{1,2}.fastq.gz"),  // 质控后的 reads
        emit: trimmed_reads                        // 给输出起名,方便下游引用
    path("${sample_id}.fastp.json"),               // fastp 的 JSON 报告
        emit: json_report                          // 报告输出通道
    path("${sample_id}.fastp.html"),               // fastp 的 HTML 报告
        emit: html_report

    script:                                        // 执行的命令
    """
    fastp \\
        -i ${reads[0]} \\                          # 输入:正向 reads(R1)
        -I ${reads[1]} \\                          # 输入:反向 reads(R2)
        -o ${sample_id}_trimmed_R1.fastq.gz \\     # 输出:质控后 R1
        -O ${sample_id}_trimmed_R2.fastq.gz \\     # 输出:质控后 R2
        --qualified_quality_phred ${params.fastp_qualified_quality ?: 20} \\  # 碱基质量阈值,默认 20
        --length_required ${params.fastp_min_length ?: 50} \\                # 最短读长,默认 50bp
        --thread ${task.cpus} \\                   # 使用分配的 CPU 数
        --json ${sample_id}.fastp.json \\          # JSON 格式报告
        --html ${sample_id}.fastp.html             # HTML 格式报告
    """
}

3.2 模块文件:modules/bowtie2.nf

// modules/bowtie2.nf
// 功能:用 Bowtie2 将 reads 比对到宿主基因组,去除宿主污染

process BOWTIE2_DEHOST {
    tag "$sample_id"                              // 标签:样本名
    label 'process_high'                          // 资源标签:需要较多内存和 CPU

    conda "bioconda::bowtie2=2.5.4 bioconda::samtools=1.21"  // 同时装 bowtie2 和 samtools
    container "quay.io/biocontainers/mulled-v2-bowtie2-samtools:latest"

    input:
    tuple val(sample_id), path(reads)             // 质控后的 reads
    path(index)                                   // bowtie2 索引文件目录

    output:
    tuple val(sample_id), path("*_dehost_R{1,2}.fastq.gz"),  // 去宿主后的 reads
        emit: dehosted_reads
    path("${sample_id}_bowtie2.log"),             // 比对日志
        emit: log

    script:
    def index_base = index[0].toString().replaceAll(/\.\d+\.bt2$/, '')  // 提取索引前缀
    """
    bowtie2 \\
        -x ${index_base} \\                       # 宿主基因组索引
        -1 ${reads[0]} \\                         # R1 reads
        -2 ${reads[1]} \\                         # R2 reads
        --threads ${task.cpus} \\                 # CPU 数
        --very-sensitive \\                       # 高灵敏度模式
        --un-conc-gz ${sample_id}_dehost_R%.fastq.gz \\  # 未比上的 reads(即非宿主)
        2> ${sample_id}_bowtie2.log \\            # 日志重定向
    | samtools view -bS - > /dev/null             # 比对结果不保留(只要未比上的)
    """
}

3.3 模块文件:modules/kraken2.nf

// modules/kraken2.nf
// 功能:用 Kraken2 对去宿主后的 reads 做物种分类

process KRAKEN2 {
    tag "$sample_id"                              // 标签
    label 'process_high'                          // 需要大内存(Kraken2 数据库很大)

    conda "bioconda::kraken2=2.1.3 bioconda::krakentools=1.2"
    container "quay.io/biocontainers/kraken2:2.1.3--pl5321hdcf5f25_1"

    input:
    tuple val(sample_id), path(reads)             // 去宿主后的 reads
    path(db)                                      // Kraken2 数据库路径

    output:
    tuple val(sample_id), path("${sample_id}.kraken2.report"),  // 分类报告
        emit: report
    tuple val(sample_id), path("${sample_id}.kraken2.output"),  // 详细输出
        emit: output

    script:
    """
    kraken2 \\
        --db ${db} \\                             # 数据库路径
        --paired \\                               # 双端模式
        --threads ${task.cpus} \\                 # CPU 数
        --report ${sample_id}.kraken2.report \\   # 汇总报告
        --output ${sample_id}.kraken2.output \\   # 逐条 reads 分类结果
        --gzip-compressed \\                      # 输入文件是 gzip 压缩的
        ${reads[0]} ${reads[1]}                   # 输入 R1 和 R2
    """
}

3.4 主流程:main.nf

#!/usr/bin/env nextflow
// main.nf
// 宏基因组分析主流程:质控 -> 去宿主 -> 物种分类

nextflow.enable.dsl = 2                           // 启用 DSL2(25.10+ 版本默认开启)

// === 导入模块 ===
include { FASTP } from './modules/fastp'                    // 质控
include { BOWTIE2_DEHOST } from './modules/bowtie2'         // 去宿主
include { KRAKEN2 } from './modules/kraken2'                // 物种分类

// === 主工作流 ===
workflow {

    // 1. 读取样本信息
    // samplesheet 格式:sample_id, reads_R1, reads_R2
    Channel
        .fromFilePairs(params.reads, checkIfExists: true)   // 自动配对 R1/R2
        .set { raw_reads_ch }                                // 存入 channel

    // 2. 质控
    FASTP(raw_reads_ch)                                      // 调用 fastp 模块

    // 3. 去宿主
    host_index_ch = Channel.fromPath(params.host_index)      // 宿主索引文件
    BOWTIE2_DEHOST(                                          // 调用 bowtie2 模块
        FASTP.out.trimmed_reads,                             // 上一步的输出作为输入
        host_index_ch.collect()                              // collect() 让所有样本共用同一索引
    )

    // 4. 物种分类
    kraken_db_ch = Channel.fromPath(params.kraken2_db)       // Kraken2 数据库路径
    KRAKEN2(                                                 // 调用 kraken2 模块
        BOWTIE2_DEHOST.out.dehosted_reads,                   // 去宿主后的 reads
        kraken_db_ch.collect()                               // 所有样本共用同一数据库
    )
}

3.5 运行流程

# 运行主流程,指定参数
nextflow run main.nf \
    --reads "data/*_R{1,2}.fastq.gz" \
    --host_index "ref/human_genome/GRCh38" \
    --kraken2_db "db/kraken2_standard" \
    --outdir "results"

四、Sub-workflow(子工作流)

子工作流是把多个 process 组合在一起的"大积木块"。比如质控和去宿主经常一起用,可以打包成一个子工作流。

4.1 定义子工作流:subworkflows/qc_and_dehost.nf

// subworkflows/qc_and_dehost.nf
// 功能:质控 + 去宿主的组合子工作流

include { FASTP } from '../modules/fastp'                   // 导入质控模块
include { BOWTIE2_DEHOST } from '../modules/bowtie2'        // 导入去宿主模块

workflow QC_AND_DEHOST {

    take:                                                    // 子工作流的输入声明
    raw_reads                                                // 原始 reads channel
    host_index                                               // 宿主基因组索引

    main:                                                    // 子工作流的主逻辑
    FASTP(raw_reads)                                         // 第一步:质控
    BOWTIE2_DEHOST(                                          // 第二步:去宿主
        FASTP.out.trimmed_reads,
        host_index
    )

    emit:                                                    // 子工作流的输出声明
    clean_reads = BOWTIE2_DEHOST.out.dehosted_reads          // 输出去宿主后的 reads
    fastp_report = FASTP.out.json_report                     // 输出 fastp 报告
}

4.2 在主流程中使用子工作流

#!/usr/bin/env nextflow
// main.nf(使用子工作流的版本)

nextflow.enable.dsl = 2

include { QC_AND_DEHOST } from './subworkflows/qc_and_dehost'  // 导入子工作流
include { KRAKEN2 } from './modules/kraken2'

workflow {
    raw_reads_ch = Channel.fromFilePairs(params.reads, checkIfExists: true)
    host_index_ch = Channel.fromPath(params.host_index).collect()
    kraken_db_ch = Channel.fromPath(params.kraken2_db).collect()

    // 一行搞定质控+去宿主
    QC_AND_DEHOST(raw_reads_ch, host_index_ch)

    // 物种分类
    KRAKEN2(QC_AND_DEHOST.out.clean_reads, kraken_db_ch)
}

子工作流要点: - take: 声明输入(类似函数参数) - main: 写逻辑 - emit: 声明输出(类似函数返回值)


五、nf-core 模块:不要重复造轮子

nf-core 社区维护了 1400+ 个标准化模块70+ 个子工作流,涵盖绝大多数生信工具。安装即用,省时省力。

5.1 安装 nf-core/tools

# 用 pip 安装(推荐,当前最新版 4.0.2)
pip install nf-core                               # 安装 nf-core 命令行工具

# 或者用 conda
conda install -c bioconda nf-core                 # 通过 bioconda 安装

# 验证安装
nf-core --version                                 # 应显示 4.0.2 或更新版本

5.2 查看可用模块

# 列出所有远程可用模块(1400+ 个)
nf-core modules list remote                       # 显示所有可安装的模块

# 按关键词搜索
nf-core modules list remote | grep fastp          # 搜索 fastp 相关模块

# 列出当前项目已安装的模块
nf-core modules list local                        # 显示本项目已安装的模块

也可以在 nf-core 网站浏览模块列表:https://nf-co.re/modules

5.3 安装 nf-core 的 fastp 模块

# 初始化一个 nf-core 兼容的项目(如果还没有)
nf-core pipelines create --name my_pipeline --org mylab  # 创建项目骨架

# 安装 fastp 模块
nf-core modules install fastp                      # 自动下载到 modules/nf-core/fastp/

# 安装完成后,目录结构自动生成:
# modules/nf-core/fastp/
# ├── main.nf          # 模块主文件(process 定义)
# ├── meta.yml         # 模块元数据(输入输出说明)
# └── tests/           # 测试文件

5.4 在流程中使用 nf-core 的 fastp 模块

// main.nf
nextflow.enable.dsl = 2

// 导入 nf-core 安装的 fastp 模块
include { FASTP } from './modules/nf-core/fastp/main'

workflow {
    // nf-core 模块的输入格式通常是 [meta, reads] 的元组
    // meta 是一个 map,包含样本信息
    Channel
        .fromFilePairs(params.reads)
        .map { sample_id, reads ->
            def meta = [id: sample_id, single_end: false]   // 构造 meta map
            [meta, reads]                                    // 返回 [meta, reads] 元组
        }
        .set { input_ch }

    // 调用 nf-core fastp 模块
    // nf-core 模块通常需要额外参数:adapter_fasta 和 save 选项
    FASTP(
        input_ch,                                            // [meta, reads]
        [],                                                  // adapter_fasta(空 = 自动检测)
        false,                                               // save_trimmed_fail
        false                                                // save_merged
    )
}

5.5 更新模块

# 更新单个模块
nf-core modules update fastp                       # 更新 fastp 到最新版

# 更新所有模块
nf-core modules update --all                       # 一次更新全部

# 查看更新前后的差异
nf-core modules update fastp --preview             # 预览变更,不实际更新

六、模块测试

6.1 使用 nf-test 测试单个模块

nf-test 是 Nextflow 生态的标准测试框架,可以单独测试一个模块。

# 安装 nf-test
conda install -c bioconda nf-test                  # 通过 conda 安装

# 目录结构
# modules/fastp.nf
# tests/modules/fastp/
# ├── main.nf.test       # 测试脚本
# └── test_data/         # 测试数据

6.2 编写测试文件

// tests/modules/fastp/main.nf.test
// 功能:测试 fastp 模块是否正常工作

nextflow_process {
    name "Test FASTP"                               // 测试名称
    script "modules/fastp.nf"                       // 被测试的模块文件
    process "FASTP"                                 // 被测试的 process 名

    test("Should run fastp on paired-end reads") {  // 测试用例描述

        when {
            params {                                 // 测试参数
                outdir = "$outputDir"
                fastp_qualified_quality = 20
                fastp_min_length = 50
            }
            process {                                // 模拟输入
                """
                input[0] = [
                    'test_sample',
                    [
                        file('test_data/test_R1.fastq.gz'),
                        file('test_data/test_R2.fastq.gz')
                    ]
                ]
                """
            }
        }

        then {
            assert process.success                   // 断言:进程成功完成
            assert process.out.trimmed_reads         // 断言:有输出 reads
            assert process.out.json_report           // 断言:有 JSON 报告
        }
    }
}

6.3 运行测试

# 运行特定模块的测试
nf-test test tests/modules/fastp/main.nf.test      # 只测 fastp

# 运行所有测试
nf-test test                                        # 运行所有 test 文件

# 带详细输出
nf-test test --verbose                              # 看到更多调试信息

6.4 快速验证模块(不用 nf-test)

写一个小的测试流程直接跑模块:

// test_fastp.nf — 快速测试脚本
nextflow.enable.dsl = 2
include { FASTP } from './modules/fastp'

workflow {
    test_reads = Channel.of(
        ['test_sample', [file('test_data/test_R1.fastq.gz'),
                         file('test_data/test_R2.fastq.gz')]]
    )
    FASTP(test_reads)
}
# 运行测试脚本
nextflow run test_fastp.nf --outdir test_results   # 单独跑 fastp 模块看看

七、常见报错

报错 1:Missing process or function with name 'XXX'

ERROR ~ No such variable: FASTP

原因: include 语句写错了,或者模块文件路径不对。

解决:

// 错误:路径写错
include { FASTP } from './module/fastp'            // "module" 少了个 s

// 正确
include { FASTP } from './modules/fastp'           // 检查目录名是否正确

报错 2:Process 'FASTP' has been already used

ERROR ~ Process 'FASTP' has been already used

原因: 同一个 process 在工作流里被调用了两次。DSL2 中每个 process 只能调用一次(除非用别名)。

解决:

// 用别名区分两次调用
include { FASTP as FASTP_ROUND1 } from './modules/fastp'
include { FASTP as FASTP_ROUND2 } from './modules/fastp'

workflow {
    FASTP_ROUND1(raw_reads)                        // 第一次调用
    FASTP_ROUND2(FASTP_ROUND1.out.trimmed_reads)   // 第二次调用
}

报错 3:emit 名字冲突或未定义

ERROR ~ Invalid output emit name 'trimmed_reads' in process 'FASTP'

原因: emit 名字和 Nextflow 保留字冲突,或者拼写错误。

解决:

// 检查 emit 名字不要用保留字(如 val, path, env, stdin, stdout)
output:
tuple val(sample_id), path("*.fastq.gz"), emit: trimmed_reads   // 正确
tuple val(sample_id), path("*.fastq.gz"), emit: val             // 错误!val 是保留字

报错 4:nf-core modules install 失败

ERROR: Could not install module 'xxx'. Module not found in remote

原因: 模块名拼错了,或者不在 nf-core 仓库里。

解决:

# 搜索正确的模块名
nf-core modules list remote | grep -i "你想找的工具名"

# 确认项目已初始化为 nf-core 格式
# 检查是否有 modules.json 文件
ls modules.json


八、速查表

include 语法速查

写法说明
include { FASTP } from './modules/fastp'基本导入
include { FASTP as FASTP_PE } from './modules/fastp'别名导入
include { FASTP; MULTIQC } from './modules/qc'多个导入
include { QC_FLOW } from './subworkflows/qc'导入子工作流

模块文件结构速查

process 工具名 {                    // 大写蛇形命名
    tag "$sample_id"               // 日志标签
    label 'process_medium'         // 资源标签
    publishDir "..."               // 输出目录
    conda "bioconda::工具=版本"     // 依赖
    container "镜像地址"            // 容器

    input:                         // 输入
    tuple val(id), path(files)

    output:                        // 输出 + emit
    path("*.txt"), emit: result

    script:                        // Shell 命令
    """
    command ...
    """
}

子工作流结构速查

workflow 子工作流名 {
    take:                          // 输入
    input_ch

    main:                          // 逻辑
    STEP1(input_ch)
    STEP2(STEP1.out.xxx)

    emit:                          // 输出
    result = STEP2.out.xxx
}

nf-core 常用命令速查

命令功能
nf-core modules list remote列出所有可用模块
nf-core modules list local列出已安装模块
nf-core modules install fastp安装模块
nf-core modules update fastp更新模块
nf-core modules update --all更新全部模块
nf-core modules info fastp查看模块文档
nf-core modules lint fastp检查模块规范
nf-core pipelines create创建新项目骨架

模块测试速查

命令功能
nf-test test tests/modules/fastp/测试单个模块
nf-test test运行全部测试
nf-test test --verbose详细输出
nextflow run test_xxx.nf快速脚本测试

参考资源: - Nextflow 官方模块文档:https://www.nextflow.io/docs/latest/module.html - nf-core 模块列表:https://nf-co.re/modules - nf-core/tools 文档:https://nf-co.re/docs/nf-core-tools - Nextflow 培训教程:https://training.nextflow.io/