FastAPI 后端开发入门¶
一句话说明:FastAPI 是一个现代 Python Web 框架,让你用写 Python 函数的方式快速构建高性能 API 接口,自动生成文档,非常适合把生信分析工具"服务化"。
为什么生信工程师需要学 Web 后端?¶
你做完宏基因组分析,结果只有你自己电脑能看。老板想查某个菌的丰度?同事想跑你的流程?都得找你。
学会后端开发,你可以:
| 痛点 | 学完后端后 |
|---|---|
| 分析结果只能本地看 | 部署成 API,浏览器打开就能查 |
| 同事要用你的脚本得配环境 | 封装成服务,发个链接就行 |
| 面试只会写脚本 | 简历多一条"数据服务化"亮点 |
| 想做全栈生信平台 | 后端是第一步 |
数据服务化(白话:把你的分析脚本变成一个"点菜窗口",别人通过网络发请求,你的程序自动返回结果)是生信工程师进阶的核心能力。
核心概念白话版¶
API(Application Programming Interface)¶
白话:你去餐厅点菜,菜单就是 API。你不需要知道厨房怎么做菜,只需要照着菜单点,服务员就把菜端上来。API 就是程序之间的"菜单"。
HTTP 方法¶
白话:你对服务器说话的"动作"。
| 方法 | 白话含义 | 例子 |
|---|---|---|
| GET | 我要看/查 | 查询某个菌的丰度 |
| POST | 我要提交/创建 | 上传一个 FASTA 文件 |
| PUT | 我要整个替换 | 更新整条样本记录 |
| DELETE | 我要删除 | 删掉某个样本 |
路由(Route)¶
白话:URL 地址。/species/Ecoli 就是一条路由,告诉服务器"我要找大肠杆菌的信息"。
请求体(Request Body)¶
白话:你寄快递时填的包裹信息。POST 请求需要带上数据,这些数据就是请求体。
响应(Response)¶
白话:服务器给你的回信。通常是 JSON 格式(类似 Python 字典)。
中间件(Middleware)¶
白话:门卫。每个请求进来、每个响应出去都要经过它,可以做日志记录、权限检查等。
异步 async¶
白话:餐厅只有一个服务员,但他不会傻等厨房做完菜再去服务下一桌。他先记下订单,转身服务别人,菜好了再端过去。async 就是让你的服务器能同时处理多个请求而不卡住。
安装配置¶
# 建议在conda环境中操作
conda activate bioinfo
# 安装 FastAPI 和 Uvicorn(ASGI服务器,负责运行你的API)
pip install fastapi uvicorn
# 可选:安装常用扩展
pip install python-multipart # 文件上传支持
pip install pydantic # 数据验证(FastAPI自带,但可单独升级)
pip install aiofiles # 异步文件操作
验证安装:
python -c "import fastapi; print(fastapi.__version__)"
# 输出类似:0.110.0
实操教程¶
1. Hello World —— 你的第一个 API¶
创建文件 main.py:
# main.py —— 最简单的 FastAPI 应用
from fastapi import FastAPI # 导入 FastAPI 类
app = FastAPI() # 创建一个 FastAPI 应用实例,相当于开了一家餐厅
@app.get("/") # 装饰器:当有人用 GET 方法访问根路径 "/" 时,执行下面的函数
def read_root():
"""根路径,返回欢迎信息"""
return {"message": "你好,这是我的第一个 API!"} # 返回 JSON 格式响应
@app.get("/hello/{name}") # {name} 是路径参数,用户可以传入任意值
def say_hello(name: str):
"""向指定用户打招呼"""
return {"message": f"你好,{name}!欢迎使用生信 API"}
运行:
# uvicorn 主文件名:app实例名 --reload(开发模式,修改代码自动重启)
uvicorn main:app --reload --host 0.0.0.0 --port 8000
浏览器打开 http://localhost:8000,你会看到 JSON 响应。
打开 http://localhost:8000/docs,你会看到自动生成的交互式文档。
2. 路径参数 —— URL 里传值¶
# path_params.py —— 路径参数示例
from fastapi import FastAPI
app = FastAPI()
@app.get("/species/{species_name}") # species_name 从 URL 中提取
def get_species(species_name: str):
"""根据物种名查询信息"""
# 实际项目中这里会查数据库
return {
"species": species_name, # 返回物种名
"status": "found", # 查询状态
"message": f"正在查询 {species_name} 的相关信息"
}
@app.get("/sample/{sample_id}") # sample_id 自动转换为整数
def get_sample(sample_id: int):
"""根据样本编号查询,FastAPI 自动做类型校验"""
# 如果用户传入非数字,FastAPI 自动返回 422 错误
return {"sample_id": sample_id, "info": f"这是第 {sample_id} 号样本"}
3. 查询参数 —— URL 问号后面的筛选条件¶
# query_params.py —— 查询参数示例
from fastapi import FastAPI
from typing import Optional # 可选参数类型
app = FastAPI()
# 模拟数据库
fake_abundance_db = [
{"species": "E.coli", "abundance": 0.15, "sample": "S001"},
{"species": "B.fragilis", "abundance": 0.08, "sample": "S001"},
{"species": "E.coli", "abundance": 0.22, "sample": "S002"},
{"species": "L.acidophilus", "abundance": 0.05, "sample": "S002"},
]
@app.get("/abundance/") # 查询参数不写在路径里,而是通过 ?key=value 传递
def query_abundance(
species: Optional[str] = None, # 可选参数:物种名筛选
min_abundance: float = 0.0, # 有默认值的参数:最低丰度阈值
limit: int = 10 # 返回条数限制
):
"""
查询丰度数据
调用示例:/abundance/?species=E.coli&min_abundance=0.1&limit=5
"""
results = fake_abundance_db # 从"数据库"获取全部数据
# 按物种筛选
if species:
results = [r for r in results if r["species"] == species]
# 按最低丰度筛选
results = [r for r in results if r["abundance"] >= min_abundance]
# 限制返回条数
results = results[:limit]
return {"count": len(results), "data": results}
访问 http://localhost:8000/abundance/?species=E.coli&min_abundance=0.1 即可筛选。
4. 请求体与 Pydantic 模型 —— 提交结构化数据¶
# request_body.py —— Pydantic 数据模型 + 请求体
from fastapi import FastAPI
from pydantic import BaseModel, Field # Pydantic 用于数据验证
from typing import Optional, List
app = FastAPI()
# 定义数据模型(白话:规定提交数据必须长什么样)
class SampleCreate(BaseModel):
"""样本创建模型 —— 定义客户端必须提交哪些字段"""
sample_id: str = Field(..., description="样本编号,如 S001") # ... 表示必填
patient_age: int = Field(..., ge=0, le=150, description="患者年龄") # ge=大于等于0, le=小于等于150
diagnosis: str = Field(..., description="诊断结果,如 T2D / Healthy")
species_list: List[str] = Field(default=[], description="检出物种列表")
notes: Optional[str] = Field(None, description="备注信息,可不填")
class SampleResponse(BaseModel):
"""响应模型 —— 定义返回给客户端的数据格式"""
sample_id: str
patient_age: int
diagnosis: str
species_list: List[str]
notes: Optional[str]
created: bool # 额外字段:是否创建成功
# 内存存储(实际项目用数据库)
samples_db: List[dict] = []
@app.post("/samples/", response_model=SampleResponse) # POST 方法,提交数据
def create_sample(sample: SampleCreate):
"""
创建新样本记录
客户端需要在请求体中发送 JSON 数据
"""
sample_dict = sample.model_dump() # Pydantic v2: 转为字典
sample_dict["created"] = True # 添加创建标记
samples_db.append(sample_dict) # 存入"数据库"
return sample_dict # 返回创建的记录
用 curl 测试:
curl -X POST "http://localhost:8000/samples/" \
-H "Content-Type: application/json" \
-d '{"sample_id":"S003","patient_age":45,"diagnosis":"T2D","species_list":["E.coli","B.fragilis"]}'
5. CRUD 完整示例 —— 增删改查¶
# crud_app.py —— 完整的 CRUD 应用
from fastapi import FastAPI, HTTPException # HTTPException 用于抛出错误
from pydantic import BaseModel
from typing import Optional, List
app = FastAPI(
title="宏基因组样本管理 API", # API 标题(显示在文档中)
description="管理 T2D 研究的样本数据", # API 描述
version="1.0.0" # 版本号
)
# 数据模型
class Sample(BaseModel):
sample_id: str
patient_age: int
diagnosis: str
shannon_diversity: Optional[float] = None # Shannon多样性指数,可选
class SampleUpdate(BaseModel):
"""更新模型:所有字段可选,只更新传入的字段"""
patient_age: Optional[int] = None
diagnosis: Optional[str] = None
shannon_diversity: Optional[float] = None
# 模拟数据库
db: dict = {
"S001": {"sample_id": "S001", "patient_age": 52, "diagnosis": "T2D", "shannon_diversity": 3.2},
"S002": {"sample_id": "S002", "patient_age": 35, "diagnosis": "Healthy", "shannon_diversity": 4.1},
}
# ===== Create(创建)=====
@app.post("/samples/", status_code=201) # 201 表示"已创建"
def create_sample(sample: Sample):
"""新增一条样本记录"""
if sample.sample_id in db:
raise HTTPException(status_code=400, detail="样本ID已存在") # 抛出400错误
db[sample.sample_id] = sample.model_dump()
return {"message": "创建成功", "data": db[sample.sample_id]}
# ===== Read(查询)=====
@app.get("/samples/") # 查询全部
def list_samples(skip: int = 0, limit: int = 20):
"""获取样本列表,支持分页"""
all_samples = list(db.values())
return {"total": len(all_samples), "data": all_samples[skip:skip+limit]}
@app.get("/samples/{sample_id}") # 查询单条
def get_sample(sample_id: str):
"""根据ID查询单个样本"""
if sample_id not in db:
raise HTTPException(status_code=404, detail=f"样本 {sample_id} 不存在")
return db[sample_id]
# ===== Update(更新)=====
@app.put("/samples/{sample_id}")
def update_sample(sample_id: str, sample_update: SampleUpdate):
"""更新样本信息(部分更新)"""
if sample_id not in db:
raise HTTPException(status_code=404, detail=f"样本 {sample_id} 不存在")
# 只更新非None的字段
update_data = sample_update.model_dump(exclude_unset=True) # 排除未设置的字段
for key, value in update_data.items():
db[sample_id][key] = value
return {"message": "更新成功", "data": db[sample_id]}
# ===== Delete(删除)=====
@app.delete("/samples/{sample_id}")
def delete_sample(sample_id: str):
"""删除样本记录"""
if sample_id not in db:
raise HTTPException(status_code=404, detail=f"样本 {sample_id} 不存在")
deleted = db.pop(sample_id) # 从字典中移除并返回
return {"message": "删除成功", "deleted": deleted}
6. 文件上传 —— 接收生信数据文件¶
# file_upload.py —— 文件上传示例
from fastapi import FastAPI, UploadFile, File # UploadFile 处理上传文件
from fastapi.responses import JSONResponse
from typing import List # 类型提示
import os
app = FastAPI()
UPLOAD_DIR = "./uploads" # 上传文件保存目录
os.makedirs(UPLOAD_DIR, exist_ok=True) # 目录不存在则创建
@app.post("/upload/fasta/")
async def upload_fasta(file: UploadFile = File(...)):
"""
上传 FASTA 文件
- async: 异步处理,不阻塞其他请求
- UploadFile: FastAPI 的文件上传类型
- File(...): 表示这个参数必须从表单数据中获取
"""
# 检查文件扩展名
if not file.filename.endswith((".fa", ".fasta", ".fna")):
return JSONResponse(
status_code=400,
content={"error": "只接受 .fa / .fasta / .fna 格式文件"}
)
# 保存文件
file_path = os.path.join(UPLOAD_DIR, file.filename)
content = await file.read() # await: 异步读取文件内容
with open(file_path, "wb") as f:
f.write(content)
# 简单统计序列数量
text = content.decode("utf-8")
seq_count = text.count(">") # FASTA 中每条序列以 > 开头
return {
"filename": file.filename,
"size_bytes": len(content), # 文件大小
"sequence_count": seq_count, # 序列条数
"saved_to": file_path # 保存路径
}
@app.post("/upload/multiple/")
async def upload_multiple(files: List[UploadFile] = File(...)):
"""批量上传文件"""
results = []
for file in files:
content = await file.read()
results.append({
"filename": file.filename,
"size_bytes": len(content)
})
return {"uploaded_count": len(results), "files": results}
7. 中间件 —— 全局拦截处理¶
# middleware_demo.py —— 中间件示例
from fastapi import FastAPI, Request # Request 对象包含请求的所有信息
import time
app = FastAPI()
@app.middleware("http") # 注册 HTTP 中间件
async def add_process_time_header(request: Request, call_next):
"""
计时中间件:记录每个请求的处理耗时
- request: 进来的请求
- call_next: 调用下一个处理环节(即你的路由函数)
"""
start_time = time.time() # 记录开始时间
response = await call_next(request) # 执行实际的路由处理
process_time = time.time() - start_time # 计算耗时
response.headers["X-Process-Time"] = str(process_time) # 把耗时写入响应头
print(f"[{request.method}] {request.url.path} - {process_time:.4f}s") # 打印日志
return response
@app.middleware("http")
async def add_cors_headers(request: Request, call_next):
"""
简易 CORS 中间件:允许前端跨域访问
(生产环境建议用 fastapi.middleware.cors.CORSMiddleware)
"""
response = await call_next(request)
response.headers["Access-Control-Allow-Origin"] = "*" # 允许所有来源
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE"
return response
@app.get("/")
def root():
return {"message": "检查响应头中的 X-Process-Time"}
8. 错误处理 —— 优雅返回错误信息¶
# error_handling.py —— 错误处理示例
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError # Pydantic 验证错误
app = FastAPI()
# 自定义异常类
class SpeciesNotFoundError(Exception):
"""物种未找到异常"""
def __init__(self, species_name: str):
self.species_name = species_name
# 注册自定义异常处理器
@app.exception_handler(SpeciesNotFoundError)
async def species_not_found_handler(request: Request, exc: SpeciesNotFoundError):
"""当抛出 SpeciesNotFoundError 时,返回统一格式的错误响应"""
return JSONResponse(
status_code=404,
content={
"error": "species_not_found",
"detail": f"物种 '{exc.species_name}' 在数据库中不存在",
"suggestion": "请检查拼写或查看 /species/list 获取所有可用物种"
}
)
# 覆盖默认的验证错误处理器(让错误信息更友好)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""当请求数据不符合模型要求时,返回中文友好提示"""
errors = exc.errors()
friendly_errors = []
for err in errors:
friendly_errors.append({
"field": " -> ".join(str(loc) for loc in err["loc"]), # 出错字段
"message": err["msg"], # 错误原因
"type": err["type"] # 错误类型
})
return JSONResponse(
status_code=422,
content={"error": "数据验证失败", "details": friendly_errors}
)
# 使用示例
known_species = ["E.coli", "B.fragilis", "L.acidophilus"]
@app.get("/species/{name}")
def get_species(name: str):
"""查询物种,不存在则触发自定义异常"""
if name not in known_species:
raise SpeciesNotFoundError(species_name=name) # 抛出自定义异常
return {"species": name, "found": True}
生信实战案例¶
案例 1:物种丰度查询 API¶
# bioinfo_abundance_api.py —— 物种丰度查询服务
from fastapi import FastAPI, Query
from typing import Optional
import json
app = FastAPI(title="物种丰度查询 API", version="1.0.0")
# 模拟从分析结果加载数据(实际中可读取 TSV/CSV 或数据库)
abundance_data = {
"S001": {
"patient": "P001", "group": "T2D",
"species": {
"Escherichia_coli": 0.152,
"Bacteroides_fragilis": 0.083,
"Faecalibacterium_prausnitzii": 0.041,
"Akkermansia_muciniphila": 0.012,
}
},
"S002": {
"patient": "P002", "group": "Healthy",
"species": {
"Escherichia_coli": 0.045,
"Bacteroides_fragilis": 0.125,
"Faecalibacterium_prausnitzii": 0.098,
"Akkermansia_muciniphila": 0.067,
}
},
"S003": {
"patient": "P003", "group": "T2D",
"species": {
"Escherichia_coli": 0.201,
"Bacteroides_fragilis": 0.056,
"Faecalibacterium_prausnitzii": 0.022,
"Akkermansia_muciniphila": 0.008,
}
}
}
@app.get("/abundance/{sample_id}")
def get_sample_abundance(sample_id: str):
"""查询单个样本的所有物种丰度"""
if sample_id not in abundance_data:
return {"error": f"样本 {sample_id} 不存在", "available": list(abundance_data.keys())}
return abundance_data[sample_id]
@app.get("/abundance/{sample_id}/{species}")
def get_species_abundance(sample_id: str, species: str):
"""查询指定样本中指定物种的丰度"""
if sample_id not in abundance_data:
return {"error": f"样本 {sample_id} 不存在"}
species_data = abundance_data[sample_id]["species"]
if species not in species_data:
return {"error": f"物种 {species} 在样本 {sample_id} 中未检出"}
return {
"sample_id": sample_id,
"species": species,
"abundance": species_data[species],
"group": abundance_data[sample_id]["group"]
}
@app.get("/compare/")
def compare_groups(
species: str = Query(..., description="要比较的物种名"),
group1: str = Query("T2D", description="对照组1"),
group2: str = Query("Healthy", description="对照组2")
):
"""比较两组间某物种的丰度差异"""
group1_values = []
group2_values = []
for sample_id, data in abundance_data.items():
if species in data["species"]:
if data["group"] == group1:
group1_values.append(data["species"][species])
elif data["group"] == group2:
group2_values.append(data["species"][species])
# 计算均值
avg1 = sum(group1_values) / len(group1_values) if group1_values else 0
avg2 = sum(group2_values) / len(group2_values) if group2_values else 0
return {
"species": species,
"comparison": f"{group1} vs {group2}",
f"{group1}_mean": round(avg1, 4),
f"{group2}_mean": round(avg2, 4),
f"{group1}_n": len(group1_values),
f"{group2}_n": len(group2_values),
"fold_change": round(avg1 / avg2, 2) if avg2 > 0 else "inf"
}
案例 2:FASTA 解析服务¶
# fasta_parser_api.py —— FASTA 文件解析服务
from fastapi import FastAPI, UploadFile, File
from pydantic import BaseModel
from typing import List
from io import StringIO
app = FastAPI(title="FASTA 解析服务", version="1.0.0")
class SequenceInfo(BaseModel):
"""单条序列的信息"""
header: str # 序列头(>后面的内容)
length: int # 序列长度
gc_content: float # GC含量
class FastaResult(BaseModel):
"""解析结果"""
filename: str
total_sequences: int
total_bases: int
average_length: float
sequences: List[SequenceInfo]
def parse_fasta(content: str) -> List[dict]:
"""
解析 FASTA 格式字符串
FASTA格式:
>序列名 描述
ATCGATCG...
"""
sequences = [] # 存放所有序列
current_header = "" # 当前序列头
current_seq = [] # 当前序列片段列表
for line in content.strip().split("\n"):
line = line.strip()
if line.startswith(">"): # 遇到新序列头
if current_header: # 保存上一条序列
seq = "".join(current_seq)
sequences.append({"header": current_header, "sequence": seq})
current_header = line[1:] # 去掉 > 号
current_seq = [] # 清空序列缓存
else:
current_seq.append(line) # 累积序列行
# 别忘了最后一条序列
if current_header:
seq = "".join(current_seq)
sequences.append({"header": current_header, "sequence": seq})
return sequences
def calc_gc(sequence: str) -> float:
"""计算 GC 含量"""
seq_upper = sequence.upper()
gc_count = seq_upper.count("G") + seq_upper.count("C")
total = len(seq_upper)
return round(gc_count / total, 4) if total > 0 else 0.0
@app.post("/parse/fasta/", response_model=FastaResult)
async def parse_fasta_file(file: UploadFile = File(...)):
"""
上传 FASTA 文件,返回解析结果
包括:序列数量、总碱基数、每条序列的长度和GC含量
"""
# 读取上传文件内容
content = await file.read()
text = content.decode("utf-8")
# 解析FASTA
sequences = parse_fasta(text)
# 计算统计信息
seq_infos = []
total_bases = 0
for seq_dict in sequences:
seq = seq_dict["sequence"]
length = len(seq)
total_bases += length
seq_infos.append(SequenceInfo(
header=seq_dict["header"],
length=length,
gc_content=calc_gc(seq)
))
avg_length = total_bases / len(sequences) if sequences else 0
return FastaResult(
filename=file.filename,
total_sequences=len(sequences),
total_bases=total_bases,
average_length=round(avg_length, 1),
sequences=seq_infos
)
@app.post("/gc-content/")
async def batch_gc_content(file: UploadFile = File(...)):
"""批量计算所有序列的GC含量,返回排序结果"""
content = await file.read()
text = content.decode("utf-8")
sequences = parse_fasta(text)
results = []
for seq_dict in sequences:
gc = calc_gc(seq_dict["sequence"])
results.append({"header": seq_dict["header"], "gc_content": gc})
# 按GC含量降序排列
results.sort(key=lambda x: x["gc_content"], reverse=True)
return {"total": len(results), "sorted_by": "gc_content_desc", "data": results}
案例 3:连接 SQLite 数据库¶
# sqlite_api.py —— FastAPI + SQLite 数据库
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import Optional, List
import sqlite3
import os
app = FastAPI(title="样本数据库 API", version="1.0.0")
DATABASE = "samples.db" # SQLite 数据库文件路径
def get_db():
"""
数据库连接的依赖注入函数
每次请求获取一个连接,用完后自动关闭
"""
conn = sqlite3.connect(DATABASE)
conn.row_factory = sqlite3.Row # 让查询结果可以用字段名访问
try:
yield conn # yield: 把连接"借"给路由函数
finally:
conn.close() # 请求结束后自动关闭连接
def init_db():
"""初始化数据库表结构"""
conn = sqlite3.connect(DATABASE)
conn.execute("""
CREATE TABLE IF NOT EXISTS samples (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sample_id TEXT UNIQUE NOT NULL,
patient_age INTEGER,
diagnosis TEXT,
shannon_diversity REAL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 插入示例数据
conn.execute("""
INSERT OR IGNORE INTO samples (sample_id, patient_age, diagnosis, shannon_diversity)
VALUES ('S001', 52, 'T2D', 3.2), ('S002', 35, 'Healthy', 4.1)
""")
conn.commit()
conn.close()
# 应用启动时初始化数据库
init_db()
# 数据模型
class SampleIn(BaseModel):
sample_id: str
patient_age: int
diagnosis: str
shannon_diversity: Optional[float] = None
class SampleOut(BaseModel):
id: int
sample_id: str
patient_age: int
diagnosis: str
shannon_diversity: Optional[float]
@app.get("/db/samples/", response_model=List[SampleOut])
def list_db_samples(
diagnosis: Optional[str] = None, # 按诊断筛选
db: sqlite3.Connection = Depends(get_db) # 依赖注入获取数据库连接
):
"""从数据库查询样本列表"""
if diagnosis:
cursor = db.execute(
"SELECT * FROM samples WHERE diagnosis = ?", (diagnosis,)
)
else:
cursor = db.execute("SELECT * FROM samples")
rows = cursor.fetchall()
return [dict(row) for row in rows] # sqlite3.Row 转字典
@app.post("/db/samples/", status_code=201)
def create_db_sample(sample: SampleIn, db: sqlite3.Connection = Depends(get_db)):
"""向数据库插入新样本"""
try:
db.execute(
"INSERT INTO samples (sample_id, patient_age, diagnosis, shannon_diversity) VALUES (?, ?, ?, ?)",
(sample.sample_id, sample.patient_age, sample.diagnosis, sample.shannon_diversity)
)
db.commit()
return {"message": "创建成功", "sample_id": sample.sample_id}
except sqlite3.IntegrityError:
raise HTTPException(status_code=400, detail=f"样本 {sample.sample_id} 已存在")
@app.delete("/db/samples/{sample_id}")
def delete_db_sample(sample_id: str, db: sqlite3.Connection = Depends(get_db)):
"""从数据库删除样本"""
cursor = db.execute("DELETE FROM samples WHERE sample_id = ?", (sample_id,))
db.commit()
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail=f"样本 {sample_id} 不存在")
return {"message": "删除成功", "sample_id": sample_id}
自动文档(Swagger UI / ReDoc)¶
FastAPI 最大的亮点之一:零配置自动生成交互式 API 文档。
| 文档地址 | 说明 |
|---|---|
http://localhost:8000/docs |
Swagger UI —— 可以直接在页面上测试 API |
http://localhost:8000/redoc |
ReDoc —— 更适合阅读的文档格式 |
http://localhost:8000/openapi.json |
OpenAPI JSON 规范文件 |
文档自动从你的代码中提取: - 函数的 docstring → API 描述 - Pydantic 模型的 Field description → 参数说明 - 类型注解 → 参数类型 - response_model → 响应格式
FastAPI vs Flask vs Django 对比¶
| 特性 | FastAPI | Flask | Django |
|---|---|---|---|
| 定位 | 高性能 API | 轻量灵活 | 全栈框架 |
| 性能 | 极高(异步) | 中等 | 中等 |
| 学习曲线 | 低(会 Python 就行) | 最低 | 较高 |
| 自动文档 | 内置 Swagger/ReDoc | 需插件 | 需插件 |
| 数据验证 | 内置 Pydantic | 手动/插件 | 内置 Form |
| 异步支持 | 原生 async/await | 需 Quart | Django 4.1+ 部分支持 |
| 数据库 ORM | 需搭配 SQLAlchemy | 需搭配 | 内置强大 ORM |
| 管理后台 | 无 | 无 | 内置 Admin |
| 适合场景 | API 服务、微服务 | 小项目、原型 | 大型全栈网站 |
| 生信推荐度 | ★★★★★ | ★★★★ | ★★★ |
结论:生信工程师做 API 服务首选 FastAPI,因为: 1. 你已经会 Python,上手快 2. 性能好,能处理大文件 3. 自动文档,老板/同事直接看 4. 类型提示 + Pydantic 验证,减少 Bug
部署¶
Uvicorn 生产配置¶
# 开发环境(单进程,自动重载)
uvicorn main:app --reload --host 0.0.0.0 --port 8000
# 生产环境(多 worker 并发处理)
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
# 或者用 gunicorn + uvicorn worker(Linux 推荐)
pip install gunicorn
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
Docker 打包¶
# Dockerfile —— 容器化你的 FastAPI 应用
FROM python:3.10-slim
# 设置工作目录
WORKDIR /app
# 复制依赖文件并安装
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
requirements.txt:
fastapi==0.110.0
uvicorn[standard]==0.27.0
python-multipart==0.0.6
pydantic==2.6.0
构建和运行:
# 构建镜像
docker build -t my-bioinfo-api .
# 运行容器(-d 后台运行,-p 端口映射)
docker run -d -p 8000:8000 --name bioinfo-api my-bioinfo-api
# 查看日志
docker logs -f bioinfo-api
常见报错及解决¶
1. ModuleNotFoundError: No module named 'fastapi'¶
原因:没在当前环境安装 FastAPI。
# 解决:确认你在正确的 conda 环境中
conda activate bioinfo
pip install fastapi uvicorn
2. Error: Address already in use / [Errno 98] Address already in use¶
原因:端口 8000 被其他程序占用。
# 解决方案1:换个端口
uvicorn main:app --port 8001
# 解决方案2:杀掉占用端口的进程(Linux)
lsof -i :8000
kill -9 <PID>
3. 422 Unprocessable Entity¶
原因:请求数据不符合 Pydantic 模型要求(类型错误、缺少必填字段)。
# 解决:检查请求体是否符合模型定义
# 查看 /docs 页面的模型 Schema 确认字段要求
# 常见错误:字符串传了数字、缺少必填字段、字段名拼写错误
4. CORS policy: No 'Access-Control-Allow-Origin' header¶
原因:前端从不同域名访问你的 API,浏览器拦截了。
# 解决:添加 CORS 中间件
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 允许所有来源(开发环境用)
allow_credentials=True,
allow_methods=["*"], # 允许所有方法
allow_headers=["*"], # 允许所有请求头
)
5. RuntimeWarning: coroutine was never awaited¶
原因:在 async 函数中调用了协程但没加 await。
# 错误写法
async def my_endpoint():
file.read() # 缺少 await
# 正确写法
async def my_endpoint():
content = await file.read() # 加上 await
6. TypeError: Object of type datetime is not JSON serializable¶
原因:返回的数据中有 Python 对象不能直接转 JSON。
# 解决:使用 Pydantic 模型作为 response_model,或手动转换
from datetime import datetime
@app.get("/time/")
def get_time():
return {"now": datetime.now().isoformat()} # 用 .isoformat() 转字符串
速查表¶
┌─────────────────────────────────────────────────────────────────┐
│ FastAPI 速查表 │
├─────────────────────────────────────────────────────────────────┤
│ 创建应用 app = FastAPI() │
│ GET 路由 @app.get("/path") │
│ POST 路由 @app.post("/path") │
│ PUT 路由 @app.put("/path") │
│ DELETE 路由 @app.delete("/path") │
│ │
│ 路径参数 @app.get("/items/{item_id}") │
│ 查询参数 def func(skip: int = 0, limit: int = 10) │
│ 请求体 def func(item: PydanticModel) │
│ 文件上传 def func(file: UploadFile = File(...)) │
│ │
│ 状态码 @app.post("/", status_code=201) │
│ 抛出错误 raise HTTPException(status_code=404, detail="x") │
│ 依赖注入 def func(db = Depends(get_db)) │
│ 中间件 @app.middleware("http") │
│ │
│ 响应模型 @app.get("/", response_model=MyModel) │
│ 异步函数 async def func(): await something() │
│ │
│ 运行(开发) uvicorn main:app --reload │
│ 运行(生产) uvicorn main:app --workers 4 │
│ 文档地址 http://localhost:8000/docs │
│ ReDoc http://localhost:8000/redoc │
└─────────────────────────────────────────────────────────────────┘
延伸资源¶
| 资源 | 说明 |
|---|---|
| FastAPI 官方文档 | 最权威的教程,有中文版 |
| FastAPI 中文教程 | 官方中文翻译 |
| Pydantic V2 文档 | 数据验证库文档 |
| Uvicorn 配置 | ASGI 服务器详细配置 |
| SQLAlchemy | 生产级数据库 ORM |
| FastAPI + SQLAlchemy 教程 | 官方数据库教程 |
| httpx | 异步 HTTP 客户端(测试 API 用) |
| Docker 官方文档 | 容器化部署 |
学习路线建议¶
第1天:Hello World + 路径参数 + 查询参数 → 能跑起来
第2天:Pydantic 模型 + CRUD → 能做完整增删改查
第3天:文件上传 + 生信案例 → 能封装自己的分析工具
第4天:SQLite 数据库 + 部署 → 能上线给别人用
记住:FastAPI 的核心理念是 "写 Python 函数就是写 API"。你不需要学新语法,只是换了一种组织代码的方式。