当前位置: 首页 > news >正文

Go中同/异步与锁的应用~~sync包

Go中锁的实现~~sync包

go中sync包中提供了互斥锁;

在前面Go中channel文章中我们使用了time.Sleep()函数使得main函数的Goroutine阻塞至所有协程Goroutine结束,但这并不是一个很好的办法,因为我们实际应用中并不能准确知道协程什么时候结束(这里面要考虑服务器的性能,网络波动以及io等一系列因素;

sysn包中提供了WaitGroup来实现协程之间的协调;

同步等待组

同步的sync与异步的sync;

在go中提供了同步等待组WaitGroup
来看源码:

//等待一组Goroutine完成; 阻塞的直至所有的goroutine完成;
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
//
// In the terminology of the Go memory model, a call to Done
// “synchronizes before” the return of any Wait call that it unblocks.
type WaitGroup struct {noCopy noCopystate atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.sema  uint32
}

WaitGroup 实现了一些函数:

Add() 方法来设置应等待Goroutine的数量;

在结构体WaitGroup 中有这么一个属性:
state atomic.Uint64 //高32用于计数,低32为用于统计等待的数量

//如果
func (wg *WaitGroup) Add(delta int) {if race.Enabled {if delta < 0 { //判断增加的值// Synchronize decrements with Wait.race.ReleaseMerge(unsafe.Pointer(wg))}race.Disable()defer race.Enable()}state := wg.state.Add(uint64(delta) << 32)v := int32(state >> 32)w := uint32(state)if race.Enabled && delta > 0 && v == int32(delta) {// The first increment must be synchronized with Wait.// Need to model this as a read, because there can be// several concurrent wg.counter transitions from 0.race.Read(unsafe.Pointer(&wg.sema))}if v < 0 {panic("sync: negative WaitGroup counter")}if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}if v > 0 || w == 0 {return}// This goroutine has set counter to 0 when waiters > 0.  //当等待的协程大于0,设置计数器>0// Now there can't be concurrent mutations of state:// - Adds must not happen concurrently with Wait,// - Wait does not increment waiters if it sees counter == 0.// Still do a cheap sanity check to detect WaitGroup misuse.if wg.state.Load() != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// Reset waiters count to 0.wg.state.Store(0)for ; w != 0; w-- {runtime_Semrelease(&wg.sema, false, 0)}
}

Add()方法中在结构以WaitGroup{}内部计数器上加上delta,delta可以是负数;如果计数器变为0,那么等待的所有groutine都会被释放;
如果计数器小于0,则会出发panic;

注意: Add()方法参数为正数时的调用应该在Wait()之前,否则如果等待的所有groutine都会被释放(或者没有被全部释放),那么可能只会等待很少的goroutine完成;

通常我们应该在创建新的Goroutine或者其他应该等待的事件之前调用;

结束时应该调用**Done()**方法

Done()用于减少WaitGroup计数器的值,应该在Goroutine的最后执行;

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {wg.Add(-1)
}

Wait()方法阻塞Goroutine 直到WaitGroup计数减为0

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {if race.Enabled {race.Disable()}for {state := wg.state.Load()v := int32(state >> 32) //低32位用于统计等待数w := uint32(state)if v == 0 { //如果等待数位0,那么释放所有的Goroutine// Counter is 0, no need to wait.if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}// Increment waiters count.if wg.state.CompareAndSwap(state, state+1) {  //CAS操作if race.Enabled && w == 0 {// Wait must be synchronized with the first Add.// Need to model this is as a write to race with the read in Add.// As a consequence, can do the write only for the first waiter,// otherwise concurrent Waits will race with each other.race.Write(unsafe.Pointer(&wg.sema))}runtime_Semacquire(&wg.sema)if wg.state.Load() != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}}
}

一个代码Demo


func main() {var sy sync.WaitGroupfmt.Printf("%T\n", sy)fmt.Println(sy)//增加10个sy.Add(5)rand.Seed(time.Now().UnixNano())go WaitGroupTest(1, &sy)go WaitGroupTest(2, &sy)go WaitGroupTest(3, &sy)go WaitGroupTest(4, &sy)go WaitGroupTest(5, &sy)sy.Wait()defer fmt.Println("main exit")
}
func WaitGroupTest(num int, sy *sync.WaitGroup) {for i := 0; i < 5; i++ {fmt.Printf("第%d号子goroutine,%d \n", num, i)time.Sleep(time.Second)}sy.Done()
}

当我们往WaitGroup中添加协程时要在定义协程之前
运行结果:
在这里插入图片描述

小结:
1,声明一个WaitGroup,
2,调用Add添加期望的计数
3,构建协程
4,等待所有协程运行完成后主协程才退出;

当我们注销掉sy.Done(),再次运行 会出现下面的结果–死锁了
在这里插入图片描述
当我们调整一下for循环中数量:
在这里插入图片描述
在这里插入图片描述

//只读 <-chan  只写的 chan <-func ChanWaitGroup(ch chan int, sy *sync.WaitGroup) {for i := 0; i < 10; i++ {//通道放入数据ch <- i}defer close(ch) //要关闭通道defer sy.Done() //执行完毕要标记一下执行完毕计数-1
}func main() {var wt sync.WaitGroupvar ch chan intch = make(chan int)wt.Add(3)//这里可以放入一些需要执行的函数,这些函数之间有关联,需要都执行完毕后在执行别的代码go ChanWaitGroup(ch, &wt)for i2 := range ch {fmt.Println(i2)}time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) //纯粹是为了复习go WaitGroupTest(1, &wt) //go WaitGroupTest(2, &wt) //wt.Wait()fmt.Println("main exit")
}

所有子Goroutine运行结束以后主Goroutine才退出。

互斥锁

在Go中互斥锁也是一个结构体:

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
//
// In the terminology of the Go memory model,
// the n'th call to Unlock “synchronizes before” the m'th call to Lock
// for any n < m.
// A successful call to TryLock is equivalent to a call to Lock.
// A failed call to TryLock does not establish any “synchronizes before”
// relation at all.
type Mutex struct {state int32sema  uint32
}

Mutex是一个互斥锁,可以创建为其他结构体的字段;零值为解锁状态。Mutex类型的锁和Goroutine无关,可以由不同的Goroutine加锁和解锁。

再看一下 加锁与解锁的源码:

// Lock locks m.   锁住 m 
// If the lock is already in use, the calling goroutine blocks until the mutex is available.   //如果该对象被锁住了,那么就阻塞,直到m解锁
func (m *Mutex) Lock() {// Fast path: grab unlocked mutex.if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {if race.Enabled {race.Acquire(unsafe.Pointer(m))}return}// Slow path (outlined so that the fast path can be inlined)m.lockSlow()
}// Unlock unlocks m. //解锁m 
// It is a run-time error if m is not locked on entry to Unlock.   如果m没有被锁住就会报error
// A locked Mutex is not associated with a particular goroutine.  //锁与协程无关
// It is allowed for one goroutine to lock a Mutex and then  arrange for another goroutine to unlock it. //允许一个协程加锁,另一个协程解锁
func (m *Mutex) Unlock() {if race.Enabled {_ = m.staterace.Release(unsafe.Pointer(m))}// Fast path: drop lock bit.new := atomic.AddInt32(&m.state, -mutexLocked)if new != 0 {// Outlined slow path to allow inlining the fast path.// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.m.unlockSlow(new)}
}//尝试加锁
// TryLock tries to lock m and reports whether it succeeded. 
//
// Note that while correct uses of TryLock do exist, they are rare,
// and use of TryLock is often a sign of a deeper problem  in a particular use of mutexes. //很少用tryLock,而TryLock的使用通常表明在特定的互斥锁使用中存在更深层次的问题。
func (m *Mutex) TryLock() bool {old := m.stateif old&(mutexLocked|mutexStarving) != 0 {return false}// There may be a goroutine waiting for the mutex, but we are// running now and can try to grab the mutex before that// goroutine wakes up.if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {return false}if race.Enabled {race.Acquire(unsafe.Pointer(m))}return true
}

同时我们根据源码可以直到,如果要对一个对象的方法加锁/解锁,可以在结构体中声明一个匿名对象,比如下面这样:

type ATR struct {sync.Mutex
}func (a ATR) LockT() {}func main() {atr := ATR{}atr.Mutex.Lock()atr.LockT()
}

实际问题–售票,用go代码实现:

var tickets = 10
var wg sync.WaitGroup
var sm sync.Mutexfunc saleTickets(winname string, swg *sync.WaitGroup) {for {//上锁sm.Lock()if tickets > 0 { //如果有票tickets-- //卖票time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)} else {fmt.Println(winname, "窗口---票售完")sm.Unlock()break}sm.Unlock()}defer swg.Done()
}func main() {wg.Add(10)for i := 0; i < 10; i++ {go saleTickets("火车站", &wg)}wg.Wait()
}

实际就是加锁解锁方法的应用;

读写锁

在java中有关于读写锁的一些类和方法,在go中也有读写锁的一些API;

首先我们来看一下

未完待续

http://www.lryc.cn/news/91083.html

相关文章:

  • Flask知识点2
  • R语言生物群落(生态)数据统计分析与绘图(从数据整理到分析结果展示)
  • 代码随想录训练营Day58| 739. 每日温度 496.下一个更大元素 I
  • 设计模式-命令模式
  • 软考——下午题部分,例题一,二,三,六
  • 关于render: h => h(App)的解释
  • flask实现简易图书管理系统
  • 2021 年全国大学生物联网设计竞赛(华为杯)全国总决赛获奖名单
  • 操作系统复习2.3.5-管程
  • List Set Map Queue Deque 之间的区别是什么?
  • unity行为决策树实战详解
  • Spring学习记录
  • 模板方法-
  • [Kubernetes] - RabbitMQ学习
  • swagger页面 doc.html出不来,swagger-ui/index.html能出来
  • IEEE802.3和IEEE802.11的分类(仅为分类)
  • c# cad二次开发通过获取excel数据 在CAD绘图,将CAD属性导出到excel
  • LLM之高性能向量检索库
  • 实体类注解
  • 常见数据结构种类
  • linux高级---k8s中的五种控制器
  • 记一次udp服务性能优化经历
  • uniapp和VueI18n多语言H5项目语言国际化功能搭建流程
  • C# | 凸包算法之Jarvis,寻找一组点的边界/轮廓
  • SpringBoot接收请求参数的方式
  • MKS SERVO4257D 闭环步进电机_系列5 CAN指令说明
  • 安捷伦E4440A(Agilent) e4440a 3HZ-26.5G频谱分析仪
  • 华为OD机试真题 Java 实现【最长子字符串的长度】【2022Q4 100分】,附详细解题思路
  • 【iOS】--对象的底层结构
  • 高并发内存池设计_内存池