gRPC之Interceptor
1、gRPC Interceptor
在应用开发过程中会有这样的需求,就是在请求执行前后做一些通用的处理逻辑,比如记录日志、tracing、身份
认证等,在web框架中一般是使用middleware来实现的,gRPC 在客户端和服务端都支持了拦截器功能,用来处
理这种业务需求。
gRPC服务端跟客户端均可实现各自的拦截器,根据rpc的两种请求方式可分为两种。
-
Unary Interceptor
(一元拦截器) -
Stream Interceptor
(流式拦截器)
1.1 一元拦截器
对于一元服务器拦截器,只需要定义UnaryServerInterceptor
方法即可,其中,handler(ctx, req)
即调用
rpc方法。
type UnaryServerInterceptor func(ctx context.Context, // rpc上下文req interface{}, // rpc请求参数info *UnaryServerInfo, // rpc方法信息handler UnaryHandler // rpc方法本身,真正执行逻辑
) (interface{}, error){return handler(ctx, req)
}
而对于一元客户端拦截器,一样需要定义一个方法UnaryClientInterceptor
,其中执行invoker()
才真正请求
rpc。
type UnaryClientInterceptor func(ctx context.Context, // rpc上下文method string, // 调用方法名req, // rpc请求参数reply interface{}, // rpc响应结果cc *ClientConn, // 连接句柄invoker UnaryInvoker, // 调用rpc方法本身opts ...CallOption // 调用配置
) error {return invoker(ctx, method, req, reply, cc, opts...)
}
一元拦截器的实现,根据调用handler或invoker的前后,可分为三部分:调用前预处理,调用rpc方法,调用后处
理。
下面在客户端和服务端分别实现一个记录请求日志的拦截器,打印请求前后的信息。
1.1.1 proto编写
// ping.proto
// 指定proto版本
syntax = "proto3";
// 指定包名
package protos;
// 指定go包路径
option go_package = "protos/ping";// 定义PingPong服务
service PingPong {// Ping 发送 ping 请求,接收 pong 响应rpc Ping(PingRequest) returns (PongResponse);
}// PingRequest 请求结构
message PingRequest {string value = 1; // value字段为string类型
}// PongResponse 响应结构
message PongResponse {string value = 1; // value字段为string类型
}
1.1.2 生成bp.go文件
$ protoc -I . --go_out=plugins=grpc:. ./ping.proto
1.1.3 编写server
package mainimport ("context"pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""log""net"
)// PingPongServer实现pb.PingPongServer接口
type PingPongServer struct {pb.UnimplementedPingPongServer
}// Ping 单次请求-响应模式
func (s *PingPongServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PongResponse, error) {return &pb.PongResponse{Value: "pong"}, nil
}// 服务端拦截器 - 记录请求和响应日志
func serverUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {// 前置逻辑log.Printf("[Server Interceptor] accept request: %s", info.FullMethod)// 处理请求response, err := handler(ctx, req)// 后置逻辑log.Printf("[Server Interceptor] response: %s", response)return response, err
}// 启动server
func main() {// 以option的方式添加拦截器srv := grpc.NewServer(grpc.UnaryInterceptor(serverUnaryInterceptor))// 注册 PingPongServerpb.RegisterPingPongServer(srv, &PingPongServer{})lis, err := net.Listen("tcp", ":1234")if err != nil {log.Fatal(err)}log.Println("listen on 1234")srv.Serve(lis)
}
[root@zsx demo]# go run server.go
2023/02/11 12:13:48 listen on 1234
1.1.4 编写client
package mainimport ("context"pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""log"
)// 客户端拦截器 - 记录请求和响应日志
func clientUnaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {// 前置逻辑log.Printf("[Client Interceptor] send request: %s", method)// 发起请求err := invoker(ctx, method, req, reply, cc, opts...)// 后置逻辑log.Printf("[Client Interceptor] response: %s", reply)return err
}// Ping 单次请求-响应模式
func main() {conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure(), grpc.WithUnaryInterceptor(clientUnaryInterceptor))if err != nil {log.Fatal(err)}defer conn.Close()// 实例化客户端并调用client := pb.NewPingPongClient(conn)res, err := client.Ping(context.Background(), &pb.PingRequest{Value: "ping"})if err != nil {log.Fatal(err)}log.Println(res.Value)
}
[root@zsx demo]# go run client.go
2023/02/11 12:23:31 [Client Interceptor] send request: /protos.PingPong/Ping
2023/02/11 12:23:31 [Client Interceptor] response: value:"pong"
2023/02/11 12:23:31 pong
server
端输出:
2023/02/11 12:23:31 [Server Interceptor] accept request: /protos.PingPong/Ping
2023/02/11 12:23:31 [Server Interceptor] response: value:"pong"
这里分别定义了 serverUnaryInterceptor
和 clientUnaryInterceptor
拦截器,函数的签名定义在
google.golang.org/grpc
包中,分别为 UnaryServerInterceptor
和 UnaryClientInterceptor
, 在前置逻
辑位置可以对请求信息做处理,在后置逻辑位置可以对响应信息做处理。在初始化服务端和客户端连接时以
option的形式配置就好了,同时也支持配置多个拦截器。
# 项目结构
$ tree demo
demo
├── client.go
├── go.mod
├── go.sum
├── ping.proto
├── protos
│ └── ping
│ └── ping.pb.go
└── server.go2 directories, 6 files
1.2 流式拦截器
流式拦截器的实现,与一元拦截器一致,实现提供的方法即可,方法参数含义如下:
type StreamServerInterceptor func(srv interface{}, // rpc请求参数ss ServerStream, // 服务端stream对象info *StreamServerInfo, // rpc方法信息handler StreamHandler // rpc方法本身,真正执行逻辑
) (err error){return handler(src, ss)
}
type StreamClientInterceptor func(ctx context.Context, // rpc上下文desc *StreamDesc, // 流信息cc *ClientConn, // 连接句柄method string, // 调用方法名streamer Streamer, // 调用rpc方法本身opts ...CallOption // 调用配置
)(ClientStream, error){// 流操作预处理clientStream, err := streamer(ctx, desc, cc, method, opts...)// 根据某些条件,通过clientStream拦截流操作return clientStream, err
}
与其他拦截器不同,客户端流式拦截器的实现分为两部分,流操作预处理和流操作拦截,其不能在事后进行rpc方
法调用和后处理,只能通过ClientStream对象进行流操作拦截,例如根据特定的metadata,调用
ClientStream.CloseSend()
终止流操作。
下面同样实现一个打印请求和响应日志的拦截器,只是函数签名变成了 grpc.StreamServerInterceptor
和
grpc.StreamClientInterceptor
。
1.2.1 proto编写
// ping.proto
// 指定proto版本
syntax = "proto3";
// 指定包名
package protos;
// 指定go包路径
option go_package = "protos/ping";// 定义PingPong服务
service PingPong {// Ping 发送 ping 请求,接收 pong 响应rpc Ping(PingRequest) returns (PongResponse);rpc Streaming (stream StreamRequest) returns (stream StreamResponse) {}
}// PingRequest 请求结构
message PingRequest {string value = 1; // value字段为string类型
}// PongResponse 响应结构
message PongResponse {string value = 1; // value字段为string类型
}message StreamRequest {string input = 1;
}message StreamResponse {string output = 1;
}
1.2.2 生成bp.go文件
$ protoc -I . --go_out=plugins=grpc:. ./ping.proto
1.2.3 服务端
服务端实现其实和一元拦截器的使用方式没太大区别,但是流的特性在于请求和响应不是一次性处理完成的,而是
多次发送和接收数据,所以我们可能需要在发送和接收数据的过程中处理一些公共逻辑,这才是流拦截器特别的地
方。我们注意到 handler
方法调用的第二个参数是一个 grpc.ServerStream
接口类型,这个接口类型包含了
SendMsg
和 RecvMsg
方法,所以我们可以使用一个自定义类型实现这个接口,对原对象进行包装重写这两个
方法,这样就能达到我们的目的了。
package mainimport ("context"pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""log""net""io""strconv"
)// PingPongServer 实现 pb.PingPongServer 接口
type PingPongServer struct {pb.UnimplementedPingPongServer
}// Ping 单次请求-响应模式
func (s *PingPongServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PongResponse, error) {return &pb.PongResponse{Value: "pong"}, nil
}// 服务端拦截器 - 记录stream请求和响应日志
func serverStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {// 前置逻辑log.Printf("[Server Stream Interceptor] accept request: %s", info.FullMethod)// 处理请求,使用自定义类型包装 ServerStreamerr := handler(srv, &customServerStream{ss})return err
}type customServerStream struct {grpc.ServerStream
}func (s *customServerStream) SendMsg(m interface{}) error {log.Printf("[Server Stream Interceptor] send: %T", m)return s.ServerStream.SendMsg(m)
}func (s *customServerStream) RecvMsg(m interface{}) error {log.Printf("[Server Stream Interceptor] recv: %T", m)return s.ServerStream.RecvMsg(m)
}// 一元拦截器
// 服务端拦截器 - 记录请求和响应日志
func serverUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {// 前置逻辑log.Printf("[Server Interceptor] accept request: %s", info.FullMethod)// 处理请求response, err := handler(ctx, req)// 后置逻辑log.Printf("[Server Interceptor] response: %s", response)return response, err
}// 启动server
func main() {// 以option的方式添加拦截器opts := []grpc.ServerOption{grpc.UnaryInterceptor(serverUnaryInterceptor),grpc.StreamInterceptor(serverStreamInterceptor),}srv := grpc.NewServer(opts...)// 注册 PingPongServerpb.RegisterPingPongServer(srv, &PingPongServer{})lis, err := net.Listen("tcp", ":50001")if err != nil {log.Fatal(err)}log.Println("listen on 50001")srv.Serve(lis)
}func (s *PingPongServer) Streaming(stream pb.PingPong_StreamingServer) error {for n := 0; ; {res, err := stream.Recv()if err == io.EOF {return nil}if err != nil {return err}v, _ := strconv.Atoi(res.Input)log.Println(v)n += vstream.Send(&pb.StreamResponse{Output: strconv.Itoa(n)})}
}
[root@zsx demo]# go run server.go
2023/02/11 17:27:21 listen on 50001
1.2.4 客户端
客户端的使用方式和服务端类似,只是对应的数据处理接口类型变成了 grpc.ClientStream
。
package mainimport ("context"pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""log""io""strconv"
)// 一元拦截器
// 客户端拦截器 - 记录请求和响应日志
func clientUnaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {// 前置逻辑log.Printf("[Client Interceptor] send request: %s", method)// 发起请求err := invoker(ctx, method, req, reply, cc, opts...)// 后置逻辑log.Printf("[Client Interceptor] response: %s", reply)return err
}// 流拦截器
func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {// 前置逻辑log.Printf("[Client Stream Interceptor] send request: %s", method)// 请求s, err := streamer(ctx, desc, cc, method, opts...)if err != nil {return nil, err}// 自定义类型包装 ClientStreamreturn &customClientStream{s}, nil
}type customClientStream struct {grpc.ClientStream
}func (s *customClientStream) SendMsg(m interface{}) error {log.Printf("[Client Stream Interceptor] send: %T", m)return s.ClientStream.SendMsg(m)
}func (s *customClientStream) RecvMsg(m interface{}) error {log.Printf("[Client Stream Interceptor] recv: %T", m)return s.ClientStream.RecvMsg(m)
}// Ping 单次请求-响应模式
func main() {opts := []grpc.DialOption{grpc.WithInsecure(),grpc.WithUnaryInterceptor(clientUnaryInterceptor),grpc.WithStreamInterceptor(clientStreamInterceptor),}conn, err := grpc.Dial("localhost:50001", opts...)if err != nil {log.Fatal(err)}defer conn.Close()// 实例化客户端并调用client := pb.NewPingPongClient(conn)res, err := client.Ping(context.Background(), &pb.PingRequest{Value: "ping"})if err != nil {log.Fatal(err)}log.Println(res.Value)streaming(client)
}func streaming(client pb.PingPongClient) error {stream, _ := client.Streaming(context.Background())for n := 0; n < 10; n++ {log.Println("Streaming Send:", n)err := stream.Send(&pb.StreamRequest{Input: strconv.Itoa(n)})if err != nil {return err}res, err := stream.Recv()if err == io.EOF {break}if err != nil {return err}log.Println("Streaming Recv:", res.Output)}stream.CloseSend()return nil
}
[root@zsx demo]# go run client.go
2023/02/11 17:55:29 [Client Interceptor] send request: /protos.PingPong/Ping
2023/02/11 17:55:29 [Client Interceptor] response: value:"pong"
2023/02/11 17:55:29 pong
2023/02/11 17:55:29 [Client Stream Interceptor] send request: /protos.PingPong/Streaming
2023/02/11 17:55:29 Streaming Send: 0
2023/02/11 17:55:29 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:55:29 [Client Stream Interceptor] recv: *ping.StreamResponse
2023/02/11 17:55:29 Streaming Recv: 0
2023/02/11 17:55:29 Streaming Send: 1
2023/02/11 17:55:29 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:55:29 [Client Stream Interceptor] recv: *ping.StreamResponse
2023/02/11 17:55:29 Streaming Recv: 1
2023/02/11 17:55:29 Streaming Send: 2
2023/02/11 17:55:29 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:55:29 [Client Stream Interceptor] recv: *ping.StreamResponse
2023/02/11 17:55:29 Streaming Recv: 3
2023/02/11 17:55:29 Streaming Send: 3
2023/02/11 17:55:29 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:55:29 [Client Stream Interceptor] recv: *ping.StreamResponse
2023/02/11 17:55:29 Streaming Recv: 6
2023/02/11 17:55:29 Streaming Send: 4
2023/02/11 17:55:29 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:55:29 [Client Stream Interceptor] recv: *ping.StreamResponse
2023/02/11 17:55:29 Streaming Recv: 10
2023/02/11 17:55:29 Streaming Send: 5
2023/02/11 17:55:29 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:55:29 [Client Stream Interceptor] recv: *ping.StreamResponse
2023/02/11 17:55:29 Streaming Recv: 15
2023/02/11 17:55:29 Streaming Send: 6
2023/02/11 17:55:29 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:55:29 [Client Stream Interceptor] recv: *ping.StreamResponse
2023/02/11 17:55:29 Streaming Recv: 21
2023/02/11 17:55:29 Streaming Send: 7
2023/02/11 17:55:29 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:55:29 [Client Stream Interceptor] recv: *ping.StreamResponse
2023/02/11 17:55:29 Streaming Recv: 28
2023/02/11 17:55:29 Streaming Send: 8
2023/02/11 17:55:29 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:55:29 [Client Stream Interceptor] recv: *ping.StreamResponse
2023/02/11 17:55:29 Streaming Recv: 36
2023/02/11 17:55:29 Streaming Send: 9
2023/02/11 17:55:29 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:55:29 [Client Stream Interceptor] recv: *ping.StreamResponse
2023/02/11 17:55:29 Streaming Recv: 45
server
输出:
2023/02/11 17:55:17 listen on 50001
2023/02/11 17:55:29 [Server Interceptor] accept request: /protos.PingPong/Ping
2023/02/11 17:55:29 [Server Interceptor] response: value:"pong"
2023/02/11 17:55:29 [Server Stream Interceptor] accept request: /protos.PingPong/Streaming
2023/02/11 17:55:29 [Server Stream Interceptor] recv: *ping.StreamRequest
2023/02/11 17:55:29 0
2023/02/11 17:55:29 [Server Stream Interceptor] send: *ping.StreamResponse
2023/02/11 17:55:29 [Server Stream Interceptor] recv: *ping.StreamRequest
2023/02/11 17:55:29 1
2023/02/11 17:55:29 [Server Stream Interceptor] send: *ping.StreamResponse
2023/02/11 17:55:29 [Server Stream Interceptor] recv: *ping.StreamRequest
2023/02/11 17:55:29 2
2023/02/11 17:55:29 [Server Stream Interceptor] send: *ping.StreamResponse
2023/02/11 17:55:29 [Server Stream Interceptor] recv: *ping.StreamRequest
2023/02/11 17:55:29 3
2023/02/11 17:55:29 [Server Stream Interceptor] send: *ping.StreamResponse
2023/02/11 17:55:29 [Server Stream Interceptor] recv: *ping.StreamRequest
2023/02/11 17:55:29 4
2023/02/11 17:55:29 [Server Stream Interceptor] send: *ping.StreamResponse
2023/02/11 17:55:29 [Server Stream Interceptor] recv: *ping.StreamRequest
2023/02/11 17:55:29 5
2023/02/11 17:55:29 [Server Stream Interceptor] send: *ping.StreamResponse
2023/02/11 17:55:29 [Server Stream Interceptor] recv: *ping.StreamRequest
2023/02/11 17:55:29 6
2023/02/11 17:55:29 [Server Stream Interceptor] send: *ping.StreamResponse
2023/02/11 17:55:29 [Server Stream Interceptor] recv: *ping.StreamRequest
2023/02/11 17:55:29 7
2023/02/11 17:55:29 [Server Stream Interceptor] send: *ping.StreamResponse
2023/02/11 17:55:29 [Server Stream Interceptor] recv: *ping.StreamRequest
2023/02/11 17:55:29 8
2023/02/11 17:55:29 [Server Stream Interceptor] send: *ping.StreamResponse
2023/02/11 17:55:29 [Server Stream Interceptor] recv: *ping.StreamRequest
2023/02/11 17:55:29 9
2023/02/11 17:55:29 [Server Stream Interceptor] send: *ping.StreamResponse
2023/02/11 17:55:29 [Server Stream Interceptor] recv: *ping.StreamRequest
server端的send和recv互成一对,最后一次输出的recv是结束消息,err == io.EOF
。
注意:在自定义的 RecvMsg
方法中,前置位置只能读取消息的类型,无法读取实际数据,因为这个时候接收到的
消息还没有解析处理,如果要获取接收消息的实际数据,需要把自定义的处理逻辑放在后置位置。
func (s *customClientStream) RecvMsg(m interface{}) error {err := s.ClientStream.RecvMsg(m)log.Printf("[Client Stream Interceptor] recv: %v", m)return err
}
修改之后客户端的输出:
[root@zsx demo]# go run client.go
2023/02/11 17:57:32 [Client Interceptor] send request: /protos.PingPong/Ping
2023/02/11 17:57:32 [Client Interceptor] response: value:"pong"
2023/02/11 17:57:32 pong
2023/02/11 17:57:32 [Client Stream Interceptor] send request: /protos.PingPong/Streaming
2023/02/11 17:57:32 Streaming Send: 0
2023/02/11 17:57:32 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:57:32 [Client Stream Interceptor] recv: output:"0"
2023/02/11 17:57:32 Streaming Recv: 0
2023/02/11 17:57:32 Streaming Send: 1
2023/02/11 17:57:32 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:57:32 [Client Stream Interceptor] recv: output:"1"
2023/02/11 17:57:32 Streaming Recv: 1
2023/02/11 17:57:32 Streaming Send: 2
2023/02/11 17:57:32 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:57:32 [Client Stream Interceptor] recv: output:"3"
2023/02/11 17:57:32 Streaming Recv: 3
2023/02/11 17:57:32 Streaming Send: 3
2023/02/11 17:57:32 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:57:32 [Client Stream Interceptor] recv: output:"6"
2023/02/11 17:57:32 Streaming Recv: 6
2023/02/11 17:57:32 Streaming Send: 4
2023/02/11 17:57:32 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:57:32 [Client Stream Interceptor] recv: output:"10"
2023/02/11 17:57:32 Streaming Recv: 10
2023/02/11 17:57:32 Streaming Send: 5
2023/02/11 17:57:32 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:57:32 [Client Stream Interceptor] recv: output:"15"
2023/02/11 17:57:32 Streaming Recv: 15
2023/02/11 17:57:32 Streaming Send: 6
2023/02/11 17:57:32 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:57:32 [Client Stream Interceptor] recv: output:"21"
2023/02/11 17:57:32 Streaming Recv: 21
2023/02/11 17:57:32 Streaming Send: 7
2023/02/11 17:57:32 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:57:32 [Client Stream Interceptor] recv: output:"28"
2023/02/11 17:57:32 Streaming Recv: 28
2023/02/11 17:57:32 Streaming Send: 8
2023/02/11 17:57:32 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:57:32 [Client Stream Interceptor] recv: output:"36"
2023/02/11 17:57:32 Streaming Recv: 36
2023/02/11 17:57:32 Streaming Send: 9
2023/02/11 17:57:32 [Client Stream Interceptor] send: *ping.StreamRequest
2023/02/11 17:57:32 [Client Stream Interceptor] recv: output:"45"
2023/02/11 17:57:32 Streaming Recv: 45
# 项目结构
$ tree demo
demo
├── client.go
├── go.mod
├── go.sum
├── ping.proto
├── protos
│ └── ping
│ └── ping.pb.go
└── server.go2 directories, 6 files
1.3 其它拦截器
如果需要使用多个拦截器,grpc-go中提供了相应的四种拦截器:
-
grpc.ChainUnaryInterceptor(i ...UnaryServerInterceptor)
-
grpc.ChainStreamInterceptor(i ...StreamServerInterceptor)
-
grpc.WithChainUnaryInterceptor(i ...UnaryClientInterceptor)
-
grpc.WithChainStreamInterceptor(i ...StreamClientInterceptor)
如果grpc版本过老,可能还未提供chain api,可以使用第三方库grpc-ecosystem/go-grpc-middleware
。
go-grpc-middleware地址:https://github.com/grpc-ecosystem/go-grpc-middleware
。
除了链接器,库中还提供了许多常用的拦截器,例如grpc_zap
,grpc_recovery
等。当然,特殊需求也可以通
过实现对应方法,实现自定义interceptor
。
1.4 Interceptor实现Token验证
grpc服务端和客户端都提供了interceptor功能,功能类似middleware,很适合在这里处理验证、日志等流程。
在自定义Token认证中,认证信息是由每个服务中的方法处理并认证的,如果有大量的接口方法,这种姿势就太不
优雅了,每个接口实现都要先处理认证信息。这个时候interceptor就可以用来解决了这个问题,在请求被转到具
体接口之前处理认证信息,一处认证,到处无忧。在客户端,我们增加一个请求日志,记录请求相关的参数和耗时
等等。
本案例结合前面的TLS+Toekn
自定义认证方式和Interceptor
实现Token验证。
1.4.1 proto编写
syntax = "proto3"; // 指定proto版本
package hello; // 指定默认包名
// 指定golang包名
option go_package = "./hello";
// 定义Hello服务
service Hello {// 定义SayHello方法rpc SayHello(HelloRequest) returns (HelloResponse) {}
}
// HelloRequest 请求结构
message HelloRequest {string name = 1;
}
// HelloResponse 响应结构
message HelloResponse {string message = 1;
}
1.4.2 生成bp.go文件
$ protoc -I . --go_out=plugins=grpc:. ./hello.proto
1.4.3 编写server
package mainimport ("context""fmt"pb "demo/hello""google.golang.org/grpc"// grpc 响应状态码"google.golang.org/grpc/codes"// grpc认证包"google.golang.org/grpc/credentials""log"// grpc metadata包"google.golang.org/grpc/metadata""net"
)const (// Address gRPC服务地址Address = "127.0.0.1:50052"
)// 定义helloService并实现约定的接口
type helloService struct{}// HelloService Hello服务
var HelloService = helloService{}// SayHello实现Hello服务接口
func (h helloService) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {resp := new(pb.HelloResponse)resp.Message = fmt.Sprintf("Hello %s.", in.Name)return resp, nil
}func main() {listen, err := net.Listen("tcp", Address)if err != nil {log.Fatalf("Failed to listen: %v", err)}var opts []grpc.ServerOption// TLS认证creds, err := credentials.NewServerTLSFromFile("./cert/server/server.pem", "./cert/server/server.key")if err != nil {log.Fatalf("Failed to generate credentials %v", err)}opts = append(opts, grpc.Creds(creds))// 注册interceptoropts = append(opts, grpc.UnaryInterceptor(interceptor))// 实例化grpc Servers := grpc.NewServer(opts...)// 注册HelloServicepb.RegisterHelloServer(s, HelloService)log.Println("Listen on " + Address + " with TLS + Token + Interceptor")s.Serve(listen)
}// auth 验证Token
func auth(ctx context.Context) error {md, ok := metadata.FromIncomingContext(ctx)if !ok {return grpc.Errorf(codes.Unauthenticated, "无Token认证信息")}var (appid stringappkey string)if val, ok := md["appid"]; ok {appid = val[0]}if val, ok := md["appkey"]; ok {appkey = val[0]}if appid != "101010" || appkey != "I am key" {return grpc.Errorf(codes.Unauthenticated, "Token认证信息无效: appid=%s, appkey=%s", appid, appkey)}return nil
}// interceptor 拦截器
func interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {err := auth(ctx)if err != nil {return nil, err}// 继续处理请求return handler(ctx, req)
}
[root@zsx demo]# go run server.go
2023/02/11 18:50:44 Listen on 127.0.0.1:50052 with TLS + Token + Interceptor
1.4.4 编写client
package mainimport ("context"// 引入proto包pb "demo/hello""google.golang.org/grpc"// 引入grpc认证包"google.golang.org/grpc/credentials""log""time"
)const (// Address gRPC服务地址Address = "127.0.0.1:50052"// OpenTLS 是否开启TLS认证OpenTLS = true
)// customCredential 自定义认证
type customCredential struct{}// GetRequestMetadata 实现自定义认证接口
func (c customCredential) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {return map[string]string{"appid": "101010","appkey": "I am key",}, nil
}// RequireTransportSecurity 自定义认证是否开启TLS
func (c customCredential) RequireTransportSecurity() bool {return OpenTLS
}func main() {var err errorvar opts []grpc.DialOptionif OpenTLS {// TLS连接creds, err := credentials.NewClientTLSFromFile("./cert/server/server.pem", "test.example.com")if err != nil {log.Fatalf("Failed to create TLS credentials %v", err)}opts = append(opts, grpc.WithTransportCredentials(creds))} else {opts = append(opts, grpc.WithInsecure())}// 指定自定义认证opts = append(opts, grpc.WithPerRPCCredentials(new(customCredential)))// 指定客户端interceptoropts = append(opts, grpc.WithUnaryInterceptor(interceptor))conn, err := grpc.Dial(Address, opts...)if err != nil {log.Fatalln(err)}defer conn.Close()// 初始化客户端c := pb.NewHelloClient(conn)// 调用方法req := &pb.HelloRequest{Name: "gRPC"}res, err := c.SayHello(context.Background(), req)if err != nil {log.Fatalln(err)}log.Println(res.Message)
}// interceptor 客户端拦截器
func interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {start := time.Now()err := invoker(ctx, method, req, reply, cc, opts...)log.Printf("method=%s req=%v rep=%v duration=%s error=%v\n", method, req, reply, time.Since(start), err)return err
}
[root@zsx demo]# go run client.go
2023/02/11 18:54:01 method=/hello.Hello/SayHello req=name:"gRPC" rep=message:"Hello gRPC." duration=3.708474ms error=<nil>
2023/02/11 18:54:01 Hello gRPC.
# 项目结构
[root@zsx protoc]# tree demo/
demo/
├── cert
│ ├── ca.crt
│ ├── ca.csr
│ ├── ca.key
│ ├── ca.srl
│ ├── client
│ │ ├── client.csr
│ │ ├── client.key
│ │ └── client.pem
│ ├── openssl.cnf
│ └── server
│ ├── server.csr
│ ├── server.key
│ └── server.pem
├── client.go
├── go.mod
├── go.sum
├── hello
│ └── hello.pb.go
├── hello.proto
└── server.go4 directories, 17 files
go-grpc-middleware:https://github.com/grpc-ecosystem/go-grpc-middleware
这个项目对interceptor进行了封装,支持多个拦截器的链式组装,对于需要多种处理的地方使用起来会更方便
些。