04、RocketMQ -- 核心基础使用
目录
- 核心基础使用
- 1、入门案例
- 生产者
- 消费者
- 2、消息发送方式
- 方式1:同步消息
- 方式2:异步消息
- 方式3:一次性消息
- 管控台使用过程中可能出现的问题
- 3、消息消费方式
- 集群模式(默认)
- 广播模式
- 4、顺序消息
- 分析图:
- 代码实现:
- 生产者代码:
- 消费者代码:
- Tag消息消费过滤
- 5、延迟消息
- 消费者代码:
- 消费者代码:
- 消息过滤
- 6、Tag 标签过滤
- 生产者:
- 消费者:
- 7、SQL92 过滤
- 生产者代码:
- 消费者代码:
- 修改配置文件
核心基础使用
1、入门案例
生产者和消费者都需要用到同一个依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version>
</dependency>
生产者
控制台的消息
消费者
RocketMQ有push(推模式)和pull(拉模式)两种消费消息的模式,推模式就是Broker主动将消息推送给消费者,拉模式就是消费者主动从Broker将消息拉回来。推模式本质实际上是拉模式,是基于拉模式实现的
consumer启动之后,只要不关闭,一有消息就会被这个消费者消费
2、消息发送方式
方式1:同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
消息中间件要保证消息不丢失,它里面有一个持久化机制的。
发来的消息默认存在内存中,但是如果消息中心宕机了,那么消息就全丢了。
所以这个消息中心里面有这样一个存储介质,消息中心最终会把消息存在【文件存储】里面,每个Broker Server 有自己的文件存储。
方式2:异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
适合需要快速响应的场景,不过可靠性比同步消息差一点,因为是异步发送,所以业务逻辑继续往下走的时候,异步发送的消息可能会出现问题,这就是说它可靠性差点。
分析图:
示意图:
启动rocketmq之后发送异步消息成功。
方式3:一次性消息
性能更高,对丢失一两条数据无所谓的,适合日志场景
代码示意图
生产者
消费者
管控台使用过程中可能出现的问题
存到消息的时间是机器时间,然后隔天再打开查询,时间对不上
时间同步命令
用阿里的时间同步
3、消息消费方式
集群模式(默认)
集群模式:消息是分散消费的,分散到不同的消费者去消费的。
广播模式
4、顺序消息
分析图:
消费按照指定的顺序进行消费
rocketMQ本身就是多线程的,默认每个消费者的线程数为5个,每个消费者可以有n个线程来进行消费。
属于多线程消费
每一个topic默认有4个消息队列 MessageQueue,如图
顺序消费分析图:
代码实现:
生产者代码:
一些注解:
消费者代码:
Tag消息消费过滤
“*”号表示所有消息都要消费
想要看消息消费前和消费后的状态的区别,下图不确定是不是这么理解
注意点:
加这个的话,每次都会从头开始消费-----待确定功能
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
5、延迟消息
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,
从1s到2h分别对应着等级1到18
“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
消费者代码:
主要是这里的消息调用个延迟发送消息的方法而已
消费者代码:
这里跟其他消费者代码没什么区别
消息过滤
6、Tag 标签过滤
生产者:
消费者:
7、SQL92 过滤
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
数值比较,比如:>,>=,<,<=,BETWEEN,=;
字符比较,比如:=,<>,IN;
IS NULL** 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
常量支持类型为:
数值,比如:**123,3.1415;
字符,比如:‘abc’,必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
注意: 在使用SQL过滤的时候, 需要配置参数enablePropertyFilter=true
生产者代码:
消费者代码:
修改配置文件
报错的原因是因为linux要修改下配置
enablePropertyFilter=true
:wq 保存退出
关闭和重新启动nameserver和broker
关闭nameserver:
sh mqshutdown namesrv
关闭broker
sh mqshutdown broker
1.启动NameServer
nohup sh mqnamesrv &
2.启动Broker
nohup sh mqbroker -n localhost:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf &
这里已经变成true了
重新启动消费者看会不会报错
过滤成功