跳转至

FastAPI 后端开发入门

一句话说明:FastAPI 是一个现代 Python Web 框架,让你用写 Python 函数的方式快速构建高性能 API 接口,自动生成文档,非常适合把生信分析工具"服务化"。


为什么生信工程师需要学 Web 后端?

你做完宏基因组分析,结果只有你自己电脑能看。老板想查某个菌的丰度?同事想跑你的流程?都得找你。

学会后端开发,你可以:

痛点 学完后端后
分析结果只能本地看 部署成 API,浏览器打开就能查
同事要用你的脚本得配环境 封装成服务,发个链接就行
面试只会写脚本 简历多一条"数据服务化"亮点
想做全栈生信平台 后端是第一步

数据服务化(白话:把你的分析脚本变成一个"点菜窗口",别人通过网络发请求,你的程序自动返回结果)是生信工程师进阶的核心能力。


核心概念白话版

API(Application Programming Interface)

白话:你去餐厅点菜,菜单就是 API。你不需要知道厨房怎么做菜,只需要照着菜单点,服务员就把菜端上来。API 就是程序之间的"菜单"。

HTTP 方法

白话:你对服务器说话的"动作"。

方法 白话含义 例子
GET 我要看/查 查询某个菌的丰度
POST 我要提交/创建 上传一个 FASTA 文件
PUT 我要整个替换 更新整条样本记录
DELETE 我要删除 删掉某个样本

路由(Route)

白话:URL 地址。/species/Ecoli 就是一条路由,告诉服务器"我要找大肠杆菌的信息"。

请求体(Request Body)

白话:你寄快递时填的包裹信息。POST 请求需要带上数据,这些数据就是请求体。

响应(Response)

白话:服务器给你的回信。通常是 JSON 格式(类似 Python 字典)。

中间件(Middleware)

白话:门卫。每个请求进来、每个响应出去都要经过它,可以做日志记录、权限检查等。

异步 async

白话:餐厅只有一个服务员,但他不会傻等厨房做完菜再去服务下一桌。他先记下订单,转身服务别人,菜好了再端过去。async 就是让你的服务器能同时处理多个请求而不卡住。


安装配置

# 建议在conda环境中操作
conda activate bioinfo

# 安装 FastAPI 和 Uvicorn(ASGI服务器,负责运行你的API)
pip install fastapi uvicorn

# 可选:安装常用扩展
pip install python-multipart  # 文件上传支持
pip install pydantic          # 数据验证(FastAPI自带,但可单独升级)
pip install aiofiles          # 异步文件操作

验证安装:

python -c "import fastapi; print(fastapi.__version__)"
# 输出类似:0.110.0


实操教程

1. Hello World —— 你的第一个 API

创建文件 main.py

# main.py —— 最简单的 FastAPI 应用
from fastapi import FastAPI  # 导入 FastAPI 类

app = FastAPI()  # 创建一个 FastAPI 应用实例,相当于开了一家餐厅

@app.get("/")  # 装饰器:当有人用 GET 方法访问根路径 "/" 时,执行下面的函数
def read_root():
    """根路径,返回欢迎信息"""
    return {"message": "你好,这是我的第一个 API!"}  # 返回 JSON 格式响应

@app.get("/hello/{name}")  # {name} 是路径参数,用户可以传入任意值
def say_hello(name: str):
    """向指定用户打招呼"""
    return {"message": f"你好,{name}!欢迎使用生信 API"}

运行:

# uvicorn 主文件名:app实例名 --reload(开发模式,修改代码自动重启)
uvicorn main:app --reload --host 0.0.0.0 --port 8000

浏览器打开 http://localhost:8000,你会看到 JSON 响应。 打开 http://localhost:8000/docs,你会看到自动生成的交互式文档。


2. 路径参数 —— URL 里传值

# path_params.py —— 路径参数示例
from fastapi import FastAPI

app = FastAPI()

@app.get("/species/{species_name}")  # species_name 从 URL 中提取
def get_species(species_name: str):
    """根据物种名查询信息"""
    # 实际项目中这里会查数据库
    return {
        "species": species_name,       # 返回物种名
        "status": "found",             # 查询状态
        "message": f"正在查询 {species_name} 的相关信息"
    }

