当前位置: 首页 > news >正文

Rabbitmq消息确认机制

1.生产者确认机制

确认消息发送到交换机--Confirm方式

1.1普通Confirm方式

private static void sendMsg(Channel channel) throws IOException, InterruptedException {
        //开启确认机制
        channel.confirmSelect();
        //发送消息到exchange
        String msg = "hello confirm";
 
        channel.basicPublish("", "no-lost", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
 
        if (channel.waitForConfirms()) {
            System.out.println("生产者发布消息至Exchange成功!");
        } else {
            System.out.println("生产者发布消息至Exchange失败!请重试");
        }
    }
 

1.2异步Confirm方式

 private static void sendMsg(Channel channel) throws IOException, InterruptedException {
        //开启确认机制
        channel.confirmSelect();
        //发送消息到exchange
        String msg = "hello confirm";
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", "no-lost", MessageProperties.PERSISTENT_TEXT_PLAIN, (msg + i).getBytes());
        }
 
        //开启异步回调
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("生产者发布消息至Exchange成功,标示为:" + deliveryTag + ",是否为批量操作:" + multiple);
            }
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("生产者发布消息至Exchange失败,标示为:" + deliveryTag + ",是否为批量操作:" + multiple);
            }
        });
        System.in.read();
    }

2.交换机到消息队列的return确认机制

 //3开启Return机制
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //当送达失败是才会回调
                System.out.println(new String(body, "utf-8") + ",消息没有送达到queue中");
            }
        });
 

3.消费者确认机制

接收消息成功时

channel.basicAck(envelope.getDeliveryTag(), false);

接收消息失败或进入异常时

 try {
                    //具体业务
                    int i = 1 / 0;
                    //确认
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (Exception e) {
                    if (errorMap.get(new String(body, "UTF-8")) != null) {
                        System.out.println("消息已重复处理失败,拒绝再次接收...");
                        channel.basicReject(envelope.getDeliveryTag(), false);
                    } else {
                        System.out.println("消息即将再次返回队列处理...");
                        channel.basicNack(envelope.getDeliveryTag(), false, true);
                        errorMap.put(new String(body, "UTF-8"), 1);
                    }
                }
            }
 

一般在该消息处理完后执行,该消息才会在队列里面被删除,不然会处于UnAcked的状态存在队列中

finally {channel.basicAck(envelope.getDeliveryTag(), false);
}
http://www.lryc.cn/news/58571.html

相关文章:

  • FinClip 云开发实践(附小程序demo)
  • 真正好用的工业品ERP系统应该是什么样的?
  • Shiro重定向
  • Greenplum数据库执行器——PartitionSelector执行节点
  • POJ 2311 Cutting Game
  • CTF-PHP反序列化漏洞1-基础知识
  • 【面试】记一次安恒面试及总结
  • 刹车制动(卡钳)TOP3供应商份额超50%,哪些本土供应商突围
  • Go分布式爬虫笔记(二十二)
  • 跨线程修改主界面
  • 国内ChatGPt研发-中国chatGPT
  • springboot的rest服务配置服务的根路径
  • MySQL B+Tree 索引优化技巧
  • 100种思维模型之逆向思维模型-46
  • C/C++每日一练(20230413)
  • volatile和synchronized的区别
  • Cadence Allegro 导出Unplaced Component Report报告详解
  • 面试了上百位性能测试后,我发现了一个令人不安的事实...
  • 天气预报查询 API + AI 等于王炸(一大波你未曾设想的天气预报查询 API 应用场景更新了)
  • 跨境电商的行业现状与发展趋势分析
  • 适配器设计模式
  • 代码随想录算法训练营第三十五天-贪心算法4| ● 860.柠檬水找零 ● 406.根据身高重建队列 ● 452. 用最少数量的箭引爆气球
  • 2023MathorcupC题电商物流网络包裹应急调运与结构优化问题建模详解+模型代码(一)
  • 软件测试技术之跨平台的移动端UI自动化测试(上)
  • 【MySQL--02】库的操作
  • 人民链Baas服务平台上线,中创助力人民数据共建数据服务应用场景
  • 说说如何借助webpack来优化前端性能?
  • AiDD AI+软件研发数字峰会开启编程新纪元
  • 【远程开发】VSCode使用Remote SSH远程连接Linux服务器
  • C++纯虚函数和抽象类详解