当前位置: 首页 > news >正文

【RocketMQ 生产者和消费者】- ConsumeMessageOrderlyService 顺序消费消息

文章目录

  • 1. 前言
  • 2. start 锁定队列
    • 2.1 lockMQPeriodically 周期锁定所有队列
    • 2.2 broker 通过 lockBatchMQ 批量对消息队列加锁
    • 2.3 tryLockBatch 批量加锁
    • 2.4 isLocked 判断队列是否加锁了
    • 2.5 isExpired 判断锁是否失效了
  • 3. submitConsumeRequest 提交消费请求
  • 4. ConsumeRequest 消息消费
    • 4.1 takeMessages 从缓存中获取消息
  • 5. processConsumeResult 处理消费结果
    • 5.1 checkReconsumeTimes 最大消费重试次数了
    • 5.2 makeMessageToConsumeAgain 消息回退
    • 5.3 commit 提交
  • 6. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

  • 【RocketMQ】- 源码系列目录
  • 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息
  • 【RocketMQ 生产者和消费者】- 消费者启动源码
  • 【RocketMQ 生产者和消费者】- 消费者重平衡(1)
  • 【RocketMQ 生产者和消费者】- 消费者重平衡(2)- 分配策略
  • 【RocketMQ 生产者和消费者】- 消费者重平衡(3)- 消费者 ID 对负载均衡的影响
  • 【RocketMQ 生产者和消费者】- 消费者的订阅关系一致性
  • 【RocketMQ 生产者和消费者】- 消费者发起消息拉取请求 PullMessageService
  • 【RocketMQ 生产者和消费者】- broker 是如何处理消费者消息拉取的 Netty 请求的
  • 【RocketMQ 生产者和消费者】- broker 处理消息拉取请求
  • 【RocketMQ 生产者和消费者】- 消费者处理消息拉取结果
  • 【RocketMQ 生产者和消费者】- 消费者处理消息拉取结果
  • 【RocketMQ 生产者和消费者】- ConsumeMessageConcurrentlyService 并发消费消息

这篇文章来看下 ConsumeMessageOrderlyService 如何处理顺序消费消息,同理顺序消费的线程池,核心线程数和最大线程数也都是 20,消息队列是无界阻塞队列,然后还有一个 scheduledExecutorService,创建出来的是单线程的定时任务线程池,用于顺序提交消息。

/*** 创建单线程的定时任务线程池, 用于单线程延迟提交消费请求*/
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));

2. start 锁定队列

顺序消费模式下,一个队列只能给一个消费者来消费,所以需要定时去向 broker 加锁,那么为什么要向 broker 加锁呢,因为负载均衡也是在 broker 执行的,如果这时候新增了一些消费者,那么消息队列就会重新分配,假设当前消费者真正消费某个消息队列,这种情况下肯定是不能中断的,因此需要加个锁,等下一次负载均衡再把这个消息队列给交出去。

/*** 顺序消费模式*/
public void start() {// 集群模式才有顺序消费, 广播模式就没有了if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {// 启动后 1s 开始执行, 接着每 20s 执行一次this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 锁定所有分配给当前消费者的消息队列ConsumeMessageOrderlyService.this.lockMQPeriodically();} catch (Throwable e) {log.error("scheduleAtFixedRate lockMQPeriodically exception", e);}}}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);}
}

2.1 lockMQPeriodically 周期锁定所有队列

这个方法每 20s 执行一次,向 broker 请求锁定所有队列,避免负载均衡将当前正在消费的消费者交出去。

