当前位置: 首页 > news >正文

基于模板方法模式-消息队列发送

基于模板方法模式-消息队列发送

消息队列广泛应用于现代分布式系统中,作为解耦、异步处理和流量控制的重要工具。在消息队列的使用中,发送消息是常见的操作。不同的消息队列可能有不同的实现方式,例如,RabbitMQ、Kafka、RocketMQ、Redis等。为了统一操作流程和复用代码,可以使用设计模式来简化开发工作。

模板方法模式(Template Method Pattern)是一种行为型设计模式,旨在通过定义一个操作的骨架,而将一些步骤延迟到子类中实现。在消息队列发送场景中,模板方法模式可以帮助我们定义消息发送的基本流程,同时将具体的发送操作抽象为子类来实现,从而避免重复的代码。

1. 使用 RocketMQ 发送普通消息

public class YourServiceCode {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate ConfigurableEnvironment configurableEnvironment;@Transactional(rollbackFor = Exception.class)public void yourMethod(YourRequestDTO requestParam) {// 业务逻辑:// ...... 省略业务处理部分// 如果是立即发送任务,直接调用消息队列进行发送if (Objects.equals(requestParam.getSendType(), YourSendTypeEnum.IMMEDIATE.getType())) {// 定义消息队列 Topic(根据需要调整实际的 Topic)String topic = "your-topic-name-${unique-name}";// 通过 Spring 上下文解析占位符,把模板中的 unique-name 替换为实际值topic = configurableEnvironment.resolvePlaceholders(topic);// 构建消息体String messageKeys = UUID.randomUUID().toString();Message<Long> message = MessageBuilder.withPayload(yourDO.getId())  // 使用具体的 DO 对象.setHeader(MessageConst.PROPERTY_KEYS, messageKeys)  // 设置消息的 Key.build();// 执行消息队列发送及异常处理逻辑SendResult sendResult;try {sendResult = rocketMQTemplate.syncSend(topic, message, 2000L);  // 发送消息,并设置超时时间log.info("[生产者] 执行任务 - 发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), messageKeys);} catch (Exception ex) {log.error("[生产者] 执行任务 - 消息发送失败,消息体:{}", yourDO.getId(), ex);  // 异常处理}}}
}

每次发送消息时,总是充斥着大量相同的冗余代码,这些逻辑散落在业务代码中,不利于对核心业务的理解和维护。

2.什么是模板方法模式

模板方法模式(Template Method Pattern)是一种行为型设计模式,旨在通过在父类中定义一个算法的框架(即模板方法),而将某些步骤延迟到子类中实现。模板方法模式让子类可以在不改变算法结构的情况下,重新定义算法中的某些特定步骤。

在模板方法模式中:

  • 父类提供了一个骨架方法(即模板方法),并规定了算法的步骤。
  • 子类负责实现某些具体的步骤,这些步骤通常是一些可变的行为,父类并不关心。
  • 父类的模板方法调用这些可变的步骤,完成一个完整的算法流程。

通过这种方式,可以确保算法的结构一致,但又允许各个子类根据需要修改特定的步骤。

3. 使用模板方法模式的好处

使用模板方法模式后,我们可以将发送消息的流程固定,并把具体的步骤通过子类来实现。这样带来了以下好处:

