当前位置: 首页 > news >正文

RabbitMq-接收消息+redis消费者重复接收

在接触RammitMQ时,好多文章都说在配置中设置属性 

# rabbitmq 配置
rabbitmq:host: xxx.xxx.xxx.xxxport: xxxxusername: xxxpassword: xxxxxx## 生产端配置# 开启发布确认,就是confirm模式. 消费端ack应答后,才将消息从队列中删除#确认消息已发送到队列(Queue)publisher-returns: true#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlatedlistener: #消费者 端配置retry:enabled: true # 是否支持重试default-requeue-rejected: falsemax-attempts: 5 #最大重试次数initial-interval: 3000 # 重试时间间隔direct:acknowledge-mode: manualsimple:acknowledge-mode: manual

消息接收消息失败时,可以重复调用5次;按照此操作,发现没有重复调用。

----------------------------------正确思路---------------------------------------------------------------------------------

设置完配置文件属性后,在代码中利用redis与channel.basicNack联合使用,将错误记录保存至数据库,方便查找原因;

---------------------------------------代码

package com.charg.listener;import com.charg.common.constant.CacheConstants;
import com.charg.common.constant.Constants;
import com.charg.common.utils.JsonUtils;
import com.charg.common.utils.redis.RedisUtils;
import com.charg.constant.RabbitConstants;
import com.charg.product.domain.bo.ProductDeviceBo;
import com.charg.product.domain.bo.RabMsgLogBo;
import com.charg.product.service.IProductDeviceService;
import com.charg.product.service.IRabMsgLogService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.time.Duration;/*** rabbitmq 监听*/
@Slf4j
@Component
public class RabbitQueueListener {/*** 最大重试次数*/private static int maxReconsumeCount = 3;@Autowiredprivate StringRedisTemplate redisTemplate;/*** 监听  队列的处理器** @param message*/@RabbitListener(queues = "队列名称")@RabbitHandlerpublic void onMessage(Message message, Channel channel) {//唯一标识String messageId = message.getMessageProperties().getMessageId();try {//判断messageId在redis中是否存在if (verificationMessageId(messageId)) {log.error("消息已重复处理,拒绝再次接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {//不存在 则处理消息// 接收消息if (StringUtils.isNotBlank(new String(message.getBody()))) {//修改业务逻辑if (!false) {log.error("消息即将再次返回队列处理...逻辑错误");// 处理最大回调次数getMaximumNumber(message, channel);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//加入缓存addMessageId(message);}} else {log.info("消息为空拒绝接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}} catch (Exception e) {e.printStackTrace();try {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理,拒绝再次接收----...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {log.error("消息即将再次返回队列处理...");// 处理最大回调次数getMaximumNumber(message, channel);}} catch (Exception exception) {exception.printStackTrace();}}}/*** 记录消息最大次数** @param message* @param channel* @throws IOException*/private void getMaximumNumber(Message message, Channel channel) {try {int recounsumeCounts = RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId()) == null ? 0 : RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId());if (maxReconsumeCount > recounsumeCounts) {log.info("maxMessageId(message.getMessageProperties().getMessageId())=" + recounsumeCounts);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);// 记录重试次数maxMessageId(message.getMessageProperties().getMessageId());} else {log.info("次数达到三次了呢---------" + RedisUtils.getCacheObject(CacheConstants.MESSAGE_MAX_KEY + message.getMessageProperties().getMessageId()));// 将消息重新放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);// 清除缓存RedisUtils.deleteObject("messageMaxKey" + message.getMessageProperties().getMessageId());//重试三次后,还是失败 需记录到数据库addRabMsgLog(message);}} catch (Exception e) {e.printStackTrace();}}/*** 设置消息的最大重试次数*/public void maxMessageId(String messageId) {String messageMax ="messageMaxKey"+ messageId;// 存入缓存,用来记录该消息重试了几次if (RedisUtils.hasKey(messageMax)) {RedisUtils.incrAtomicValue(messageMax);} else {//错误的消息-插入数据库RedisUtils.setCacheObject(messageMax, 1, Duration.ofHours(Constants.MESSAGE_TIME));}}/*** 校验消息是否消费过该消息** @param messageId 消息id* @return*/public boolean verificationMessageId(String messageId) {// 消息是否存在keyString verifyIsExistKey ="messageExistKey" + messageId;if ((RedisUtils.hasKey(verifyIsExistKey))) {return true;}return false;}/*** 保存消费过消息** @param message 消息* @return*/public void addMessageId(Message message) {// 存入缓存RedisUtils.setCacheObject("messageExistKey" + message.getMessageProperties().getMessageId(), message.getMessageProperties().getMessageId(), 1);}/*** 消息队列 失败日志 操作* 自己存数据库逻辑*/public void addRabMsgLog(Message message) {log.info("====操作日志===");//将内容记录到数据库}}
--------------------------------数据库表

 

http://www.lryc.cn/news/63096.html

相关文章:

  • Orangepi Zero2 全志H616简介
  • Golang每日一练(leetDay0047)
  • HCL Nomad Web 1.0.7发布和新功能验证
  • 春招网申简历填写三技巧
  • 计算机网络基础知识总结
  • (下)苹果有开源,但又怎样呢?
  • row_number 和 cte 使用实例:考场监考安排
  • 2023天梯赛记录
  • 被吐槽 GitHub仓 库太大,直接 600M 瘦身到 6M,这下舒服了
  • OpenGL(三)——着色器
  • 【MySQL】单表查询
  • 第一章 安装Unity
  • 20230425----重返学习-vue项目-vue自定义指令-vue-cli的配置
  • el-input 只能输入整数(包括正数、负数、0)或者只能输入整数(包括正数、负数、0)和小数
  • Docker Compose的常用命令与docker-compose.yml脚本属性配置
  • with语句和上下文管理器(py编程)
  • 《JavaEE初阶》HTTP协议和HTTPS
  • 微信小程序 | 基于高德地图+ChatGPT实现旅游规划小程序
  • Excel技能之实用技巧,高手私藏
  • 黑马程序员Java零基础视频教程笔记-运算符
  • Microsoft Data Loss Prevention(DLP)部署方案
  • win系统使用frp端口映射实现内网穿透,配置“任务计划程序”提高稳定性
  • python工具方法 39 大图裁剪为小图|小图还原成大图(含生成大图伪标签)
  • MUSIC算法仿真
  • redis 数据类型详解 以及 redis适用场景场合
  • python基于轻量级YOLOv5的生猪检测+状态识别分析系统
  • 阅读笔记 First Order Motion Model for Image Animation
  • 【计算机图形学】课堂习题汇总
  • 国外导师对博士后申请简历的几点建议
  • 【五一创作】Scratch资料袋