@app.get("/sample/{sample_id}")  # sample_id 自动转换为整数
def get_sample(sample_id: int):
    """根据样本编号查询,FastAPI 自动做类型校验"""
    # 如果用户传入非数字,FastAPI 自动返回 422 错误
    return {"sample_id": sample_id, "info": f"这是第 {sample_id} 号样本"}

3. 查询参数 —— URL 问号后面的筛选条件

# query_params.py —— 查询参数示例
from fastapi import FastAPI
from typing import Optional  # 可选参数类型

app = FastAPI()

# 模拟数据库
fake_abundance_db = [
    {"species": "E.coli", "abundance": 0.15, "sample": "S001"},
    {"species": "B.fragilis", "abundance": 0.08, "sample": "S001"},
    {"species": "E.coli", "abundance": 0.22, "sample": "S002"},
    {"species": "L.acidophilus", "abundance": 0.05, "sample": "S002"},
]

@app.get("/abundance/")  # 查询参数不写在路径里,而是通过 ?key=value 传递
def query_abundance(
    species: Optional[str] = None,  # 可选参数:物种名筛选
    min_abundance: float = 0.0,     # 有默认值的参数:最低丰度阈值
    limit: int = 10                 # 返回条数限制
):
    """
    查询丰度数据
    调用示例:/abundance/?species=E.coli&min_abundance=0.1&limit=5
    """
    results = fake_abundance_db  # 从"数据库"获取全部数据

    # 按物种筛选
    if species:
        results = [r for r in results if r["species"] == species]

    # 按最低丰度筛选
    results = [r for r in results if r["abundance"] >= min_abundance]

    # 限制返回条数
    results = results[:limit]

    return {"count": len(results), "data": results}

访问 http://localhost:8000/abundance/?species=E.coli&min_abundance=0.1 即可筛选。


4. 请求体与 Pydantic 模型 —— 提交结构化数据

# request_body.py —— Pydantic 数据模型 + 请求体
from fastapi import FastAPI
from pydantic import BaseModel, Field  # Pydantic 用于数据验证
from typing import Optional, List

app = FastAPI()

# 定义数据模型(白话:规定提交数据必须长什么样)
class SampleCreate(BaseModel):
    """样本创建模型 —— 定义客户端必须提交哪些字段"""
    sample_id: str = Field(..., description="样本编号,如 S001")          # ... 表示必填
    patient_age: int = Field(..., ge=0, le=150, description="患者年龄")  # ge=大于等于0, le=小于等于150
    diagnosis: str = Field(..., description="诊断结果,如 T2D / Healthy")
    species_list: List[str] = Field(default=[], description="检出物种列表")
    notes: Optional[str] = Field(None, description="备注信息,可不填")

class SampleResponse(BaseModel):
    """响应模型 —— 定义返回给客户端的数据格式"""
    sample_id: str
    patient_age: int
    diagnosis: str
    species_list: List[str]
    notes: Optional[str]
    created: bool  # 额外字段:是否创建成功

# 内存存储(实际项目用数据库)
samples_db: List[dict] = []

@app.post("/samples/", response_model=SampleResponse)  # POST 方法,提交数据
def create_sample(sample: SampleCreate):
    """
    创建新样本记录
    客户端需要在请求体中发送 JSON 数据
    """
    sample_dict = sample.model_dump()  # Pydantic v2: 转为字典
    sample_dict["created"] = True      # 添加创建标记
    samples_db.append(sample_dict)     # 存入"数据库"
    return sample_dict                 # 返回创建的记录

用 curl 测试:

curl -X POST "http://localhost:8000/samples/" \
  -H "Content-Type: application/json" \
  -d '{"sample_id":"S003","patient_age":45,"diagnosis":"T2D","species_list":["E.coli","B.fragilis"]}'


5. CRUD 完整示例 —— 增删改查

