跳转至

566 Python 面向对象编程 — 生信实战

适用人群:编程基础薄弱的生信方向毕业生 Python 版本:3.10+(推荐 3.12) 关键词:class、OOP、继承、封装、多态、@dataclass


一、什么是 OOP?为什么生信工程师要学?

1.1 白话解释

面向对象编程(Object-Oriented Programming,简称 OOP)是一种代码组织方式。

最简单的类比:

概念类比说明
class(类)模具 / 图纸定义了"这类东西长什么样、能做什么"
object(对象/实例)用模具做出来的产品根据图纸造出来的具体东西

举个例子:你做月饼,模具就是 class(花纹、大小都定好了),用模具压出来的每个月饼就是 object。模具只有一个,但月饼可以压很多个。

1.2 没有 OOP 的痛点

写生信脚本写多了,你可能遇到过:

  • 全局变量乱飞sample_nameread_countquality 散落在各处,改一个变量名全文搜索替换
  • 函数越写越多parse_fastq()filter_fastq()count_reads()merge_results() ……几十个函数挤在一个文件里,互相依赖,改一个全崩
  • 复制粘贴改代码:处理 16S 和宏基因组的流程 90% 一样,但你复制了两份代码分别改,后面 bug 修一份忘了另一份

OOP 的核心价值:把数据和操作数据的方法打包在一起,让代码更有条理。


二、核心概念 — 用生信场景讲解

2.1 class 和 __init__

class 是定义"模具"的关键字。__init__初始化方法(白话:出厂设置),每次用模具造产品时自动运行。

class FastqSample:                          # 定义一个类,类名用大驼峰
    """表示一个 FASTQ 测序样本"""

    def __init__(self, sample_id, r1_path):  # __init__ = 出厂设置
        self.sample_id = sample_id           # self.xxx = 给这个产品贴标签
        self.r1_path = r1_path               # 记录 R1 文件路径

2.2 属性和方法

  • 属性(Attribute)= 贴在对象上的标签,如 sample.sample_id
  • 方法(Method)= 对象能做的事,如 sample.get_quality_stats()
class FastqSample:
    def __init__(self, sample_id):
        self.sample_id = sample_id          # 属性:样本 ID
        self.read_count = 0                 # 属性:reads 数量,默认 0

    def get_quality_stats(self):            # 方法:获取质量统计
        """返回该样本的基本质量信息"""
        return f"{self.sample_id}: {self.read_count} reads"

2.3 self 到底是什么?

白话:self 就是"自我介绍卡"。

当你写 self.sample_id = "S01" 时,意思是"的样本 ID 是 S01"。每个对象的 self 指向自己,这样 Python 才知道你在说哪个对象。

s1 = FastqSample("S01")                   # s1 的 self 指向 s1
s2 = FastqSample("S02")                   # s2 的 self 指向 s2
print(s1.sample_id)                        # 输出 S01(s1 的自我介绍卡上写的)
print(s2.sample_id)                        # 输出 S02(s2 的自我介绍卡上写的)

三、实战案例 1:FastqSample 类

一个完整的 FASTQ 样本管理类:

import os                                   # 导入文件操作模块


class FastqSample:
    """管理一个双端测序样本的所有信息和操作"""

    def __init__(self, sample_id, r1_path, r2_path=None,
                 read_count=0, quality_score=0.0):
        """
        初始化样本对象

        参数:
            sample_id: 样本编号,如 "T2D_001"
            r1_path: R1 文件路径
            r2_path: R2 文件路径(单端测序可不填)
            read_count: reads 总数
            quality_score: 平均质量分数(Phred)
        """
        self.sample_id = sample_id          # 样本编号
        self.r1_path = r1_path              # R1 FASTQ 路径
        self.r2_path = r2_path              # R2 FASTQ 路径(可选)
        self.read_count = read_count        # reads 数量
        self.quality_score = quality_score  # 平均质量值

    def check_paired(self):
        """检查是否为双端测序数据"""
        if self.r2_path is None:            # 如果没有 R2
            return False                    # 说明是单端
        # 检查两个文件是否都存在
        r1_exists = os.path.exists(self.r1_path)  # R1 是否在磁盘上
        r2_exists = os.path.exists(self.r2_path)  # R2 是否在磁盘上
        return r1_exists and r2_exists      # 两个都在才算配对成功

    def get_size_mb(self):
        """获取 FASTQ 文件总大小(单位 MB)"""
        total_size = 0                      # 先归零
        for path in [self.r1_path, self.r2_path]:  # 遍历 R1 和 R2
            if path and os.path.exists(path):      # 路径不为空且文件存在
                total_size += os.path.getsize(path)  # 累加文件字节数
        return round(total_size / (1024 * 1024), 2)  # 字节转 MB,保留 2 位

    def summary(self):
        """输出样本摘要信息"""
        paired_status = "双端" if self.r2_path else "单端"  # 判断测序模式
        return (
            f"样本: {self.sample_id}\n"                 # 样本 ID
            f"测序模式: {paired_status}\n"               # 单端/双端
            f"Reads 数: {self.read_count:,}\n"          # 千分位显示
            f"平均质量: Q{self.quality_score}\n"         # 质量分数
            f"文件大小: {self.get_size_mb()} MB"         # 文件大小
        )


