当前位置: 首页 > news >正文

go并发之美·多个channel合并/多个数据流合并

多个数据流(来自于不同channel)合并为一个流。

一般用于多个相同性质来源的数据进行合并为一处进行统一处理。

 

目录

背景

实现·赖着不走

变个花样:学成出师


 

背景

最近在重温武侠剧,无意间想到了一些情形然后手痒,想顺手来模拟一下,接着有了此文。

平时可能经常碰到这样的场景,如一个业务逻辑可能会统筹多个/多份不同来源的同类型数据,然后该业务逻辑进行统筹汇总并统一处理汇总后的结果。

以武侠剧为例,杨过受到了来自多位大师的指点学到了多方武功,然后自己自行吸收处理,对于杨过来说,可以学一段停下来自行吸收作罢;也可持续学习持续吸收(变化无穷,乐趣无穷)。

很有意思,一起来体验下。

实现·赖着不走

上代码

// 入口(可能就是拜师流程)
func main() {c1()
}

持续教授武功&持续学习武功

func c1() {process := make(chan struct{})ch1, ch2, ch3 := make(chan string), make(chan string), make(chan string)go func() { // 黄药师的指点for i := 1; ; i++ {data := fmt.Sprintf("a%s", strconv.Itoa(i))fmt.Println("黄药师 教学: ", data)ch1 <- datatime.Sleep(time.Second * 2) // sleep纯属为观察,可不加}}()go func() { // 洪七公的教学for i := 1; ; i++ {data := fmt.Sprintf("b%s", strconv.Itoa(i))fmt.Println("洪七公 教学: ", data)ch2 <- datatime.Sleep(time.Second * 2)}}()go func() { // 欧阳锋的教学for i := 1; ; i++ {data := fmt.Sprintf("c%s", strconv.Itoa(i))fmt.Println("欧阳锋 教学: ", data)ch3 <- datatime.Sleep(time.Second * 2)}}()result := foo1(process, ch1, ch2, ch3)// 杨过学到了来自三位大师的武功(持续学习)for {select {case data := <-result:teacher := "欧阳锋"if string(data[0]) == "a" {teacher = "黄药师"} else if string(data[0]) == "b" {teacher = "洪七公"}fmt.Printf("\t 杨过学到了来自 %s 的武功, 武功是 %s\n", teacher, data)}}
}

杨过具体咋吸收的三位老师的武功?

// chs:多个待合并的channel
func foo1(done chan struct{}, chs ...<-chan string) (_ <-chan string) {result := make(chan string)var wg sync.WaitGroupwriting := func(ch <-chan string) {defer wg.Done()for data := range ch {select {case <-done: // 关闭该ch对应的goroutinereturncase result <- data: // 传递数据到result}}}wg.Add(len(chs))for _, c := range chs {go writing(c) // 每个channel各自操作}go func() {wg.Wait() // 等所有chs被耗尽,本次合并完成,相当于所有数据均已复用到了resultfmt.Println("all ch has used")close(result)}()fmt.Println("returning result...")return result
}

speed running:

returning result...
黄药师 教学:  a1
洪七公 教学:  b1
欧阳锋 教学:  c1杨过学到了来自 黄药师 的武功, 武功是 a1杨过学到了来自 洪七公 的武功, 武功是 b1杨过学到了来自 欧阳锋 的武功, 武功是 c1
黄药师 教学:  a2杨过学到了来自 黄药师 的武功, 武功是 a2
欧阳锋 教学:  c2杨过学到了来自 欧阳锋 的武功, 武功是 c2
洪七公 教学:  b2杨过学到了来自 洪七公 的武功, 武功是 b2
欧阳锋 教学:  c3杨过学到了来自 欧阳锋 的武功, 武功是 c3
黄药师 教学:  a3杨过学到了来自 黄药师 的武功, 武功是 a3
洪七公 教学:  b3杨过学到了来自 洪七公 的武功, 武功是 b3
洪七公 教学:  b4杨过学到了来自 洪七公 的武功, 武功是 b4
欧阳锋 教学:  c4杨过学到了来自 欧阳锋 的武功, 武功是 c4
黄药师 教学:  a4杨过学到了来自 黄药师 的武功, 武功是 a4
洪七公 教学:  b5杨过学到了来自 洪七公 的武功, 武功是 b5
黄药师 教学:  a5杨过学到了来自 黄药师 的武功, 武功是 a5
欧阳锋 教学:  c5杨过学到了来自 欧阳锋 的武功, 武功是 c5
洪七公 教学:  b6杨过学到了来自 洪七公 的武功, 武功是 b6
黄药师 教学:  a6杨过学到了来自 黄药师 的武功, 武功是 a6
欧阳锋 教学:  c6杨过学到了来自 欧阳锋 的武功, 武功是 c6
洪七公 教学:  b7杨过学到了来自 洪七公 的武功, 武功是 b7
黄药师 教学:  a7杨过学到了来自 黄药师 的武功, 武功是 a7
欧阳锋 教学:  c7杨过学到了来自 欧阳锋 的武功, 武功是 c7
洪七公 教学:  b8杨过学到了来自 洪七公 的武功, 武功是 b8
黄药师 教学:  a8杨过学到了来自 黄药师 的武功, 武功是 a8
欧阳锋 教学:  c8杨过学到了来自 欧阳锋 的武功, 武功是 c8

可以看到,三位大师不断教武功给杨过,杨过不断学习武功,教啥学啥,而且是持续学习不想下山赖着不走。

 

变个花样:学成出师

那怎样让杨过学几年后出师呢,学成总要出师嘛,这里模拟三位大师总共教完7套功夫后允许杨过学成出师。一起看看:

func main() {//c1()c2()
}var mu sync.Mutex
var learnedNum int // 已教了的功夫数func isTeachOver() bool {res := falsemu.Lock()if learnedNum >= 7 {res = true}mu.Unlock()return res
}

核心逻辑


// 学成出师, 模拟三位大师总共教完7套功夫后允许杨过学成出师
func c2() {process := make(chan struct{})ch1, ch2, ch3 := make(chan string), make(chan string), make(chan string)once := sync.Once{}teachOverFunc := func(teacher string) bool {res := isTeachOver()if res {if teacher == "黄药师" {fmt.Println("!!!黄药师已停止教学")close(ch1) // 黄药师停止教学} else if teacher == "洪七公" {fmt.Println("!!!洪七公已停止教学")close(ch2)} else {fmt.Println("!!!欧阳锋已停止教学")close(ch3)}once.Do(func() {  // 手段很多,任你随意close(process)fmt.Println("process closed")})}return res}go func() { // 黄药师的指点for i := 1; ; i++ {// 教学之前先看看教完了没有over := teachOverFunc("黄药师")if over {return}select {case <-process:  // 三位老师都会看看是不是有人已经交完了最后一套功夫,教完了就不用再教了fmt.Println("黄药师done")returndefault:// 还没教完,继续教mu.Lock()learnedNum++fmt.Println("x1=", learnedNum)mu.Unlock()data := fmt.Sprintf("a%s", strconv.Itoa(i))fmt.Println("黄药师 教学: ", data)ch1 <- datatime.Sleep(time.Second * 1)}}}()go func() { // 洪七公的教学for i := 1; ; i++ {over := teachOverFunc("洪七公")if over {return}select {case <-process:fmt.Println("洪七公done")returndefault:mu.Lock()learnedNum++fmt.Println("x2=", learnedNum)mu.Unlock()data := fmt.Sprintf("b%s", strconv.Itoa(i))fmt.Println("洪七公 教学: ", data)ch2 <- datatime.Sleep(time.Second * 1)}}}()go func() { // 欧阳锋的教学for i := 1; ; i++ {over := teachOverFunc("欧阳锋")if over {return}select {case <-process:fmt.Println("欧阳锋done")returndefault:mu.Lock()learnedNum++fmt.Println("x3=", learnedNum)mu.Unlock()data := fmt.Sprintf("c%s", strconv.Itoa(i))fmt.Println("欧阳锋 教学: ", data)ch3 <- datatime.Sleep(time.Second * 1)}}}()result := foo2(process, ch1, ch2, ch3)// 杨过学到了来自三位大师的武功(学成出师)yangguoLearned := 0 // 杨过学完可以自己核对一下,自己学到的和老师教的功夫数对不对的上
learning:for {select {case data := <-result:if len(data) > 0 {  teacher := "欧阳锋"if string(data[0]) == "a" {teacher = "黄药师"} else if string(data[0]) == "b" {teacher = "洪七公"}fmt.Printf("\t 杨过学到了来自 %s 的武功, 武功是 %s\n", teacher, data)yangguoLearned++  // 真实学到了功夫才+1,否则直接判断}if yangguoLearned == 7 {if isTeachOver() {fmt.Println("三位老师已教完了所有7套功夫 ", yangguoLearned)}fmt.Println("杨过确认学完了所有7套功夫, 山桃熟了7次,秋叶已红透, 可以下山了")break learning}}}fmt.Println("教学与学习流程结束.")time.Sleep(time.Second * 20) // 杨过干点其他啥(仅为了便于观察程序运行控制而模拟继续运行,真正程序业务运行中不会直接停止主线程)}

杨过具体咋吸收的三位老师的武功?

func foo2(done chan struct{}, chs ...<-chan string) (_ <-chan string) {result := make(chan string)var wg sync.WaitGroupwriting := func(ch <-chan string) {defer wg.Done()for data := range ch {select {case <-done:returncase result <- data:}}}wg.Add(len(chs))for _, c := range chs {go writing(c)}go func() {wg.Wait() // 等待秋叶红透,本次教学完成fmt.Println("all ch has used")close(result)}()fmt.Println("returning result...")return result
}

来看看教学与学习结果,是否能成功出师:

returning result...
x3= 1
欧阳锋 教学:  c1杨过学到了来自 欧阳锋 的武功, 武功是 c1
x1= 2
黄药师 教学:  a1杨过学到了来自 黄药师 的武功, 武功是 a1
x2= 3
洪七公 教学:  b1杨过学到了来自 洪七公 的武功, 武功是 b1
x3= 4
欧阳锋 教学:  c2杨过学到了来自 欧阳锋 的武功, 武功是 c2
x1= 5
黄药师 教学:  a2
x2= 6
洪七公 教学:  b2杨过学到了来自 黄药师 的武功, 武功是 a2杨过学到了来自 洪七公 的武功, 武功是 b2
x3= 7
欧阳锋 教学:  c3杨过学到了来自 欧阳锋 的武功, 武功是 c3
三位老师已教完了所有7套功夫  7
杨过确认学完了所有7套功夫, 山桃熟了7次,秋叶已红透, 可以下山了
教学与学习流程结束.
!!!洪七公已停止教学
process closed
!!!黄药师已停止教学
!!!欧阳锋已停止教学
all ch has used

是不是很有意思呢?

 

哈哈,这次先到这。再会!


 

 

 

http://www.lryc.cn/news/33852.html

相关文章:

  • 数据库多租户实现三种方式
  • 单协议 2.4GHz CC2651R31T0RGZR/CC2651R31T0RKPR无线MCU 802.15.4,蓝牙5.2
  • 【项目精选】基于struts+hibernate的采购管理系统
  • 在找docker命令和部署?看这一篇文章就够了。
  • NTLM协议原理分析
  • SOC计算方法:电流积分+开路电压
  • linux mysql启动报错处理方案
  • Qt配置VS的编译环境(以MSVC2015 64bit为例)
  • iOS 9.3.5越狱环境安装配置
  • mac电脑解决Error: command failed: npm install --loglevel error --legacy-peer-deps
  • Java中对象的finalization机制
  • proteus光敏电阻电路的arduino仿真
  • MySql面试精选—慢查询如何优化
  • 一款OutLook信息收集工具
  • java多线程(二一)并发协作生产者消费者设计模式
  • Win YAPI + Jenkins 实现接口自动化测试
  • 【计算机视觉 自然语言处理】什么是多模态?
  • 2023百度面试真题
  • MAC(m1)-VMWare Fushion安装Windows11
  • HTML与CSS简介
  • 基于Java开发幼儿园管理系统项目教程(附源码)
  • 第一次运行vue遇到的问题
  • Clickhouse数据去重
  • 精讲typescript从入门到入土
  • typora-beta-0.11.18版本又提示过期的解决方案
  • WebUI自动化测试框架搭建(二十)-优化:测试对象无法连接或出现异常时,请更新本文作为测试对象
  • 【FATE联邦学习】standalone版Fateboard修改配置
  • 分享一个应急响应web日志:access.log文件分析小工具
  • windows注册服务非常实用
  • 蓝桥dfs专题