跳转至

550_设计大规模日志系统


一句话说明

大规模日志系统(如 ELK Stack)负责收集、传输、存储、检索海量日志,是排查生信管道故障的关键工具。


核心知识点

日志系统要解决什么

问题:100台服务器,每天产生100GB日志,如何:
  1. 快速搜索"某样本的分析日志"
  2. 统计每天任务失败数
  3. 发现异常模式(如某错误突然增多)
  4. 保留90天历史日志

ELK Stack(最常用日志系统)

E = Elasticsearch  ← 存储和搜索引擎
L = Logstash       ← 日志收集和处理(较重)
K = Kibana         ← 可视化界面

现代变体:EFK Stack
F = Filebeat/Fluentd  ← 轻量级日志收集器(替代Logstash)

日志级别规范

# 日志级别从高到低
CRITICAL = 50  # 系统崩溃,立即处理
ERROR    = 40  # 操作失败,需要关注
WARNING  = 30  # 可能有问题,需要留意
INFO     = 20  # 正常运行信息(默认级别)
DEBUG    = 10  # 调试信息(生产环境关闭)

# 生信场景示例:
logger.info("开始分析样本 sample_id=SRR123456")
logger.warning("样本reads数量偏低: reads=8000 < 10000")
logger.error("BWA比对失败: returncode=1, stderr=...")
logger.critical("数据库连接中断,无法保存结果")

实战代码/设计图/模板

ELK 架构图

[各服务节点]
  ├── API服务日志
  ├── Worker任务日志
  └── 数据库慢查询日志
       │ 文件写入
[Filebeat Agent]  ← 每台服务器部署,读取日志文件
       │ 推送
[消息队列 Kafka]  ← 日志缓冲,防止ES压垮
       │ 消费
[Logstash]        ← 日志过滤、格式化、富化
       │ 写入
[Elasticsearch集群] ← 存储搜索(3节点以上)
[Kibana]          ← 查询、Dashboard、告警

结构化日志(推荐格式)

import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """JSON格式化器,便于机器解析"""

    def format(self, record: logging.LogRecord) -> str:
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        # 添加自定义字段
        if hasattr(record, 'job_id'):
            log_entry['job_id'] = record.job_id
        if hasattr(record, 'sample_id'):
            log_entry['sample_id'] = record.sample_id

        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)

        return json.dumps(log_entry, ensure_ascii=False)

# 配置日志
def setup_logger(name: str, log_file: str = None):
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    # 控制台输出(开发环境)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(JSONFormatter())
    logger.addHandler(console_handler)

    # 文件输出(生产环境,Filebeat读取)
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(JSONFormatter())
        logger.addHandler(file_handler)

    return logger

# 使用示例
logger = setup_logger('bioinf', '/var/log/bioinf/app.log')

def run_pipeline(job_id: str, sample_id: str):
    # 添加上下文字段到日志记录
    extra = {'job_id': job_id, 'sample_id': sample_id}

    logger.info("开始执行流程", extra=extra)
    try:
        result = execute(job_id, sample_id)
        logger.info(f"执行完成, 结果大小={len(result)}", extra=extra)
    except Exception as e:
        logger.error(f"执行失败: {e}", extra=extra, exc_info=True)
        raise

Filebeat 配置

# filebeat.yml - 监听日志文件并发送到Elasticsearch
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/bioinf/*.log
    fields:
      service: bioinf-worker     # 自定义标签
      environment: production
    fields_under_root: true

    # 多行合并(Python traceback跨多行)
    multiline.pattern: '^\{'      # JSON格式以{开头
    multiline.negate: true
    multiline.match: after

processors:
  - decode_json_fields:            # 解析JSON日志
      fields: ["message"]
      target: ""

output.elasticsearch:
  hosts: ["es-master:9200"]
  index: "bioinf-logs-%{+yyyy.MM.dd}"  # 按天分索引

# 或者输出到Kafka
# output.kafka:
#   hosts: ["kafka:9092"]
#   topic: "bioinf-logs"

Elasticsearch 查询示例

# 查询某任务的所有日志
curl -X GET "es-master:9200/bioinf-logs-*/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "bool": {
      "must": [
        {"term": {"job_id": "job_abc123"}},
        {"range": {"@timestamp": {"gte": "now-1h"}}}
      ]
    }
  },
  "sort": [{"@timestamp": "asc"}],
  "size": 100
}'

# 统计过去1小时错误数(Python调用)
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://es-master:9200"])
result = es.count(
    index="bioinf-logs-*",
    body={
        "query": {
            "bool": {
                "must": [
                    {"term": {"level": "ERROR"}},
                    {"range": {"@timestamp": {"gte": "now-1h"}}}
                ]
            }
        }
    }
)
print(f"过去1小时错误数: {result['count']}")

面试常问点

问题参考答案
为什么用ELK而不是直接grep?多机器集中查询、全文检索、可视化
Logstash vs Filebeat?Filebeat轻量,采集用;Logstash重,处理用
日志数据如何归档?ES ILM策略,热→温→冷→删除
如何防止日志写入成为瓶颈?异步日志、Kafka缓冲、批量写入
日志保留多久?法规要求(医疗数据6年),一般90天

速查表

常用 Kibana 查询语法:
  job_id: "job_abc123"          # 精确匹配
  level: ERROR                   # 查错误日志
  message: *failed*              # 通配符搜索
  @timestamp: [now-1h TO now]   # 时间范围
  NOT level: DEBUG               # 排除DEBUG

日志文件管理:
  logrotate:按大小/时间轮转日志文件
  /etc/logrotate.d/bioinf:
    /var/log/bioinf/*.log {
      daily       # 每天轮转
      rotate 30   # 保留30天
      compress    # gzip压缩
      missingok   # 文件不存在不报错
    }