当前位置: 首页 > news >正文

Kafka——概述、安装及命令行操作

文章目录

  • 一、概述
    • 1.1、定义
    • 1.2、如何运作?
    • 1.3、传统消息队列的应用场景
    • 1.4、消息队列的两种模式
    • 1.5、Kafka的基础架构
  • 二、安装(需要安装zookeeper)
  • 三、常用命令行操作
    • 3.1、主题命令行操作
    • 3.2、生产者命令行操作
    • 3.3、消费者命令行操作

一、概述

1.1、定义

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Quere)。
发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接受感兴趣的消息。

1.2、如何运作?

Kafka是一个由服务器和客户端组成的分布式系统,通过高性能TCP网络协议进行通信。它可以部署在本地和云中的裸机硬件、虚拟机和容器上环境。

  • 服务器: Kafka作为一个或多个服务器的集群运行,这些服务器可以跨越多个数据中心或云区域。其中一些服务器形成了存储层,成为代理。其他服务器运行Kafka Connect以持续导入和导出数据作为事件流。用于将Kafka与现有的系统、其他Kafka集群集成。Kafka集群具有高度可扩展行和容错:如果其任何服务器发送故障,其他服务器将接管其工作以确保连续操作,没有数据丢失。
  • **客户端:**它们允许编写分布式应用程序和微服务,这些应用程序和微服务可以并行地、大规模地读取、写入和处理事件流,并且即使在网络问题或机器故障的情况下也能以容错的方式进行。

1.3、传统消息队列的应用场景

缓存/消峰、解耦、异步通信

  • 缓存/消峰:有助于控制和优化数据流经过系统的速度,解决生成消息和消费消息的处理速度不一致的情况。
    在这里插入图片描述

  • 解耦:允许独立扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
    在这里插入图片描述+ 异步处理:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候去处理它们。

在这里插入图片描述

1.4、消息队列的两种模式

  • 点对点模式: 消费者主动拉取数据,消息收到后清除消息。
    在这里插入图片描述
  • 发布/订阅模式:
    • 可以有多个topic主题。
    • 消费者消费数据后,不删除数据。
    • 每个消费者相互独立,都可以消费到数据。
      在这里插入图片描述

1.5、Kafka的基础架构

  • 为方便扩展,并提高吞吐量,一个topic分为多个partition。
  • 配合分区的设计,提出消费者组的概念,组内每个消费者并行消费。
  • 为提高可用性,为每个partition增加若干个副本。
  • ZK中记录谁是leader
    在这里插入图片描述
  • Producer:消息生产者,就是向 Kafka broker 发消息的客户端。
  • Consumer:消息消费者,向 Kafka broker 取消息的客户端。
  • Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  • Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker 可以容纳多个 topic。
  • Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic。
  • Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
  • Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。
  • Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
  • Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

二、安装(需要安装zookeeper)

  1. 下载安装包Kafka官网下载地址
  2. 上传到服务器
  3. 解压
    tar -zxf /opt/install/kafka_2.12-2.8.0.tgz -C /opt/soft/
    
  4. 修改目录名
    mv /opt/install/kafka2_.12-2.8.0/ kafka212
    
  5. 修改配置文件(server.properties)(下面修改前面为第几行注意地址改为自己的地址)
    vim /opt/soft/kafka212/config/server.properties21 broker.id=036 advertised.listeners=PLAINTEXT://xsqone144:909260 log.dirs=/opt/soft/kafka212/data103 log.retention.hours=1680123 zookeeper.connect=xsqone144:2181/kafka137 delete.topic.enable=true
    
  6. 设置节点号
    echo "0">/opt/soft/kafka212/data/myid
    
  7. 修改配置文件
    vim /etc/profile# KAFKA_HOMEexport KAFKA_HOME=/opt/soft/kafka212export PATH=$PATH:$KAFKA_HOME/bin
    
  8. 重新加载配置文件
    source /etc/profile
    
  9. 测试(启动Kafka)
    # 先启动zookeeper
    zkServer.sh start
    kafka-server-start.sh -daemon /opt/soft/kafka212/config/server.properties
    
    在这里插入图片描述

