跳转至

418_MongoDB聚合管道


一句话说明

MongoDB聚合管道就像工厂流水线,数据从一个"工序"流向下一个,每个阶段负责过滤、变换或统计文档。


核心知识点

聚合管道常用阶段

阶段作用类比SQL
$match过滤文档WHERE
$project选择/改造字段SELECT
$group分组统计GROUP BY
$sort排序ORDER BY
$limit限制数量LIMIT
$skip跳过文档OFFSET
$unwind展开数组JOIN展开
$lookup关联其他集合LEFT JOIN
$addFields添加新字段SELECT计算列
$bucket按范围分桶CASE WHEN

关键操作符

  • $sum, $avg, $min, $max → 聚合统计
  • $push, $addToSet → 收集数组
  • $concat, $substr → 字符串操作
  • $cond, $ifNull → 条件判断

实战代码

// ========== MongoDB聚合管道实战(JavaScript/mongosh语法)==========
// 数据库:生信分析报告系统
// 集合:analysis_reports(每个文档是一次分析结果)

// ========== 示例文档结构 ==========
/*
{
  "_id": ObjectId("..."),
  "report_id": "RPT_001",
  "sample_id": "S001",
  "patient_id": "P001",
  "analysis_type": "variant_calling",
  "status": "completed",
  "created_at": ISODate("2024-01-15"),
  "run_time_seconds": 3600,
  "summary": {
    "total_variants": 15432,
    "snp_count": 14218,
    "indel_count": 1214,
    "quality_metrics": { "mean_depth": 45.7 }
  },
  "variants": [
    {"gene": "BRCA1", "consequence": "missense_variant", "pathogenicity": "Pathogenic"},
    {"gene": "TP53", "consequence": "stop_gained", "pathogenicity": "Pathogenic"},
    {"gene": "KRAS", "consequence": "missense_variant", "pathogenicity": "Likely_Pathogenic"}
  ]
}
*/

// ========== 1. 基本聚合:统计各分析类型的报告数量和平均运行时间 ==========
db.analysis_reports.aggregate([
  // Stage 1: 过滤已完成的报告(相当于WHERE status='completed')
  { $match: { status: "completed" } },

  // Stage 2: 分组统计(相当于GROUP BY analysis_type)
  { $group: {
    _id: "$analysis_type",              // 分组键
    report_count: { $sum: 1 },          // 计数(相当于COUNT(*))
    avg_runtime: { $avg: "$run_time_seconds" },  // 平均运行时间
    max_runtime: { $max: "$run_time_seconds" },  // 最长运行时间
    total_variants: { $sum: "$summary.total_variants" }  // 嵌套字段求和
  }},

  // Stage 3: 排序(按报告数量降序)
  { $sort: { report_count: -1 } },  // -1=降序, 1=升序

  // Stage 4: 只取前5个
  { $limit: 5 }
])

// ========== 2. $unwind + $group:分析所有变异的致病性分布 ==========
db.analysis_reports.aggregate([
  // 展开variants数组(每个变异变成一行文档)
  { $unwind: "$variants" },

  // 过滤:只看missense_variant
  { $match: { "variants.consequence": "missense_variant" } },

  // 统计各致病性类别的变异数量
  { $group: {
    _id: "$variants.pathogenicity",
    count: { $sum: 1 },
    affected_genes: { $addToSet: "$variants.gene" }  // 收集不重复的基因集合
  }},

  // 添加计算字段:基因数量
  { $addFields: { gene_count: { $size: "$affected_genes" } } },

  // 只保留需要的字段
  { $project: {
    pathogenicity: "$_id",  // 重命名_id字段
    count: 1,
    gene_count: 1,
    _id: 0  // 排除_id字段
  }},

  { $sort: { count: -1 } }
])

// ========== 3. $lookup:关联查询(类似LEFT JOIN)==========
// 将报告与患者信息关联
db.analysis_reports.aggregate([
  { $match: { status: "completed", analysis_type: "variant_calling" } },

  // 关联patients集合
  { $lookup: {
    from: "patients",           // 要关联的集合名
    localField: "patient_id",   // 当前集合的关联字段
    foreignField: "patient_id", // 目标集合的关联字段
    as: "patient_info"          // 结果存入这个字段(数组形式)
  }},

  // 展开关联结果(因为lookup返回数组,展开为单个文档)
  { $unwind: { path: "$patient_info", preserveNullAndEmpty: true }},
  // preserveNullAndEmpty: true = 没有匹配的文档也保留(类似LEFT JOIN)

  { $project: {
    report_id: 1,
    sample_id: 1,
    "summary.total_variants": 1,
    patient_name: "$patient_info.name",
    patient_age: "$patient_info.age",
    hospital: "$patient_info.hospital"
  }}
])

