当前位置: 首页 > news >正文

kafka怎么用代码读取数据

Kafka可以通过Java语言中的Kafka客户端库来读取数据。以下是一个简单的Java代码示例,通过Kafka Consumer API从Kafka集群中读取数据:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;public class KafkaConsumerExample {public static void main(String[] args) throws Exception {String topicName = "my-topic";Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topicName));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}
```

在这个示例中,我们使用KafkaConsumer类,它是Kafka Java客户端库的一部分,从指定的主题(topic)中消费数据。我们设置了Kafka集群的地址(bootstrap.servers),将自己的消费组ID(group.id)分配给消费者,以及对键(key)和值(value)进行反序列化。在这个示例中,我们使用StringDeserializer,但您可以使用任何其他适当的序列化程序。

在while循环中,我们使用poll()方法从Kafka集群中获取消息,等待100毫秒。然后我们遍历这些消息并打印它们的偏移量、键和值。

http://www.lryc.cn/news/98627.html

相关文章:

  • 网关与路由器的区别
  • 助力工业物联网,工业大数据之工单事实指标需求分析【二十】
  • python_PyQt5开发工具结构基础
  • 【C++】入门基础2
  • Reinforcement Learning with Code 【Chapter 8. Value Funtion Approximation】
  • 常用InnoDB参数介绍
  • 云原生网关部署新范式丨 Higress 发布 1.1 版本,支持脱离 K8s 部署
  • 【通讯录】--C语言
  • 通过两种实现方式理解CANoe TC8 demo是如何判断接收的以太网报文里的字段的
  • Mysql- 存储引擎
  • vite / nuxt3 项目使用define配置/自定义,可以使用process.env.xxx获取的环境变量
  • 在Linux、Ubuntu中跨平台编译ARM(AARCH64)平台的binutils
  • SpringCloudAlibaba微服务实战系列(五)Sentinel1.8.5+Nacos持久化
  • pytest中conftest的用法以及钩子基本使用
  • 数据结构---顺序栈、链栈
  • 我的MacBook Pro:维护心得与实用技巧
  • Higress非K8S安装
  • QT--day4(定时器事件、鼠标事件、键盘事件、绘制事件、实现画板、QT实现TCP服务器)
  • hjm家族信托科技研究报告
  • [SQL挖掘机] - 视图相关操作
  • 【Quartus FPGA】EMIF DDR3 读写带宽测试
  • Flutter:flutter_local_notifications——消息推送的学习
  • Spring AOP (面向切面编程)原理与代理模式—实例演示
  • 什么是SCRUM认证体系 ?
  • DoIP学习笔记系列:(二)VN5620 DoIP测试配置实践笔记
  • Grafana - TDEngine搭建数据监测报警系统
  • ES6基础知识二:ES6中数组新增了哪些扩展?
  • 使用CRM分析数据有哪些功能?
  • 大数据课程综合实验案例---课设问题汇总
  • 基于Vue+Element Plus实现表格组件