当前位置: 首页 > news >正文

Spring Boot教程之五十五:Spring Boot Kafka 消费者示例

Spring Boot Kafka 消费者示例

Spring Boot 是 Java 编程语言中最流行和使用最多的框架之一。它是一个基于微服务的框架,使用 Spring Boot 制作生产就绪的应用程序只需很少的时间。Spring Boot 可以轻松创建独立的、生产级的基于 Spring 的应用程序,您可以“直接运行”。因此,下面列出了 Spring boot 的一些主要功能。

  • 创建独立的 Spring 应用程序
  • 直接嵌入 Tomcat、Jetty 或 Undertow。
  • 提供“启动器”依赖项以简化构建配置。
  • 尽可能自动配置 Spring 和第三方库。
  • 提供可用于生产的功能,例如健康检查、指标和外部化配置。
  • 几乎不需要代码生成,也不需要 XML 配置。

Apache Kafka是一个发布-订阅消息系统。消息系统允许您在进程、应用程序和服务器之间发送消息。广义上讲,Apache Kafka 是一种可以定义和进一步处理主题(主题可能是类别)的软件。应用程序可以连接到此系统并将消息传输到主题上。消息可以包含任何类型的信息,来自您的个人博客上的任何事件,也可以是触发任何其他事件的非常简单的文本消息。在这里,我们将讨论如何使用来自 Kafka 主题的消息并使用 Spring Boot 将它们显示在控制台中,其中Kafka 是先决条件。 

例子:

先决条件:确保您已经在本地机器上安装了 Apache Kafka,因此您应该知道如何在 Windows 上安装和运行 Apache Kafka?

步骤 1:转到此链接并创建一个 Spring Boot 项目。将“ Spring for Apache Kafka ”依赖项添加到您的 Spring Boot 项目中。 

步骤2:创建一个名为KafkaConfig的配置文件。以下是KafkaConfig.java文件的代码。

  • Java

// Java Program to Illustrate Kafka Configuration

  

package com.amiya.kafka.apachekafkaconsumer.config;

  

// Importing required classes

import java.util.HashMap;

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

  

// Annotations

@EnableKafka

@Configuration

  

// Class

public class KafkaConfig {

  

    @Bean

    public ConsumerFactory<String, String> consumerFactory()

    {

  

        // Creating a Map of string-object pairs

        Map<String, Object> config = new HashMap<>();

  

        // Adding the Configuration

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

                   "127.0.0.1:9092");

        config.put(ConsumerConfig.GROUP_ID_CONFIG,

                   "group_id");

        config.put(

            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class);

        config.put(

            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class);

  

        return new DefaultKafkaConsumerFactory<>(config);

    }

  

    // Creating a Listener

    public ConcurrentKafkaListenerContainerFactory

    concurrentKafkaListenerContainerFactory()

    {

        ConcurrentKafkaListenerContainerFactory<

            String, String> factory

            = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        return factory;

    }

}

步骤 3:创建名为KafkaConsumer的消费者文件

  • Java

// Java Program to Illustrate Kafka Consumer

  

package com.amiya.kafka.apachekafkaconsumer.consumer;

  

// Importing required classes

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

  

@Component

  

// Class

public class KafkaConsumer {

  

    @KafkaListener(topics = "NewTopic",

                   groupId = "group_id")

  

    // Method

    public void

    consume(String message)

    {

        // Print statement

        System.out.println("message = " + message);

    }

}

步骤 4:现在我们必须做以下事情才能使用 Spring Boot 从 Kafka 主题消费消息

  • 运行 Apache Zookeeper 服务器
  • 运行 Apache Kafka 服务器
  • 从 Kafka 主题发送消息

使用此命令运行 Apache Zookeeper 服务器

C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

类似地,使用此命令运行 Apache Kafka 服务器

C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

运行以下命令从 Kafka 主题发送消息

C:\kafka>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic

步骤 5:现在运行你的 Spring Boot 应用程序。确保已在application.properties文件中更改了端口号

server.port=8081

让我们在 ApacheKafkaConsumerApplication 文件中运行 Spring Boot 应用程序

输出:在输出中,您可以看到当您从 Kafka 主题发送消息时,它会实时显示在控制台上。 

http://www.lryc.cn/news/520968.html

相关文章:

  • 统计有序矩阵中的负数
  • 【6】Word:海名公司文秘❗
  • c语言 --- 字符串
  • LeetCode 热题 100_二叉树的最近公共祖先(49_236_中等_C++)(二叉树;深度优先搜索)
  • (三)c#中const、static、readonly的区别
  • 人工智能任务19-基于BERT、ELMO模型对诈骗信息文本进行识别与应用
  • 【C++】函数(下)
  • 一个使用 Golang 编写的新一代网络爬虫框架,支持JS动态内容爬取
  • 深入探讨 Vue.js 的动态组件渲染与性能优化
  • vulnhub靶场【IA系列】之Tornado
  • 简要认识JAVAWeb技术三剑客:HTMLCSSJavaScript
  • C# 修改项目类型 应用程序程序改类库
  • 卡通风格渲染
  • ubuntu各分区的用途
  • 理解STC15F2K60S2单片机的最小电路
  • Docker官网安装
  • 成功案例分享 — 芯科科技助力涂鸦智能打造Matter over Thread模块,简化Matter设备开发
  • 基于Python机器学习、深度学习技术提升气象、海洋、水文领域实践应用-以ENSO预测为例讲解
  • 【Rust自学】12.6. 使用TDD(测试驱动开发)开发库功能
  • 贪心算法汇总
  • H266/VVC 帧内预测中 ISP 技术
  • PyTorch 中的 Dropout 解析
  • 集中式架构vs分布式架构
  • 微服务主流框架和基础设施介绍
  • 4.5.1 顺序查找、折半查找(二分查找)
  • DDD - 微服务设计与领域驱动设计实战(上)_统一建模语言及事件风暴会议
  • 基于Piquasso的光量子计算机的模拟与编程
  • 44_Lua迭代器
  • 相机SD卡照片数据不小心全部删除了怎么办?有什么方法恢复吗?
  • RAG 测评基线