当前位置: 首页 > news >正文

【kafka实战】03 SpringBoot使用kafka生产者和消费者示例

本节主要介绍用SpringBoot进行开发时,使用kafka进行生产和消费

一、引入依赖

<dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.6</version></dependency>
</dependencies>

二、添加topic

使用offset explore软件,添加一个topic
在这里插入图片描述

三、配置文件

server:port: 8080
spring:kafka:consumer:# Kafka服务器bootstrap-servers: 192.168.56.201:9092# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D#auto-commit-interval: 2s# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)# none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式#key-deserializer: org.apache.kafka.common.serialization.StringDeserializerkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。# 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况max-poll-records: 100properties:# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancemax:poll:interval:ms: 600000# 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10ssession:timeout:ms: 10000listener:# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数concurrency: 4# 自动提交关闭,需要设置手动消息确认ack-mode: manual_immediate# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误missing-topics-fatal: false# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepoll-timeout: 600000producer:# Kafka服务器bootstrap-servers: 192.168.56.201:9092# 发生错误后,消息重发的次数,开启事务必须设置大于0。retries: 3# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。# 开启事务时,必须设置为allacks: all# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 生产者内存缓冲区的大小。buffer-memory: 1024000# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer

四、生产者代码

@Slf4j
@RestController
@RequestMapping("/msg")
public class SendMessageController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("send")public String send() {for (int i = 0; i < 100; i++) {OrderInfo orderInfo = new OrderInfo();orderInfo.setAddress("成都市高新区");orderInfo.setOrderId(String.valueOf(i));orderInfo.setProductName("华为P60:" + i);kafkaTemplate.send("order.topic", "order:" + i, new Gson().toJson(orderInfo));}return "success";}
}

五、消费者代码

消费者代码采用手动ack的方式

@Slf4j
@Component
public class KafkaOrderConsumer {@KafkaListener(topics = "order.topic", groupId = "orderGroup")public void consumeOrder(ConsumerRecord<String, String> record, Acknowledgment ack) {log.info("消费订单消息key={},value={}", record.key(), record.value());ack.acknowledge();}
}

代码git仓库:https://gitee.com/syk1234/mqdmo

http://www.lryc.cn/news/173182.html

相关文章:

  • Only file and data URLs are supported by the default ESM loader
  • LeetCode01
  • 计算机网络高频面试题集锦
  • Linux启动过程详解 Xmind导图笔记
  • Qt5开发及实例V2.0-第十七章-Qt版MyWord字处理软件
  • 机器视觉工程师们,常回家看看
  • 网络隔离下实现的文件传输,现有的方式真的安全吗?
  • [医学图像知识]CT图和PET图的成像表现形式
  • 聊聊wireshark的进阶使用功能 | 京东云技术团队
  • 小米手机安装面具教程(Xiaomi手机获取root权限)
  • DSU ON TREE
  • Java“对象”
  • vuepress+gitee免费搭建个人在线博客(无保留版)
  • Android 12.0 系统限制上网系列之iptables用IOemNetd实现app上网白名单的功能实现
  • Idea和DataGrip自定义常用代码模板,熟练使用快捷模板可促进开发效率
  • Vue.js :实现嵌套对话框的查看按钮
  • 9.2.4 【MySQL】段的结构
  • 怎么快速提取图片中的文字信息?怎么使用OCR图片文字提取一键提取文字
  • Selenium隐藏浏览器特征
  • Linux下的buff/cache
  • 3.wifi开发,网络编程
  • Android框架mqtt库无法兼容高版本android13的问题
  • 一招解除csdn复制限制
  • 安全基础 --- nodejs沙箱逃逸
  • Redis集群架构搭建——主从、哨兵、集群
  • 39 | selenium基础架构,UI测试架构
  • 2023研究生数学建模E题保姆级思路 出血性脑卒中临床智能诊疗
  • 画电路板通用知识
  • 三相组合式过电压保护器试验
  • C++提高编程:01 模板