当前位置: 首页 > news >正文

Spring Boot 配置Kafka

1 Kafka

        Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。

2 Maven依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

3 Spring Boot配置

spring:kafka:bootstrap-servers: localhost:9092producer:batch-size: 16384buffer-memory: 67108864acks: 1compression-type: lz4key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:enable-auto-commit: trueauto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringSerializervalue-deserializer: org.apache.kafka.common.serialization.StringSerializer

4 生产者配置

4.1 KafkaProducerConfig

        生产者的相关配置,指定kafka的地址,消息序列化器。

        topic的分区数、副本数。

package com.xudongbase.kafka.producer;import com.xudongbase.kafka.constant.KafkaTopicConstant;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;@Configuration
public class KafkaProducerConfig {@Value(value = "${spring.kafka.bootstrap-servers:}")private String bootstrapAddress;/*** 分区(分区数需要慎重设置,一般分区数为消费者的倍数,要不然在消费高峰时刻会出现消费速度不一样的情况)*/private static final int NUM_PARTITIONS = 5;/*** 副本*/private static final short REPLICATION_FACTOR = (short) 2;
http://www.lryc.cn/news/508075.html

相关文章:

  • 基于单片机的火灾报警器 (论文+源码)
  • 分析excel硕士序列数据提示词——包含对特征的筛选,非0值的过滤
  • MongoDB 更新文档
  • 分布式协同 - 分布式事务_TCC解决方案
  • MFC/C++学习系列之简单记录13
  • PostgreSQL表达式的类型
  • 速通Python 第四节——函数
  • 如何在Windows系统上安装和配置Maven
  • STM32之GPIO输出与输出
  • linux定时器操作
  • 重拾设计模式--观察者模式
  • Vue.js前端框架教程7:Vue计算属性和moment.js
  • 【游戏设计原理】22 - 石头剪刀布
  • 3-Gin 渲染 --[Gin 框架入门精讲与实战案例]
  • python小课堂(一)
  • GESP202309 二级【小杨的 X 字矩阵】题解(AC)
  • @PostConstruct注解解释!!!!
  • laya游戏引擎中打包之后图片模糊
  • 【数据结构练习题】链表与LinkedList
  • [项目代码] YOLOv8 遥感航拍飞机和船舶识别 [目标检测]
  • 移动魔百盒中的 OpenWrt作为旁路由 安装Tailscale并配置子网路由实现在外面通过家里的局域网ip访问内网设备
  • JVM对象分配内存如何保证线程安全?
  • ArcGIS计算土地转移矩阵
  • 数据库 MYSQL的概念
  • Node.js后端程序打包问题汇总(webpack、rsbuild、fastify、knex、objection、sqlite3、svg-captcha)
  • 部署 Apache Samza 和 Apache Kafka
  • xiaomiR4c openwrt
  • leetcode-128.最长连续序列-day14
  • 梳理你的思路(从OOP到架构设计)_简介设计模式
  • JAVA前端开发中type=“danger“和 type=“text“的区别