# crud_app.py —— 完整的 CRUD 应用
from fastapi import FastAPI, HTTPException  # HTTPException 用于抛出错误
from pydantic import BaseModel
from typing import Optional, List

app = FastAPI(
    title="宏基因组样本管理 API",      # API 标题(显示在文档中)
    description="管理 T2D 研究的样本数据",  # API 描述
    version="1.0.0"                    # 版本号
)

# 数据模型
class Sample(BaseModel):
    sample_id: str
    patient_age: int
    diagnosis: str
    shannon_diversity: Optional[float] = None  # Shannon多样性指数,可选

class SampleUpdate(BaseModel):
    """更新模型:所有字段可选,只更新传入的字段"""
    patient_age: Optional[int] = None
    diagnosis: Optional[str] = None
    shannon_diversity: Optional[float] = None

# 模拟数据库
db: dict = {
    "S001": {"sample_id": "S001", "patient_age": 52, "diagnosis": "T2D", "shannon_diversity": 3.2},
    "S002": {"sample_id": "S002", "patient_age": 35, "diagnosis": "Healthy", "shannon_diversity": 4.1},
}

# ===== Create(创建)=====
@app.post("/samples/", status_code=201)  # 201 表示"已创建"
def create_sample(sample: Sample):
    """新增一条样本记录"""
    if sample.sample_id in db:
        raise HTTPException(status_code=400, detail="样本ID已存在")  # 抛出400错误
    db[sample.sample_id] = sample.model_dump()
    return {"message": "创建成功", "data": db[sample.sample_id]}

# ===== Read(查询)=====
@app.get("/samples/")  # 查询全部
def list_samples(skip: int = 0, limit: int = 20):
    """获取样本列表,支持分页"""
    all_samples = list(db.values())
    return {"total": len(all_samples), "data": all_samples[skip:skip+limit]}

@app.get("/samples/{sample_id}")  # 查询单条
def get_sample(sample_id: str):
    """根据ID查询单个样本"""
    if sample_id not in db:
        raise HTTPException(status_code=404, detail=f"样本 {sample_id} 不存在")
    return db[sample_id]

# ===== Update(更新)=====
@app.put("/samples/{sample_id}")
def update_sample(sample_id: str, sample_update: SampleUpdate):
    """更新样本信息(部分更新)"""
    if sample_id not in db:
        raise HTTPException(status_code=404, detail=f"样本 {sample_id} 不存在")
    # 只更新非None的字段
    update_data = sample_update.model_dump(exclude_unset=True)  # 排除未设置的字段
    for key, value in update_data.items():
        db[sample_id][key] = value
    return {"message": "更新成功", "data": db[sample_id]}

# ===== Delete(删除)=====
@app.delete("/samples/{sample_id}")
def delete_sample(sample_id: str):
    """删除样本记录"""
    if sample_id not in db:
        raise HTTPException(status_code=404, detail=f"样本 {sample_id} 不存在")
    deleted = db.pop(sample_id)  # 从字典中移除并返回
    return {"message": "删除成功", "deleted": deleted}

6. 文件上传 —— 接收生信数据文件

# file_upload.py —— 文件上传示例
from fastapi import FastAPI, UploadFile, File  # UploadFile 处理上传文件
from fastapi.responses import JSONResponse
from typing import List  # 类型提示
import os

app = FastAPI()

UPLOAD_DIR = "./uploads"  # 上传文件保存目录
os.makedirs(UPLOAD_DIR, exist_ok=True)  # 目录不存在则创建

@app.post("/upload/fasta/")
async def upload_fasta(file: UploadFile = File(...)):
    """
    上传 FASTA 文件
    - async: 异步处理,不阻塞其他请求
    - UploadFile: FastAPI 的文件上传类型
    - File(...): 表示这个参数必须从表单数据中获取
    """
    # 检查文件扩展名
    if not file.filename.endswith((".fa", ".fasta", ".fna")):
        return JSONResponse(
            status_code=400,
            content={"error": "只接受 .fa / .fasta / .fna 格式文件"}
        )

    # 保存文件
    file_path = os.path.join(UPLOAD_DIR, file.filename)
    content = await file.read()  # await: 异步读取文件内容
    with open(file_path, "wb") as f:
        f.write(content)

    # 简单统计序列数量
    text = content.decode("utf-8")
    seq_count = text.count(">")  # FASTA 中每条序列以 > 开头

    return {
        "filename": file.filename,
        "size_bytes": len(content),       # 文件大小
        "sequence_count": seq_count,      # 序列条数
        "saved_to": file_path             # 保存路径
    }

