当前位置: 首页 > news >正文

Kafka集成springboot

安装kafka,直接到官网下载bin文件,本文使用windows进行使用kafka。

下载之后,第一步,启动zookeeper:

zookeeper-server-start.bat ..\..\config\zookeeper.properties

 第二步,启动kafka:

kafka-server-start.bat ..\..\config\server.properties

 第三步,在pom中导入依赖:

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency>

第四步,修改yml文件,添加配置:

spring:kafka:bootstrap-servers: localhost:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

第五步, 即可编写测试类(测试消息队列):

生产者:

package com.farm.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class HelloController {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@GetMapping("/hello")public String hello(){kafkaTemplate.send("topic1","你好旧时光");return "ok";}
}

消费者:

package com.farm.controller;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;@Component
public class HelloListener {@KafkaListener(topics = "topic1")public void onMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}}
}

效果如下:

只要访问/hello就可以触发生产者向topic1主题发送“你好旧时光”的字样,通过注解可以让消费者消费这条消息。

补充:发送对象,采用fastjson进行封装:

@GetMapping("/hello")
public String hello(){User user = new User();user.setUsername("xiaowang");user.setAge(18);kafkaTemplate.send("topic1", JSON.toJSONString(user));return "ok";
}

消费者:

package com.heima.kafka.listener;import com.alibaba.fastjson.JSON;
import com.heima.kafka.pojo.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;@Component
public class HelloListener {@KafkaListener(topics = "topic1")public void onMessage(String message){if(!StringUtils.isEmpty(message)){User user = JSON.parseObject(message, User.class);System.out.println(user);}}
}

http://www.lryc.cn/news/262014.html

相关文章:

  • Unity中实现ShaderToy卡通火(移植篇)
  • 指针相关知识(进阶)
  • 怎么将文件变为可执行文件
  • 5373. 中等计算
  • 极智一周 | 两系列汇总、MI300X、H100、特供芯片、GPT-4、火灾检测、酷睿Ultra And so on
  • leetcode刷题日志-383赎金信
  • K8s(九)—volume.md
  • python N个人围成一圈报数 报到3出列 直到只剩下最后一人
  • RFC4861 中文版下
  • 用友时空 KSOA 多处SQL注入漏洞复现
  • [AutoSar]基础部分 RTE 介绍
  • Logstash访问安全访问Elasticsearch集群
  • 加密的艺术:对称加密的奇妙之处(下)
  • 异常检测 | MATLAB实现BiLSTM(双向长短期记忆神经网络)数据异常检测
  • 2023“楚怡杯”湖南省赛“信息安全管理与评估“--数字取证调查(高职组)
  • C++ list常用操作
  • MILP加速运算技巧——模型对称性的预处理
  • JavaScript中的生成器与迭代器详解
  • WebLangChain_ChatGLM:结合 WebLangChain 和 ChatGLM3 的中文 RAG 系统
  • hive常用SQL函数及案例
  • 分页操作中使用LIMIT和OFFSET后出现慢查询的原因分析
  • Java八股文面试全套真题【含答案】- Redis篇
  • 【C++11特性篇】一文助小白轻松理解 C++中的【左值&左值引用】【右值&右值引用】
  • 动态规划——OJ题(一)
  • 六:爬虫-数据解析之BeautifulSoup4
  • 音频筑基:总谐波失真THD+N指标
  • 自动驾驶技术:驶向未来的智能之路
  • TIGRE: a MATLAB-GPU toolbox for CBCT image reconstruction
  • 我的NPI项目之Android 安全系列 -- EMVCo
  • vue中实现使用相框点击拍照,canvas进行前端图片合并下载