当前位置: 首页 > news >正文

Spring Cloud Stream Kafka(3.2.2版本)使用

问题

正在尝试只用Spring Cloud Stream Kafka。

步骤

配置

spring:cloud:function:definition: project2Building    stream:kafka:binder:brokers: xxxx:9002configuration:enable.auto.commit: falsesession.timeout.ms: 30000max.poll.records: 30allow.auto.create.topics: falseauto.offset.reset: earliest# 反序列化配置key.serializer: org.apache.kafka.common.serialization.StringDeserializervalue.deserializer: org.apache.kafka.common.serialization.StringDeserializer# JAAS配置security.protocol: SASL_PLAINTEXTsasl.mechanism: PLAINsasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";"autoCreateTopics: falsebindings:# 自定义消费bean的方法名称project2Building-in-0:# 消费组group: xxxx# 主题destination: xxxx

消费方法

package xxxxx.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;import java.util.function.Consumer;/*** 与xxxx的kafka对接处理* @author zyl*/
@Slf4j
@Configuration
public class MainConfig {@Beanpublic Consumer<Message<String>> project2Building(){return msg ->{log.info(String.format("Kafka消息:%s",msg.getPayload()));// TODO 手动提交kafkaAcknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);if (acknowledgment != null) {acknowledgment.acknowledge();}};}}

总结

我用的这个版本Spring Cloud Stream Kafka(3.2.2版本)相对于Spring Boot 的Kafka库有点重量级了。这就是Spring Cloud 基于Kafka的流处理框架。

参考:

  • Spring Cloud Stream Kafka Binder Reference Guide
  • Kafka With Spring Cloud Streams Using Function-based Mode
  • Spring Cloud Stream - functional and reactive
http://www.lryc.cn/news/174620.html

相关文章:

  • 8位微控制器上的轻量级SM2加密算法实现:C语言详细指南与完整代码解析
  • neo4j下载安装配置步骤
  • 【机组】计算机系统组成课程笔记 第二章 计算机中的信息表示
  • 指针笔试题详解
  • MySQL 日志管理、备份与恢复
  • vtk- 数据类型(一) 三角链实例代码
  • Git大全
  • Touch命令使用指南:创建、更新和修改文件时间戳
  • Windows开启 10 Telnet
  • 高教杯数学建模A题程序设计要点与思路
  • Spring Boot的新篇章:探索2.0版的创新功能
  • 5、SpringBoot_热部署
  • 【kohya】训练自己的LoRA模型
  • [尚硅谷React笔记]——第1章 React简介
  • Debezium系列之:快照参数详解
  • redis单机版搭建
  • 物联网边缘网关
  • docker部署springboot程序时遇到的network问题
  • RASP hook插桩原理解析
  • Pygame中Sprite的使用方法6-5
  • 浅谈为什么多态只能是指针或引用
  • js看代码说输出
  • Java笔记:使用javassist修改class文件内方法
  • 华为云云耀云服务器L实例评测 |云服务器性能评测
  • iphone的safari浏览器实现全屏的pwa模式,并修改顶部状态栏背景颜色
  • springboot对接rabbitmq并且实现动态创建队列和消费
  • Spring的后处理器-BeanFactoryPostprocessor
  • Flutter 必备知识点
  • 什么是FMEA(失效模式和影响分析)?
  • Redis面试题(三)