@app.post("/upload/multiple/")
async def upload_multiple(files: List[UploadFile] = File(...)):
    """批量上传文件"""
    results = []
    for file in files:
        content = await file.read()
        results.append({
            "filename": file.filename,
            "size_bytes": len(content)
        })
    return {"uploaded_count": len(results), "files": results}

7. 中间件 —— 全局拦截处理

# middleware_demo.py —— 中间件示例
from fastapi import FastAPI, Request  # Request 对象包含请求的所有信息
import time

app = FastAPI()

@app.middleware("http")  # 注册 HTTP 中间件
async def add_process_time_header(request: Request, call_next):
    """
    计时中间件:记录每个请求的处理耗时
    - request: 进来的请求
    - call_next: 调用下一个处理环节(即你的路由函数)
    """
    start_time = time.time()                    # 记录开始时间
    response = await call_next(request)          # 执行实际的路由处理
    process_time = time.time() - start_time      # 计算耗时
    response.headers["X-Process-Time"] = str(process_time)  # 把耗时写入响应头
    print(f"[{request.method}] {request.url.path} - {process_time:.4f}s")  # 打印日志
    return response

@app.middleware("http")
async def add_cors_headers(request: Request, call_next):
    """
    简易 CORS 中间件:允许前端跨域访问
    (生产环境建议用 fastapi.middleware.cors.CORSMiddleware)
    """
    response = await call_next(request)
    response.headers["Access-Control-Allow-Origin"] = "*"       # 允许所有来源
    response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE"
    return response

@app.get("/")
def root():
    return {"message": "检查响应头中的 X-Process-Time"}

8. 错误处理 —— 优雅返回错误信息

# error_handling.py —— 错误处理示例
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError  # Pydantic 验证错误

app = FastAPI()

# 自定义异常类
class SpeciesNotFoundError(Exception):
    """物种未找到异常"""
    def __init__(self, species_name: str):
        self.species_name = species_name

# 注册自定义异常处理器
@app.exception_handler(SpeciesNotFoundError)
async def species_not_found_handler(request: Request, exc: SpeciesNotFoundError):
    """当抛出 SpeciesNotFoundError 时,返回统一格式的错误响应"""
    return JSONResponse(
        status_code=404,
        content={
            "error": "species_not_found",
            "detail": f"物种 '{exc.species_name}' 在数据库中不存在",
            "suggestion": "请检查拼写或查看 /species/list 获取所有可用物种"
        }
    )

# 覆盖默认的验证错误处理器(让错误信息更友好)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """当请求数据不符合模型要求时,返回中文友好提示"""
    errors = exc.errors()
    friendly_errors = []
    for err in errors:
        friendly_errors.append({
            "field": " -> ".join(str(loc) for loc in err["loc"]),  # 出错字段
            "message": err["msg"],       # 错误原因
            "type": err["type"]          # 错误类型
        })
    return JSONResponse(
        status_code=422,
        content={"error": "数据验证失败", "details": friendly_errors}
    )

# 使用示例
known_species = ["E.coli", "B.fragilis", "L.acidophilus"]

@app.get("/species/{name}")
def get_species(name: str):
    """查询物种,不存在则触发自定义异常"""
    if name not in known_species:
        raise SpeciesNotFoundError(species_name=name)  # 抛出自定义异常
    return {"species": name, "found": True}

生信实战案例

案例 1:物种丰度查询 API

# bioinfo_abundance_api.py —— 物种丰度查询服务
from fastapi import FastAPI, Query
from typing import Optional
import json

app = FastAPI(title="物种丰度查询 API", version="1.0.0")

