当前位置: 首页 > news >正文

go sync包(五) WaitGroup

WaitGroup

sync.WaitGroup 可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:

requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))for _, request := range requests {go func(r *Request) {defer wg.Done()// res, err := service.call(r)}(request)
}
wg.Wait()
// In the terminology of the Go memory model, a call to Done
// “synchronizes before” the return of any Wait call that it unblocks.
type WaitGroup struct {noCopy noCopy// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.// 64-bit atomic operations require 64-bit alignment, but 32-bit// compilers only guarantee that 64-bit fields are 32-bit aligned.// For this reason on 32 bit architectures we need to check in state()// if state1 is aligned or not, and dynamically "swap" the field order if// needed.state1 uint64state2 uint32
}
  • noCopy:禁止拷贝。
  • state:state 是 WaitGroup 的状态数据字段,且是一个无符号64 bit的数据,内容包含了 counter , waiter 信息
    • counter 代表目前尚未完成的个数,WaitGroup.Add(n) 将会导致 counter += n, 而 WaitGroup.Done() 将导致 counter–。
    • waiter 代表目前已调用 WaitGroup.Wait 的 goroutine 的个数。

在这里插入图片描述

Add

// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {statep, semap := wg.state()if race.Enabled {if delta < 0 {// Synchronize decrements with Wait.race.ReleaseMerge(unsafe.Pointer(wg))}race.Disable()defer race.Enable()}// counter 增加数量state := wg.state.Add(uint64(delta) << 32)// 取出 counterv := int32(state >> 32)// 取出 waiterw := uint32(state)if race.Enabled && delta > 0 && v == int32(delta) {// The first increment must be synchronized with Wait.// Need to model this as a read, because there can be// several concurrent wg.counter transitions from 0.race.Read(unsafe.Pointer(&wg.sema))}// counter < 0,panicif v < 0 {panic("sync: negative WaitGroup counter")}// delta > 0 && v == int32(delta) : 表示从 0 开始添加计数值// w!=0 :表示已经有了等待者// 说明说明在调用了 Wait() 方法之后又想加入新的等待者,这种操作是不允许的if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}if v > 0 || w == 0 {return}// This goroutine has set counter to 0 when waiters > 0.// Now there can't be concurrent mutations of state:// - Adds must not happen concurrently with Wait,// - Wait does not increment waiters if it sees counter == 0.// Still do a cheap sanity check to detect WaitGroup misuse.// 避免并发调用 Add() 和 Wait()if *statep != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// Reset waiters count to 0.// 唤醒所有 waiter*statep = 0for ; w != 0; w-- {runtime_Semrelease(semap, false, 0)}
}

Wait

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {statep, semap := wg.state()if race.Enabled {_ = *statep // trigger nil deref early race.Disable()}for {// 读取state// 高32位 ==> counter// 低32位 ==> waiterstate := atomic.LoadUint64(statep)v := int32(state >> 32)w := uint32(state)// counter减到0了,returnif v == 0 {// Counter is 0, no need to wait.if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}// Increment waiters count.// counter 不为0,waiters++if wg.state.CompareAndSwap(state, state+1) {if race.Enabled && w == 0 {// Wait must be synchronized with the first Add.// Need to model this is as a write to race with the read in Add.// As a consequence, can do the write only for the first waiter,// otherwise concurrent Waits will race with each other.race.Write(unsafe.Pointer(semap))}// 阻塞runtime_Semacquire(semap)// 被唤醒,检查state是否等于0// 不为0,说明存在计数值未恢复为0就重用,panicif *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}}
}

Done

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {wg.Add(-1)
}

Done 只是调用了 Add() ,将groutine - 1。

小结

  • sync.WaitGroup 必须在 sync.WaitGroup.Wait 方法返回之后才能被重新使用。
  • sync.WaitGroup.Done 只是对 sync.WaitGroup.Add 方法的简单封装,我们可以向 sync.WaitGroup.Add 方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒等待的 Goroutine。
  • 可以同时有多个 Goroutine 等待当前 sync.WaitGroup 计数器的归零,这些 Goroutine 会被同时唤醒。
http://www.lryc.cn/news/392479.html

相关文章:

  • 基于深度学习的相机内参标定
  • 适合金融行业的国产传输软件应该是怎样的?
  • 昇思25天学习打卡营第9天|MindSpore使用静态图加速(基于context的开启方式)
  • class类和style内联样式的绑定
  • 3033.力扣每日一题7/5 Java
  • GPT-5:下一代AI如何彻底改变我们的未来
  • 重载一元运算符
  • 10元 DIY 一个柔性灯丝氛围灯
  • 表单自定义组件 - 可选择卡片SelectCard
  • Ubuntu / Debian安装FTP服务
  • 若依 Vue 前端分离 3.8.8 版中生成的前端代码中关于下拉框只有下拉箭头的问题
  • C++把一个类封装成动态链接库
  • 每天一个项目管理概念之项目章程
  • c++11新特性-4-返回类型后置
  • Linux-C语言实现一个进度条小项目
  • vue使用glide.js实现轮播图(可直接复制使用)
  • TK养号工具开发会用上的源代码科普!
  • 信创-办公软件应用工程师认证
  • 数组操作forEach和map
  • 流式处理应用场景与流式计算处理框架选择建议
  • 2024年软件测试岗必问的100+个面试题【含答案】
  • A4-C四驱高防轮式巡检机器人
  • Https网站如何申请免费的SSL证书及操作使用指南
  • 实现资产优化管理:智慧校园资产分类功能解析
  • 大厂开发必知必会:Devops、CI/CD、流水线和Paas的关系解析说明
  • Qt学习:Qt窗口组件以及窗口类型
  • 基于AGX ORIN与FPGA K7实现PCIE高速数据通信/Orin与FPGA高速数据传输/XDMA在linux系统使用教程
  • Vue3:全局播放背景音乐
  • 2024年07月03日 Redis部署方式和持久化
  • 成都仅需浏览器即可快速查看的数据采集监控平台!