Using Spring for Apache Pulsar:Message Production
1. Pulsar Template
在Pulsar生产者端,Spring Boot自动配置提供了一个用于发布记录的PulsarTemplate。该模板实现了一个名为PulsarOperations的接口,并提供了通过其合约发布记录的方法。
这些send API方法有两类:send和sendAsync。send方法通过Pulsar生成器上的同步发送功能阻止调用。它们返回消息在代理上持久化后发布的消息的MessageId。sendAsync方法调用是非阻塞的异步调用。它们返回一个CompletableFuture,您可以在消息发布后使用它异步接收消息ID。
对于不包含主题参数的API变体,将使用主题解析过程来确定目标主题。
1.1. Simple API
该模板为简单的发送请求提供了一些方法(前缀为“send”)。对于更复杂的发送请求,流畅的API可以让您配置更多选项。
1.2. Fluent API
该模板提供了一个流畅的构建器来处理更复杂的发送请求。
1.3. Message customization
您可以指定一个TypedMessageBuilderCustomizer来配置传出消息。例如,以下代码显示了如何发送键控消息:
template.newMessage(msg).withMessageCustomizer((mb) -> mb.key("foo-msg-key")).send();
1.4. Producer customization
您可以指定一个ProducerBuilderCustomizer来配置底层Pulsar生产者构建器,该生成器最终构建用于发送传出消息的生产者。
请谨慎使用,因为这可以完全访问生产者构建器,调用其某些方法(如create)可能会产生意想不到的副作用。
例如,以下代码显示了如何禁用批处理和启用分块:
template.newMessage(msg).withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false)).send();
另一个示例显示了如何在将记录发布到分区主题时使用自定义路由。在Producer构建器上指定自定义MessageRouter实现,例如:
template.newMessage(msg).withProducerCustomizer((pb) -> pb.messageRouter(messageRouter)).send();
请注意,使用MessageRouter时,spring.pulsar.producter.message-routing-mode的唯一有效设置是自定义。
另一个示例显示了如何添加一个ProducerInterceptor,该拦截器将拦截和修改生产者在发布到代理之前收到的消息:
template.newMessage(msg).withProducerCustomizer((pb) -> pb.intercept(interceptor)).send();
定制程序将仅适用于用于发送操作的生产者。如果要将自定义程序应用于所有生产商,则必须按照全球生产商自定义中的描述将其提供给生产商工厂。
2. Specifying Schema Information
如果您使用Java基元类型,框架会自动为您检测模式,您不需要指定任何模式类型来发布数据。对于非基元类型,如果在PulsarTemplate上调用send操作时没有明确指定Schema,则Spring For Apache Pulsar框架将尝试构建Schema。JSON类型。
目前支持的复杂模式类型有JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES和带内联编码的KEY_VALUE。
2.1. Custom Schema Mapping
作为在PulsarTemplate上调用复杂类型的发送操作时指定模式的替代方法,模式解析器可以配置类型的映射。这消除了在框架使用传出消息类型咨询解析器时指定模式的需要。
2.1.1. Configuration properties
模式映射可以使用spring.pulsar.defaults.type-mappings属性进行配置。以下示例使用application.yml分别使用AVRO和JSON模式为User和Address复杂对象添加映射:
spring:pulsar:defaults:type-mappings:- message-type: com.acme.Userschema-info:schema-type: AVRO- message-type: com.acme.Addressschema-info:schema-type: JSON
消息类型是消息类的完全限定名。
2.1.2. Schema resolver customizer
添加映射的首选方法是通过上述属性。但是,如果需要更多的控制,您可以提供一个模式解析器定制器来添加映射。
以下示例使用模式解析器定制器分别使用AVRO和JSON模式为User和Address复杂对象添加映射:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {return (schemaResolver) -> {schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));}
}
2.1.3. Type mapping annotation
指定用于特定消息类型的默认模式信息的另一种选择是用@PulsarMessage注释标记消息类。可以通过注释上的schemaType属性指定架构信息。
以下示例将系统配置为在生成或使用Foo类型的消息时使用JSON作为默认模式:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
有了这个配置,就不需要在发送操作上设置或指定模式。
2.2. Producing with AUTO_SCHEMA
如果没有机会提前知道Pulsar主题的模式类型,您可以使用AUTO_PRODUCE模式将原始JSON或Avro有效载荷安全地发布为byte[]。
在这种情况下,生产者会验证出站字节是否与目标主题的模式兼容。
只需指定schema的模式。模板上的AUTO_PRODUCE_BYTES()发送操作如下例所示:
void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
这仅支持Avro和JSON模式类型。
3. Pulsar Producer Factory
PulsarTemplate依赖于PulsarProducerFactory来实际创建底层生产者。Spring Boot自动配置还提供了这个生产者工厂,您可以通过指定任何Spring.pulser.producer.*应用程序属性来进一步配置它。
如果在直接使用生产者工厂API时没有指定主题信息,则使用PulsarTemplate使用的相同主题解析过程,唯一的例外是省略了“消息类型默认”步骤。
3.1. Global producer customization
该框架提供了ProducerBuilderCustomizer合约,该合约允许您配置用于构建每个生产者的底层构建器。要自定义所有生产者,您可以将自定义者列表传递给PulsarProducerFactory构造函数。使用多个自定义程序时,它们将按照在列表中显示的顺序应用。
如果您使用Spring Boot自动配置,您可以将自定义程序指定为bean,它们将根据其@Order注释自动传递给PulsarProducerFactory。
如果您只想将自定义程序应用于单个生产商,您可以使用Fluent API并在发送时指定自定义程序。
4. Pulsar Producer Caching
每个底层Pulsar生产者都会消耗资源。为了提高性能并避免不断创建生产者,生产者工厂会缓存它创建的生产者。它们以LRU方式缓存,并在配置的时间段内未被使用时被驱逐。缓存键由足够的信息组成,以确保在后续的创建请求中,调用者返回相同的生产者。
此外,您可以通过指定任何spring.pulsinger.producer.cache.*应用程序属性来配置缓存设置。
4.1. Caution on Lambda customizers
任何用户提供的生产者定制器也包含在缓存密钥中。由于缓存键依赖于equals/hashCode的有效实现,因此在使用Lambda自定义程序时必须谨慎。
规则:实现为Lambdas的两个自定义程序将在equals/hashCode上匹配,当且仅当它们使用相同的Lambda实例并且不需要在其闭包外定义任何变量时。
为了澄清上述规则,我们将看几个例子。在下面的示例中,定制器被定义为内联Lambda,这意味着对sendUser的每次调用都使用相同的Lambda实例。此外,它不需要闭包外的变量。因此,它将作为缓存键匹配。
void sendUser() {var user = randomUser();template.newMessage(user).withTopic("user-topic").withProducerCustomizer((b) -> b.producerName("user")).send();
}
在下一种情况下,定制器被定义为内联Lambda,这意味着对sendUser的每次调用都使用相同的Lambda实例。但是,它需要一个闭包外的变量。因此,它将不匹配为缓存键。
void sendUser() {var user = randomUser();var name = randomName();template.newMessage(user).withTopic("user-topic").withProducerCustomizer((b) -> b.producerName(name)).send();
}
在最后一个例子中,定制器被定义为内联Lambda,这意味着对sendUser的每次调用都使用相同的Lambda实例。虽然它确实使用了一个变量名,但它并非源自其闭包之外,因此将作为缓存键进行匹配。这说明变量可以在Lambda闭包中使用,甚至可以调用静态方法。
void sendUser() {var user = randomUser();template.newMessage(user).withTopic("user-topic").withProducerCustomizer((b) -> {var name = SomeHelper.someStaticMethod();b.producerName(name);}).send();
}
规则:如果你的Lambda定制器不是只定义一次(在后续调用中使用相同的实例),或者它需要在闭包之外定义变量,那么你必须提供一个具有有效equals/hashCode实现的定制器实现。
如果不遵守这些规则,那么生产者缓存将始终丢失,您的应用程序性能将受到负面影响。
5. Intercept Messages on the Producer
添加ProducerInterceptor可以让您在生产者接收到的消息发布到代理之前对其进行拦截和修改。为此,您可以将拦截器列表传递给PulsarTemplate构造函数。使用多个拦截器时,应用它们的顺序是它们在列表中出现的顺序。
如果使用Spring Boot自动配置,则可以将拦截器指定为Beans。它们会自动传递给PulsarTemplate。拦截器的排序是通过使用@Order注释实现的,如下所示:
@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {...
}@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {...
}
如果您没有使用启动器,则需要自己配置和注册上述组件。