gRPC 微服务通信
一句话概述:gRPC 是 Google 开源的高性能 RPC 框架,用 Protocol Buffers 定义接口,自动生成多语言客户端和服务端代码,二进制传输比 JSON REST 快 7-10 倍,特别适合微服务间通信。
核心知识点
| 概念 | 白话解释 |
|---|
| RPC | 远程过程调用,像调用本地函数一样调用远程服务 |
| Protocol Buffers (protobuf) | Google 的二进制序列化格式,比 JSON 小且快 |
.proto 文件 | 接口定义文件,写服务的方法和数据结构 |
| 代码生成 | 从 .proto 文件自动生成各语言的代码 |
| 单向流 | 客户端发一个请求,服务端回一个响应(最常用) |
| 服务端流 | 客户端发一个请求,服务端回多个响应 |
| 客户端流 | 客户端发多个请求,服务端回一个响应 |
| 双向流 | 双方同时发送和接收多个消息 |
安装配置
Python
pip install grpcio grpcio-tools # 安装 gRPC 和代码生成工具
Node.js
npm install @grpc/grpc-js @grpc/proto-loader # 动态加载方式
# 或
npm install @grpc/grpc-js google-protobuf # 静态生成方式
npm install -D grpc-tools grpc_tools_node_protoc_ts # 代码生成工具
Go
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest # protobuf 生成器
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest # gRPC 生成器
基本使用
定义 Proto 文件
// proto/user.proto - 接口定义
syntax = "proto3"; // 使用 proto3 语法
package user; // 包名
// 定义服务(类似 REST 的 Controller)
service UserService {
rpc GetUser (GetUserRequest) returns (GetUserResponse); // 获取用户(单向)
rpc ListUsers (ListUsersRequest) returns (stream User); // 列出用户(服务端流)
rpc CreateUser (CreateUserRequest) returns (CreateUserResponse); // 创建用户
}
// 请求消息
message GetUserRequest {
int32 id = 1; // 字段编号(不是值,是标识符)
}
// 响应消息
message GetUserResponse {
User user = 1; // 嵌套消息
}
// 用户消息
message User {
int32 id = 1; // ID
string name = 2; // 名字
string email = 3; // 邮箱
int32 age = 4; // 年龄
}
message ListUsersRequest {
int32 page = 1; // 页码
int32 page_size = 2; // 每页数量
}
message CreateUserRequest {
string name = 1;
string email = 2;
int32 age = 3;
}
message CreateUserResponse {
User user = 1;
bool success = 2;
}
Python 实现
# 从 proto 生成 Python 代码
python -m grpc_tools.protoc \
--python_out=. \
--grpc_python_out=. \
--proto_path=proto \
proto/user.proto
# 生成 user_pb2.py(消息类)和 user_pb2_grpc.py(服务类)
# server.py - gRPC 服务端
import grpc # gRPC 核心库
from concurrent import futures # 线程池
import user_pb2 # 生成的消息类
import user_pb2_grpc # 生成的服务基类
# 实现服务
class UserServicer(user_pb2_grpc.UserServiceServicer):
def GetUser(self, request, context):
"""获取用户"""
user_id = request.id # 从请求中获取 ID
# 实际项目这里查数据库
return user_pb2.GetUserResponse(
user=user_pb2.User(
id=user_id,
name="张三",
email="zhang@example.com",
age=25,
)
)
def CreateUser(self, request, context):
"""创建用户"""
# 输入验证
if not request.name:
context.abort(grpc.StatusCode.INVALID_ARGUMENT, "名字不能为空") # 返回错误
new_user = user_pb2.User(
id=1,
name=request.name,
email=request.email,
age=request.age,
)
return user_pb2.CreateUserResponse(user=new_user, success=True)
# 启动服务器
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) # 创建服务器,10 个工作线程
user_pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), server) # 注册服务
server.add_insecure_port('[::]:50051') # 监听端口
server.start() # 启动
print("gRPC 服务器启动在端口 50051")
server.wait_for_termination() # 等待终止
if __name__ == '__main__':
serve()
# client.py - gRPC 客户端
import grpc
import user_pb2
import user_pb2_grpc
def run():
channel = grpc.insecure_channel('localhost:50051') # 连接服务器
stub = user_pb2_grpc.UserServiceStub(channel) # 创建客户端存根
# 调用 GetUser(像调用本地函数一样!)
response = stub.GetUser(user_pb2.GetUserRequest(id=1)) # 发送请求
print(f"用户: {response.user.name}, 邮箱: {response.user.email}")
# 调用 CreateUser
response = stub.CreateUser(user_pb2.CreateUserRequest(
name="李四",
email="li@example.com",
age=30,
))
print(f"创建成功: {response.success}")
if __name__ == '__main__':
run()
高级用法
流式通信
// proto 定义流式接口
service ChatService {
rpc StreamChat (stream ChatMessage) returns (stream ChatMessage); // 双向流
}
message ChatMessage {
string user = 1;
string text = 2;
}
# 服务端:双向流
class ChatServicer(chat_pb2_grpc.ChatServiceServicer):
def StreamChat(self, request_iterator, context):
for message in request_iterator: # 接收客户端消息流
reply = chat_pb2.ChatMessage(
user="Bot",
text=f"收到: {message.text}",
)
yield reply # 返回消息流
错误处理
# 服务端返回错误
def GetUser(self, request, context):
if request.id <= 0:
context.abort( # 中止并返回错误
grpc.StatusCode.INVALID_ARGUMENT, # 错误码
"ID 必须是正整数" # 错误消息
)
if not find_user(request.id):
context.abort(grpc.StatusCode.NOT_FOUND, "用户不存在")
拦截器(中间件)
# 客户端拦截器:添加认证 Token
class AuthInterceptor(grpc.UnaryUnaryClientInterceptor):
def intercept_unary_unary(self, continuation, client_call_details, request):
metadata = list(client_call_details.metadata or [])
metadata.append(('authorization', 'Bearer token123')) # 添加认证头
new_details = grpc.ClientCallDetails(
client_call_details.method, client_call_details.timeout,
metadata, client_call_details.credentials, None, None,
)
return continuation(new_details, request)
常见报错
| 报错信息 | 原因 | 解决方案 |
|---|
UNAVAILABLE: Connection refused | 服务端没启动 | 检查服务端是否在运行 |
UNIMPLEMENTED | 方法没实现 | 在 Servicer 类中实现对应方法 |
DEADLINE_EXCEEDED | 超时了 | 增大超时时间或优化服务端 |
INVALID_ARGUMENT | 参数不对 | 检查请求消息的字段 |
| proto 文件编译失败 | 语法错误 | 检查 proto3 语法 |
| 版本不兼容 | proto 文件改了 | 重新生成代码 |
速查表
# protoc 代码生成
# Python
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I proto proto/xxx.proto
# Go
protoc --go_out=. --go-grpc_out=. proto/xxx.proto
# gRPC 状态码
OK=0 CANCELLED=1 UNKNOWN=2 INVALID_ARGUMENT=3
NOT_FOUND=5 ALREADY_EXISTS=6 PERMISSION_DENIED=7
UNAUTHENTICATED=16 RESOURCE_EXHAUSTED=8
UNAVAILABLE=14 DEADLINE_EXCEEDED=4 INTERNAL=13
UNIMPLEMENTED=12
# protobuf 字段类型
int32 int64 float double # 数字
string bytes # 字符串/字节
bool # 布尔
repeated <type> # 数组/列表
map<key_type, value_type> # 字典
enum # 枚举
oneof # 联合类型
# gRPC 通信模式
Unary # 一请求一响应
Server Stream # 一请求多响应
Client Stream # 多请求一响应
Bidirectional # 双向流
参考:gRPC 官网 | Protocol Buffers 文档 | gRPC Python 教程