当前位置: 首页 > news >正文

11、Kafka ------ Kafka 核心API 及 生产者API 讲解

目录

  • Kafka核心API 及 生产者API讲解
    • ★ Kafka的核心API
      • Kafka包含如下5类核心API:
    • ★ 生产者API
      • Kafka 的API 文档
    • ★ 使用生产者API发送消息

Kafka核心API 及 生产者API讲解

官方文档

★ Kafka的核心API

Kafka包含如下5类核心API:

在这里插入图片描述

Producer API(生产者API):
应用程序通过该API向主题发布消息。

Consumer API(消费者API):
应用程序通过该API订阅一个或多个主题,并从所订阅的主题中拉取消息(记录)

Streams API(流API):
应用程序可通过该API实现流处理器,可以将一个主题的消息“导流”到另一个主题,并能地对消息进行任意自定义的转换。

类似于 RabbitMQ 的 Exchange

Connector API(连接器API):
应用程序可通过这套API来实现连接器,这些连接器不断地从源系统或应用程序导入数据到Kafka,反过来也可将Kafka消息不断地导入某个接收系统或应用程序。

通过这个API,可以让应用程序和Kafka这个消息系统进行一个实时的交互,我们的系统可以不断的接收来自Kafka的消息,也可以让我们的程序不断的把数据导入到Kafka的消息系统中,就像是一个通道,所以叫连接API。

应用场景:我们的应用程序要和Kafka之间保持实时的数据流的时候,就可以用这个连接API。

AdminAPI(管理API):
应用程序可通过该API管理和检查主题、Broker和其他Kafka实体。

在这里插入图片描述



这5套API中,只有流API使用的是专门的JAR包。

其他都用的是org.apache.kafka:kafka-clients依赖库。

而流API用的是org.apache.kafka:kafka-streams依赖库。



★ 生产者API


在这里插入图片描述

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version>
</dependency>

生产者API 的核心类是 KafkaProducer,它提供了一个 send()方法 来发送消息,该方法需要传入一个 ProducerRecord<K,V>对象。

ProducerRecord 代表了一条消息,Kafka 的消息是包含了key、value、timestamp。

ProducerRecord定义了如下6个构造器:

- ProducerRecord(String topic, Integer partition, K key, V value):创建一条发送到指定主题和指定分区的消息。- ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers):创建一条发送到指定主题和指定分区的消息,且包含多个消息头。- ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value):创建一条发送到指定主题和指定分区的消息,且使用给定的时间戳。- ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers):创建一条发送到指定主题和指定分区的消息、使用给定的时间戳,且包含多个消息头。- ProducerRecord(String topic, K key, V value):创建一条发送到指定主题的消息。- ProducerRecord(String topic, V value):创建一条发送到指定主题的、只带value,不带key的消息。

通过查 API 文档可看这个 ProducerRecord 消息对象 的6个构造器:

在这里插入图片描述

Kafka 的API 文档

Kafka 的API 文档

在这里插入图片描述

★ 使用生产者API发送消息

使用生产者API发送消息很简单,基本只要两步:

1、创建KafkaProducer对象,创建该对象时要传入Properties对象,用于对该生产者进行配置。

2、调用KafkaProducer对象的send()方法发送消息,调用ProducerRecord的构造器即可创建不同的消息。

3、发送完成后,关闭KafkaProducer对象。



为何Kafka的KafkaProducer需要一个Properties来来创建KafkaProducer?

因为Kafka的Producer API提供了海量的配置选项——如果你将这些配置选项每个都定义成方法,那将是一件让人无比痛苦的事情。

所以Kafka在设计该API时,就直接用了一个Properties来封装所有的配置属性。

http://www.lryc.cn/news/284754.html

相关文章:

  • MySQL 8.3 发布, 它带来哪些新变化?
  • 【数据结构】详谈队列的顺序存储及C语言实现
  • 为什么 HTTPS 协议能保障数据传输的安全性?
  • 使用 Node 创建 Web 服务器
  • leetcode 151反转字符串如何原地去除多余空格
  • 面试问题记录【深圳,共三面,A 轮公司】
  • Mysql数据库cpu飙升怎么解决
  • PHP反序列化漏洞-POP链构造
  • CentOS 7安装Java并配置环境
  • Vagrant创建Oracle RAC环境示例
  • 鸿蒙 HarmonyOS ArkTS ArkUI 动画 中心缩放、顶部缩放、纵向缩放
  • 基于python socket实现TCP/UDP通信
  • 指针的运算
  • 记录一次QT乱码问题
  • 怎么提升搜狗网站排名
  • 搜索经典题——填充 9*9矩阵
  • Vue待办事项(组件,模块化)
  • Vue中的组件
  • svg矢量图标在wpf中的使用
  • 如何在云端加速缓存构建
  • JavaWeb-Cookie与Session
  • ZABBIX根据IP列表,主机描述,或IP子网批量创建主机的维护任务
  • PMIS_ENT_STD
  • 32 登录页组件
  • Docker(一)简介和基本概念:什么是 Docker?用它会带来什么样的好处?
  • 【Linux】进程的概念 进程状态 进程优先级
  • Go语言热重载和优雅地关闭程序
  • Python实现两个列表相加的方法汇总
  • debian12.4配置
  • linux切换root用户su - root和su root的区别