跳转至

gRPC 微服务通信

一句话概述:gRPC 是 Google 开源的高性能 RPC 框架,用 Protocol Buffers 定义接口,自动生成多语言客户端和服务端代码,二进制传输比 JSON REST 快 7-10 倍,特别适合微服务间通信。

核心知识点

概念白话解释
RPC远程过程调用,像调用本地函数一样调用远程服务
Protocol Buffers (protobuf)Google 的二进制序列化格式,比 JSON 小且快
.proto 文件接口定义文件,写服务的方法和数据结构
代码生成.proto 文件自动生成各语言的代码
单向流客户端发一个请求,服务端回一个响应(最常用)
服务端流客户端发一个请求,服务端回多个响应
客户端流客户端发多个请求,服务端回一个响应
双向流双方同时发送和接收多个消息

安装配置

Python

pip install grpcio grpcio-tools  # 安装 gRPC 和代码生成工具

Node.js

npm install @grpc/grpc-js @grpc/proto-loader  # 动态加载方式
# 或
npm install @grpc/grpc-js google-protobuf  # 静态生成方式
npm install -D grpc-tools grpc_tools_node_protoc_ts  # 代码生成工具

Go

go install google.golang.org/protobuf/cmd/protoc-gen-go@latest  # protobuf 生成器
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest  # gRPC 生成器

基本使用

定义 Proto 文件

// proto/user.proto - 接口定义
syntax = "proto3";  // 使用 proto3 语法

package user;  // 包名

// 定义服务(类似 REST 的 Controller)
service UserService {
  rpc GetUser (GetUserRequest) returns (GetUserResponse);  // 获取用户(单向)
  rpc ListUsers (ListUsersRequest) returns (stream User);  // 列出用户(服务端流)
  rpc CreateUser (CreateUserRequest) returns (CreateUserResponse);  // 创建用户
}

// 请求消息
message GetUserRequest {
  int32 id = 1;  // 字段编号(不是值,是标识符)
}

// 响应消息
message GetUserResponse {
  User user = 1;  // 嵌套消息
}

// 用户消息
message User {
  int32 id = 1;      // ID
  string name = 2;   // 名字
  string email = 3;  // 邮箱
  int32 age = 4;     // 年龄
}

message ListUsersRequest {
  int32 page = 1;     // 页码
  int32 page_size = 2;  // 每页数量
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
  int32 age = 3;
}

message CreateUserResponse {
  User user = 1;
  bool success = 2;
}

Python 实现

# 从 proto 生成 Python 代码
python -m grpc_tools.protoc \
  --python_out=. \
  --grpc_python_out=. \
  --proto_path=proto \
  proto/user.proto
# 生成 user_pb2.py(消息类)和 user_pb2_grpc.py(服务类)
# server.py - gRPC 服务端
import grpc  # gRPC 核心库
from concurrent import futures  # 线程池
import user_pb2  # 生成的消息类
import user_pb2_grpc  # 生成的服务基类

# 实现服务
class UserServicer(user_pb2_grpc.UserServiceServicer):
    def GetUser(self, request, context):
        """获取用户"""
        user_id = request.id  # 从请求中获取 ID
        # 实际项目这里查数据库
        return user_pb2.GetUserResponse(
            user=user_pb2.User(
                id=user_id,
                name="张三",
                email="zhang@example.com",
                age=25,
            )
        )

    def CreateUser(self, request, context):
        """创建用户"""
        # 输入验证
        if not request.name:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "名字不能为空")  # 返回错误

        new_user = user_pb2.User(
            id=1,
            name=request.name,
            email=request.email,
            age=request.age,
        )
        return user_pb2.CreateUserResponse(user=new_user, success=True)

# 启动服务器
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))  # 创建服务器,10 个工作线程
    user_pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), server)  # 注册服务
    server.add_insecure_port('[::]:50051')  # 监听端口
    server.start()  # 启动
    print("gRPC 服务器启动在端口 50051")
    server.wait_for_termination()  # 等待终止

