当前位置: 首页 > news >正文

RocketMQ(3)之事务消息

一、发送事务消息案例

        事务消息共有三种状态,提交状态、回滚状态、中间状态: 

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

        1.1创建事务性生产者

        使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在上面说的。

    /*** 发送事务消息* @throws Exception*/@Testpublic void testTransactionProduce() throws Exception {TransactionListener transactionListener = new TransactionListenerImpl();TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 10; i++) {try {Message msg =new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {Thread.sleep(1000);}producer.shutdown();}

2.事务监听接口

        当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。

 class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();/*** 本地事务*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}/*** 状态回查*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}
}

1.3事务消息使用上的限制

  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
http://www.lryc.cn/news/158027.html

相关文章:

  • 基于多设计模式下的同步异步日志系统
  • API接口与电商平台之间的联系,采集京东平台数据按关键字搜索商品接口示例
  • 代码随想录day41|343. 整数拆分96. 不同的二叉搜索树
  • Less常用内置函数
  • pdf转换成图片转换器在线怎么转?pdf转换成图片具体方法介绍
  • JavaScript动态设置浏览器可视区域元素的文字颜色、监听滚动条、querySelectorAll、getBoundingClientRect
  • 意向客户的信息获取到底是怎样的,快来get一下
  • 自动化测试常用脚本语言有哪些?
  • mapreduce 的工作原理以及 hdfs 上传文件的流程
  • Ubuntu22.04安装ROS2
  • uniapp - 倒计时组件-优化循环时间倒计时
  • java 实现访问者模式
  • JDK源码剖析之PriorityQueue优先级队列
  • TSINGSEE青犀AI视频分析/边缘计算/AI算法·人脸识别功能——多场景高效运用
  • 力扣(LeetCode)算法_C++——最大连续 1 的个数 III
  • 23062C++QT day2
  • React三属性之:props
  • 大数据安全 | (一)介绍
  • 软件工程的概念及其重要性
  • [足式机器人]Part3 变分法Ch01-2 数学预备知识——【读书笔记】
  • 【嵌入式开发 Linux 常用命令系列 7.1 -- awk 过滤列中含有特定字符的行】
  • 前端(十六)——Web应用的安全性研究
  • 无涯教程-JavaScript - BIN2HEX函数
  • Kafka环境搭建与相关启动命令
  • 【C++】类的封装 ② ( 封装最基本的表层概念 | 类对象作为参数传递的几种情况 )
  • Linux上安装FTP
  • C/C++使用GDAL库编程窍门之——通用可移植性库(Common Portability Library, CPL)
  • Linux container_of() 宏定义
  • 详解python中的序列类型---列表list
  • Unity 引擎中国版 “团结引擎” 发布