当前位置: 首页 > news >正文

Spring for Apache Pulsar->Reactive Support->Message Production

好消息:Spring for Apache Pulsar这两天刚刚升到2.0.0版本

1. ReactivePulsarTemplate

在Pulsar生产者端,Spring Boot自动配置提供了一个ReactivePulsarTemplate用于发布记录。该模板实现了一个名为ReactivePulse Operations的接口,并提供了通过其合约发布记录的方法。

该模板提供了send方法,可以接受单个消息并返回Mono<MessageId>。它还提供了send方法,可以接受多条消息(以ReactiveStreams Publisher类型的形式)并返回Flux<MessageId>。

对于不包含主题参数的API变体,将使用主题解析过程来确定目标主题。

1.1. Fluent API

该模板提供了一个流畅的构建器来处理更复杂的发送请求。

1.2. Message customization

您可以指定MessageSpecBuilderCustomizer来配置传出消息。例如,以下代码显示了如何发送键控消息:

template.newMessage(msg).withMessageCustomizer((mc) -> mc.key("foo-msg-key")).send();

1.3. Sender customization

您可以指定一个ReactiveMessageSenderBuilderCustomizer来配置底层Pulsar发送器生成器,该生成器最终构建用于发送传出消息的发送器。

请谨慎使用,因为这可以完全访问发送方构建器,调用其某些方法(如create)可能会产生意想不到的副作用。

例如,以下代码显示了如何禁用批处理和启用分块:

template.newMessage(msg).withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false)).send();

另一个示例显示了如何在将记录发布到分区主题时使用自定义路由。在发送方构建器上指定自定义MessageRouter实现,例如:

template.newMessage(msg).withSenderCustomizer((sc) -> sc.messageRouter(messageRouter)).send();

请注意,使用MessageRouter时,spring.pulsar.producter.message-routing-mode的唯一有效设置是自定义。

2. Specifying Schema Information

如果您使用Java基元类型,框架会自动为您检测模式,您不需要指定任何模式类型来发布数据。对于非基元类型,如果在ReactivePulsarTemplate上调用send操作时没有明确指定Schema,则Spring For Apache Pulsar框架将尝试构建Schema。JSON类型。

目前支持的复杂模式类型有JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES和带内联编码的KEY_VALUE。

2.1. Custom Schema Mapping

作为在ReactivePulse Template上为复杂类型调用发送操作时指定模式的替代方法,可以使用类型的映射配置模式解析器。这消除了在框架使用传出消息类型咨询解析器时指定模式的需要。

2.1.1. Configuration properties

模式映射可以使用spring.pulsar.defaults.type-mappings属性进行配置。以下示例使用application.yml分别使用AVRO和JSON模式为User和Address复杂对象添加映射:

spring:pulsar:defaults:type-mappings:- message-type: com.acme.Userschema-info:schema-type: AVRO- message-type: com.acme.Addressschema-info:schema-type: JSON

消息类型是消息类的完全限定名。

2.1.2. Schema resolver customizer

添加映射的首选方法是通过上述属性。但是,如果需要更多的控制,您可以提供一个模式解析器定制器来添加映射。

以下示例使用模式解析器定制器分别使用AVRO和JSON模式为User和Address复杂对象添加映射:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {return (schemaResolver) -> {schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));}
}
2.1.3. Type mapping annotation

指定用于特定消息类型的默认模式信息的另一种选择是用@PulsarMessage注释标记消息类。可以通过注释上的schemaType属性指定架构信息。

以下示例将系统配置为在生成或使用Foo类型的消息时使用JSON作为默认模式:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了这个配置,就不需要在发送操作上设置或指定模式。

2.2. Producing with AUTO_SCHEMA

如果没有机会提前知道Pulsar主题的模式类型,您可以使用AUTO_PRODUCE模式将原始JSON或Avro有效载荷安全地发布为byte[]。

在这种情况下,生产者会验证出站字节是否与目标主题的模式兼容。

只需指定schema的模式。模板上的AUTO_PRODUCE_BYTES()发送操作如下例所示:

void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}

这仅支持Avro和JSON模式类型。

3. ReactivePulsarSenderFactory

ReactivePulsarTemplate依赖于ReactivePulse SenderFactory来实际创建底层发送方。

Spring Boot提供了这个发送器工厂,可以配置任何Spring.pulser.producer.*应用程序属性。

如果直接使用发送方工厂API时未指定主题信息,则使用ReactivePulse Template使用的相同主题解析过程,但省略了“消息类型默认”步骤。

3.1. Producer Caching

每个底层Pulsar生产者都会消耗资源。为了提高性能并避免持续创建生产者,底层Apache Pulsar Reactive客户端中的ReactiveMessageSenderCache缓存了它创建的生产者。它们以LRU方式缓存,并在配置的时间段内未被使用时被驱逐。

您可以通过指定任何spring.pulsinger.producer.cache.*应用程序属性来配置缓存设置。

http://www.lryc.cn/news/584381.html

相关文章:

  • 生产环境CI/CD流水线构建与优化实践指南
  • 访问Windows服务器备份SQL SERVER数据库
  • 网安-解决pikachu-rce乱码问题
  • NFS文件存储及部署论坛(小白的“升级打怪”成长之路)
  • G5打卡——Pix2Pix算法
  • 前缀和|差分
  • 【部分省份已考真题】备战2025全国青少年信息素养大赛-算法创意实践挑战赛c++省赛/复赛真题——被污染的药剂
  • Expected Sarsa 算法的数学原理
  • Flask 入门教程:用 Python 快速搭建你的第一个 Web 应用
  • Go语言包管理完全指南:从基础到最佳实践
  • UECC-UE连接协调的运作方式
  • 【会员专享数据】2013-2024年我国省市县三级逐月SO₂数值数据(Shp/Excel格式)
  • 2025年最新Python+Playwright自动化测试- 隐藏元素定位与操作
  • DSP的基础平台搭建
  • 24、企业设备清单管理(Equipment)详解:从分类到管理,设备全生命周期把控
  • 虚拟环境已安装该包,且已激活,但报错
  • 介绍 cnpm exec electron-packager
  • Apache http 强制 https
  • Qt使用脚本实现GUI扩展技术详解
  • Android View 绘制流程 优化 (Bitmap 复用+内容变化检测+防抖调度策略)
  • Canny边缘检测(cv2.Canny())
  • 2025年语言处理、大数据与人机交互国际会议(DHCI 2025)
  • MD5有什么特点吗
  • Linux入门篇学习——Linux 工具之 make 工具和 makefile 文件
  • fastMCP基础(一)
  • 如何将多个.sql文件合并成一个:Windows和Linux/Mac详细指南
  • STM32F103C8T6驱动无源蜂鸣器详解:从硬件设计到音乐播放
  • 【研报复现】方正金工:(1)适度冒险 因子
  • Boost.Asio学习(3):异步读写
  • Pytest之收集用例规则与运行指定用例