当前位置: 首页 > news >正文

Go语言管道Channel通信教程

Go语言管道Channel通信教程

目录

  1. Channel基础概念
  2. Channel类型与创建
  3. Channel操作详解
  4. Select语句
  5. Channel通信模式
  6. 高级Channel技巧
  7. 实战案例

Channel基础概念

什么是Channel?

Channel是Go语言中用于goroutine之间通信的管道。它体现了Go的并发哲学:“不要通过共享内存来通信,而要通过通信来共享内存”

Channel的特性

  • 类型安全:每个channel只能传输特定类型的数据
  • 同步机制:提供goroutine之间的同步
  • 方向性:可以限制channel的读写方向
  • 缓冲控制:支持无缓冲和有缓冲两种模式

Channel类型与创建

无缓冲Channel

package mainimport ("fmt""time"
)func main() {// 创建无缓冲channelch := make(chan string)// 启动发送者goroutinego func() {time.Sleep(1 * time.Second)ch <- "Hello, Channel!"fmt.Println("Message sent")}()// 主goroutine接收消息fmt.Println("Waiting for message...")message := <-chfmt.Println("Received:", message)
}

有缓冲Channel

package mainimport ("fmt""time"
)func main() {// 创建缓冲大小为3的channelch := make(chan int, 3)// 发送数据(不会阻塞,因为有缓冲)ch <- 1ch <- 2ch <- 3fmt.Printf("Channel length: %d, capacity: %d\n", len(ch), cap(ch))// 接收数据for i := 0; i < 3; i++ {value := <-chfmt.Printf("Received: %d\n", value)}
}

方向性Channel

package mainimport "fmt"// 只能发送的channel
func sender(ch chan<- string) {ch <- "Hello from sender"close(ch)
}// 只能接收的channel
func receiver(ch <-chan string) {for message := range ch {fmt.Println("Received:", message)}
}func main() {ch := make(chan string)go sender(ch)receiver(ch)
}

Channel操作详解

发送和接收

package mainimport ("fmt""time"
)func main() {ch := make(chan int, 2)// 发送操作ch <- 42ch <- 100// 接收操作value1 := <-chvalue2 := <-chfmt.Printf("Received: %d, %d\n", value1, value2)// 带ok的接收操作ch <- 200close(ch)value3, ok := <-chfmt.Printf("Received: %d, ok: %t\n", value3, ok)value4, ok := <-chfmt.Printf("Received: %d, ok: %t\n", value4, ok) // ok为false,channel已关闭
}

关闭Channel

package mainimport "fmt"func producer(ch chan<- int) {for i := 1; i <= 5; i++ {ch <- ifmt.Printf("Sent: %d\n", i)}close(ch) // 关闭channel表示不再发送数据
}func consumer(ch <-chan int) {// 使用range遍历channel,直到channel关闭for value := range ch {fmt.Printf("Received: %d\n", value)}fmt.Println("Channel closed, consumer finished")
}func main() {ch := make(chan int, 2)go producer(ch)consumer(ch)
}

Select语句

基本Select用法

package mainimport ("fmt""time"
)func main() {ch1 := make(chan string)ch2 := make(chan string)go func() {time.Sleep(1 * time.Second)ch1 <- "Message from ch1"}()go func() {time.Sleep(2 * time.Second)ch2 <- "Message from ch2"}()// select语句等待多个channel操作for i := 0; i < 2; i++ {select {case msg1 := <-ch1:fmt.Println("Received from ch1:", msg1)case msg2 := <-ch2:fmt.Println("Received from ch2:", msg2)}}
}

带超时的Select

package mainimport ("fmt""time"
)func main() {ch := make(chan string)go func() {time.Sleep(3 * time.Second)ch <- "Delayed message"}()select {case msg := <-ch:fmt.Println("Received:", msg)case <-time.After(2 * time.Second):fmt.Println("Timeout: no message received within 2 seconds")}
}

非阻塞Select

package mainimport "fmt"func main() {ch := make(chan int, 1)// 非阻塞发送select {case ch <- 42:fmt.Println("Sent 42")default:fmt.Println("Channel is full, cannot send")}// 非阻塞接收select {case value := <-ch:fmt.Printf("Received: %d\n", value)default:fmt.Println("No value available")}// 再次尝试非阻塞接收select {case value := <-ch:fmt.Printf("Received: %d\n", value)default:fmt.Println("No value available")}
}

Channel通信模式

生产者-消费者模式

