当前位置: 首页 > news >正文

golang线程池ants-实现架构

1、总体架构

ants协程池,在使用上有多种方式(使用方式参考这篇文章:golang线程池ants-四种使用方法),但是在实现的核心就一个,如下架构图:

总的来说,就是三个数据结构: Pool、WorkerStack、goWorker以及这三个结构实现的方法,了解了这些,基本上对ants的实现原理就了如指掌了。

2、详细实现

2.1 worker的设计实现

worker结构如下:

type goWorker struct {// pool who owns this worker.pool *Pool// task is a job should be done.task chan func()// lastUsed will be updated when putting a worker back into queue.lastUsed time.Time
}

该结构设计非常简单,三个成员:归属的线程池、执行函数、该worker最后一次运行时间,goWorker结构实现如下接口:

type worker interface {run()finish()lastUsedTime() time.TimeinputFunc(func())inputParam(interface{})
}

核心函数run,该函数从管道task里获取到任务函数,并执行,执行完成后,将此worker放回协程池(此时worker阻塞等待任务到来,调用函数:w.pool.revertWorker(w)放回池子中),以便复用:

func (w *goWorker) run() {w.pool.addRunning(1)go func() {defer func() {if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() {w.pool.once.Do(func() {close(w.pool.allDone)})}w.pool.workerCache.Put(w)if p := recover(); p != nil {if ph := w.pool.options.PanicHandler; ph != nil {ph(p)} else {w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())}}// Call Signal() here in case there are goroutines waiting for available workers.w.pool.cond.Signal()}()for f := range w.task {if f == nil {return}f()if ok := w.pool.revertWorker(w); !ok {return}}}()
}

finish函数,调用该函数,代表此worker的生命周期结束:

func (w *goWorker) finish() {w.task <- nil
}

这个时候run函数从遍历task管道中结束,进入defer函数,worker放入workerCache,备用。

inputFunc很容易理解,将任务放入管道,让worker去执行:

func (w *goWorker) inputFunc(fn func()) {w.task <- fn
}

2.2 workerStack结构

type workerStack struct {items  []workerexpiry []worker
}

该结构就两个成员,都为worker的切片,items切片用于存储正常执行的worker,expiry存放过期的worker,workStack结构实现了如下接口:

type workerQueue interface {len() intisEmpty() boolinsert(worker) errordetach() workerrefresh(duration time.Duration) []worker // clean up the stale workers and return themreset()
}

len函数:返回正在运行worker的长度

isEmpty函数:判断是否有正在运行的worker

insert函数:将worker插入切片。

detach函数:获取一个worker。

refresh:更新所有worker,淘汰过期worker。

reset:清除所有worker。

重点看refresh函数:

func (wq *workerStack) refresh(duration time.Duration) []worker {n := wq.len()if n == 0 {return nil}expiryTime := time.Now().Add(-duration)index := wq.binarySearch(0, n-1, expiryTime)wq.expiry = wq.expiry[:0]if index != -1 {wq.expiry = append(wq.expiry, wq.items[:index+1]...)m := copy(wq.items, wq.items[index+1:])for i := m; i < n; i++ {wq.items[i] = nil}wq.items = wq.items[:m]}return wq.expiry
}

 这个函数用于根据给定的时间间隔duration来刷新工作队列中的过期项。主要执行以下步骤:

  1. 获取队列长度:首先,通过调用wq.len()获取工作队列wq中当前元素的数量n。如果队列为空(即n == 0),则直接返回nil,表示没有过期项。

  2. 计算过期时间:通过time.Now().Add(-duration)计算出一个时间点,这个时间点是duration时间之前的时间,即认为是“过期”的时间点。

  3. 二分查找:使用wq.binarySearch(0, n-1, expiryTime)在队列中查找第一个过期项的位置(即第一个最后使用时间早于expiryTime的项)。这个函数返回一个索引,如果找到这样的项,则返回该项的索引;如果没有找到,则返回-1

  4. 清理过期项

    • 首先,清空wq.expiry切片,用它来存储所有过期的项。
    • 如果找到了过期项(即index != -1),则将wq.items中从0index(包含index)的所有项(即所有过期项)追加到wq.expiry中。
    • 然后,使用copy函数将wq.items中从index+1n-1的所有项向前移动,覆盖掉前面的过期项。这里mcopy函数返回的值,表示实际复制的元素数量,即队列中剩余的非过期项的数量。
    • 接下来,遍历wq.items中从mn-1的所有位置,将它们设置为nil。
    • 最后,通过wq.items = wq.items[:m]更新wq.items的长度,去除所有过期的项。
  5. 返回过期项:函数返回wq.expiry,这是一个包含所有被移除的过期项的切片。

需要注意的是,wq.items是一个切片,用于存储工作项;wq.expiry也是一个切片,用于临时存储过期的项。

 2.3 Pool结构

pool结构的定义源码稍作改了一下,之前poolCommon的结构就是Pool的结构,目前最新版本做了一个简单的封装。

type Pool struct {poolCommon
}
type poolCommon struct {// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool// which submits a new task to the same pool.capacity int32// running is the number of the currently running goroutines.running int32// lock for protecting the worker queue.lock sync.Locker// workers is a slice that store the available workers.workers workerQueue// state is used to notice the pool to closed itself.state int32// cond for waiting to get an idle worker.cond *sync.Cond// done is used to indicate that all workers are done.allDone chan struct{}// once is used to make sure the pool is closed just once.once *sync.Once// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.workerCache sync.Pool// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lockwaiting int32purgeDone int32purgeCtx  context.ContextstopPurge context.CancelFuncticktockDone int32ticktockCtx  context.ContextstopTicktock context.CancelFuncnow atomic.Valueoptions *Options
}

创建一个线程池:

// NewPool instantiates a Pool with customized options.
func NewPool(size int, options ...Option) (*Pool, error) {if size <= 0 {size = -1}opts := loadOptions(options...)if !opts.DisablePurge {if expiry := opts.ExpiryDuration; expiry < 0 {return nil, ErrInvalidPoolExpiry} else if expiry == 0 {opts.ExpiryDuration = DefaultCleanIntervalTime}}if opts.Logger == nil {opts.Logger = defaultLogger}p := &Pool{poolCommon: poolCommon{capacity: int32(size),allDone:  make(chan struct{}),lock:     syncx.NewSpinLock(),once:     &sync.Once{},options:  opts,}}p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}if p.options.PreAlloc {if size == -1 {return nil, ErrInvalidPreAllocSize}p.workers = newWorkerQueue(queueTypeLoopQueue, size)} else {p.workers = newWorkerQueue(queueTypeStack, 0)}p.cond = sync.NewCond(p.lock)p.goPurge()p.goTicktock()return p, nil
}

