当前位置: 首页 > news >正文

MQ高级篇---消息可靠性

MQ的一些常见问题


在这里插入图片描述
后面内容基于springboot 2.3.9.RELEASE

消息可靠性


在这里插入图片描述

生产者确认机制

在这里插入图片描述

  • 在publisher微服务中application.yml中添加
spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true

在这里插入图片描述

  • 每个RabbitTemplate只能配置一个ReturnCallback, 因此需要在项目启动过程中配置
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.error("消息发送到队列失败, 响应码:{}, 失败原因: {}, 交换机: {}, 路由key: {}, 消息: {}",replyCode, replyText, exchange, routingKey, message);});}
}
  • 发送消息, 指定消息ID,消息ConfirmCallBack
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.UUID;@Slf4j
@SpringBootTest
public class PublishTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid name() throws InterruptedException {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());correlationData.getFuture().addCallback(result -> {if(result.isAck()){// ACKlog.debug("消息成功投递到交换机! 消息ID: {}", correlationData.getId());}else {// NACKlog.error("消息投递到交换机失败! 消息ID: {}", correlationData.getId());}}, ex -> {log.error("消息发送失败!", ex);});rabbitTemplate.convertAndSend("high.topic", "high.#", "hello amqp", correlationData);}
}

在这里插入图片描述

消息持久化

声明队列和交换机时指定durabletrue,为持久化

spring amqp中交换机、队列、消息默认都是持久的

消费者消息确认

在这里插入图片描述
消费者业务添加配置

spring:rabbitmq:listener:simple:acknowledge-mode: auto 

失败重试机制

在这里插入图片描述

spring:rabbitmq:listener:simple:acknowledge-mode: autoprefetch: 1retry:enabled: true     # 开启消费者失败重试initial-interval: 1000  # 初始的失败等待时长1秒multiplier: 1  # 下次失败的等待时长倍数max-attempts: 3   # 最大重试次数stateless: true  # true无状态, false有状态, 如果业务中包含事务, 这里改为false

配置说明:

初始等待时长1秒,倍数为2, 则等待时长为 1秒 2秒 4秒 8秒 …

消费者失败消息处理策略

在这里插入图片描述

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue");}@Beanpublic Binding errorMessageBinding(){return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

死信交换机


在这里插入图片描述
这个是由队列投递

TTL

在这里插入图片描述

  • 声明死信交换机
	@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dl.queue", durable = "true"),exchange = @Exchange(name = "dl.direct"),key = "dl"))public void listenDlQueue(String msg){log.info("消费者接收到了dl.queue的延迟消息: {}", msg);}
  • 声明TTL交换机和队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TTLMessageConfig {@Beanpublic DirectExchange ttlDirectExchange(){return new DirectExchange("ttl.direct");}@Beanpublic Queue ttlQueue(){return QueueBuilder.durable("ttl.queue").ttl(10000)   // 指定时间10秒.deadLetterExchange("dl.direct").deadLetterRoutingKey("dl").build();}@Beanpublic Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");}
}
  • 发送消息
	@Testvoid name() {MessageBuilder.withBody("hello ttl".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setExpiration("5000").build();rabbitTemplate.convertAndSend("ttl.direct", "ttl", "ttl message");}

也可以指定消息的过期时间, 两者都指定时, 以短的为准

在这里插入图片描述

http://www.lryc.cn/news/324704.html

相关文章:

  • SpringMVC | SpringMVC中的 “文件上传和下载”
  • JVM快速入门(2)HotSpot和堆、新生区、永久区、堆内存调优、JProfiler工具分析OOM原因、GC(垃圾回收)、JVM经典面试笔试题整理
  • 我的风采——android studio
  • BMS设计中的短路保护和MOSFET选型(上)
  • 用go实现一个任务调度类 (泛型)
  • ansible 管理工具以及常用模块
  • javaSSM公司招聘管理系统IDEA开发mysql数据库web结构计算机java编程maven项目
  • 蓝桥杯day11刷题日记
  • IDEA, Pycharm, Goland控制台乱码
  • JavaScript单元测试jasmine学习(一)
  • 108、3D Gaussian Splatting for Real-Time Radiance Field Rendering
  • PHP之CURL和Socket
  • 【Web】NKCTF 2024 个人wp(部分)
  • QT常见布局器使用
  • 政安晨:【深度学习部署】—— TensorFlow Extended(TFX)介绍
  • 宝石与石头
  • 【Vue3之computed属性(四)】
  • 生产力工具|安装更新R软件(R、studio)
  • ffmpeg实现媒体流解码
  • 面试题 之 react
  • k8s笔记27--快速了解 k8s pod和cgroup的关系
  • android Fragment 生命周期 方法调用顺序
  • python写爬虫爬取京东商品信息
  • 使用Linux别名简化命令输入
  • 34.网络游戏逆向分析与漏洞攻防-游戏网络通信数据解析-登录数据包的监视与模拟
  • rust - 对文件夹进行zip压缩加密
  • ETL数据倾斜与资源优化
  • Python的asyncio:异步编程的利器
  • nodejs+vue高校奖助学金系统python-flask-django-php
  • 已解决redis.clients.jedis.exceptions.JedisMovedDataException异常的正确解决方法,亲测有效!!!