当前位置: 首页 > news >正文

Using Spring for Apache Pulsar:Publishing and Consuming Partitioned Topics

在下面的示例中,我们发布了一个名为hello pulser participated的主题。这是一个被分区的主题,对于这个示例,我们假设该主题已经创建了三个分区。

@SpringBootApplication
public class PulsarBootPartitioned {public static void main(String[] args) {SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");}@Beanpublic ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");return args -> {for (int i = 0; i < 10; i++) {pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());}};}@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")public void listen(String message) {System.out.println("Message Received: " + message);}static class FooRouter implements MessageRouter {@Overridepublic int choosePartition(Message<?> msg, TopicMetadata metadata) {return 0;}}static class BarRouter implements MessageRouter {@Overridepublic int choosePartition(Message<?> msg, TopicMetadata metadata) {return 1;}}static class BuzzRouter implements MessageRouter {@Overridepublic int choosePartition(Message<?> msg, TopicMetadata metadata) {return 2;}}}

在前面的示例中,我们发布到一个分区的主题,我们想将一些数据段发布到特定的分区。如果您将其保留为Pulsar的默认值,它将遵循分区分配的轮转模式,我们希望覆盖该模式。为此,我们提供了一个带有send方法的消息路由器对象。考虑实现的三个消息路由器。FooRouter始终将数据发送到分区0,BarRouter发送到分区1,BuzzRouter发送给分区2。还要注意,我们现在使用PulsarTemplate的sendAsync方法,该方法返回CompletableFuture。运行应用程序时,我们还需要将生产者上的messageRoutingMode设置为CustomPartition(spring.pulsinger.producer.message路由模式)。

在消费者端,我们使用具有独占订阅类型的PulsarListener。这意味着来自所有分区的数据最终都在同一个消费者中,并且没有订购保证。

如果我们希望每个分区由一个不同的消费者使用,我们该怎么办?我们可以切换到故障转移订阅模式,并添加三个单独的消费者:

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {System.out.println("Message Received 1: " + foo);
}@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {System.out.println("Message Received 2: " + foo);
}@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription",  topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {System.out.println("Message Received 3: " + foo);
}

当你遵循这种方法时,一个分区总是被一个专用的消费者占用。

同样,如果你想使用Pulsar的共享消费者类型,你可以使用共享订阅类型。但是,当您使用共享模式时,您将失去任何排序保证,因为单个消费者可能会在另一个消费者有机会之前收到来自所有分区的消息。

考虑以下示例:

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {System.out.println("Message Received 1: " + foo);
}@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {System.out.println("Message Received 2: " + foo);
}

http://www.lryc.cn/news/583035.html

相关文章:

  • 飞算 JavaAI 智能编程助手 - 重塑编程新模态
  • bash 判断 /opt/wslibs-cuda11.8 是否为软连接, 如果是,获取连接目的目录并自动创建
  • (C++)任务管理系统(正式版)(迭代器)(list列表基础教程)(STL基础知识)
  • `fatal: bad config value for ‘color.ui‘`错误解决方案
  • ali linux 安装libreoffice
  • Markdown入门
  • 类和对象拓展——日期类
  • Django核心知识点详解:JSON、AJAX、Cookie、Session与用户认证
  • npu-smi info 华为昇腾NPU 状态监控工具解读
  • 类与对象【下篇】-- 关于类的其它语法
  • 树莓派vsftpd文件传输服务器的配置方法
  • 【02】MFC入门到精通——MFC 手动添加创建新的对话框模板
  • overleaf 改为XeLatex
  • Vue响应式原理四:响应式-监听属性变化
  • 正点原子学习 用户权限管理
  • 【python基础】运算符与布尔值全解析
  • 智慧航天运载体系全生命周期监测 | 图扑数字孪生
  • Shader面试题100道之(41-60)
  • 从0实现线性回归模型
  • vue3.2 前端动态分页算法
  • 「Java案例」打印数字金字塔
  • [Backlog] 核心协调器 | 终端用户界面(TUI)实现 | 多分支任务冲突解决 | 测试验证体系
  • 技术支持丨解决 ServBay 在 Windows 启动时反复提示安装 .NET 的问题
  • Python(30)基于itertools生成器的量子计算模拟技术深度解析
  • 使用LLaMA-Factory微调Qwen2.5-VL-3B 的目标检测任务-数据集格式转换(voc 转 ShareGPT)
  • 【洛谷题单】--顺序结构(一)
  • C++高频知识点(六)
  • [NOIP][C++]洛谷P1376 [USACO05MAR] Yogurt factory 机器工厂
  • LeetCode--42.接雨水
  • C++(STL源码刨析/vector)