# ===== 使用示例 =====
sample = FastqSample(                       # 创建一个样本对象
    sample_id="T2D_001",                    # 糖尿病队列第 1 个样本
    r1_path="/data/T2D_001_R1.fastq.gz",    # R1 路径
    r2_path="/data/T2D_001_R2.fastq.gz",    # R2 路径
    read_count=5_000_000,                   # 500 万条 reads
    quality_score=35.2                      # 平均 Q35.2
)

print(sample.summary())                    # 打印样本摘要
print(f"是否双端: {sample.check_paired()}") # 检查配对

四、实战案例 2:AbundanceTable 类

用 OOP 管理物种丰度表(宏基因组最常用的数据结构):

import pandas as pd                         # 数据处理
import numpy as np                           # 数值计算


class AbundanceTable:
    """物种丰度表的通用操作"""

    def __init__(self, data):
        """
        参数:
            data: pandas DataFrame,行=物种,列=样本
        """
        self.data = data.copy()             # 复制一份,避免修改原始数据(不可变原则)
        self.samples = list(data.columns)   # 样本名列表
        self.species = list(data.index)     # 物种名列表

    def filter_low_abundance(self, min_abundance=0.001):
        """过滤低丰度物种(默认去掉 < 0.1% 的)"""
        # 计算每个物种在所有样本中的平均丰度
        mean_abundance = self.data.mean(axis=1)  # 按行求平均
        # 只保留平均丰度 >= 阈值的物种
        mask = mean_abundance >= min_abundance   # 布尔索引
        filtered = self.data.loc[mask]           # 筛选
        return AbundanceTable(filtered)          # 返回新对象(不改原数据)

    def normalize(self):
        """相对丰度标准化(每列总和归一化到 1)"""
        col_sums = self.data.sum(axis=0)         # 每个样本的总丰度
        normalized = self.data.div(col_sums, axis=1)  # 每个值 / 列总和
        return AbundanceTable(normalized)        # 返回新对象

    def calculate_diversity(self, method="shannon"):
        """
        计算 alpha 多样性

        参数:
            method: 多样性指数,默认 Shannon
        返回:
            dict: {样本名: 多样性值}
        """
        results = {}                             # 存结果的字典
        for sample in self.samples:              # 遍历每个样本
            abundances = self.data[sample]        # 取这个样本的丰度列
            abundances = abundances[abundances > 0]  # 去掉 0(log0 无意义)
            if method == "shannon":
                # Shannon 公式: H = -sum(p * ln(p))
                p = abundances / abundances.sum()    # 计算相对比例
                diversity = -np.sum(p * np.log(p))   # Shannon 指数
            elif method == "simpson":
                # Simpson 公式: D = 1 - sum(p^2)
                p = abundances / abundances.sum()
                diversity = 1 - np.sum(p ** 2)       # Simpson 指数
            else:
                raise ValueError(f"不支持的方法: {method}")  # 不认识就报错
            results[sample] = round(diversity, 4)    # 保留 4 位小数
        return results