三、常用命令行操作

3.1、主题命令行操作

  • 查看消息队列
kafka-topics.sh --bootstrap-server xsqone144:9092 --list

在这里插入图片描述

  • 创建消息队列
kafka-topics.sh --bootstrap-server xsqone144:9092 --create --topic taibai --replication-factor 1 --partitions 1

在这里插入图片描述

  • 查看消息队列详情
kafka-topics.sh --bootstrap-server xsqone144:9092 --describe --topic taibai

在这里插入图片描述

  • 修改分区数(注意:只能增加,不能减少)
kafka-topics.sh --bootstrap-server xsqone144:9092 --alter --topic taibai --partitions 3

在这里插入图片描述

  • 删除消息队列
kafka-topics.sh --bootstrap-server xsqone144:9092 --delete --topic taibai 

在这里插入图片描述

3.2、生产者命令行操作

  • 发送消息
kafka-console-producer.sh --bootstrap-server xsqone144:9092 --topic xsqone

在这里插入图片描述

  • 查看生产者者offset信息
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xsqone144:9092 --topic xsqone

在这里插入图片描述

3.3、消费者命令行操作

  • 消费消息队列中的数据
kafka-console-consumer.sh --bootstrap-server xsqone144:9092 --topic xsqone

在这里插入图片描述

  • 消费消息队列中所有数据
kafka-console-consumer.sh --bootstrap-server xsqone144:9092 --from-beginning --topic first
  • 查看消费者的offset信息(最后的为消费者IDName)
kafka-consumer-groups.sh --bootstrap-server xsqone144:9092 --describe --group 5

第一列是groupid,第二列是主题名,第三列是分区,第四列是偏移量,第五列是生产者偏移量,第六列偏移量落后
在这里插入图片描述

http://www.lryc.cn/news/58611.html

相关文章:

  • 怎么控制ERP企业管理系统开发的价格
  • 我在“Now In Android”中学到的 9 件事
  • ChatGPT宝藏插件丨装上之后,上网、语音聊天、一键分享对话……简直让你爽到起飞!
  • 私有句柄表
  • Vue——类与样式绑定
  • 软考中项计算题总结
  • 如何使用基于GPT-4的Cursor编辑器提升开发效率
  • 压箱底教程分享,手把手教会你如何注册target账号和下单
  • 一次性搞懂dBSPL、dBm、dBu、dBV、dBFS的区别!
  • 漂亮实用的15个脑图模板,你知道哪些是AI做的吗?
  • 历代程序员都无法逃脱的诅咒 -- 低代码
  • 14Exceptional Control Flow Exceptions and Process(异常控制流,异常和进程)
  • LeetCode - 两数之和
  • Python 小型项目大全 31~35
  • 他又赚了一万美金
  • 企业工程项目管理系统+spring cloud 系统管理+java 系统设置+二次开发
  • 教你使用Apache搭建Http
  • ZooKeeper+Kafka+ELK+Filebeat集群搭建实现大批量日志收集和展示
  • 数据结构初阶 - 总结
  • 代码随想录算法训练营第四十四天-动态规划6|518. 零钱兑换 II ,377. 组合总和 Ⅳ (遍历顺序决定是排列还是组合)
  • wma格式怎么转换mp3,4种方法超快学
  • 【数据结构与算法】判定给定的字符向量是否为回文算法
  • 考研数二第十七讲 反常积分与反常积分之欧拉-泊松(Euler-Poisson)积分
  • 【论文总结】理解和减轻IoT消息协议的安全风险
  • SpringBoot基础入门
  • jar 包与 war 包区别
  • 【数据结构:复杂度】时间复杂度
  • 京东pop店铺订单导出
  • 论文阅读:Towards Stable Test-time Adaptation in Dynamic Wild World
  • 2022国赛27:Linux-1时间服务chrony配置