跳转至

Shell 脚本改写为 Nextflow 流程

最后更新:2026-05 | Nextflow v25.10.4+ / v26.04.0 | 前置知识:560_Nextflow_DSL2入门 适合人群:已了解 Nextflow 基础概念,想把现有 Shell 流程迁移到 Nextflow 的生信工程师


一、为什么要把 Shell 脚本改成 Nextflow?

你已经有了一个能跑的 Shell 脚本,为什么还要花时间改成 Nextflow?看看这个真实场景:

Shell 脚本的 3 个致命伤

场景:你有 50 个宏基因组样本要分析

# Shell 版本:你不得不这样写
for sample in $(cat sample_list.txt); do
    fastp -i ${sample}_R1.fq.gz -I ${sample}_R2.fq.gz \
          -o clean/${sample}_R1.fq.gz -O clean/${sample}_R2.fq.gz
    bowtie2 -x human_ref -1 clean/${sample}_R1.fq.gz \
            -2 clean/${sample}_R2.fq.gz -S ${sample}.sam
    kraken2 --db k2db --paired clean/${sample}_R1.fq.gz \
            clean/${sample}_R2.fq.gz --output ${sample}_kraken.txt
done
问题Shell 脚本Nextflow
并行for 循环是串行的,50 个样本排队跑自动并行,有多少核就同时跑多少样本
断点续传跑到第 30 个样本挂了,要从头来或手动改脚本-resume,自动跳过已完成的 29 个
可移植换台机器要改路径、改集群参数配置文件里改一行就行
容器化手动装所有工具,版本冲突每个 process 自带 Docker/Conda 环境
日志追踪echo 打日志,出错难定位每个任务有独立工作目录和日志文件

一句话:Shell 脚本是"作坊",Nextflow 是"工厂"。


二、改写思路:3 步走

改写不是"重写",而是"拆分重组"。按这 3 步走:

第 1 步:识别输入、输出、步骤

看你的 Shell 脚本,找出: - 输入:原始 FASTQ 文件、参考基因组、数据库 - 输出:清洗后的 FASTQ、比对结果、分类结果 - 步骤:fastp → bowtie2 → kraken2

输入文件 → [fastp 质控] → 清洗文件 → [bowtie2 去宿主] → 非宿主文件 → [kraken2 分类] → 结果

第 2 步:每个步骤变成一个 Process

一个 Shell 命令(或一组相关命令)= 一个 Nextflow Process

Shell 步骤Nextflow Process
fastp -i ... -o ...process FASTP { ... }
bowtie2 -x ... -1 ... -2 ...process BOWTIE2_REMOVE_HOST { ... }
kraken2 --db ... --paired ...process KRAKEN2_CLASSIFY { ... }

第 3 步:用 Channel 连接各个 Process

Process 之间通过 Channel(水管)传递数据,上一个 process 的 output 就是下一个 process 的 input。

[Channel: 原始reads] → FASTP → [Channel: 清洗reads] → BOWTIE2 → [Channel: 非宿主reads] → KRAKEN2 → [Channel: 结果]

三、实战案例:宏基因组分析 Shell → Nextflow

原始 Shell 脚本

这是一个典型的宏基因组分析脚本(fastp 质控 → bowtie2 去宿主 → kraken2 物种分类):

#!/bin/bash
# 文件名:metagenome_pipeline.sh
# 功能:宏基因组分析流程(单样本版)

# ===== 配置 =====
SAMPLE="sample1"                                    # 样本名
R1="data/${SAMPLE}_R1.fastq.gz"                     # 正向读段
R2="data/${SAMPLE}_R2.fastq.gz"                     # 反向读段
HOST_REF="ref/human_genome"                         # 宿主参考基因组(bowtie2 索引前缀)
KRAKEN_DB="db/kraken2_standard"                     # Kraken2 数据库路径
THREADS=8                                           # 线程数
OUTDIR="results"                                    # 输出目录

mkdir -p ${OUTDIR}/fastp ${OUTDIR}/bowtie2 ${OUTDIR}/kraken2   # 创建输出目录

