当前位置: 首页 > news >正文

RabbitMQ手动签收消息

RabbitMQ手动签收消息

这里讲解SpringBoot使用RabbitMQ进行有回调的用法和消费者端手动签收消息的用法。

1、pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.4</version><relativePath/></parent><groupId>com.example.demo</groupId><artifactId>rabbitmq-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>rabbitmq-demo</name><description>rabbitmq-demno</description><properties><java.version>8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

2、配置文件

server:port: 9090
spring:application:name: rabbit-confirmrabbitmq:template:# 使用return-callback时必须设置mandatory为truemandatory: true# 消息发送到交换机确认机制,是否确认回调publisher-confirm-type: correlated# 消息发送到交换机确认机制,是否返回回调publisher-returns: truelistener:simple:# 并发消费者初始化值concurrency: 5# 最大值max-concurrency: 10# 每个消费者每次监听时可拉取处理的消息数量prefetch: 20# 确认模式设置为手动签收acknowledge-mode: manualusername: zsx242030password: zsx242030virtual-host: /

3、定义配置类

package com.example.demo.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfiguration {/*** 声明confirm.message队列*/@Beanpublic Queue confirmQueue() {return new Queue("confirm.message");}/*** 声明一个名为exchange-2的交换机*/@Beanpublic TopicExchange exchange2() {return new TopicExchange("exchange-2");}/*** 将confirm.message的队列绑定到exchange-2交换机*/@Beanpublic Binding bindMessage1() {return BindingBuilder.bind(confirmQueue()).to(exchange2()).with("confirm.message");}
}

4、定义生产者

package com.example.demo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.sql.Timestamp;
import java.time.LocalDateTime;@Component
@Slf4j
public class ConfirmProducer {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 如果消息没有到exchange,则confirm回调,ack=false* 如果消息到达exchange,则confirm回调,ack=true* exchange到queue成功,则不回调return* exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)*/private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {if (!ack) {log.error("消息发送失败:correlationData: {},cause: {}", correlationData, cause);} else {log.info("消息发送成功:correlationData: {},ack: {}", correlationData, ack);}};private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routeKey) ->log.error("消息丢失: exchange: {},routeKey: {},replyCode: {},replyText: {}", exchange, routeKey, replyCode, replyText);/*** 发送消息** @param message 消息内容*/public void send(String message) {// 构建回调返回的数据CorrelationData correlationData = new CorrelationData();Timestamp time = Timestamp.valueOf(LocalDateTime.now());correlationData.setId(time + "");Message message1 = MessageBuilder.withBody(message.toString().getBytes()).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)// 将CorrelationData的id 与 Message的correlationId绑定,然后关系保存起来,然后人工处理.setCorrelationId(correlationData.getId()).build();rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);rabbitTemplate.convertAndSend("exchange-2", "confirm.message", message1, correlationData);}
}

5、定义消费者

package com.example.demo.config;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class ConfirmConsumer {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "confirm.message",durable = "true"),exchange = @Exchange(value = "exchange-2",type = "topic"),key = "confirm.message"))public void receive(String message, Message message1, Channel channel) throws IOException {log.info("消费者收到消息:{}", message);long deliverTag = message1.getMessageProperties().getDeliveryTag();//第一个deliveryTag参数为每条信息带有的tag值,第二个multiple参数为布尔类型//为true时会将小于等于此次tag的所有消息都确认掉,如果为false则只确认当前tag的信息,可根据实际情况进行选择。channel.basicAck(deliverTag, false);}
}

6、创建controller调用

package com.example.demo.controller;import com.example.demo.config.ConfirmProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController
public class ConfirmController {@Resourceprivate ConfirmProducer confirmProducer;@GetMapping("/confirm-message")public void confirmMessage() {confirmProducer.send("hello confirm message");}
}

7、启动类

package com.example.demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitDemoApplication {public static void main(String[] args) {SpringApplication.run(RabbitDemoApplication.class, args);}}

8、测试

http://localhost:9090/confirm-message
2022-07-05 18:20:43.043  INFO 4492 --- [nectionFactory1] com.example.demo.config.ConfirmProducer  : 消息发送成功:correlationData: CorrelationData [id=2022-07-05 18:20:43.025],ack: true
2022-07-05 18:20:43.046  INFO 4492 --- [ntContainer#0-5] com.example.demo.config.ConfirmConsumer  : 消费者收到消息:hello confirm message
http://www.lryc.cn/news/135370.html

相关文章:

  • Unity 3d角色展示脚本(旋转 平移 缩放)展示界面
  • Spring Boot 将 Word 转换为 PDF
  • 【PHP面试题82】system和exec是用来做什么的?有什么区别
  • 05-微信小程序常用组件-表单组件
  • Lucky player —— Java 项目(Spring Boot)
  • ios 声网agora 音视频直播场景下的集成总结
  • mysql 、sql server 临时表、表变量、
  • 15. Canvas制作汽车油耗仪表盘
  • 解决git上传远程仓库时的最大文件大小限制
  • Midjourney API 国内申请及对接方式
  • 第一章 文件的输入和输出
  • java面试基础 -- 深克隆 浅克隆
  • 网络安全在医疗行业中的重要性
  • elemenPlus ElMessage 字符串如何换行问题
  • Linux socket网络编程
  • 【广州华锐互动】牲畜养殖VR模拟实操系统为传统教育注入新的生命力
  • JavaScript基础(Dom操作)
  • .NET6.0 System.Drawing.Common 通用解决办法
  • k8s ingress (二)
  • 如何实现element UI中table操作栏更多按钮的展示与折叠?
  • SpringBoot(二)
  • python脚本——批量将word文档转换成pdf文件
  • 自然语言处理从入门到应用——LangChain:链(Chains)-[通用功能:链的保存(序列化)与加载(反序列化)]
  • 机器学习:开启智能时代的重要引擎
  • ES搭建集群
  • # Lua与C++交互(二)———— 交互
  • 机器人焊接生产线参数监控系统理解需求
  • 前端基础(ES6 模块化)
  • 第七章,文章界面
  • HJ102 字符统计