当前位置: 首页 > news >正文

RocketMQ的一些使用理解

1.RocketMQ的生产者生产负载策略(3种)
(1)SelectMessageQueueByHash (一致性hash)
(2)SelectMessageQueueByMachineRoom (机器随机)
(3)SelectMessageQueueByRandom (随机)
在这里插入图片描述
第1种一致性hash算法是对所有的队列进行hash计算
缺点:(1)如果出现队列增减可能会导致顺序消息断层。
(2)在多broker情况下,可能导致broker分布不均匀,即我加了一个broker维度的hash分层。

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;/*** 参考SelectMessageQueueByHash,很大可能放在同一个broker上了。* 我们hash选择broker来避免这个情况*/
public class SelectMessageQueueByBrokerHash implements MessageQueueSelector {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Map<String, Map<Integer, MessageQueue>> brokerQueueMap = new HashMap<>(16);mqs.forEach(queue -> brokerQueueMap.computeIfAbsent(queue.getBrokerName(), key -> new HashMap<>(32)).put(queue.getQueueId(), queue));List<String> brokerNames = new ArrayList<>(brokerQueueMap.keySet());Collections.sort(brokerNames);// hash选择broker,再hash选择queueint brokerIndex = indexForBroker(arg, brokerNames);Map<Integer, MessageQueue> queueMap = brokerQueueMap.get(brokerNames.get(brokerIndex));int queueIndex = indexForQueue(arg, queueMap);return queueMap.get(queueIndex);}private int indexForBroker(Object arg, List<String> brokerNames) {int hashCode = arg.hashCode();hashCode = hashCode < 0 ? Math.abs(hashCode) : hashCode;return hashCode % brokerNames.size();}/*** 参考one to one hash算法* https://blog.51cto.com/u_14398214/5076158*/private int indexForQueue(Object arg, Map<Integer, MessageQueue> queueMap) {String key = String.valueOf(arg);int hash, i;for (hash = 0, i = 0; i < key.length(); ++i) {hash += key.charAt(i);hash += (hash << 10);hash ^= (hash >> 6);}hash += (hash << 3);hash ^= (hash >> 11);hash += (hash << 15);hash = hash < 0 ? Math.abs(hash) : hash;return hash % queueMap.size();}
}

2.consumer消费负载均衡策略:
(1)默认采用平均分配方法来实现负载均衡
 如果consumer个数和queue数不对等时:
consumer个数比queue个数多,多个consumer消费一个queue
consumer个数和queue个数一样,一个consumer消费一个queue
consumer个数比queue个数少,一个consumer消费多个queue
(2)AllocateMessageQueueConsistentHash:一致性哈希
何时reblance:
(1)当一个consumer宕机最多20秒执行reblance,新consumer重新消费
(2)当有新consumer接入时,立即执行reblance。

参考:
https://www.cnblogs.com/jijiecong/p/15182736.html

http://www.lryc.cn/news/24332.html

相关文章:

  • Java枚举详解
  • 虚拟机上安装openKylin详细步骤总结
  • 夜天之书 #74 企业开源的软件协议模型实践(Part 2)
  • 2.webpack实时打包
  • KingbaseES V8R3 表加密
  • 2 为社么软件架构很重要?
  • Python 之 Pandas merge() 函数、set_index() 函数、drop_duplicates() 函数和 tolist() 函数
  • MySQL实战之深入浅出索引(下)
  • (二分查找)leetcode1539. 第 k 个缺失的正整数
  • yaml文件格式详解及实例
  • AOP在PowerJob中的使用,缓存锁保证并发安全,知识细节全总结
  • 对账平台设计
  • JavaEE进阶第五课:SpringBoot的创建和使用
  • 我带过的一名C++实习生——Z同学
  • 面试题13. 机器人的运动范围
  • LeetCode189_189. 轮转数组
  • java Files和Paths的使用详解 附有使用demo
  • 如何使用ApacheTomcatScanner扫描Apache Tomcat服务器漏洞
  • js中的定时器 setTimeout()和setInterval()
  • 【吃透Js】深入学习浅拷贝和深拷贝
  • AUTOSAR为啥要开发新的社区商业模式?
  • 数据结构和算法面试常见题必考以及前端面试题
  • 一文解决Python所有报错
  • LeetCode 1237. Find Positive Integer Solution for a Given Equation【双指针,二分,交互】
  • 【C语言】结构体进阶
  • 全志T3+FPGA国产核心板——Pango Design Suite的FPGA程序加载固化
  • 深度学习之 imgaug (图像增强)学习笔记
  • mysql字符串等值查询中条件字段值末尾有空格也能查到数据问题
  • 一个关于事件溯源Event Sourcing的小荔枝,Golang实现
  • Vue3 组合式函数,实现minxins