550_设计大规模日志系统¶
一句话说明¶
大规模日志系统(如 ELK Stack)负责收集、传输、存储、检索海量日志,是排查生信管道故障的关键工具。
核心知识点¶
日志系统要解决什么¶
ELK Stack(最常用日志系统)¶
E = Elasticsearch ← 存储和搜索引擎
L = Logstash ← 日志收集和处理(较重)
K = Kibana ← 可视化界面
现代变体:EFK Stack
F = Filebeat/Fluentd ← 轻量级日志收集器(替代Logstash)
日志级别规范¶
# 日志级别从高到低
CRITICAL = 50 # 系统崩溃,立即处理
ERROR = 40 # 操作失败,需要关注
WARNING = 30 # 可能有问题,需要留意
INFO = 20 # 正常运行信息(默认级别)
DEBUG = 10 # 调试信息(生产环境关闭)
# 生信场景示例:
logger.info("开始分析样本 sample_id=SRR123456")
logger.warning("样本reads数量偏低: reads=8000 < 10000")
logger.error("BWA比对失败: returncode=1, stderr=...")
logger.critical("数据库连接中断,无法保存结果")
实战代码/设计图/模板¶
ELK 架构图¶
[各服务节点]
├── API服务日志
├── Worker任务日志
└── 数据库慢查询日志
│ 文件写入
[Filebeat Agent] ← 每台服务器部署,读取日志文件
│ 推送
[消息队列 Kafka] ← 日志缓冲,防止ES压垮
│ 消费
[Logstash] ← 日志过滤、格式化、富化
│ 写入
[Elasticsearch集群] ← 存储搜索(3节点以上)
│
[Kibana] ← 查询、Dashboard、告警
结构化日志(推荐格式)¶
import logging
import json
from datetime import datetime
class JSONFormatter(logging.Formatter):
"""JSON格式化器,便于机器解析"""
def format(self, record: logging.LogRecord) -> str:
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
# 添加自定义字段
if hasattr(record, 'job_id'):
log_entry['job_id'] = record.job_id
if hasattr(record, 'sample_id'):
log_entry['sample_id'] = record.sample_id
if record.exc_info:
log_entry['exception'] = self.formatException(record.exc_info)
return json.dumps(log_entry, ensure_ascii=False)
# 配置日志
def setup_logger(name: str, log_file: str = None):
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
# 控制台输出(开发环境)
console_handler = logging.StreamHandler()
console_handler.setFormatter(JSONFormatter())
logger.addHandler(console_handler)
# 文件输出(生产环境,Filebeat读取)
if log_file:
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(JSONFormatter())
logger.addHandler(file_handler)
return logger
# 使用示例
logger = setup_logger('bioinf', '/var/log/bioinf/app.log')
def run_pipeline(job_id: str, sample_id: str):
# 添加上下文字段到日志记录
extra = {'job_id': job_id, 'sample_id': sample_id}
logger.info("开始执行流程", extra=extra)
try:
result = execute(job_id, sample_id)
logger.info(f"执行完成, 结果大小={len(result)}", extra=extra)
except Exception as e:
logger.error(f"执行失败: {e}", extra=extra, exc_info=True)
raise
Filebeat 配置¶
# filebeat.yml - 监听日志文件并发送到Elasticsearch
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/bioinf/*.log
fields:
service: bioinf-worker # 自定义标签
environment: production
fields_under_root: true
# 多行合并(Python traceback跨多行)
multiline.pattern: '^\{' # JSON格式以{开头
multiline.negate: true
multiline.match: after
processors:
- decode_json_fields: # 解析JSON日志
fields: ["message"]
target: ""
output.elasticsearch:
hosts: ["es-master:9200"]
index: "bioinf-logs-%{+yyyy.MM.dd}" # 按天分索引
# 或者输出到Kafka
# output.kafka:
# hosts: ["kafka:9092"]
# topic: "bioinf-logs"
Elasticsearch 查询示例¶
# 查询某任务的所有日志
curl -X GET "es-master:9200/bioinf-logs-*/_search" -H 'Content-Type: application/json' -d'
{
"query": {
"bool": {
"must": [
{"term": {"job_id": "job_abc123"}},
{"range": {"@timestamp": {"gte": "now-1h"}}}
]
}
},
"sort": [{"@timestamp": "asc"}],
"size": 100
}'
# 统计过去1小时错误数(Python调用)
from elasticsearch import Elasticsearch
es = Elasticsearch(["http://es-master:9200"])
result = es.count(
index="bioinf-logs-*",
body={
"query": {
"bool": {
"must": [
{"term": {"level": "ERROR"}},
{"range": {"@timestamp": {"gte": "now-1h"}}}
]
}
}
}
)
print(f"过去1小时错误数: {result['count']}")
面试常问点¶
| 问题 | 参考答案 |
|---|---|
| 为什么用ELK而不是直接grep? | 多机器集中查询、全文检索、可视化 |
| Logstash vs Filebeat? | Filebeat轻量,采集用;Logstash重,处理用 |
| 日志数据如何归档? | ES ILM策略,热→温→冷→删除 |
| 如何防止日志写入成为瓶颈? | 异步日志、Kafka缓冲、批量写入 |
| 日志保留多久? | 法规要求(医疗数据6年),一般90天 |
速查表¶
常用 Kibana 查询语法:
job_id: "job_abc123" # 精确匹配
level: ERROR # 查错误日志
message: *failed* # 通配符搜索
@timestamp: [now-1h TO now] # 时间范围
NOT level: DEBUG # 排除DEBUG
日志文件管理:
logrotate:按大小/时间轮转日志文件
/etc/logrotate.d/bioinf:
/var/log/bioinf/*.log {
daily # 每天轮转
rotate 30 # 保留30天
compress # gzip压缩
missingok # 文件不存在不报错
}