/*** 锁定所有队列*/
public synchronized void lockMQPeriodically() {if (!this.stopped) {// 锁定所有分配给当前消费者的消息队列this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();}
}

lockAll 定时锁定分配给当前消费者的消息队列。

/*** 定时锁定分配给当前消费者的消息队列*/
public void lockAll() {// 构建 [brokerName -> 消息队列集合] 的映射HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<MessageQueue>> entry = it.next();// brokerNamefinal String brokerName = entry.getKey();// 消息队列final Set<MessageQueue> mqs = entry.getValue();if (mqs.isEmpty())// 消息队列为空, 直接返回continue;// 获取主节点地址, 如果获取不到就获取 brokerId = 1 的从节点FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);if (findBrokerResult != null) {// 1. 构建加锁请求体LockBatchRequestBody requestBody = new LockBatchRequestBody();// 消费者组requestBody.setConsumerGroup(this.consumerGroup);// 消费者 IDrequestBody.setClientId(this.mQClientFactory.getClientId());// 要加锁的消费队列requestBody.setMqSet(mqs);try {// 2. 向 broker 发送同步请求, 对消息队列批量加锁, 返回结果就是锁定的消息队列Set<MessageQueue> lockOKMQSet =this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);// 3. 处理锁定的消息队列for (MessageQueue mq : lockOKMQSet) {// 从 processQueueTable 中获取处理队列ProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {// broker 加锁完之后if (!processQueue.isLocked()) {log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);}// 设置处理队列里面的 locked 加锁状态为 trueprocessQueue.setLocked(true);// 设置处理队列的锁定时间processQueue.setLastLockTimestamp(System.currentTimeMillis());}}// 遍历所有队列for (MessageQueue mq : mqs) {if (!lockOKMQSet.contains(mq)) {// 这里就是加锁失败的队列ProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {// 设置加锁状态为 falseprocessQueue.setLocked(false);log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);}}}} catch (Exception e) {log.error("lockBatchMQ exception, " + mqs, e);}}}
}

上面就是所有的逻辑,首先就是构建 [brokerName -> 消息队列集合] 的映射,因为消费者订阅的 topic 下面的对垒是可以分配到不同的 broker 的,所以这里分类一下,方便后面向各个 broker 发起请求。

然后就是遍历集合,分别向每一个 broker 发送请求,参数把要加锁的队列传进去,以及消费者组和消费者 ID。

发送请求过后,返回结果是已经加锁成功的队列 lockOKMQSet。最后遍历这些已经加锁成功的队列,将本地对应的处理队列的加锁状态 locked 也设置为 true,然后设置加锁时间为当前时间。之所以这个方法是周期锁定所有队列,就是因为处理队列里面会根据当前加锁时间是否超过了 30s 来判断这个锁是不是过期,所以需要确保在没到 30s 的时候继续向 broker 发起请求加锁,相当于续费。

最后遍历所有队列,将那些加锁失败的队列加锁状态设置为 false。设置为 false 有可能是因为这个队列还在其他消费者那里消费,后续当前消费者顺序消费的时候判断如果加锁状态为 false,就不会消费消息,而是延迟一段时间再次提交消费请求。


2.2 broker 通过 lockBatchMQ 批量对消息队列加锁

/*** 对消息队列批量加锁* @param ctx* @param request* @return* @throws RemotingCommandException*/
private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {// 创建返回结果final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 加锁请求LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);// 批量加锁, 返回加锁成功的集合Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(requestBody.getConsumerGroup(),requestBody.getMqSet(),requestBody.getClientId());// 返回结果LockBatchResponseBody responseBody = new LockBatchResponseBody();responseBody.setLockOKMQSet(lockOKMQSet);response.setBody(responseBody.encode());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;
}

最终调用了 RebalanceLockManager#tryLockBatch 来尝试批量锁定消息队列,返回锁定成功的队列。


2.3 tryLockBatch 批量加锁

/*** 尝试批量锁定消息队列, 返回锁定成功的队列, 顺序消费者会调用这个分发* @param group* @param mqs* @param clientId* @return*/
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,final String clientId) {// 加锁成功的集合Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());// 加锁失败的集合Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());// 首先这些队列里面有一些队列是已经加锁成功的, 还有一些队列是没有加锁的, 先分出来for (MessageQueue mq : mqs) {if (this.isLocked(group, mq, clientId)) {// 这里是已经被传入的消费者锁定了lockedMqs.add(mq);} else {// 没加锁或者说不是传入的消费者锁定的notLockedMqs.add(mq);}}if (!notLockedMqs.isEmpty()) {try {// 先加锁this.lock.lockInterruptibly();try {// 获取这个消费者组下面的队列加锁情况ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);if (null == groupValue) {// 初始化groupValue = new ConcurrentHashMap<>(32);this.mqLockTable.put(group, groupValue);}// 遍历所有没有锁定的队列, 这里没有锁定意思是没有被传入的 Consumer 锁定for (MessageQueue mq : notLockedMqs) {// 首先获取下消息队列的加锁情况LockEntry lockEntry = groupValue.get(mq);// 1. 这里就是消息队列没有被锁定if (null == lockEntry) {// 创建加锁对象, 这里面会初始化 lastUpdateTimestamp 为当前时间lockEntry = new LockEntry();// 设置加锁的消费者 IDlockEntry.setClientId(clientId);// 添加到集合中groupValue.put(mq, lockEntry);log.info("tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}",group,clientId,mq);}// 2. 如果已经被当前客户端锁定if (lockEntry.isLocked(clientId)) {// 设置 lastUpdateTimestamp 为当前时间, 因为有可能这里队列是原本就被当前客户端锁定的而不是新建的, 所以// 需要重新设置更新时间, 虽然对于第一次锁定的也会在这里重复设置, 但是问题不大lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());// 添加到已锁定的集合中lockedMqs.add(mq);// 继续处理下一个队列continue;}// 3. 到这里就是说这个消息队列不是被这个消费者锁定的String oldClientId = lockEntry.getClientId();// 如果说当前这个消息队列被上一个消费者锁定已经超过 60s 还没有解锁, 就过期了if (lockEntry.isExpired()) {// 重新设置加锁的消费者lockEntry.setClientId(clientId);// 重新设置更新时间lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());log.warn("tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",group,oldClientId,clientId,mq);// 添加到已锁定的集合中lockedMqs.add(mq);continue;}// 这里就是当前消息队列已经被其他消费者锁定了, 而且还没有过期log.warn("tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",group,oldClientId,clientId,mq);}} finally {// 解锁this.lock.unlock();}} catch (InterruptedException e) {log.error("putMessage exception", e);}}// 返回传入的消费者锁定的消息队列集合return lockedMqs;
}

