当前位置: 首页 > news >正文

Apache Ignite 的分布式队列(IgniteQueue)和分布式集合(IgniteSet)的介绍

以下的内容是关于 Apache Ignite 的分布式队列(IgniteQueue)和分布式集合(IgniteSet 的介绍。它们是 Ignite 提供的分布式数据结构,让你可以在整个集群中像使用本地 BlockingQueueSet 一样操作共享的数据。

下面我们一步步深入理解这些概念。


🎯 一、一句话理解

Ignite 的 IgniteQueueIgniteSet 是跨多个服务器节点的“共享队列”和“共享集合”,多个节点可以同时安全地向队列中添加任务、取出任务,或对集合进行增删查操作。

✅ 类比:

  • IgniteQueue → 分布式版的 BlockingQueue,适合做任务分发、工作队列
  • IgniteSet → 分布式版的 HashSet,保证元素唯一性,适合做去重、白名单等。

🧩 二、基本使用

✅ 1. 创建一个分布式队列(IgniteQueue

Ignite ignite = Ignition.start();IgniteQueue<String> queue = ignite.queue("myQueue",           // 队列名称0,                   // 容量:0 表示无上限new CollectionConfiguration()  // 配置
);
  • 支持标准 BlockingQueue 操作:
    queue.put("task1");        // 阻塞插入
    String task = queue.take(); // 阻塞取出
    queue.offer("task2", 5, TimeUnit.SECONDS); // 带超时插入
    

✅ 2. 创建一个分布式集合(IgniteSet

IgniteSet<String> set = ignite.set("mySet", new CollectionConfiguration());
  • 支持标准 Set 操作:
    set.add("item1");
    set.contains("item1"); // true
    set.remove("item1");
    

✅ 两者都实现了 java.util.Collection 接口,所以你可以用 size(), isEmpty(), iterator() 等方法。


🔁 三、核心特性:Collocated vs. Non-Collocated(同地 vs. 非同地模式)

这是理解 Ignite 集合行为的关键!

模式中文说明
Collocated同地模式整个队列/集合的所有元素都存储在同一个节点上
Non-Collocated非同地模式队列/集合的数据被分片(partitioned) 到多个节点上

📌 什么时候用哪种?

场景推荐模式原因
有很多小队列(如每个用户一个队列)Collocated减少每个队列的分布开销,提高性能
只有 1~2 个大队列(如全局任务池)Non-Collocated数据均匀分布,避免单点压力
集合数据量很大(百万级)Non-Collocated分布式存储,扩展性好
集合很小但数量多(成千上万个)Collocated避免元数据过多,降低协调成本

🔧 如何设置?

CollectionConfiguration colCfg = new CollectionConfiguration();
colCfg.setCollocated(true); // 设置为同地模式IgniteQueue<String> queue = ignite.queue("myQueue", 0, colCfg);

⚠️ 注意:

  • Non-Collocated 模式仅支持 PARTITIONED 缓存模式
  • Collocated 模式下,虽然数据在一个节点,但会根据负载自动分配到不同节点(比如:队列1在NodeA,队列2在NodeB),实现负载均衡。

🚚 四、Cache Queues 和 负载均衡(Load Balancing)

这是 IgniteQueue 的一个经典应用场景分布式任务调度与负载均衡

🎯 场景设想:

你想让集群中的多个节点协同处理一批任务,而且希望:

  • 任务自动分发;
  • 每个节点只处理自己能承受的任务量(避免过载);
  • 任务不重复消费。

✅ 解决方案:用 IgniteQueue.take() 实现“工作窃取”模型

// 生产者节点:提交任务
IgniteQueue<Runnable> queue = ignite.queue("tasks", 0, null);
queue.put(() -> System.out.println("Processing job on remote node"));// 消费者节点(多个):持续取任务执行
while (true) {try {Runnable job = queue.take(); // 阻塞等待任务job.run(); // 执行任务} catch (InterruptedException e) {break;}
}

✅ 这种方式的优点:

特性说明
✅ 自动负载均衡处理快的节点会取更多任务,慢的节点取少一些
✅ 高可用某个消费者宕机,任务不会丢失(仍在队列中)
✅ 不重复消费take() 是原子操作,确保一个任务只被一个节点取走
✅ 弹性伸缩新节点加入后,自动开始消费任务

💡 这类似于 RabbitMQ + Worker 模式,但无需外部消息中间件!


⚙️ 五、CollectionConfiguration 配置详解

这是创建队列/集合时的高级配置项:

方法说明默认值
setCollocated(boolean)是否启用同地模式false
setCacheMode(CacheMode)底层缓存模式:
- PARTITIONED(分片)
- REPLICATED(全复制)
- LOCAL(本地)
PARTITIONED
setAtomicityMode(CacheAtomicityMode)原子性模式:
- ATOMIC(高性能)
- TRANSACTIONAL(支持事务)
ATOMIC
setOffHeapMaxMemory(long)堆外内存最大使用量(字节)0(不限)
setBackups(int)数据备份份数(高可用)0(无备份)
setNodeFilter(IgnitePredicate<ClusterNode>)自定义节点过滤器,决定数据存在哪些节点上null

🌰 示例:创建一个带备份的事务型队列

CollectionConfiguration cfg = new CollectionConfiguration();
cfg.setCollocated(false);
cfg.setCacheMode(CacheMode.PARTITIONED);
cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
cfg.setBackups(1); // 每个数据有1个备份IgniteQueue<String> queue = ignite.queue("safeQueue", 0, cfg);

✅ 这样即使一个节点宕机,队列中的任务也不会丢失。


🧪 六、实际应用场景

场景使用方式
🔧 分布式任务调度Runnable 或任务描述放入 IgniteQueue,Worker 节点 take() 执行
📦 消息广播/通知IgniteSet 存储已处理事件ID,防止重复处理
🧹 去重处理IgniteSet.add() 天然去重,适合爬虫URL去重、日志去重
📊 分布式计数器管理IgniteSet 存储活跃会话ID,统计在线用户数
🔄 工作流引擎用多个队列表示不同阶段的任务流(待处理 → 处理中 → 完成)

⚠️ 七、注意事项

  1. 序列化:放入队列的对象必须可序列化(实现 Serializable)。
  2. 性能take() 是阻塞调用,适合长期运行的消费者线程。
  3. 容量限制:虽然可以设为无界(0),但在生产环境建议设置上限,防止内存溢出。
  4. 持久化:默认在内存中,如需持久化需开启 Ignite 的原生持久化(Native Persistence)。
  5. 监控:可通过 Ignite Visor 或 JMX 监控队列长度、消费速度等。

✅ 总结:一句话掌握精髓

Ignite 的 IgniteQueueIgniteSet 是内建于内存数据网格的分布式集合,既能像本地集合一样使用,又能自动实现跨节点的数据共享、负载均衡和高可用,特别适合作为“轻量级任务队列”或“全局去重集合”使用。


🔄 对比其他技术

技术优点缺点适用场景
IgniteQueue内嵌、低延迟、无需外部依赖功能较简单轻量级任务分发
Kafka高吞吐、持久化、多订阅者复杂、延迟较高日志、事件流
RabbitMQ功能丰富(路由、重试)需独立部署企业级消息系统
Redis List + BRPOP快、常用单点风险(除非集群)简单任务队列

✅ 如果你已经在使用 Ignite 作为缓存或计算网格,直接用 IgniteQueue 是最自然的选择


如果你想实现一个“分布式爬虫任务队列”或“在线用户统计系统”,我可以为你提供完整的代码示例!欢迎继续提问。

http://www.lryc.cn/news/604161.html

相关文章:

  • windows下Docker安装路径、存储路径修改
  • Element Plus常见基础组件(一)
  • 网络协议——MPLS(多协议标签转发)
  • Day23-二叉树的层序遍历(广度优先搜素)
  • 基于dcmtk的dicom工具 第九章 以json文件或sqlite为数据源的worklist服务(附工程源码)
  • Mqttnet的MqttClientTlsOptions.CertificateValidationHandler详解
  • SQL 怎么学?
  • SQLAlchemy 全方位指南:从入门到精通
  • Linux初学者在CentOS 7虚拟机中rpm、yum、dnf的操作练习
  • PCIE4.0/5.0/DDR4/DDR5使用以及布局布线规则-集萃
  • 14、distance_object_model_3d算子
  • 粒子群优化算法(Particle Swarm Optimization, PSO) 求解二维 Rastrigin 函数最小值问题
  • 三相四桥臂SVPWM控制及电机模型
  • Excel制作滑珠图、哑铃图
  • CSRF漏洞原理及利用
  • 子数组和 问题汇总
  • Mysql缓冲池和LRU
  • Accessibility Insights for Windows 使用教程
  • Adv. Sci. 前沿:非零高斯曲率3D结构可逆转换!液晶弹性体多级形变新策略
  • Javaweb————HTTP请求头属性讲解
  • [leetcode] 电话号码的排列组合
  • Vue El 基础
  • PyTorch 数据类型和使用
  • 第二课 P-MOS管应用
  • LRU(Least Recently Used)原理及算法实现
  • 【SQL】Windows MySQL 服务查询启动停止自启动(保姆级)
  • DAY27 函数专题2:装饰器
  • Android 解决键盘遮挡输入框
  • 老年护理实训室建设方案:打造安全、规范、高效的实践教学核心平台
  • C++ 编程问题记录