# ===== Step 1: fastp 质控 =====
echo ">>> Step 1: Running fastp..."
fastp \
    -i ${R1} \
    -I ${R2} \
    -o ${OUTDIR}/fastp/${SAMPLE}_clean_R1.fq.gz \
    -O ${OUTDIR}/fastp/${SAMPLE}_clean_R2.fq.gz \
    -h ${OUTDIR}/fastp/${SAMPLE}_fastp.html \
    -j ${OUTDIR}/fastp/${SAMPLE}_fastp.json \
    --thread ${THREADS} \
    --qualified_quality_phred 20 \
    --length_required 50

# ===== Step 2: bowtie2 去宿主 =====
echo ">>> Step 2: Removing host reads with bowtie2..."
bowtie2 \
    -x ${HOST_REF} \
    -1 ${OUTDIR}/fastp/${SAMPLE}_clean_R1.fq.gz \
    -2 ${OUTDIR}/fastp/${SAMPLE}_clean_R2.fq.gz \
    --threads ${THREADS} \
    --very-sensitive \
    --un-conc-gz ${OUTDIR}/bowtie2/${SAMPLE}_nonhost_R%.fq.gz \
    -S /dev/null                                    # 不保存 SAM 文件(只要未比对的读段)

# ===== Step 3: kraken2 物种分类 =====
echo ">>> Step 3: Running Kraken2 classification..."
kraken2 \
    --db ${KRAKEN_DB} \
    --paired \
    ${OUTDIR}/bowtie2/${SAMPLE}_nonhost_R1.fq.gz \
    ${OUTDIR}/bowtie2/${SAMPLE}_nonhost_R2.fq.gz \
    --output ${OUTDIR}/kraken2/${SAMPLE}_kraken2.output \
    --report ${OUTDIR}/kraken2/${SAMPLE}_kraken2.report \
    --threads ${THREADS}

echo ">>> Done! Results in ${OUTDIR}/"

改写后的 Nextflow DSL2 版本

文件名:metagenome_pipeline.nf

#!/usr/bin/env nextflow
// 文件名:metagenome_pipeline.nf
// 功能:宏基因组分析流程(fastp → bowtie2 去宿主 → kraken2 分类)
// 语法:Nextflow DSL2

// ===== 参数定义(对应 Shell 脚本里的变量) =====
params.reads      = "data/*_{R1,R2}.fastq.gz"      // 输入文件(支持通配符,自动匹配多样本)
params.host_ref   = "ref/human_genome"              // 宿主参考基因组(bowtie2 索引前缀)
params.kraken_db  = "db/kraken2_standard"           // Kraken2 数据库路径
params.outdir     = "results"                       // 输出目录
params.threads    = 8                               // 默认线程数

// ======================================================================
// Process 1: FASTP 质控
// 对应 Shell 脚本的 Step 1
// ======================================================================
process FASTP {
    tag "${sample_id}"                              // 在日志中显示样本名(方便追踪)
    publishDir "${params.outdir}/fastp", mode: 'copy'  // 把结果复制到输出目录
    cpus params.threads                             // 使用指定的线程数

    input:
    tuple val(sample_id), path(reads)               // 输入:(样本名, [R1文件, R2文件])

    output:
    tuple val(sample_id), path("${sample_id}_clean_R{1,2}.fq.gz"), emit: clean_reads
                                                    // 输出:(样本名, [清洗后R1, 清洗后R2])
    path "${sample_id}_fastp.html", emit: html      // 输出:HTML 报告
    path "${sample_id}_fastp.json", emit: json      // 输出:JSON 报告

    script:
    """
    fastp \\
        -i ${reads[0]} \\
        -I ${reads[1]} \\
        -o ${sample_id}_clean_R1.fq.gz \\
        -O ${sample_id}_clean_R2.fq.gz \\
        -h ${sample_id}_fastp.html \\
        -j ${sample_id}_fastp.json \\
        --thread ${task.cpus} \\
        --qualified_quality_phred 20 \\
        --length_required 50
    """
    // reads[0] = R1 文件,reads[1] = R2 文件
    // task.cpus = Nextflow 分配的 CPU 数(来自 cpus 指令)
}

