当前位置: 首页 > news >正文

【RabbitMQ】golang客户端教程2——工作队列

任务队列/工作队列

在这里插入图片描述
在上一个教程中,我们编写程序从命名的队列发送和接收消息。在这一节中,我们将创建一个工作队列,该队列将用于在多个工人之间分配耗时的任务。

工作队列(又称任务队列)的主要思想是避免立即执行某些资源密集型任务并且不得不等待这些任务完成。相反,我们安排任务异步地同时或在当前任务之后完成。我们将任务封装为消息并将其发送到队列,在后台运行的工作进程将取出消息并最终执行任务。当你运行多个工作进程时,任务将在他们之间共享。

这个概念在Web应用中特别有用,因为在Web应用中不可能在较短的HTTP请求窗口内处理复杂的任务,(译注:例如注册时发送邮件或短信验证码等场景)。

准备工作

在本教程的上一部分,我们发送了一条包含“ Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有实际的任务,例如调整图像大小或渲染pdf文件,所以我们通过借助time.Sleep函数模拟一些比较耗时的任务。我们会将一些包含.的字符串封装为消息发送到队列中,其中每有一个.就表示需要耗费1秒钟的工作,例如,hello...表示一个将花费三秒钟的假任务。

我们将稍微修改上一个示例中的send.go代码,以允许从命令行发送任意消息。该程序会将任务安排到我们的工作队列中,因此我们将其命名为new_task.go

body := bodyFrom(os.Args)  // 从参数中获取要发送的消息正文
err = ch.Publish("",           // exchangeq.Name,       // routing keyfalse,        // mandatoryfalse,amqp.Publishing {DeliveryMode: amqp.Persistent,ContentType:  "text/plain",Body:         []byte(body),})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

下面是bodyFrom函数:

func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s
}

我们以前的receive.go程序也需要进行一些更改:它需要为消息正文中出现的每个.伪造一秒钟的工作。它将从队列中弹出消息并执行任务,因此我们将其称为worker.go

msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args
)
failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dot_count := bytes.Count(d.Body, []byte("."))  // 数一下有几个.t := time.Duration(dot_count)time.Sleep(t * time.Second)  // 模拟耗时的任务log.Printf("Done")}
}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

请注意,我们的假任务模拟执行时间。

然后,我们就可以打开两个终端,分别执行new_task.goworker.go了。

# shell 1
go run worker.go
# shell 2
go run new_task.go

循环调度

使用任务队列的优点之一是能够轻松并行化工作。如果我们的工作正在积压,我们可以增加更多的工人,这样就可以轻松扩展。

首先,让我们尝试同时运行两个worker.go脚本。它们都将从队列中获取消息,但是究竟是怎样呢?让我们来看看。

你需要打开三个控制台。其中两个将运行worker.go脚本。这些控制台将成为我们的两个消费者——C1和C2。

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,我们将发布新任务。启动消费者之后,你可以发布一些消息:

# shell 3
go run new_task.go msg1.
go run new_task.go msg2..
go run new_task.go msg3...
go run new_task.go msg4....
go run new_task.go msg5.....

然后我们在shell1shell2 两个窗口看到如下输出结果了:

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg1.
# => [x] Received a message: msg3...
# => [x] Received a message: msg5.....
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg2..
# => [x] Received a message: msg4....

默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。使用三个或者更多worker试一下。

消息确认

work 完成任务可能需要耗费几秒钟,如果一个worker在任务执行过程中宕机了该怎么办呢?我们当前的代码中,RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除。在这种情况下,如果你终止一个worker那么你就可能会丢失这个任务,我们还将丢失所有已经交付给这个worker的尚未处理的消息。

我们不想丢失任何任务,如果一个worker意外宕机了,那么我们希望将任务交付给其他worker来处理。

为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发送回一个确认(acknowledgement),以告知RabbitMQ已经接收,处理了特定的消息,并且RabbitMQ可以自由删除它。

如果使用者在不发送确认的情况下死亡(其通道已关闭,连接已关闭或TCP连接丢失),RabbitMQ将了解消息未完全处理,并将对其重新排队。如果同时有其他消费者在线,它将很快将其重新分发给另一个消费者。这样,您可以确保即使工人偶尔死亡也不会丢失任何消息。

没有任何消息超时;RabbitMQ将在消费者死亡时重新传递消息。即使处理一条消息需要很长时间也没关系。