class MetagenomicAbundance(AbundanceTable):
    """
    继承 AbundanceTable,增加宏基因组特有功能

    白话:AbundanceTable 是"通用丰度表",
          MetagenomicAbundance 是"宏基因组专用丰度表",
          专用的可以做通用的所有事,还多了一些额外技能。
    """

    def __init__(self, data, taxonomy_level="genus"):
        super().__init__(data)                   # 调用父类的 __init__
        self.taxonomy_level = taxonomy_level     # 新增:分类学层级

    def collapse_to_level(self, level="phylum"):
        """将物种丰度合并到更高分类层级"""
        # 假设物种名格式: "k__Bacteria;p__Firmicutes;..."
        def extract_level(species_name, target):  # 从全名中提取某个层级
            levels = {"kingdom": 0, "phylum": 1, "class": 2,
                      "order": 3, "family": 4, "genus": 5, "species": 6}
            parts = species_name.split(";")       # 按分号拆分
            idx = levels.get(target, 1)           # 找到目标层级的位置
            if idx < len(parts):                  # 如果名字够长
                return parts[idx].strip()         # 返回对应层级
            return "Unclassified"                 # 不够长就标"未分类"

        new_index = [extract_level(s, level) for s in self.species]  # 提取层级
        grouped = self.data.copy()                # 复制数据
        grouped.index = new_index                 # 替换索引
        grouped = grouped.groupby(level=0).sum()  # 相同层级的加起来
        return MetagenomicAbundance(grouped, taxonomy_level=level)


# ===== 使用示例 =====
# 创建模拟丰度数据
mock_data = pd.DataFrame(
    {
        "T2D_001": [0.3, 0.25, 0.2, 0.15, 0.05, 0.05],   # 样本 1 各物种丰度
        "T2D_002": [0.1, 0.35, 0.15, 0.2, 0.1, 0.1],     # 样本 2
        "Ctrl_001": [0.2, 0.2, 0.2, 0.2, 0.1, 0.1],      # 对照组
    },
    index=["Bacteroides", "Prevotella", "Faecalibacterium",
           "Roseburia", "Akkermansia", "Others"]           # 物种名
)

table = AbundanceTable(mock_data)                          # 创建丰度表对象
diversity = table.calculate_diversity("shannon")           # 计算 Shannon 多样性
print("Shannon 多样性:", diversity)

filtered = table.filter_low_abundance(0.08)                # 过滤 < 8% 的物种
print("过滤后剩余物种:", filtered.species)

五、进阶概念

5.1 继承(Inheritance)— 工具运行器

白话:儿子可以用爸爸的所有技能,还能学新技能。

import subprocess                            # 调用命令行工具


class ToolRunner:
    """通用的命令行工具运行器(父类)"""

    def __init__(self, tool_name, version="unknown"):
        self.tool_name = tool_name           # 工具名,如 fastp
        self.version = version               # 版本号

    def run(self, cmd):
        """运行命令行指令"""
        print(f"[{self.tool_name}] 执行: {cmd}")  # 打印要执行的命令
        result = subprocess.run(             # 调用系统命令
            cmd, shell=True,                 # 用 shell 执行
            capture_output=True, text=True   # 捕获输出,文本模式
        )
        if result.returncode != 0:           # 非 0 表示出错
            raise RuntimeError(              # 抛出错误
                f"{self.tool_name} 失败: {result.stderr}"
            )
        return result.stdout                 # 返回标准输出

    def check_installed(self):
        """检查工具是否已安装"""
        try:
            self.run(f"which {self.tool_name}")  # Linux 的 which 命令
            return True
        except RuntimeError:
            return False


class FastpRunner(ToolRunner):
    """fastp 质控工具运行器(继承 ToolRunner)"""

    def __init__(self):
        super().__init__("fastp")            # 调用父类初始化,工具名固定

    def run_qc(self, r1, r2, out_dir):
        """运行 fastp 质控"""
        cmd = (
            f"fastp "                        # fastp 主程序
            f"-i {r1} -I {r2} "              # 输入的 R1 和 R2
            f"-o {out_dir}/clean_R1.fq.gz "  # 输出的干净 R1
            f"-O {out_dir}/clean_R2.fq.gz "  # 输出的干净 R2
            f"--json {out_dir}/report.json "  # JSON 报告
            f"--html {out_dir}/report.html"   # HTML 报告
        )
        return self.run(cmd)                 # 复用父类的 run() 方法

