当前位置: 首页 > news >正文

RocketMQ批量发送消息❓

优点:

批量发送消息可以提高rocketmq的生产者性能和吞吐量。

使用场景:

  1. 发送大量小型消息时;
  2. 需要降低消息发送延迟时;
  3. 需要提高生产者性能时;

注意事项:

  1. 消息列表的大小不能超过broker设置的最大消息大小;
  2. 消息列表的大小不能超过生产证设置的maxMessageSize 参数,此参数默认为 4MB;
  3. 批量发送消息不支持消息事务;
  4. 如果代码在发送消息列表时发生异常,则可能会发生部分消息发送成功,部分消息发送失败的情况。如果要确保所有消息都已成功发送,则需要增加错误处理逻辑和消息重试机制;


批量发送消息为什么要限制maxMessageSize❓

消息列表的大小不能超过生产者设置的maxMessageSize参数,主要是为了避免消息发送延迟和消息过大导致broker出现性能问题。如果尝试发送大于maxMessageSize的消息,RocketMQ会抛出MessageTooLargeException异常,并且消息不会被发送到broker。

如果开发者在开发时遇到了消息列表大小超过maxMessageSize的情况,可以考虑以下几种处理方式:

    1. 提升maxMessageSize参数的大小,这样可以容纳更大的消息列表。但是,需要注意在提升参数大小时,要考虑到RocketMQ broker的性能和网络带宽等因素。
    2. 考虑将消息列表进行拆分,然后分批发送。这样可以避免一次发送过多的消息。
    3. 计算消息的大小并进行压缩。可以使用一些压缩算法,如 LZ4、Snappy 等,对消息进行压缩,以减小消息的大小。
    4. 对超过 maxMessageSize 的消息进行过滤或其他处理。可以通过业务逻辑对消息进行分组或分类,对超过 maxMessageSize 的消息进行过滤或其他处理,以避免发送超出限制的消息。

代码实现

package com.resource.sync.rocketmq;import java.util.Iterator;
import java.util.List;/*** @description:消息分割,在rocketmq中,一次性发送消息的长度不可超过4mb,此时我们需要进行切割,确保消息长度小于4mb**/
public class ListSplitter<T> implements Iterator<List<T>> {/*** 分割数据大小*/private int sizeLimit;/*** 分割数据列表*/private final List<T> messages;/*** 分割索引*/private int currIndex;public ListSplitter(int sizeLimit, List<T> messages) {this.sizeLimit = sizeLimit;this.messages = messages;}@Overridepublic boolean hasNext() {return currIndex < messages.size();}@Overridepublic List<T> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {T t = messages.get(nextIndex);totalSize = totalSize + t.toString().length();if (totalSize > sizeLimit) {break;}}List<T> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}
    private final int maxMessageSize = 1024 * 1024 * 4;/*** 消息分割(批量发送)*/private void bulkSendMsg(List<Message<String>> messageList) {// 限制数据大小ListSplitter splitter = new ListSplitter(maxMessageSize, messageList);while (splitter.hasNext()) {List<Message> nextList = splitter.next();syncBulkSendMessage("topic", nextList);}}/*** @param topic* @param list* @description:发送实时消息(批量)*/public void syncBulkSendMessage(String topic, List<Message> list) {SendResult sendResult = null;try {sendResult = rocketMQTemplate.syncSend(topic, list);if (sendResult.getSendStatus() != SendStatus.SEND_OK) {log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR.RESULT_STATUS:{},MSG_ID:{}", sendResult.getSendStatus(), sendResult.getMsgId());}if (sendResult.getSendStatus() == SendStatus.SEND_OK) {log.info("BULK_SEND_MSG_SUCCESS.MSG_ID:{}", sendResult.getMsgId());}} catch (Exception e) {log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR:{}", e);}}

http://www.lryc.cn/news/220677.html

相关文章:

  • 一键同步chromedriver版本
  • Zephyr-7B-β :类GPT的高速推理LLM
  • 【笔试题】位运算
  • RT-Thread 10. 使用keil4编译GD32F450
  • Vue 跨域的两种解决方式
  • 【windows Docker 安装mysql:只需3条命令】
  • 【软件逆向】如何逆向Unity3D+il2cpp开发的安卓app【IDA Pro+il2CppDumper+DnSpy+AndroidKiller】
  • vue3ref和reactive
  • [架构之路-244]:目标系统 - 设计方法 - 软件工程 - 软件开发方法与软件开发模型
  • Matter 系列 #10|Matter 的证书吊销机制
  • mybatis动态表名
  • 高校为什么需要大数据挖掘平台?
  • @Value的使用
  • 用 Wireshark 在 Firefox 或 Google Chrome 上使用 SSLKEYLOGFILE 环境变量解密 SSL 流量
  • 京东大数据:2023年Q3美妆行业数据分析报告
  • [题] 改革春风吹满地 #图论 #多边形面积
  • FPGA时序分析与约束(2)——时序电路时序
  • 明御安全网关任意文件上传漏洞复现
  • JVM虚拟机:如何查看自己的JVM默认的垃圾回收器
  • 目标检测YOLO系列从入门到精通技术详解100篇-【目标检测】机器视觉
  • 设计模式——建造者模式
  • Go语言用Colly库编写的图像爬虫程序
  • 14.2 并发与竞争实验
  • 【MediaTek】T750实现Host 网络和Guest 网络隔离以及各个连接终端间隔离功能
  • 数字滤波器之高通滤波器设计
  • 【leetcode】58.最后一个单词的长度
  • 用Java(C语言也可以看)实现冒泡排序和折半查找(详细过程图)+逆序数组
  • antd本地上传excel文件并读取文件的数据转为json
  • BI数据可视化:不要重复做报表,只需更新数据
  • fiddler抓包拦截请求转发到其他地址