当前位置: 首页 > news >正文

Go语言的gRPC教程-拦截器

一、前言

gRPC拦截器和其他框架的拦截器(也称middleware)作用是一样的。利用拦截器我们可以在不侵入业务逻辑的前提下修改或者记录服务端或客户端的请求与响应,利用拦截器我们可以实现诸如日志记录权限认证限流等诸多功能

上一篇提到gRPC的通信模式分为unarystreaming几种模式,拦截器也分为两种:unary interceptorsstreaming interceptors ,两种拦截器可以分别应用在服务端和客户端,所以gRPC总共为我们提供了四种拦截器。它们已经被定义成了go中的接口,我们创建的拦截器只要实现这些接口即可

在这里插入图片描述

二、服务端拦截器

服务端的拦截器从请求开始按顺序执行拦截器,在执行完对应RPC的逻辑之后,再按反向的顺序执行拦截器中对响应的处理逻辑

在这里插入图片描述

2.1 unary interceptors

对于unary服务的拦截器只需实现UnaryServerInterceptor接口即可

**

func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
  • ctx context.Context:单个请求的上下文
  • req interface{}:RPC服务的请求结构体
  • info *UnaryServerInfo:RPC的服务信息
  • handler UnaryHandler:它包装了服务实现,通过调用它我们可以完成RPC并获取到响应

参数看不懂没关系,我们来看一个例子

示例

完整代码参考:https://github.com/liangwt/gr…

**

// GetClientIP 检查上下文以检索客户机的ip地址
func getClientIP(ctx context.Context) (string, error) {p, ok := peer.FromContext(ctx)if !ok {return "", fmt.Errorf("couldn't parse client IP address")}return p.Addr.String(), nil
}// serverUnaryInterceptor 服务端一元拦截器
func serverUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {start := time.Now()md, ok := metadata.FromIncomingContext(ctx) // 从上下文中获取元数据if !ok {return nil, fmt.Errorf("couldn't parse incoming context metadata")}os := md.Get("client-os") // 获取客户端操作系统信息token := md.Get("token")if token[0] != "Abc&123" {return nil, status.Errorf(codes.Unauthenticated, "token 校验失败")}// 获取客户端IP地址ip, err := getClientIP(ctx)if err != nil {return nil, err}// RPC 方法真正执行的逻辑// 调用RPC方法(invoking RPC method)resp, err = handler(ctx, req)fmt.Printf("耗时: %d\n, 客户端IP: %s\n, 客户端操作系统: %s\n", time.Since(start), ip, os)return resp, err
}func main() {
// 1. 监听端口listen, err := net.Listen("tcp", ":8090")if err != nil {fmt.Println("监听端口失败", err)}// 2. 创建gRPC服务器实例grpcServer := grpc.NewServer(grpc.UnaryInterceptor(serverUnaryInterceptor),)// 3. 注册服务helloworld.RegisterSearchServiceServer(grpcServer, &HelloWorldServer{})// 4. 启动服务, grpcServer.Serve(listen) 会阻塞当前 goroutine,直到服务停止err = grpcServer.Serve(listen)if err != nil {fmt.Println("启动服务失败", err)}
}

假设我们的客户端请求了Search,根据示例再重新看下拦截器接口的每一个参数

🌲 req interface{}

RPC服务的请求结构体,对于Search来说就是req *helloworld.SearchRequest

🌲 info *grpc.UnaryServerInfo包含两个字段:

FullMethod是请求的method名字(例如/SearchService/Search);

Server就是服务实现(就是示例helloworld.RegisterSearchServiceServer(grpcServer, &HelloWorldServer{}))

🌲 handler包装了服务实现

所以在调用它之前我们可以进行改写reqctx、记录逻辑开始时间等操作

调用完handler即完成了RPC并获取到响应,我们不仅可以记录响应还可以改写响应

2.2 streaming interceptors