5.2 封装(Encapsulation)— 保护内部数据

白话:有些数据不想让外面随便改,就像银行卡密码不能写在卡面上。

class SequencingRun:
    """一次测序运行的封装示例"""

    def __init__(self, run_id, raw_data):
        self.run_id = run_id                 # 公开属性:运行 ID
        self._raw_data = raw_data            # 私有属性:原始数据(前缀 _)
        self._is_validated = False            # 私有属性:是否已验证

    def validate(self):
        """验证数据完整性"""
        if len(self._raw_data) > 0:          # 简单检查:数据不为空
            self._is_validated = True         # 标记为已验证
            return True
        return False

    def get_data(self):
        """只有验证通过后才能拿到数据"""
        if not self._is_validated:            # 未验证就报错
            raise PermissionError("请先调用 validate() 验证数据")
        return self._raw_data.copy()          # 返回副本,不给原件

Python 的约定:以 _ 开头的属性是"私有的",外部不应直接访问。这是一种君子协定,Python 不会强制阻止你访问,但你不应该这么做。

5.3 多态(Polymorphism)— 统一接口

白话:不管你用什么品牌的手机,"打电话"这个操作的方式都一样。

class Aligner:
    """比对工具的统一接口(基类)"""

    def align(self, reads, reference):
        raise NotImplementedError("子类必须实现 align 方法")


class Bowtie2Aligner(Aligner):
    """Bowtie2 比对"""

    def align(self, reads, reference):
        return f"bowtie2 -x {reference} -U {reads}"  # Bowtie2 命令


class BWAAligner(Aligner):
    """BWA 比对"""

    def align(self, reads, reference):
        return f"bwa mem {reference} {reads}"         # BWA 命令


# 使用时不需要关心具体是哪个工具
def run_alignment(aligner, reads, ref):
    """统一的比对流程,传入任何 Aligner 子类都能用"""
    cmd = aligner.align(reads, ref)          # 调用 align,具体实现由子类决定
    print(f"执行命令: {cmd}")
    return cmd

# bowtie2 和 bwa 用法完全一样
run_alignment(Bowtie2Aligner(), "sample.fq", "hg38")   # 用 Bowtie2
run_alignment(BWAAligner(), "sample.fq", "hg38")       # 用 BWA

5.4 @property 装饰器

白话:让方法"伪装"成属性,用起来更自然。

class GenomeAssembly:
    """基因组组装结果"""

    def __init__(self, contigs):
        self._contigs = contigs              # 存储所有 contig 序列

    @property
    def n50(self):
        """计算 N50,用法像属性:assembly.n50"""
        sorted_lens = sorted(                # 对 contig 长度排序
            [len(c) for c in self._contigs],
            reverse=True                     # 从大到小
        )
        total = sum(sorted_lens)             # 总长度
        cumsum = 0                           # 累加器
        for length in sorted_lens:
            cumsum += length
            if cumsum >= total / 2:          # 累加到 50% 总长时
                return length                # 这个长度就是 N50
        return 0

    @property
    def contig_count(self):
        """contig 总数"""
        return len(self._contigs)


assembly = GenomeAssembly(["ATCG" * 100, "GCTA" * 50, "TTAA" * 200])
print(f"N50: {assembly.n50}")                # 像属性一样用,不需要加括号
print(f"Contig 数: {assembly.contig_count}")

5.5 @dataclass 简化写法(推荐)

Python 3.7 引入的 @dataclass 装饰器可以自动生成 __init____repr____eq__ 等方法,大幅减少样板代码。Python 3.10+ 还支持 slots=True 优化内存。

from dataclasses import dataclass, field     # 导入 dataclass 模块


@dataclass                                   # 加上这个装饰器,自动生成 __init__ 等
class SampleInfo:
    """样本元信息 — 使用 dataclass 大幅简化代码"""
    sample_id: str                           # 样本 ID(必填)
    group: str                               # 分组(如 T2D / Control)
    age: int                                 # 年龄
    bmi: float                               # BMI 指数
    reads_count: int = 0                     # 测序深度(默认 0)
    tags: list = field(default_factory=list)  # 标签(可变默认值要用 field)

    @property
    def is_obese(self):
        """BMI >= 28 判定为肥胖"""
        return self.bmi >= 28.0


