当前位置: 首页 > news >正文

如何使用Java和RabbitMQ实现延迟队列?

前言

今天我们使用Java和RabbitMQ实现消息队列的延迟功能。

前期准备,需要安装好docker、docker-compose的运行环境。

需要安装RabbitMQ的可以看下面这篇文章。

如何使用PHP和RabbitMQ实现消息队列?-CSDN博客

今天讲的是依赖RabbitMQ的延迟插件实现消息队列的延迟功能。

如何安装RabbitMQ的延迟插件并且启用,可以看下面的这篇文章。

如何使用PHP和RabbitMQ实现延迟队列(方式一)?_php调rabbit 设置延时-CSDN博客

一、编写代码

1、使用springboot框架快速搭建一个项目。

2、在 pom.xml 中添加 Spring Boot AMQP 的依赖,内容如下。

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

3、在 application.yml 中配置 RabbitMQ 的连接信息,内容如下。

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest

4、在配置类中定义交换机、队列和绑定,内容如下。

package com.ayzen.hello;import java.util.HashMap;
import java.util.Map;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed_exchange";public static final String DELAYED_QUEUE = "delayed_queue";public static final String ROUTING_KEY = "delayed_key";@BeanCustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);}@BeanQueue delayedQueue() {return new Queue(DELAYED_QUEUE, true);}@BeanBinding binding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs();}
}

5、创建一个生产者,发送一个带有延迟属性的消息,内容如下。

package com.ayzen.hello;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/test")
public class TestController {final Logger logger = LoggerFactory.getLogger(getClass());@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/send")public ResponseEntity<Object> send() {this.sendDelayMessage("sendDelayMessage", 5000);return ResponseEntity.ok(ResponseDto.success("ok"));}private void sendDelayMessage(String message, long ttlInMilliseconds) {MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay", ttlInMilliseconds);Message msg = MessageBuilder.withBody(message.getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("delayed_exchange", "delayed_key", msg);logger.info("send message to rabbitmq.");}
}

6、创建一个消息者,监听接收队列中的消息,内容如下。

package com.ayzen.hello;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class TestService {final Logger logger = LoggerFactory.getLogger(getClass());@RabbitListener(queues = "delayed_queue")public void process(String message) {logger.info("process message from rabbitmq,message={}", message);}
}

7、至此,测试项目代码已完成,下一步将进行验证。

二、测试验证

1、启动服务。

2、调用生产者,执行如下代码。

curl http://127.0.0.1:8080/test/send

3、查看日志,正常情况会返回如下内容。

如上图所示,在2024-04-07T22:32:47.489+08:00接收到生产者的请求,然后在2024-04-07T22:32:52.588+08:00执行消费动作,延迟5秒。

4、至此,使用Java和RabbitMQ实现延迟队列的功能已验证完毕。

总结

用Java和RabbitMQ实现消息队列的延迟功能,其实依靠的是RabbitMQ的一个延迟插件,主要有以下几个步骤。

1、安装RabbitMQ延迟插件。

2、编写Java测试项目。

3、进行测试验证。

上面的代码只是做个简单的示例,如果运用到实际的项目当中需要做进一步的优化。

最后因本人能力有限,有什么不对的地方望各位大佬指出好让我改进,多多包含,谢谢大家。

http://www.lryc.cn/news/336289.html

相关文章:

  • AI论文速读 | TF-LLM:基于大语言模型可解释性的交通预测
  • 智慧矿山视频智能监控与安全监管方案
  • 2024春算法训练4——函数与递归题解
  • 【C++】C++知识点复习
  • SpringBoot+Vue,轻松实现网页版人脸登录与精准识别
  • 深入浅出 -- 系统架构之垂直架构
  • 深入浅出 -- 系统架构之微服务架构选型参考图
  • Java 使用 ant.jar 执行 SQL 脚本文件
  • 【随笔】Git 高级篇 -- 快速定位分支 ^|~(二十三)
  • git环境切换
  • hyperf websocket
  • 用Echarts词云数据可视化热词表白​​
  • VUE 实现路由的基本原理
  • Android 11 添加系统属性
  • docker 创建容器过程
  • OSI七层网络攻击行为及防范手段
  • 第100+5步 ChatGPT文献复现:ARIMAX预测肺结核 vol. 5
  • 论文| Convolutional Neural Network-based Place Recognition - 2014
  • 基于微信小程序的自习室预约系统的设计与实现
  • 【机器学习】《机器学习算法竞赛实战》第7章用户画像
  • vue3新手笔记
  • 互联网大厂ssp面经之路:计算机网络part1
  • C语言程序设计每日一练(1)
  • Spring 统一功能处理
  • 【软设】知识点速记2
  • 激光雷达和相机的联合标定工具箱[cam_lidar_calibration]介绍
  • ML.NET(二) 使用机器学习预测表情分析
  • YOLOv9最新改进系列:YOLOv9改进之添加注意力-ContextAggregation,有效涨点!!!
  • 【数据结构】初识数据结构与复杂度总结
  • 子域名是什么?有什么作用?