当前位置: 首页 > news >正文

Kafka消费者api编写教程

1.基本属性配置

输入new Properties().var 回车

//创建属性Properties properties = new Properties();//连接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//指定消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

2.创建消费者

输入new KafkaConsumer<String,String>(properties).var 回车选择消费者名称

//创建消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

3.订阅主题/分区

3.1订阅主题

   输入new ArrayList<String,String>().var 回车修改变量名为topics

        //创建一个数组列表变量接收topics值ArrayList<String> topics = new ArrayList<>();//指定要订阅的主题topics.add("customers");//订阅主题kafkaConsumer.subscribe(topics);

3.2订阅分区

    输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions

4.消费数据

//消费数据while (true){//if (flag  == true) flag 标志位置//break;//}生产中退出循环的位置;ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//将消费的信息输出到控制台,输入consumerRecords.for回车,进行对consumerRecords循环遍历for (ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}

5.运行MyConsumer,通过生产者api发送消息

输出台上可以看到输出的都是订阅的主题/分区的信息

6.完整代码

package com.ljr.kafka.replay;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class MyConsumer {public static void main(String[] args) {//创建属性Properties properties = new Properties();//连接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//指定消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");//创建消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);/*//订阅主题//创建一个数组列表变量接收topics值ArrayList<String> topics = new ArrayList<>();//指定要订阅的主题topics.add("customers");//订阅主题kafkaConsumer.subscribe(topics);*///订阅分区//创建一个数组列表变量接收主题分区值ArrayList<TopicPartition> topicPartitions = new ArrayList<>();//指定要订阅的分区topicPartitions.add(new TopicPartition("customers",2));//订阅分区kafkaConsumer.assign(topicPartitions);//消费数据while (true){//if (flag  == true) flag 标志位置//break;//}生产中退出循环的位置;ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//将消费的信息输出到控制台,输入consumerRecords.for 回车 对consumerRecords循环遍历for (ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}}
}

http://www.lryc.cn/news/366198.html

相关文章:

  • 什么情况下要配置DNS服务
  • 华为端云一体化开发 (起步1.0)(HarmonyOS学习第七课)
  • 数据结构之ArrayList与顺序表(上)
  • Java 8 中的 Stream API,用于处理集合数据
  • 106、python-第四阶段-3-设计模式-单例模式
  • 【猫狗识别系统】图像识别Python+TensorFlow+卷积神经网络算法+人工智能深度学习
  • 记录汇川:红绿灯与HMI-ST
  • 已解决java.nio.charset.CoderMalfunctionError: 编码器故障错误的正确解决方法,亲测有效!!!
  • Linux 中常用的设置、工具和操作
  • [论文笔记]AIOS: LLM Agent Operating System
  • 2024全国高考作文题解读(文心一言 4.0版本)
  • 【功能超全】基于OpenCV车牌识别停车场管理系统软件开发【含python源码+PyqtUI界面+功能详解】-车牌识别python 深度学习实战项目
  • TESSENT2024.1安装
  • 【机器学习】原理与应用场景 Python代码展现
  • Python怎么循环计数:深入解析与实践
  • Facebook企业户 | Facebook公共主页经营
  • 排序数组 ---- 分治-归并
  • 【红黑树变色+旋转】
  • pytorch 使用tensor混合:进行index操作
  • Threejs(WebGL)绘制线段优化:Shader修改gl.LINES模式为gl.LINE_STRIP
  • 继承-进阶
  • 探索k8s集群的配置资源(secret和configmap)
  • 如何设置vue3项目中默认的背景为白色
  • MS1112驱动开发
  • K8s存储对象的使用
  • 构建自动化API数据抓取系统
  • 【Qt知识】部分QWidget属性表格
  • 【ARM64 常见汇编指令学习 19.1 -- ARM64 跳转指令 b.pl 详细介绍】
  • WWDC24即将到来,ios18放大招
  • C#中的空合并运算符与空合并赋值运算符:简化空值处理