lockedMqs 是加锁成功的集合,消费者传过来要加锁的队列集合中如果有一些已经是被这个消费者加锁过的,就会放到 lockedMqs 中,剩下那些没有加锁过的队列又或者加锁了但是不是当前消费者的会加入 notLockedMqs 集合中。

上面的 isLocked 就是判断这个队列是不是被这个消费者加锁的,clientId 就是消费者的 id,下面 2.4 小节会看下里面的方法,broker 对于队列加锁也是有失效时间的。如果是更新下 lastUpdateTimestamp,重置加锁时间,最后放到 lockedMqs 里面。

mqLockTable 记录的是消费者组下面的加锁情况,记录了这个消费者组下面加锁的队列和以及这个队列被哪个消费者加锁了,LockEntry 里面属性是 clientId 和 lastUpdateTimestamp。

/*** 消费者组的加锁情况*/
private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);

对于那些没有加锁过的队列或者是加锁了,但是不是当前消费者持有的队列,如果没有锁定就设置加锁的消费者 ID 为当前消费者,lastUpdateTimestamp 就不用设置了,因为创建出来默认是当前时间。lockEntry.isLocked(clientId) 判断如果已经被当前消费者锁定,这里重新设置更新时间应该是重复设置了,不过也没问题,其实在前面一开始的 this.isLocked 方法也会判断如果说这个队列是当前消费者加锁的,那么就会更新 lastUpdateTimestamp。

那如果说这个队列不是被当前消费者锁定的,就通过 lockEntry.isExpired() 看下锁有没有过期,如果说当前这个消息队列被上一个消费者锁定已经超过 60s 还没有解锁,就过期了,这个时间可以通过系统变量 rocketmq.broker.rebalance.lockMaxLiveTime 设置。这种情况下可以直接更新加锁的消费者为当前消费者,然后重新设置更新时间。

最后就是当前消息队列已经被其他消费者锁定了, 而且还没有过期,这种情况下打印下日志。因此最后返回的 lockedMqs 就是已经加锁完的集合。


2.4 isLocked 判断队列是否加锁了

/*** 判断队列是否加锁了* @param group* @param mq* @param clientId* @return*/
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {// 获取这个消费者组下面的已加锁队列集合ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);if (groupValue != null) {// 获取消息队列的加锁情况LockEntry lockEntry = groupValue.get(mq);if (lockEntry != null) {// 判断下这个消息队列是不是被这个客户端锁定的boolean locked = lockEntry.isLocked(clientId);if (locked) {// 如果是, 更新下 lastUpdateTimestamplockEntry.setLastUpdateTimestamp(System.currentTimeMillis());}// 返回 locked, 代表这个队列是不是当前消费者锁定的return locked;}}// 这里就是获取不到加锁情况, 说明这个消息队列还没有被消费者锁定return false;
}

这里就是判断队列是不是被传入的 clientId 锁定了,如果是就更新下 lastUpdateTimestamp。


2.5 isExpired 判断锁是否失效了

/*** 消费者对消息队列的锁定时间最多 60s* @return*/
public boolean isExpired() {boolean expired =(System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;return expired;
}

逻辑比较简单,就是判断当前距离上一次加锁的时间有没有超过 60s,如果超过就说明锁过期了。


3. submitConsumeRequest 提交消费请求

好了,上面说了下如何锁定队列的,这里就回到 ConsumeMessageOrderlyService 来看下具体的消费流程,首先就是提交消费请求。

/*** 提交消费请求* @param msgs              拉取到的消息* @param processQueue      处理队列* @param messageQueue      消息队列* @param dispathToConsume  是否分发消息*/
@Override
public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispathToConsume) {// 当 msgTreeMap 有消息且还没有开始消费的时候, 传入的这个参数就是 true// 当 msgTreeMap 有消息并且正在消费中, 传入的这个参数就是 false// 这样是为了顺序消费, 处理完再去分发下一次请求if (dispathToConsume) {// 构建消费请求, 这里没有把消息放进线程里面处理, 这就意味者消费者会到 msgTree 里面去拉取消息来消费ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);// 提交到线程池中去消费this.consumeExecutor.submit(consumeRequest);}
}