# 模拟从分析结果加载数据(实际中可读取 TSV/CSV 或数据库)
abundance_data = {
    "S001": {
        "patient": "P001", "group": "T2D",
        "species": {
            "Escherichia_coli": 0.152,
            "Bacteroides_fragilis": 0.083,
            "Faecalibacterium_prausnitzii": 0.041,
            "Akkermansia_muciniphila": 0.012,
        }
    },
    "S002": {
        "patient": "P002", "group": "Healthy",
        "species": {
            "Escherichia_coli": 0.045,
            "Bacteroides_fragilis": 0.125,
            "Faecalibacterium_prausnitzii": 0.098,
            "Akkermansia_muciniphila": 0.067,
        }
    },
    "S003": {
        "patient": "P003", "group": "T2D",
        "species": {
            "Escherichia_coli": 0.201,
            "Bacteroides_fragilis": 0.056,
            "Faecalibacterium_prausnitzii": 0.022,
            "Akkermansia_muciniphila": 0.008,
        }
    }
}

@app.get("/abundance/{sample_id}")
def get_sample_abundance(sample_id: str):
    """查询单个样本的所有物种丰度"""
    if sample_id not in abundance_data:
        return {"error": f"样本 {sample_id} 不存在", "available": list(abundance_data.keys())}
    return abundance_data[sample_id]

@app.get("/abundance/{sample_id}/{species}")
def get_species_abundance(sample_id: str, species: str):
    """查询指定样本中指定物种的丰度"""
    if sample_id not in abundance_data:
        return {"error": f"样本 {sample_id} 不存在"}
    species_data = abundance_data[sample_id]["species"]
    if species not in species_data:
        return {"error": f"物种 {species} 在样本 {sample_id} 中未检出"}
    return {
        "sample_id": sample_id,
        "species": species,
        "abundance": species_data[species],
        "group": abundance_data[sample_id]["group"]
    }

@app.get("/compare/")
def compare_groups(
    species: str = Query(..., description="要比较的物种名"),
    group1: str = Query("T2D", description="对照组1"),
    group2: str = Query("Healthy", description="对照组2")
):
    """比较两组间某物种的丰度差异"""
    group1_values = []
    group2_values = []

    for sample_id, data in abundance_data.items():
        if species in data["species"]:
            if data["group"] == group1:
                group1_values.append(data["species"][species])
            elif data["group"] == group2:
                group2_values.append(data["species"][species])

    # 计算均值
    avg1 = sum(group1_values) / len(group1_values) if group1_values else 0
    avg2 = sum(group2_values) / len(group2_values) if group2_values else 0

    return {
        "species": species,
        "comparison": f"{group1} vs {group2}",
        f"{group1}_mean": round(avg1, 4),
        f"{group2}_mean": round(avg2, 4),
        f"{group1}_n": len(group1_values),
        f"{group2}_n": len(group2_values),
        "fold_change": round(avg1 / avg2, 2) if avg2 > 0 else "inf"
    }

案例 2:FASTA 解析服务

# fasta_parser_api.py —— FASTA 文件解析服务
from fastapi import FastAPI, UploadFile, File
from pydantic import BaseModel
from typing import List
from io import StringIO

app = FastAPI(title="FASTA 解析服务", version="1.0.0")

class SequenceInfo(BaseModel):
    """单条序列的信息"""
    header: str          # 序列头(>后面的内容)
    length: int          # 序列长度
    gc_content: float    # GC含量

class FastaResult(BaseModel):
    """解析结果"""
    filename: str
    total_sequences: int
    total_bases: int
    average_length: float
    sequences: List[SequenceInfo]

def parse_fasta(content: str) -> List[dict]:
    """
    解析 FASTA 格式字符串
    FASTA格式:
    >序列名 描述
    ATCGATCG...
    """
    sequences = []           # 存放所有序列
    current_header = ""      # 当前序列头
    current_seq = []         # 当前序列片段列表

    for line in content.strip().split("\n"):
        line = line.strip()
        if line.startswith(">"):          # 遇到新序列头
            if current_header:            # 保存上一条序列
                seq = "".join(current_seq)
                sequences.append({"header": current_header, "sequence": seq})
            current_header = line[1:]     # 去掉 > 号
            current_seq = []              # 清空序列缓存
        else:
            current_seq.append(line)      # 累积序列行

    # 别忘了最后一条序列
    if current_header:
        seq = "".join(current_seq)
        sequences.append({"header": current_header, "sequence": seq})

    return sequences

