当前位置: 首页 > news >正文

Kafka3.0.0版本——消费者(独立消费者消费某一个主题数据案例__订阅主题)

目录

    • 一、独立消费者消费某一个主题数据案例
      • 1.1、案例需求
      • 1.2、案例代码
      • 1.3、测试

一、独立消费者消费某一个主题数据案例

1.1、案例需求

  • 创建一个独立消费者,消费firstTopic主题中数据,所下图所示:
    注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。
    在这里插入图片描述

1.2、案例代码

  • 代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;/**** 独立消费者,消费某一个主题中的数据*/
    public class CustomConsumer {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");// 设置分区分配策略properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 firstArrayList<String> topics = new ArrayList<>();topics.add("firstTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){//每一秒拉取一次数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//输出数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}kafkaConsumer.commitAsync();}}
    }
    

1.3、测试

  • 在 Kafka 集群控制台,创建firstTopic主题

    bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 3 --replication-factor 1 --topic firstTopic
    

    在这里插入图片描述

  • 在 IDEA中启动案例代码
    在这里插入图片描述

  • 在 Kafka 集群控制台,创建 Kafka生产者,并输入数据。

    bin/kafka-console-producer.sh  --bootstrap-server 192.168.136.27:9092 --topic firstTopic
    

    在这里插入图片描述

  • 在 IDEA 控制台观察接收到的数据。

    ConsumerRecord(topic = firstTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateT
    ime = 1694097579736, serialized key size = -1, serialized value size = 10, headers =
    RecordHeaders(headers = [], isReadOnly = false), key = null, value = helo kafka)
    

    在这里插入图片描述

http://www.lryc.cn/news/166293.html

相关文章:

  • 笔记本多拓展出一个屏幕
  • Redis 高可用及持久化
  • Java高级: 反射
  • 【计算机网络】什么是WebSocket?
  • Apinto 网关: Go语言实现 HTTP 转 gRPC
  • 【管理运筹学】第 7 章 | 图与网络分析(4,最大流问题)
  • linux学习总结
  • 【API 管理】什么是 API 管理,为什么它很重要?
  • 基于人体呼出气体的电子鼻系统的设计与实现
  • OPC发展历程
  • 第69步 时间序列建模实战:ARIMA建模(R)
  • 【多线程】CountDownLatch
  • 使用 docker buildx 构建跨平台镜像 (QEMU/buildx/build)
  • 算法|Day49 动态规划17
  • Linux nohup命令
  • SQL Server 跨库/服务器查询
  • word转PDF文件变小,图片模糊
  • 被删除并且被回收站清空的文件如何找回
  • 每日两题 131分割回文串 784字母大小写全排列(子集模版)
  • Java面试八股文宝典:初识数据结构-数组的应用扩展之HashMap
  • ES6 特性
  • 重拾html5
  • 递归学习——记忆化搜索
  • ChatGPT帮助一名儿童确诊病因,之前17位医生无法确诊
  • Laf 云开发平台及其实现原理
  • 浅谈STL|STL函数对象篇
  • 自建私人图床方案:使用Cpolar+树洞外链轻松部署超轻量级图床,实现高效图片存储
  • 从零基础到精通Flutter开发:一步步打造跨平台应用
  • SpringBoot整合WebSocket【代码】
  • 微服务 第一章 Java线程池技术应用