跳转至

FastAPI WebSocket — 实时双向通信


一句话说明

FastAPI 内置 WebSocket 支持,让服务端主动推送数据给浏览器(实时聊天、进度推送、实时日志),无需客户端轮询,比 HTTP 更高效。


安装与配置

# fastapi[standard] 已含 websockets 依赖
pip install "fastapi[standard]"      # 版本 0.115+

# 启动服务
uvicorn main:app --reload --host 0.0.0.0 --port 8000

核心用法

基础 WebSocket

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio

app = FastAPI()

# WebSocket 路由(用 @app.websocket 装饰)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()           # 接受连接(必须先调用)
    try:
        while True:
            # 接收客户端消息
            data = await websocket.receive_text()   # 接收文本
            print(f"收到消息:{data}")

            # 发送响应
            await websocket.send_text(f"服务器回复:{data}")

    except WebSocketDisconnect:
        print("客户端断开连接")        # 客户端关闭连接时会抛出此异常

广播(多客户端)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json

app = FastAPI()

# 连接管理器(管理所有在线客户端)
class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []  # 活跃连接列表

    async def connect(self, ws: WebSocket):
        await ws.accept()                              # 接受连接
        self.active_connections.append(ws)             # 加入列表

    def disconnect(self, ws: WebSocket):
        self.active_connections.remove(ws)             # 移除连接

    async def broadcast(self, message: dict):
        """广播消息给所有在线客户端"""
        msg_str = json.dumps(message, ensure_ascii=False)
        for conn in self.active_connections:
            await conn.send_text(msg_str)

    async def send_personal(self, ws: WebSocket, message: str):
        """发送给特定客户端"""
        await ws.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_chat(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    await manager.broadcast({"type": "join", "user": client_id, "msg": "加入聊天室"})
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast({
                "type": "message",
                "user": client_id,
                "msg":  data,
            })
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast({"type": "leave", "user": client_id, "msg": "离开聊天室"})

实战案例

生信任务进度实时推送

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import json

app = FastAPI()

# 模拟长时间运行的生信流程(比如比对/组装)
async def run_bioinf_pipeline(websocket: WebSocket, sample_id: str):
    """模拟管道步骤,实时推送进度"""
    steps = [
        ("质控", "FastQC 质量控制"),
        ("比对", "BWA 参考基因组比对"),
        ("分类", "Kraken2 物种分类"),
        ("组装", "MetaSPAdes 宏基因组组装"),
        ("注释", "Prokka 基因组注释"),
    ]

    for i, (step_id, step_name) in enumerate(steps):
        # 推送当前步骤开始
        await websocket.send_text(json.dumps({
            "step":    step_name,
            "progress": int((i / len(steps)) * 100),
            "status":  "running",
        }, ensure_ascii=False))

        await asyncio.sleep(2)    # 模拟真实耗时(实际调用子进程)

        # 推送步骤完成
        await websocket.send_text(json.dumps({
            "step":    step_name,
            "progress": int(((i+1) / len(steps)) * 100),
            "status":  "done",
        }, ensure_ascii=False))

    await websocket.send_text(json.dumps({"status": "finished", "progress": 100}))

@app.websocket("/pipeline/{sample_id}")
async def pipeline_progress(websocket: WebSocket, sample_id: str):
    await websocket.accept()
    try:
        await run_bioinf_pipeline(websocket, sample_id)
    except WebSocketDisconnect:
        print(f"客户端断开,样本:{sample_id}")

# 前端 JavaScript 连接示例(注释形式)
# const ws = new WebSocket("ws://localhost:8000/pipeline/S001");
# ws.onmessage = (e) => { const d = JSON.parse(e.data); console.log(d); }

常见报错与解决

报错原因解决
WebSocketDisconnect客户端关闭连接try/except WebSocketDisconnect 优雅处理
RuntimeError: websocket not connected连接前调用 send确保 await websocket.accept() 在最前
1006 Abnormal Closure服务端崩溃或超时检查服务端异常,加心跳保持连接

速查表

操作代码
定义 WS 路由@app.websocket("/ws")
接受连接await ws.accept()
发送文本await ws.send_text("...")
发送 JSONawait ws.send_json({"key": "val"})
接收文本data = await ws.receive_text()
接收 JSONdata = await ws.receive_json()
断开处理except WebSocketDisconnect: ...
主动关闭await ws.close(code=1000)