当前位置: 首页 > news >正文

RocketMQ-消息消费模式 顺序消费

RocketMQ-消息消费模式 顺序消费

  • RocketMQ-消息消费模式
    • 集群模式
      • 集群模式的演示(本身就默认)
      • Rocketmq存储队列
    • 广播模式
  • 顺序消费
    • 如何改实现顺序消费


RocketMQ-消息消费模式

集群模式

在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到
在这里插入图片描述
在这里插入图片描述

集群模式的演示(本身就默认)

假设我们生产者生产了十条信息 ,当我们集群了两台消费者服务器的时候,就会每个服务器执行五条

在这里插入图片描述在这里插入图片描述

Rocketmq存储队列

在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的
如果只有一个队列,保证线程安全,必须得给队列进行写操作的时候上锁。
多几个队列,降低并发度,等待时间就短一些。

为什么是四个队列?

因为大多数服务器只有四核,意味着同时最多只能有CPU同时工作
在这里插入图片描述

广播模式

在消费模式为集群的情况下,如果机器是集群的,消费是会给集群中的所有机器所消费到

public class Consumer {public static void main(String[] args) throws Exception {//定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");//设置nameServer地址consumer.setNamesrvAddr("43.143.161.59:9876");//设置订阅的主题consumer.subscribe("helloTopic","*");//设置消费模式consumer.setMessageModel(MessageModel.BROADCASTING);//设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}

运行结果:

生产者发送了十条消息之后,消费者集群的每个服务器均收到十条数据
在这里插入图片描述
在这里插入图片描述


顺序消费

实现生产顺序:12345消费顺序12345
哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程

假设我们没有实现顺序消费的时候

创建生产者

1.创建实体类

@Setter
@Getter
public class OrderStep {private long orderId;private String desc;@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}
}

2.创建测试类

public class OrderUtil {public static List<OrderStep> buildOrders(){List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}

3.创建生产者类

public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "orderTopic";List<OrderStep> orderSteps = OrderUtil.buildOrders();for(OrderStep step:orderSteps){Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));producer.sendOneway(msg);}producer.shutdown();}
}

创建消费者类

public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("orderTopic","*");consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}

运行结果:

可以看出和我们生产数据的顺序完全不同,整个订单的顺序都反了
在这里插入图片描述

如何改实现顺序消费

生产者类

public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "orderTopic";List<OrderStep> orderSteps = OrderUtil.buildOrders();//设置队列选择器MessageQueueSelector selector = new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {System.out.println("队列个数"+list.size());Long orderId = (Long) o;int index = (int)(orderId % list.size());return list.get(index);}};for(OrderStep step:orderSteps){Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));//指定消息选择器,换入的参数producer.send(msg,selector,step.getOrderId());}producer.shutdown();}
}

消费者类

public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("orderTopic","*");//从什么地方开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//一个队列对应一个线程consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for(MessageExt msg:list){System.out.println("当前线程:"+Thread.currentThread()+":,队列ID"+msg.getQueueId()+",消息内容:"+new String(msg.getBody(),Charset.defaultCharset()));}return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();}
}
http://www.lryc.cn/news/2992.html

相关文章:

  • 一、Java并发编程之线程、synchronized
  • 12.hadoop系列之MapReduce分区实践
  • 有了独自开,一个人就是一个团队
  • web期末复习 2023.02.11
  • 第44章 用户密码实体及其约束规则的定义实现
  • 聊聊并发与锁
  • 开源项目 —— 原生JS实现斗地主游戏 ——代码极少、功能都有、直接粘贴即用
  • Linux第四讲
  • Redis 持久化
  • Python语言零基础入门教程(十三)
  • 江苏五年制专转本应该复习几轮?
  • 微信小程序的优化方案之主包与分包的研究
  • 从手工测试转型web自动化测试继而转型成专门做自动化测试的学习路线。
  • 【计组笔记03】计算机组成原理之系统五大部件介绍、主存模型和CPU结构介绍
  • 微信小程序解析用户加密数据
  • 毕业四年换了3份软件测试工作,我为何仍焦虑?
  • 嵌入式C基础知识(7)
  • 大数据系列之:安装pulsar详细步骤
  • 色彩-基础理论
  • 1629_MIT_6.828_xv6_chapter1操作系统的组织
  • 基于Golang哈希算法监控配置文件变化
  • 关于一笔画问题的一些思考(欧拉路Fleury算法、逐步插入回路法、以及另一种可能的解法)
  • vlookup怎么用详细步骤,看这一篇就够了
  • 雅思经验(9)之小作文常用词汇总结
  • 【Python语言基础】——Python NumPy 数组创建
  • 【大数据】Hadoop-Kms 安装及相关详细配置,看完你就会了
  • SpringCloud分布式框架
  • Csss属性display,visibility区别,对渲染页面的影响
  • 怎么给笔记本电脑外接两台显示器?
  • 生成树协议 — STP