Shell 脚本改写为 Nextflow 流程¶
最后更新:2026-05 | Nextflow v25.10.4+ / v26.04.0 | 前置知识:560_Nextflow_DSL2入门 适合人群:已了解 Nextflow 基础概念,想把现有 Shell 流程迁移到 Nextflow 的生信工程师
一、为什么要把 Shell 脚本改成 Nextflow?¶
你已经有了一个能跑的 Shell 脚本,为什么还要花时间改成 Nextflow?看看这个真实场景:
Shell 脚本的 3 个致命伤¶
场景:你有 50 个宏基因组样本要分析
# Shell 版本:你不得不这样写
for sample in $(cat sample_list.txt); do
fastp -i ${sample}_R1.fq.gz -I ${sample}_R2.fq.gz \
-o clean/${sample}_R1.fq.gz -O clean/${sample}_R2.fq.gz
bowtie2 -x human_ref -1 clean/${sample}_R1.fq.gz \
-2 clean/${sample}_R2.fq.gz -S ${sample}.sam
kraken2 --db k2db --paired clean/${sample}_R1.fq.gz \
clean/${sample}_R2.fq.gz --output ${sample}_kraken.txt
done
| 问题 | Shell 脚本 | Nextflow |
|---|---|---|
| 并行 | for 循环是串行的,50 个样本排队跑 | 自动并行,有多少核就同时跑多少样本 |
| 断点续传 | 跑到第 30 个样本挂了,要从头来或手动改脚本 | 加 -resume,自动跳过已完成的 29 个 |
| 可移植 | 换台机器要改路径、改集群参数 | 配置文件里改一行就行 |
| 容器化 | 手动装所有工具,版本冲突 | 每个 process 自带 Docker/Conda 环境 |
| 日志追踪 | echo 打日志,出错难定位 | 每个任务有独立工作目录和日志文件 |
一句话:Shell 脚本是"作坊",Nextflow 是"工厂"。
二、改写思路:3 步走¶
改写不是"重写",而是"拆分重组"。按这 3 步走:
第 1 步:识别输入、输出、步骤¶
看你的 Shell 脚本,找出: - 输入:原始 FASTQ 文件、参考基因组、数据库 - 输出:清洗后的 FASTQ、比对结果、分类结果 - 步骤:fastp → bowtie2 → kraken2
第 2 步:每个步骤变成一个 Process¶
一个 Shell 命令(或一组相关命令)= 一个 Nextflow Process
| Shell 步骤 | Nextflow Process |
|---|---|
fastp -i ... -o ... | process FASTP { ... } |
bowtie2 -x ... -1 ... -2 ... | process BOWTIE2_REMOVE_HOST { ... } |
kraken2 --db ... --paired ... | process KRAKEN2_CLASSIFY { ... } |
第 3 步:用 Channel 连接各个 Process¶
Process 之间通过 Channel(水管)传递数据,上一个 process 的 output 就是下一个 process 的 input。
[Channel: 原始reads] → FASTP → [Channel: 清洗reads] → BOWTIE2 → [Channel: 非宿主reads] → KRAKEN2 → [Channel: 结果]
三、实战案例:宏基因组分析 Shell → Nextflow¶
原始 Shell 脚本¶
这是一个典型的宏基因组分析脚本(fastp 质控 → bowtie2 去宿主 → kraken2 物种分类):
#!/bin/bash
# 文件名:metagenome_pipeline.sh
# 功能:宏基因组分析流程(单样本版)
# ===== 配置 =====
SAMPLE="sample1" # 样本名
R1="data/${SAMPLE}_R1.fastq.gz" # 正向读段
R2="data/${SAMPLE}_R2.fastq.gz" # 反向读段
HOST_REF="ref/human_genome" # 宿主参考基因组(bowtie2 索引前缀)
KRAKEN_DB="db/kraken2_standard" # Kraken2 数据库路径
THREADS=8 # 线程数
OUTDIR="results" # 输出目录
mkdir -p ${OUTDIR}/fastp ${OUTDIR}/bowtie2 ${OUTDIR}/kraken2 # 创建输出目录
# ===== Step 1: fastp 质控 =====
echo ">>> Step 1: Running fastp..."
fastp \
-i ${R1} \
-I ${R2} \
-o ${OUTDIR}/fastp/${SAMPLE}_clean_R1.fq.gz \
-O ${OUTDIR}/fastp/${SAMPLE}_clean_R2.fq.gz \
-h ${OUTDIR}/fastp/${SAMPLE}_fastp.html \
-j ${OUTDIR}/fastp/${SAMPLE}_fastp.json \
--thread ${THREADS} \
--qualified_quality_phred 20 \
--length_required 50
# ===== Step 2: bowtie2 去宿主 =====
echo ">>> Step 2: Removing host reads with bowtie2..."
bowtie2 \
-x ${HOST_REF} \
-1 ${OUTDIR}/fastp/${SAMPLE}_clean_R1.fq.gz \
-2 ${OUTDIR}/fastp/${SAMPLE}_clean_R2.fq.gz \
--threads ${THREADS} \
--very-sensitive \
--un-conc-gz ${OUTDIR}/bowtie2/${SAMPLE}_nonhost_R%.fq.gz \
-S /dev/null # 不保存 SAM 文件(只要未比对的读段)
# ===== Step 3: kraken2 物种分类 =====
echo ">>> Step 3: Running Kraken2 classification..."
kraken2 \
--db ${KRAKEN_DB} \
--paired \
${OUTDIR}/bowtie2/${SAMPLE}_nonhost_R1.fq.gz \
${OUTDIR}/bowtie2/${SAMPLE}_nonhost_R2.fq.gz \
--output ${OUTDIR}/kraken2/${SAMPLE}_kraken2.output \
--report ${OUTDIR}/kraken2/${SAMPLE}_kraken2.report \
--threads ${THREADS}
echo ">>> Done! Results in ${OUTDIR}/"
改写后的 Nextflow DSL2 版本¶
文件名:metagenome_pipeline.nf
#!/usr/bin/env nextflow
// 文件名:metagenome_pipeline.nf
// 功能:宏基因组分析流程(fastp → bowtie2 去宿主 → kraken2 分类)
// 语法:Nextflow DSL2
// ===== 参数定义(对应 Shell 脚本里的变量) =====
params.reads = "data/*_{R1,R2}.fastq.gz" // 输入文件(支持通配符,自动匹配多样本)
params.host_ref = "ref/human_genome" // 宿主参考基因组(bowtie2 索引前缀)
params.kraken_db = "db/kraken2_standard" // Kraken2 数据库路径
params.outdir = "results" // 输出目录
params.threads = 8 // 默认线程数
// ======================================================================
// Process 1: FASTP 质控
// 对应 Shell 脚本的 Step 1
// ======================================================================
process FASTP {
tag "${sample_id}" // 在日志中显示样本名(方便追踪)
publishDir "${params.outdir}/fastp", mode: 'copy' // 把结果复制到输出目录
cpus params.threads // 使用指定的线程数
input:
tuple val(sample_id), path(reads) // 输入:(样本名, [R1文件, R2文件])
output:
tuple val(sample_id), path("${sample_id}_clean_R{1,2}.fq.gz"), emit: clean_reads
// 输出:(样本名, [清洗后R1, 清洗后R2])
path "${sample_id}_fastp.html", emit: html // 输出:HTML 报告
path "${sample_id}_fastp.json", emit: json // 输出:JSON 报告
script:
"""
fastp \\
-i ${reads[0]} \\
-I ${reads[1]} \\
-o ${sample_id}_clean_R1.fq.gz \\
-O ${sample_id}_clean_R2.fq.gz \\
-h ${sample_id}_fastp.html \\
-j ${sample_id}_fastp.json \\
--thread ${task.cpus} \\
--qualified_quality_phred 20 \\
--length_required 50
"""
// reads[0] = R1 文件,reads[1] = R2 文件
// task.cpus = Nextflow 分配的 CPU 数(来自 cpus 指令)
}
// ======================================================================
// Process 2: BOWTIE2 去宿主
// 对应 Shell 脚本的 Step 2
// ======================================================================
process BOWTIE2_REMOVE_HOST {
tag "${sample_id}" // 日志标签
publishDir "${params.outdir}/bowtie2", mode: 'copy'
cpus params.threads
input:
tuple val(sample_id), path(clean_reads) // 输入:FASTP 的输出(清洗后的读段)
path host_ref // 输入:宿主参考基因组索引目录
output:
tuple val(sample_id), path("${sample_id}_nonhost_R{1,2}.fq.gz"), emit: nonhost_reads
// 输出:去宿主后的读段
script:
// 获取 bowtie2 索引前缀(去掉 .1.bt2 等后缀)
def ref_prefix = host_ref[0].toString().replaceAll(/\.\d+\.bt2l?$/, '')
"""
bowtie2 \\
-x ${ref_prefix} \\
-1 ${clean_reads[0]} \\
-2 ${clean_reads[1]} \\
--threads ${task.cpus} \\
--very-sensitive \\
--un-conc-gz ${sample_id}_nonhost_R%.fq.gz \\
-S /dev/null
"""
// --un-conc-gz: 未比对上的读段(即非宿主读段)输出为 gz 压缩
// -S /dev/null: 不保存比对结果(只要非宿主读段)
// R% 会被自动替换为 R1 和 R2
}
// ======================================================================
// Process 3: KRAKEN2 物种分类
// 对应 Shell 脚本的 Step 3
// ======================================================================
process KRAKEN2_CLASSIFY {
tag "${sample_id}" // 日志标签
publishDir "${params.outdir}/kraken2", mode: 'copy'
cpus params.threads
input:
tuple val(sample_id), path(nonhost_reads) // 输入:去宿主后的读段
path kraken_db // 输入:Kraken2 数据库
output:
path "${sample_id}_kraken2.output", emit: output // 输出:分类结果
path "${sample_id}_kraken2.report", emit: report // 输出:分类报告
script:
"""
kraken2 \\
--db ${kraken_db} \\
--paired \\
${nonhost_reads[0]} \\
${nonhost_reads[1]} \\
--output ${sample_id}_kraken2.output \\
--report ${sample_id}_kraken2.report \\
--threads ${task.cpus}
"""
}
// ======================================================================
// Workflow:把 3 个 Process 串起来(生产线)
// ======================================================================
workflow {
// ----- 创建输入 Channel -----
// fromFilePairs 自动匹配 R1/R2 配对文件
// 返回格式:[sample_id, [R1.fq.gz, R2.fq.gz]]
reads_ch = Channel.fromFilePairs(params.reads) // 样本配对文件(queue channel)
host_ref_ch = Channel.fromPath("${params.host_ref}*") // 宿主参考索引文件
.collect() // 收集所有索引文件为一个列表
kraken_db_ch = Channel.fromPath(params.kraken_db) // Kraken2 数据库路径
// ----- 连接生产线 -----
// Step 1: 质控
FASTP(reads_ch) // 原始读段 → FASTP
// Step 2: 去宿主(用 FASTP 的输出 + 宿主参考)
BOWTIE2_REMOVE_HOST( // FASTP 输出 → BOWTIE2
FASTP.out.clean_reads, // 上一步的清洗读段
host_ref_ch // 宿主参考基因组
)
// Step 3: 物种分类(用去宿主的输出 + Kraken2 数据库)
KRAKEN2_CLASSIFY( // BOWTIE2 输出 → KRAKEN2
BOWTIE2_REMOVE_HOST.out.nonhost_reads, // 上一步的非宿主读段
kraken_db_ch // Kraken2 数据库
)
}
逐行对比说明¶
| Shell 脚本 | Nextflow 版本 | 改了什么 |
|---|---|---|
SAMPLE="sample1" | params.reads = "data/*_{R1,R2}.fastq.gz" | 不再硬编码样本名,用通配符自动匹配所有样本 |
R1="data/${SAMPLE}_R1.fastq.gz" | Channel.fromFilePairs(params.reads) | 自动配对 R1/R2,自动处理多样本 |
THREADS=8 | params.threads = 8 + cpus params.threads | 参数化,可在命令行覆盖 |
mkdir -p ${OUTDIR}/... | publishDir "...", mode: 'copy' | Nextflow 自动管理目录 |
fastp -i ${R1} ... | process FASTP { script: ... } | 包装成独立 process |
串行 for 循环 | Channel.fromFilePairs() | 自动并行处理所有样本 |
| 手动传路径 | FASTP.out.clean_reads | 自动通过 Channel 传递 |
四、多样本处理:fromFilePairs¶
fromFilePairs 是处理双端测序数据的利器。它自动把 R1 和 R2 配对在一起。
文件命名要求¶
data/
├── sample1_R1.fastq.gz ← 配对 1
├── sample1_R2.fastq.gz ← 配对 1
├── sample2_R1.fastq.gz ← 配对 2
├── sample2_R2.fastq.gz ← 配对 2
├── sample3_R1.fastq.gz ← 配对 3
└── sample3_R2.fastq.gz ← 配对 3
使用方式¶
// 基本用法
reads_ch = Channel.fromFilePairs("data/*_{R1,R2}.fastq.gz")
// 产生 3 个元素:
// [sample1, [data/sample1_R1.fastq.gz, data/sample1_R2.fastq.gz]]
// [sample2, [data/sample2_R1.fastq.gz, data/sample2_R2.fastq.gz]]
// [sample3, [data/sample3_R1.fastq.gz, data/sample3_R2.fastq.gz]]
// 调试:查看配对结果
reads_ch.view() // 打印所有配对,确认是否正确
// 如果文件名不是 _R1/_R2 格式(比如 _1/_2)
reads_ch = Channel.fromFilePairs("data/*_{1,2}.fastq.gz")
// 如果配对规则更复杂,用自定义正则
reads_ch = Channel.fromFilePairs("data/*_{R1,R2}.fq.gz", size: 2)
// size: 2 表示每组必须恰好 2 个文件
在 Process 中使用¶
process FASTP {
input:
tuple val(sample_id), path(reads) // 自动解包:sample_id = 样本名,reads = [R1, R2]
script:
"""
fastp \\
-i ${reads[0]} \\
-I ${reads[1]} \\
-o ${sample_id}_clean_R1.fq.gz \\
-O ${sample_id}_clean_R2.fq.gz
"""
// reads[0] = R1 文件
// reads[1] = R2 文件
// sample_id = 从文件名自动提取的样本名
}
常见陷阱¶
// 错误:忘记用 tuple 解包
input:
path reads // 这样只能拿到文件,拿不到样本名!
// 正确:用 tuple 同时拿到样本名和文件
input:
tuple val(sample_id), path(reads) // 样本名 + 文件列表
五、参数化:params 机制¶
为什么不能硬编码?¶
// 错误:硬编码路径(换机器就挂)
process FASTP {
script:
"""
fastp -i /home/zhangsan/data/sample1_R1.fq.gz ...
"""
}
// 正确:使用 params 参数化
params.reads = "data/*_{R1,R2}.fastq.gz" // 可以在命令行覆盖
params 的 3 层优先级¶
Nextflow 参数有 3 层来源,优先级从高到低:
第 1 层:脚本默认值(最低优先级)¶
// 在 .nf 文件中定义
params.reads = "data/*_{R1,R2}.fastq.gz" // 默认值
params.outdir = "results" // 默认值
params.threads = 4 // 默认值
第 2 层:配置文件(中等优先级)¶
// nextflow.config 文件
params {
reads = "data/*_{R1,R2}.fastq.gz" // 覆盖脚本默认值
outdir = "results"
threads = 8 // 服务器上用 8 线程
host_ref = "ref/human_genome"
kraken_db = "db/kraken2_standard"
}
第 3 层:命令行参数(最高优先级)¶
# 命令行传参,覆盖一切
nextflow run metagenome_pipeline.nf \
--reads "new_data/*_{R1,R2}.fq.gz" \
--outdir "new_results" \
--threads 16 # 覆盖 config 和脚本中的值
实用配置文件模板¶
创建 nextflow.config(和 .nf 文件放同一目录):
// nextflow.config
// 全局参数
params {
reads = "data/*_{R1,R2}.fastq.gz" // 输入文件
host_ref = "ref/human_genome" // 宿主参考基因组
kraken_db = "db/kraken2_standard" // Kraken2 数据库
outdir = "results" // 输出目录
threads = 4 // 默认线程数
}
// 全局 process 配置
process {
cpus = 2 // 默认 CPU
memory = '4 GB' // 默认内存
time = '2h' // 默认时间限制
// 针对特定 process 设置资源
withName: 'FASTP' { // FASTP 专属配置
cpus = 4
memory = '8 GB'
}
withName: 'BOWTIE2_REMOVE_HOST' { // BOWTIE2 专属配置
cpus = 8
memory = '16 GB'
}
withName: 'KRAKEN2_CLASSIFY' { // KRAKEN2 专属配置
cpus = 8
memory = '32 GB' // Kraken2 数据库大,需要更多内存
}
}
// 运行环境
profiles {
standard { // 默认:本地运行
process.executor = 'local'
}
slurm { // SLURM 集群
process.executor = 'slurm'
process.queue = 'normal'
process.clusterOptions = '--account=mylab'
}
docker { // Docker 容器
docker.enabled = true
}
conda { // Conda 环境
conda.enabled = true
}
}
// 报告和追踪
report {
enabled = true // 生成 HTML 运行报告
file = "${params.outdir}/pipeline_report.html"
}
timeline {
enabled = true // 生成时间线图
file = "${params.outdir}/timeline.html"
}
六、运行与监控¶
基本运行¶
# 最简运行(使用默认参数)
nextflow run metagenome_pipeline.nf # 运行流程
# 指定参数
nextflow run metagenome_pipeline.nf \
--reads "data/*_{R1,R2}.fq.gz" \
--outdir results_v2 # 自定义输入输出
# 使用特定 profile
nextflow run metagenome_pipeline.nf \
-profile slurm # 在 SLURM 集群上运行
断点续传(-resume)¶
这是 Nextflow 最强大的功能之一:
# 第一次运行(跑了 30 分钟后中断了)
nextflow run metagenome_pipeline.nf
# 第二次运行:加 -resume,自动跳过已完成的任务
nextflow run metagenome_pipeline.nf -resume # 只重跑失败的部分
# 原理:Nextflow 用 work/ 目录缓存每个任务的结果
# 如果输入没变 + 脚本没变 → 直接用缓存结果
# 如果改了参数或脚本 → 只重跑受影响的步骤
注意:不要手动删 work/ 目录,否则 -resume 没法用!
查看运行历史¶
# 查看所有运行记录
nextflow log # 列出所有运行的时间、状态、ID
# 查看某次运行的详细信息
nextflow log <run_name> -f name,status,hash # 查看每个任务的状态
清理缓存¶
# 清理所有 work 目录(释放磁盘空间)
nextflow clean -f # 强制清理(谨慎使用!会导致无法 resume)
# 只清理 30 天前的缓存
nextflow clean -before 30d -f # 清理 30 天前的旧缓存
运行时的终端输出解读¶
N E X T F L O W ~ version 25.10.4
Launching `metagenome_pipeline.nf` [happy_curie] DSL2 - revision: a1b2c3d4
executor > local (6) ← 本地运行,共 6 个任务
[ab/123456] process > FASTP (sample1) [100%] 3 of 3 ✔ ← 3 个样本质控全部完成
[cd/789012] process > BOWTIE2 (sample1) [ 66%] 2 of 3 ← 去宿主完成 2/3
[ef/345678] process > KRAKEN2 (sample1) [ 0%] 0 of 3 ← 分类还没开始
# [ab/123456] 是任务的工作目录(work/ab/123456...)
# 出错时去这个目录看日志
七、常见报错与解决¶
报错 1:No matching files found for pattern¶
完整报错:
原因:文件路径或命名模式不对。
排查步骤:
# 1. 检查文件是否存在
ls data/*_{R1,R2}.fastq.gz # 看看能不能匹配到文件
# 2. 检查文件命名
ls data/ # 看看实际文件名是什么
# 常见问题:文件名是 _1/_2 而不是 _R1/_R2
# 3. 修改 pattern
# 如果文件名是 sample1_1.fq.gz / sample1_2.fq.gz
nextflow run pipeline.nf --reads "data/*_{1,2}.fq.gz"
报错 2:Process terminated with error exit status 137¶
原因:内存不够(OOM Killer 杀掉了进程,退出码 137 = 被信号 SIGKILL 终止)。
解决:
// 在 nextflow.config 或 process 中增加内存
process {
withName: 'KRAKEN2_CLASSIFY' {
memory = '64 GB' // Kraken2 标准库需要很大内存
}
}
报错 3:Command error: Unable to access jarfile(Java 相关)¶
原因:Java 版本不对或 Java 没装好。
解决:
# 检查 Java 版本
java -version # 需要 Java 17+
# 用 conda 安装正确版本
conda install -c conda-forge openjdk=17 # 安装 Java 17
# 验证
nextflow -version # 重新检查 nextflow
报错 4:Workflow resume failed -- Work directory has changed¶
原因:换了工作目录后用 -resume,Nextflow 找不到之前的 work/ 缓存。
解决:
# 方法 1:回到原来的目录运行
cd /原来的目录
nextflow run pipeline.nf -resume
# 方法 2:指定 work 目录
nextflow run pipeline.nf -resume -w /原来的目录/work
八、速查表¶
Shell → Nextflow 对照表¶
| Shell 写法 | Nextflow 写法 |
|---|---|
SAMPLE="sample1" | params.sample = "sample1" |
for sample in ...; do | Channel.fromFilePairs(...) |
mkdir -p output/ | publishDir "output/", mode: 'copy' |
tool -t 8 | tool -t ${task.cpus} |
| 串行执行 | 自动并行 |
if [ $? -ne 0 ]; then exit 1; fi | 自动检测退出码 |
手动日志 echo "Step 1..." | tag "${sample_id}" + 自动日志 |
| 手动恢复 | nextflow run ... -resume |
Process 指令速查¶
| 指令 | 说明 | 示例 |
|---|---|---|
tag | 日志标签 | tag "${sample_id}" |
publishDir | 发布输出文件 | publishDir "results/", mode: 'copy' |
cpus | CPU 核数 | cpus 4 |
memory | 内存限制 | memory '16 GB' |
time | 时间限制 | time '4h' |
conda | Conda 依赖 | conda 'bioconda::fastp=1.3.3' |
container | Docker 容器 | container 'biocontainers/fastp:1.3.3' |
maxForks | 最大并行数 | maxForks 1(变串行) |
errorStrategy | 出错策略 | errorStrategy 'retry' |
maxRetries | 最大重试次数 | maxRetries 3 |
常用命令速查¶
| 命令 | 说明 |
|---|---|
nextflow run pipeline.nf | 运行流程 |
nextflow run pipeline.nf -resume | 断点续传 |
nextflow run pipeline.nf --reads "..." | 传递参数 |
nextflow run pipeline.nf -profile slurm | 使用集群配置 |
nextflow run pipeline.nf -with-report | 生成 HTML 报告 |
nextflow run pipeline.nf -with-timeline | 生成时间线 |
nextflow log | 查看运行历史 |
nextflow clean -f | 清理缓存 |
nextflow info | 系统信息 |
改写检查清单¶
把 Shell 脚本改成 Nextflow 时,逐项确认:
- [ ] 所有硬编码路径都改成了
params.xxx - [ ] 每个独立步骤都包装成了一个
process - [ ] 用
Channel.fromFilePairs()处理双端数据 - [ ] 用
tuple val(sample_id), path(reads)保留样本名 - [ ] 用
${task.cpus}代替硬编码线程数 - [ ] 用
publishDir发布结果文件 - [ ] 用
tag添加日志标签 - [ ] 创建了
nextflow.config配置文件 - [ ] 测试了
-resume断点续传功能 - [ ] 用
reads_ch.view()验证了 Channel 内容
学习资源¶
上一篇:560_Nextflow_DSL2入门 —— Nextflow 基础概念和安装配置。