当前位置: 首页 > news >正文

SpringBoot+Kafka

文章目录

  • 一、依赖
  • 二、配置文件
  • 三、API
    • 1、生产者
    • 2、消费者


一、依赖

<!-- spring-kafka(与kafka的版本一致) -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.1.RELEASE</version>
</dependency>

二、配置文件

spring:kafka:# kafka地址,集群用逗号分隔(localhost:9092,localhost:9093)。缺省:localhost:9092bootstrap-servers: localhost:9092# 生产者#producer:# key的序列化方式,缺省:org.apache.kafka.common.serialization.StringSerializer#key-serializer: org.apache.kafka.common.serialization.StringSerializer# value的序列化方式,缺省:org.apache.kafka.common.serialization.StringSerializer#value-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者consumer:# 消费者组group-id: testGroup# 自动偏移量# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latest# key的序列化方式,缺省:org.apache.kafka.common.serialization.StringSerializer#key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# value的序列化方式,缺省:org.apache.kafka.common.serialization.StringSerializer#value-deserializer: org.apache.kafka.common.serialization.StringDeserializer#listener:# SINGLE-单个消费;BATCH-批量消费。缺省SINGLE#type: BATCH# 消费者监听的主题不存在时,启动项目是否报错。缺省:false#missing-topics-fatal: false

三、API

1、生产者

/*** 生产消息** @author kimi* @date 2023/2/18*/
@Component
public class ProducerMsg {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 生产消息** @param msg*/public void send(String topic, String msg) {kafkaTemplate.send(topic, msg);}/*** 生产消息+回调** @param topic* @param msg*/public void sendCallback(String topic, String msg) {kafkaTemplate.send(topic, msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {//成功的回调@Overridepublic void onSuccess(SendResult<String, String> stringStringSendResult) {RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();//主题final String topic = recordMetadata.topic();//分区final int partition = recordMetadata.partition();//偏移量final long offset = recordMetadata.offset();System.err.println(String.format("生产消息成功:topic: %s,partition: %s,offset: %s", topic, partition, offset));}//失败的回调@Overridepublic void onFailure(Throwable throwable) {}});}}

2、消费者

/*** 消费者** @author kimi* @date 2023/2/18*/
@Component
public class ConsumeMsg {/*** 单个消费** @param consumer*/@KafkaListener(topics = {"USER", "LOG"})public void consumeSingle(ConsumerRecord<String, String> consumer) {System.err.println("监听到kafka消息: " + consumer);final String topic = consumer.topic();final String value = consumer.value();}/*** 批量消费* 需将配置文件中的listener.type设置成BATCH** @param consumers*///@KafkaListener(topics = {"USER", "LOG"})public void consumeBatch(List<ConsumerRecord<String, String>> consumers) {consumers.forEach(consumer -> {final String topic = consumer.topic();final String value = consumer.value();System.err.println(String.format("topic: %s,value: %s", topic, value));});}}
http://www.lryc.cn/news/302280.html

相关文章:

  • 世界顶级名校计算机专业,都在用哪些书当教材?(文末送书)
  • 蓝桥杯刷题--python-8(2023 填空题)
  • Eclipse - Reset Perspective
  • 1.5v的电池电压低于多少v等于没电
  • LabVIEW智能监测系统
  • 代码随想录刷题第34天
  • AMD FPGA设计优化宝典笔记(5)低频全局复位与高扇出
  • 14. Qt 程序菜单实现,基于QMainWindow
  • 如何利用SpringSecurity进行认证与授权
  • 如何简单上手清华AutoGPT并搭建到本地环境
  • 【漏洞复现-通达OA】通达OA share存在前台SQL注入漏洞
  • HTML5 Canvas与JavaScript携手绘制动态星空背景
  • 如何优雅地与ChatGPT对话?
  • AI提示工程实战:从零开始利用提示工程学习应用大语言模型【文末送书-19】
  • 量子算法入门——3.狄拉克符号与量子态(3)
  • c++ STL系列——(三)list
  • 软考29-上午题-排序
  • 【详细流程】vue+Element UI项目中使用echarts绘制圆环图 折线图 饼图 柱状图
  • Unity之XR Interaction Toolkit如何在VR中实现一个可以拖拽的UI
  • 开源项目热度榜单
  • Ubuntu系统搭建HadSky论坛并结合内网穿透实现无公网ip远程访问
  • gowin GW1N4 LED
  • Linux ipvlan详解(l2、l3、l3s和bridge、private和vepa模式)
  • 理解并实现OpenCV中的图像平滑技术
  • ChatGPT高效提问—prompt实践(白领助手)
  • Code Composer Studio (CCS) - Comment (注释)
  • springboot/ssm校园菜鸟驿站管理系统Java校园快递取件管理系统
  • 【Mybatis】TypeHandler使用
  • [计算机网络]---网络编程套接字
  • 分布式文件系统 SpringBoot+FastDFS+Vue.js【二】