看如下几行代码:

	p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}

workerCache为sync.Pool类型,sync.Pool是Go语言标准库中提供的一个对象池化的工具,旨在通过复用对象来减少内存分配的频率并降低垃圾回收的开销,从而提高程序的性能。其内部维护了一组可复用的对象。当你需要一个对象时,可以尝试从sync.Pool中获取。如果sync.Pool中有可用的对象,它将返回一个;否则,它会调用你提供的构造函数来创建一个新对象,sync.PoolNew字段是一个可选的函数,用于在池中无可用对象时创建新的对象。

这里这样写即为:当无可用的worker时,则通过New函数创建一个新的worker。

创建workder列表,内部其实就是创建了了一个切片,类型为workerStack,用于管理所有的worker。

p.workers = newWorkerQueue(queueTypeStack, 0)

NewPool函数执行完成后,一个协程池就创建完成了。

协程池创建完成后,需要用来处理任务,如何将任务函数传递到worker去执行呢?看如下函数:

// Submit submits a task to this pool.
//
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
// but what calls for special attention is that you will get blocked with the last
// Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error {if p.IsClosed() {return ErrPoolClosed}w, err := p.retrieveWorker()if w != nil {w.inputFunc(task)}return err
}

函数的入参为一个无返回值、无入参的函数,因此所有需要worker执行的函数都是func()类型,w, err := p.retrieveWorker(),取出一个空闲worker,取出成功后,将任务传递到worker内部:w.inputFunc(task),注意,当线程池中所有worker都忙碌时,inputFunc函数阻塞,一直到有worker空闲。