def calc_gc(sequence: str) -> float:
    """计算 GC 含量"""
    seq_upper = sequence.upper()
    gc_count = seq_upper.count("G") + seq_upper.count("C")
    total = len(seq_upper)
    return round(gc_count / total, 4) if total > 0 else 0.0

@app.post("/parse/fasta/", response_model=FastaResult)
async def parse_fasta_file(file: UploadFile = File(...)):
    """
    上传 FASTA 文件,返回解析结果
    包括:序列数量、总碱基数、每条序列的长度和GC含量
    """
    # 读取上传文件内容
    content = await file.read()
    text = content.decode("utf-8")

    # 解析FASTA
    sequences = parse_fasta(text)

    # 计算统计信息
    seq_infos = []
    total_bases = 0
    for seq_dict in sequences:
        seq = seq_dict["sequence"]
        length = len(seq)
        total_bases += length
        seq_infos.append(SequenceInfo(
            header=seq_dict["header"],
            length=length,
            gc_content=calc_gc(seq)
        ))

    avg_length = total_bases / len(sequences) if sequences else 0

    return FastaResult(
        filename=file.filename,
        total_sequences=len(sequences),
        total_bases=total_bases,
        average_length=round(avg_length, 1),
        sequences=seq_infos
    )

@app.post("/gc-content/")
async def batch_gc_content(file: UploadFile = File(...)):
    """批量计算所有序列的GC含量,返回排序结果"""
    content = await file.read()
    text = content.decode("utf-8")
    sequences = parse_fasta(text)

    results = []
    for seq_dict in sequences:
        gc = calc_gc(seq_dict["sequence"])
        results.append({"header": seq_dict["header"], "gc_content": gc})

    # 按GC含量降序排列
    results.sort(key=lambda x: x["gc_content"], reverse=True)

    return {"total": len(results), "sorted_by": "gc_content_desc", "data": results}

案例 3:连接 SQLite 数据库

# sqlite_api.py —— FastAPI + SQLite 数据库
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import Optional, List
import sqlite3
import os

app = FastAPI(title="样本数据库 API", version="1.0.0")

DATABASE = "samples.db"  # SQLite 数据库文件路径

def get_db():
    """
    数据库连接的依赖注入函数
    每次请求获取一个连接,用完后自动关闭
    """
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row  # 让查询结果可以用字段名访问
    try:
        yield conn  # yield: 把连接"借"给路由函数
    finally:
        conn.close()  # 请求结束后自动关闭连接