// ======================================================================
// Process 2: BOWTIE2 去宿主
// 对应 Shell 脚本的 Step 2
// ======================================================================
process BOWTIE2_REMOVE_HOST {
    tag "${sample_id}"                              // 日志标签
    publishDir "${params.outdir}/bowtie2", mode: 'copy'
    cpus params.threads

    input:
    tuple val(sample_id), path(clean_reads)         // 输入:FASTP 的输出(清洗后的读段)
    path host_ref                                   // 输入:宿主参考基因组索引目录

    output:
    tuple val(sample_id), path("${sample_id}_nonhost_R{1,2}.fq.gz"), emit: nonhost_reads
                                                    // 输出:去宿主后的读段

    script:
    // 获取 bowtie2 索引前缀(去掉 .1.bt2 等后缀)
    def ref_prefix = host_ref[0].toString().replaceAll(/\.\d+\.bt2l?$/, '')
    """
    bowtie2 \\
        -x ${ref_prefix} \\
        -1 ${clean_reads[0]} \\
        -2 ${clean_reads[1]} \\
        --threads ${task.cpus} \\
        --very-sensitive \\
        --un-conc-gz ${sample_id}_nonhost_R%.fq.gz \\
        -S /dev/null
    """
    // --un-conc-gz: 未比对上的读段(即非宿主读段)输出为 gz 压缩
    // -S /dev/null: 不保存比对结果(只要非宿主读段)
    // R% 会被自动替换为 R1 和 R2
}

// ======================================================================
// Process 3: KRAKEN2 物种分类
// 对应 Shell 脚本的 Step 3
// ======================================================================
process KRAKEN2_CLASSIFY {
    tag "${sample_id}"                              // 日志标签
    publishDir "${params.outdir}/kraken2", mode: 'copy'
    cpus params.threads

    input:
    tuple val(sample_id), path(nonhost_reads)       // 输入:去宿主后的读段
    path kraken_db                                  // 输入:Kraken2 数据库

    output:
    path "${sample_id}_kraken2.output", emit: output    // 输出:分类结果
    path "${sample_id}_kraken2.report", emit: report    // 输出:分类报告

    script:
    """
    kraken2 \\
        --db ${kraken_db} \\
        --paired \\
        ${nonhost_reads[0]} \\
        ${nonhost_reads[1]} \\
        --output ${sample_id}_kraken2.output \\
        --report ${sample_id}_kraken2.report \\
        --threads ${task.cpus}
    """
}

// ======================================================================
// Workflow:把 3 个 Process 串起来(生产线)
// ======================================================================
workflow {
    // ----- 创建输入 Channel -----
    // fromFilePairs 自动匹配 R1/R2 配对文件
    // 返回格式:[sample_id, [R1.fq.gz, R2.fq.gz]]
    reads_ch    = Channel.fromFilePairs(params.reads)       // 样本配对文件(queue channel)
    host_ref_ch = Channel.fromPath("${params.host_ref}*")   // 宿主参考索引文件
                         .collect()                          // 收集所有索引文件为一个列表
    kraken_db_ch = Channel.fromPath(params.kraken_db)       // Kraken2 数据库路径

    // ----- 连接生产线 -----
    // Step 1: 质控
    FASTP(reads_ch)                                         // 原始读段 → FASTP

    // Step 2: 去宿主(用 FASTP 的输出 + 宿主参考)
    BOWTIE2_REMOVE_HOST(                                    // FASTP 输出 → BOWTIE2
        FASTP.out.clean_reads,                              // 上一步的清洗读段
        host_ref_ch                                         // 宿主参考基因组
    )

    // Step 3: 物种分类(用去宿主的输出 + Kraken2 数据库)
    KRAKEN2_CLASSIFY(                                       // BOWTIE2 输出 → KRAKEN2
        BOWTIE2_REMOVE_HOST.out.nonhost_reads,              // 上一步的非宿主读段
        kraken_db_ch                                        // Kraken2 数据库
    )
}

逐行对比说明

