当前位置: 首页 > news >正文

Spring Boot学习(三十三):集成kafka

前言

下面是zookeeper和kafka的官网下载地址,大家可以学习下载

zookeeper下载地址:http://zookeeper.apache.org/releases.html

kafka下载地址:http://kafka.apache.org/downloads.html

1、添加依赖

在 pom.xml 文件中添加kafka依赖,依赖如下

	<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2、配置Kafka信息

在 application.properties(或 application.yml)文件中配置 Kafka 的相关信息,下面是一个简单的示例:

#kafka地址,多个地址使用,分隔
spring.kafka.bootstrap-servers=127.0.0.1:9092
#消费者组ID
spring.kafka.consumer.group-id=myGroup
#序列化和反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

3、发送消息

因为我们是springboot项目,已经集成了KafkaTemplate,我们可以直接使用KafkaTemplate来发送消息

下面,我编写一个发送消息的生产者

/*** 消息生产者*/
@Component
@Slf4j
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;/*** 发送消息* @param topic 主题* @param msg   消息*/public void send(String topic,String msg){kafkaTemplate.send(topic,msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {log.error("发送消息失败:{}", ex);}@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("发送消息成功:{}");}});}/*** 发送消息* @param topic* @param msg*/public void send(String topic, Object msg) {send(topic, JSONObject.toJSONString(msg));}}

编写好生产者之后,我们就可以使用生产者发送消息,如下

	@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("send")public void sendMsg(){kafkaProducer.send("my-topic","hello world");}

如果想定制KafkaTemplate,那么可以在配置类进行配置,如下所示

@Configuration
public class KafakaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;/*** 配置属性* @return*/@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}/*** 定制KafkaTemplate* @return*/@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());kafkaTemplate.setDefaultTopic("myGroup");return kafkaTemplate;}}

4、消费消息

使用 @KafkaListener 注解创建 Kafka 消费者,并监听指定的主题。接收到消息后,可以通过方法参数来接收消息:

@Slf4j
@Component
public class KafkaConsumer {/*** 消费my-topic主题的消息* @param message*/@KafkaListener(topics = "my-topic",groupId = "myGroup1")public void  receiveMessage(String message){log.info("消费消息:"+message);}
}

同一消费者组,只会有一个消费者进行消费,如果想配置多个消费者同时处理,可以使用 @KafkaListener 注解来配置多个消费者。每个消费者需要配置不同的 group-id,监听主题一致,如下所示,就会有两个消费者同时消费

@Slf4j
@Component
public class KafkaConsumer {@KafkaListener(topics = "my-topic",groupId = "myGroup1")public void  receiveMessage(String message){log.info("消费消息:"+message);}@KafkaListener(topics = "my-topic",groupId = "myGroup2")public void  receiveMessage2(String message){log.info("消费消息:"+message);}}
http://www.lryc.cn/news/254297.html

相关文章:

  • MOSFET
  • DriveWorks——参数化设计非标定制利器
  • DevEco Studio集成ArkUI-X
  • 网络视频服务器的作用是什么?
  • 解决vue3使用iconpark控制台预警提示问题
  • VMware 虚拟机 NAT 模式网络配置
  • 5-redis高级-哨兵
  • 鸿蒙HarmonyOS4.0开发应用学习笔记
  • 联通宽带+老毛子Padavan固件 开启IP v6
  • 唯创知音WT2003Hx系列单片机语音芯片:家庭理疗产品的智能声音伴侣
  • 2023_Spark_实验二十七:Linux中Crontab(定时任务)命令详解及使用教程
  • Java动态代理实现与原理详细分析
  • [实践总结] 使用Apache HttpClient 4.x进行进行一次Http请求
  • 易宝OA 两处任意文件上传漏洞复现
  • echart饼图高亮颜色设置,数据为0时候,labelLine不显示
  • Kafka 的消息格式:了解消息结构与序列化
  • 装箱 Box 数据类型
  • 多传感器融合SLAM在自动驾驶方向的初步探索的记录
  • ffmpeg与opencv-python处理视频
  • java 操作git
  • Linux 导入、导出 MySQL 数据库命令
  • 华为数通---BFD多跳检测示例
  • AWS 日志分析工具
  • gitLab 和Idea分支合并
  • 关于 mapboxgl 的常用方法及效果
  • C语言——二级指针
  • 股市复苏中的明懿金汇:抓住新机遇
  • Spacemesh、Kaspa和Chia的全面对比!
  • 【HTML语法】
  • ROS报错:RLException:Invalid roslaunch XML Syntax: mismatched tag: