当前位置: 首页 > news >正文

RocketMQ详解及注意事项

RocketMQ是阿里巴巴开源的一款分布式消息中间件,具有高吞吐量、高可用性、可扩展性和稳定性强等特点,广泛应用于异步消息、应用解耦、流量削峰填谷等场景。本文将详细介绍RocketMQ的基本架构、工作流程、消息模型,并列出在使用RocketMQ时需要注意的问题。

RocketMQ基本架构

RocketMQ主要由四部分组成:NameServer、Broker、Producer和Consumer。

  • NameServer:提供轻量级的服务发现和路由。每个NameServer记录完整的路由信息,提供快速的存储路由信息和读取路由信息的功能。

  • Broker:负责存储和转发消息。Broker在启动时,会将自己注册到所有的NameServer上,所有的Broker构成一个完整的消息系统。

  • Producer:消息的生产者,负责生产消息,发送消息。

  • Consumer:消息的消费者,负责消费消息,接收消息。

RocketMQ的基本工作流程

  1. 启动NameServer:NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制器。

  2. Broker启动:在Broker启动的时候,会创建和NameServer的连接,定时发送心跳包。心跳包中包含当前Broker信息(IP、端口等)以及存储所有的Topic信息。

  3. 发送消息:首先Producer会从NameServer中查找Topic的路由信息,然后选择一个队列(负载均衡算法),然后直接与Broker建立长连接,发送消息。

  4. 消费消息:Consumer从NameServer获取Topic的路由信息,然后从Broker中拉取消息,拉取到消息之后,消费者消费消息,然后向Broker发送消费进度。

RocketMQ的消息模型

RocketMQ主要有两种消息模型:点对点模型(P2P)和发布/订阅模型(Pub/Sub)。

  • 点对点模型:消息生产者产生消息,直接发送给某个消息消费者。这种模式下,消息被消费者直接消费,不需要经过Broker。

  • 发布/订阅模型:消息生产者(发布者)将消息发布到Topic,多个消息消费者(订阅者)订阅这个Topic,然后都可以收到消息。这种模式下,消息传输过程中需要经过Broker。

RocketMQ使用中需要注意的问题

1. 消息重复

在使用RocketMQ的过程中,可能会出现消息重复的情况。这主要是因为网络问题或者消费者处理消息的速度跟不上生产者发送消息的速度造成的。为了避免这种情况,我们可以设置消费者的消费策略为顺序消费,这样就可以保证消息的顺序性。同时,我们也可以在消费者端进行去重操作,比如使用数据库的唯一索引等方式。

2. 消息丢失

消息丢失通常是由于Broker宕机或者网络问题造成的。为了避免消息丢失,RocketMQ提供了消息持久化的功能,即将消息存储在磁盘上。此外,我们还可以设置消息的重试次数,当消息发送失败时,可以重新发送。

3. 消息积压

如果消费者处理消息的速度跟不上生产者发送消息的速度,就会造成消息积压。为了解决这个问题,我们可以增加消费者的数量,提高消费者的消费速度。同时,我们也可以对消息进行分类,将不同类型的消息发送到不同的队列中,然后由不同的消费者消费。

Java示例

以下是一个简单的Java示例,展示如何使用RocketMQ进行消息的发送和接收。

创建Producer

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();for (int i = 0; i < 100; i++) {Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);
}producer.shutdown();

创建Consumer

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});consumer.start();

总结

RocketMQ作为一款优秀的分布式消息中间件,凭借其高吞吐量、高可用性、可扩展性和稳定性强等特点,被广泛应用于各种场景。以上内容对RocketMQ进行了详细的介绍,包括其基本架构、工作流程、消息模型以及Java示例,并列出了在使用RocketMQ时需要注意的问题,希望可以帮助大家更好地理解和使用RocketMQ。

👉 💐🌸 公众号请关注 "果酱桑", 一起学习,一起进步! 🌸💐

http://www.lryc.cn/news/106785.html

相关文章:

  • 选择适合的项目管理系统,了解有哪些选择和推荐
  • linux基础命令-cd
  • MySQL数据库分库分表备份
  • PyTorch 中的累积梯度
  • 【面试精品】运维工程师需要具备的核心能力有哪些?
  • 微服务实战项目-学成在线-选课学习(支付与学习中心)模块
  • postman和jmeter的区别何在?
  • maven安装(windows)
  • 自学安全卷不动,是放弃还是继续?
  • Django实现音乐网站 ⑶
  • (13) Qt事件系统(two)
  • PHP的知识概要
  • JSON格式Python,Java,PHP等封装根据商品ID获取快手商品详情数据方法
  • 【ASP.NET MVC】MYSQL安装配置(4)
  • 前端框架学习-Vue(二)
  • sublime配置less的一些坑(1)
  • 解码“平台工程”,VMware 有备而来
  • 2023年第四届华数杯数学建模A题B题C题D题思路代码分析
  • java版直播商城平台规划及常见的营销模式+电商源码+小程序+三级分销+二次开发 bbc
  • windows物理机 上安装centos ,ubuntu,等多个操作系统的要点
  • FSDirectory 与 RAMDirectory
  • 小程序开发:开发框架与工具的使用指南
  • 【LeetCode】探索杨辉三角模型
  • Qt 中引入ffmpeg 动态库
  • 工程师是怎样对待开源 qt
  • Maven中Servlet的坐标为什么要添加<scope>provided</scope>
  • 联发科CEO:未获准向华为供货,换机潮已过去,手机需求不会更差
  • 2023年DevOps和云趋势报告!
  • 怎么学习CSS相关技术知识? - 易智编译EaseEditing
  • Qt 2. QSerialPortInfo显示串口信息