当前位置: 首页 > news >正文

go中的并发

goruntine(协程)

每一个并发的执行单元叫做一个goruntine,要编写一个并发任务,可以在函数名前加go关键字,就能使这个函数以协程的方式运行,

如:go 函数名(函数参数)、

如果函数有返回值,返回值会被忽略。所以当你用go关键字后,主进程调用函数的时候函数的返回值就没有和主进程进行数据交换,而只能用channel

线程和协程的区别:

区别1:线程有固定的栈,基本都是2MB,都是固定分配的;这个栈用于保存局部变量,在函数切换时使用。对于协程来说,固定的栈可能会导致资源浪费,go采用了动态收缩扩张收缩的策略,初始化为2KB,最大可扩张到1GB

区别2:线程切换要陷入内核,进行上下文切换,而协程在用户态由协程调度器完成,不用陷入内核

demo:协程的基本使用

package mainimport ("fmt""time"
)func Task1() {for {fmt.Println(time.Now().Format("10:48:00"), "正在处理task1的任务")time.Sleep(time.Second * 3)}
}func Task2() {for {fmt.Println(time.Now().Format("10:48:00"), "正在处理task2的任务")time.Sleep(time.Second * 1)}}func main() {go Task1()go Task2()for {fmt.Println(time.Now().Format("10:48:00"), "正在处理主进程的任务")time.Sleep(time.Second * 2)}
}

demo2:

package mainimport ("fmt""time"
)/*
当一个程序启动时,其主函数即在一个单独的goroutine中运行,我们称之为main goroutine。下面这个程序运行后不会有任何输出,因为运行go task1(),程序立刻返回到main函数,main函数后没有任何代码逻辑,程序判断为执行完毕,终止所有协程。
如果要让task1函数执行,可以在main函数加上一些等待逻辑,确定子协程执行完毕后结束main函数。如sleep()
*/func task1() {for {fmt.Println(time.Now().Format("10:48:00"), "正在处理task1的任务")time.Sleep(time.Second * 3)}
}func main() {go task1()time.Sleep(time.Second * 100)
}

demo3: 使用匿名函数创建goruntine

package mainimport ("fmt""time"
)/*
使用匿名函数创建goruntine,格式如下:
go func(参数列表){函数体
}(调用参数列表)*/func main() {go func() {for {fmt.Println(time.Now().Format("11:17:00"), "正在处理task1的任务!")time.Sleep(time.Second * 3)}}()time.Sleep(time.Second * 100)
}

channel

go中推荐使用channel作为goruntine之间同步和通信的手段

常见写法:var channelName chan T

含义:用chan关键字声明了一个channel变量,并指定了传输的数据类型T

channel的发送和接收:channel作为一个队列,收发数据时按先入先出的原则,同一时刻内只能允许一个goruntine访问channel来发送和接受数据

发送:channel<-val

含义:表示将var发送到channel中,如果channel中数据被填满之后会阻塞当前goruntine

接受:val<-channel

含义:表示从channel中读取数据赋值给var上,如果channel中没有数据,会阻塞读取的goruntine直到有数据被放到channel中。

注:可以在读取channel时立刻返回,如 var,ok:=<-channel ,此时检查ok是否为true,用来判断是否读到了有效数据。

初始化:ch:=make(chan T,sizeofChan)

注:

  • 在创建channel时要指定channel传输的数据类型,长度是可选的。
  • 如果不指定数据类型,会导致向channel发送数据的goruntine被阻塞,直到数据被读取
  • 如果指定了长度,表示是有缓冲的channel,在缓冲区未满时发送给数据不会被阻塞。
  • 无论channel是否携带缓冲区,读取的goroutine都会被阻塞,直到channel中有数据可被读取。

demo:下面通过一个例子演示goruntine和channel配合,创建一个channel用于在两个goruntine中收发数据,其中一个goruntine从命令行读取输入发送到channel中,另一个goruntine循环的从channel中读取数据并输出

