418_MongoDB聚合管道¶
一句话说明¶
MongoDB聚合管道就像工厂流水线,数据从一个"工序"流向下一个,每个阶段负责过滤、变换或统计文档。
核心知识点¶
聚合管道常用阶段¶
| 阶段 | 作用 | 类比SQL |
|---|---|---|
| $match | 过滤文档 | WHERE |
| $project | 选择/改造字段 | SELECT |
| $group | 分组统计 | GROUP BY |
| $sort | 排序 | ORDER BY |
| $limit | 限制数量 | LIMIT |
| $skip | 跳过文档 | OFFSET |
| $unwind | 展开数组 | JOIN展开 |
| $lookup | 关联其他集合 | LEFT JOIN |
| $addFields | 添加新字段 | SELECT计算列 |
| $bucket | 按范围分桶 | CASE WHEN |
关键操作符¶
$sum,$avg,$min,$max→ 聚合统计$push,$addToSet→ 收集数组$concat,$substr→ 字符串操作$cond,$ifNull→ 条件判断
实战代码¶
// ========== MongoDB聚合管道实战(JavaScript/mongosh语法)==========
// 数据库:生信分析报告系统
// 集合:analysis_reports(每个文档是一次分析结果)
// ========== 示例文档结构 ==========
/*
{
"_id": ObjectId("..."),
"report_id": "RPT_001",
"sample_id": "S001",
"patient_id": "P001",
"analysis_type": "variant_calling",
"status": "completed",
"created_at": ISODate("2024-01-15"),
"run_time_seconds": 3600,
"summary": {
"total_variants": 15432,
"snp_count": 14218,
"indel_count": 1214,
"quality_metrics": { "mean_depth": 45.7 }
},
"variants": [
{"gene": "BRCA1", "consequence": "missense_variant", "pathogenicity": "Pathogenic"},
{"gene": "TP53", "consequence": "stop_gained", "pathogenicity": "Pathogenic"},
{"gene": "KRAS", "consequence": "missense_variant", "pathogenicity": "Likely_Pathogenic"}
]
}
*/
// ========== 1. 基本聚合:统计各分析类型的报告数量和平均运行时间 ==========
db.analysis_reports.aggregate([
// Stage 1: 过滤已完成的报告(相当于WHERE status='completed')
{ $match: { status: "completed" } },
// Stage 2: 分组统计(相当于GROUP BY analysis_type)
{ $group: {
_id: "$analysis_type", // 分组键
report_count: { $sum: 1 }, // 计数(相当于COUNT(*))
avg_runtime: { $avg: "$run_time_seconds" }, // 平均运行时间
max_runtime: { $max: "$run_time_seconds" }, // 最长运行时间
total_variants: { $sum: "$summary.total_variants" } // 嵌套字段求和
}},
// Stage 3: 排序(按报告数量降序)
{ $sort: { report_count: -1 } }, // -1=降序, 1=升序
// Stage 4: 只取前5个
{ $limit: 5 }
])
// ========== 2. $unwind + $group:分析所有变异的致病性分布 ==========
db.analysis_reports.aggregate([
// 展开variants数组(每个变异变成一行文档)
{ $unwind: "$variants" },
// 过滤:只看missense_variant
{ $match: { "variants.consequence": "missense_variant" } },
// 统计各致病性类别的变异数量
{ $group: {
_id: "$variants.pathogenicity",
count: { $sum: 1 },
affected_genes: { $addToSet: "$variants.gene" } // 收集不重复的基因集合
}},
// 添加计算字段:基因数量
{ $addFields: { gene_count: { $size: "$affected_genes" } } },
// 只保留需要的字段
{ $project: {
pathogenicity: "$_id", // 重命名_id字段
count: 1,
gene_count: 1,
_id: 0 // 排除_id字段
}},
{ $sort: { count: -1 } }
])
// ========== 3. $lookup:关联查询(类似LEFT JOIN)==========
// 将报告与患者信息关联
db.analysis_reports.aggregate([
{ $match: { status: "completed", analysis_type: "variant_calling" } },
// 关联patients集合
{ $lookup: {
from: "patients", // 要关联的集合名
localField: "patient_id", // 当前集合的关联字段
foreignField: "patient_id", // 目标集合的关联字段
as: "patient_info" // 结果存入这个字段(数组形式)
}},
// 展开关联结果(因为lookup返回数组,展开为单个文档)
{ $unwind: { path: "$patient_info", preserveNullAndEmpty: true }},
// preserveNullAndEmpty: true = 没有匹配的文档也保留(类似LEFT JOIN)
{ $project: {
report_id: 1,
sample_id: 1,
"summary.total_variants": 1,
patient_name: "$patient_info.name",
patient_age: "$patient_info.age",
hospital: "$patient_info.hospital"
}}
])
// ========== 4. $bucket:按变异数量分组(分桶)==========
db.analysis_reports.aggregate([
{ $match: { "summary.total_variants": { $exists: true } } },
{ $bucket: {
groupBy: "$summary.total_variants", // 按total_variants分桶
boundaries: [0, 1000, 5000, 10000, 50000, Infinity], // 桶的边界
default: "Other", // 超出范围的放入"Other"
output: {
count: { $sum: 1 },
sample_ids: { $push: "$sample_id" } // 收集该桶中所有sample_id
}
}}
])
// 结果:
// {_id: 0, count: 5, ...} // 0-1000个变异
// {_id: 1000, count: 23, ...} // 1000-5000个变异
// {_id: 5000, count: 41, ...} // 5000-10000个变异
// ========== 5. 时序分析:按月统计分析量 ==========
db.analysis_reports.aggregate([
{ $match: { created_at: { $gte: new Date("2024-01-01") } } },
{ $group: {
_id: {
year: { $year: "$created_at" }, // 提取年份
month: { $month: "$created_at" } // 提取月份
},
count: { $sum: 1 },
avg_variants: { $avg: "$summary.total_variants" }
}},
// 添加格式化日期字段
{ $addFields: {
period: {
$concat: [
{ $toString: "$_id.year" }, "-",
{ $cond: [
{ $lt: ["$_id.month", 10] }, // 月份<10时补0
{ $concat: ["0", { $toString: "$_id.month" }] },
{ $toString: "$_id.month" }
]}
]
}
}},
{ $sort: { "_id.year": 1, "_id.month": 1 } } // 按时间升序
])
# ========== Python中使用pymongo执行聚合管道 ==========
from pymongo import MongoClient
import pandas as pd
client = MongoClient("mongodb://localhost:27017/")
db = client["bioinformatics"]
# 定义聚合管道(Python列表格式)
pipeline = [
{"$match": {"status": "completed"}}, # 注意:Python用双引号
{"$group": {
"_id": "$analysis_type",
"count": {"$sum": 1},
"avg_runtime": {"$avg": "$run_time_seconds"}
}},
{"$sort": {"count": -1}}
]
# 执行聚合
results = list(db.analysis_reports.aggregate(pipeline))
# 转为DataFrame方便分析
df = pd.DataFrame(results)
df.rename(columns={"_id": "analysis_type"}, inplace=True)
print(df)
面试常问点¶
Q: $match放在管道最前面有什么好处? A: $match越早过滤,后续阶段处理的文档数越少,性能越好。且$match在管道开头可以利用索引(放在后面就不行了)。
Q: $unwind之后数据量会增加多少? A: 如果数组有n个元素,$unwind后变为n条文档。处理大数组时要特别注意内存使用(aggregation有100MB内存限制,超过需设allowDiskUse)。
Q: $lookup和$unwind怎么配合实现LEFT JOIN? A: $lookup返回数组(可能为空),加
{$unwind: {path: "...", preserveNullAndEmpty: true}}可保留没有匹配的文档(preserveNullAndEmpty=true是LEFT JOIN的关键)。
速查表¶
// 常用聚合操作符
{ $match: { field: value } } // 过滤
{ $group: { _id: "$field", count: {$sum:1} } } // 分组
{ $sort: { field: -1 } } // 排序
{ $limit: N } // 限制
{ $project: { field: 1, _id: 0 } } // 选字段
{ $unwind: "$array_field" } // 展开数组
{ $lookup: { from, localField, foreignField, as } } // 关联
{ $addFields: { newField: expression } } // 添加字段
{ $count: "total" } // 计数输出
// 允许磁盘(超100MB时)
db.collection.aggregate(pipeline, { allowDiskUse: true })