当前位置: 首页 > news >正文

Spring-Kafka笔记整理

  1. 引入依赖
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
    </dependency>
    
  2. 配置application.properties
    spring.kafka.bootstrap-servers=192.168.99.51:9092
    
  3. 编写kafka的配置类
    @Configuration
    public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configs);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();// 并发数就是一个消费者实例起几个线程factory.setConcurrency(3);factory.setConsumerFactory(consumerFactory());return factory;}
    }
    
  4. Kafka消息监听
    @Component
    public class KafkaConsumer {@Autowiredprivate ObjectMapper mapper;@KafkaListener(topics = {"hello-kafka-topic"},groupId = "hello-kafka-group",containerFactory = "kafkaListenerContainerFactory")public void listener01(ConsumerRecord<String, String> record) throws Exception {String key = record.key();String value = record.value();HelloMessage kafkaMessage = mapper.readValue(value, HelloMessage.class);log.info("in listener consume kafka message: [{}], [{}]", key, mapper.writeValueAsString(kafkaMessage));}
    }
    
  5. Kafka消息发送
    @Component
    public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String key, String value, String topic) {if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {throw new IllegalArgumentException("value or topic is null or empty");}ListenableFuture<SendResult<String, String>> future = StringUtils.isBlank(key) ?kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);// 异步回调的方式获取通知future.addCallback(success -> {assert null != success && null != success.getRecordMetadata();// 发送到 kafka 的 topicString _topic = success.getRecordMetadata().topic();// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的 offsetlong offset = success.getRecordMetadata().offset();log.info("send kafka message success: [{}], [{}], [{}]", _topic, partition, offset);}, failure -> {log.error("send kafka message failure: [{}], [{}], [{}]", key, value, topic);});}
    }
    
http://www.lryc.cn/news/319067.html

相关文章:

  • 已解决org.apache.hadoop.hdfs.protocol.QuotaExceededException异常的正确解决方法,亲测有效!!!
  • GitHub打不开的解决方案(超简单)
  • Unity开发一个FPS游戏之二
  • STM32F103 CubeMX 使用USB生成鼠标设备
  • HJXH-E1/U静态信号继电器 面板安装 辅助电源220VDC 启动电压220VDC JOSEF约瑟
  • SpringBoot3下Kafka分组均衡消费实现
  • 鸿蒙Harmony应用开发—ArkTS声明式开发(容器组件:GridItem)
  • Qt 使用RAW INPUT获取HID触摸屏,笔设备,鼠标的原始数据,最低受支持的客户端:Windows XP [仅限桌面应用]
  • easyexcel导出excel文件到s3服务器
  • xss.haozi.me靶场“0x0B-0x12”通关教程
  • linux--redhat系统Yum源配置
  • el-Switch 开关二次确认
  • (二)丶RabbitMQ的六大核心
  • 微信小程序实现上下手势滑动切换
  • 详解命令docker run -d --name container_name -e TZ=Asia/Shanghai your_image
  • javaEE7
  • int与integer的区别
  • Golang实现Redis分布式锁(Lua脚本+可重入+自动续期)
  • 音乐播放器-C#实现
  • 如何本地搭建hMailServer邮件服务
  • 裸机编程的几种模式、架构与缺陷。
  • TSINGSEE青犀视频AI方案:数据+算力+算法,人工智能的三大基石
  • Linux认识与学习BASH
  • Python JSON 序列化以及反序列化 文件读写
  • Spring MVC 返回JSON数据
  • 前端基础——HTML傻瓜式入门(1)
  • 【AI】如何创建自己的自定义ChatGPT
  • 电子科技大学链时代工作室招新题C语言部分---题号E
  • K8S CNI
  • Python数据分析实验一:Python数据采集与存储