当前位置: 首页 > news >正文

Kafka Java API

1、增加依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version>
</dependency>

2、三个案例

案例1:生产数据

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class Demo1KafkaProducer {public static void main(String[] args) {Properties properties = new Properties();//指定broker列表properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");//指定key和value的数据格式properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//生产数据producer.send(new ProducerRecord<>("words","java"));producer.flush();//关闭连接producer.close();}
}

案例2: 文件生产数据到kafka(读取)

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;public class Demo2FileToKaFka {public static void main(String[] args) throws Exception{Properties properties = new Properties();//指定broker列表properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");//指定key和value的数据格式properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//读取文件BufferedReader bs = new BufferedReader(new FileReader("flink/data/student.csv"));String line;while ((line=bs.readLine())!=null){//生产数据    如果指定分区默认为轮循添加数据producer.send(new ProducerRecord<>("students",line));producer.flush();}//关闭连接bs.close();producer.close();}
}

创建控制台消费者消费数据 

kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic bigdata

使用hash分区的方式改写该案例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;public class Dem3FileToKafkaWithHash {public static void main(String[] args) throws Exception {Properties properties = new Properties();//指定broker列表properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");//指定key和value的数据格式properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//读取文件FileReader fileReader = new FileReader("flink/data/student.csv");BufferedReader bufferedReader = new BufferedReader(fileReader);String line;while ((line = bufferedReader.readLine()) != null) {String clazz = line.split(",")[4];//hash分区int partition = Math.abs(clazz.hashCode()) % 3;//生产数据//kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic students_hash_partition//指定分区生产数据producer.send(new ProducerRecord<>("students_hash_partition", partition, null, line));producer.flush();}//关闭连接fileReader.close();bufferedReader.close();producer.close();}
}

案例3:消费kafka中的数据

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.ArrayList;
import java.util.Properties;public class Demo4Consumer {public static void main(String[] args) {Properties properties = new Properties();//指定kafka集群列表properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");//指定key和value的数据格式properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/** earliest* 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费* latest  默认* 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产认值生的该分区下的数据* none* topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常**/properties.setProperty("auto.offset.reset", "earliest");//指定消费者组,一条数据在一个组内只消费一次properties.setProperty("group.id", "java_kafka_group1");//创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//订阅topicArrayList<String> topics = new ArrayList<>();topics.add("hash_students");consumer.subscribe(topics);//死循环拉取数据,使数据全部拉取完毕while (true) {//拉取数据  默认只会拉取500条数据ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);//解析数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {String topic = consumerRecord.topic();         //主题long offset = consumerRecord.offset();         //偏移量int partition = consumerRecord.partition();    //分区String value = consumerRecord.value();         //数据long timestamp = consumerRecord.timestamp();   //处理时间System.out.println(topic + "\t" + offset + "\t" + partition + "\t" + value + "\t" + timestamp);}}}
}

http://www.lryc.cn/news/359364.html

相关文章:

  • pushd: not found
  • 【第十三节】C++控制台版本坦克大战小游戏
  • 酷得单片机方案 2.4G儿童遥控漂移车
  • 【为什么 Google Chrome 打开网页有时极慢?尤其是国内网站,如知网等】
  • FastAPI - 数据库操作5
  • HTML静态网页成品作业(HTML+CSS)—— 冶金工程专业展望与介绍介绍网页(2个页面)
  • Flutter基础 -- Dart 语言 -- 注释函数表达式
  • “仿RabbitMQ实现消息队列”---整体架构与模块说明
  • springboot如何快速接入minio对象存储
  • 第六届“智能设计+运维”国产工业软件研讨会暨2024年天洑软件用户大会圆满召开
  • 05.k8s弹性伸缩
  • 【数据结构】详解二叉树
  • MapDB:轻量级、高性能的Java嵌入式数据库引擎
  • Rye: 一个革新的Python包管理工具
  • 如何在C#代码中判断当前C#的版本和dotnet版本
  • Linux 36.3@Jetson Orin Nano之系统安装
  • 案例实践 | 基于长安链的首钢供应链金融科技服务平台
  • Vue3实战笔记(55)—Vue3.4新特性揭秘:defineModel重塑v-model,拥抱高效双向数据流!
  • C++ | Leetcode C++题解之第123题买卖股票的最佳时机III
  • 微信小程序中Button组件的属性值和用法详解
  • 等保测评 | 等保测评简介及流程具体是什么?
  • CompassArena 司南大模型测评--代码编写
  • 叉积和法向量学习笔记
  • YZW900规格书
  • 9岁学生学什么编程好一些:探索编程启蒙的奥秘
  • Java反射实战指南:反射机制的终极指南
  • 高效训练超越LoRA,北航发布MoRA
  • 【Spring】Spring之依赖注入源码解析(上)
  • HBase 常用 shell 操作
  • 【区分vue2和vue3下的element UI InputNumber 计数器组件,分别详细介绍属性,事件,方法如何使用,并举例】