Shell 脚本Nextflow 版本改了什么
SAMPLE="sample1"params.reads = "data/*_{R1,R2}.fastq.gz"不再硬编码样本名,用通配符自动匹配所有样本
R1="data/${SAMPLE}_R1.fastq.gz"Channel.fromFilePairs(params.reads)自动配对 R1/R2,自动处理多样本
THREADS=8params.threads = 8 + cpus params.threads参数化,可在命令行覆盖
mkdir -p ${OUTDIR}/...publishDir "...", mode: 'copy'Nextflow 自动管理目录
fastp -i ${R1} ...process FASTP { script: ... }包装成独立 process
串行 for 循环Channel.fromFilePairs()自动并行处理所有样本
手动传路径FASTP.out.clean_reads自动通过 Channel 传递

四、多样本处理:fromFilePairs

fromFilePairs 是处理双端测序数据的利器。它自动把 R1 和 R2 配对在一起。

文件命名要求

data/
├── sample1_R1.fastq.gz    ← 配对 1
├── sample1_R2.fastq.gz    ← 配对 1
├── sample2_R1.fastq.gz    ← 配对 2
├── sample2_R2.fastq.gz    ← 配对 2
├── sample3_R1.fastq.gz    ← 配对 3
└── sample3_R2.fastq.gz    ← 配对 3

使用方式

// 基本用法
reads_ch = Channel.fromFilePairs("data/*_{R1,R2}.fastq.gz")
// 产生 3 个元素:
// [sample1, [data/sample1_R1.fastq.gz, data/sample1_R2.fastq.gz]]
// [sample2, [data/sample2_R1.fastq.gz, data/sample2_R2.fastq.gz]]
// [sample3, [data/sample3_R1.fastq.gz, data/sample3_R2.fastq.gz]]

// 调试:查看配对结果
reads_ch.view()                             // 打印所有配对,确认是否正确

// 如果文件名不是 _R1/_R2 格式(比如 _1/_2)
reads_ch = Channel.fromFilePairs("data/*_{1,2}.fastq.gz")

// 如果配对规则更复杂,用自定义正则
reads_ch = Channel.fromFilePairs("data/*_{R1,R2}.fq.gz", size: 2)
// size: 2 表示每组必须恰好 2 个文件

在 Process 中使用

process FASTP {
    input:
    tuple val(sample_id), path(reads)       // 自动解包:sample_id = 样本名,reads = [R1, R2]

    script:
    """
    fastp \\
        -i ${reads[0]} \\
        -I ${reads[1]} \\
        -o ${sample_id}_clean_R1.fq.gz \\
        -O ${sample_id}_clean_R2.fq.gz
    """
    // reads[0] = R1 文件
    // reads[1] = R2 文件
    // sample_id = 从文件名自动提取的样本名
}

常见陷阱

// 错误:忘记用 tuple 解包
input:
path reads                                  // 这样只能拿到文件,拿不到样本名!

// 正确:用 tuple 同时拿到样本名和文件
input:
tuple val(sample_id), path(reads)           // 样本名 + 文件列表

五、参数化:params 机制

为什么不能硬编码?

// 错误:硬编码路径(换机器就挂)
process FASTP {
    script:
    """
    fastp -i /home/zhangsan/data/sample1_R1.fq.gz ...
    """
}

// 正确:使用 params 参数化
params.reads = "data/*_{R1,R2}.fastq.gz"    // 可以在命令行覆盖

params 的 3 层优先级

Nextflow 参数有 3 层来源,优先级从高到低:

命令行参数(--reads)  >  nextflow.config 文件  >  脚本中的 params 默认值

第 1 层:脚本默认值(最低优先级)

// 在 .nf 文件中定义
params.reads     = "data/*_{R1,R2}.fastq.gz"   // 默认值
params.outdir    = "results"                     // 默认值
params.threads   = 4                             // 默认值

第 2 层:配置文件(中等优先级)

// nextflow.config 文件
params {
    reads     = "data/*_{R1,R2}.fastq.gz"       // 覆盖脚本默认值
    outdir    = "results"
    threads   = 8                                // 服务器上用 8 线程
    host_ref  = "ref/human_genome"
    kraken_db = "db/kraken2_standard"
}

第 3 层:命令行参数(最高优先级)

# 命令行传参,覆盖一切
nextflow run metagenome_pipeline.nf \
    --reads "new_data/*_{R1,R2}.fq.gz" \
    --outdir "new_results" \
    --threads 16                                # 覆盖 config 和脚本中的值

