当前位置: 首页 > news >正文

SpringBoot使用kafka事务-消费者方

前言

在上一篇文章中,写到了如何在springboot中生产者如何使用kafka的事务,详情链接:Springboot使用kafka事务-生产者方

那么,这一篇就接着上篇所写的内容,讲解一下再springboot中消费者如何使用kafka的事务。

实现

在springboot中kafka的消费者配置也和生产者一样,有两种配置的方式:

  • 第一种是使用springboot提供的自定装配机制
  • 第二种是自定义配置

自动装配机制

在springboot的配置文件中加入以下代码即可实现

   spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: test_group #默认组id  后面会配置多个消费者组key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerisolation-level: read_committedenable-auto-commit: false #关闭自动提交auto-commit-interval: 100max-poll-records: 20 #批量消费 一次接收的最大数量

这样就实现了事务的自动状态,特别注意的是配置文件中的isolation-level属性,这个属性一定要设置读已提交的事务级别,这样才能配合生产者实现事务的特性。

使用

这种配置方式的使用就很简单了,
第一:新建一个管理类,类名上用Component注解标识为需要springboot管理

@Component
public class kafkaConfigs {
}

第二:使用springboot提供的KafkaListener注解,即可使用

    @KafkaListenerpublic void testListener(String data) {log.info("接受到的数据为: {} ",data);}

全部代码如下:

@Component
public class kafkaConfigs {@KafkaListenerpublic void testListener(String data) {log.info("接受到的数据为: {} ",data);}
}

缺点

自动装配机制是很方便的,但是在一些场景下,我们需要连接多个kafka的地址来实现不同的业务,而且有的场景之下我们并不需要kafka的事务管理机制,所以这就需要用到我们的第二种方法,自定义配置了。

自定义配置

这次,我们使用springboot为我们提供的KafkaListener注解来实现这个功能。
在yml配置文件中加入第二个kakfa的连接地址,并且将事务紫隔离级别去掉即可。

   spring:kafka:bootstrap-servers: localhost:9092bootstrap-servers-2: localhost2:9092consumer:group-id: test_group #默认组id  后面会配置多个消费者组key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerenable-auto-commit: false #关闭自动提交auto-commit-interval: 100max-poll-records: 20 #批量消费 一次接收的最大数量

注意bootstrap-servers-2这个key,是我们自定义的key,它在kafka的自动配置包里面是没有的。

使用

自定义配置的使用和第一种使用方式大同小异,具体为:
第一:新建一个管理类,类名上用Component注解标识为需要springboot管理

@Component
public class kafkaConfigs {
}

第二:使用springboot提供的KafkaListener注解,并且在这里标识需要使用到的kafka连接地址以及事务隔离级别

    @KafkaListener(topics = "my-topics2" , groupId = "my-group2",properties = {"bootstrap.servers=${spring.kafka.bootstrap-servers-2}","isolation.level=read_committed"})public void testListener1(String data) {log.info("接受到的数据为: {} ",data);}

全代码如下:

@Component
public class kafkaConfigs {@KafkaListener(topics = "my-topics2" , groupId = "my-group2",properties = {"bootstrap.servers=${spring.kafka.bootstrap-servers-2}","isolation.level=read_committed"})public void testListener1(String data) {log.info("接受到的数据为: {} ",data);}
}

可以看到,我们使用了properties属性指定了需要连接的kafka地址,并且指定了事务的隔离级别,这样就实现了一个具有事务功能的消费者,并且对其他方法不产生任何影响。

总结

以上,我们使用两种方式配置springboot中kafka消费者如何使用事务,读者可以结合上篇文章结合食用,效果更佳!


上篇链接:Springboot使用kafka事务-生产者方

http://www.lryc.cn/news/153234.html

相关文章:

  • C# 实现PictureBox从指定的文件夹内进行翻页操作
  • Eureka 注册中心的使用
  • vue3 组件通信方式
  • 淘宝商品API使用示例:如何通过调用外部API来获取淘宝商品价格销量主图详情数据
  • RK3568-android11-适配ov13850摄像头
  • 基于Sider-chatgpt3.5-编写一个使用springboot2.5连接elasticsearch7的demo程序,包括基本的功能,用模板方法
  • nodejs中如何使用Redis
  • golang append坑
  • PaddleNLP使用Vicuna
  • jackson常用操作
  • ios ipa包上传需要什么工具
  • 科目1基础知识快速入门精简
  • 安卓逆向 - 某东app加密参数还原
  • Visual Studio(2022)生成链接过程的.map映射文件以及.map映射文件的内容说明
  • A. Gift Carpet
  • 技术科普:汽车开放系统架构AUTOSAR
  • 说说HTTP 和 HTTPS 有什么区别?
  • Pygame中Trivia游戏解析6-5
  • Java8新特性2——方法引用
  • Mac“其他文件”存放着什么?“其他文件”的清理方法
  • 46、TCP的“三次握手”
  • libudev 和 libusb 常见API分析
  • [dasctf]misc04
  • Scala的函数式编程与高阶函数,匿名函数,偏函数,函数的闭包、柯里化,抽象控制,懒加载等
  • Axure RP 8.1.0.3400(原型设计工具)
  • 企业微信、飞书、钉钉机器人消息发送工具类
  • 手撕 视觉slam14讲 ch7 / pose_estimation_3d2d.cpp (1)
  • Mac安装Dart时,Homebrew报错 Error: Failure while executing
  • SSM整合~
  • Self-supervised 3D Human Pose Estimation from a Single Image