其他主要的函数,从池中获取worker的函数:

func (p *Pool) retrieveWorker() (w worker, err error) {p.lock.Lock()retry:// First try to fetch the worker from the queue.if w = p.workers.detach(); w != nil {p.lock.Unlock()return}// If the worker queue is empty, and we don't run out of the pool capacity,// then just spawn a new worker goroutine.if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {p.lock.Unlock()w = p.workerCache.Get().(*goWorker)w.run()return}// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {p.lock.Unlock()return nil, ErrPoolOverload}// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.p.addWaiting(1)p.cond.Wait() // block and wait for an available workerp.addWaiting(-1)if p.IsClosed() {p.lock.Unlock()return nil, ErrPoolClosed}goto retry
}

这个函数,获取worker有三个逻辑:

  • 当池中有空闲worker,直接获取。
  • 当池中没有空闲worker,从缓存workerCache中取出过期的worker使用,复用资源,降低开销。
  • 等待有worker执行完任务释放。(阻塞情况)

revertWorker,将worker放回池中,以执行下次的任务。 

func (p *Pool) revertWorker(worker *goWorker) bool {if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {p.cond.Broadcast()return false}worker.lastUsed = p.nowTime()p.lock.Lock()// To avoid memory leaks, add a double check in the lock scope.// Issue: https://github.com/panjf2000/ants/issues/113if p.IsClosed() {p.lock.Unlock()return false}if err := p.workers.insert(worker); err != nil {p.lock.Unlock()return false}// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.p.cond.Signal()p.lock.Unlock()return true
}

以上就为ants线程池实现的主要技术细节,希望对各位热爱技术的同学们提供一些些帮助。

3、总结

ants协程池是一个高性能、易用的Go语言协程池库,它通过复用goroutines、自动调度任务、定期清理过期goroutines等方式,帮助开发者更加高效地管理并发任务。无论是处理网络请求、数据处理还是其他需要高并发性能的场景,ants协程池都是一个值得推荐的选择。

http://www.lryc.cn/news/393346.html

相关文章:

  • Mysql面试合集
  • Android Gradle 开发与应用 (五): 构建变体与自定义任务
  • Django学习第六天
  • docker部署mycat,连接上面一篇的一主二从mysql
  • VUE2拖拽组件:vue-draggable-resizable-gorkys
  • 容器:stack
  • 跨平台Ribbon UI组件QtitanRibbon全新发布v6.7.0——支持Qt 6.6.3
  • (6) 深入探索Python-Pandas库的核心数据结构:DataFrame全面解析
  • 在 Azure 云中开始使用适用于 Ubuntu 的 Grafana
  • 1.Python学习笔记
  • 中英双语介绍百老汇著名歌剧:《猫》(Cats)和《剧院魅影》(The Phantom of the Opera)
  • RpcChannel的调用过程
  • 东芝TB6560AHQ/AFG步进电机驱动IC:解锁卓越的电机控制性能
  • 免杀笔记 ----> DLL注入
  • 奇迹MU 骷髅战士在哪
  • leetcode力扣_贪心思想
  • Vue中Class数据绑定
  • Python数据分析案例49——基于机器学习的垃圾邮件分类系统构建(朴素贝叶斯,支持向量机)
  • 贪心算法-以学籍管理系统为例
  • PyCharm 安装
  • C++:对象指针访问成员函数
  • Linux 防火墙配置指南:firewalld 端口管理应用案例(二十个实列)
  • 推荐Bulk Image Downloader插件下载网页中图片链接很好用
  • 详解前缀码与前缀编码
  • 数据库管理工具 -- Navicat Premium v17.0.8 特别版
  • 【Linux】进程创建和终止 | slab分配器
  • 计算机网络--网络层
  • 【CSS】如何实现分栏布局
  • 2025湖北武汉智慧教育装备信息化展/智慧校园展/湖北高博会
  • Android Studio Run窗口中文乱码解决办法