ConsumeMessageOrderlyService 和 ConsumeMessageConcurrentlyService 不一样,要考虑顺序性,dispathToConsume 这个参数可以在上层调用看到,就是 PullCallback#onSuccess 方法处理 broker 返回的消息时会传入。

当 msgTreeMap 有消息且还没有开始消费的时候,传入的这个参数就是 true,当 msgTreeMap 有消息并且正在消费中,传入的这个参数就是 false,是否正在消费中由参数 consuming 控制,如果传入这个参数为 true,在消费的时候当消息从 msgTreeMap 被拿出来后判断如果 msgTreeMap 里面的消息都被消费完了,这时候把 consuming 设置为 false,再次往里面添加消息拉取请求。

那么这样做会不会有消息丢失呢? 其实也不会,因为在 PullCallback#onSuccess 拉取到消息之后第一件事就是写到了本地缓存 msgTreeMap 中,所以数据不会丢失。

那么会不会堆积呢,比如消费者频繁发起拉取请求,但是因为有 dispathToConsume 这个参数导致消费没那么快。 这里就要说下 ConsumeRequest 的消费逻辑了,不同于 ConsumeMessageConcurrentlyService,这里的 ConsumeRequest 消费完消息之后会继续拉取消息消费,在 msgTreeMap 里面还有消息且没有消费失败之前会不断消费,这时候 dispathToConsume 就是 false,因为上面也说了这个参数如果是 true 得确保 msgTreeMap 是空的。

所以当一个 ConsumeRequest 将本地缓存里面的消息消费完了之后,下一个 ConsumeRequest 才会被添加进来,不会同时有两个 ConsumeRequest 消费同一个消息队列的,因为你要考虑消费失败的场景,如果两个 ConsumeRequest 同时消费,假设 ConsumeRequest1 消费的偏移量较小,但是消费失败了,这时候就会延时一段时间再次提交消费请求,而这时候 ConsumeRequest2 消费的偏移量较大的消息没有消费失败,这种情况下就确保不了顺序性了


4. ConsumeRequest 消息消费

这里的消息消费就不需要传入 msgs 了,因为为了确保顺序性都是直接从 msgTreeMap 拿的,所以属性里面没有 msgs,还是一样看看 run 方法,也是消费的核心。

