当前位置: 首页 > news >正文

Redis实现延时队列

缓存队列延时向接口报工,并支持多实例部署。

引入依赖

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-data</artifactId><version>3.17.4</version>
</dependency>

注入RedisClient

import org.springframework.util.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RedissonConfig {@Value("${spring.redis.host}")private String host;@Value("${spring.redis.port}")private String port;@Value("${spring.redis.password}")private String password;@Value("${spring.redis.database}")private Integer database;@Bean()public RedissonClient redissonClient() {Config config = new Config();SingleServerConfig serversConfig = config.useSingleServer();serversConfig.setAddress("redis://" + host + ":" + port);if (!StringUtils.isEmpty(password)){serversConfig.setPassword(password);}serversConfig.setDatabase(database);return Redisson.create(config);}
}

注入延时队列Bean

import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author* redisson延迟队列*/
@Configuration
public class RedissonQueueConfig {@Beanpublic RBlockingQueue<String> rBlockingQueue(@Qualifier("redissonClient") RedissonClient redissonClient) {String queueName = "queue";return redissonClient.getBlockingQueue(queueName);}@Bean(name = "rDelayedQueue")public RDelayedQueue<String> rDelayedQueue(@Qualifier("redissonClient") RedissonClient redissonClient,@Qualifier("rBlockingQueue") RBlockingQueue<String> blockQueue) {return redissonClient.getDelayedQueue(blockQueue);}
}

编写方法

import java.util.concurrent.TimeUnit;/*** @author*/
public interface DelayQueue {/*** 发布** @param object* @return*/Boolean offer(Object object);/*** 带延迟功能的队列** @param object* @param time* @param timeUnit*/void offer(Object object, Long time, TimeUnit timeUnit);void offerAsync(Object object, Long time, TimeUnit timeUnit);Boolean offerAsync(Object object);
}import org.redisson.api.RDelayedQueue;
import org.redisson.api.RFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** @author*/
@Component
public class RedissonDelayQueue implements DelayQueue {private static Logger log = LoggerFactory.getLogger(RedissonDelayQueue.class);@Resource(name = "rDelayedQueue")private RDelayedQueue<Object> rDelayedQueue;@Overridepublic Boolean offer(Object object) {return rDelayedQueue.offer(object);}@Overridepublic void offer(Object object, Long time, TimeUnit timeUnit) {rDelayedQueue.offer(object, time, timeUnit);}@Overridepublic void offerAsync(Object object, Long time, TimeUnit timeUnit) {rDelayedQueue.offerAsync(object, time, timeUnit);}@Overridepublic Boolean offerAsync(Object object) {boolean flag = false;RFuture<Boolean> rFuture = rDelayedQueue.offerAsync(object);try {flag = rFuture.get();} catch (InterruptedException | ExecutionException e) {log.info("offerAsync exception:{}", e.getMessage());e.printStackTrace();}return flag;}
}

延时任务

队列生产

@Resource(name = "rDelayedQueue")
private RDelayedQueue<Object> rDelayedQueue;public void delayedQueue(){//rDelayedQueue.size()队列中元素数量//添加到延时队列中rDelayedQueue.offerAsync("延时30秒执行任务",30, TimeUnit.SECONDS);
}

队列消费

import com.coctrl.mom.common.service.WebService;
import com.coctrl.mom.process.entity.vo.LESPassStationDetailQueueVO;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** @author*/
@Component
@Slf4j
public class RedissonTask {@Resource(name = "rBlockingQueue")private RBlockingQueue<Object> rBlockingQueue;@PostConstructpublic void take() {new Thread(() -> {while (true) {try {log.info("延时报工信息===============" +rBlockingQueue.take());//业务代码} catch (InterruptedException e) {log.error("延时报工失败===============" + e.getMessage());e.printStackTrace();}}}).start();}
}

http://www.lryc.cn/news/106159.html

相关文章:

  • 无限遍历,Python实现在多维嵌套字典、列表、元组的JSON中获取数据
  • 信息学奥赛一本通——1180:分数线划定
  • SpringApplication对象的构建及spring.factories的加载时机
  • 基于传统检测算法hog+svm实现图像多分类
  • slice() 方法,使用 concat() 方法, [...originalArray],find(filter),移出类名 removeAttr()
  • Zabbix报警机制、配置钉钉机器人、自动发现、主动监控概述、配置主动监控、zabbix拓扑图、nginx监控实例
  • ELK日志分析系统概述及部署
  • HTML拖拽
  • 【vue】 vue2 监听滚动条滚动事件
  • k8s目录
  • 设计模式行为型——解释器模式
  • 使用 Webpack 优化前端开发流程
  • mysql的分库分表脚本
  • JavaEE初阶之文件操作 —— IO
  • 客户端代码 VS 服务端代码 简述
  • 【娱乐圈明星知识图谱2】信息抽取
  • C++ rand的用法
  • element时间选择器的默认值
  • fiddler过滤器
  • 面试必考精华版Leetcode2130.链表最大孪生和
  • qemu kvm 新建虚拟机
  • Charles抓包工具使用(一)(macOS)
  • 2023年8月美团外卖3-18元红包优惠券天天领取活动日历及美团外卖红包领取使用
  • 深度学习各层负责什么内容?
  • 【硬件设计】模拟电子基础二--放大电路
  • 基于应用值迭代的马尔可夫决策过程(MDP)的策略的机器人研究(Matlab代码实现)
  • 控件旋转90度,并跟随大小缩放
  • 软件外包开发的PHP开发框架
  • D2L学习记录-10-词嵌入word2vec
  • 海外独立站怎么搭建?7个海外独立站搭建指南