当前位置: 首页 > news >正文

golang worker channel 模式

  • 大概流程就是job -> JobQueue
  • 调度器循环获取JobQueue ,获取到的job ,再去异步获取等待可用的 worker,取出 chan Job,将job 写入改worker的 chan Job
  • worker 处理任务,先处理 case job := <-w.JobChannel: 处理完成后再将 chan Job 写入到worker 里面,等待调度去取调用
package mainimport ("log""os""strconv""sync""time"
)var (MaxWorker intMaxQueue  intJobQueue  chan Job
)func init() {var err errorMaxWorker, err = strconv.Atoi(os.Getenv("MAX_WORKERS"))if err != nil {MaxWorker = 5 // 默认值}MaxQueue, err = strconv.Atoi(os.Getenv("MAX_QUEUE"))if err != nil {MaxQueue = 10 // 默认值}JobQueue = make(chan Job, MaxQueue)
}type Payload struct {// Payload的属性
}func (p *Payload) UploadToS3() error {// 模拟上传操作log.Println("Uploading to S3")return nil
}type Job struct {Payload Payload
}type Worker struct {WorkerPool chan chan JobJobChannel chan Jobquit       chan bool
}func NewWorker(workerPool chan chan Job) Worker {return Worker{WorkerPool: workerPool,JobChannel: make(chan Job),quit:       make(chan bool)}
}func (w Worker) Start() {go func() {for {w.WorkerPool <- w.JobChannelselect {case job := <-w.JobChannel:if err := job.Payload.UploadToS3(); err != nil {log.Printf("Error uploading to S3: %s", err)}case <-w.quit:return}}}()
}func (w *Worker) Stop() {go func() {w.quit <- true // 通知工作线程停止}()
}type Dispatcher struct {WorkerPool chan chan JobmaxWorkers intworkers    []Worker  // 新增:用于跟踪所有工作线程quit       chan bool // 用于停止dispatch循环
}func NewDispatcher(maxWorkers int) *Dispatcher {return &Dispatcher{WorkerPool: make(chan chan Job, maxWorkers),maxWorkers: maxWorkers,workers:    make([]Worker, 0, maxWorkers),}
}func (d *Dispatcher) Runs() {for i := 0; i < d.maxWorkers; i++ {worker := NewWorker(d.WorkerPool)d.workers = append(d.workers, worker) // 跟踪新创建的工作线程worker.Start()}go d.dispatch()
}func (d *Dispatcher) dispatch() {for {select {// 从JobQueue中获取一个jobcase job := <-JobQueue:go func(job Job) {// 尝试获取一个可用的worker job channel,阻塞直到有可用的workerjobChannel := <-d.WorkerPool// 分发job到worker job channel中jobChannel <- job}(job)case <-d.quit:// 退出return}}
}func (d *Dispatcher) StopAllWorkers() {var wg sync.WaitGroupfor _, worker := range d.workers {wg.Add(1)go func(w Worker) {w.Stop() // 停止工作线程wg.Done()}(worker)}wg.Wait() // 等待所有工作线程安全退出
}func (d *Dispatcher) Stop() {d.quit <- trued.StopAllWorkers()
}func main() {dispatcher := NewDispatcher(MaxWorker)dispatcher.Runs()// 模拟作业提交for i := 0; i < 20; i++ {payload := Payload{ /* ... */ }job := Job{Payload: payload}JobQueue <- job}// 等待一段时间,以便可以看到工作的完成time.Sleep(10 * time.Second)
}
http://www.lryc.cn/news/214287.html

相关文章:

  • 舔狗日记之好一条舔狗
  • 【地理位置识别】IP归属地应用的特点
  • 华为实验基础(2):路由器基础
  • 婚姻管理系统-使用bbst数据结构
  • 软件架构的概念
  • kubernetes存储-secrets
  • Springboot使用EasyExcel导入导出Excel文件
  • Pytorch L1,L2正则化
  • 【Elasticsearch 未授权访问漏洞复现】
  • pytorch笔记:PackedSequence对象送入RNN
  • C#WPF工具提示(ToolTip)实例
  • 智慧矿山系统中的猴车安全监测与识别
  • 网络协议--TCP连接的建立与终止
  • react条件渲染
  • Docker中Failed to initialize NVML: Unknown Error
  • 学习笔记|单样本秩和检验|假设检验摘要|Wilcoxon符号检验|规范表达|《小白爱上SPSS》课程:SPSS第十一讲 | 单样本秩和检验如何做?很轻松!
  • ttkefu在线客服在客户联络领域的价值
  • 创新方案|2023如何用5种新形式重塑疫后实体门店体验
  • Aqua Data Studio 2023.1
  • 【C++智能指针】
  • gcc/g++使用格式+各种选项,预处理/编译(分析树,编译优化,生成目标代码)/汇编/链接过程(函数库,动态链接)
  • OSPF复习(2)
  • FPGA时序分析与约束(9)——主时钟约束
  • sqlite3 关系型数据库语言 SQL 语言
  • spring boot中的多环境配置
  • python3 阿里云api进行巡检发送邮件
  • 【Linux】安装使用Nginx负载均衡,并且部署前端项目
  • k8s中 pod 或节点的资源利用率监控
  • 订水商城实战教程07-搜索
  • stm32内 misc stm32f10x_hd stm32f10x_it stm32f10x_conf关系