对于stream服务的拦截器只要实现StreamServerInterceptor接口即可。它适用于我们上一篇介绍的

  • 服务器端流式 RPC
  • 客户端流式 RPC
  • 双向流式 RPC

**

func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
  • srv interface{}:服务实现
  • ss ServerStream:服务端视角的流。怎么理解呢?无论是哪一种流式RPC对于服务端来说发送(SendMsg)就代表着响应数据,接收(RecvMsg)就代表着请求数据,不同的流式RPC的区别就在于是多次发送数据(服务器端流式 RPC)还是多次接收数据(客户端流式 RPC)或者两者均有(双向流式 RPC)。因此仅使用这一个抽象就代表了所有的流式RPC场景
  • info *StreamServerInfo:RPC的服务信息
  • handler StreamHandler:它包装了服务实现,通过调用它我们可以完成RPC

示例

func streamServerInterceptor(srv interface{},ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {// Pre-processing logics := time.Now()// Invoking the StreamHandler to complete the execution of RPC invocationerr := handler(srv, ss)// Post processing logiclog.Printf("Method: %s, isClientStream: %v, isServerStream: %v, latency: %s\n", info.FullMethod, info.IsClientStream, info.IsServerStream, time.Now().Sub(s))return err
}

根据示例再重新看下拦截器接口的参数

🌲 srv interface{}

​ 服务实现(就是示例streaming.RegisterGreeterServer(grpcServer, &server{}))

🌲 ss grpc.ServerStream

​ 服务端发送和接收数据的接口,注意它是一个接口

🌲 info *grpc.StreamServerInfo包含三个字段:

FullMethod是请求的method名字

IsClientStream 是否是客户端流

IsServerStream 是否是服务端流

🌲 handler包装了服务实现

​ 所以在调用它之前我们可以进行改写数据流、记录逻辑开始时间等操作

​ 调用完handler即完成了RPC,因为是流式调用所以不会返回响应数据,只有error

流式拦截器既没有请求字段,handler也不会返回响应,该如何记录、修改请求响应呢?

如果想劫持流数据,答案就在ss ServerStream。再重复一遍它的含义:服务端视角的流,它是一个接口。无论是哪一种流式RPC对于服务端来说发送(SendMsg)就代表着响应数据,接收(RecvMsg)就代表着请求数据,不同的流式RPC的区别就在于是多次发送数据(服务器端流式 RPC)还是多次接收数据(客户端流式 RPC)或者两者均有(双向流式 RPC)。因此可以对ss进行包装,只要传入handler的类型实现ServerStream即可

**

func streamServerInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {wrapper := newStreamServer(ss)return handler(srv, wrapper)
}// 嵌入式EdgeServerStream允许我们访问RecvMsg函数
type streamServer struct {grpc.ServerStream
}func newStreamServer(s grpc.ServerStream) grpc.ServerStream {return &streamServer{s}
}// RecvMsg 从流中接收消息
func (e *streamServer) RecvMsg(m interface{}) error {// 在这里,我们可以对接收到的消息执行额外的逻辑,例如// 验证log.Printf("Server Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))if err := e.ServerStream.RecvMsg(m); err != nil {return err}return nil
}// SendMsg 向流中发送消息
func (e *streamServer) SendMsg(m interface{}) error {// 在这里,我们可以对接收到的消息执行额外的逻辑,例如// 验证log.Printf("Server Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))if err := e.ServerStream.SendMsg(m); err != nil {return err}return nil
}

三、客户端拦截器

客户端拦截器和服务端拦截器类似,从请求开始按顺序执行拦截器,在获取到服务端响应之后,再按反向的顺序执行拦截器中对响应的处理逻辑

在这里插入图片描述

3.1 unary interceptors

client端要实现UnaryClientInterceptor接口实现的接口如下

**

func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

你可以在调用远程函数前拦截RPC,通过获取RPC相关信息,如参数,上下文,函数名,请求等,你甚至可以修改原始的远程调用

  • ctx context.Context:单个请求的上下文
  • method string:请求的method名字
  • req, reply interface{}:请求和响应数据
  • cc *ClientConn:客户端与服务端的链接
  • invoker UnaryInvoker:通过调用它我们可以完成RPC并获取到响应
  • opts ...CallOption:RPC调用的所有配置项,包含设置到conn上的,也包含配置在每一个调用上的

示例


func clientUnaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {start := time.Now()cos := runtime.GOOS                                             // 获取当前操作系统ctx = metadata.AppendToOutgoingContext(ctx, "client-os", cos)   // 向元数据中添加操作系统信息ctx = metadata.AppendToOutgoingContext(ctx, "token", "Abc&123") // 向元数据中添加 token 信息err := invoker(ctx, method, req, reply, cc, opts...)if err != nil {fmt.Println("调用失败:", err)}fmt.Printf("client interceptor 耗时: %d\n, 客户端操作系统: %s\n, method: %s", time.Since(start), cos, method)return err
}func main() {conn, err := grpc.Dial("127.0.0.1:8090",grpc.WithInsecure(),grpc.WithUnaryInterceptor(orderUnaryClientInterceptor),)if err != nil {panic(err)}// 1. 创建客户端实例client := helloworld.NewSearchServiceClient(conn)// ...
}

根据示例再重新看下拦截器接口的参数

🌲 cc *grpc.ClientConn客户端与服务端的链接

​ 这里的cc就是示例代码中client := helloworld.NewSearchServiceClient(conn)conn

🌲 invoker grpc.UnaryInvoker包装了服务实现

​ 调用完invoker即完成了RPC,所以我们可以改写req或者在获取到reply之后修改响应

3.2 streaming interceptors

要实现的接口StreamClientInterceptor

**

func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

和serve端类似的参数类似,重点关注下面几个参数

  • cs ClientStream:客户端视角的流。类比服务端的ss ServerStream,无论是哪一种流式RPC对于客户端来说发送(SendMsg)就代表着请求数据,接收(RecvMsg)就代表着响应数据(正好和服务端是反过来的)
  • streamer Streamer:完成RPC请求的调用

示例

func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc,cc *grpc.ClientConn, method string, streamer grpc.Streamer,opts ...grpc.CallOption) (grpc.ClientStream, error) {// Pre-processing logics := time.Now()cs, err := streamer(ctx, desc, cc, method, opts...)// Post processing logiclog.Printf("method: %s, latency: %s\n", method, time.Now().Sub(s))return cs, err
}func main() {conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure(), grpc.WithStreamInterceptor(streamInterceptor))if err != nil {panic(err)}defer conn.Close()// 1. 创建客户端实例client := streaming.NewGreeterClient(conn)// ...
}

