当前位置: 首页 > news >正文

golang实现生产者消费者模式

在Go语言中,生产者消费者模式可以通过使用Goroutines和Channels来实现。Goroutines允许并发执行,而Channels则用于在生产者和消费者之间安全地传递数据。
生产者消费者模式的基本思路
生产者:负责生成数据并将其放入一个共享的缓冲区(Channel)。
消费者:从共享的缓冲区中取出数据并进行处理。
同步:使用Channel来同步生产者和消费者之间的操作,确保数据的安全传递。

建立一个channel

package outimport "fmt"type Out struct {data chan interface{}
}var out *Outfunc NewOut() *Out {if out == nil {out = &Out{data: make(chan interface{}, 65535),}}return out
}
func Println(i interface{}) {out.data <- i
}
func (o *Out) OutPut() {//for i := range o.data {//	fmt.Println(i)//	fmt.Println("out put")//}//fmt.Println("结束")for {select {case i := <-o.data:fmt.Println(i)}}
}

一对一

package one_oneimport ("producer-consumer/out""sync"
)type Task struct {ID int64
}func (t *Task) run() {out.Println(t.ID)
}var taskCh = make(chan Task, 10)const taskNum int64 = 10000func producer(wo chan<- Task) {var i int64for i = 1; i <= taskNum; i++ {t := Task{ID: i,}wo <- t}close(wo)
}
func consumer(ro <-chan Task) {for t := range ro {if t.ID != 0 {t.run()}}
}
func Exec() {wg := &sync.WaitGroup{}wg.Add(2)go func(wg *sync.WaitGroup) {defer wg.Done()producer(taskCh)}(wg)go func(wg *sync.WaitGroup) {defer wg.Done()consumer(taskCh)}(wg)wg.Wait()out.Println("执行成功")
}

一对多

package one_manyimport ("producer-consumer/out""sync"
)type Task struct {ID int64
}func (t *Task) run() {out.Println(t.ID)
}var taskCh = make(chan Task, 10)const taskNum int64 = 10000func producer(wo chan<- Task) {var i int64for i = 1; i <= taskNum; i++ {t := Task{ID: i,}wo <- t}close(wo)
}
func consumer(ro <-chan Task) {for t := range ro {if t.ID != 0 {t.run()}}
}
func Exec() {wg := &sync.WaitGroup{}wg.Add(1)go func(wg *sync.WaitGroup) {defer wg.Done()producer(taskCh)}(wg)var i int64for i = 0; i < taskNum; i++ {if i%100 == 0 {wg.Add(1)go func(wg *sync.WaitGroup) {defer wg.Done()consumer(taskCh)}(wg)}}wg.Wait()out.Println("执行成功")
}

多对一

package many_oneimport ("producer-consumer/out""sync"
)type Task struct {ID int64
}func (t *Task) run() {out.Println(t.ID)
}var taskCh = make(chan Task, 10)const taskNum int64 = 10000
const nums int64 = 100func producer(wo chan<- Task, startNum int64, nums int64) {var i int64for i = startNum; i < taskNum+nums; i++ {t := Task{ID: i,}wo <- t}
}
func consumer(ro <-chan Task) {for t := range ro {if t.ID != 0 {t.run()}}
}
func Exec() {wg := &sync.WaitGroup{}pwg := &sync.WaitGroup{}var i int64for i = 0; i < taskNum; i += nums {if i >= taskNum {break}wg.Add(1)pwg.Add(1)go func(i int64) {defer wg.Done()defer pwg.Done()producer(taskCh, i, nums)}(i)}wg.Add(1)go func() {defer wg.Done()consumer(taskCh)}()pwg.Wait()go close(taskCh)wg.Wait()out.Println("执行成功")
}

多对多

package many_manyimport ("fmt""producer-consumer/out""time"
)type Task struct {ID int64
}func (t *Task) run() {out.Println(t.ID)
}var taskCh = make(chan Task, 10)
var done = make(chan struct{})const taskNum int64 = 10000func producer(wo chan<- Task, done chan struct{}) {var i int64for {if i >= taskNum {i = 0}i++t := Task{ID: i,}select {case wo <- t:case <-done:out.Println("生产者退出")return}}
}
func consumer(ro <-chan Task, done chan struct{}) {for {select {case t := <-ro:if t.ID != 0 {t.run()}case <-done:for t := range ro {if t.ID != 0 {t.run()}}out.Println("消费者退出")return}}
}
func Exec() {go producer(taskCh, done)go producer(taskCh, done)go producer(taskCh, done)go producer(taskCh, done)go producer(taskCh, done)go producer(taskCh, done)go consumer(taskCh, done)go consumer(taskCh, done)time.Sleep(time.Second * 5)close(done)close(taskCh)time.Sleep(time.Second * 5)fmt.Println(len(taskCh))
}

主程序入口

package mainfunc main() {//o := out.NewOut()//go o.OutPut()//one_one.Exec()//one_many.Exec()//many_one.Exec()//many_many.Exec()//sig := make(chan os.Signal)//signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)//<-sig
}
http://www.lryc.cn/news/510960.html

相关文章:

  • 自动化测试-Pytest测试
  • Ingress-Nginx Annotations 指南:配置要点全方面解读(下)
  • 【QED】等式构造
  • Kafka数据迁移全解析:同集群和跨集群
  • Debian安装配置RocketMQ
  • vue之axios基本使用
  • 三只脚的电感是什么东西?
  • 【数据库学习笔记】SQL触发器(例题+代码)
  • Unittest02|TestSuite、TestRunner、HTMLTestRunner、处理excel表数据、邮件接收测试结果
  • BAPI_BATCH_CHANGE在更新后不自动更新批次特征
  • 顶会评测集解读-AlignBench: 大语言模型中文对齐基准
  • MySQL外键类型与应用场景总结:优缺点一目了然
  • 【含开题报告+文档+PPT+源码】基于SpringBoot+Vue的网上书店管理系统的设计与实现
  • 力扣面试题 - 40 迷路的机器人 C语言解法
  • ElementPlus 自定义封装 el-date-picker 的快捷功能
  • 二百八十二、ClickHouse——删除Linux中的ClickHouse
  • c++ 命名空间使用规则
  • 从 ELK Stack 到简单 — Elastic Cloud Serverless 上的 Elastic 可观察性
  • Pandas系列|第二期:Pandas中的数据结构
  • Hadoop中MapReduce过程中Shuffle过程实现自定义排序
  • 数位dp-acwing
  • 智慧园区小程序开发制作功能介绍
  • STM32高级 物联网之Wi-Fi通讯
  • LLM预训练recipe — 摘要版
  • 波动理论、传输线和S参数网络
  • nginx-1.23.2版本RPM包发布
  • 如何用WPS AI提高工作效率
  • LabVIEW应用在工业车间
  • Elasticsearch:normalizer
  • 动态规划子序列问题系列一>等差序列划分II