import ("bufio""fmt""os"
)//从channel中读数据进行打印
func printInput(ch chan string) {for val1 := range ch {//读取到结束符合if val1 == "EOF" {break}fmt.Printf("input is: %s\n", val1)}
}func main() {//创建一个无缓冲的channelch := make(chan string)go printInput(ch)//从命令行读取输入scanner := bufio.NewScanner(os.Stdin)for scanner.Scan() {var1 := scanner.Text()//将读到的命令行发送到channelch <- var1if var1 == "EOF" {fmt.Println("End the game")break}}//关闭channeldefer close(ch)
}

运行结果:

i am cc
input is: i am cc
eof
input is: eof
EOF
End the game

上述例子的代码中,我们通过for:range语法从channel中循环读取数据:当channel中没有数据时,printInput的goroutine将会被阻塞。

代码的最后,我们还通过defer close(ch)关闭了创建的channel,需要注意的是在channel关闭后不允许再往通道中放入数据,不然会抛出panic;

而再从关闭的channel读取数据或者之前从channel读取数据并被阻塞的goroutine将会接收到零值,直接返回。

demo2:带缓冲区的channel

package mainimport ("fmt""time"
)/*
带缓冲区的channel:创建channel时指定了channel的长度,那么channel将会拥有缓冲区,goroutine在缓冲区未满时往channel发送数据将不会被阻塞。*/func consume(ch chan int) {//线程休息30s再从channel中读数据time.Sleep(time.Second * 30)<-ch
}func main() {//创建一个长度为2的channelch := make(chan int, 2)go consume(ch)ch <- 0ch <- 1//发送数据不被阻塞fmt.Println("I am free!")ch <- 2fmt.Println("I can not go there within 100s!")time.Sleep(time.Second*2)
}

运行结果:30s之内只输出I am free! ,这段时间内是阻塞的,30s之后协程从channel读数据后,再往里面发送数据2是不阻塞的,所以输出了I can not go there within 30s!

I am free!
I can not go there within 30s!

除了声明双向channel,.我们还可以声明单向的channel,即只能从channel发送数据或者只能从channel中读取数据,代码如下所示:

ch := make(chan T)
// 声明只能发送的通道
var chInput chan <- int = ch// 声明只能读取的通道
var chOutput int <- chan = ch

demo3:select使用

运行结果:

get value 0 from ch1
get value 1 from ch1 
get value 10 from ch2
get value 11 from ch2
get value 12 from ch2
get value 2 from ch1 
get value 3 from ch1 
get value 13 from ch2
get value 4 from ch1 
get value 5 from ch1 
get value 6 from ch1
get value 14 from ch2
get value 15 from ch2
get value 16 from ch2
get value 7 from ch1
get value 8 from ch1
get value 9 from ch1
get value 17 from ch2
get value 18 from ch2
get value 19 from ch2
time out

当然每次运行结果都可能不一样,因为 goroutine 调度的不确定性。

上述代码中,我们通过select多路复用分别从chl和ch2中读取数据,如果多个case语句中的ch同时到达,那么select将会运行一个伪随机算法随机选择一个case。在最后的case中我们设定可接受的最大时长为2s,如果时间超过2s,将从select中退出。

由于channel的阻塞是无法被中断的,所以这是一种有效地从阻塞的channel中超时返回的小技巧。

sync包

go中除了使用channel进行goruntine之间的同步和通信操作外,还可以使用sync包提供的并发工具类主要有7种:

  • Mutex 互斥锁
  • RWMutex 读写锁
  • WaitGroup 并发等待组
  • Map 并发安全字典
  • Cond 同步等待条件
  • Once 只执行一次
  • Pool 临时对象池

这节讲解一下互斥锁的用法:sync.Mutex能够保证同一个时间段内只有一个goruntine持有锁,这就能保证在某一个时间段内有且仅有一个goruntine访问共享资源,其他想申请锁的goruntine将会被阻塞直到锁被释放,然后重新争抢锁的持有权

demo:使用Sync.Mutex控制多个goruntine串行执行

package mainimport ("fmt""sync""time"
)func main() {var lock sync.Mutexgo func() {//加锁lock.Lock()defer lock.Unlock()fmt.Println("func1 get lock at:" + time.Now().String())time.Sleep(time.Second)fmt.Println("func1 release lock at:" + time.Now().String())}()time.Sleep(time.Second / 10)go func() {lock.Lock()defer lock.Unlock()fmt.Println("func2 get lock at:" + time.Now().String())time.Sleep(time.Second)fmt.Println("func2 release lock at:" + time.Now().String())}()//等待所有goruntine执行完毕time.Sleep(time.Second * 4)
}

读写锁的用法:

  • 为了保证同一时间段有多个goruntine访问同一资源,要满足以下条件:
  • 同一时间段只能有一个goruntine获取到写锁
  • 同一时间段可以有任意多个goruntine获取到读锁
  • 同一时间段只能存在读锁或写锁(读锁和写锁互斥)

demo:sync.RWMutex 允许多读和单写的案例

package mainimport ("fmt""strconv""sync""time"
)var rwLock sync.RWMutexfunc main() {//获取读锁for i := 0; i < 5; i++ {go func(i int) {rwLock.RLock()defer rwLock.RUnlock()fmt.Println("read func" + strconv.Itoa(i) + " get rlock at:" + time.Now().String())time.Sleep(time.Second)}(i)}time.Sleep(time.Second / 10)//获取写锁for i := 0; i < 5; i++ {go func(i int) {rwLock.Lock()defer rwLock.Unlock()fmt.Println("write func" + strconv.Itoa(i) + " get wlock at" + time.Now().String())time.Sleep(time.Second)}(i)}//保证所有的goruntine执行结束time.Sleep(time.Second * 10)
}

运行结果:

read func4 get rlock at:2023-03-14 14:24:02.5203537 +0800 CST m=+0.003223301
read func1 get rlock at:2023-03-14 14:24:02.5203537 +0800 CST m=+0.003223301
read func2 get rlock at:2023-03-14 14:24:02.5203537 +0800 CST m=+0.003223301
read func3 get rlock at:2023-03-14 14:24:02.5203537 +0800 CST m=+0.003223301
read func0 get rlock at:2023-03-14 14:24:02.5203537 +0800 CST m=+0.003223301
write func0 get wlock at2023-03-14 14:24:03.5513528 +0800 CST m=+1.034222401
write func4 get wlock at2023-03-14 14:24:04.5535884 +0800 CST m=+2.036458001
write func2 get wlock at2023-03-14 14:24:05.5637904 +0800 CST m=+3.046660001
write func1 get wlock at2023-03-14 14:24:06.5747967 +0800 CST m=+4.057666301
write func3 get wlock at2023-03-14 14:24:07.5782438 +0800 CST m=+5.061113401

从输出的结果可以看出,在写锁没有被获取时,所有read goroutine可以同时申请到读锁,如read func0到read func4几乎在同一时间点获取到读锁;

而申请写锁的goroutine必须等到没有任何的读锁和其他写锁存在时才能申请成功,如write func0需要等到readfunc最后一个goroutine释放读锁才能申请到写锁,其他申请写锁的write func需要等待前一个write func释放写锁后才能重新争抢写锁,它们获取到写锁的时间大约相差1秒,这刚好是 每个write func 持有写锁的时间

demo:sync.WaitGroup使用

package mainimport ("fmt""strconv""sync""time"
)/*
sync.WaitGroup使用:
使用sync.WaitGroup的goruntine会等预设好数量的goruntine都提交执行结束后,才会继续往下执行代码*/func main() {var waitGroup sync.WaitGroup//设置等待goruntine数量为5waitGroup.Add(5)for i := 0; i < 5; i++ {go func(i int) {fmt.Println("work" + strconv.Itoa(i) + " is done at " + time.Now().String())time.Sleep(time.Second)waitGroup.Done()}(i)}waitGroup.Wait()fmt.Println("all works are done at:" + time.Now().String())
}

demo:sync.Map的使用

package mainimport ("fmt""strconv""sync"
)/*
sync.Map的使用:
Go语言中原生的Map并不是并发安全的,在多个goroutine同时往Map中添加数据时,可能会导致部分添加数据的丢失,为了避免这种情况,
在Go语言l.9版本之前,我们需要结合sync.RWMutex和Map实现并发安全的字典。
Go语言1.9之后提供了sync.Map,相对于原生的Map,它只提供以下接口:*/var syncMap sync.Map
var waitGroup sync.WaitGroupfunc main() {routineSize := 5//设置等待goruntine数量为5waitGroup.Add(routineSize)//并发添加数据for i := 0; i < routineSize; i++ {go addNumber(i * 10)}//开始等待waitGroup.Wait()var size int//统计数量syncMap.Range(func(key, value interface{}) bool {size++fmt.Println("key-value pair is ", key, value)return true})fmt.Println("syncMap current size is " + strconv.Itoa(size))//获取键为0的值value, ok := syncMap.Load(0)if ok {fmt.Println("key 0 has value ", value, " ")}
}func addNumber(begin int) {//往syncMap中放入数据for i := begin; i < begin+3; i++ {syncMap.Store(i, i)}//通知数据已添加完毕waitGroup.Done()
}

补充学习资料:关于golang的并发同步与安全 - 9ong (可以补充学习)

并发同步之sync.WaitGroup

golang经常会有多协程并行的场景,而我们又需要这些协程全部结束后,继续完成main协程的后续行为,这个时候,我们需要一个谁来统计所有协程的运行结果,并通知main协程,不能仅仅依靠timeout、sleep的方式硬编写main协程等待其他协程时间,无法完美预估其他协程最晚完成时间。

所以就有了sync.WaitGroup,sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。比如启动了5个并发任务时,就将计数器值增加5,每个任务完成时通过调用Done()方法将计数器减1,当计数器值为0时,表示所有并发任务已经完成,通过调用Wait()来等待并发任务执行完,达到main协程知道所有协程任务完成的效果。

package mainimport ("fmt""sync"
)var wg sync.WaitGroupfunc goodjob(num int) {defer wg.Done()fmt.Println(num, "good job.")
}
func main() {for i := 0; i < 5; i++ {wg.Add(1)go goodjob(i)}fmt.Println("main goroutine do something.")wg.Wait()fmt.Println("main goroutine end.")
}

使用sync.WaitGroup后的输出:

main goroutine do something.
1 good job.
0 good job.
2 good job.
3 good job.
4 good job.
main goroutine end.

如果不使用WaitGroup,将不会看到goodjob协程的输出打印,只有main协程的输出,因为main协程并不会去等待其他协程,而是先执行并退出main协程,一旦退出main协程,其他协程也会被销毁(在销毁前还来不及执行,因为需要一些时间给GMP调度)

并发加载之sync.once

可能不同协程会加载相同的静态资源,在高并发场景下,这些资源其实只需要加载一次就可以,比如配置、关闭一次通道、连接远端资源等,sync包提供了Once解决方案,一次只执行一次的解决方案。

package mainimport ("errors""fmt""sync"
)type subConfig struct {url  stringsrc  stringdesc string
}var configs map[string]subConfig
var loadOnce sync.Oncefunc loadConfigs() {configs = map[string]subConfig{"icons":   loadIcons(),"ads":     loadAds(),"banners": loadBanners(),}
}func getConfig(key string) (res subConfig, err error) {//这里如果我们判断nil的方式来规避并发的话,是不安全的// if configs == nil {//  loadConfigs()// }//所以我们换成了sync.Once的Do方法来控制并发安全及避免通过锁的机制来控制资源并发访问带来的性能问题loadOnce.Do(loadConfigs)res, ok := configs[key]if !ok {errors.New("key invalid.")}return
}func loadIcons() subConfig {//可能从文件、缓存、数据库等资源获取icons := subConfig{src:  "http://www.9ong.com/xxx.png",url:  "http://www.9ong.com/",desc: "9ong icon desc",}return icons
}func loadBanners() subConfig {//可能从文件、缓存、数据库等资源获取banners := subConfig{src:  "http://www.9ong.com/xxx.png",url:  "http://www.9ong.com/",desc: "9ong banner desc",}return banners
}func loadAds() subConfig {//可能从文件、缓存、数据库等资源获取ads := subConfig{src:  "http://www.9ong.com/xxx.png",url:  "http://www.9ong.com/",desc: "9ong ads desc",}return ads
}func main() {//并发调用getConfigads, err := getConfig("ads")if err != nil {fmt.Println(err)} else {fmt.Println(ads)}
}

在getConfig函数中,我们通过判断configs这个全局变量是否为nil,决定是否重新加载configs数据,在这种情况下就会出现即使判断了configs不是nil,也不意味着configs变量初始化完成了。考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化加载configs全局资源变量的时候不会被其他的协程读写,但是这样做又会引发性能问题。

sync.Once内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

所以getConfig函数改造后,通过loadOnce.Do(loadConfigs)来实现互斥加载。

解决了并发加载问题,还不会因为多次加载浪费时间和空间及引入锁等机制带来的性能问题。

并发安全与锁之sync.mutex

package mainimport ("fmt""sync"
)var love int32 = 10000 //公共资源love
var wg sync.WaitGroupfunc getLove() {defer wg.Done()for i := 0; i < 1000; i++ {love = love - 1}fmt.Println("love left:", love)
}
func main() {for children := 0; children < 10; children++ {wg.Add(1)go getLove()}wg.Wait()fmt.Println("love is:", love)
}

由于启动了10个协程,这些协程存在竞争love资源的可能(多试几次或把初始值和循环数调大点),可能会出现以下的输出,其实也就是电商里说的超卖问题,按照逻辑love最后应该为0,但实际情况是love还剩下3045:

I:\src\go\src\helo>go run test.go
love left: 7763
love left: 8763
love left: 9000
love left: 6763
love left: 6452
love left: 5452
love left: 5220
love left: 4220
love left: 4045
love left: 3045
love is: 3045

超卖的问题在于并发不安全,并发时对临界资源的读写不安全,我们需要考虑在读写临界资源时,将并发/并行转换成串行,通常采用加锁的机制。

互斥锁(完全互斥)是一种常用的控制共享资源访问的方法,它能够保证同时只有一个协程可以访问共享资源。golang中使用sync包的Mutex类型来实现互斥锁:

package mainimport ("fmt""sync"
)var love int32 = 10000 //公共资源love
var wg sync.WaitGroup
var mu sync.Mutexfunc getLove() {defer wg.Done()mu.Lock()for i := 0; i < 1000; i++ {love = love - 1}mu.Unlock()fmt.Println("love left:", love)
}
func main() {for children := 0; children < 10; children++ {wg.Add(1)go getLove()}wg.Wait()fmt.Println("love is:", love)
}

前面我们说互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当并发去读取一个资源不涉及资源修改的时候是没有必要加锁的,如果使用完全互斥锁的话,会导致读多的协程会堵塞等待锁的释放,很影响效率,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。

读写锁分为读锁和写锁,当一个协程获取读锁之后,其他的协程如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个协程获取写锁之后,其他的协程无论是获取读锁还是写锁都会等待。

也就是说,读写锁互斥,读读不会互斥,协程的读锁,不应其他协程的读锁获取,但影响写锁的获取;而写锁影响所有协程对该资源锁的获取。

编码如何处理呢?

我们只要对共享资源的读加读锁Rlock(),对共享资源的写加写锁lock()

var rwlock sync.RWMutexfunc read(){rwlock.RLock()//read love actionrwlock.RUnlock()
}func write(){rwlock.Lock()//writen love actioinrwlock.Unlock()
}

并发安全之sync.map

golang内置map不是并发安全的,很好理解,map只是一种资源的数据结构而已,所以作为共享资源,其本身当然也是并发不安全的,对于map中key、value的操作也需要做并发安全处理,比如加锁。

虽然加锁可以解决,但golang也提供了一套并发安全的map操作类:sync.Map,其内置了Store、Load、LoadOrStore、Delete、Range等操作方法,这些方法都是并发安全的,可以理解为原子操作。

package mainimport ("fmt""strconv""sync"
)var sm = sync.Map{}
var wg = sync.WaitGroup{}func setMap(i int) {key := strconv.Itoa(i)sm.Store(key, i)value, _ := sm.Load(key)fmt.Printf("k=:%v,v:=%v\n", key, value)wg.Done()}func main() {for i := 0; i < 20; i++ {wg.Add(1)go setMap(i)}wg.Wait()
}

并发安全之atomic

针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic提供。

package mainimport ("fmt""sync""sync/atomic"
)var love int32 = 10000 //公共资源love
var wg sync.WaitGroup
var mu sync.Mutexfunc getLove() {defer wg.Done()for i := 0; i < 1000; i++ {love = love - 1}fmt.Println("love left:", love)
}
func mutexGetLove() {defer wg.Done()mu.Lock()for i := 0; i < 1000; i++ {love = love - 1}mu.Unlock()fmt.Println("love left:", love)
}
func atomicGetLove() {defer wg.Done()for i := 0; i < 1000; i++ {atomic.AddInt32(&love, -1)}fmt.Println("love left:", love)
}
func main() {for children := 0; children < 10; children++ {wg.Add(1)// go getLove() //普通,并发不安全// go mutexGetLove() //互斥锁,并发安全,但性能开销大go atomicGetLove() //原子操作,并发安全,性能优于互斥锁机制}wg.Wait()fmt.Println("love is:", love)
}

atomic包提供了底层的原子级内存操作,虽然其对于同步算法的实现很有效果,但除了某些特殊的底层应用,我们建议使用channel或者sync.包实现同步会更好些。

http://www.lryc.cn/news/149200.html

相关文章:

  • 开启EMQX的SSL模式及SSL证书生成流程
  • 4 | Java Spark实现 WordCount
  • Redis7安装
  • Nginx vs Tomcat:一个高性能Web服务器和Java应用服务器的对决
  • 终端登录github两种方式
  • 【防火墙】防火墙NAT Server的配置
  • 《算法竞赛·快冲300题》每日一题:“简化农场”
  • 【二等奖方案】大规模金融图数据中异常风险行为模式挖掘赛题「冀科数字」解题思路
  • C# List与HashSet的contains()方法查询速度比较
  • 命令执行漏洞复现攻击:识别威胁并加强安全
  • Keepalived实现服务器的高可用性
  • Python程序化交易接口批量获取数据源码
  • 【强化学习】基本概念
  • 0001__安装electron失败 postinstall: `node install.js`
  • Linux测开常用命令总结
  • xml转化为txt数据的脚本,为yolo提供训练
  • 【H5页面嵌入到小程序或APP中实现手机号点击复制和拨号功能】
  • Kubernetes技术--k8s核心技术 configMap
  • Springboot动态修改日志级别
  • 新手将最简单的springboot部署上tomcat出现的意外问题
  • P1177 【模板】排序(Sort排序)
  • 软件测试(黑盒测试、白盒测试、灰盒测试)
  • 昨天面试的时候被提问到的问题集合。
  • 广电运营商三网融合监控运维方案
  • 数据库锁简析
  • 说说广播流与普通流
  • 内卷的本质和大数据在计量经济学领域的运用思考
  • 毕业设计-摄像头识别二维码
  • 封装动态表单组件
  • 提高Python并发性能 - asyncio/aiohttp介绍