当前位置: 首页 > news >正文

【Golang】Go语言编程思想(六):Channel,第六节,并发编程模式

并发模式

下例重新对 channel 的用法进行回顾:

package mainimport ("fmt""math/rand""time"
)func msgGen(name string) chan string {c := make(chan string)go func(name string) { // 在这个 goroutine 当中向外发送数据i := 0for {time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)c <- fmt.Sprintf("service %s: message %d", name, i) // 生成消息, 传递给 channeli++}}(name)return c
}func main() {m1 := msgGen("service 1") // msgGen() 是一个生成器, 它会生成消息m2 := msgGen("service 2") // m1 和 m2 是两个相互独立的产生消息的服务// 生成的消息是哪里来的呢? 答案就在 msgGen() 的 go func() 当中// 生成的消息怎么读呢? 使用 for 开启无限循环for {fmt.Println(<-m1) // 使用 <- 从 channel 接收消息fmt.Println(<-m2)}
}

在上述代码当中,我们定义了一个消息生成器 msgGen,它返回的是 chan string,即收发类行为 string 的 channel。

在 main 函数中,我们定义了两个消息接收器,分别是 m1 和 m2,可以将 m1 和 m2 看作是与 msgGen 这个服务进行交互的句柄(handle)。

运行上述程序,得到的输出是交替的,显然可以进一步使用 select 来进行改进。

实际上同时等待多个服务有两种方法,一种方法是新建一个专门接受各个服务器消息的 channel,再从这个 channel 读取数据:

func fanIn(c1, c2 chan string) chan string {c := make(chan string)// 开启两个 goroutinego func() {for { // 第一个 goroutine 将 c1 的数据送给 cc <- <-c1}}()go func() {for { // 第二个 goroutine 将 c2 的数据送给 cc <- <-c2}}()return c
}func main() {m1 := msgGen("service 1") // msgGen() 是一个生成器, 它会生成消息m2 := msgGen("service 2") // m1 和 m2 是两个相互独立的产生消息的服务m := fanIn(m1, m2)		  // 使用 fanIn 接受两个 chan 的数据for {fmt.Println(<-m)}
}

还可以使用 select 来实现上面的 fanIn,优点是不需要再为每一个 channel 新开一个 goroutine:

func fanInBySelect(c1, c2 chan string) chan string {c := make(chan string)go func() {for {select { // 使用 select 接受多个 channel 的数据case m := <-c1: // 如果 c1 有数据,c <- m // 将数据送给 c1case m := <-c2: // 如果 c2 有数据c <- m}}}()return c
}

可以对我们初始的 fanIn 进行修改,使它可以接受任意数量的 channel 并对 channel 当中的信息进行汇总:

func fanIn(chans ...chan string) chan string {// 由于 channel 也是一等公民, 因此可以使用上述 ... 的方式来输入 chansc := make(chan string)for _, ch := range chans {go func(in chan string) {for {c <- <-in}}(ch)	// 一定要显式地将 ch 作为参数输入到 goroutine 当中}return c
}func main() {m1 := msgGen("service 1") // msgGen() 是一个生成器, 它会生成消息m2 := msgGen("service 2") // m1 和 m2 是两个相互独立的产生消息的服务// 生成的消息是哪里来的呢? 答案就在 msgGen() 的 go func() 当中// 生成的消息怎么读呢? 使用 for 开启无限循环m3 := msgGen("service 3")m := fanIn(m1, m2, m3)for {fmt.Println(<-m)}
}

测试结果如下:
在这里插入图片描述

并发任务控制

基于 Golang 的并发编程,可以实现下述的几种并发控制:

  • 非阻塞等待;
  • 超时机制;
  • 任务中断/退出;
  • 优雅退出;

非阻塞等待

基于 select 的 default 实现非阻塞等待。编写一个名为 nonBlockingWait 的函数,输入是 chan string,输出是 string 和 bool。如果 chan 有输出值,使用 select 返回输出值和 true,否则使用 default 返回空串和 false:

func nonBlockingWait(c chan string) (string, bool) {// 实现非阻塞等待select {case m := <-c:return m, true // 等到了数据default: // 基于 default 实现非阻塞的等待return "", false // 只要没有等到, 就输出空串和 false}
}func main() {m1 := msgGen("service 1") // msgGen() 是一个生成器, 它会生成消息m2 := msgGen("service 2")for {fmt.Println(<-m1)if m, ok := nonBlockingWait(m2); ok {fmt.Println(m)} else {fmt.Println("no message from service 2")}}
}

超时机制

使用 time.Duration 和 time.After 来实现超时机制:

func timeoutWait(c chan string, timeout time.Duration) (string, bool) {select {case m := <-c: // 等到了 chan string 的数据return m, truecase <-time.After(timeout):return "", false}
}
func main() {m1 := msgGen("service1")for {if m, ok := timeoutWait(m1, 2*time.Second); ok {fmt.Println(m)} else {fmt.Println("timeout")}}
}

当超出规定的时间 2s 仍然没有数据发送到 m1,就会输出 timeout:
在这里插入图片描述

任务中断/退出

假定我们只希望 main 函数当中只有 5s 在接收信息:

func main() {done := make(chan struct{})			// 使用 done 通知 goroutine 主线程即将结束m1 := msgGen("service1", done)		// 将 done 传递给 channelfor i := 0; i < 5; i++ {if m, ok := timeoutWait(m1, time.Second); ok {fmt.Println(m)} else {fmt.Println("timeout")}}done <- struct{}{}time.Sleep(time.Second)
}func msgGen(name string, done chan struct{}) chan string {c := make(chan string)go func(name string) { // 在这个 goroutine 当中向外发送数据i := 0for {select {case <-time.After(time.Duration(rand.Intn(5000)) * time.Millisecond):c <- fmt.Sprintf("service %s: message %d", name, i)case <-done:	// 一旦接收到 done 的消息, 服务将结束fmt.Println("Done")return}i++}}(name)return c
}

优雅退出

将 done 变为双向的 channel 即可实现。当 goroutine 当中的 done 接收到停止信号时,开始优雅退出,退出完成后,done 再向外部发送信号告知退出完毕:

func msgGen(name string, done chan struct{}) chan string {c := make(chan string)go func(name string) { // 在这个 goroutine 当中向外发送数据i := 0for {select {case <-time.After(time.Duration(rand.Intn(5000)) * time.Millisecond):c <- fmt.Sprintf("service %s: message %d", name, i)case <-done:// 开始优雅退出fmt.Println("cleaning up")time.Sleep(2 * time.Second)fmt.Println("cleaning done")done <- struct{}{}			// 退出完毕, 发送信号通知外部return}i++}}(name)return c
}func main() {done := make(chan struct{})m1 := msgGen("service1", done)for i := 0; i < 5; i++ {if m, ok := timeoutWait(m1, time.Second); ok {fmt.Println(m)} else {fmt.Println("timeout")}}done <- struct{}{}<-done				// 接收到外部停止信号之后才继续向下进行
}
http://www.lryc.cn/news/502020.html

相关文章:

  • unity打包web,如何减小文件体积,特别是 Build.wasm.gz
  • go引入skywalking
  • 大华DSS数字监控系统 attachment_downloadAtt.action 任意文件下载漏洞复现
  • qt 封装 调用 dll
  • Python使用Selenium库获取 网页节点元素、名称、内容的方法
  • 系统安全——访问控制访问控制
  • SQL Server 数据库还原到某个时点(完整恢复模式)
  • 埃隆马斯克X-AI发布Grok-2大模型,快来体验~
  • Python工厂设计模式:简化对象创建
  • 【隐私计算篇】隐私集合求交(PSI)原理深入浅出
  • 工作中常用的8种设计模式
  • Qwen 论文阅读记录
  • 自动驾驶:百年演进
  • SSM 校园一卡通密钥管理系统 PF 于校园图书借阅管理的安全保障
  • 什么叫中间件服务器?
  • 【docker】12. Docker Volume(存储卷)
  • SpringBoot【八】mybatis-plus条件构造器使用手册!
  • OpenAI直播发布第4天:ChatGPT Canvas全面升级,免费开放!
  • 自学高考的挑战与应对:心理调适、学习方法改进与考试技巧提升
  • 2024年12月11日Github流行趋势
  • Next.js配置教程:构建自定义服务器
  • SpringCloud 题库
  • 基于Filebeat打造高效日志收集流水线
  • 《HTML 的变革之路:从过去到未来》
  • 快速了解 Aurora DSQL
  • 计算机视觉与医学的结合:推动医学领域研究的新机遇
  • Scala的隐式对象
  • PageHelper自定义Count查询及其优化
  • 【数据结构】哈夫曼树
  • springboot422甘肃旅游服务平台代码-(论文+源码)_kaic