当前位置: 首页 > news >正文

RabbitMQ-消息消费确认

我们一般使用的是消费者作为被动方接收 RabbitMQ 推送消息,另一种是消费者作为主动方可以主动拉取消息。

RabbitMq 服务器推送消息分为隐式(自动)确认和显示确认。

1 消费者拉取消息

消费者作为主动方拉取消息,每次只能获取一条。

using (var channel = connection.CreateModel())
{BasicGetResult result = channel.BasicGet("PersistenceQueue", true);string message = Encoding.UTF8.GetString(result.Body.ToArray());Console.WriteLine($"拉取到消息:{message}");
}

2 RabbitMq服务器推送消息

消费者作为被动方接收RabbitMQ推送消息。

using (var channel = connection.CreateModel())
{EventingBasicConsumer consumer = new EventingBasicConsumer(channel);//就是Rabbitmq的服务器作为主动方---RabbitMq 推送消息到消费者来的;consumer.Received += (model, ea) =>{string message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"正常收到消息:{message}");};channel.BasicConsume(queue: "PersistenceQueue", autoAck: true, consumer: consumer);
}

3 隐式确认

当 RabbbitMQ 将消息发送给消费者后,消费者端接收到消息后,不等待消息处理结束,立即自动回送一个确认回执。

自动确认的用法十分简单,设置消费方法的参数 autoAck 为 true 即可,我们前边的例子都是使用的自动确认。

channel.BasicConsume(queue: "PersistenceQueue", autoAck: true, consumer: consumer);

4 显式确认

设置消费方法的参数 autoAck 为 false,channel.BasicAck可以一条一条确认后删除,也可使用 channel.BasicReject不删除。

//定义消费者                                      
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
int i = 0;
//就是Rabbitmq的服务器作为主动方---RabbitMq 推送消息到消费者来的;
consumer.Received += (model, ea) =>
{string message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"正常收到消息:{message}");if (i < 100){Console.WriteLine($"【{message}】消息已经被消费,同时从RabbitMQ服务器删除");channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);}else{Console.WriteLine($"【{message}】消息没有被正常消费,可以让消息不要删除");channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);//throw new Exception("消息消费异常了~");}i++;
};
//显式确认
channel.BasicConsume(queue: "PersistenceQueue", autoAck: false, consumer: consumer);

5 消息质量

channel.BasicQos可以设置每次从队列中取出几条消息进行消费。

channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false);

方法中参数 prefetchSize 为预取的长度,一般设置为0即可,表示长度不限;

prefetchCount 表示预取的条数,即发送的最大消息条数;

global 表示是否在 Connection 中全局设置,true表示 Connetion 下的所有 channel 都设置为这个配置。

http://www.lryc.cn/news/520790.html

相关文章:

  • E10.【C语言】练习:编写一个猜数字游戏
  • RK3568-rk809rtc休眠唤醒
  • 【Uniapp-Vue3】pages.json页面路由globalStyle的属性
  • NHANES数据挖掘|特征变量对死亡率预测的研究设计与分析
  • 【Sharding-JDBC学习】概述_shardingsphere-jdbc 和sharding-jdbc
  • 用户登录/登出功能,当登录页面在另一域名下
  • 自动化解决方案:修复devicedisplaystatusmanager.dll丢失
  • .Net8 Avalonia跨平台UI框架——<vlc:VideoView>控件播放海康监控、摄像机视频(Windows / Linux)
  • 网络协议(八):IP 协议
  • 深度解析 pytest 参数化与 --count 执行顺序的奥秘
  • 【traefik】forwadAuth中间件跨namespace请求的问题
  • java学习记录16
  • 【Lua学习之旅】之单行/多行注释
  • [Effective C++]条款45 运用成员函数模板接受所有兼容类型
  • Harry技术添加存储(minio、aliyun oss)、短信sms(aliyun、模拟)、邮件发送等功能
  • 【python基础——异常BUG】
  • 解决Qt打印中文字符出现乱码
  • 第三十八章 Spring之假如让你来写MVC——适配器篇
  • 服务器引导异常,Grub报错: error: ../../grub-core/fs/fshelp.c:258:file xxxx.img not found.
  • 昵称 校验
  • MATLAB学习笔记目录
  • 基于单片机的语音控制玩具汽车的设计
  • Qt WORD/PDF(五)使用Json一键填充Word表格
  • vue3+ts的几个bug调试
  • DVWA靶场CSRF漏洞通关教程及源码审计
  • 前端开发:HTML常见标签
  • 【机器学习】主动学习-增加标签的操作方法-样本池采样(Pool-Based Sampling)
  • 【Rust自学】11.9. 单元测试
  • 深入理解Web存储机制:Cookie、SessionStorage与LocalStorage的区别
  • SpringBoot之BeanDefinitionLoader类源码学习