当前位置: 首页 > news >正文

C#使用RabbitMQ-2_详解工作队列模式

简介

🍀RabbitMQ中的工作队列模式是指将任务分配给多个消费者并行处理。在工作队列模式中,生产者将任务发送到RabbitMQ交换器,然后交换器将任务路由到一个或多个队列。消费者从队列中获取任务并进行处理。处理完成后,消费者可以向RabbitMQ发送一个确认消息,表示任务已完成。

优点:

🍀工作队列模式的主要优点是能够实现负载均衡和并行处理。通过将任务分配给多个消费者,可以提高系统的处理能力和吞吐量。此外,工作队列模式还具有很好的扩展性,可以根据需要动态添加或删除消费者。

任务流程:

  1. 生产者(Producer)将任务发送到RabbitMQ交换器(Exchange)。
  2. 交换器根据路由键(Routing Key)将任务路由到一个或多个队列(Queue)。
  3. 消费者(Consumer)从队列中获取任务并进行处理。
  4. 处理完成后,消费者向RabbitMQ发送一个确认消息,表示任务已完成。

生产者代码

在这个代码中我们声明消息队列时第二个参数设置为true,表示这个队列是持久化的。接着使用while做一个循环,不断读取用户输入的消息内容,然后将其转换为字节数组后发布到"hello"队列中。

class MyClass
{public static void Main(string[] args){var factory = new ConnectionFactory();factory.HostName = "localhost"; //RabbitMQ服务在本地运行factory.UserName = "guest"; //用户名factory.Password = "guest"; //密码//创建连接using (var connection = factory.CreateConnection()){//创建通道using (var channel = connection.CreateModel()){//声明一个名称为hello的消息队列channel.QueueDeclare("hello", true, false, false, null);string msg = null;int i = 1;Console.WriteLine("请输入要发送的消息内容:");while (!string.IsNullOrEmpty(msg = Console.ReadLine())){string message = $"Hello {msg} ! " + i++; //传递的消息内容var body = Encoding.UTF8.GetBytes(message);//此处的参数"hello" 就对应的就是上面声明的消息队列的路由键channel.BasicPublish("", "hello", null, body); //开始传递Console.WriteLine("已发送: {0}", message);}}}}
}

消费者代码

🍀这里最关键的一行代码就是channel.BasicQos(0, 1, false);BasicQos方法用于设置消费者的预取计数(prefetch count)。消费者从队列中获取消息的方式是通过预取计数来控制的。预取计数决定了消费者在没有发送确认信号的情况下可以同时处理多少条未确认的消息。

在Channel.BasicQos()方法中三个参数作用如下:

  1. prefetchSize:这个参数表示每次从队列中获取的消息的最大大小,单位是字节。设置为0表示没有限制。
  2. prefetchCount:这个参数表示每个消费者同时可以处理的最大未确认消息的数量。设置为1表示每个消费者只能处理一个未确认消息。
  3. global:这个布尔值表示是否将这两个参数应用于所有的消费者。如果设置为true,则这两个参数将应用于所有的消费者;如果设置为false,则这两个参数仅适用于当前的消费者。

channel.BasicQos(0, 1, false);这行代码设置了消费者的预取计数为1。这意味着消费者在没有发送确认信号的情况下,最多只会处理一条未确认的消息。

这样可以提高消费者处理消息的效率,因为消费者不需要等待其他消费者发送确认信号后再处理消息。这样可以在一定程度上提高系统的吞吐量。

class MyClass
{static void Main(string[] args){//创建连接工厂var factory = new ConnectionFactory();factory.HostName = "localhost";factory.UserName = "guest";factory.Password = "guest";//创建连接using (var connection = factory.CreateConnection()){//创建通道using (var channel = connection.CreateModel()){//声明队列channel.QueueDeclare("hello", true, false, false, null);channel.BasicQos(0, 1, false);//事件的基本消费者var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);//这里加上睡眠时间,模拟耗时任务Thread.Sleep(1000);Console.WriteLine("已接收: {0}", message);//发送消息确认信号(手动确认)    channel.BasicAck(ea.DeliveryTag,false);};//当 autoAck设置为true时,也就是自动确认模式,一旦消息队列将消息发送给消息消费者后,就会从内存中将这个消息删除。//当autoAck设置为false时,也就是手动模式,如果此时的有一个消费者宕机,消息队列就会将这条消息继续发送给其他的消费者,这样数据在消息消费者集群的环境下,就不会不丢失了。channel.BasicConsume("hello", false, consumer);Console.ReadKey();}}}
}

代码演示

🍀首先我们将消费者代码发布到本地文件夹中

🍀发布完成后我们找到打包好的程序集,双击两次.exe文件,运行两个消费者

 🍀接着我们运行生产者代码,在控制台随意发送6条消息。

🍀再回到我们刚刚运行的两个消费者程序,可以看到, 消息被分发给两个消费者了

http://www.lryc.cn/news/288621.html

相关文章:

  • Day37 56合并区间 738单调递增的数字 968监控二叉树
  • 【Android】在WSA安卓子系统中进行新实验性功能试用与抓包(2311.4.5.0)
  • 【服务器】服务器的管理口和网口
  • 一个小例子,演示函数指针
  • python12-Python的字符串之使用input获取用户输入
  • 【代码随想录-数组】移除元素
  • springboot事务管理
  • 数据结构——链式二叉树(2)
  • spring-boot-starter-validation常用注解
  • AF700 NHS 酯,AF 700 Succinimidyl Ester,一种明亮且具有光稳定性的近红外染料
  • C#常见内存泄漏
  • Xmind安装到指定目录
  • [GXYCTF2019]BabyUpload1
  • SpringBoot之分页查询的使用
  • 【shell-10】shell实现的各种kafka脚本
  • 【模型压缩】模型剪枝详解
  • Log4j2-01-log4j2 hello world 入门使用
  • Mysql-日志介绍 日志配置
  • 计算机网络的体系结构的各层在整个过程中起到什么作用?
  • 如何在业务代码中优雅的使用策略模式?
  • “docker-credential-desktop.exe“: executable file not found in $PATH 错误解决
  • openssl3.2/test/certs - 055 - all DNS-like CNs allowed by CA1, no DNS SANs
  • 长虹智能电视6000iD、6080iD、3000iD、U2系列等 ZLM61HiPJ机芯升级刷机方法,附刷机数据
  • 六、VTK创建平面vtkPlaneSource
  • LiveGBS流媒体平台GB/T28181常见问题-如何配置使用自己已有的redis服务替换redis版本升级redis版本
  • stm32产品架构
  • 数据结构——双链表
  • Git 对文件名大小写不敏感的问题解决方案
  • Java复习系列之阶段三:框架原理
  • 【Python】01快速上手爬虫案例一:搞定豆瓣读书