@dataclass(frozen=True)                      # frozen=True → 创建后不能改(不可变)
class GeneAnnotation:
    """基因注释信息(不可变对象)"""
    gene_id: str                             # 基因 ID
    gene_name: str                           # 基因名
    ko_id: str                               # KEGG Orthology ID
    description: str                         # 功能描述


# 使用示例 — 不需要写 __init__,自动生成
s = SampleInfo("T2D_001", "T2D", 55, 26.3, reads_count=8_000_000)
print(s)                                     # 自动生成 __repr__,打印漂亮
print(f"是否肥胖: {s.is_obese}")

gene = GeneAnnotation("K01990", "ABC transporter", "K01990", "ABC转运蛋白")
# gene.gene_name = "xxx"                    # 报错!frozen=True 不允许修改

推荐: 如果你的类主要是"存数据",优先用 @dataclass,代码量减少 50% 以上。


六、OOP vs 函数式:什么时候用 class,什么时候用函数?

场景推荐理由
一次性脚本(跑个质控、画个图)函数简单直接,不需要额外结构
管理多个样本/文件的状态class数据和操作绑在一起,不会搞混
写可复用的流程工具class继承 + 多态,新工具加起来很方便
简单的数据转换函数输入 → 输出,不需要维护状态
构建分析 pipelineclass封装每个步骤,流程更清晰
纯计算(Shannon 指数、距离矩阵)函数无状态,测试更简单

经验法则: 如果你发现自己在多个函数之间传递同一组变量(sample_id, r1_path, r2_path, ...),那就该用 class 把它们包在一起了。


七、常见报错

报错 1:TypeError: __init__() missing required argument

TypeError: FastqSample.__init__() missing 1 required positional argument: 'r1_path'

原因: 创建对象时漏传了必填参数。 解决: 检查 __init__ 需要哪些参数,一个都不能少(除了有默认值的)。

# 错误:
sample = FastqSample("S01")                # 缺少 r1_path

# 正确:
sample = FastqSample("S01", "/data/S01_R1.fq.gz")  # 传齐参数

报错 2:AttributeError: 'XXX' object has no attribute 'yyy'

AttributeError: 'FastqSample' object has no attribute 'quality'

原因: 访问了一个不存在的属性。常见于拼写错误或忘了在 __init__ 里定义。 解决: 检查 __init__ 中是否有 self.quality = ...,注意拼写。

报错 3:TypeError: method() takes 1 positional argument but 2 were given

TypeError: summary() takes 0 positional arguments but 1 was given

原因: 定义方法时忘写 self解决: 类里面的方法第一个参数永远是 self

# 错误:
def summary():                              # 缺少 self
    return self.sample_id

# 正确:
def summary(self):                          # 第一个参数必须是 self
    return self.sample_id

报错 4:TypeError: cannot unpack non-sequence NoneType

原因: 方法忘写 return,默认返回 None解决: 检查方法末尾是否有 return 语句。


八、速查表

基本语法

# 定义类
class MyClass:
    def __init__(self, arg1, arg2):          # 初始化
        self.arg1 = arg1                    # 设置属性
    def my_method(self):                    # 定义方法
        return self.arg1

# 创建对象
obj = MyClass("hello", 42)

# 继承
class Child(Parent):
    def __init__(self, extra_arg):
        super().__init__()                  # 调用父类初始化

# dataclass(推荐方式)
from dataclasses import dataclass
@dataclass
class Sample:
    name: str
    count: int = 0

关键概念一句话总结

概念一句话关键词
class定义模具class XXX:
object用模具造出来的产品obj = XXX()
__init__出厂设置第一个参数是 self
self自我介绍卡每个方法必须有
继承儿子用爸爸的技能class Son(Dad):
封装密码不写在卡面上self._private
多态不同手机同一个打电话操作子类重写父类方法
@property方法伪装成属性不用加括号
@dataclass自动生成样板代码from dataclasses import dataclass

命名规范

类型命名方式示例
类名大驼峰FastqSample
方法名小写下划线get_quality_stats()
私有属性前缀 _self._raw_data
常量全大写MAX_READ_LENGTH = 300

下一篇推荐: 567_pytest单元测试_生信函数测试.md — 学会给你的 class 和函数写测试