redisson实现延迟队列
1.pom引入redisson
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.20.1</version></dependency>
- 整合springboot配置,这个可以参考之前整合redisson的文章,或者上面一步直接引用
redisson整合好的springboot的包 如下(本人是引入的redisson自己整合的springboot,实际一样的
只要redisson可以使用,就成功)
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.27.0</version>
</dependency>
3.配置redis的队列
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @email 1965654634@qq.com*/
@Configuration
public class RedisQueueConfig {@Beanpublic RBlockingQueue<String> blockingQueue(RedissonClient redissonClient) {//队列名称可以自己定义return redissonClient.getBlockingQueue("delayedQueue");}@Beanpublic RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockingQueue,RedissonClient redissonClient) {return redissonClient.getDelayedQueue(blockingQueue);}
}
4.创建redis的队列类
import cn.hutool.core.date.LocalDateTimeUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** @email 1965654634@qq.com*/
@Slf4j
@Service
public class DelayedQueueService{@Autowiredprivate RDelayedQueue<String> delayedQueue;@Autowiredprivate RBlockingQueue<String> blockingQueue;@PostConstructpublic void init() {// 创建延迟队列ExecutorService executorService = Executors.newFixedThreadPool(1);executorService.submit(() -> {while (true) {try {// 从延迟队列中取出任务String task = blockingQueue.take();// 处理延迟任务,例如执行某个操作log.info("redis的延迟队列:{},当前时间:{}", task, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));} catch (Exception e) {log.error("redis的延迟队列抛出异常", e);}}});}public String addDelayedTask(String task, long delay) {log.info("redis的延迟队列(添加)的key:{},time:{},当前时间:{}",task,delay, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));// 将任务加入延迟队列delayedQueue.offerAsync(task, delay, TimeUnit.SECONDS);// 返回任务的唯一标识return task;}public void cancelDelayedTask(String task) {log.info("redis的延迟队列(移除)的key:{},当前时间:{}",task, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));// 从延迟队列中移除任务delayedQueue.remove(task);}
}
5.测试controller
@PostMapping("/queueAdd")public ResponseResult<Boolean> queueAdd(String task,Long time){log.info("测试添加的key:{},time:{},当前时间:{}",task,time, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));delayedQueueService.addDelayedTask(task,time);return ResponseResult.ok();}@PostMapping("/queueDel")public ResponseResult<Boolean> dianTiAlarm(String task,Long time){log.info("测试删除的key:{},time:{},当前时间:{}",task,time, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));delayedQueueService.cancelDelayedTask(task);return ResponseResult.ok();}