def init_db():
    """初始化数据库表结构"""
    conn = sqlite3.connect(DATABASE)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS samples (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sample_id TEXT UNIQUE NOT NULL,
            patient_age INTEGER,
            diagnosis TEXT,
            shannon_diversity REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # 插入示例数据
    conn.execute("""
        INSERT OR IGNORE INTO samples (sample_id, patient_age, diagnosis, shannon_diversity)
        VALUES ('S001', 52, 'T2D', 3.2), ('S002', 35, 'Healthy', 4.1)
    """)
    conn.commit()
    conn.close()

# 应用启动时初始化数据库
init_db()

# 数据模型
class SampleIn(BaseModel):
    sample_id: str
    patient_age: int
    diagnosis: str
    shannon_diversity: Optional[float] = None

class SampleOut(BaseModel):
    id: int
    sample_id: str
    patient_age: int
    diagnosis: str
    shannon_diversity: Optional[float]

@app.get("/db/samples/", response_model=List[SampleOut])
def list_db_samples(
    diagnosis: Optional[str] = None,  # 按诊断筛选
    db: sqlite3.Connection = Depends(get_db)  # 依赖注入获取数据库连接
):
    """从数据库查询样本列表"""
    if diagnosis:
        cursor = db.execute(
            "SELECT * FROM samples WHERE diagnosis = ?", (diagnosis,)
        )
    else:
        cursor = db.execute("SELECT * FROM samples")

    rows = cursor.fetchall()
    return [dict(row) for row in rows]  # sqlite3.Row 转字典

@app.post("/db/samples/", status_code=201)
def create_db_sample(sample: SampleIn, db: sqlite3.Connection = Depends(get_db)):
    """向数据库插入新样本"""
    try:
        db.execute(
            "INSERT INTO samples (sample_id, patient_age, diagnosis, shannon_diversity) VALUES (?, ?, ?, ?)",
            (sample.sample_id, sample.patient_age, sample.diagnosis, sample.shannon_diversity)
        )
        db.commit()
        return {"message": "创建成功", "sample_id": sample.sample_id}
    except sqlite3.IntegrityError:
        raise HTTPException(status_code=400, detail=f"样本 {sample.sample_id} 已存在")

@app.delete("/db/samples/{sample_id}")
def delete_db_sample(sample_id: str, db: sqlite3.Connection = Depends(get_db)):
    """从数据库删除样本"""
    cursor = db.execute("DELETE FROM samples WHERE sample_id = ?", (sample_id,))
    db.commit()
    if cursor.rowcount == 0:
        raise HTTPException(status_code=404, detail=f"样本 {sample_id} 不存在")
    return {"message": "删除成功", "sample_id": sample_id}

自动文档(Swagger UI / ReDoc)

FastAPI 最大的亮点之一:零配置自动生成交互式 API 文档

文档地址 说明
http://localhost:8000/docs Swagger UI —— 可以直接在页面上测试 API
http://localhost:8000/redoc ReDoc —— 更适合阅读的文档格式
http://localhost:8000/openapi.json OpenAPI JSON 规范文件

文档自动从你的代码中提取: - 函数的 docstring → API 描述 - Pydantic 模型的 Field description → 参数说明 - 类型注解 → 参数类型 - response_model → 响应格式


FastAPI vs Flask vs Django 对比

特性 FastAPI Flask Django
定位 高性能 API 轻量灵活 全栈框架
性能 极高(异步) 中等 中等
学习曲线 低(会 Python 就行) 最低 较高
自动文档 内置 Swagger/ReDoc 需插件 需插件
数据验证 内置 Pydantic 手动/插件 内置 Form
异步支持 原生 async/await 需 Quart Django 4.1+ 部分支持
数据库 ORM 需搭配 SQLAlchemy 需搭配 内置强大 ORM
管理后台 内置 Admin
适合场景 API 服务、微服务 小项目、原型 大型全栈网站
生信推荐度 ★★★★★ ★★★★ ★★★

结论:生信工程师做 API 服务首选 FastAPI,因为: 1. 你已经会 Python,上手快 2. 性能好,能处理大文件 3. 自动文档,老板/同事直接看 4. 类型提示 + Pydantic 验证,减少 Bug


部署

Uvicorn 生产配置

# 开发环境(单进程,自动重载)
uvicorn main:app --reload --host 0.0.0.0 --port 8000

# 生产环境(多 worker 并发处理)
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

# 或者用 gunicorn + uvicorn worker(Linux 推荐)
pip install gunicorn
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000

Docker 打包

# Dockerfile —— 容器化你的 FastAPI 应用
FROM python:3.10-slim

# 设置工作目录
WORKDIR /app

# 复制依赖文件并安装
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]

requirements.txt

fastapi==0.110.0
uvicorn[standard]==0.27.0
python-multipart==0.0.6
pydantic==2.6.0

构建和运行:

# 构建镜像
docker build -t my-bioinfo-api .

# 运行容器(-d 后台运行,-p 端口映射)
docker run -d -p 8000:8000 --name bioinfo-api my-bioinfo-api

# 查看日志
docker logs -f bioinfo-api


常见报错及解决

1. ModuleNotFoundError: No module named 'fastapi'

原因:没在当前环境安装 FastAPI。

