当前位置: 首页 > news >正文

springboot集成Redisson做分布式消息队列

这里演示Redisson做分布式消息队列。首先引入 Redisson依赖,官方github

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.6</version>
</dependency>

首先创建一个自定义注解RedissonTopic.java,用于指定消息的路由key

package com.zyq.annotation;import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;/** Redissson消息队列注解* author xiaochi* date 2024/10/23*/
@Inherited
@Documented
@Target({ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedissonTopic {/*** topic名称* @return*/String key();/*** 是否队列发送消息* @return*/boolean queue() default false;/*** 队列容量* @return*/int queueSize() default 100;/** queue为true时生效* 延迟发送时间(大于0默认延迟,延迟队列可设置大于0)* @return*/int delayTime() default 0;/** queue为true时生效* 时间单位(默认毫秒)* @return*/TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}

继续创建消息监听器 RedissonTopicMessageListener.java,具体内容如下:

package com.zyq.listener;/** Redisson消息监听接口* author xiaochi* date 2024/10/23*/
public interface RedissonTopicMessageListener{/*** 接收的消息处理* @param message*/void message(Object message);/*** 发送失败(队列已满时会调用)* @param message*/void sendFail(Object message);/*** 异常* @param message*/void exception(Object message);
}

接下来就是最重要的Redisson配置,内容如下:

/*** Redisson 配置* @return*/
@Bean(destroyMethod="shutdown")
public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext){Config config = new Config();config.useSingleServer().setPassword("123456").setDatabase(0).setConnectionPoolSize(24) // 连接池大小,默认64.setConnectionMinimumIdleSize(3) // 最小空闲连接数,默认32.setRetryAttempts(3) // 命令失败重试次数 3.setRetryInterval(1500) // 命令重试发送时间间隔(毫秒) 默认1500.setTimeout(10000) // 命令等待超时(毫秒) 默认10000.setConnectTimeout(10000) // 连接空闲超时(毫秒) 默认10000.setIdleConnectionTimeout(10000) // 连接空闲超时(毫秒) 默认10000.setSubscriptionConnectionMinimumIdleSize(1) // 发布和订阅连接的最小空闲连接数.setSubscriptionConnectionPoolSize(24) // 发布和订阅连接池大小 默认50.setDnsMonitoringInterval(10000) // DNS监测时间间隔(毫秒),默认5000*//*.setAddress("redis://127.0.0.1:6379");//config.setThreads(Runtime.getRuntime().availableProcessors());// 默认 16RedissonClient redissonClient = Redisson.create(config);StringBuilder msg = new StringBuilder();msg.append("Redisson topic register[");String[] beanNames = applicationContext.getBeanNamesForType(RedissonTopicMessageListener.class);for (String beanName : beanNames) {RedissonTopicMessageListener topicMessageListener = applicationContext.getBean(beanName, RedissonTopicMessageListener.class);if (topicMessageListener.getClass().isAnnotationPresent(RedissonTopic.class)){RedissonTopic redissonTopic = topicMessageListener.getClass().getAnnotation(RedissonTopic.class);if (redissonTopic.queue()){RBoundedBlockingQueue<Object> boundedBlockingQueue = redissonClient.getBoundedBlockingQueue(redissonTopic.key());boundedBlockingQueue.trySetCapacity(redissonTopic.queueSize());RDelayedQueue<Object> delayedQueue = null;if (0 != redissonTopic.delayTime()){delayedQueue = redissonClient.getDelayedQueue(boundedBlockingQueue);}RTopic topic = redissonClient.getTopic(redissonTopic.key());RDelayedQueue<Object> finalDelayedQueue = delayedQueue;topic.addListener(Object.class, (channel, message) -> {if (finalDelayedQueue != null){try {finalDelayedQueue.offer(message,redissonTopic.delayTime(), redissonTopic.timeUnit());}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson添加延迟队列异常,{}",e);}}else {try {if (!boundedBlockingQueue.offer(message)){topicMessageListener.sendFail(message);}}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson添加队列异常,{}",e);}}});// 为了不阻塞主线程,放在新线程中运行AsyncUtil.run(() -> {while (!Thread.currentThread().isInterrupted() && !redissonClient.isShutdown()){try {Object take = boundedBlockingQueue.take();if (!"".equals(take)){topicMessageListener.message(take);}} catch (Exception e) {topicMessageListener.exception(e.getMessage());log.info("Redisson延迟队列监测异常,{}",e);}}if (Thread.currentThread().isInterrupted() || redissonClient.isShutdown()){log.info("Redisson service shutdown");}});}else {RTopic topic = redissonClient.getTopic(redissonTopic.key());topic.addListener(Object.class, (channel,message) -> {try {topicMessageListener.message(message);}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson即时消息发送异常,{}",e);}});}msg.append(redissonTopic.key()).append(".");}}msg.append("]").append("finish.");log.info(msg.toString());return redissonClient;
}

到此基本就完成了,接下来就是创建消息监听类进行消费消息了TopicMessageListener.java 去实现消息监听器接口RedissonTopicMessageListener.java

package com.zyq.listener;import com.zyq.annotation.RedissonTopic;
import org.springframework.stereotype.Component;/** 消息监听类* author xiaochi* date 2024/10/23*/
@Component
@RedissonTopic(key = "testTopic",queue = true,delayTime = 5000)
public class TopicMessageListener implements RedissonTopicMessageListener {@Overridepublic void message(Object message) {System.out.println("testTopic监听器延迟队列收到消息," + message);}@Overridepublic void sendFail(Object message) {System.out.println("延迟队列 TopicMessageListener testTopic消息发送失败");}@Overridepublic void exception(Object message) {System.out.println("延迟队列 TopicMessageListener testTopic消息异常");}
}

现在可以起2个springboot项目进行消息交流了。封装一个消息发送工具 RedissonMessageUtil.java,内容如下:

package com.demo3.util;import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/** Redisson消息发送工具* author xiaochi* date 2024/10/24*/
@Component
public class RedissonMessageUtil {private static RedissonClient redissonClient;@Autowiredpublic void setRedissonClient(RedissonClient redissonClient) {RedissonMessageUtil.redissonClient = redissonClient;}/*** 发送消息* @param key* @param message* @return 返回接收消息的客户端数量*/public static long send(String key,Object message){RTopic topic = redissonClient.getTopic(key);return topic.publish(message);}
}

到此完成。

http://www.lryc.cn/news/467473.html

相关文章:

  • 如何通过Lua语言请求接口拿到数据
  • Android 13 SystemUI 隐藏下拉快捷面板部分模块(wifi,bt,nfc等)入口
  • 自由学习记录(14)
  • 疯狂Spring Boot讲义[推荐1]
  • vue中$nextTick的作用是什么,什么时候使用
  • Redis实现全局ID生成器
  • Xshell远程连接工具详解
  • 如何在verilog设计的磁盘阵列控制器中实现不同RAID级别(如RAID 0、RAID 1等)的切换?
  • 基于元神操作系统实现NTFS文件操作(十)
  • Qt的几个函数方法
  • openpnp - bug - 散料飞达至少定义2个物料
  • HDFS异常org.apache.hadoop.hdfs.protocol.NSQuotaExceededException
  • 数据库的构成与手写简单数据库的探索
  • 基于STM32的智能晾衣架设计
  • 【MAUI】模糊控件(毛玻璃高斯模糊亚克力模糊)
  • 深度学习:pandas篇
  • Redis学习文档(Redis基本数据类型【Hash、Set】)
  • 15分钟学Go 第9天:函数的定义与调用
  • Java虚拟机:JVM介绍
  • R数据科学 16.5.3练习题
  • 通过conda install -c nvidia cuda=“11.3.0“ 安装低版本的cuda,但是却安装了高版本的12.4.0
  • 简易CPU设计入门:验证取指令模块
  • 【MySQL数据库】MySQL主从复制
  • CDC变更数据捕捉技术是什么?和ETL有什么不同?
  • 一种用于推进欧洲临床中心中风管理的联邦学习平台即服务
  • 给哔哩哔哩bilibili电脑版做个手机遥控器
  • opencv dnn模块 示例(27) 目标检测 object_detection 之 yolov11
  • 鸿蒙开发融云demo初始化和登录
  • 手机防窥膜的工作原理是怎样的?有必要使用防窥膜吗?
  • 【Python_PySide6学习笔记(三十九)】基于QLineEdit实现自定义文本框,用于格式化文本,每四个字符后添加一个空格