  • 代码复用:通过模板方法模式,可以将发送消息的共同逻辑提取到父类中,避免了重复代码。例如,连接消息队列和错误处理等通用逻辑只需在父类中实现。
  • 清晰的流程控制:模板方法模式能够清晰地定义消息发送的流程,所有子类遵循同一流程,便于理解和调试。
  • 易于扩展:如果需要支持不同的消息队列,只需要创建不同的子类实现具体的发送逻辑,父类的通用流程不变,符合开闭原则,便于扩展。
  • 高内聚低耦合:每个消息队列的具体实现仅依赖于模板方法模式中的抽象方法,具有很高的内聚性,同时也保持了与其他模块的低耦合。

4.模板方法设计模式重构消息发送

4.1 定义消息发送事件基础实体

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public final class BaseSendExtendDTO {/*** 事件名称*/private String eventName;/*** 主题*/private String topic;/*** 标签*/private String tag;/*** 业务标识*/private String keys;/*** 发送消息超时时间*/private Long sentTimeout;/*** 具体延迟时间*/private Long delayTime;
}

另外,有些和业务无关的属性,我们再抽象一层 Wrapper 类,用于定义消息发送基础内容。

@Data
@Builder
@NoArgsConstructor(force = true)
@AllArgsConstructor
@RequiredArgsConstructor
public final class MessageWrapper<T> implements Serializable {private static final long serialVersionUID = 1L;/*** 消息发送 Keys*/@NonNullprivate String keys;/*** 消息体*/@NonNullprivate T message;/*** 消息发送时间*/private Long timestamp = System.currentTimeMillis();
}

4.2 定义抽象消息发送类

将消息发送的逻辑和结果日志的打印进行了抽象,也就是抽象方法模式中的复用性。并且,将消息发送事件的基本参数(如 Topic、Tag、是否延迟消息等)以及 Keys 的个性化属性独立为两个抽象方法。

@RequiredArgsConstructor
@Slf4j(topic = "CommonSendProduceTemplate")
public abstract class AbstractCommonSendProduceTemplate<T> {private final RocketMQTemplate rocketMQTemplate;/*** 构建消息发送事件基础扩充属性实体** @param messageSendEvent 消息发送事件* @return 扩充属性实体*/protected abstract BaseSendExtendDTO buildBaseSendExtendParam(T messageSendEvent);/*** 构建消息基本参数,请求头、Keys...** @param messageSendEvent 消息发送事件* @param requestParam     扩充属性实体* @return 消息基本参数*/protected abstract Message<?> buildMessage(T messageSendEvent, BaseSendExtendDTO requestParam);/*** 消息事件通用发送** @param messageSendEvent 消息发送事件* @return 消息发送返回结果*/public SendResult sendMessage(T messageSendEvent) {BaseSendExtendDTO baseSendExtendDTO = buildBaseSendExtendParam(messageSendEvent);SendResult sendResult;try {// 构建 Topic 目标落点 formats: `topicName:tags`StringBuilder destinationBuilder = StrUtil.builder().append(baseSendExtendDTO.getTopic());if (StrUtil.isNotBlank(baseSendExtendDTO.getTag())) {destinationBuilder.append(":").append(baseSendExtendDTO.getTag());}// 延迟时间不为空,发送任意延迟消息,否则发送普通消息if (baseSendExtendDTO.getDelayTime() != null) {sendResult = rocketMQTemplate.syncSendDeliverTimeMills(destinationBuilder.toString(),buildMessage(messageSendEvent, baseSendExtendDTO),baseSendExtendDTO.getDelayTime());} else {sendResult = rocketMQTemplate.syncSend(destinationBuilder.toString(),buildMessage(messageSendEvent, baseSendExtendDTO),baseSendExtendDTO.getSentTimeout());}log.info("[生产者] {} - 发送结果:{},消息ID:{},消息Keys:{}", baseSendExtendDTO.getEventName(), sendResult.getSendStatus(), sendResult.getMsgId(), baseSendExtendDTO.getKeys());} catch (Throwable ex) {log.error("[生产者] {} - 消息发送失败,消息体:{}", baseSendExtendDTO.getEventName(), JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}

4.3. 定义消息发送事件

将消息队列中的数据定义为事件

4.4定义消息队列生产者

消息队列生产者继承了我们的消息发送抽象类,并实现了两个抽象方法,从而体现了模板方法设计模式的扩展性。在业务代码中,只需引入消息发送生产者,即可通过简洁的一行代码完成消息发送流程。

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CouponTaskExecuteEvent {/*** 推送任务id*/private Long couponTaskId;
}@Slf4j
@Component
public class CouponTaskActualExecuteProducer extends AbstractCommonSendProduceTemplate<CouponTaskExecuteEvent> {private final ConfigurableEnvironment environment;public CouponTaskActualExecuteProducer(@Autowired RocketMQTemplate rocketMQTemplate, @Autowired ConfigurableEnvironment environment) {super(rocketMQTemplate);this.environment = environment;}@Overrideprotected BaseSendExtendDTO buildBaseSendExtendParam(CouponTaskExecuteEvent messageSendEvent) {return BaseSendExtendDTO.builder().eventName("优惠券推送执行").keys(String.valueOf(messageSendEvent.getCouponTaskId())).topic(environment.resolvePlaceholders("你的主题")).sentTimeout(2000L).build();}@Overrideprotected Message<?> buildMessage(CouponTaskExecuteEvent couponTaskExecuteEvent, BaseSendExtendDTO requestParam) {String keys = StrUtil.isEmpty(requestParam.getKeys()) ? UUID.randomUUID().toString() : requestParam.getKeys();return MessageBuilder.withPayload(new MessageWrapper(keys, couponTaskExecuteEvent)).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, requestParam.getTag()).build();}
}
http://www.lryc.cn/news/524294.html

相关文章:

  • 俄语画外音的特点
  • PyTorch使用教程(10)-torchinfo.summary网络结构可视化详细说明
  • 亚博microros小车-原生ubuntu支持系列:5-姿态检测
  • C语言之高校学生信息快速查询系统的实现
  • WPF基础 | WPF 基础概念全解析:布局、控件与事件
  • 迷宫1.2
  • RabbitMQ---应用问题
  • Unity自学之旅03
  • pip 相关
  • vue request 发送formdata
  • Android RTMP直播练习实践
  • ITIL认证工具商-ManageEngine Servicedesk Plus
  • https 的 CA证书和电子签名
  • 频繁刷新网页会对服务器造成哪些影响?
  • 贪心算法(题1)区间选点
  • JavaWeb开发学习笔记--MySQL
  • 抖音小程序一键获取手机号
  • iconfont等图标托管网站上传svg显示未轮廓化解决办法
  • 2008-2020年各省城镇登记失业率数据
  • Linux——信号量和(环形队列消费者模型)
  • 【JOIN】关键字在MySql中的详细使用
  • 渗透测试--攻击常见的Web应用
  • window系统annaconda中同时安装paddle和pytorch环境
  • python-leetcode-简化路径
  • 浅谈 PID 控制算法
  • ailx10的专栏电子书(2022版)
  • WPS按双字段拆分工作表到独立工作簿-Excel易用宝
  • C++ Qt练习项目 日期时间数据 未完待续
  • vim文本编辑器
  • 产品经理面试题总结2025【其一】