当前位置: 首页 > news >正文

SpringCloud之Stream框架集成RocketMQ消息中间件

        Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。

     目前 Spring Cloud Stream只支持 RabbitMQ 和 Kafka 的自动化配置。

     Spring Cloud Stream 提供了 Binder (负责与消息中间件进行交互),我们则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。Binder 绑定器是 Spring cloud Stream 中一个非常重要的概念,实现了应用程序和消息中间件之间的隔离,同时我们也可以通过应用程序实现,消息中间件之间的通信。在我们的项目的可以继承多种绑定器,我们可以根据不同特性的消息使用不同的消息中间件。Spring Cloud Stream 为我们实现了 RabbitMQ 和Kafka 的绑定器。如果你想使用其他的消息中间件需要自己去实现绑定器接口.

在 SpringCloudStream 3.x 版本前是通过 @StreamListener 和 @EnableBinding 进行消息的发送和消费的,springCloudStream 3.x 版本后 @StreamListener 和 @EnableBinding 都打上了@Deprecated 注解,不建议使用了;后续的版本更新中替换成函数式的方式实现。

既然通过四大函数式接口的方式替换了注解的方式 那么该如何进行绑定呢?

通过 spring.cloud.stream.function.definition:名称的方式进行绑定 公开 topic。

不管是创建 Consumer 还是 Supplier 或者是 Function Stream都会将其的 方法名称 进行 一个 topic拆封 和 绑定 假设 创建了一个 Consumer< String > myTopic 的方法,Stream 会将其 拆分成 In 和 out 两个通道:

  • 输入 - + -in- + < index >

     例如:myTopic-in-0

  • 输出 - + -out- + < index >

       例如:myTopic-out-0

注意:这里的 functionName需要和代码声明的函数名称还有spring.cloud.stream.function.definition下的名称保持一致(后面还会在项目实战中展示一遍)

代码示例:

----------------------------------项目实战--------------------------------------

看下我们项目中的配置,配置文件是放在nacos上面的:

消息发送:

/*** @ClassName MessageParamParentDto* @Author zxd* @Version 1.0.0* @Description TODO* @CreateTime 2023/6/13 11:27 - 星期二*/
@Data
public class MessageParamParentDto implements Serializable {private static final long serialVersionUID = 7963819193258646924L;private  String routeUrl;}

--------------------------------------------------------------------------------------------------------------

/*** @ClassName MessageParamDto* @Author kch* @Version 1.0.0* @Description 消息队列接收系统消息实体对象* @CreateTime 2022/9/18 15:16 - 星期日*/
@Data
public class MessageParamDto  extends MessageParamParentDto implements Serializable {private static final long serialVersionUID = 7111819193258646924L;/*** 消息模板code*/@NotNull(message = "消息模板不能为空")private String templateCode;/*** 可变参数,必传字段* 该参数匹配模板字符串中的变量和URL中的变量,所以模板和URL中的变量名不能重复*/@NotNull(message = "参数不能为空")private Map<String, String> params;/*** 消息详情跳转路径参数(没有不传,有参数按照URL参数拼接规范拼接,不加?号)* 例如:userId=1&userCode=test*/
//    private String routerParams;/*** 消息操作跳转路径参数(没有不传,有参数按照URL参数拼接规范拼接,不加?号)* 例如:userId=1&userCode=test*/
//    private String contentPathParams;/*** 接收者租户*/@NotNull(message = "接收者租户ID不能为空")private Long tenantId;/*** 接收人*/@NotNull(message = "接收者用户ID不能为空")@Size(min = 1, message = "接收者用户ID不能为空")private List<RecipientUser> recipientUsers;@Valid@Data@AllArgsConstructor@NoArgsConstructorpublic static class RecipientUser implements Serializable {/*** 接收人id*/@NotNull(message = "接收者用户ID不能为空")private Long recipientId;/*** 接收人手机号*/@Pattern(regexp = RegexPool.MOBILE, message = "手机格式错误")private String phone;}}

-----------------------------------------------------------------------------------------------------------

/*** @ClassName MessageMqBinding* @Author zpp* @Version 1.0.0* @Description TODO* @CreateTime 2023/2/10 15:37 - 星期五*/
public interface MessageMqBinding {/*** 系统消息生产者交换机*/String MESSAGE_MQ_OUTPUT = "dyzsMessageProvider-out-0";
}

----------------------------------------------------------------------------------------

@Slf4j
@RestController
@RequestMapping("/mq")
public class MessageMqController {@Resourceprivate StreamBridge streamBridge;/*** @param :* @Author zpp* @Description 发送系统消息* @Date 2023/2/10 15:27* @Return com.zysy.common.api.entity.Result<java.lang.Boolean>*/@PostMappingpublic Result<Boolean> sendMessage(@RequestBody @Validated MessageParamDto dto) {log.info("接收到系统消息发送请求:{}", JSONObject.toJSONString(dto));MessageMQParamDto paramDto = new MessageMQParamDto(dto);paramDto.setCreateBy(UserUtil.getUserId());paramDto.setCreateDept(UserUtil.getDeptId());List<MessageMQParamDto> paramDtoList = new ArrayList<>();paramDtoList.add(paramDto);MessageBuilder builder = MessageBuilder.withPayload(paramDtoList).setHeader("Content-Type", "application/json");return Result.success(streamBridge.send(MessageMqBinding.MESSAGE_MQ_OUTPUT, builder.build()));}

------------------------------------------------------------------------------------------------------

消息消费:

          下图是在代码中配置的消息消费者,这里的函数名称要和上图中的function.definition配置的名称一样;

http://www.lryc.cn/news/188173.html

相关文章:

  • 与创新者同行!Apache Doris 首届线下峰会即将开启,最新议程公开!|即刻预约
  • vue解决:Parsing error: No Babel config file detected for ....
  • 算法题:K 次取反后最大化的数组和(典型的贪心算法问题)
  • Go语言中向[]byte数组中增加一个元素
  • CSS 布局案例: 2行、多行每行格数不定,最后一列对齐
  • 数据结构--算法、数据结构的基本概念
  • Edge浏览器下载文件被保存为 .crdownload 文件的问题小记
  • 6-10 单链表分段逆转 分数 15
  • 【单片机】17-温度传感器DS18B20
  • 力扣 -- 5. 最长回文子串
  • SpringCloud源码探析(十)-Web消息推送
  • Vue、React和小程序中的组件通信:父传子和子传父
  • 安卓玩机----展讯芯片机型解锁 读写分区工具 操作步骤解析
  • 微软放大招!Bing支持DALL-E3,免费AI绘画等你来体验!
  • tp5访问的时候必须加index.php,TP5配置隐藏入口index.php文件
  • 16k面试中的10个问题
  • STM32单片机入门学习(六)-光敏传感器控制LED
  • MFC 鼠标悬停提示框
  • 大数据学习,涉及哪些技术?
  • Clion中使用C/C++开发stm32程序
  • JavaScript Web APIs第五天笔记
  • [ICCV-23] Paper List - 3D Generation-related
  • Transformer为什么如此有效 | 通用建模能力,并行
  • 【初识Jmeter】【接口自动化】
  • C:数组传值调用和传地址调用
  • Python数据容器——字典的常用操作(增、删、改、查)
  • JavaScript入门——(5)函数
  • 数据库sql查询成绩第二高
  • 十五、异常(5)
  • 途虎养车上市、京东养车“震虎”,如何突围汽车后市场?