当前位置: 首页 > news >正文

kafka消息系统实战

kafka是什么?

        是一种高吞吐量的、分布式、发布、订阅、消息系统 

1.导入maven坐标

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>

2.编写提供者

public class KafkaProducer {public static void main(String[] args) {Properties prop = new Properties();prop.put("bootstrap.servers", "localhost:9092");prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");prop.put("acks", "all");prop.put("retries", 0);prop.put("batch.size", 16384);prop.put("linger.ms", 1);prop.put("buffer.memory", 33554432);String topic = "hello"; // 主题org.apache.kafka.clients.producer.KafkaProducer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(prop);producer.send(new ProducerRecord<String, String>(topic, Integer.toString(2), "hello kafka"));producer.close();}}

3.编写消费者

public class KafkaConsumer {public static void main(String[] args) throws InterruptedException {Properties prop = new Properties();prop.put("bootstrap.servers", "192.168.8.166:9092");prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");prop.put("group.id", "con-1");prop.put("auto.offset.reset", "latest");//自动提交偏移量prop.put("auto.commit.intervals.ms", "true");//自动提交时间prop.put("auto.commit.interval.ms", "1000");org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);ArrayList<String> topics = new ArrayList<>();//可以订阅多个消息topics.add("hello");consumer.subscribe(topics);while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(20));for (ConsumerRecord<String, String> consumerRecord : poll) {System.out.println(consumerRecord);System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());System.out.println(consumerRecord.topic());}}}
}

 4.下载kafka

点此去官网下载——>Apache Kafka

解压后进入config目录 

修改zookeeper.properties

dataDir=D:/kafka_2.13-3.5.1/tmp/zookeeper

修改日志存放的路径server.properties

log.dirs=D:/kafka_2.13-3.5.1/tmp/kafka-logs

 启动zookeeper服务

zookeeper-server-start.bat ../../config/zookeeper.properties

启动kafka服务

kafka-server-start.bat ../../config/server.properties

5.依次启动消费者和生产者,查看发布的消息

http://www.lryc.cn/news/145669.html

相关文章:

  • Kafka3.0.0版本——Leader故障处理细节原理
  • BI系统框架模型
  • 双向交错CCM图腾柱无桥单相PFC学习仿真与实现(3)硬件功能实现
  • 微软用 18 万行 Rust 重写了 Windows 内核
  • word 调整列表缩进
  • nginx学习
  • python+TensorFlow实现人脸识别智能小程序的项目(包含TensorFlow版本与Pytorch版本)(一)
  • ChatGPT怎么用于政府和公共服务?
  • dvwa文件上传通关及代码分析
  • 数字孪生:重塑政府决策与公共服务
  • Leetcode:【448. 找到所有数组中消失的数字】题解
  • 2023年中,量子计算产业现状——
  • 微信小程序智慧流调微信小程序设计与实现
  • 分布式集群框架——有关zookeeper的面试考点
  • Spring Cloud Gateway的快速使用
  • VSCode-C++环境配置+Cmake
  • python爬虫14:总结
  • 扩散模型实战(八):微调扩散模型
  • Android 全局控件属性设置
  • 下面是实践百度飞桨上面的pm2.5分类项目_logistic regression相关
  • 阿里云误删Python后域yum报错解决方案
  • unordered-------Hash
  • 数据仓库总结
  • hadoop学习:mapreduce入门案例二:统计学生成绩
  • 自学TypeScript-基础、编译、类型
  • nginx配置https
  • windows Etcd的安装与使用
  • 【py】为什么用 import tkinter 不能运行
  • 【深度学习】实验04 交叉验证
  • whisper语音识别部署及WER评价