当前位置: 首页 > news >正文

kafka入门教程,介绍全面

1、官网下载最新版本的kafka,里面已经集成zookeeper。直接解压到D盘

2、配置文件修改,config目录下面的zookeeper.properties.   设置zookeeper数据目录

dataDir=D:/kafka_2.12-3.6.0/tmp/zookeeper

3、修改kafka的配置文件server.properties.   主要修改内容如下:

zookeeper.connect=localhost:2181

log.dirs=D:\\kafka_2.12-3.6.0\\logs

listeners=PLAINTEXT://localhost:9092

其他默认即可。

4、修改完成后进入bin目录:启动zookeeper和kafka,命令如下

zookeeper-server-start.bat ../../config/zookeeper.properties

kafka-server-start.bat ../../config/server.properties

5、命令行创建topic,命令如下:

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic hello

6、创建生产者和消费者,测试。生产者输入消息,消费者就会收到相应的消息了 

kafka-console-producer.bat --broker-list localhost:9092 --topic hello

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic hello--from-beginning

7、创建springboot工程,测试

引入依赖:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

8、yml文件配置kafka

spring:
kafka:
bootstrap-servers: localhost:9092
producer:
acks: 1
retries: 3
batch-size: 16384
properties:
linger:
ms: 0
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: helloGroup
enable-auto-commit: false
auto-commit-interval: 1000
auto-offset-reset: latest
properties:
request:
timeout:
ms: 18000
session:
timeout:
ms: 12000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

9、使用springboot    KafkaTemplate发送消息

@RequestMapping(value = "/sendMessage", method = RequestMethod.GET)
public String sendMessage(String message) {
kafkaTemplate.send("hello", message);

return "发送成功~";
}

10、消息消费,


@KafkaListener(topics = "hello")
public void receiveMessage(ConsumerRecord<String, String> record) {
String topic = record.topic();
long offset = record.offset();
int partition = record.partition();
String message = record.value();

System.out.println("topic = " + topic);
System.out.println("offset = " + offset);
System.out.println("partition = " + partition);
System.out.println("message = " + message);
}

http://www.lryc.cn/news/215350.html

相关文章:

  • 万字解析设计模式之原型模式与建造者模式
  • 深度学习数据集大合集—疾病、植物、汽车等
  • 物联网中的ESP8266该这么用!
  • django中循环生成的多个btn,只有第一个btn会弹出模态框
  • JVM第二十三讲:Java动态调试技术原理
  • 制造企业如何三步实现进销存管理?
  • 封装localstorage为对象 js
  • 算法通关村第五关|白银|队栈和Hash的经典算法题【持续更新】
  • java--构造器
  • 纪念基于JavaScript 实现的后台桌面 UI 设计
  • C++11 auto限制
  • 公司老项目springmvc jsp 自定义多数据源 转到springboot 整理
  • Java之SpringCloud Alibaba【七】【Spring Cloud微服务网关Gateway组件】
  • 探讨jdk源码中的二分查找算法返回值巧妙之处
  • 深度学习实战:基于TensorFlow与OpenCV的手语识别系统
  • 学习整理nginx常用屏蔽规则,让网站更安全
  • 四十一、【进阶】索引使用SQL提示
  • AI智能分析网关高空抛物算法如何实时检测高楼外立面剥落?
  • 微信小程序 - 页面继承(非完美解决方案)
  • 智能配件管理系统有什么用?企业如何实现管理数字化转型?
  • @SuppressWarnings注解使用说明
  • 算法从入门到入土cpp版
  • 没有PDF密码,如何解密文件?
  • Sqlyog 无法连接 8 版本的mysql caching_sha2_password could not be loaded
  • 学习笔记三十三:准入控制
  • Unix/Linux C语言 获取控制台窗口尺寸
  • 界面控件DevExpress WinForms Gauge组件 - 实现更高级别数据可视化
  • vivo 自研蓝河操作系统 BlueOS 发布:支持大模型、BlueXlink 协议实现万物互联
  • opencv复习(很乱)
  • 于璠访谈录 | AI 框架应该和而不同?