542_设计高通量测序数据管道
一句话说明
高通量测序数据管道(pipeline)是将原始 FASTQ 文件自动化转化为生物学结论的数据流处理系统。
核心知识点
管道设计原则
- 幂等性:同一输入多次运行,结果相同
- 可重现性:固定软件版本,锁定参数
- 容错性:单步失败不影响已完成步骤
- 可扩展性:新样本加入无需改代码
- 可观测性:每步有日志、有指标
典型 WES/WGS 管道流程
FASTQ原始数据
│
[质控 FastQC/fastp]
│ 过滤低质量reads,去接头
▼
[比对 BWA-MEM2/STAR]
│ 对齐到参考基因组
▼
[排序去重 samtools sort + MarkDuplicates]
│ 去PCR重复
▼
[BQSR 碱基质量校正]
│ 系统误差校正
▼
[变异检测 GATK HaplotypeCaller]
│
├── SNV/InDel → VQSR过滤 → 注释 (ANNOVAR/VEP)
└── CNV/SV → GATK-CNV / Manta
▼
[结果输出 VCF/TSV 报告]
管道框架对比
| 框架 | 语言 | 适合场景 | 优势 |
|---|
| Nextflow | Groovy/DSL | 云HPC混合 | 最活跃社区 |
| Snakemake | Python | 本地集群 | 语法简单 |
| WDL+Cromwell | WDL | Terra平台 | GATK官方 |
| CWL | YAML | 跨平台 | 标准化 |
实战代码/设计图/模板
Nextflow WES 管道骨架
// nextflow.config
params {
reads = "data/*_R{1,2}.fastq.gz"
genome = "ref/hg38.fa"
dbsnp = "ref/dbsnp_138.hg38.vcf.gz"
outdir = "results"
}
process FASTP {
tag "${sample_id}"
container 'biocontainers/fastp:v0.23.2'
publishDir "${params.outdir}/fastp", mode: 'copy'
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("*_trimmed.fastq.gz"), emit: trimmed
path "*.html", emit: report
script:
"""
fastp \
-i ${reads[0]} -I ${reads[1]} \
-o ${sample_id}_R1_trimmed.fastq.gz \
-O ${sample_id}_R2_trimmed.fastq.gz \
--thread ${task.cpus} \
--html ${sample_id}_fastp.html \
--json ${sample_id}_fastp.json
"""
}
process BWA_MEM {
tag "${sample_id}"
container 'biocontainers/bwa-mem2:v2.2.1'
input:
tuple val(sample_id), path(reads)
path genome
output:
tuple val(sample_id), path("*.bam")
script:
"""
bwa-mem2 mem \
-t ${task.cpus} \
-R "@RG\\tID:${sample_id}\\tSM:${sample_id}\\tPL:ILLUMINA" \
${genome} ${reads} | \
samtools sort -@ ${task.cpus} -o ${sample_id}.sorted.bam
samtools index ${sample_id}.sorted.bam
"""
}
workflow {
reads_ch = Channel.fromFilePairs(params.reads)
FASTP(reads_ch)
BWA_MEM(FASTP.out.trimmed, params.genome)
}
并行样本处理设计
样本1 ─┐
样本2 ─┤→ [散列 scatter] → [并行处理] → [聚合 gather] → 联合变异检测
样本3 ─┘
# 单核瓶颈 vs 并行设计
串行:N个样本 × T时间 = N×T总时间
并行:N个样本同时跑,总时间 ≈ T(理想)
质控指标自动判断
# 自动质控判断脚本
import json
def check_fastqc(json_file: str) -> dict:
"""解析 fastp JSON,判断质量是否合格"""
with open(json_file) as f:
data = json.load(f)
summary = data["summary"]
issues = []
# 检查Q30比例(>80%为合格)
q30 = summary["after_filtering"]["q30_rate"]
if q30 < 0.80:
issues.append(f"Q30偏低: {q30:.1%}")
# 检查过滤率(>20%被过滤说明质量差)
total_reads = data["read1_before_filtering"]["total_reads"]
passed_reads = data["read1_after_filtering"]["total_reads"]
filter_rate = 1 - passed_reads / total_reads
if filter_rate > 0.20:
issues.append(f"过滤率过高: {filter_rate:.1%}")
return {"pass": len(issues) == 0, "issues": issues}
# 使用
result = check_fastqc("sample1_fastp.json")
if not result["pass"]:
print(f"质控不合格: {result['issues']}")
面试常问点
| 问题 | 参考答案 |
|---|
| 如何处理样本级别的并行? | scatter-gather模式,每样本独立 |
| 管道如何断点续跑? | 检查output文件是否存在(-resume) |
| 如何统一软件版本? | Docker/Singularity容器锁定环境 |
| 如何监控管道运行? | Nextflow Tower、日志聚合ELK |
| BQSR为什么要做? | 校正测序仪系统误差,提高SNP精度 |
速查表
典型处理时间参考(32核服务器):
fastp质控:10-20分钟/样本(WES)
BWA比对:30-60分钟/样本(WES)
MarkDuplicates:15-30分钟/样本
BQSR:20-40分钟/样本
HaplotypeCaller:60-120分钟/样本
GATK最佳实践关键步骤:
1. FastQC → 2. Trimming → 3. BWA-MEM
4. SortSam → 5. MarkDuplicates
6. BQSR → 7. HaplotypeCaller
8. VQSR → 9. 注释