实用配置文件模板

创建 nextflow.config(和 .nf 文件放同一目录):

// nextflow.config
// 全局参数
params {
    reads      = "data/*_{R1,R2}.fastq.gz"      // 输入文件
    host_ref   = "ref/human_genome"              // 宿主参考基因组
    kraken_db  = "db/kraken2_standard"           // Kraken2 数据库
    outdir     = "results"                       // 输出目录
    threads    = 4                               // 默认线程数
}

// 全局 process 配置
process {
    cpus   = 2                                   // 默认 CPU
    memory = '4 GB'                              // 默认内存
    time   = '2h'                                // 默认时间限制

    // 针对特定 process 设置资源
    withName: 'FASTP' {                          // FASTP 专属配置
        cpus   = 4
        memory = '8 GB'
    }
    withName: 'BOWTIE2_REMOVE_HOST' {            // BOWTIE2 专属配置
        cpus   = 8
        memory = '16 GB'
    }
    withName: 'KRAKEN2_CLASSIFY' {               // KRAKEN2 专属配置
        cpus   = 8
        memory = '32 GB'                         // Kraken2 数据库大,需要更多内存
    }
}

// 运行环境
profiles {
    standard {                                   // 默认:本地运行
        process.executor = 'local'
    }
    slurm {                                      // SLURM 集群
        process.executor = 'slurm'
        process.queue    = 'normal'
        process.clusterOptions = '--account=mylab'
    }
    docker {                                     // Docker 容器
        docker.enabled = true
    }
    conda {                                      // Conda 环境
        conda.enabled = true
    }
}

// 报告和追踪
report {
    enabled = true                               // 生成 HTML 运行报告
    file    = "${params.outdir}/pipeline_report.html"
}

timeline {
    enabled = true                               // 生成时间线图
    file    = "${params.outdir}/timeline.html"
}

六、运行与监控

基本运行

# 最简运行(使用默认参数)
nextflow run metagenome_pipeline.nf             # 运行流程

# 指定参数
nextflow run metagenome_pipeline.nf \
    --reads "data/*_{R1,R2}.fq.gz" \
    --outdir results_v2                          # 自定义输入输出

# 使用特定 profile
nextflow run metagenome_pipeline.nf \
    -profile slurm                               # 在 SLURM 集群上运行