/*** 顺序消费*/
@Override
public void run() {// 如果处理队列被丢弃了, 直接返回if (this.processQueue.isDropped()) {log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);return;}// 1. 对消息队列加本地锁, 防止并发消费同一个队列final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);// synchronized 加锁synchronized (objLock) {// 2. 如果是广播模式的顺序消费, 又或者说是集群模式但是对队列加锁了并且锁还没有过期//    顺序消费的消费者 ConsumeMessageOrderlyService 会每隔 20s 定时向 broker 申请对负载均衡分配到这个消费者下面的处理//    队列加锁, 加锁成功就会设置 locked = trueif (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {// 消费的起始时间final long beginTime = System.currentTimeMillis();for (boolean continueConsume = true; continueConsume; ) {if (this.processQueue.isDropped()) {// 3. 如果处理队列被丢弃了, 就不消费了, 比如负载均衡给了其他消费者, 避免重复消费log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);break;}// 4. 如果当前消费模式是集群模式并且当前处理队列没有被锁定if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& !this.processQueue.isLocked()) {log.warn("the message queue not locked, so consume later, {}", this.messageQueue);// 延迟 10ms 再次向 broker 发送请求锁定该 messageQueue, 锁定成功再延迟提交消费请求ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);// 直接退出, 本次消费结束break;}// 5. 如果当前消费者消费模式是集群消费, 但是 processQueue 处理队列锁已经过期if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& this.processQueue.isLockExpired()) {log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);// 延迟 10ms 再次向 broker 发送请求锁定该 messageQueue, 锁定成功再延迟提交消费请求ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);// 直接退出, 本次消费结束break;}// 6. 前面校验的消耗时间long interval = System.currentTimeMillis() - beginTime;if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {// 超时了, 延迟 10ms 再次提交消费请求, 不需要发送锁定请求, 因为处理队列锁还没有过期ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);break;}// 一次性可以消费多少消息, 默认 1final int consumeBatchSize =ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();// 7. 从 msgTreeMap 中获取偏移量最小的前 consumeBatchSize 条消息List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);// 处理重传消息 topic, 当 msgs 消息是重传消息的时候, 会在这个方法中将这条消息的 topic 还原成真实的 topicdefaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());// 8. 拉取到了消息, 开始消费if (!msgs.isEmpty()) {// 构建顺序消费上下文final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);ConsumeOrderlyStatus status = null;ConsumeMessageContext consumeMessageContext = null;// 8.1 如果有消费钩子, 调用钩子的 consumeMessageBefore 前置方法if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);// init the consume context typeconsumeMessageContext.setProps(new HashMap<String, String>());ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}// 起始时间long beginTimestamp = System.currentTimeMillis();// 消费返回类型ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;boolean hasException = false;try {// 8.2 再次获取处理队列的消费锁, 从这里可知,顺序消费需要获取四把锁: broker 的 messageQueue 锁,//     本地 messageQueue 锁,本地 processQueue 锁, 本地 processQueue 里面的 consumeLock,同时获取这四把锁才可以继续消费this.processQueue.getConsumeLock().lock();if (this.processQueue.isDropped()) {// 如果队列已经丢弃掉了, 直接返回, 不继续消费log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",this.messageQueue);break;}// 8.3 调用用户设置的监听器的 consumeMessage 分发去消费消息, 返回结果是 ConsumeOrderlyStatusstatus = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {// 出异常就大于日志log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue), e);// 异常标记为 truehasException = true;} finally {// 解除本地 processQueue 锁this.processQueue.getConsumeLock().unlock();}// 8.4 判断返回结果if (null == status|| ConsumeOrderlyStatus.ROLLBACK == status|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {// 如果是返回 ROLLBACK 或者 SUSPEND_CURRENT_QUEUE_A_MOMENT, 说明消费失败, 打印日志log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue);}// 消费时间long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {// 如果是业务出异常了, 设置返回类型为 EXCEPTIONreturnType = ConsumeReturnType.EXCEPTION;} else {// 如果什么都没有返回, 结果就是 RETURNNULLreturnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {// 如果消费时间超过 15 分钟, 就表示超时了, 返回结果是 TIME_OUTreturnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {// 如果返回状态是 SUSPEND_CURRENT_QUEUE_A_MOMENT, 那么消费失败, 返回结果是 FAILEDreturnType = ConsumeReturnType.FAILED;} else if (ConsumeOrderlyStatus.SUCCESS == status) {// 消费成功, 返回结果是 SUCCESSreturnType = ConsumeReturnType.SUCCESS;}// 如果有设置了消费回调钩子, 将返回结果设置到属性里面if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {// 如果没有返回结果, 设置为 SUSPEND_CURRENT_QUEUE_A_MOMENTstatus = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}// 8.5 如果有消费钩子, 调用钩子的 consumeMessageAfter 后置钩子方法if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {// 设置消费结果状态consumeMessageContext.setStatus(status.toString());// 设置是否消费成功consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);// 调用钩子方法ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}// 增加消费时间ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);// 9. 根据不同的 status 状态处理消费结果continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);} else {// 没拉取到消息就没必要继续消费了, 直接退出continueConsume = false;}}} else {if (this.processQueue.isDropped()) {// 消息队列丢弃了, 打印下日志, 直接返回log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);return;}// 这里就是集群模式下没有锁定 processQueue(向 broker 申请锁定的), 这种情况下延迟 100ms 再次提交消费请求ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);}}
}

上面是全部的逻辑,跟并发消费那块的核心逻辑都大差不差,有几个点不同的要说下。

  1. 首先对消息队列加本地锁,防止并发消费同一个队列,也就是 MessageQueue 加锁
  2. 广播模式下的顺序消费直接走里面的处理逻辑,一条消息会分配给对应消费者组下面的所有消费者消费,因此只需要确保当前消费者是顺序消费就行。集群模式一个队列只能分配给一个消费者,因此需要确保整个队列不能同时分配给多个消费者,需要加锁,processQueue 是前面 2.1 小节周期加锁来锁定的。
  3. 顺序消费一个消费任务会不断消费本地缓存里面的消息,通过 continueConsume 控制,如果需要消息重试,那么会延时 1s 再次提交消费请求,这时候 continueConsume 会设置为 false 不再继续拉取消息,又或者本地缓存里面没有消息了,这种情况下也会设置 continueConsume 为 false,不再继续消费,等到消费者再次处理消息拉取请求从 broker 获取到消息放到 msgTreeMap 之后会再次提交 ConsumeRequest 请求来消费。
  4. 集群模式下如果加锁失败,那么延时 100ms 再次提交消息消费请求,有可能只是当前消息队列还在其他消费者那里消费,但是负载均衡到了当前消费者。
  5. 注意最终消息消费之前会获取四个锁,broker 的队列锁,本地 processQueue 锁,本地 processQueue 的 consumeLock 锁,本地 messageQueue 锁。

