当前位置: 首页 > news >正文

40分钟学 Go 语言高并发:Select多路复用

Select多路复用

学习目标

知识点掌握程度应用场景
select实现原理深入理解底层机制channel通信和多路选择
超时处理掌握超时控制方法避免阻塞和资源浪费
优先级控制理解优先级实现处理多个channel的顺序
性能考虑了解性能优化点高并发场景优化

1. Select实现原理

让我们通过一个完整的例子来理解select的工作原理:

package mainimport ("fmt""math/rand""sync""time"
)// 数据生产者
type Producer struct {dataChan chan intdone     chan struct{}
}// 创建新的生产者
func NewProducer() *Producer {return &Producer{dataChan: make(chan int, 100),done:     make(chan struct{}),}
}// 启动生产
func (p *Producer) Start() {go func() {defer close(p.dataChan)for {select {case <-p.done:fmt.Println("Producer: received stop signal")returndefault:// 生成随机数据data := rand.Intn(100)select {case p.dataChan <- data:fmt.Printf("Producer: sent data %d\n", data)time.Sleep(time.Millisecond * 100)case <-p.done:fmt.Println("Producer: received stop signal while sending")return}}}}()
}// 停止生产
func (p *Producer) Stop() {close(p.done)
}// 获取数据通道
func (p *Producer) DataChan() <-chan int {return p.dataChan
}// 数据处理器
type Processor struct {producers []*Producerresults   chan intdone      chan struct{}
}// 创建新的处理器
func NewProcessor(producerCount int) *Processor {producers := make([]*Producer, producerCount)for i := 0; i < producerCount; i++ {producers[i] = NewProducer()}return &Processor{producers: producers,results:   make(chan int, producerCount*100),done:      make(chan struct{}),}
}// 启动处理
func (p *Processor) Start() {// 启动所有生产者for i, producer := range p.producers {producer.Start()// 为每个生产者启动一个处理goroutinego func(id int, prod *Producer) {for {select {case data, ok := <-prod.DataChan():if !ok {fmt.Printf("Processor %d: producer channel closed\n", id)return}// 处理数据result := data * 2select {case p.results <- result:fmt.Printf("Processor %d: processed data %d -> %d\n", id, data, result)case <-p.done:return}case <-p.done:fmt.Printf("Processor %d: received stop signal\n", id)return}}}(i, producer)}
}// 停止处理
func (p *Processor) Stop() {close(p.done)for _, producer := range p.producers {producer.Stop()}
}// 获取结果通道
func (p *Processor) Results() <-chan int {return p.results
}func main() {// 创建有3个生产者的处理器processor := NewProcessor(3)// 启动处理器processor.Start()// 创建结果收集器var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()count := 0for result := range processor.Results() {fmt.Printf("Collector: received result %d\n", result)count++if count >= 20 { // 收集20个结果后停止processor.Stop()break}}}()// 等待处理完成wg.Wait()fmt.Println("Main: processing completed")
}

1.1 Select执行流程图

在这里插入图片描述

2. 超时处理

让我们实现一个带有超时控制的服务请求处理系统:

package mainimport ("context""fmt""math/rand""sync""time"
)// 请求处理器
type RequestHandler struct {requests  chan Requestresponses chan Responsedone      chan struct{}wg        sync.WaitGroup
}// 请求结构
type Request struct {ID      intTimeout time.DurationData    string
}// 响应结构
type Response struct {RequestID intResult    stringError     error
}// 创建新的请求处理器
func NewRequestHandler() *RequestHandler {return &RequestHandler{requests:  make(chan Request, 100),responses: make(chan Response, 100),done:      make(chan struct{}),}
}// 启动处理器
func (h *RequestHandler) Start(workers int) {for i := 0; i < workers; i++ {h.wg.Add(1)go h.worker(i)}
}// 工作协程
func (h *RequestHandler) worker(id int) {defer h.wg.Done()for {select {case req, ok := <-h.requests:if !ok {fmt.Printf("Worker %d: request channel closed\n", id)return}// 创建context用于超时控制ctx, cancel := context.WithTimeout(context.Background(), req.Timeout)// 处理请求response := h.processRequest(ctx, req)// 发送响应select {case h.responses <- response:fmt.Printf("Worker %d: sent response for request %d\n", id, req.ID)case <-h.done:cancel()return}cancel() // 清理contextcase <-h.done:fmt.Printf("Worker %d: received stop signal\n", id)return}}
}// 处理单个请求
func (h *RequestHandler) processRequest(ctx context.Context, req Request) Response {// 模拟处理时间processTime := time.Duration(rand.Intn(int(req.Timeout))) + req.Timeout/2select {case <-time.After(processTime):return Response{RequestID: req.ID,Result:   fmt.Sprintf("Processed: %s", req.Data),}case <-ctx.Done():return Response{RequestID: req.ID,Error:    ctx.Err(),}}
}// 提交请求
func (h *RequestHandler) SubmitRequest(req Request) error {select {case h.requests <- req:return nilcase <-h.done:return fmt.Errorf("handler is stopped")}
}// 获取响应
func (h *RequestHandler) GetResponse() (Response, error) {select {case resp := <-h.responses:return resp, nilcase <-h.done:return Response{}, fmt.Errorf("handler is stopped")}
}// 停止处理器
func (h *RequestHandler) Stop() {close(h.done)h.wg.Wait()close(h.requests)close(h.responses)
}func main() {// 创建请求处理器handler := NewRequestHandler()handler.Start(3)// 发送一些测试请求requests := []Request{{ID: 1, Timeout: time.Second, Data: "Fast request"},{ID: 2, Timeout: time.Second * 2, Data: "Normal request"},{ID: 3, Timeout: time.Millisecond * 500, Data: "Quick request"},{ID: 4, Timeout: time.Second * 3, Data: "Slow request"},}// 提交请求for _, req := range requests {if err := handler.SubmitRequest(req); err != nil {fmt.Printf("Failed to submit request %d: %v\n", req.ID, err)continue}fmt.Printf("Submitted request %d\n", req.ID)}// 收集响应var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()for i := 0; i < len(requests); i++ {resp, err := handler.GetResponse()if err != nil {fmt.Printf("Failed to get response: %v\n", err)continue}if resp.Error != nil {fmt.Printf("Request %d failed: %v\n", resp.RequestID, resp.Error)} else {fmt.Printf("Request %d succeeded: %s\n", resp.RequestID, resp.Result)}}}()// 等待所有响应处理完成wg.Wait()// 停止处理器handler.Stop()fmt.Println("Main: processing completed")
}

3. 优先级控制

让我们实现一个带有优先级控制的任务调度系统:

package mainimport ("fmt""math/rand""sort""sync""time"
)// 优先级级别
const (PriorityHigh = iotaPriorityMediumPriorityLow
)// 任务结构
type Task struct {ID       intPriority intAction   func() error
}// 优先级调度器
type PriorityScheduler struct {highPriority   chan TaskmediumPriority chan TasklowPriority    chan Taskresults        chan errordone           chan struct{}wg             sync.WaitGroup
}// 创建新的调度器
func NewPriorityScheduler() *PriorityScheduler {return &PriorityScheduler{highPriority:   make(chan Task, 100),mediumPriority: make(chan Task, 100),lowPriority:    make(chan Task, 100),results:        make(chan error, 100),done:          make(chan struct{}),}
}// 启动调度器
func (s *PriorityScheduler) Start(workers int) {for i := 0; i < workers; i++ {s.wg.Add(1)go s.worker(i)}
}// 工作协程
func (s *PriorityScheduler) worker(id int) {defer s.wg.Done()for {// 使用优先级顺序处理任务select {case <-s.done:return// 高优先级任务case task := <-s.highPriority:fmt.Printf("Worker %d: processing high priority task %d\n", id, task.ID)s.results <- task.Action()// 如果没有高优先级任务,检查中优先级default:select {case <-s.done:returncase task := <-s.highPriority:fmt.Printf("Worker %d: processing high priority task %d\n", id, task.ID)s.results <- task.Action()case task := <-s.mediumPriority:fmt.Printf("Worker %d: processing medium priority task %d\n", id, task.ID)s.results <- task.Action()// 如果没有中优先级任务,检查低优先级default:select {case <-s.done:returncase task := <-s.highPriority:fmt.Printf("Worker %d: processing high priority task %d\n", id, task.ID)s.results <- task.Action()case task := <-s.mediumPriority:fmt.Printf("Worker %d: processing medium priority task %d\n", id, task.ID)s.results <- task.Action()case task := <-s.lowPriority:fmt.Printf("Worker %d: processing low priority task %d\n", id, task.ID)s.results <- task.Action()}}}}
}// 提交任务
func (s *PriorityScheduler) SubmitTask(task Task) error {var targetChan chan Taskswitch task.Priority {case PriorityHigh:targetChan = s.highPrioritycase PriorityMedium:targetChan = s.mediumPrioritycase PriorityLow:targetChan = s.lowPrioritydefault:return fmt.Errorf("invalid priority level: %d", task.Priority)}select {case targetChan <- task:return nilcase <-s.done:return fmt.Errorf("scheduler is stopped")}
}// 获取结果
func (s *PriorityScheduler) Results() <-chan error {return s.results
}// 停止调度器
func (s *PriorityScheduler) Stop() {close(s.done)s.wg.Wait()close(s.highPriority)close(s.mediumPriority)close(s.lowPriority)close(s.results)
}// 创建模拟任务
func createTask(id int, priority int, duration time.Duration) Task {return Task{ID:       id,Priority: priority,Action: func() error {time.Sleep(duration)if rand.Float32() < 0.1 { // 10%的失败率return fmt.Errorf("task %d failed", id)}return nil},}
}func main() {// 创建调度器scheduler := NewPriorityScheduler()scheduler.Start(3)// 创建一些测试任务var tasks []Taskfor i := 0; i < 15; i++ {priority := i % 3 // 在三个优先级之间循环duration := time.Millisecond * time.Duration(rand.Intn(500)+100)tasks = append(tasks, createTask(i, priority, duration))}// 随机打乱任务顺序rand.Shuffle(len(tasks), func(i, j int) {tasks[i], tasks[j] = tasks[j], tasks[i]})// 提交任务for _, task := range tasks {if err := scheduler.SubmitTask(task); err != nil {fmt.Printf("Failed to submit task %d: %v\n", task.ID, err)continue}fmt.Printf("Submitted task %d with priority %d\n", task.ID, task.Priority)}// 收集结果var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()completed := 0failures := 0for err := range scheduler.Results() {if err != nil {failures++fmt.Printf("Task failed: %v\n", err)}completed++if completed >= len(tasks) {break}}fmt.Printf("\nProcessing completed: %d tasks total, %d failures\n",completed, failures)}()// 等待所有任务完成wg.Wait()// 停止调度器scheduler.Stop()fmt.Println("Main: scheduler stopped")
}

让我们继续完成优先级控制的示例代码:

3.1 优先级控制流程图

在这里插入图片描述

4. 性能考虑

4.1 Select性能优化建议

  1. case数量控制
  • select中的case数量会影响性能
  • 建议控制在合理范围内(通常不超过5-10个)
  1. channel缓冲区
  • 适当使用带缓冲的channel可以提高性能
  • 避免频繁的阻塞和唤醒
  1. default分支使用
  • 合理使用default避免无谓的阻塞
  • 考虑轮询间隔,避免CPU空转

让我们实现一个性能优化的示例:

package mainimport ("fmt""runtime""sync""sync/atomic""time"
)// 性能统计
type Stats struct {processed uint64dropped   uint64blocked   uint64
}// 批处理器
type BatchProcessor struct {input     chan interface{}output    chan []interface{}done      chan struct{}stats     *StatsbatchSize intmaxWait   time.Duration
}// 创建新的批处理器
func NewBatchProcessor(batchSize int, maxWait time.Duration) *BatchProcessor {return &BatchProcessor{input:     make(chan interface{}, batchSize*2),output:    make(chan []interface{}, batchSize),done:      make(chan struct{}),stats:     &Stats{},batchSize: batchSize,maxWait:   maxWait,}
}// 启动处理
func (p *BatchProcessor) Start(workers int) {for i := 0; i < workers; i++ {go p.worker(i)}// 启动统计打印go p.printStats()
}// 工作协程
func (p *BatchProcessor) worker(id int) {batch := make([]interface{}, 0, p.batchSize)timer := time.NewTimer(p.maxWait)defer timer.Stop()for {// 重置计时器if !timer.Stop() {select {case <-timer.C:default:}}timer.Reset(p.maxWait)// 优化的批处理逻辑for len(batch) < p.batchSize {select {case <-p.done:returncase item := <-p.input:batch = append(batch, item)atomic.AddUint64(&p.stats.processed, 1)case <-timer.C:// 达到最大等待时间,处理当前批次if len(batch) > 0 {p.processBatch(batch)batch = batch[:0]}atomic.AddUint64(&p.stats.blocked, 1)continuedefault:// 如果输入队列为空且已有数据,立即处理if len(batch) > 0 {p.processBatch(batch)batch = batch[:0]}// 短暂休眠避免CPU空转runtime.Gosched()continue}// 批次满了就处理if len(batch) >= p.batchSize {p.processBatch(batch)batch = batch[:0]}}}
}// 处理批次数据
func (p *BatchProcessor) processBatch(batch []interface{}) {// 创建副本避免数据竞争output := make([]interface{}, len(batch))copy(output, batch)// 尝试发送处理结果select {case p.output <- output:// 成功发送default:// 输出channel满了,增加丢弃计数atomic.AddUint64(&p.stats.dropped, uint64(len(batch)))}
}// 提交数据
func (p *BatchProcessor) Submit(item interface{}) error {select {case p.input <- item:return nilcase <-p.done:return fmt.Errorf("processor is stopped")default:atomic.AddUint64(&p.stats.dropped, 1)return fmt.Errorf("input channel full")}
}// 获取输出通道
func (p *BatchProcessor) Output() <-chan []interface{} {return p.output
}// 定期打印统计信息
func (p *BatchProcessor) printStats() {ticker := time.NewTicker(time.Second)defer ticker.Stop()var lastProcessed, lastDropped, lastBlocked uint64for {select {case <-p.done:returncase <-ticker.C:processed := atomic.LoadUint64(&p.stats.processed)dropped := atomic.LoadUint64(&p.stats.dropped)blocked := atomic.LoadUint64(&p.stats.blocked)fmt.Printf("Stats - Processed: %d/s, Dropped: %d/s, Blocked: %d/s\n",processed-lastProcessed,dropped-lastDropped,blocked-lastBlocked)lastProcessed = processedlastDropped = droppedlastBlocked = blocked}}
}// 停止处理器
func (p *BatchProcessor) Stop() {close(p.done)
}func main() {// 创建批处理器processor := NewBatchProcessor(100, time.Millisecond*50)processor.Start(3)// 模拟高速数据提交var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(id int) {defer wg.Done()for j := 0; j < 10000; j++ {data := fmt.Sprintf("Data-%d-%d", id, j)processor.Submit(data)time.Sleep(time.Microsecond * time.Duration(50+id*10))}}(i)}// 处理输出go func() {for batch := range processor.Output() {// 这里可以进行批量处理,比如写入数据库fmt.Printf("Received batch of size %d\n", len(batch))}}()// 等待提交完成wg.Wait()time.Sleep(time.Second) // 等待最后的处理完成// 停止处理器processor.Stop()fmt.Println("Main: processing completed")
}

4.2 性能优化要点

  1. 避免过度使用select
  • 只在必要的地方使用select
  • 考虑其他并发控制方式
  1. channel设计优化
  • 合理设置缓冲区大小
  • 避免频繁的channel创建和关闭
  1. goroutine管理
  • 控制goroutine数量
  • 实现优雅的退出机制
  1. 内存优化
  • 重用切片和对象
  • 避免不必要的内存分配

总结

核心要点

  1. Select实现原理
  • 随机选择机制
  • 阻塞和非阻塞模式
  • 多路复用特性
  1. 超时处理
  • 超时控制方法
  • 资源释放保证
  • 错误处理机制
  1. 优先级控制
  • 优先级实现方式
  • 任务调度策略
  • 公平性保证
  1. 性能优化
  • select使用建议
  • channel优化
  • 资源管理

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!

http://www.lryc.cn/news/490235.html

相关文章:

  • candence: 如何快速设置SUBCLASS 的颜色
  • FinalShell进行前端项目部署及nginx配置
  • 神经网络(系统性学习一):入门篇——简介、发展历程、应用领域、基本概念、超参数调优、网络类型分类
  • 用nextjs开发时遇到的问题
  • 微前端基础知识入门篇(二)
  • 自然语言处理:第六十五章 MinerU 开源PDF文档解析方案
  • Arcpy 多线程批量重采样脚本
  • python 画图例子
  • Win11 22H2/23H2系统11月可选更新KB5046732发布!
  • 【STM32】MPU6050初始化常用寄存器说明及示例代码
  • 深度学习中的mAP
  • Redis设计与实现 学习笔记 第二十章 Lua脚本
  • 大模型(LLMs)推理篇
  • Leetcode 412. Fizz Buzz
  • 双因子认证:统一运维平台安全管理策略
  • CMake笔记:install(TARGETS target,...)无法安装的Debug/lib下
  • 使用ENSP实现NAT
  • 漫步北京小程序构建智慧出行,打造旅游新业态模式
  • 对齐输出
  • Wekan看板安装部署与使用介绍
  • VisionPro 机器视觉案例 之 黑色齿轮
  • 学习python的第十三天之数据类型——函数传参中的传值和传址问题
  • Windows11深度学习环境配置
  • 电销老是被标记,该如何解决!!!
  • MyBatis入门——基本的增删改查
  • 学习Gentoo系统中二进制软件包和源代码包的概念
  • 麦肯锡报告 | 未来的经济引擎:解读下一代竞争领域
  • 连接mysql并读取指定表单数据到DataFrame
  • 从入门到精通数据结构----四大排序(上)
  • 【bug】使用transformers训练二分类任务时,训练损失异常大