// ========== 4. $bucket:按变异数量分组(分桶)==========
db.analysis_reports.aggregate([
  { $match: { "summary.total_variants": { $exists: true } } },

  { $bucket: {
    groupBy: "$summary.total_variants",    // 按total_variants分桶
    boundaries: [0, 1000, 5000, 10000, 50000, Infinity],  // 桶的边界
    default: "Other",                      // 超出范围的放入"Other"
    output: {
      count: { $sum: 1 },
      sample_ids: { $push: "$sample_id" }  // 收集该桶中所有sample_id
    }
  }}
])
// 结果:
// {_id: 0,     count: 5,  ...}   // 0-1000个变异
// {_id: 1000,  count: 23, ...}   // 1000-5000个变异
// {_id: 5000,  count: 41, ...}   // 5000-10000个变异

// ========== 5. 时序分析:按月统计分析量 ==========
db.analysis_reports.aggregate([
  { $match: { created_at: { $gte: new Date("2024-01-01") } } },

  { $group: {
    _id: {
      year: { $year: "$created_at" },    // 提取年份
      month: { $month: "$created_at" }   // 提取月份
    },
    count: { $sum: 1 },
    avg_variants: { $avg: "$summary.total_variants" }
  }},

  // 添加格式化日期字段
  { $addFields: {
    period: { 
      $concat: [
        { $toString: "$_id.year" }, "-",
        { $cond: [
          { $lt: ["$_id.month", 10] },  // 月份<10时补0
          { $concat: ["0", { $toString: "$_id.month" }] },
          { $toString: "$_id.month" }
        ]}
      ]
    }
  }},

  { $sort: { "_id.year": 1, "_id.month": 1 } }  // 按时间升序
])
# ========== Python中使用pymongo执行聚合管道 ==========
from pymongo import MongoClient
import pandas as pd

client = MongoClient("mongodb://localhost:27017/")
db = client["bioinformatics"]

# 定义聚合管道(Python列表格式)
pipeline = [
    {"$match": {"status": "completed"}},  # 注意:Python用双引号
    {"$group": {
        "_id": "$analysis_type",
        "count": {"$sum": 1},
        "avg_runtime": {"$avg": "$run_time_seconds"}
    }},
    {"$sort": {"count": -1}}
]

# 执行聚合
results = list(db.analysis_reports.aggregate(pipeline))

# 转为DataFrame方便分析
df = pd.DataFrame(results)
df.rename(columns={"_id": "analysis_type"}, inplace=True)
print(df)

面试常问点

  1. Q: $match放在管道最前面有什么好处? A: $match越早过滤,后续阶段处理的文档数越少,性能越好。且$match在管道开头可以利用索引(放在后面就不行了)。

  2. Q: $unwind之后数据量会增加多少? A: 如果数组有n个元素,$unwind后变为n条文档。处理大数组时要特别注意内存使用(aggregation有100MB内存限制,超过需设allowDiskUse)。

  3. Q: $lookup和$unwind怎么配合实现LEFT JOIN? A: $lookup返回数组(可能为空),加{$unwind: {path: "...", preserveNullAndEmpty: true}}可保留没有匹配的文档(preserveNullAndEmpty=true是LEFT JOIN的关键)。


速查表

// 常用聚合操作符
{ $match: { field: value } }              // 过滤
{ $group: { _id: "$field", count: {$sum:1} } }  // 分组
{ $sort: { field: -1 } }                 // 排序
{ $limit: N }                            // 限制
{ $project: { field: 1, _id: 0 } }      // 选字段
{ $unwind: "$array_field" }              // 展开数组
{ $lookup: { from, localField, foreignField, as } }  // 关联
{ $addFields: { newField: expression } } // 添加字段
{ $count: "total" }                      // 计数输出

// 允许磁盘(超100MB时)
db.collection.aggregate(pipeline, { allowDiskUse: true })