# 解决:确认你在正确的 conda 环境中
conda activate bioinfo
pip install fastapi uvicorn

2. Error: Address already in use / [Errno 98] Address already in use

原因:端口 8000 被其他程序占用。

# 解决方案1:换个端口
uvicorn main:app --port 8001

# 解决方案2:杀掉占用端口的进程(Linux)
lsof -i :8000
kill -9 <PID>

3. 422 Unprocessable Entity

原因:请求数据不符合 Pydantic 模型要求(类型错误、缺少必填字段)。

# 解决:检查请求体是否符合模型定义
# 查看 /docs 页面的模型 Schema 确认字段要求
# 常见错误:字符串传了数字、缺少必填字段、字段名拼写错误

4. CORS policy: No 'Access-Control-Allow-Origin' header

原因:前端从不同域名访问你的 API,浏览器拦截了。

# 解决:添加 CORS 中间件
from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],        # 允许所有来源(开发环境用)
    allow_credentials=True,
    allow_methods=["*"],        # 允许所有方法
    allow_headers=["*"],        # 允许所有请求头
)

5. RuntimeWarning: coroutine was never awaited

原因:在 async 函数中调用了协程但没加 await

# 错误写法
async def my_endpoint():
    file.read()  # 缺少 await

# 正确写法
async def my_endpoint():
    content = await file.read()  # 加上 await

6. TypeError: Object of type datetime is not JSON serializable

原因:返回的数据中有 Python 对象不能直接转 JSON。

# 解决:使用 Pydantic 模型作为 response_model,或手动转换
from datetime import datetime

@app.get("/time/")
def get_time():
    return {"now": datetime.now().isoformat()}  # 用 .isoformat() 转字符串


速查表

┌─────────────────────────────────────────────────────────────────┐
│                    FastAPI 速查表                                 │
├─────────────────────────────────────────────────────────────────┤
│ 创建应用        app = FastAPI()                                  │
│ GET 路由        @app.get("/path")                                │
│ POST 路由       @app.post("/path")                               │
│ PUT 路由        @app.put("/path")                                │
│ DELETE 路由     @app.delete("/path")                             │
│                                                                  │
│ 路径参数        @app.get("/items/{item_id}")                     │
│ 查询参数        def func(skip: int = 0, limit: int = 10)        │
│ 请求体          def func(item: PydanticModel)                    │
│ 文件上传        def func(file: UploadFile = File(...))           │
│                                                                  │
│ 状态码          @app.post("/", status_code=201)                  │
│ 抛出错误        raise HTTPException(status_code=404, detail="x") │
│ 依赖注入        def func(db = Depends(get_db))                   │
│ 中间件          @app.middleware("http")                           │
│                                                                  │
│ 响应模型        @app.get("/", response_model=MyModel)            │
│ 异步函数        async def func():  await something()             │
│                                                                  │
│ 运行(开发)    uvicorn main:app --reload                        │
│ 运行(生产)    uvicorn main:app --workers 4                     │
│ 文档地址        http://localhost:8000/docs                        │
│ ReDoc          http://localhost:8000/redoc                        │
└─────────────────────────────────────────────────────────────────┘

延伸资源

资源 说明
FastAPI 官方文档 最权威的教程,有中文版
FastAPI 中文教程 官方中文翻译
Pydantic V2 文档 数据验证库文档
Uvicorn 配置 ASGI 服务器详细配置
SQLAlchemy 生产级数据库 ORM
FastAPI + SQLAlchemy 教程 官方数据库教程
httpx 异步 HTTP 客户端(测试 API 用)
Docker 官方文档 容器化部署

学习路线建议

第1天:Hello World + 路径参数 + 查询参数 → 能跑起来
第2天:Pydantic 模型 + CRUD → 能做完整增删改查
第3天:文件上传 + 生信案例 → 能封装自己的分析工具
第4天:SQLite 数据库 + 部署 → 能上线给别人用

记住:FastAPI 的核心理念是 "写 Python 函数就是写 API"。你不需要学新语法,只是换了一种组织代码的方式。