跳转至

542_设计高通量测序数据管道


一句话说明

高通量测序数据管道(pipeline)是将原始 FASTQ 文件自动化转化为生物学结论的数据流处理系统。


核心知识点

管道设计原则

  1. 幂等性:同一输入多次运行,结果相同
  2. 可重现性:固定软件版本,锁定参数
  3. 容错性:单步失败不影响已完成步骤
  4. 可扩展性:新样本加入无需改代码
  5. 可观测性:每步有日志、有指标

典型 WES/WGS 管道流程

FASTQ原始数据
[质控 FastQC/fastp]
    │ 过滤低质量reads,去接头
[比对 BWA-MEM2/STAR]
    │ 对齐到参考基因组
[排序去重 samtools sort + MarkDuplicates]
    │ 去PCR重复
[BQSR 碱基质量校正]
    │ 系统误差校正
[变异检测 GATK HaplotypeCaller]
    ├── SNV/InDel → VQSR过滤 → 注释 (ANNOVAR/VEP)
    └── CNV/SV → GATK-CNV / Manta
[结果输出 VCF/TSV 报告]

管道框架对比

框架语言适合场景优势
NextflowGroovy/DSL云HPC混合最活跃社区
SnakemakePython本地集群语法简单
WDL+CromwellWDLTerra平台GATK官方
CWLYAML跨平台标准化

实战代码/设计图/模板

Nextflow WES 管道骨架

// nextflow.config
params {
    reads    = "data/*_R{1,2}.fastq.gz"
    genome   = "ref/hg38.fa"
    dbsnp    = "ref/dbsnp_138.hg38.vcf.gz"
    outdir   = "results"
}

process FASTP {
    tag "${sample_id}"
    container 'biocontainers/fastp:v0.23.2'
    publishDir "${params.outdir}/fastp", mode: 'copy'

    input:
    tuple val(sample_id), path(reads)

    output:
    tuple val(sample_id), path("*_trimmed.fastq.gz"), emit: trimmed
    path "*.html", emit: report

    script:
    """
    fastp \
        -i ${reads[0]} -I ${reads[1]} \
        -o ${sample_id}_R1_trimmed.fastq.gz \
        -O ${sample_id}_R2_trimmed.fastq.gz \
        --thread ${task.cpus} \
        --html ${sample_id}_fastp.html \
        --json ${sample_id}_fastp.json
    """
}

process BWA_MEM {
    tag "${sample_id}"
    container 'biocontainers/bwa-mem2:v2.2.1'

    input:
    tuple val(sample_id), path(reads)
    path genome

    output:
    tuple val(sample_id), path("*.bam")

    script:
    """
    bwa-mem2 mem \
        -t ${task.cpus} \
        -R "@RG\\tID:${sample_id}\\tSM:${sample_id}\\tPL:ILLUMINA" \
        ${genome} ${reads} | \
    samtools sort -@ ${task.cpus} -o ${sample_id}.sorted.bam
    samtools index ${sample_id}.sorted.bam
    """
}

workflow {
    reads_ch = Channel.fromFilePairs(params.reads)
    FASTP(reads_ch)
    BWA_MEM(FASTP.out.trimmed, params.genome)
}

并行样本处理设计

样本1 ─┐
样本2 ─┤→ [散列 scatter] → [并行处理] → [聚合 gather] → 联合变异检测
样本3 ─┘

# 单核瓶颈 vs 并行设计
串行:N个样本 × T时间 = N×T总时间
并行:N个样本同时跑,总时间 ≈ T(理想)

质控指标自动判断

# 自动质控判断脚本
import json

def check_fastqc(json_file: str) -> dict:
    """解析 fastp JSON,判断质量是否合格"""
    with open(json_file) as f:
        data = json.load(f)

    summary = data["summary"]
    issues = []

    # 检查Q30比例(>80%为合格)
    q30 = summary["after_filtering"]["q30_rate"]
    if q30 < 0.80:
        issues.append(f"Q30偏低: {q30:.1%}")

    # 检查过滤率(>20%被过滤说明质量差)
    total_reads = data["read1_before_filtering"]["total_reads"]
    passed_reads = data["read1_after_filtering"]["total_reads"]
    filter_rate = 1 - passed_reads / total_reads
    if filter_rate > 0.20:
        issues.append(f"过滤率过高: {filter_rate:.1%}")

    return {"pass": len(issues) == 0, "issues": issues}

# 使用
result = check_fastqc("sample1_fastp.json")
if not result["pass"]:
    print(f"质控不合格: {result['issues']}")

面试常问点

问题参考答案
如何处理样本级别的并行?scatter-gather模式,每样本独立
管道如何断点续跑?检查output文件是否存在(-resume)
如何统一软件版本?Docker/Singularity容器锁定环境
如何监控管道运行?Nextflow Tower、日志聚合ELK
BQSR为什么要做?校正测序仪系统误差,提高SNP精度

速查表

典型处理时间参考(32核服务器):
  fastp质控:10-20分钟/样本(WES)
  BWA比对:30-60分钟/样本(WES)
  MarkDuplicates:15-30分钟/样本
  BQSR:20-40分钟/样本
  HaplotypeCaller:60-120分钟/样本

GATK最佳实践关键步骤:
  1. FastQC → 2. Trimming → 3. BWA-MEM
  4. SortSam → 5. MarkDuplicates
  6. BQSR → 7. HaplotypeCaller
  8. VQSR → 9. 注释