4.1 takeMessages 从缓存中获取消息

这个方法比较关键,从缓存中获取消息,消息条数是 batchSize,主要是涉及到了 consumingMsgOrderlyTreeMap 这个集合,以及后面消息消费完了后的 commit 提交逻辑。

/*** 从缓存中获取消息, 消息条数是 batchSize* @param batchSize* @return*/
public List<MessageExt> takeMessages(final int batchSize) {List<MessageExt> result = new ArrayList<MessageExt>(batchSize);// 当前时间final long now = System.currentTimeMillis();try {// 加写锁this.treeMapLock.writeLock().lockInterruptibly();// 设置上一次消费的时间this.lastConsumeTimestamp = now;try {// 缓存中还有消息if (!this.msgTreeMap.isEmpty()) {for (int i = 0; i < batchSize; i++) {// 获取偏移量最小的消息Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();if (entry != null) {// 添加到结果集合中result.add(entry.getValue());// 添加到 consumingMsgOrderlyTreeMap 集合中, 这些消息正在消费中consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());} else {// 缓存里面没有数据了break;}}}// 一条消息都没拉取到if (result.isEmpty()) {// 设置为 false, 表示没有消息消费consuming = false;}} finally {// 解除写锁this.treeMapLock.writeLock().unlock();}} catch (InterruptedException e) {log.error("take Messages exception", e);}return result;
}

方法不复杂,就是在 msgTreeMap 中从最小偏移量开始获取 batchSize 条消息,不过有一个点就是获取到了消息之后需要提交到 consumingMsgOrderlyTreeMap 这个集合中,表示这些消息正在消费,这里先记住这个结构。


5. processConsumeResult 处理消费结果

上面是处理消息消费的大致流程,下面来看下如何处理消费结果的。

/**,** @param msgs           消费的消息* @param status         消费的状态* @param context        消费上下文* @param consumeRequest 消费请求* @return 消费结果,表示上层消费者是否可以接着继续消费剩余的本地缓存里面的消息*/
public boolean processConsumeResult(final List<MessageExt> msgs,final ConsumeOrderlyStatus status,final ConsumeOrderlyContext context,final ConsumeRequest consumeRequest
) {// 默认是接着消费boolean continueConsume = true;long commitOffset = -1L;// 如果是自动提交, 默认就是 trueif (context.isAutoCommit()) {switch (status) {// 这两个状态被废弃了, 所以一般不会用, 如果消费监听器里面返回了这两个结果, 那么就代表这些消息成功消费, 走下面 SUCCESS 流程case COMMIT:case ROLLBACK:// 但是还是要打印下日志log.warn("the message queue consume result is illegal, we think you want to ack these message {}",consumeRequest.getMessageQueue());case SUCCESS:// 处理已经消费完的消息, 这个 commitOffset 就是下一条要消费的消息的起始偏移量commitOffset = consumeRequest.getProcessQueue().commit();// 统计消费成功的次数this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());break;case SUSPEND_CURRENT_QUEUE_A_MOMENT:// 消费失败, 统计消费失败的次数 + 1this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());// 校验重新消费的次数if (checkReconsumeTimes(msgs)) {// 如果没有到达最大消费次数// 又或者到达最大消费次数了, 但是发送延迟消息失败, 将正在消费的消息丢回本地缓存, 等待再次消费consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs);// 延迟 1s 再次重新消费this.submitConsumeRequestLater(consumeRequest.getProcessQueue(),consumeRequest.getMessageQueue(),context.getSuspendCurrentQueueTimeMillis());// 消费失败, 本次消费请求不会再继续消费下面的消息continueConsume = false;} else {// 如果达到了最大重试消息且发送延迟 topic 成功了, 这里计算是消费成功了commitOffset = consumeRequest.getProcessQueue().commit();}break;default:break;}} else {// 手动提交switch (status) {case SUCCESS:// 消费成功, 只是统计消费成功的次数this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());break;case COMMIT:// 返回 COMMIT 状态才会提交已消费的消息(consumingMsgOrderlyTreeMap), 意思是这批消息消费成功了commitOffset = consumeRequest.getProcessQueue().commit();break;case ROLLBACK:// 回滚正在消费的消息consumeRequest.getProcessQueue().rollback();// 延迟 1s 再次重新消费this.submitConsumeRequestLater(consumeRequest.getProcessQueue(),consumeRequest.getMessageQueue(),context.getSuspendCurrentQueueTimeMillis());continueConsume = false;break;case SUSPEND_CURRENT_QUEUE_A_MOMENT:// 消费失败, 消费失败次数 + 1this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());// 校验重新消费的次数if (checkReconsumeTimes(msgs)) {// 如果没有到达最大消费次数// 又或者到达最大消费次数了, 但是发送延迟消息失败, 将正在消费的消息丢回本地缓存, 等待再次消费consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs);// 延迟 1s 再次重新消费, 这里就不需要使用 sendMessageBack 发送到 broker 了, 顺序消费者是自己本地消费的this.submitConsumeRequestLater(consumeRequest.getProcessQueue(),consumeRequest.getMessageQueue(),context.getSuspendCurrentQueueTimeMillis());// 消费失败, 本次消费请求不会再继续消费下面的消息continueConsume = false;}break;default:break;}}// 如果 commitOffset >= 0 并且处理队列没有被丢弃, 说明这个消息队列还是被当前消费者持有, 这下就可以放心更新消费者的本地缓存 offsetTable// 而这里第三个参数是 increaseOnly = false, 意思是 offsetTable 不会顺序递增, 也就是说 offsetTable 里面有可能 offset 会被更新成更小// 的 commitOffset, 但是由于顺序消费都是加锁了的, 且一个消息队列同一时间只会分配给一个消费者, 所以这里 increaseOnly = false 也没什么问题if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);}return continueConsume;
}