package mainimport ("fmt""sync""time"
)type Task struct {ID   intData string
}func producer(tasks chan<- Task, wg *sync.WaitGroup) {defer wg.Done()defer close(tasks)for i := 1; i <= 10; i++ {task := Task{ID:   i,Data: fmt.Sprintf("Task-%d", i),}tasks <- taskfmt.Printf("Produced: %s\n", task.Data)time.Sleep(100 * time.Millisecond)}
}func consumer(id int, tasks <-chan Task, wg *sync.WaitGroup) {defer wg.Done()for task := range tasks {fmt.Printf("Consumer %d processing: %s\n", id, task.Data)time.Sleep(200 * time.Millisecond) // 模拟处理时间fmt.Printf("Consumer %d finished: %s\n", id, task.Data)}
}func main() {tasks := make(chan Task, 5) // 缓冲channelvar wg sync.WaitGroup// 启动生产者wg.Add(1)go producer(tasks, &wg)// 启动多个消费者for i := 1; i <= 3; i++ {wg.Add(1)go consumer(i, tasks, &wg)}wg.Wait()fmt.Println("All tasks completed")
}

管道模式

package mainimport "fmt"// 第一阶段:生成数字
func generate(nums chan<- int) {for i := 1; i <= 10; i++ {nums <- i}close(nums)
}// 第二阶段:计算平方
func square(nums <-chan int, squares chan<- int) {for num := range nums {squares <- num * num}close(squares)
}// 第三阶段:过滤偶数
func filter(squares <-chan int, evens chan<- int) {for square := range squares {if square%2 == 0 {evens <- square}}close(evens)
}func main() {nums := make(chan int)squares := make(chan int)evens := make(chan int)// 启动管道的各个阶段go generate(nums)go square(nums, squares)go filter(squares, evens)// 输出最终结果fmt.Println("Even squares:")for even := range evens {fmt.Println(even)}
}

扇入模式

package mainimport ("fmt""sync""time"
)func worker(id int, output chan<- string) {for i := 1; i <= 3; i++ {message := fmt.Sprintf("Worker %d - Message %d", id, i)output <- messagetime.Sleep(time.Second)}close(output)
}func fanIn(inputs ...<-chan string) <-chan string {output := make(chan string)var wg sync.WaitGroup// 为每个输入channel启动一个goroutinefor _, input := range inputs {wg.Add(1)go func(ch <-chan string) {defer wg.Done()for message := range ch {output <- message}}(input)}// 等待所有输入完成后关闭输出channelgo func() {wg.Wait()close(output)}()return output
}func main() {// 创建多个worker的输出channelch1 := make(chan string)ch2 := make(chan string)ch3 := make(chan string)// 启动workersgo worker(1, ch1)go worker(2, ch2)go worker(3, ch3)// 扇入所有worker的输出merged := fanIn(ch1, ch2, ch3)// 接收合并后的消息for message := range merged {fmt.Println("Received:", message)}
}

高级Channel技巧

Channel的Channel

package mainimport ("fmt""time"
)func worker(id int, jobs <-chan chan string) {for job := range jobs {result := fmt.Sprintf("Worker %d processed job", id)job <- resultclose(job)}
}func main() {jobs := make(chan chan string, 3)// 启动workersfor i := 1; i <= 2; i++ {go worker(i, jobs)}// 发送任务for i := 1; i <= 5; i++ {resultCh := make(chan string, 1)jobs <- resultCh// 等待结果result := <-resultChfmt.Printf("Job %d result: %s\n", i, result)}close(jobs)
}

信号量模式

package mainimport ("fmt""sync""time"
)type Semaphore chan struct{}func NewSemaphore(capacity int) Semaphore {return make(Semaphore, capacity)
}func (s Semaphore) Acquire() {s <- struct{}{}
}func (s Semaphore) Release() {<-s
}func worker(id int, sem Semaphore, wg *sync.WaitGroup) {defer wg.Done()sem.Acquire() // 获取信号量defer sem.Release() // 释放信号量fmt.Printf("Worker %d started\n", id)time.Sleep(2 * time.Second) // 模拟工作fmt.Printf("Worker %d finished\n", id)
}func main() {const maxConcurrent = 3sem := NewSemaphore(maxConcurrent)var wg sync.WaitGroup// 启动10个worker,但最多只有3个同时运行for i := 1; i <= 10; i++ {wg.Add(1)go worker(i, sem, &wg)}wg.Wait()fmt.Println("All workers completed")
}

实战案例

并发Web爬虫

package mainimport ("fmt""net/http""sync""time"
)type CrawlResult struct {URL        stringStatusCode intError      errorDuration   time.Duration
}type Crawler struct {maxConcurrent intsemaphore     chan struct{}
}func NewCrawler(maxConcurrent int) *Crawler {return &Crawler{maxConcurrent: maxConcurrent,semaphore:     make(chan struct{}, maxConcurrent),}
}func (c *Crawler) crawlURL(url string, results chan<- CrawlResult, wg *sync.WaitGroup) {defer wg.Done()// 获取信号量c.semaphore <- struct{}{}defer func() { <-c.semaphore }()start := time.Now()resp, err := http.Get(url)duration := time.Since(start)result := CrawlResult{URL:      url,Duration: duration,Error:    err,}if err == nil {result.StatusCode = resp.StatusCoderesp.Body.Close()}results <- result
}func (c *Crawler) Crawl(urls []string) <-chan CrawlResult {results := make(chan CrawlResult, len(urls))var wg sync.WaitGroupfor _, url := range urls {wg.Add(1)go c.crawlURL(url, results, &wg)}go func() {wg.Wait()close(results)}()return results
}func main() {urls := []string{"https://www.google.com","https://www.github.com","https://www.stackoverflow.com","https://www.golang.org","https://www.reddit.com",}crawler := NewCrawler(3) // 最多3个并发请求results := crawler.Crawl(urls)fmt.Println("Crawling results:")for result := range results {if result.Error != nil {fmt.Printf("❌ %s: %v\n", result.URL, result.Error)} else {fmt.Printf("✅ %s: %d (%v)\n", result.URL, result.StatusCode, result.Duration)}}
}

实时数据处理管道

package mainimport ("fmt""math/rand""time"
)type DataPoint struct {ID        intValue     float64Timestamp time.Time
}type ProcessedData struct {DataPointProcessed boolResult    float64
}// 数据生成器
func dataGenerator(output chan<- DataPoint) {defer close(output)for i := 1; i <= 20; i++ {data := DataPoint{ID:        i,Value:     rand.Float64() * 100,Timestamp: time.Now(),}output <- datatime.Sleep(100 * time.Millisecond)}
}// 数据处理器
func dataProcessor(input <-chan DataPoint, output chan<- ProcessedData) {defer close(output)for data := range input {// 模拟数据处理time.Sleep(50 * time.Millisecond)processed := ProcessedData{DataPoint: data,Processed: true,Result:    data.Value * 2, // 简单的处理逻辑}output <- processed}
}// 数据过滤器
func dataFilter(input <-chan ProcessedData, output chan<- ProcessedData) {defer close(output)for data := range input {// 只传递结果大于100的数据if data.Result > 100 {output <- data}}
}func main() {// 创建管道rawData := make(chan DataPoint, 5)processedData := make(chan ProcessedData, 5)filteredData := make(chan ProcessedData, 5)// 启动管道各阶段go dataGenerator(rawData)go dataProcessor(rawData, processedData)go dataFilter(processedData, filteredData)// 输出最终结果fmt.Println("Filtered results (Result > 100):")for data := range filteredData {fmt.Printf("ID: %d, Original: %.2f, Result: %.2f, Time: %s\n",data.ID, data.Value, data.Result, data.Timestamp.Format("15:04:05"))}
}

总结

Channel是Go语言并发编程的核心工具,提供了优雅的goroutine间通信方式:

关键概念

  • 无缓冲vs有缓冲:控制同步行为
  • 方向性:限制channel的使用方式
  • Select语句:处理多个channel操作
  • 关闭channel:信号传递机制

常用模式

  • 生产者-消费者:解耦数据生产和消费
  • 管道:数据流式处理
  • 扇入扇出:并发处理和结果聚合
  • 信号量:控制并发数量

最佳实践

  1. 发送者负责关闭channel
  2. 使用range遍历channel
  3. 利用select实现超时和非阻塞操作
  4. 合理设置缓冲大小
  5. 避免channel泄漏

掌握Channel的使用是成为Go并发编程专家的必经之路。记住:通过通信来共享内存,而不是通过共享内存来通信

参考资源

  • Go官方文档 - Channel
  • Go并发模式:管道和取消
  • Go并发模式:上下文
http://www.lryc.cn/news/598813.html

相关文章:

  • 黑马点评系列问题之p44实战篇商户查询缓存 jmeter如何整
  • 2025.7.24 01背包与动态规划复习总结
  • 【Oracle】Oracle权限迷宫破解指南:2步定位视图依赖与授权关系
  • MySQL常见命令
  • 多线程 Reactor 模式
  • hcip思维导图(1)
  • GaussDB 数据库架构师(八) 等待事件概述-1
  • 阿里云ECS坑之dnf-makecache系统软件更新检测服务
  • 解决postgresql连接数不足
  • 五分钟了解Java 中的锁
  • SQL基础⑪ | 约束
  • JavaScript 中的 structuredClone() 如何彻底改变你的对象复制方式
  • Android LiveData 全面解析:原理、使用与最佳实践
  • Windows 10 远程桌面(RDP)防暴力破解脚本
  • Android 与 Windows 文件路径的设计差异
  • Android Camera createCaptureSession
  • 教程:如何通过代理服务在国内高效使用 Claude API 并集成到 VSCode
  • DGMR压缩技术:让大规模视觉Transformer模型体积减半而性能不减
  • FastAPI中间件
  • iview 部分用法
  • 锁定锁存器 | 原理 / 应用 / 时序
  • 哈希表模拟实现
  • JVM 垃圾收集器CMS和G1
  • HTTP性能优化实战:从协议到工具的全面加速指南
  • 服务端对接 HTTP 接口传输图片 采用base64还是 multipart/form-data
  • 排序初识(上)-- 讲解超详细
  • Android Studio历史版本快速下载(二次修改记录)
  • rna_seq_pipeline.py-python002
  • CloudComPy使用PyInstaller打包后报错解决方案
  • 如何使用 pdfMake 中文字体