当前位置: 首页 > news >正文

【rabbitmq】实现问答消息消费示例

目录

          • 1. 说明
          • 2. 截图
            • 2.1 接口调用截图
            • 2.2 项目结构截图
          • 3. 代码示例

1. 说明
  • 1.实现的是一个简单的sse接口,单向的长连接,后端可以向前端不断输出数据。
  • 2.通过调用sse接口,触发rabbitmq向队列塞消息,向前端返回一个sseEmitter对象。
  • 3.rabbitmq监听队列消息,消费消息后,向sseEmitter对象写入内容。
  • 4.当业务逻辑结束,调用emitter.complete()方法,结束此次会话。
  • 5.这里举一个问答的示例,采用的是work模式,逻辑比较简单,仅供参考。
2. 截图
2.1 接口调用截图

在这里插入图片描述

2.2 项目结构截图

在这里插入图片描述

3. 代码示例
  • 1.pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.4</version></parent><groupId>com.learning</groupId><artifactId>springboot</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.5.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.12.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><!--打jar包使用--><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
  • 2.application.yaml
spring:# rabbbitmq配置信息rabbitmq:host: 192.168.2.11port: 5672username: adminpassword: adminvirtual-host: /
  • 3.rabbitmq配置类
package com.learning.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** rabbitmq配置类*/
@Configuration  
public class RabbitMQConfig{/*** 存sseEmitter*/@Bean("emitterMap")public ConcurrentMap<String, SseEmitter> emitterMap(){ConcurrentMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();return emitters;}/*** 工作模式,交换机名*/public static final String EXCHANGE_NAME = "work_exchange";/*** 工作模式,队列名*/public static final String QUEUE_NAME = "work_queue";  @Bean("work_queue")public Queue queue() {  return new Queue(QUEUE_NAME, true);  }  @Bean("work_exchange")public Exchange exchange() {return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();}  @Bean  public Binding binding(@Qualifier("work_queue") Queue queue,@Qualifier("work_exchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("work_routing_key").noargs();}  
}
  • 4.controller类
package com.learning.controller;import com.learning.service.QuestionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;/*** @Author wangyouhui* @Description 获取答案**/
@RestController
@RequestMapping("/question")
public class QuestionController {@Autowiredprivate QuestionService questionService;@Autowiredprivate ConcurrentMap<String, SseEmitter> emitterMap;@PostMapping(value="/ask", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter ask(@RequestParam String question) {String questionId = UUID.randomUUID().toString();SseEmitter emitter = new SseEmitter();emitterMap.put(questionId, emitter);questionService.ask(questionId, question);return emitter;}
}
  • 5.消息实体
package com.learning.dto;import lombok.Data;import java.io.Serializable;/*** @Author wangyouhui* @Description 消息**/
@Data
public class MessageDTO implements Serializable {private String questionId;private String question;private String answer;private Boolean end;
}
  • 6.service实现类
package com.learning.service.impl;import com.fasterxml.jackson.databind.ObjectMapper;
import com.learning.config.RabbitMQConfig;
import com.learning.dto.MessageDTO;
import com.learning.service.QuestionService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.concurrent.ConcurrentMap;/*** @Author wangyouhui* @Description**/
@Service
public class QuestionServiceImpl implements QuestionService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate ConcurrentMap<String, SseEmitter> emitterMap;@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, ackMode = "MANUAL")public void receiveMessage(Message message, Channel channel) {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {String json = new String(message.getBody());ObjectMapper mapper = new ObjectMapper();MessageDTO messageDTO = mapper.readValue(json, MessageDTO.class);SseEmitter sseEmitter = emitterMap.get(messageDTO.getQuestionId());if(sseEmitter != null){sseEmitter.send(messageDTO);}if(messageDTO.getEnd() != null && messageDTO.getEnd()){sseEmitter.complete();emitterMap.remove(messageDTO.getQuestionId());}// 手动签收channel.basicAck(deliveryTag, false);} catch (IOException e) {e.printStackTrace();// 拒绝签收,消息重新入队try {channel.basicReject(deliveryTag, true);} catch (IOException ioException) {ioException.printStackTrace();}}}@Overridepublic void ask(String questionId, String question) {MessageDTO message1 = new MessageDTO();message1.setQuestionId(questionId);message1.setQuestion(question);message1.setAnswer("您好,这个");message1.setEnd(false);this.sendMessage(message1);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}MessageDTO message2 = new MessageDTO();message2.setQuestionId(questionId);message2.setQuestion(question);message2.setAnswer("您好,这个是答案");message2.setEnd(false);this.sendMessage(message2);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}MessageDTO message3 = new MessageDTO();message3.setQuestionId(questionId);message3.setQuestion(question);message3.setAnswer("您好,这个是答案,请问是否能解决你的问题");message3.setEnd(true);this.sendMessage(message3);}public void sendMessage(MessageDTO message){ObjectMapper mapper = new ObjectMapper();String json = null;try {json = mapper.writeValueAsString(message);rabbitTemplate.convertAndSend("work_exchange", "work_routing_key", json);} catch (Exception e) {e.printStackTrace();}}
}
  • 7.service接口
package com.learning.service;/*** @Author wangyouhui* @Description **/
public interface QuestionService {void ask(String questionId, String question);
}
  • 8.应用类
package com.learning;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @Author wangyouhui* @Description**/
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}
http://www.lryc.cn/news/469240.html

相关文章:

  • 单片机_RTOS__架构概念
  • ClickHouse在百度MEG数据中台的落地和优化
  • B/S架构(Browser/Server)与C/S架构(Client/Server)
  • idea中自定义注释模板语法
  • 基于SSM的儿童教育网站【附源码】
  • 深挖自闭症病因与孩子表现的关联
  • [网络协议篇] UDP协议
  • 关系型数据库(1)----MySQL(初阶)
  • 计算机毕业设计Python+大模型租房推荐系统 租房大屏可视化 租房爬虫 hadoop spark 58同城租房爬虫 房源推荐系统
  • 深度学习技术演进:从 CNN、RNN 到 Transformer 的发展与原理解析
  • Lua中的goto语句
  • 【rust实战】rust博客系统2_使用wrap启动rust项目服务
  • 【实战案例】Django框架使用模板渲染视图页面及异常处理
  • 设置K8s管理节点异常容忍时间
  • 什么样的JSON编辑器才好用
  • ArkUI自定义TabBar组件
  • pair类型应用举例
  • 数字 图像处理算法的形式
  • 安徽对口高考Python试题选:输入一个正整数,然后输出该整数的3的幂数相加形式。
  • Node.js是什么? 能做什么?
  • JVM快速入门
  • 理解深度学习模型——高级音频特征表示的分层理解
  • 【HarmonyOS Next】原生沉浸式界面
  • 数据结构 ——— 树的概念及结构
  • 初探Vue前端框架
  • Lucas带你手撕机器学习——岭回归
  • C2W4.LAB.Word_Embedding.Part1
  • hive初体验
  • 云渲染主要是分布式(分机)渲染,如何使用blender云渲染呢?
  • WordPress与WP Engine:关键事件时间线