顺序消费情况下如果消息是自动提交的,那么消息成功 SUCCESS 的处理就是统计消费成功的条数,然后更新本地 offsetTable 里面的偏移量为下一条要消费的消息的起始偏移量,通过 commit 处理已经消费完的消息。

如果是 SUSPEND_CURRENT_QUEUE_A_MOMENT,比如业务自己返回的或者是消费过程出异常了,这种情况下记录消费失败次数 + 1,然后判断下消息消费重试有没有到上限了,如果没有,就挂起这条消息等待再次消费。

然后重点来了,顺序消息的消息重试是在本地完成的,通过 checkReconsumeTimes 检查消息重试次数如果没有到达上限,默认是 Integer.MAX_VALUE,也就是没有上限,可以通过下面消费者的方法来设置。

public void setMaxReconsumeTimes(final int maxReconsumeTimes) {this.maxReconsumeTimes = maxReconsumeTimes;
}

没有到达上限的情况下通过 makeMessageToConsumeAgain 将正在消费的消息重新丢回本地缓存中,然后等待再次消费,这个方法下面会讲,其实就是将 4.1 小节的 consumingMsgOrderlyTreeMap 集合里面的消息丢回 msgTreeMap 中。


5.1 checkReconsumeTimes 最大消费重试次数了

