当前位置: 首页 > news >正文

05_kafka-整合springboot

文章目录


  • kafka 整合 springboot

  • pom.xml

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.5.RELEASE</version>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!--测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
  • application.yml
spring:kafka:bootstrap-servers: kafka_1:9092,kafka_2:9092,kafka_3:9092consumer:auto-commit-interval: 100auto-offset-reset: earliestenable-auto-commit: truegroup-id: group1key-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:isolation:level: read_committedvalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:acks: allbatch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:enable:idempotence: trueretries: 5transaction-id-prefix: transaction-id-value-serializer: org.apache.kafka.common.serialization.StringSerializer
  • KafkaBootApp
package cn.qww.boot;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaListeners;
import org.springframework.messaging.handler.annotation.SendTo;import java.io.IOException;@SpringBootApplication@EnableKafka
public class KafkaBootApp {public static void main(String[] args) throws IOException {SpringApplication.run(KafkaBootApp.class, args);}@KafkaListeners(value = {@KafkaListener(topics = {"topic04"})})@SendTo(value = {"topic02"})public String listener(ConsumerRecord<?, ?> cr) {System.out.println("receive:" + cr);return cr.value() + " qww";}}
  • kafkaTemplate 使用,及 事务
package cn.qww.boot;import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;@SpringBootTest(classes = {KafkaBootApp.class})
@RunWith(SpringRunner.class)
public class KafkaTemplateTests {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 两种方式,注掉 application.yml 中   transaction-id-prefix: transaction-id-* 或者添加 @Transactional 注释*/@Test// @Transactionalpublic void sendTest() {kafkaTemplate.send(new ProducerRecord<>("topic01", "kafkaTemplate_key", "kafkaTemplate_value"));}/*** 注掉 application.yml 中 需要开启:  transaction-id-prefix: transaction-id- 配置;* 就是说和上边的方法相互冲突*/@Testpublic void testKafkaTemplateTx() {kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {@Overridepublic Object doInOperations(KafkaOperations<String, String> kafkaOperations) {return kafkaOperations.send(new ProducerRecord<>("topic01", "OperationsCallback_key", "OperationsCallback_OperationsCallback"));}});}}
http://www.lryc.cn/news/344465.html

相关文章:

  • 论UML在学情精准测评系统中的应用
  • Day23 代码随想录打卡|字符串篇---重复的子字符串
  • 【win10 文件夹数量和看到不一致查看隐藏文件已经打开,Thumb文件作妖】
  • ctfshow web入门 sql注入 web224--web233
  • 「Java开发指南」如何用MyEclipse搭建GWT 2.1和Spring?(一)
  • python同时进行字符串的多种替换
  • 【Java基础题型】用筛法求之N内的素数(老题型)
  • Linux进程——Linux环境变量
  • SRM系统供应链库存协同提升企业服务水平
  • Windows安全加固-账号与口令管理
  • 【数据库原理及应用】期末复习汇总高校期末真题试卷03
  • 数据库加密数据模糊匹配查询技术方案
  • jsSPA应用如何实现动态内容更新
  • C++学习笔记——仿函数
  • python 中如何匹配字符串
  • Windows 系统运维常用命令
  • Springboot监听ConfigMap配置文件自动更新配置
  • API安全机制
  • 接口性能测试 —— Jmeter并发与持续性压测!
  • Windows+Linux的虚拟串口工具
  • Spring-AOP
  • 算法程序设计-快速排序
  • Jmeter用jdbc实现对数据库的操作
  • Mac 上安装多版本的 JDK 且实现 自由切换
  • springboot如何发送邮件,java如何发送邮件随机码作为验证
  • 使用QLoRA在自定义数据集上finetuning 大模型 LLAMA3 的数据比对分析
  • 编译和链接(超详细)
  • Rust Turbofish 的由来
  • 2.外卖点餐系统(Java项目 springboot)
  • Universal Thresholdizer:将多种密码学原语门限化