如何记录或者修改流拦截器的请求响应数据?

和服务端stream interceptor同样的道理,通过包装ClientStream即可做到

**

func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {stream, err := streamer(ctx, desc, cc, method, opts...)return newStreamClient(stream), err
}// 嵌入式 streamClient 允许我们访问SendMsg和RecvMsg函数
type streamClient struct {grpc.ClientStream
}// 对ClientStream进行包装
func newStreamClient(c grpc.ClientStream) grpc.ClientStream {return &streamClient{c}
}// RecvMsg 从流中接收消息
func (e *streamClient) RecvMsg(m interface{}) error {// 在这里,我们可以对接收到的消息执行额外的逻辑,例如log.Printf("Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))if err := e.ClientStream.RecvMsg(m); err != nil {return err}return nil
}// SendMsg 向流中发送消息
func (e *streamClient) SendMsg(m interface{}) error {// 在这里,我们可以对发送消息执行额外的逻辑,例如log.Printf("Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))if err := e.ClientStream.SendMsg(m); err != nil {return err}return nil
}

四、拦截器链

服务器只能配置一个 unary interceptorstream interceptor,否则会报错,客户端也是,虽然不会报错,但是只有最后一个才起作用。

// 服务端拦截器
s := grpc.NewServer(grpc.UnaryInterceptor(unaryServerInterceptor),grpc.StreamInterceptor(streamServerInterceptor),
)
// 客户端拦截器
conn, err := grpc.Dial("127.0.0.1:8090",grpc.WithInsecure(),grpc.WithUnaryInterceptor(unaryClientInterceptor),grpc.WithStreamInterceptor(streamClientInterceptor),
)