/*** 校验当前是否已经达到最大消费重试次数了, 默认 Integer.MAX_VALUE* @param msgs* @return*/
private boolean checkReconsumeTimes(List<MessageExt> msgs) {boolean suspend = false;if (msgs != null && !msgs.isEmpty()) {// 遍历所有消息for (MessageExt msg : msgs) {// 当前消息的重试次数是不是到达上限了, 默认是没有上限if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {// 如果达到重试次数了, 设置消息的 RECONSUME_TIME 属性为消息的重试次数MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));// 发送延迟消息, 延迟等级是 3 + 消费次数, 但是这里发送延时消息是不会在 broker 被重新消费了, 而是直接发送到死信队列if (!sendMessageBack(msg)) {// 发送失败(出异常, 一般都不会走到这), 挂起请求, 延迟继续重新消费suspend = true;// 设置消息的重新消费次数 + 1msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);}} else {// 如果没有到达上限, 挂起消费请求, 延迟继续重新消费suspend = true;// 设置消费次数 + 1msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);}}}// 是否挂起请求return suspend;
}

遍历所有消息,判断当前消息的重试次数是不是到达上限了,默认是没有上限,如果达到重试次数了,设置消息的 RECONSUME_TIME 属性为消息的重试次数。这里发送延迟消息,延迟等级是 3 + 消费次数,但是这里发送延时消息是不会在 broker 被重新消费了,而是直接发送到死信队列,因为已经到达了重试的上限。


5.2 makeMessageToConsumeAgain 消息回退

/*** 将消费过的消息重新丢回 msgTreeMap 里面后续再次等待重新消费* @param msgs*/
public void makeMessageToConsumeAgain(List<MessageExt> msgs) {try {// 加写锁this.treeMapLock.writeLock().lockInterruptibly();try {// 遍历所有消息for (MessageExt msg : msgs) {// 将消息从 consumingMsgOrderlyTreeMap 中删除this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());// 重新添加到 msgTreeMap 集合中, 等待重新消费this.msgTreeMap.put(msg.getQueueOffset(), msg);}} finally {// 解除写锁this.treeMapLock.writeLock().unlock();}} catch (InterruptedException e) {log.error("makeMessageToCosumeAgain exception", e);}
}

这里就是将消费过的消息重新丢回 msgTreeMap 里面后续再次等待重新消费,直接看注释即可。


5.3 commit 提交

/*** 顺序消费的提交方法, 在里面处理 consumingMsgOrderlyTreeMap, 删除已经消费完的消息, 返回下一条等待更新的 offset, 因为这个 commit* 方法就是在消息消费完之后调用的, 而 consumingMsgOrderlyTreeMap 存储的就是本轮消费的消息, 所以到这里就是 consumingMsgOrderlyTreeMap* 里面的消息都已经被消费完了, 且消费结果是 SUCCESS* @return*/
public long commit() {try {// 加写锁this.treeMapLock.writeLock().lockInterruptibly();try {// 最后一条消费完的消息的偏移量Long offset = this.consumingMsgOrderlyTreeMap.lastKey();// msgCount 减去已经消费完的消息数msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {// 遍历所有已消费的消息, 减去消息体大小msgSize.addAndGet(0 - msg.getBody().length);}// 清除已消费的消息this.consumingMsgOrderlyTreeMap.clear();if (offset != null) {// 返回下一条要消费的消息的起始偏移量return offset + 1;}} finally {// 解除写锁this.treeMapLock.writeLock().unlock();}} catch (InterruptedException e) {log.error("commit exception", e);}return -1;
}

commit 提交顺序消费的消息,提交的意思就是正在消费的 consumingMsgOrderlyTreeMap 里面的消息都确认消费成功了,所以就维护下几个本地的消息相关属性,再把 consumingMsgOrderlyTreeMap 清空,接着继续消费下一批消息。

上面也说了,顺序消费的消息重试是放到本地去完成的,所以需要这么一个集合来存储当前正在消费的是哪些消息,如果发生了重试就要将这个集合的消息重新放回 msgTreeMap 重新消费,如果没问题就调用 commit 将这部分消息给删掉,然后维护好剩余的几个变量,就是 msgCountmsgSize


6. 小结

好了,这篇文章就到这里了,主要讲了 ConsumeMessageOrderlyService 顺序消费消息的逻辑,顺序消息因为需要按添加的先后来消费,因此消费前需要加锁,流程会比 ConsumeMessageConcurrentlyService 复杂一点。下一篇文章就会讲一下消息消费失败之后,是怎么讲消息发送到重传队列的,也就是 sendMessageBack。





如有错误,欢迎指出!!!

http://www.lryc.cn/news/615697.html

相关文章:

  • 在windows安装colmap并在cmd调用
  • vue3前端项目cursor rule
  • 常用hook钩子函数
  • 海关 瑞数 失信企业 逆向 分析 后缀 rs
  • 从神经网络语言模型(NNLM)到Word2Vec:自然语言处理中的词向量学习
  • 【Html网页模板】炫酷科技风公司首页
  • Axure设计下的智慧社区数据可视化大屏:科技赋能社区管理
  • [0CTF 2016]piapiapia
  • PhotoDirector 安卓版:功能强大的照片编辑与美化应用
  • Dify集成 Echarts 实现智能数据报表集成与展示实战详解
  • 复杂项目即时通讯从android 5升级android x后遗症之解决 ANR: Input dispatching timed out 问题 -优雅草卓伊凡
  • 咪咕MGV3200-KLH_GK6323V100C_板号E503744_安卓9_短接强刷包-可救砖
  • WebAssembly技术详解:从浏览器到云原生的高性能革命
  • Flutter 与 Android NDK 集成实战:实现高性能原生功能
  • Vue3 组件化开发
  • Solana上Launchpad混战:新颖性应被重视
  • 一个“加锁无效“的诡异现象
  • BGP 笔记
  • Python 中的 Mixin
  • 第4章 程序段的反复执行2 while语句P128练习题(题及答案)
  • 【动态数据源】⭐️@DS注解实现项目中多数据源的配置
  • Datawhale AI夏令营第三期,多模态RAG方向 Task2
  • 深度学习入门Day8:生成模型革命——从GAN到扩散模型
  • pytorch llm 计算flops和参数量
  • Busybox编译、制作initramfs,并在QEMU中运行
  • 束搜索(Beam Search):原理、演进与挑战
  • Java -- 日期类-第一代-第二代-第三代日期
  • NLP:Transformer输出部分
  • 第十九天-输入捕获实验
  • AI编程工具 | Trae介绍