当前位置: 首页 > news >正文

初试Kafka

Kafka 是一个分布式流处理平台,通常用作消息中间件,它可以处理大规模的实时数据流。以下是从零开始使用 Kafka 作为消息中间件的基本教程:

步骤 1: 下载和安装 Kafka

  1. 访问 Apache Kafka 官方网站:Apache Kafka
  2. 下载最新的 Kafka 发行版,并解压缩到本地文件夹。

步骤 2: 启动 ZooKeeper

Kafka 使用 ZooKeeper 来协调分布式节点。在 Kafka 解压缩后的文件夹中,进入 bin 目录,执行以下命令启动 ZooKeeper:

./zookeeper-server-start.sh ../config/zookeeper.properties

步骤 3: 启动 Kafka 服务

继续在 bin 目录中执行以下命令启动 Kafka 服务:

./kafka-server-start.sh ../config/server.properties

步骤 4: 创建一个主题(Topic)

Kafka 使用主题来组织和分类消息。执行以下命令创建一个主题:

./kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

这将创建一个名为 my_topic 的主题,具有一个分区和一个副本。

步骤 5: 发送消息到主题

使用 Kafka 提供的生产者工具向主题发送消息:

./kafka-console-producer.sh --topic my_topic --bootstrap-server localhost:9092

然后,您可以在控制台中输入消息并按 Enter 发送。

步骤 6: 消费消息

使用 Kafka 提供的消费者工具从主题中消费消息:

./kafka-console-consumer.sh --topic my_topic --bootstrap-server localhost:9092 --from-beginning

这将显示从主题中接收到的消息。

步骤 7: 使用编程语言连接 Kafka

除了命令行工具外,您还可以使用编程语言连接 Kafka。根据您选择的语言,可以使用 Kafka 提供的客户端库。

使用 Java 示例
// 生产者示例
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(properties);ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key", "Hello, Kafka!");producer.send(record);producer.close();}
}// 消费者示例
import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "my_group");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Collections.singletonList("my_topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());}}}
}

这是一个简单的 Java 示例,演示了如何使用 Kafka 的生产者和消费者 API。

希望这个简单的教程能帮助您入门 Kafka。请注意,这只是一个基础,Kafka 还有许多高级功能和配置,具体取决于您的使用场景和需求。

http://www.lryc.cn/news/268013.html

相关文章:

  • SuperMap Hi-Fi 3D SDK for Unity基础开发教程
  • Upload-lab(pass1~2)
  • Linux:查询当前进程或线程的资源使用情况
  • unityc用vs2017介绍
  • 单元测试实战
  • WebService
  • Nestjs使用log4j打印日志
  • Selenium - 自动化测试框架
  • RFID技术在汽车制造:提高生产效率、优化物流管理和增强安全性
  • git异常
  • 【C语言学习疑难杂症】第12期:如何从汇编角度深入理解y = (*--p)++这行代码(易懂版)
  • 5G阅信应用场景有哪些?
  • 使用OpenSSL生成自签名SSL/TLS证书和私钥
  • pycharm2023.2激活和新建项目,python3.12安装永久换源
  • FPGA分频电路设计(2)
  • 【三】【C语言\动态规划】珠宝的最高价值、下降路径最小和、最小路径和,三道题目深度解析
  • 爬虫工作量由小到大的思维转变---<第二十八章 Scrapy中间件说明书>
  • 从Maven初级到高级
  • orangepi--开发板配置网络SSH登录
  • 简单通讯录管理系统第4关:简单通讯录管理系统之修改通讯录用户信息
  • macOS编译ckb-next
  • 漏刻有时数据可视化Echarts组件开发(46)散点图颜色判断
  • 智能优化算法应用:基于驾驶训练算法3D无线传感器网络(WSN)覆盖优化 - 附代码
  • 【论文阅读】MCANet: Medical Image Segmentation with Multi-Scale Cross-Axis Attention
  • 机器视觉实战应用:手势、人脸、动作以及手势鼠标构建(一)
  • python作业题百度网盘,python作业答案怎么查
  • centos7.9中离线安装nginx开启ssl,arm架构
  • LENOVO联想笔记本小新Pro 14 IRH8 2023款(83AL)电脑原装出厂Win11系统恢复预装OEM系统
  • blender使用faceit绑定自己的表情动作
  • 有关List的线程安全、高效读取:不变模式下的CopyOnWriteArrayList类、数据共享通道:BlockingQueue