当前位置: 首页 > news >正文

kafka(五)spring-kafka(1)集成方法

一、集成

1、pom依赖
 <!--kafka--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

版本号对应关系可以查看官网 

2、配置文件

基础配置:以下是必须的配置

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

其他配置参考文档

二、使用

1、生产者

使用 KafkaTemplate发送消息。如

package com.example.service.impl;import com.alibaba.fastjson.JSON;
import com.example.dto.UserDTO;
import com.example.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service("userService")
@Slf4j
public class UserServiceImpl implements UserService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Value("${user.topic}")private String userTopic;@Overridepublic void sendUserMsg(UserDTO userDTO) {String msg = JSON.toJSONString(userDTO);ProducerRecord producerRecord = new ProducerRecord(userTopic,msg);kafkaTemplate.send(producerRecord);log.info("user消息发送成功");}
}
2、消费者

使用 @KafkaListener 注解。如

package com.example.listen;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Properties;@Component
@Slf4j
public class SchoolConsumer {@KafkaListener(topics = "${school.topic}", groupId = "${school.group.id}")public void consumer(ConsumerRecord<?, ?> record) {try {Object message = record.value();if (message != null) {String msg = String.valueOf(message);log.info("接收到:msg={},topic:{},partition={},offset={}",msg,record.topic(),record.partition(),record.offset());}} catch (Exception e) {log.error("topic:{},is consumed error:{}", record.topic(), e.getMessage());} finally {//ack.acknowledge();}}
}

http://www.lryc.cn/news/379941.html

相关文章:

  • Java中的设计模式深度解析
  • 鸿蒙 HarmonyOS NEXT星河版APP应用开发—上篇
  • [FreeRTOS 基础知识] 互斥访问与回环队列 概念
  • 音视频的Buffer处理
  • 【总结】攻击 AI 模型的方法
  • Linux配置中文环境
  • 深入解析 iOS 应用启动过程:main() 函数前的四大步骤
  • textarea标签改写为富文本框编辑器KindEditor
  • 高通安卓12-Input子系统
  • HTML 事件
  • Mysql 官方提供的公共测试数据集 Example Databases
  • Docker 下载与安装以及配置
  • Java中的集合框架详解:List、Set、Map的使用场景
  • [Django学习]前端+后端两种方式处理图片流数据
  • 如何配置IOMMU或者SWIOTLB
  • 【大数据 复习】第3章 分布式文件系统HDFS(重中之重)
  • element-ui里message抖动问题
  • Attention系列总结-粘贴自知乎
  • swagger下载文件名中文乱码、swagger导出文件名乱码、swagger文件导出名称乱码、解决swagger中文下载乱码bug
  • 191.回溯算法:组合总和|||(力扣)
  • JupyterLab使用指南(二):JupyterLab基础
  • ubuntu18.04 + openssl + engine + pkcs11+ softhsm2 双向认证测试
  • 【C++】类和对象2.0
  • 【LLM之KG】KoPA论文阅读笔记
  • UI设计速成课:理解模态窗口与非模态窗口的区别
  • 【Linux】基础IO_4
  • C++模板类原理讲解
  • scratch编程03-反弹球
  • postgresql数据库进阶知识
  • 关于HTTP劫持,该如何理解、防范和应对