当前位置: 首页 > news >正文

Spring Kafka 之 @KafkaListener 注解详解

       我们在开发的过程中当使用到kafka监听消费的时候会使用到@KafkaListener注解,下面我们就介绍下它的常见属性和使用。

一、介绍

@KafkaListener 是 Spring Kafka 提供的一个注解,用于声明一个方法作为 Kafka 消息的监听器

二、主要参数

1、topic

  • 描述:指定监听的 Kafka 主题,可以是一个字符串数组。
  • 示例:@KafkaListener(topics = "my-topic")
  • 说明:定义了监听器将从哪个或哪些主题接收消息。

2、groupId

  • 描述:指定 Kafka 消费者组的 ID。
  • 示例:@KafkaListener(groupId = "my-group", topics = "my-topic")
  • 说明:每个消费者都有自己所属的组。一个组中可以有多个消费者,它们共同处理消息。

3、id

  • 描述:每个 Listener 实例的唯一标识符。
  • 示例:@KafkaListener(id = "myListener", topics = "my-topic")
  • 说明:如果不指定 groupIdid 将直接作为 groupId。在多监听器的应用中,可以使用不同的 id 来区分不同的监听器容器。

4、containerFactory

  • 描述:指定用于创建 MessageListenerContainer 的工厂 bean 的名称。
  • 示例:@KafkaListener(containerFactory = "yourContainerFactory", topics = "your-topic")
  • 说明:容器负责管理消息监听器的生命周期和线程管理。

5、topicPattern

  • 描述:指定一个正则表达式模式,用于匹配要监听的多个主题。
  • 示例:@KafkaListener(topicPattern = "your-topic.*", groupId = "your-group-id")
  • 说明:允许通过模式来匹配一组相关的主题。

6、autoStartup

  • 描述:指定是否在应用程序启动时自动启动监听器。
  • 示例:@KafkaListener(autoStartup = "false", topics = "your-topic")
  • 说明:默认为 true,可以手动控制监听器的启动和停止。

7、bootstrap.servers(注意:这不是 @KafkaListener 的直接参数,但通常在 Kafka 配置中指定)

  • 描述:Kafka 服务器的地址列表,用于连接到 Kafka 集群。
  • 示例:"localhost:9092,anotherhost:9092"
  • 说明:用于配置 Kafka 连接的基本信息。

三、示例

import org.apache.kafka.clients.consumer.ConsumerRecord;  
import org.springframework.kafka.annotation.KafkaListener;  
import org.springframework.stereotype.Service;  @Service  
public class KafkaConsumerService {  // 使用 @KafkaListener 注解配置 Kafka 消息监听器  @KafkaListener(  topics = "my-topic", // 监听名为 "my-topic" 的 Kafka 主题  groupId = "my-consumer-group", // 消费者组 ID 为 "my-consumer-group"  id = "myListener", // 监听器实例的唯一标识符为 "myListener"  containerFactory = "kafkaListenerContainerFactory", // 使用名为 "kafkaListenerContainerFactory" 的工厂 bean 来创建 MessageListenerContainer  autoStartup = "true" // 应用程序启动时自动启动监听器,默认为 true,这里显式指定  )  public void consumeMessage(ConsumerRecord<?, ?> record) {  // 处理接收到的 Kafka 消息  String topic = record.topic();  String value = (String) record.value();  System.out.println("Received message from topic: " + topic + ", value: " + value);  }  // (可选)你可以通过配置类来定义 kafkaListenerContainerFactory  // 通常在 @Configuration 类中定义 Bean  // ...  // @Bean  // public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(  //     ConsumerFactory<?, ?> consumerFactory) {  //     // 自定义 Kafka 监听器容器工厂  //     ...  // }  // 注意:上面的配置类代码是示例性的,并未完整展示如何配置一个 KafkaListenerContainerFactory。  // 实际的配置将依赖于你的应用程序和 Kafka 配置需求。  
}

http://www.lryc.cn/news/360310.html

相关文章:

  • 【量算分析工具-贴地距离】GeoServer改造Springboot番外系列九
  • 【linux】(1)文件操作及vi
  • 【5】MySQL数据库备份-XtraBackup - 全量备份
  • 数据治理-数据标准演示
  • 基于Chisel的FPGA流水灯设计
  • LabVIEW齿轮调制故障检测系统
  • AI帮写:探索国内AI写作工具的创新与实用性
  • n后问题 回溯笔记
  • 简述Java中实现Socket通信的步骤
  • Asp.Net Core 实现分片下载的最简单方式
  • [Mac软件]Leech for Mac v3.2 - 轻量级mac下载工具
  • 留给“端侧大模型”的时间不多了
  • Pytest框架中的Setup和Teardown功能
  • yolov10/v8 loss详解
  • Typescript高级: 深入理解infer关键字
  • JQC-3FF-S-Z 继电器模块使用(arduino)
  • 黑马一站制造数仓实战2
  • 网络I/O模型
  • Docker 简介和安装
  • 【源码】Spring Data JPA原理解析之Repository自定义方法命名规则执行原理(二)
  • Vue前端中从后端获取图片验证码
  • 【源码】多语言H5聊天室/thinkphp多国语言即时通讯/H5聊天室源码/在线聊天/全开源
  • gitlab 创建 ssh 和 token
  • Docker - Kafka
  • 一键实现文件夹批量高效重命名:轻松运用随机一个字母命名,让文件管理焕然一新!
  • Vue3项目练习详细步骤(第二部分:主页面搭建)
  • [个人总结]-java常用方法
  • 什么是Java泛型?它有什么作用
  • [机缘参悟-197] - 《道家-水木然人间清醒1》读书笔记 -21-看问题从现象到本质的层次
  • AIGC商业案例实操课,发觉其创造和商业的无限可能,Ai技术在行业应用新的商机