【RocketMQ 生产者和消费者】- 消费者重平衡(3)- 消费者 ID 对负载均衡的影响
文章目录
- 1. 前言
- 2. 消费者 ID 重复
- 3. 消费者连接存储
- 4. 查询消费者组下面的所有连接
- 5. AllocateMessageQueueAveragelyByCircle 策略
- 6. AllocateMessageQueueAveragely 策略
- 7. 示例
- 8. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源码系列目录
- 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息
- 【RocketMQ 生产者和消费者】- 消费者启动源码
- 【RocketMQ 生产者和消费者】- 消费者重平衡(1)
- 【RocketMQ 生产者和消费者】- 消费者重平衡(2)- 分配策略
上两篇文章讲解了消费者重平衡的源码以及几种重平衡策略,这篇文章我们来看下消费者 ID 对重平衡的影响。
2. 消费者 ID 重复
public interface AllocateMessageQueueStrategy {List<MessageQueue> allocate(final String consumerGroup,final String currentCID,final List<MessageQueue> mqAll,final List<String> cidAll);String getName();
}
上面消费者重平衡接口可以看到传入的是消费者的 clientId,也就是客户端 id,这个 id 在前面消费者启动的时候就会设置,在当前版本会被替换为 PID + 当前时间,所以重平衡没什么问题。
但是像 4.7.1 这些之前的版本是没有当前时间的,只有当前进程名称。
public void changeInstanceNameToPID() {if (this.instanceName.equals("DEFAULT")) {this.instanceName = String.valueOf(UtilAll.getPid());}
}
那么如果进程 PID 相同会有什么问题吗,我们可以看下消费者的注册过程。
3. 消费者连接存储
broker 处理消费者心跳是通过 heartBeat 方法处理的,可以看到在里面会创建一个 ClientChannelInfo,用于标识生产者或者消费者的连接,可以看到里面的 heartbearData.getClientID 就是客户端 ID。
创建好了 ClientChannelInfo 之后,传入到 registerConsumer 来处理,在里面会通过 updateChannel 方法更新消费者下面的连接集合 channelInfoTable
,可以说消费者组信息里面的 channelInfoTable 存储了这个消费者组下面的消费者连接。
// 消费者组下面的连接信息
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
由于 key 是 Channel,所以相同 ID 的消费者不会被覆盖。消费者连接的存储就到这,在负载均衡的时候消费者会通过 findConsumerIdList 方法查询出这个消费者组下面的所有消费者。
4. 查询消费者组下面的所有连接
/*** 从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合** 1.获取topic下的所有consumer* 2.先通过topic随机找到一个broker* 3.然后通过这个broker找对应group下面的所有consumerId** @param topic* @param group* @return*/
public List<String> findConsumerIdList(final String topic, final String group) {// 随机选择一个当前topic所属的brokerString brokerAddr = this.findBrokerAddrByTopic(topic);if (null == brokerAddr) {// 如果broker地址为null则请求nameserver更新topic路由信息this.updateTopicRouteInfoFromNameServer(topic);brokerAddr = this.findBrokerAddrByTopic(topic);}if (null != brokerAddr) {try {// 根据brokerAddr和group 得到消费者客户端id列表, 所以可以看出来,一个group里面的客户端都需要订阅同一个topicreturn this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);} catch (Exception e) {log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);}}return null;
}
这个方法会随机选择一个 broker 集群的 master 地址,然后通过 getConsumerIdListByGroup 方法来发送请求获取这个消费者组下面的全部消费者客户端 id 集合。所以可以看得出来,一个消费者组下面的消费者都需要订阅同一个 topic,因为这里是以 group 维度去获取消费者的,如果这个消费者组里面有一个消费者订阅了 topicB,那么这个消费者注册的时候就会注册到 topicB 所在的 broker 集群,有可能这里就获取不到这个消费者了。
/*** 根据brokerAddr和group 得到消费者客户端id列表*/
public List<String> getConsumerIdListByGroup(final String addr,final String consumerGroup,final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,MQBrokerException, InterruptedException {// 构建请求头GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();requestHeader.setConsumerGroup(consumerGroup);// 构建请求命令对象,Code为GET_CONSUMER_LIST_BY_GROUPRemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);// 发起同步调用RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {if (response.getBody() != null) {// 响应解码GetConsumerListByGroupResponseBody body =GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);// 返回客户端id列表return body.getConsumerIdList();}}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());
}
看上面代码,获取消费者集合的请求 Code 是 GET_CONSUMER_LIST_BY_GROUP,获取到返回结果之后返回 consumerIdList 集合,客户端处理这个请求是通过 getConsumerListByGroup 来处理的。
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {case RequestCode.GET_CONSUMER_LIST_BY_GROUP:// 返回指定group的所有客户端id集合return this.getConsumerListByGroup(ctx, request);case RequestCode.UPDATE_CONSUMER_OFFSET:// 更新消费偏移量return this.updateConsumerOffset(ctx, request);case RequestCode.QUERY_CONSUMER_OFFSET:// 查询消费偏移量return this.queryConsumerOffset(ctx, request);default:break;}return null;
}/*** 返回指定group的所有客户端id集合*/
public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {// 创建响应命令对象final RemotingCommand response =RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);// 解析请求头final GetConsumerListByGroupRequestHeader requestHeader =(GetConsumerListByGroupRequestHeader) request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);// 从broker的consumerTable中获取指定group的消费者组信息ConsumerGroupInfo consumerGroupInfo =this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());if (consumerGroupInfo != null) {// 获取所有客户端id集合List<String> clientIds = consumerGroupInfo.getAllClientId();if (!clientIds.isEmpty()) {GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();body.setConsumerIdList(clientIds);response.setBody(body.encode());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;} else {log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}} else {log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());return response;
}
在 getConsumerListByGroup 方法中会先通过消费者组获取这个消费者组信息 consumerGroupInfo,然后通过 getAllClientId 方法获取这个这个消费者组下面的消费者集合。
public List<String> getAllClientId() {List<String> result = new ArrayList<>();Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();while (it.hasNext()) {Entry<Channel, ClientChannelInfo> entry = it.next();ClientChannelInfo clientChannelInfo = entry.getValue();result.add(clientChannelInfo.getClientId());}return result;
}
可以看到获取消费者 id 就是直接遍历 channelInfoTable 集合,然后获取消费者的 clientId,所以如果两个消费者的 clientId 相同,那么这里 result 里面就会存储两个相同的 clientId。
5. AllocateMessageQueueAveragelyByCircle 策略
下面就以 AllocateMessageQueueAveragelyByCircle 为例子来看下如果 cidAll 里面有重复的 clientId 会发生什么,再来回顾下这个分配策略的源码。
/*** 平均分配策略,环形分配* 按照消费者的顺序进行一轮一轮的分配,直到分配完所有消息队列。例如有消费者A、B,有5个消息队列1、2、3、4、5。第一轮A分配1,B分配2;* 第二轮A分配3,B分配4;第二轮A分配5。因此A分配到1、3、5,B分配到2、4。* @param consumerGroup current consumer group* @param currentCID current consumer id* @param mqAll message queue set in current topic* @param cidAll consumer set in current consumer group* @return*/
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {// 参数校验if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}// 索引int index = cidAll.indexOf(currentCID);// 获取每个分配轮次轮次中属于该消费者的对应的消息队列for (int i = index; i < mqAll.size(); i++) {if (i % cidAll.size() == index) {result.add(mqAll.get(i));}}return result;
}
由于两个消费者 ID 是一样的,所以求出来的 index 也是相同的,所以分配的队列也是一样的,我们可以用一个 main 方法模拟下。
public static void main(String[] args) {AllocateMessageQueueAveragelyByCircle averagelyByCircle = new AllocateMessageQueueAveragelyByCircle();List<MessageQueue> queues = new ArrayList<MessageQueue>();for(int i = 0; i < 8; i++){queues.add(new MessageQueue("topic-test", "broker-a", i));}System.out.println(averagelyByCircle.allocate("group", "0", queues, Arrays.asList("0", "2", "2")));System.out.println(averagelyByCircle.allocate("group", "2", queues, Arrays.asList("0", "2", "2")));System.out.println(averagelyByCircle.allocate("group", "2", queues, Arrays.asList("0", "2", "2")));
}
上面的 main 方法创建了 8 个队列,然后消费者组下面有 3 个消费者,id 是 [0,2,2],可以看到,消费者-0 的队列是 [0,3,6],两个消费者-2 分配的队列是 [1,4,7],这是因为两个消费者-2 求出来的下标都是 1,所以从 1 开始分配,因此 i % cidAll.size() == index
这个判断条件也都是一样的,所以最终分配的队列都是一样的。
对于消费者来说一个消息队列分配给了多个消费者,这样就有可能导致重复消费问题,而对于消息队列来说由于消息队列 2 和消息队列 5 没有分配给其他消费者,因此这个消息队列上面的消息不会被消费,导致消息堆积。
6. AllocateMessageQueueAveragely 策略
上面我们看了 AllocateMessageQueueAveragelyByCircle 策略,下面再来看下 AllocateMessageQueueAveragely,因为这个策略是默认的分配策略,也就是用户没有自己设定的情况下用的就是 AllocateMessageQueueAveragely。
/*** 负载均衡分配队列, 平均分配* @param consumerGroup 当前消费者组* @param currentCID 当前消费者的 clientID* @param mqAll 当前 topic 的所有队列* @param cidAll 当前消费者组的所有消费者的 clientID* @return*/
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {// 参数校验if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}// 分配结果List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {// 如果这个消费者不是传入的消费者组下的, 有可能是刚启动没注册到 brokerlog.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);// 就不分配队列处理了return result;}// 假设现在有 8 个队列, 要负载均衡给 3 个消费者 [0, 1, 2], 而当前消费者[1]的 index = 1// 那么在分配的时候三个消费者分配的队列标号就是 [0, 1, 2], [3, 4, 5], [6, 7]// 这里就是求出来当前消费者 ID 所在的位置 = 1int index = cidAll.indexOf(currentCID);// 队列数 % 消费者数 = 剩余队列数 = 2int mod = mqAll.size() % cidAll.size();// 这里求平均数, 求出来的结果就是 8 / 3 + 1 = 3// 1.如果队列数小于消费者数, 平均数就是 1// 2.如果队列数大于消费者数, 并且当前队列的下标在 (0, mod) 这个范围, 那么平均数就是 mqAll.size() / cidAll.size() + 1// 3.如果队列数大于消费者数, 并且当前队列的下标在 [mod, cidAll.size()) 这个范围, 那么平均数就是 mqAll.size() / cidAll.size()int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());// 计算当前消费者从哪里开始分配队列, 这里求出来的就是 1 * 3 = 3int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;// 这里求出来当前消费者要分配多少个队列, 比如当前就是分配 3 个队列int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {// 开始分配队列, 分配的起始位是是 startIndex, 分配的队列数量是 rangeresult.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;
}
由于两个消费者 id 是一样的,所以 index 也是一样的,同样的 averageSize 也是一样的,startIndex 也是一样的,因此求出的队列也都是一样的。根据上一篇文章我们说过的消费者重平衡策略,这两个消费者分配到的队列是:[3,4,5]。
7. 示例
所以如果集合里面有多个消费者 id 相同,会导致这些消费者分配到的消息队列都一样,而且消息队列集合里面也有一些队列是没办法分配到消费者的,会导致消息堆积,现在我们可以模拟下真实情况,比如我们可以启动两个消费者消费同一个 topic,看看这两个消费者的分配队列输出情况,下面先来看下代码。
首先在 AllocateMessageQueueAveragely#allocate 的最后一行代码中加上打印日志。
然后我们启动消费者,消费者 ID 如果我们没有设置,默认就是 PID + 当前时间戳,不过可以通过 rocketmq.client.name
设置实例名称,如果不设置默认就是 DEFAULT。
不设置的情况下就会更改,如果设置了 instanceName,那么就用我们自己设置的。
所以我们启动两个消费者,一个是 Consumer-A,一个是 Consumer-B,下面是消费者的代码:
public class ConsumerA {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroupConsumer");consumer.setNamesrvAddr("localhost:9876");consumer.setMessageModel(MessageModel.CLUSTERING);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody(), StandardCharsets.UTF_8));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}public class ConsumerB {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroupConsumer");consumer.setNamesrvAddr("localhost:9876");consumer.setMessageModel(MessageModel.CLUSTERING);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody(), StandardCharsets.UTF_8));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}
依次启动 NameServer、Broker、ConsumerA,ConsumerB,同时启动消费者 A 的时候设置参数 -Drocketmq.client.name=ConsumerA
,另一个消费者 B 启动设置 -Drocketmq.client.name=ConsumerB
,看消费者的输出:
可以看到,两个消费者平均分配了 [0,1,2,3] 四个队列,大家可能觉得奇怪,为什么输出 clientId 的时候前面还有一个 ip,那是因为我们设置的是 instanceName,真正获取 clientId 的时候还得拼接上 ip,不过我们是同一台机器启动的,所以 ip 也是相同的,只需要关注 instanceName 即可。
下面我们将两个消费者的 rocketmq.client.name=ConsumerA,也就是实例名称相同,下面是两个消费者的输出。
可以看到,这两个消费者 id 相同的情况下分配到的队列 id 都是一样的,而且可以发现 queueId = 2、3 的这两个队列没有分配给任何一个消费者,这也证明了我们上面的推测。
8. 小结
好了,这篇文章我们讲述了消费者 clientId 对消费者重平衡的影响,相同的 clientId 会导致消息队列分配不均衡,导致有一些队列没办法分配到消费者。
如有错误,欢迎指出!!!!