当前位置: 首页 > news >正文

2、Connecting to Kafka

KafkaAdmin-请参阅配置主题

ProducerFactory-请参阅发送消息

ConsumerFactory-请参阅接收消息

从2.5版本开始,每个版本都扩展了KafkaResourceFactory。这允许在运行时通过向引导服务器的配置中添加Supplier<String>来更改引导服务器:setBootstrapServersSupplier(()->…)。所有新连接都将调用此命令以获取服务器列表。消费者和生产者通常寿命较长。要关闭现有的生产者,请在DefaultKafkaProducerFactory上调用reset()。要关闭现有的Consumers,请在KafkaListenerEndpointRegistry上调用stop()(然后调用start())和/或在任何其他侦听器容器bean上调用stop()和start()。

为了方便起见,该框架还提供了一个支持两组引导服务器的ABSwitch集群;其中一个在任何时候都是活动的。通过调用setBootstrapServersSupplier()配置ABSwitchCluster并将其添加到生产者和消费者工厂以及KafkaAdmin。当你想切换时,在生产者工厂上调用primary()或secondary()并调用reset()来建立新的连接;对于消费者,stop()和start()都是监听器容器。使用@KafkaListeners时,停止()和启动()KafkaListenerEndpointRegistry bean。

有关更多信息,请参阅Javadocs。

Factory Listeners

从2.5版本开始,DefaultKafkaProducerFactory和DefaultKafkaConsumerFactory可以配置一个监听器,以便在创建或关闭生产者或消费者时接收通知。

interface Listener<K, V> {default void producerAdded(String id, Producer<K, V> producer) {}default void producerRemoved(String id, Producer<K, V> producer) {}}
interface Listener<K, V> {default void consumerAdded(String id, Consumer<K, V> consumer) {}default void consumerRemoved(String id, Consumer<K, V> consumer) {}}

在每种情况下,id都是通过将客户端id属性(在创建后从metrics()中获得)附加到工厂beanName属性中创建的,用..分隔。。

例如,这些监听器可用于在创建新客户端时创建和绑定Micrometer KafkaClientMetrics实例(并在客户端关闭时关闭它)。

该框架提供了正是这样做的监听器;请参见千分尺本地度量。

Default client ID prefixes

从版本3.2开始,对于使用Spring.application.name属性定义应用程序名称的Spring Boot应用程序,此名称现在用作这些客户端类型的自动生成客户端ID的默认前缀:

不使用消费者群体的消费者客户

生产商客户

管理员客户端

这使得在服务器端更容易识别这些客户端,以便进行故障排除或应用配额。

Client TypeWithout application nameWith application name

consumer without consumer group

consumer-null-1

myapp-consumer-1

consumer with consumer group "mygroup"

consumer-mygroup-1

consumer-mygroup-1

producer

producer-1

myapp-producer-1

admin

adminclient-1

myapp-admin-1

http://www.lryc.cn/news/581717.html

相关文章:

  • 大模型关键字解释
  • 【机器学习笔记Ⅰ】4 梯度下降
  • 【管理学】乐嘉性格色彩与MBTI的优劣和适用场景
  • 【C++基础】内存管理四重奏:malloc/free vs new/delete - 面试高频考点与真题解析
  • 汇编与接口技术:8259中断实验
  • 高效处理大体积Excel文件的Java技术方案解析
  • 从0写自己的操作系统(4)实现简单的任务切换
  • FileZilla二次开发实战指南:C++架构解析与界面功能扩展
  • 在Ubuntu 24.04上部署Zabbix 7.0对服务器进行监控
  • 【机器学习笔记Ⅰ】13 正则化代价函数
  • [2025CVPR]一种新颖的视觉与记忆双适配器(Visual and Memory Dual Adapter, VMDA)
  • SSL 终结(SSL Termination)深度解析:从原理到实践的全维度指南
  • Python Bcrypt详解:从原理到实战的安全密码存储方案
  • 用户中心Vue3项目开发2.0
  • 2048小游戏实现
  • 线性代数--AI数学基础复习
  • 深度学习6(多分类+交叉熵损失原理+手写数字识别案例TensorFlow)
  • Chunking-free RAG
  • Web-API-day2 间歇函数setInterval与事件监听addEvenListener
  • 【Note】《Kafka: The Definitive Guide》第四章:Kafka 消费者全面解析:如何从 Kafka 高效读取消息
  • Apache Spark 4.0:将大数据分析提升到新的水平
  • A O P
  • 金融级B端页面风控设计:操作留痕与异常预警的可视化方案
  • 深度学习篇---深度学习常见的应用场景
  • 容声W60以光水离子科技实现食材“主动养鲜”
  • [Qt] visual studio code 安装 Qt插件
  • FastAPI + Tortoise-ORM + Aerich 实现数据库迁移管理(MySQL 实践)
  • 深度学习 必然用到的 线性代数知识
  • 嵌入式 数据结构学习(五) 栈与队列的实现与应用
  • React Ref 指南:原理、实现与实践