当前位置: 首页 > news >正文

Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

目录

    • 一、独立消费者消费某一个主题中某个分区数据案例
      • 1.1、案例需求
      • 1.2、案例代码
      • 1.3、测试

一、独立消费者消费某一个主题中某个分区数据案例

1.1、案例需求

  • 创建一个独立消费者,消费firstTopic主题 0 号分区的数据,所下图所示:
    在这里插入图片描述

1.2、案例代码

  • 生产者往firstTopic主题 0 号分区发送数据代码

    package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//5、调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("firstTopic", 0,"","hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
    }
  • 消费者消费firstTopic主题 0 分区数据代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumerPartition {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 1 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题对应的分区ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("firstTopic",0));kafkaConsumer.assign(topicPartitions);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
    }
    

1.3、测试

  • 在 IDEA 中执行消费者程序,如下图:
    在这里插入图片描述
  • 在 IDEA 中执行生产者程序 ,在控制台观察生成几个 0号分区的数据,如下图:
    在这里插入图片描述
  • 在 IDEA 控制台,观察接收到的数据,只能消费到 0 号分区数据表示正确。
    在这里插入图片描述
http://www.lryc.cn/news/158205.html

相关文章:

  • 基于Simulink的用于电力系统动态分析
  • 日200亿次调用,喜马拉雅网关的架构设计
  • 构造函数和析构函数(个人学习笔记黑马学习)
  • GPT引领前沿与应用突破之GPT4科研实践技术与AI绘图教程
  • Git上传新项目
  • C语言文件操作总结
  • 原生js之dom如何进行事件监听(事件捕获/冒泡)
  • 使用SimPowerSystems并网光伏阵列研究(Simulink实现)
  • BUUCTF-WEB-[ACTF2020 新生赛]Includel
  • 算法通关村十四关:白银挑战-堆能高效解决的经典问题
  • 跨站请求伪造(CSRF)攻击与防御原理
  • 从0到1实现播放控制器
  • 【Vue-Element-Admin】导出el-table全部数据
  • MFC 更改控件的大小和位置
  • 【向量数据库】相似向量检索Faiss数据库的安装及余弦相似度计算(C++)
  • 教育培训小程序的设计与功能解析
  • 【ES】illegal_argument_exception“,“reason“:“Result window is too large
  • SpringBoot实现登录拦截
  • 浅谈泛在电力物联网、能源互联网与虚拟电厂
  • 深度学习框架安装与配置指南:PyTorch和TensorFlow详细教程
  • vue中属性执行顺序
  • 【代码随想录】Day 50 动态规划11 (买卖股票Ⅲ、Ⅳ)
  • PHP反序列化漏洞
  • 容器编排学习(一)k8s集群管理
  • js去除字符串空格的几种方式
  • Spring 自带工具——URI 工具UriComponentsBuilder
  • 优化案例5:视图目标列改写优化
  • Origin绘制彩色光谱图
  • 项目复盘:从实践中学习
  • 机器学习和数据挖掘02-Gaussian Naive Bayes