FastAPI WebSocket — 实时双向通信
一句话说明
FastAPI 内置 WebSocket 支持,让服务端主动推送数据给浏览器(实时聊天、进度推送、实时日志),无需客户端轮询,比 HTTP 更高效。
安装与配置
# fastapi[standard] 已含 websockets 依赖
pip install "fastapi[standard]" # 版本 0.115+
# 启动服务
uvicorn main:app --reload --host 0.0.0.0 --port 8000
核心用法
基础 WebSocket
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
app = FastAPI()
# WebSocket 路由(用 @app.websocket 装饰)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept() # 接受连接(必须先调用)
try:
while True:
# 接收客户端消息
data = await websocket.receive_text() # 接收文本
print(f"收到消息:{data}")
# 发送响应
await websocket.send_text(f"服务器回复:{data}")
except WebSocketDisconnect:
print("客户端断开连接") # 客户端关闭连接时会抛出此异常
广播(多客户端)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json
app = FastAPI()
# 连接管理器(管理所有在线客户端)
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = [] # 活跃连接列表
async def connect(self, ws: WebSocket):
await ws.accept() # 接受连接
self.active_connections.append(ws) # 加入列表
def disconnect(self, ws: WebSocket):
self.active_connections.remove(ws) # 移除连接
async def broadcast(self, message: dict):
"""广播消息给所有在线客户端"""
msg_str = json.dumps(message, ensure_ascii=False)
for conn in self.active_connections:
await conn.send_text(msg_str)
async def send_personal(self, ws: WebSocket, message: str):
"""发送给特定客户端"""
await ws.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_chat(websocket: WebSocket, client_id: str):
await manager.connect(websocket)
await manager.broadcast({"type": "join", "user": client_id, "msg": "加入聊天室"})
try:
while True:
data = await websocket.receive_text()
await manager.broadcast({
"type": "message",
"user": client_id,
"msg": data,
})
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast({"type": "leave", "user": client_id, "msg": "离开聊天室"})
实战案例
生信任务进度实时推送
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import json
app = FastAPI()
# 模拟长时间运行的生信流程(比如比对/组装)
async def run_bioinf_pipeline(websocket: WebSocket, sample_id: str):
"""模拟管道步骤,实时推送进度"""
steps = [
("质控", "FastQC 质量控制"),
("比对", "BWA 参考基因组比对"),
("分类", "Kraken2 物种分类"),
("组装", "MetaSPAdes 宏基因组组装"),
("注释", "Prokka 基因组注释"),
]
for i, (step_id, step_name) in enumerate(steps):
# 推送当前步骤开始
await websocket.send_text(json.dumps({
"step": step_name,
"progress": int((i / len(steps)) * 100),
"status": "running",
}, ensure_ascii=False))
await asyncio.sleep(2) # 模拟真实耗时(实际调用子进程)
# 推送步骤完成
await websocket.send_text(json.dumps({
"step": step_name,
"progress": int(((i+1) / len(steps)) * 100),
"status": "done",
}, ensure_ascii=False))
await websocket.send_text(json.dumps({"status": "finished", "progress": 100}))
@app.websocket("/pipeline/{sample_id}")
async def pipeline_progress(websocket: WebSocket, sample_id: str):
await websocket.accept()
try:
await run_bioinf_pipeline(websocket, sample_id)
except WebSocketDisconnect:
print(f"客户端断开,样本:{sample_id}")
# 前端 JavaScript 连接示例(注释形式)
# const ws = new WebSocket("ws://localhost:8000/pipeline/S001");
# ws.onmessage = (e) => { const d = JSON.parse(e.data); console.log(d); }
常见报错与解决
| 报错 | 原因 | 解决 |
|---|
WebSocketDisconnect | 客户端关闭连接 | 用 try/except WebSocketDisconnect 优雅处理 |
RuntimeError: websocket not connected | 连接前调用 send | 确保 await websocket.accept() 在最前 |
1006 Abnormal Closure | 服务端崩溃或超时 | 检查服务端异常,加心跳保持连接 |
速查表
| 操作 | 代码 |
|---|
| 定义 WS 路由 | @app.websocket("/ws") |
| 接受连接 | await ws.accept() |
| 发送文本 | await ws.send_text("...") |
| 发送 JSON | await ws.send_json({"key": "val"}) |
| 接收文本 | data = await ws.receive_text() |
| 接收 JSON | data = await ws.receive_json() |
| 断开处理 | except WebSocketDisconnect: ... |
| 主动关闭 | await ws.close(code=1000) |