当前位置: 首页 > news >正文

【gRPC】clientPool 客户端连接池简单实现与go案例

什么是 gRPC 客户端连接池?

  • 在 gRPC 中,创建和维护一个到服务器的连接是非常消耗资源的(比如 TCP 连接建立和 TLS 握手)。

  • 而在高并发场景下,如果每次请求都创建新的连接,不仅会导致性能下降,还可能耗尽系统资源。

  • 因此,客户端连接池的作用是复用一定数量的连接,提高资源利用率和性能。


gRPC 客户端连接池的原理

  1. 连接复用,池子里的连接使用时取出,用完放回
  2. 控制连接数,可以固定数量或动态调整,防止建太多连接
  3. 并发安全

先展示一个基于sync.pool创建的clientPool

  • 实际上,企业不推荐使用sync包里的无锁机制,
  • 因为sync包里的无锁设计适用于高并发,短暂资源的情况,
  • 而gRPC本身设计初衷是客户端连接是长生命周期,需要稳定管理的资源,与sync.pool的特性不完全匹配
因此为了更好实现,可以自己加锁设计,或者使用第三方库这里举例github:go-grpc-pool

type ClientPool interface {Get() *grpc.ClientConnPut(conn *grpc.ClientConn)
}type clientPool struct {pool sync.Pool
}func GetPool(target string, opts ...grpc.DialOption) (ClientPool, error) {return &clientPool{pool: sync.Pool{New: func() any {conn, err := grpc.Dial(target, opts...)if err != nil {log.Println(err)return nil}return conn},},}, nil
}func (c *clientPool) Get() *grpc.ClientConn {conn := c.pool.Get().(*grpc.ClientConn)if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {conn.Close()conn = c.pool.New().(*grpc.ClientConn)}return conn
}func (c *clientPool) Put(conn *grpc.ClientConn) {if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {conn.Close()return}c.pool.Put(conn)
}

自己加锁设计

package mainimport ("log""sync""time""google.golang.org/grpc""google.golang.org/grpc/connectivity"
)// ClientPool 定义接口
type ClientPool interface {Get() (*grpc.ClientConn, error)Put(conn *grpc.ClientConn)Close()
}// clientPool 是 ClientPool 的实现
type clientPool struct {mu          sync.Mutexconnections chan *grpc.ClientConnmaxSize     intidleTimeout time.Durationtarget      stringopts        []grpc.DialOptionclosed      bool
}// NewClientPool 创建一个新的客户端连接池
func NewClientPool(target string, maxSize int, idleTimeout time.Duration, opts ...grpc.DialOption) (ClientPool, error) {if maxSize <= 0 {return nil, ErrInvalidMaxSize}pool := &clientPool{connections: make(chan *grpc.ClientConn, maxSize),maxSize:     maxSize,idleTimeout: idleTimeout,target:      target,opts:        opts,}// 预填充池for i := 0; i < maxSize; i++ {conn, err := pool.createConnection()if err != nil {return nil, err}pool.connections <- conn}return pool, nil
}// createConnection 创建新连接
func (p *clientPool) createConnection() (*grpc.ClientConn, error) {conn, err := grpc.Dial(p.target, p.opts...)if err != nil {return nil, err}return conn, nil
}// Get 从连接池获取一个连接
func (p *clientPool) Get() (*grpc.ClientConn, error) {p.mu.Lock()defer p.mu.Unlock()if p.closed {return nil, ErrPoolClosed}select {case conn := <-p.connections:// 检查连接状态if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {conn.Close()return p.createConnection()}return conn, nildefault:// 如果没有空闲连接,尝试创建新的连接return p.createConnection()}
}// Put 将连接放回池中
func (p *clientPool) Put(conn *grpc.ClientConn) {if conn == nil {return}// 检查连接状态if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {conn.Close()return}select {case p.connections <- conn:// 放回池中default:// 如果池已满,直接关闭连接conn.Close()}
}// Close 关闭连接池
func (p *clientPool) Close() {p.mu.Lock()defer p.mu.Unlock()if p.closed {return}p.closed = trueclose(p.connections)for conn := range p.connections {conn.Close()}
}// 错误定义
var (ErrInvalidMaxSize = log.New("invalid max size")ErrPoolClosed     = log.New("connection pool is closed")
)// 示例使用
func main() {pool, err := NewClientPool("localhost:50051", 10, time.Minute, grpc.WithInsecure())if err != nil {log.Fatalf("Failed to create pool: %v", err)}conn, err := pool.Get()if err != nil {log.Fatalf("Failed to get connection: %v", err)}// 使用连接// client := pb.NewYourServiceClient(conn)// 放回连接pool.Put(conn)// 程序退出时关闭连接池pool.Close()
}
http://www.lryc.cn/news/521824.html

相关文章:

  • Android 15应用适配指南:所有应用的行为变更
  • 24-25-1-单片机开卷部分习题和评分标准
  • STM32第6章、WWDG
  • 汽车免拆诊断案例 | 2007 款法拉利 599 GTB 车发动机故障灯异常点亮
  • C语言-数据结构-队列
  • STL之VectorMapList针对erase方法踩坑笔记
  • 梯度下降法为什么要提前停止
  • 【vue3项目使用 animate动画效果】
  • 1.1.1 C语言常用的一些函数(持续更新)
  • 李宏毅机器学习课程笔记03 | 类神经网络优化技巧
  • 简洁明快git入门及github实践教程
  • Python使用socket实现简易的http服务
  • 【Hive】海量数据存储利器之Hive库原理初探
  • linux系统监视(centos 7)
  • Blazor中Syncfusion图像编辑器组件使用方法
  • 电动汽车V2G技术Matlab/Simulink仿真模型
  • C++中的unordered_set和unordered_map的模拟实现
  • Spring Boot 2 学习指南与资料分享
  • (一)QSQLite3库简介
  • 《计算机网络》课后探研题书面报告_网际校验和算法
  • hot100_240. 搜索二维矩阵 II
  • 78_Redis网络模型
  • python范围
  • vulnhub靶场【Raven系列】之2 ,对于mysql udf提权的复习
  • 基于vite+vue3+mapbox-gl从零搭建一个项目
  • 向harbor中上传镜像(向harbor上传image)
  • 【线性代数】行列式的性质
  • 智能家居企业如何通过设计师渠道打造第二曲线?
  • Unity3d 实时天气系统基于UniStorm插件和xx天气API实现(含源码)
  • 年后找工作需要注意的事项