go并发之美·多个channel合并/多个数据流合并
多个数据流(来自于不同channel)合并为一个流。
一般用于多个相同性质来源的数据进行合并为一处进行统一处理。
目录
背景
实现·赖着不走
变个花样:学成出师
背景
最近在重温武侠剧,无意间想到了一些情形然后手痒,想顺手来模拟一下,接着有了此文。
平时可能经常碰到这样的场景,如一个业务逻辑可能会统筹多个/多份不同来源的同类型数据,然后该业务逻辑进行统筹汇总并统一处理汇总后的结果。
以武侠剧为例,杨过受到了来自多位大师的指点学到了多方武功,然后自己自行吸收处理,对于杨过来说,可以学一段停下来自行吸收作罢;也可持续学习持续吸收(变化无穷,乐趣无穷)。
很有意思,一起来体验下。
实现·赖着不走
上代码
// 入口(可能就是拜师流程)
func main() {c1()
}
持续教授武功&持续学习武功
func c1() {process := make(chan struct{})ch1, ch2, ch3 := make(chan string), make(chan string), make(chan string)go func() { // 黄药师的指点for i := 1; ; i++ {data := fmt.Sprintf("a%s", strconv.Itoa(i))fmt.Println("黄药师 教学: ", data)ch1 <- datatime.Sleep(time.Second * 2) // sleep纯属为观察,可不加}}()go func() { // 洪七公的教学for i := 1; ; i++ {data := fmt.Sprintf("b%s", strconv.Itoa(i))fmt.Println("洪七公 教学: ", data)ch2 <- datatime.Sleep(time.Second * 2)}}()go func() { // 欧阳锋的教学for i := 1; ; i++ {data := fmt.Sprintf("c%s", strconv.Itoa(i))fmt.Println("欧阳锋 教学: ", data)ch3 <- datatime.Sleep(time.Second * 2)}}()result := foo1(process, ch1, ch2, ch3)// 杨过学到了来自三位大师的武功(持续学习)for {select {case data := <-result:teacher := "欧阳锋"if string(data[0]) == "a" {teacher = "黄药师"} else if string(data[0]) == "b" {teacher = "洪七公"}fmt.Printf("\t 杨过学到了来自 %s 的武功, 武功是 %s\n", teacher, data)}}
}
杨过具体咋吸收的三位老师的武功?
// chs:多个待合并的channel
func foo1(done chan struct{}, chs ...<-chan string) (_ <-chan string) {result := make(chan string)var wg sync.WaitGroupwriting := func(ch <-chan string) {defer wg.Done()for data := range ch {select {case <-done: // 关闭该ch对应的goroutinereturncase result <- data: // 传递数据到result}}}wg.Add(len(chs))for _, c := range chs {go writing(c) // 每个channel各自操作}go func() {wg.Wait() // 等所有chs被耗尽,本次合并完成,相当于所有数据均已复用到了resultfmt.Println("all ch has used")close(result)}()fmt.Println("returning result...")return result
}
speed running:
returning result...
黄药师 教学: a1
洪七公 教学: b1
欧阳锋 教学: c1杨过学到了来自 黄药师 的武功, 武功是 a1杨过学到了来自 洪七公 的武功, 武功是 b1杨过学到了来自 欧阳锋 的武功, 武功是 c1
黄药师 教学: a2杨过学到了来自 黄药师 的武功, 武功是 a2
欧阳锋 教学: c2杨过学到了来自 欧阳锋 的武功, 武功是 c2
洪七公 教学: b2杨过学到了来自 洪七公 的武功, 武功是 b2
欧阳锋 教学: c3杨过学到了来自 欧阳锋 的武功, 武功是 c3
黄药师 教学: a3杨过学到了来自 黄药师 的武功, 武功是 a3
洪七公 教学: b3杨过学到了来自 洪七公 的武功, 武功是 b3
洪七公 教学: b4杨过学到了来自 洪七公 的武功, 武功是 b4
欧阳锋 教学: c4杨过学到了来自 欧阳锋 的武功, 武功是 c4
黄药师 教学: a4杨过学到了来自 黄药师 的武功, 武功是 a4
洪七公 教学: b5杨过学到了来自 洪七公 的武功, 武功是 b5
黄药师 教学: a5杨过学到了来自 黄药师 的武功, 武功是 a5
欧阳锋 教学: c5杨过学到了来自 欧阳锋 的武功, 武功是 c5
洪七公 教学: b6杨过学到了来自 洪七公 的武功, 武功是 b6
黄药师 教学: a6杨过学到了来自 黄药师 的武功, 武功是 a6
欧阳锋 教学: c6杨过学到了来自 欧阳锋 的武功, 武功是 c6
洪七公 教学: b7杨过学到了来自 洪七公 的武功, 武功是 b7
黄药师 教学: a7杨过学到了来自 黄药师 的武功, 武功是 a7
欧阳锋 教学: c7杨过学到了来自 欧阳锋 的武功, 武功是 c7
洪七公 教学: b8杨过学到了来自 洪七公 的武功, 武功是 b8
黄药师 教学: a8杨过学到了来自 黄药师 的武功, 武功是 a8
欧阳锋 教学: c8杨过学到了来自 欧阳锋 的武功, 武功是 c8
可以看到,三位大师不断教武功给杨过,杨过不断学习武功,教啥学啥,而且是持续学习不想下山赖着不走。
变个花样:学成出师
那怎样让杨过学几年后出师呢,学成总要出师嘛,这里模拟三位大师总共教完7套功夫后允许杨过学成出师。一起看看:
func main() {//c1()c2()
}var mu sync.Mutex
var learnedNum int // 已教了的功夫数func isTeachOver() bool {res := falsemu.Lock()if learnedNum >= 7 {res = true}mu.Unlock()return res
}
核心逻辑
// 学成出师, 模拟三位大师总共教完7套功夫后允许杨过学成出师
func c2() {process := make(chan struct{})ch1, ch2, ch3 := make(chan string), make(chan string), make(chan string)once := sync.Once{}teachOverFunc := func(teacher string) bool {res := isTeachOver()if res {if teacher == "黄药师" {fmt.Println("!!!黄药师已停止教学")close(ch1) // 黄药师停止教学} else if teacher == "洪七公" {fmt.Println("!!!洪七公已停止教学")close(ch2)} else {fmt.Println("!!!欧阳锋已停止教学")close(ch3)}once.Do(func() { // 手段很多,任你随意close(process)fmt.Println("process closed")})}return res}go func() { // 黄药师的指点for i := 1; ; i++ {// 教学之前先看看教完了没有over := teachOverFunc("黄药师")if over {return}select {case <-process: // 三位老师都会看看是不是有人已经交完了最后一套功夫,教完了就不用再教了fmt.Println("黄药师done")returndefault:// 还没教完,继续教mu.Lock()learnedNum++fmt.Println("x1=", learnedNum)mu.Unlock()data := fmt.Sprintf("a%s", strconv.Itoa(i))fmt.Println("黄药师 教学: ", data)ch1 <- datatime.Sleep(time.Second * 1)}}}()go func() { // 洪七公的教学for i := 1; ; i++ {over := teachOverFunc("洪七公")if over {return}select {case <-process:fmt.Println("洪七公done")returndefault:mu.Lock()learnedNum++fmt.Println("x2=", learnedNum)mu.Unlock()data := fmt.Sprintf("b%s", strconv.Itoa(i))fmt.Println("洪七公 教学: ", data)ch2 <- datatime.Sleep(time.Second * 1)}}}()go func() { // 欧阳锋的教学for i := 1; ; i++ {over := teachOverFunc("欧阳锋")if over {return}select {case <-process:fmt.Println("欧阳锋done")returndefault:mu.Lock()learnedNum++fmt.Println("x3=", learnedNum)mu.Unlock()data := fmt.Sprintf("c%s", strconv.Itoa(i))fmt.Println("欧阳锋 教学: ", data)ch3 <- datatime.Sleep(time.Second * 1)}}}()result := foo2(process, ch1, ch2, ch3)// 杨过学到了来自三位大师的武功(学成出师)yangguoLearned := 0 // 杨过学完可以自己核对一下,自己学到的和老师教的功夫数对不对的上
learning:for {select {case data := <-result:if len(data) > 0 { teacher := "欧阳锋"if string(data[0]) == "a" {teacher = "黄药师"} else if string(data[0]) == "b" {teacher = "洪七公"}fmt.Printf("\t 杨过学到了来自 %s 的武功, 武功是 %s\n", teacher, data)yangguoLearned++ // 真实学到了功夫才+1,否则直接判断}if yangguoLearned == 7 {if isTeachOver() {fmt.Println("三位老师已教完了所有7套功夫 ", yangguoLearned)}fmt.Println("杨过确认学完了所有7套功夫, 山桃熟了7次,秋叶已红透, 可以下山了")break learning}}}fmt.Println("教学与学习流程结束.")time.Sleep(time.Second * 20) // 杨过干点其他啥(仅为了便于观察程序运行控制而模拟继续运行,真正程序业务运行中不会直接停止主线程)}
杨过具体咋吸收的三位老师的武功?
func foo2(done chan struct{}, chs ...<-chan string) (_ <-chan string) {result := make(chan string)var wg sync.WaitGroupwriting := func(ch <-chan string) {defer wg.Done()for data := range ch {select {case <-done:returncase result <- data:}}}wg.Add(len(chs))for _, c := range chs {go writing(c)}go func() {wg.Wait() // 等待秋叶红透,本次教学完成fmt.Println("all ch has used")close(result)}()fmt.Println("returning result...")return result
}
来看看教学与学习结果,是否能成功出师:
returning result...
x3= 1
欧阳锋 教学: c1杨过学到了来自 欧阳锋 的武功, 武功是 c1
x1= 2
黄药师 教学: a1杨过学到了来自 黄药师 的武功, 武功是 a1
x2= 3
洪七公 教学: b1杨过学到了来自 洪七公 的武功, 武功是 b1
x3= 4
欧阳锋 教学: c2杨过学到了来自 欧阳锋 的武功, 武功是 c2
x1= 5
黄药师 教学: a2
x2= 6
洪七公 教学: b2杨过学到了来自 黄药师 的武功, 武功是 a2杨过学到了来自 洪七公 的武功, 武功是 b2
x3= 7
欧阳锋 教学: c3杨过学到了来自 欧阳锋 的武功, 武功是 c3
三位老师已教完了所有7套功夫 7
杨过确认学完了所有7套功夫, 山桃熟了7次,秋叶已红透, 可以下山了
教学与学习流程结束.
!!!洪七公已停止教学
process closed
!!!黄药师已停止教学
!!!欧阳锋已停止教学
all ch has used
是不是很有意思呢?
哈哈,这次先到这。再会!