如果你想配置多个,可以使用拦截器链或者自己实现一个。

**

// 服务端拦截器
s := grpc.NewServer(grpc.ChainUnaryInterceptor(unaryServerInterceptor1,unaryServerInterceptor2,),grpc.ChainStreamInterceptor(serverStreamInterceptor1,serverStreamInterceptor2,),
)

**

// 客户端拦截器
conn, err := grpc.Dial("127.0.0.1:8090",grpc.WithInsecure(),grpc.WithChainUnaryInterceptor(unaryClientInterceptor1,unaryClientInterceptor2,),grpc.WithChainStreamInterceptor(streamClientInterceptor1,streamClientInterceptor2,),
)

五、生态

除了可以自己实现拦截器外,gRPC生态也提供了一系列的开源的拦截器可供使用,覆盖权限、日志、监控等诸多方面

https://github.com/grpc-ecosy…

Auth
  • grpc_auth - a customizable (via AuthFunc) piece of auth middleware
Logging
  • grpc_ctxtags - a library that adds a Tag map to context, with data populated from request body
  • grpc_zap - integration of zap logging library into gRPC handlers.
  • grpc_logrus - integration of logrus logging library into gRPC handlers.
Monitoring
  • grpc_prometheus⚡ - Prometheus client-side and server-side monitoring middleware
  • otgrpc⚡ - OpenTracing client-side and server-side interceptors
  • grpc_opentracing - OpenTracing client-side and server-side interceptors with support for streaming and handler-returned tags
Client
  • grpc_retry - a generic gRPC response code retry mechanism, client-side middleware
Server
  • grpc_validator - codegen inbound message validation from .proto options
  • grpc_recovery - turn panics into gRPC errors
  • ratelimit - grpc rate limiting by your own limiter

参考资料

  • https://grpc.org.cn/docs/languages/go/basics
  • https://segmentfault.com/a/1190000043385059

示例代码

gitee

http://www.lryc.cn/news/607602.html

相关文章:

  • IO流File类的基本使用
  • 【2】专业自定义图表创建及应用方法
  • JS核心语法与实战技巧
  • 力扣:2477. 到达首都的最少油耗
  • OCR、文档解析工具合集
  • EasyExcel 格式设置大全
  • LangChain详解
  • OpenShift AI - 用 Hardware profiles 为运行环境分配可用的硬件规格
  • Windows和Linux的tree工具
  • 移动端 WebView 内存泄漏与性能退化问题如何排查 实战调试方法汇总
  • 【数据结构与算法】21.合并两个有序链表(LeetCode)
  • (28)运动目标检测之随机曲线上的离散点进行插值
  • 【MySQL索引失效场景】索引失效原因及最左前缀原则详解
  • 【C语言】字符函数与字符串函数详解
  • 数据结构(11)栈和队列算法题 OVA
  • dify 升级1.7.1 插件无法下载依赖
  • [VL|RIS] ReferSAM
  • 11.Layout-Pinia优化重复请求
  • 使用 whisper, 音频分割, 初步尝试,切割为小块,效果还不错 1
  • [ Leetcode ]---快乐数
  • [lvgl_player] 用户界面(LVGL) | 播放器核心设计
  • 8.1每日一题
  • Vue 3 入门教程 8 - 路由管理 Vue Router
  • 使用GPU和NPU视频生成的优劣对比
  • Windows系统优化命令-记录
  • 面向对象学习(一)
  • 【Linux我做主】细说环境变量
  • Vue2 项目实现 Gzip 压缩全攻略:从配置到部署避坑指南
  • IIS 让asp.net core 项目一直运行
  • TwinCAT3编程入门2