17、Rocket MQ快速实战以及核⼼概念详解
⼀ 、MQ简介
MQ:MessageQueue,消息队列。是在互联⽹中使⽤⾮常⼴泛的—系列服务中间件。 这个词可以分两个部分来看,
—是Message:消息。消息是在不同进程之间传递的数据。这些进程可以部署在同—台机器上,也可以 分布在不同机器上。
⼆是Queue:队列。队列原意是指—种具有FI FO(先进先出)特性的数据结构,是⽤来缓存 数据的。对于消息中间件产品来说,能不能保证FI FO特性, 尚值得考量。但是,所有消息队列都是需要具备存 储消息,让消息排队的能⼒ 。
⼴义上来说, 只要能够实现消息跨进程传输以及队列数据缓存,就可以称之为消息队列。例如我们常⽤的QQ 、微信 、阿⾥旺旺等就都具备了这样的功能。只不过他们对接的使⽤对象是⼈ ,⽽我们这⾥讨论的MQ产品 需要对接的使⽤对象是应⽤程序。
1、MQ的作⽤主要有以下三个⽅⾯:
(1)异步
例⼦: 快递员发快递,直接到客户家效率会很低。引⼊菜⻦驿站后,快递员只需要把快递放到菜⻦驿站, 就可以继续发其他快递去了。客户再按⾃⼰的时间安排去菜⻦驿站取快递。
作⽤ :异步能提⾼系统的响应速度 、吞吐量。
(2)解耦
例⼦:《Thinking in JAVA》很经典,但是都是英⽂ ,我们看不懂,所以需要编辑社,将⽂章翻译成其他 语⾔, 这样就可以完成英语与其他语⾔的交流。
作⽤ :
1 、服务之间进⾏解耦,才可以减少服务之间的影响。提⾼系统整体的稳定性以及可扩展性。
2 、另外,解耦后可以实现数据分发。⽣产者发送—个消息后,可以由—个或者多个消费者进⾏消费,并 且消费者的增加或者减少对⽣产者没有影响。
(3)削峰
例⼦: ⻓江每年都会涨⽔ ,但是下游出⽔⼝的速度是基本稳定的,所以会涨⽔ 。引⼊三峡⼤坝后,可以把 ⽔储存起来,下游慢慢排⽔。
作⽤ : 以稳定的系统资源应对突发的流量冲击。
⼆ 、Rocket MQ产品特点
1 、Rocket MQ介绍
Rocket MQ是阿里巴巴开源的一个消息中间件,在阿里内部历经了双十一等很多高并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的—个顶级项目。
早期阿里使用ActiveMQ ,但是,当消息开始逐渐增多后,ActiveMQ的IO性能很快达到了瓶颈。于是, 阿⾥开始关注Kafka 。但是Kafka是针对日志收集场景设计的,他的高级功能并不是很贴合阿里的业务场景。尤其当他的Topic过多时, 由于Partition文件也会过多,这就会加大文件索引的耗时,会严重影响IO性能。于是阿里才决定自研中间件, 最早叫做MetaQ ,后来改名成为Rocket MQ 。最早他所希望解决的最大问题就是多Topic下的IO 性能压力 。但是产品在阿里内部的不断改进, Rocket MQ开始体现出一些不一样的优势。
2 、Rocket MQ特点
当今互联⽹MQ产品众多,其中,影响⼒和使⽤范围最⼤的当数Apache Kafka 、RabbitMQ 、Apache Rocket MQ以及Apache Plusar。这⼏⼤产品虽然都是典型的MQ产品,但是由于设计和实现上的—些差异,造成他们适合于不同的细分场景。
优点 | 缺点 | 适合场景 | |
Apache Kafka | 吞吐量⾮常⼤ ,性能⾮常好,集群⾼可⽤ 。 | 会有丢数据的可 能,功能⽐较单— | ⽇志分析、 ⼤数据采集 |
Rabbit MQ | 消息可靠性⾼ ,功能全⾯ 。 | erlang语⾔不好定 制。吞吐量⽐较 低。 | 企业内部⼩ 规模服务调⽤ |
Apache Pulsar | 基于Bookeeper构建, 消息可靠性⾮常⾼ 。 | 周边⽣态还有差 距, ⽬前使⽤的公 司⽐较少。 | 企业内部⼤ 规模服务调⽤ |
Apache Rocket MQ | ⾼吞吐 、⾼性能 、⾼可⽤ 。功能全⾯ 。 客户端协议丰富。使⽤java语⾔开发, ⽅ 便定制。 | 服务加载⽐较慢。 | ⼏乎全场 景,特别适 合⾦融场景 |
其中Rocket MQ ,孵化⾃阿⾥巴巴。历经阿⾥多年双十一的严格考验, Rocket MQ可以说是从全世界最严苛的 ⾼并发场景中摸爬滚打出来的过硬产品,也是少数⼏个在⾦融场景⽐较适⽤的MQ产品。从横向对⽐来看,Rocket MQ与Kafka和RabbitMQ相⽐ 。Rocket MQ的消息吞吐量虽然和Kafka相⽐还是稍有差距,但是却⽐RabbitMQ⾼很多。在阿⾥内部, Rocket MQ集群每天处理的请求数超过5万亿次,⽀持的核⼼应⽤超过3000 个。⽽Rocket MQ最⼤的优势就是他天⽣就为⾦融互联⽹⽽⽣ 。他的消息可靠性相⽐Kafka也有了很⼤的提升, ⽽消息吞吐量相⽐RabbitMQ也有很⼤的提升。另外, Rocket MQ的⾼级功能也越来越全⾯ ,⼴播消费 、延迟队列 、死信队列等等⾼级功能一应俱全, 甚⾄某些业务功能⽐如事务消息, 已经呈现出领先潮流的趋势。
三、RocketMQ快速实战
1.快速搭建RocketMQ
RocketMQ的官网地址: http://rocketmq.apache.org 。在下载页面可以获取RocketMQ的源码包以及运行包。下载页面地址:https://rocketmq.apache.org/download。
当前最新的版本是5.x,这是一个着眼于云原生的新版本,给 RocketMQ 带来了非常多很亮眼的新特性。但是目前来看,企业中用得还比较少。因此,我们这里采用的还是更为稳定的4.9.5版本。
注:在2020年下半年,RocketMQ新推出了5.0的大版本,这对于RocketMQ来说,是一个里程碑式的大版本。在这个大版本中,RocketMQ对整体功能做了一次大的升级。增加了很多非常有用的新特性,也对已有功能重新做了升级。
比如在具体功能方面,在4.x版本中,对于定时消息,只能设定几个固定的延迟级别,而5.0版本中,已经可以指定具体的发送时间了。在客户端语言方面,4.x版本,RocketMQ原生只支持基于Netty框架的Java客户端。而在5.0版本中,增加了对Grpc协议的支持,这基本上就解除了对客户端语言的限制。在服务端架构方面,4.x版本只支持固定角色的普通集群和可以动态切换角色的Dledger集群,而在5.0版本中,增加了Dledger Controller混合集群模式,即可以混合使用Dledger的集群机制以及 Broker 本地的文件管理机制。
但是功能强大,同时也意味着问题会很多。所以目前来看,企业中直接用新版本的还比较少。小部分使用新版本的企业,也大都是使用内部的改造优化版本。
运⾏只需要下载Binary运⾏版本就可以了。 当然,源码包也建议下载下来,后续会进⾏解读。运⾏包下载下 来后,就可以直接解压,上传到服务器上。我们这⾥会上传到/app/rocketmq⽬录。解压后⼏个重要的⽬录如下:
默认情况下, Rocket MQ建议的运⾏环境需要⾄少12G的内存, 这是⽣产环境⽐较理想的资源配置。但是, 学 习阶段,如果你的服务器没有这么⼤的内存空间,那么就需要做—下调整。进⼊bin ⽬录,对其中的runserver.sh和runbroker.sh两个脚本进⾏—下修改。
使⽤vi runserver.sh指令,编辑这个脚本,找到下⾯的—⾏配置,调整Java进程的内存⼤⼩。
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m - XX:MaxMetaspaceSize=320m"
接下来, 同样调整runbroker.sh中的内存⼤⼩。
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
修改为:
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g"
修改配置时, 注意要根据你的JDK版本调整对应的配置⾏ 。Rocket MQ是—个典型的Java应⽤ ,所以需要 提前安装JDK 。我们这⾥采⽤的是1 .8版本。JDK的安装过程略。
⽣产环境不建议调整。这—系列参数实际上就是Rocket MQ的JVM调优结果。
Rocket MQ的后端服务分为nameserver和broker两个服务,关于他们的作⽤ ,后⾯会给你分享。接下来我们 先将这两个服务启动起来。
第⼀步:启动nameserver服务。
cd /app/rocketmq/rocketmq-all-5.3.0-bin-release
nohup bin/mqnames rv &
指令执⾏后,会⽣成—个nohup.out的⽇志⽂件。在这个⽇志⽂件⾥如果看到下⾯这—条关键⽇志,就表示 nameserver服务启动成功了。
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and
will likely be removed in a future release.
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
接下来,可以通过jsp指令进⾏验证。使⽤jps指令后,可以看到有—个NamesrvStartup的进程运⾏ ,也表示 nameserver服务启动完成。
第⼆步:启动broker服务
broker也是—个Java服务, 只需要调整conf⽬录下的broker.conf⽂件, 进⾏—些定制。然后就可以启动了。
具体配置项参⻅官⽅⽂档, 这⾥尽量⾛默认配置。
如果你的服务器配置了多张⽹卡, 建议配置brokerIP1属性。⽐如阿⾥云,腾讯云这样的云服务器,他们 通常有内⽹⽹卡和外⽹⽹卡两张⽹卡,那么需要增加配置brokerIP1属性,指向服务器的外⽹IP 地址, 这 样才能确保从其他服务器上访问到Rocket MQ 服务。
在启动broker服务前,需要先指定NameServer的服务地址。Rocket MQ可以使⽤—个NAMESRV_ADDR的环 境变量指定NameServer服务地址。
export NAMESRV_ADDR= I localhost:9876 I
9876是nameserver的默认服务端⼝ 。
然后也可以⽤之前的⽅式启动broker服务。启动broker服务的指令是mqbroker。
cd /app/rocketmq/rocketmq-all-5.3.0-bin-release
nohup bin/mqbroker &
启动完成后, 同样检查nohup.out⽇志⽂件,有如下—条关键⽇志,就表示broker服务启动正常了。
The broker [xxxxx] boot success. serializeType=JSON and name server is localhost:9876
注 :1、在实际服务部署时,通常会将Rocket MQ的部署地址添加到环境变量当中。例如使⽤vi ~/.bash_profile指令,添加以下内容。
export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-5.3.0-bin-release PATH=$ROCKETMQ_HOME/bin:$PATH
export PATH
这样就不必每次进⼊Rocket MQ的安装⽬录了。直接可以使⽤mqnamesrv 和mqbroker指令。
2 、停⽌Rocket MQ服务可以通过mqshutdown指令进⾏
mqshutdown namesrv # 关闭nameserver服务mqshutdown broker # 关闭broker服务
同样使⽤jps指令可以检查服务的启动状态。使⽤jps指令后,可以看到—个名为BrokerStartup的进程,则表示 broker服务启动完成。
2 、快速实现消息收发
Rocket MQ后端服务启动完成后,就可以启动客户端的消息⽣产者和消息消费者进⾏消息转发了。接下来,我 们会先通过Rocket MQ提供的命令⾏⼯具快速体验—下Rocket MQ消息收发的功能。然后,再动⼿搭建—个Maven项⽬ ,在项⽬中使⽤Rocket MQ进⾏消息收发。
(1)命令⾏快速实现消息收发
1) :通过指令启动Rocket MQ的消息⽣产者发送消息。
cd /app/rocketmq/rocketmq-all-5.3.0-bin-release
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
这个指令会默认往Rocket MQ中发送1000条消息。在命令⾏窗⼝可以看到发送消息的⽇志:
.....
SendResult [sendStatus=SEND_OK, msgId=C0A841708122246B179D98C9E31103E6,
offsetMsgId=C0A8417000002A9F000000000003AEFE, messageQueue=MessageQueue [topic=TopicTest,
brokerName=192-168-65-112, queueId=1], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A841708122246B179D98C9E31203E7,
offsetMsgId=C0A8417000002A9F000000000003AFF0, messageQueue=MessageQueue [topic=TopicTest,
brokerName=192-168-65-112, queueId=2], queueOffset=249]
这部分⽇志中,并没有打印出发送了什么消息。上⾯Send Result开头部分是消息发送到Broker后的结果。最 后两⾏⽇志表示消息⽣产者发完消息后,服务正常关闭了。
2):可以启动消息消费者接收之前发送的消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
消费者启动完成后,可以看到消费到的消息
......
ConsumeMessageThread_please_rename_unique_group_name_4_18 Receive New Messages:
[MessageExt [brokerName=192-168-65-112, queueId=1, storeSize=242, queueOffset=211,
sysFlag=0, bornTimestamp=1725004967502, bornHost=/192.168.65.112:52748,
storeTimestamp=1725004967502, storeHost=/192.168.65.112:10911,
msgId=C0A8417000002A9F0000000000031F4E, commitLogOffset=204622, bodyCRC=47888112,
reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic= ITopicTest I,
flag=0, properties={CONSUME_START_TIME=1725005058184, MSG_REGION=DefaultRegion,
UNIQ_KEY=C0A841708122246B179D98C9E24E034E, CLUSTER=DefaultCluster, MIN_OFFSET=0,
TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=250}, body= [72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 52, 54], transactionId= Inull I}]]
每—条这样的⽇志信息就表示消费者接收到了—条消息。
这个Consumer消费者的指令并不会主动结束,他会继续挂起,等待消费新的消息。我们可以使⽤CTRL+C停 ⽌该进程。
注:在Rocket MQ提供的这个简单示例中并没有打印出传递的消息内容,⽽是打印出了消息相关的很多重 要的属性。
其中有⼏个⽐较重要的属性: brokerId,brokerName,queueId,msgId,topic,cluster。这些属性的作⽤会在 后续—起分享, 这⾥你不妨先找—下这些属性是什么,消费者与⽣产者之间有什么样的对应关系。
3 、搭建Java客户端项⽬
之前的步骤实际上是在服务器上快速验证Rocket MQ的服务状态,接下来我们动⼿搭建—个Rocket MQ的客户 端应⽤ ,在实际应⽤中集成使⽤Rocket MQ。
第⼀步 :创建—个标准的maven项⽬ ,在pom.xml中引⼊以下核⼼依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
</dependency>
第⼆步:**就可以直接创建—个简单的消息⽣产者