当前位置: 首页 > news >正文

GO语言:Worker Pools线程池、Select语句、Metex互斥锁详细示例教程

目录标题

  • 一、Buffered Channels and Worker Pools
    • 1. Goroutine and Channel Example 线程和通道示例
    • 2. Deadlock 死锁
    • 3. Closing buffered channels 关闭通道
    • 4. Length vs Capacity 长度和容量
    • 5. WaitGroup
    • 6. Worker Pool Implementation 线程池
  • 二、Select
    • 1. Example
    • 2. Default case 默认选择
    • 3. Deadlock and default case 死锁与默认选择
    • 4. Random selection 随机选择
  • 三、Mutex
    • 1. Program with a race condition 无锁示例
    • 2. Solving the race condition using a mutex 互斥锁解决方案
    • 3. Solving the race condition using channel 通道解决方案

一、Buffered Channels and Worker Pools

1. Goroutine and Channel Example 线程和通道示例

          package mainimport ("fmt""time")func write(ch chan int) {for i := 0; i < 5; i++ {ch <- i // 向通道写入 0-4		因为通道容量是2 需要读取数据才会进行下一步 否则一直在阻塞态fmt.Println("Successfully wrote", i, "to ch")}close(ch) // 关闭通道}func main() {ch := make(chan int, 2) // 创建一个容量为2的缓冲通道  通道容量大小会导致阻塞go write(ch)time.Sleep(2 * time.Second) // 模拟时间间隔for v := range ch {fmt.Println("Read value", v, "from ch") // 读取数据 goroutine继续运行time.Sleep(2 * time.Second)}// 并发的 goroutine 和通道的阻塞机制,write() 函数和 range ch 循环可以交替执行,使得循环不会一次执行完毕,而是在读取完所有值之后等待新的值出现,再次进行循环迭代。}// Successfully wrote 0 to ch// Successfully wrote 1 to ch// Read value 0 from ch// Successfully wrote 2 to ch// Read value 1 from ch// Successfully wrote 3 to ch// Read value 2 from ch// Successfully wrote 4 to ch// Read value 3 from ch// Read value 4 from ch

2. Deadlock 死锁

         package mainimport (  "fmt")func main() {  ch := make(chan string, 2)ch <- "naveen"ch <- "paul"ch <- "steve"			// 其容量是2 但是写入三个 导致死锁fmt.Println(<-ch)fmt.Println(<-ch)}// fatal error: all goroutines are asleep - deadlock!// goroutine 1 [chan send]:  // main.main()  //     /tmp/sandbox091448810/prog.go:11 +0x8d

3. Closing buffered channels 关闭通道

         ch := make(chan int, 5)ch <- 6ch <- 9close(ch)n, open := <-chfmt.Printf("Received: %d, open: %t\n", n, open)n, open = <-chfmt.Printf("Received: %d, open: %t\n", n, open)n, open = <-chfmt.Printf("Received: %d, open: %t\n", n, open)// Received: 5, open: true  // Received: 6, open: true  // Received: 0, open: false  

4. Length vs Capacity 长度和容量

        ch := make(chan string, 3)ch <- "Like"ch <- "LiangXiaoQing"fmt.Println("capacity is", cap(ch))fmt.Println("length is", len(ch)) // 通道写入的个数fmt.Println("read value", <-ch)fmt.Println("new length is", len(ch))fmt.Println("read value", <-ch)fmt.Println("new length is", len(ch))// capacity is 3// length is 2// read value Like// new length is 1// read value LiangXiaoQing// new length is 0

5. WaitGroup

         // Add() 添加任务// Done() 通知wait完成任务// Wait() 阻塞等待所有任务完成package mainimport (  "fmt""sync""time")func process(i int, wg *sync.WaitGroup) {fmt.Println("started Goroutine", i)   // 3.打印 Goroutine 开始执行的信息time.Sleep(2 * time.Second)           // 4.暂停 2 秒,模拟任务执行时间fmt.Printf("Goroutine %d ended\n", i) // 5.打印 Goroutine 执行结束的信息wg.Done()                             // 6.通知等待组任务已完成}func main() {no := 3var wg sync.WaitGroupfor i := 0; i < no; i++ {wg.Add(1)          // 1.循环三次添加三次任务go process(i, &wg) // 2.每次传入当前i 0-2 及wg内存地址}wg.Wait() 	// 7.等待所有任务完成fmt.Println("All go routines finished executing")}// started Goroutine 1// started Goroutine 0// started Goroutine 2// Goroutine 2 ended// Goroutine 0 ended// Goroutine 1 ended// All go routines finished executing

6. Worker Pool Implementation 线程池

        type Job struct { // Job 结构表示一个具有 ID 和随机数的作业。id       intrandomno int}type Result struct { // Result 结构表示作业的结果,包括作业本身和数字各位数之和。job         Jobsumofdigits int}var jobs = make(chan Job, 10)       // jobs 是一个带有缓冲区大小为 10 的通道,用于传递作业。var results = make(chan Result, 10) // results 是一个带有缓冲区大小为 10 的通道,用于传递结果。func digits(number int) int { // digits 函数计算一个整数的各位数之和。sum := 0no := numberfor no != 0 { // 循环中,通过取模和除法操作,将数字的各位数相加。digit := no % 10sum += digitno /= 10}time.Sleep(2 * time.Second) // time.Sleep(2 * time.Second) 使函数暂停 2 秒钟,模拟一个耗时操作。return sum                  // 返回各位数之和。}func worker(wg *sync.WaitGroup) { // worker 函数是一个工作协程,用于处理作业。for job := range jobs { // 使用 range 循环从 jobs 通道接收作业。output := Result{job, digits(job.randomno)} // 通过调用 digits 函数计算作业的各位数之和。 将作业和结果封装为 Result 结构results <- output                           // 并发送到 results 通道。}wg.Done() // wg.Done() 声明一个任务已完成。}func createWorkerPool(noOfWorkers int) { // createWorkerPool 函数创建一个工作池,用于并发处理作业。var wg sync.WaitGroup              // 创建一个 sync.WaitGroup 对象 wg,用于等待所有工作协程完成。for i := 0; i < noOfWorkers; i++ { // 使用 for 循环创建指定数量的工作协程。wg.Add(1)      // 添加任务go worker(&wg) // 每个工作协程调用 worker 函数,并传递 &wg 作为参数。}wg.Wait()      // wg.Wait() 等待所有工作协程完成。close(results) // 关闭 results 通道,表示所有结果已经发送完毕。}func allocate(noOfJobs int) { // allocate 函数用于生成指定数量的作业并发送到 jobs 通道。for i := 0; i < noOfJobs; i++ { //  使用 for 循环创建指定数量的作业。randomno := rand.Intn(999) // 生成一个随机数 randomno,范围在 0 到 999 之间。job := Job{i, randomno}    // 创建一个 Job 结构体 jobjobs <- job                // 并将其发送到 jobs 通道。}close(jobs) // 关闭 jobs 通道,表示所有作业已经发送完毕。}func result(done chan bool) { // result 函数用于接收并处理结果。for result := range results { // 使用 range 循环从 results 通道接收结果。fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)}done <- true}func main() {startTime := time.Now()noOfJobs := 10go allocate(noOfJobs) 	// 传入job id 0-99 random 0-999的随机数字	Job id 1, input random no 636done := make(chan bool)go result(done)		// result线程一直等待results有值 如果有值则打印信息 传入true结束通道noOfWorkers := 10	// 控制线程池数量 createWorkerPool(noOfWorkers)	// 把&wg sync.WaitGroup 类型变量的指针 传给 results<-done	// 关闭通道endTime := time.Now()diff := endTime.Sub(startTime)	// 计算时间差fmt.Println("total time taken", diff.Seconds(), "seconds")}// Job id 1, input random no 636, sum of digits 15  // Job id 0, input random no 878, sum of digits 23  // Job id 9, input random no 150, sum of digits 6  // ...// total time taken  20.01081009 seconds  

二、Select

1. Example

        package mainimport ("fmt""time")func server1(ch chan string) {time.Sleep(9 * time.Second)ch <- "From Server 1"}func server2(ch chan string) {time.Sleep(6 * time.Second)ch <- "From server 2"}func main() {output1 := make(chan string)output2 := make(chan string)go server1(output1)go server2(output2)select { 	// 使用select语句接收多个通道消息,select会接收最先准备好的通道接收操作case s1 := <-output1:fmt.Println(s1)case s2 := <-output2:fmt.Println(s2)}}// From server 2

2. Default case 默认选择

        func process(ch chan string) {time.Sleep(1 * time.Second)ch <- "Process Successful"}func main() {ch := make(chan string)go process(ch)for { 		// for循环一直循环 每次循环休息1秒 直到v有值 主要看上面process函数睡眠睡觉 否则一直输出default的值time.Sleep(1000 * time.Microsecond)select {case v := <-ch:fmt.Println("Received value:", v)returndefault:fmt.Println("No value Received")}}// ....// No value Received//No value Received//No value Received//No value Received//Received value: Process Successful

3. Deadlock and default case 死锁与默认选择

        func main() {ch := make(chan string)select {case v := <-ch:fmt.Println("Received value", v)default:fmt.Println("Default case executed")}}// Default case executed

4. Random selection 随机选择

        package mainimport (  "fmt""time")func server1(ch chan string) {  ch <- "from server1"}func server2(ch chan string) {  ch <- "from server2"}func main() {  output1 := make(chan string)output2 := make(chan string)go server1(output1)go server2(output2)time.Sleep(1 * time.Second)select {		// 使用select语句接收多个通道消息,select会接收最先准备好的通道接收操作case s1 := <-output1:fmt.Println(s1)case s2 := <-output2:fmt.Println(s2)}}// From Server 1

三、Mutex

1. Program with a race condition 无锁示例

        package mainimport ("fmt""sync")var x = 0func increment(wg *sync.WaitGroup) {x = x + 1wg.Done()}func main() {var w sync.WaitGroupfor i := 0; i < 1000; i++ {w.Add(1)go increment(&w)}w.Wait()fmt.Println("Final value of X", x)}// Final value of X 987		最终答案应该是1000 因为多线程全部都在操作x 导致有些操作未成功

2. Solving the race condition using a mutex 互斥锁解决方案

        package mainimport ("fmt""sync")var x = 0func increment(wg *sync.WaitGroup, m *sync.Mutex) {m.Lock() // 上锁x = x + 1m.Unlock() // 释放锁  只有拿到锁才能操作x 否则一直等待wg.Done()}func main() {var w sync.WaitGroupvar m sync.Mutexfor i := 0; i < 1000; i++ {w.Add(1)go increment(&w, &m)}w.Wait()fmt.Println("Final value of X", x)}// Final value of X 1000

3. Solving the race condition using channel 通道解决方案

        package main  import (  "fmt""sync")var x  = 0  func increment(wg *sync.WaitGroup, ch chan bool) {  ch <- truex = x + 1<- chwg.Done()   }func main() {  var w sync.WaitGroupch := make(chan bool, 1)	// 通道容量 1	所以每次都需要上一个结束 下一个才能进行操作for i := 0; i < 1000; i++ {w.Add(1)        go increment(&w, ch)}w.Wait()fmt.Println("final value of x", x)}// Final value of x 1000

技术小白记录学习过程,有错误或不解的地方请指出,如果这篇文章对你有所帮助请点点赞收藏+关注谢谢支持 !!!

http://www.lryc.cn/news/139613.html

相关文章:

  • vue ui 创建项目没有反应
  • go语言中channel类型
  • 基于STM32F1的电子罗盘HMC5883L角度测量
  • Oracle解锁表、包、用户、杀会话、停job
  • 软考高级系统架构设计师系列论文九十九:论软件开发平台的选择和应用
  • Redis Pub/Sub 指南
  • Nest(2):Nest 应用目录结构和脚手架命令介绍
  • 【嵌入式】MKV31F512VLL12 微控制器 (MCU) 、Cyclone® IV E EP4CE10E22I8LN,FPGA-现场可编程门阵列芯片
  • 矢量调制分析基础
  • ensp-Ipv6配置配置
  • java八股文面试[java基础]—— hashCode 与 equals 区别 == 与 equals的区别
  • Dubbo之PojoUtils源码分析
  • 【C++】—— C++11新特性之 “右值引用和移动语义”
  • 谈一谈redis脑裂
  • 基于原生Servlet使用模板引擎Thymeleaf访问界面
  • 【C语言】15-函数-1
  • 08-信息收集-架构、搭建、WAF等
  • Qt --- 显示相关设置 窗口属性等
  • 使用小程序实现左侧菜单,右侧列表双向联动效果
  • selenium中处理验证码问题
  • EMR电子病历系统 SaaS电子病历编辑器源码 电子病历模板编辑器
  • 一些自定义hooks
  • 基于Citespace、vosviewer、R语言的文献计量学可视化分析技术及全流程文献可视化SCI论文高效写作方法
  • lEC 61068-2-14_2023环境试验.第2-14部分:试验.试验N:温度变化, 最新版发布
  • CFDEM学习笔记
  • SpringBoot入门篇1 - 简介和工程创建
  • MyBatis-Plus updateById不更新null值
  • 用pytorch实现AlexNet
  • LeetCode560.和为k的子数组
  • echarts 的dataZoom滑块两端文字被遮挡