if __name__ == '__main__':
    serve()
# client.py - gRPC 客户端
import grpc
import user_pb2
import user_pb2_grpc

def run():
    channel = grpc.insecure_channel('localhost:50051')  # 连接服务器
    stub = user_pb2_grpc.UserServiceStub(channel)  # 创建客户端存根

    # 调用 GetUser(像调用本地函数一样!)
    response = stub.GetUser(user_pb2.GetUserRequest(id=1))  # 发送请求
    print(f"用户: {response.user.name}, 邮箱: {response.user.email}")

    # 调用 CreateUser
    response = stub.CreateUser(user_pb2.CreateUserRequest(
        name="李四",
        email="li@example.com",
        age=30,
    ))
    print(f"创建成功: {response.success}")

if __name__ == '__main__':
    run()

高级用法

流式通信

// proto 定义流式接口
service ChatService {
  rpc StreamChat (stream ChatMessage) returns (stream ChatMessage);  // 双向流
}

message ChatMessage {
  string user = 1;
  string text = 2;
}
# 服务端:双向流
class ChatServicer(chat_pb2_grpc.ChatServiceServicer):
    def StreamChat(self, request_iterator, context):
        for message in request_iterator:  # 接收客户端消息流
            reply = chat_pb2.ChatMessage(
                user="Bot",
                text=f"收到: {message.text}",
            )
            yield reply  # 返回消息流

错误处理

# 服务端返回错误
def GetUser(self, request, context):
    if request.id <= 0:
        context.abort(  # 中止并返回错误
            grpc.StatusCode.INVALID_ARGUMENT,  # 错误码
            "ID 必须是正整数"  # 错误消息
        )
    if not find_user(request.id):
        context.abort(grpc.StatusCode.NOT_FOUND, "用户不存在")

拦截器(中间件)

# 客户端拦截器:添加认证 Token
class AuthInterceptor(grpc.UnaryUnaryClientInterceptor):
    def intercept_unary_unary(self, continuation, client_call_details, request):
        metadata = list(client_call_details.metadata or [])
        metadata.append(('authorization', 'Bearer token123'))  # 添加认证头
        new_details = grpc.ClientCallDetails(
            client_call_details.method, client_call_details.timeout,
            metadata, client_call_details.credentials, None, None,
        )
        return continuation(new_details, request)

常见报错

报错信息原因解决方案
UNAVAILABLE: Connection refused服务端没启动检查服务端是否在运行
UNIMPLEMENTED方法没实现在 Servicer 类中实现对应方法
DEADLINE_EXCEEDED超时了增大超时时间或优化服务端
INVALID_ARGUMENT参数不对检查请求消息的字段
proto 文件编译失败语法错误检查 proto3 语法
版本不兼容proto 文件改了重新生成代码

速查表

# protoc 代码生成
# Python
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I proto proto/xxx.proto

# Go
protoc --go_out=. --go-grpc_out=. proto/xxx.proto

# gRPC 状态码
OK=0  CANCELLED=1  UNKNOWN=2  INVALID_ARGUMENT=3
NOT_FOUND=5  ALREADY_EXISTS=6  PERMISSION_DENIED=7
UNAUTHENTICATED=16  RESOURCE_EXHAUSTED=8
UNAVAILABLE=14  DEADLINE_EXCEEDED=4  INTERNAL=13
UNIMPLEMENTED=12

# protobuf 字段类型
int32 int64 float double     # 数字
string bytes                 # 字符串/字节
bool                         # 布尔
repeated <type>              # 数组/列表
map<key_type, value_type>    # 字典
enum                         # 枚举
oneof                        # 联合类型

# gRPC 通信模式
Unary         # 一请求一响应
Server Stream # 一请求多响应
Client Stream # 多请求一响应
Bidirectional # 双向流

参考:gRPC 官网 | Protocol Buffers 文档 | gRPC Python 教程