当前位置: 首页 > news >正文

Spring Kafka生产者实现

需求

我们需要通过Spring Kafka库,将消息推送给Kafka的topic中。这里假设Kafka的集群和用户我们都有了。这里Kafka认证采取SASL_PLAINTEXT方式接入,SASL 采用 SCRAM-SHA-256 方式加解密。

pom.xml

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

我这里不需要写版本号,应为我使用的Spring Boot。Spring Boot会自动帮我挑选spring-kafka应该使用哪个版本合适。

application.yml

spring:kafka:producer:# kafka集群地址bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092client-id: producer-dev# SASL_PLAINTEXT 接入方式security:protocol: SASL_PLAINTEXT# 反序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:# SASL 采用 SCRAM-SHA-256 方式sasl:mechanism: SCRAM-SHA-256# jaas配置jaas:options:username: kafkauserpassword: kafkapwdenabled: truelogin-module: org.apache.kafka.common.security.scram.ScramLoginModulecontrol-flag: required

以上,是关于Spring Kafka的全部配置。下面摘要出来的配置,是可以单独配置在配置中心的:

topic:# 接收消息的主题配置save: hello_kafka_topic
spring:kafka:producer:client-id: producer-dev# kafka集群地址bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092# jaas配置jaas:options:username: kafkauserpassword: kafkapwd

Java

KafkaProducerService.java


public interface KafkaProducerService {/*** 转发消息到kafka*/void sendToKafka(String msg);}

KafkaProducerServiceImpl.java

import cn.com.xxx.service.KafkaProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;/*** 转发消息到kafka*/
@RefreshScope
@Slf4j
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;/*** kafka接收消息的主题*/@Value("${topic.save}")private String topic;@Overridepublic void sendToKafka(String msg) {log.info(String.format("$$$$ => Producing message: %s", msg));ProducerRecord<String, String> recordKafka = new ProducerRecord<>(topic, msg);ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(recordKafka);future.addCallback(new KafkaSendCallback<String, String>() {@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("成功发消息:{}给Kafka:{}", msg, result);}@Overridepublic void onFailure(KafkaProducerException ex) {log.error("发消息:{}给Kafka:{}", msg, recordKafka, ex);}});}
}

到这里为止Spring Kafka生产者所有配置就都可以了。这里使用的异步监听kafka回调的方式发送消息。

总结

这里使用Spring Kafka库异回调步给Kafka消息。这里使用的Spring Kafka库是老版本,所以,这里的使用的回调类是ListenableFuture类。

参考:

  • Spring for Apache Kafka2.8.3
  • Spring for Apache Kafka
http://www.lryc.cn/news/208033.html

相关文章:

  • 手把手教你入门Three.js(初识篇)
  • Hadoop学习总结(搭建Hadoop集群(伪分布式模式))
  • 人性与理性共赢,真心罐头跃过增长的山海关
  • 【Redis】Docker部署Redis数据库
  • 【目标跟踪】多目标跟踪测距
  • 吐血整理,服务端性能测试-Docker部署MySQL/Nginx(详细步骤)
  • 基于单片机设计的智能窗帘控制系统
  • WSL的秘钥被修改了要怎么弄
  • cesium开发引入方式
  • 无缝的链间互操作性:通用消息传递的强大之处
  • minio + linux + docker + spring boot实现文件上传与下载
  • vue ant DatePicker 日期选择器 限制日期可控范围
  • linux 音视频架构 linux音视频开发
  • el-table添加固定高度height后高度自适应
  • Python分享之多进程探索 (multiprocessing包)
  • Boris FX Mocha Pro 2023:Mac/win全能影像处理神器
  • elementUI 特定分辨率(如1920*1080)下el-row未超出一行却换行
  • mac电脑视频处理推荐:达芬奇DaVinci Resolve Studio 18 中文最新
  • OKLink携手CertiK在港举办Web3生态安全主题论坛
  • 王道p40 1.设计一个递归算法,删除不带头结点的单链表L中的所有值为x的结点(c语言代码实现)图解递归
  • 深入浅出排序算法之希尔排序
  • close excel by keyword 根据关键字关闭 excel 窗口 xlwings 方式实现
  • LIO-SAM算法解析
  • vscode 提升小程序开发效率的必备插件与工具
  • 第五章单元测试
  • 【JAVA基础】多线程与线程池
  • HCIA数据通信——交换机(Vlan间的通信与安全)
  • Linux shell编程学习笔记16:bash中的关联数组
  • 浏览器是怎么执行JS的?——消息队列与事件循环
  • IMU预积分的过程详解