在本教程中,我们将使用手动消息确认,方法是为“auto-ack”参数传递一个false,然后在完成任务后,使用d.Ack(false)worker发送一个正确的确认(这将确认一次传递)。

msgs, err := ch.Consume(q.Name, // queue"",     // consumerfalse,  // 注意这里传false,关闭自动消息确认false,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args
)
if err != nil {fmt.Printf("ch.Consume failed, err:%v\n", err)return
}// 开启循环不断地消费消息
forever := make(chan bool)
go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dotCount := bytes.Count(d.Body, []byte("."))t := time.Duration(dotCount)time.Sleep(t * time.Second)log.Printf("Done")d.Ack(false) // 手动传递消息确认}
}()

使用这段代码,我们可以确保即使你在处理消息时使用CTRL+C杀死一个worker,也不会丢失任何内容。在worker死后不久,所有未确认的消息都将被重新发送。

消息确认必须在接收消息的同一通道(Channel)上发送。尝试使用不同的通道(Channel)进行消息确认将导致通道级协议异常。

忘记确认

忘记确认是一个常见的错误。这是一个简单的错误,但后果是严重的。当你的客户机退出时,消息将被重新传递(这看起来像随机重新传递),但是RabbitMQ将消耗越来越多的内存,因为它无法释放任何未确认的消息。

为了调试这种错误,可以使用rabbitmqctl打印messages_unacknowledged字段:

> sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
> ```
>
> 在Windows平台,去掉sudo
>
> ```bash
> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
> ```### 消息持久化我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止运行,我们的任务仍然会丢失。当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。首先,我们需要确保队列能够在RabbitMQ节点重新启动后继续运行。为此,我们需要声明它是持久的:```go
q, err := ch.QueueDeclare("hello", // nametrue,    // 声明为持久队列false,   // delete when unusedfalse,   // exclusivefalse,   // no-waitnil,     // arguments
)

虽然这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许你使用不同的参数重新定义现有队列,并将向任何尝试重新定义的程序返回错误。但是有一个快速的解决方法——让我们声明一个具有不同名称的队列,例如task_queue

q, err := ch.QueueDeclare("task_queue", // nametrue,         // 声明为持久队列false,        // delete when unusedfalse,        // exclusivefalse,        // no-waitnil,          // arguments
)

这种持久的选项更改需要同时应用于生产者代码和消费者代码。

在这一点上,我们确信即使RabbitMQ重新启动,任务队列队列也不会丢失。现在我们需要将消息标记为持久的——通过使用amqp.Publishing中的持久性选项amqp.Persistent