断点续传(-resume

这是 Nextflow 最强大的功能之一:

# 第一次运行(跑了 30 分钟后中断了)
nextflow run metagenome_pipeline.nf

# 第二次运行:加 -resume,自动跳过已完成的任务
nextflow run metagenome_pipeline.nf -resume     # 只重跑失败的部分

# 原理:Nextflow 用 work/ 目录缓存每个任务的结果
# 如果输入没变 + 脚本没变 → 直接用缓存结果
# 如果改了参数或脚本 → 只重跑受影响的步骤

注意:不要手动删 work/ 目录,否则 -resume 没法用!

查看运行历史

# 查看所有运行记录
nextflow log                                    # 列出所有运行的时间、状态、ID

# 查看某次运行的详细信息
nextflow log <run_name> -f name,status,hash     # 查看每个任务的状态

清理缓存

# 清理所有 work 目录(释放磁盘空间)
nextflow clean -f                               # 强制清理(谨慎使用!会导致无法 resume)

# 只清理 30 天前的缓存
nextflow clean -before 30d -f                   # 清理 30 天前的旧缓存

运行时的终端输出解读

N E X T F L O W  ~  version 25.10.4
Launching `metagenome_pipeline.nf` [happy_curie] DSL2 - revision: a1b2c3d4

executor >  local (6)                           ← 本地运行,共 6 个任务
[ab/123456] process > FASTP (sample1)       [100%] 3 of 3 ✔    ← 3 个样本质控全部完成
[cd/789012] process > BOWTIE2 (sample1)     [ 66%] 2 of 3      ← 去宿主完成 2/3
[ef/345678] process > KRAKEN2 (sample1)     [  0%] 0 of 3      ← 分类还没开始

# [ab/123456] 是任务的工作目录(work/ab/123456...)
# 出错时去这个目录看日志

七、常见报错与解决

报错 1:No matching files found for pattern

完整报错

Channel.fromFilePairs: No matching files found for pattern: data/*_{R1,R2}.fastq.gz

原因:文件路径或命名模式不对。

排查步骤

# 1. 检查文件是否存在
ls data/*_{R1,R2}.fastq.gz                      # 看看能不能匹配到文件

# 2. 检查文件命名
ls data/                                         # 看看实际文件名是什么
# 常见问题:文件名是 _1/_2 而不是 _R1/_R2

# 3. 修改 pattern
# 如果文件名是 sample1_1.fq.gz / sample1_2.fq.gz
nextflow run pipeline.nf --reads "data/*_{1,2}.fq.gz"

报错 2:Process terminated with error exit status 137

原因:内存不够(OOM Killer 杀掉了进程,退出码 137 = 被信号 SIGKILL 终止)。

解决

// 在 nextflow.config 或 process 中增加内存
process {
    withName: 'KRAKEN2_CLASSIFY' {
        memory = '64 GB'                         // Kraken2 标准库需要很大内存
    }
}

报错 3:Command error: Unable to access jarfile(Java 相关)

原因:Java 版本不对或 Java 没装好。

解决

# 检查 Java 版本
java -version                                    # 需要 Java 17+

# 用 conda 安装正确版本
conda install -c conda-forge openjdk=17          # 安装 Java 17

# 验证
nextflow -version                                # 重新检查 nextflow

报错 4:Workflow resume failed -- Work directory has changed

原因:换了工作目录后用 -resume,Nextflow 找不到之前的 work/ 缓存。

解决

# 方法 1:回到原来的目录运行
cd /原来的目录
nextflow run pipeline.nf -resume

# 方法 2:指定 work 目录
nextflow run pipeline.nf -resume -w /原来的目录/work

八、速查表

Shell → Nextflow 对照表

Shell 写法Nextflow 写法
SAMPLE="sample1"params.sample = "sample1"
for sample in ...; doChannel.fromFilePairs(...)
mkdir -p output/publishDir "output/", mode: 'copy'
tool -t 8tool -t ${task.cpus}
串行执行自动并行
if [ $? -ne 0 ]; then exit 1; fi自动检测退出码
手动日志 echo "Step 1..."tag "${sample_id}" + 自动日志
手动恢复nextflow run ... -resume

Process 指令速查

指令说明示例
tag日志标签tag "${sample_id}"
publishDir发布输出文件publishDir "results/", mode: 'copy'
cpusCPU 核数cpus 4
memory内存限制memory '16 GB'
time时间限制time '4h'
condaConda 依赖conda 'bioconda::fastp=1.3.3'
containerDocker 容器container 'biocontainers/fastp:1.3.3'
maxForks最大并行数maxForks 1(变串行)
errorStrategy出错策略errorStrategy 'retry'
maxRetries最大重试次数maxRetries 3

常用命令速查

命令说明
nextflow run pipeline.nf运行流程
nextflow run pipeline.nf -resume断点续传
nextflow run pipeline.nf --reads "..."传递参数
nextflow run pipeline.nf -profile slurm使用集群配置
nextflow run pipeline.nf -with-report生成 HTML 报告
nextflow run pipeline.nf -with-timeline生成时间线
nextflow log查看运行历史
nextflow clean -f清理缓存
nextflow info系统信息

改写检查清单

把 Shell 脚本改成 Nextflow 时,逐项确认:

  • [ ] 所有硬编码路径都改成了 params.xxx
  • [ ] 每个独立步骤都包装成了一个 process
  • [ ] 用 Channel.fromFilePairs() 处理双端数据
  • [ ] 用 tuple val(sample_id), path(reads) 保留样本名
  • [ ] 用 ${task.cpus} 代替硬编码线程数
  • [ ] 用 publishDir 发布结果文件
  • [ ] 用 tag 添加日志标签
  • [ ] 创建了 nextflow.config 配置文件
  • [ ] 测试了 -resume 断点续传功能
  • [ ] 用 reads_ch.view() 验证了 Channel 内容

学习资源


上一篇560_Nextflow_DSL2入门 —— Nextflow 基础概念和安装配置。