当前位置: 首页 > news >正文

kafka集群搭建(Linux环境)

zookeeper搭建,可以搭建集群,也可以单机(本地学习,没必要搭建zookeeper集群,单机完全够用了,主要学习的是kafka)

1. 首先官网下载zookeeper:Apache ZooKeeper

2. 下载好之后上传到centOS or 其他虚拟机

3. 解压

4. 到zookeeper的config目录下copy 一份zoo_sample.cfg,并改名为zoo.cfg

5. 配置环境变量(这一步不是必须,如果想在任何目录下可以执行zookeeper的命令,可以执行此选项,如果不执行此选项,每次执行zookeeper命令需要到zookeeper的bin目录下去执行)

        - 进入/etc 目录: cd /etc

        - 修改 profile 文件: vim profile, 修改后的结果如下:

 改好后,wq! 保存退出,这样就可以启动zookeeper了:zkServer.sh start

kafka 集群搭建

1. 下载kafka:Apache Kafka

2. 上传centOS,解压

3. 配置kafka环境变量,如上图所示

4. 进入config目录下,copy 2份 server.properties,重命名为 server.properties1 和 server.properties2,这一步的目的是在一台虚拟机上模拟3个kafka,只是在配置文件里做区分,每个 server.properties 需要配置以下内容,分别如下:

server.properties:

 server.properties1:

server.properties2:

 5. 分别启动这三个配置文件:

进入config目录下,执行以下命令

kafka-server-start.sh -daemon server.properties
kafka-server-start.sh -daemon server.properties1
kafka-server-start.sh -daemon server.properties2

   **** -daemon 是后台运行

不断的刷新 JPS 命令,查看kafka启动情况:

可以看到3台kafka都已经启动,检查kafka在zookeeper里的情况

在任何目录下执行命令进入到zk 客户端:

zkCli.sh

 再执行以下命令,查看zookeeper下所有的文件夹

ls /

再执行以下命令,可以查看kafka broker id情况:

ls /brokers/ids

 

 可以看到3台kafka都已经启动了

java 代码测试

java代码:

package com.tech;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class SimpleDemo {static class MyProducer{// 这里是centOS里的ipstatic final String BOOTSTRAP_SERVERS = "192.168.116.128:9092,192.168.116.128:9093,192.168.116.128:9094";public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);properties.put(ProducerConfig.CLIENT_ID_CONFIG,"MY-CLIENT");KafkaProducer<String,String> producer = new KafkaProducer(properties);ProducerRecord<String,String> record = new ProducerRecord<>("test","hello xma");producer.send(record);}}static class MyConsumer{static final String BOOTSTRAP_SERVERS = "192.168.116.128:9092,192.168.116.128:9093,192.168.116.128:9094";public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG,"TEST-GROUP");KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Collections.singletonList("test"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key()+":"+record.value());}}}}
}

producer也可以在centOS里启动:

kafka-console-producer.sh --broker-list 192.168.116.128:9092,192.168.116.128:9093,192.168.116.128:9094 --topic test

***** 注意如果consumer消费消息的时候出现如下错误,需要配置centOS里的your.host.name信息:

java.net.UnknownHostException:XXX

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.116.128 your.host.name

配置好之后需重启虚拟机

http://www.lryc.cn/news/101552.html

相关文章:

  • 树莓派本地快速搭建web服务器,并发布公网访问
  • 集合中的数据结构
  • CentOS 8 错误: Error setting up base repository
  • java外观模式
  • 3秒快速打开 jupyter notebook
  • 数据安全
  • 华为nat64配置
  • 从分片传输到并行传输之大文件传输加速技术
  • mybatisPlus入门篇
  • NineData支持最受欢迎数据库PostgreSQL
  • Redis配置类
  • 【前端知识】React 基础巩固(三十六)——RTK中的异步操作
  • 33. 本地记事本
  • Android Glide预处理preload原始图片到成品resource 预加载RecyclerViewPreloader,Kotlin
  • 亚马逊云科技全新Amazon Bedrock,助力客户构建生成式AI应用
  • 题解:ABC275 C-Counting Squares
  • 加载已训练好的目标检测YOLOv8,v5,v3,v6模型,对数据集中某张图片中的object打上方框、标出类别,并将图片保存到本地
  • 《零基础入门学习Python》第073讲:GUI的终极选择:Tkinter10
  • Shell脚本实现分库分表操作
  • 区块链实验室(12) - 网络拓扑对PBFT共识流量的影响
  • 聊聊这几年的科技风口
  • 【力扣每日一题】2023.7.30 环形链表2
  • Flink状态的理解
  • 6.3.tensorRT高级(1)-yolov5模型导出、编译到推理(无封装)
  • 如何利用设备数字化平台推动精益制造?
  • 使用Wps减小PDF文件的大小
  • 【深度学习】GPT-3
  • 在登录界面中设置登录框、多选项和按钮(HTML和CSS)
  • 【语音识别】- 声学,词汇和语言模型
  • 【考研英语语法及长难句】小结