err = ch.Publish("",     // exchangeq.Name, // routing keyfalse,  // 立即false,  // 强制amqp.Publishing{DeliveryMode: amqp.Persistent, // 持久(交付模式:瞬态/持久)ContentType:  "text/plain",Body:         []byte(body),})

有关消息持久性的说明

将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘上,但是RabbitMQ接受了一条消息并且还没有保存它时,仍然有一个很短的时间窗口。而且,RabbitMQ并不是对每个消息都执行fsync(2)——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但是对于我们的简单任务队列来说已经足够了。如果您需要更强有力的担保,那么您可以使用publisher confirms

公平分发

你可能已经注意到调度仍然不能完全按照我们的要求工作。例如,在一个有两个worker的情况下,当所有的奇数消息都是重消息而偶数消息都是轻消息时,一个worker将持续忙碌,而另一个worker几乎不做任何工作。嗯,RabbitMQ对此一无所知,仍然会均匀地发送消息。

这是因为RabbitMQ只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。只是盲目地向消费者发送信息。
在这里插入图片描述
为了避免这种情况,我们可以将预取计数设置为1。这告诉RabbitMQ不要一次向一个worker发出多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向worker发送新消息。相反,它将把它发送给下一个不忙的worker

err = ch.Qos(1,     // prefetch count0,     // prefetch sizefalse, // global
)

关于队列大小的说明

如果所有的worker都很忙,你的queue随时可能会满。你会想继续关注这一点,也许需要增加更多的worker,或者有一些其他的策略。

完整的代码实例

我们的new_task.go的最终代码代入如下:

package mainimport ("github.com/streadway/amqp""log""os""strings"
)func failOnErrorNew(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}
func bodyFrom(args []string) string {var s stringif len(args) < 2 || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], "")}return s
}func main() {//建立连接conn, err := amqp.Dial("amqp://licong:123456@8.130.85.112:5672/")failOnErrorNew(err, "Failed to connect to RabbitMQ")defer conn.Close()//获取channelch, err := conn.Channel()failOnErrorNew(err, "Failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("task_queue", //nametrue,         //durablefalse,        //delete when unusedfalse,        //exclusivefalse,        //no-waitnil,          //arguments)failOnErrorNew(err, "Failed to declare a queue ")body := bodyFrom(os.Args)err = ch.Publish("",     // exchangeq.Name, // routing keyfalse,  // mandatoryfalse,amqp.Publishing{DeliveryMode: amqp.Persistent, //表示将消息设置为持久模式,确保在 RabbitMQ 重启后消息能够被恢复。ContentType:  "text/plain",Body:         []byte(body),})failOnErrorNew(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)}

work.go内容如下:

package mainimport ("bytes""github.com/streadway/amqp""log""time"
)func failOnErrorWork(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}
func main() {//建立连接conn, err := amqp.Dial("amqp://licong:123456@8.130.85.112:5672/")failOnErrorWork(err, "Failed to connect to RabbitMQ")defer conn.Close()//获取channelch, err := conn.Channel()failOnErrorWork(err, "Failed to open a channel")defer ch.Close()//声明队列q, err := ch.QueueDeclare("task_queue", //nametrue,         //durablefalse,        //delete when unusedfalse,        //exclusivefalse,        //no-waitnil,          //argument)failOnErrorWork(err, "Failed to declare a queue")err = ch.Qos(1,     //prefetch count 消费者一次能获取的最大未确认消息数量0,     //prefetch size 消费者一次能获取的最大未确认消息的总大小(以字节为单位)false, //global  是否将预取配置应用于所有通道。如果为 true,则应用于所有通道;如果为 false,则仅应用于当前通道。)//获取接收消息的Delivery通道msgs, err := ch.Consume(q.Name, //queue"",     //consumerfalse,  //注意这里传false,关闭自动消息确认false,  //exclusivefalse,  //no-localfalse,  //no-waitnil,    //args)failOnErrorWork(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Reveived a message:%s", d.Body)dot_count := bytes.Count(d.Body, []byte(".")) //数一下有几个t := time.Duration(dot_count)time.Sleep(t * time.Second)log.Printf("Done")d.Ack(false) //手动传递消息确认}}()log.Printf("[*] Waiting for messages. To exit press CTRL+C")<-forever
}

源自:https://www.rabbitmq.com/getstarted.html

http://www.lryc.cn/news/106873.html

相关文章:

  • 芯旺微冲刺IPO,车规级MCU竞争白热化下的“隐忧”凸显
  • HTML <s> 标签
  • 微信小程序 - scroll-view组件之上拉加载下拉刷新(解决上拉加载不触发)
  • rust usize与i64怎么比较大小?
  • 电脑更新win10黑屏解决方法
  • STM32入门——外部中断
  • 【计算机网络】NAT及Bridge介绍
  • 封装动态SQL的插件
  • C# Microsoft消息队列服务器的使用 MSMQ
  • Kafka3.0.0版本——生产者如何提高吞吐量
  • js精度丢失的问题
  • C++ 编译预处理
  • 备战秋招 | 笔试强化22
  • LeetCode ACM模式——哈希表篇(二)
  • hadoop 3.1.3集群搭建 ubuntu20
  • 备忘录模式——撤销功能的实现
  • Golang 函数参数的传递方式 值传递,引用传递
  • K8s影响Pod调度和Deployment
  • 透明代理和不透明代理
  • 1424. 对角线遍历 II;2369. 检查数组是否存在有效划分;1129. 颜色交替的最短路径
  • 【漏洞复现】Metabase 远程命令执行漏洞(CVE-2023-38646)
  • Linux 9的repo for OVS build
  • DOCTYPE 是什么作用?
  • KubeSphere 3.4.0 发布:支持 K8s v1.26
  • 自然语言文本分类模型代码
  • Prometheus实现系统监控报警邮件
  • could not import go.etcd.io/etcd/clientv3-go
  • MySQL的行锁、表锁触发
  • mysql-入门笔记-3
  • 3分钟创建超实用的中小学新生录取查询系统,现在可以实现了