当前位置: 首页 > news >正文

kafka复习:(20):消费者拦截器的使用

一、定义消费者拦截器(只消费含"sister"的消息)

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;import java.util.*;public class MyConsumerInterceptor implements ConsumerInterceptor<String,String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {Map<TopicPartition,List<ConsumerRecord<String,String>>> finalResult=new HashMap<>();Set<TopicPartition> partitionSet = records.partitions();for(TopicPartition topicPartition: partitionSet){List<ConsumerRecord<String,String>> partitionRecordList=records.records(topicPartition);List<ConsumerRecord<String,String>> newPartitionRecordList=new LinkedList<>();for(ConsumerRecord<String,String> record: partitionRecordList){if(record.value().contains("sister")){newPartitionRecordList.add(record);}}finalResult.put(topicPartition,newPartitionRecordList);}return new ConsumerRecords<>(finalResult);}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {offsets.forEach((tp,meta) -> {System.out.println("消费者拦截器:"+tp.topic()+":"+meta.offset());});}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

二、定义消费者,配置消费者拦截器

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class ConsumerInterceptorTest  {public static void main(String[] args) {String topic="testTopic2";String server="xx.xx.xx.xx:9092";Properties properties=new Properties();properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupTest4");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server);properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,MyConsumerInterceptor.class.getName());KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(properties);myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> records=myConsumer.poll(Duration.ofMillis(2000));for(ConsumerRecord consumerRecord: records){System.out.println(consumerRecord.value());}//myConsumer.commitSync();}}
}
http://www.lryc.cn/news/138627.html

相关文章:

  • 水库大坝安全监测的主要内容包括哪些?
  • Cadence软件屏幕显示问题
  • 访问服务器快慢的因素
  • vue(element ui安装)
  • 基于FPGA视频接口之HDMI2.0编/解码
  • Codeforces Round #894 (Div.3)
  • MyBatid动态语句且模糊查询
  • JVM——垃圾回收器G1+垃圾回收调优
  • 【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析
  • iptables的使用规则
  • JS 动画 vs CSS 动画:究竟有何不同?
  • 供应链 | 大数据报童模型:基于机器学习的实践见解
  • Java开发工作问题整理与记录
  • 静态代码扫描持续构建(Jenkins)
  • Git gui教程---汇总篇
  • flink sql checkpoint 调优配置
  • Linux 网络文件共享介绍
  • Qt中如何在qml文件中使用其他的qml文件并创建对象
  • 学习心得04:CUDA
  • OpenCV实现摄像头图像分类(Python版)
  • 计算机竞赛 地铁大数据客流分析系统 设计与实现
  • sonarqube报错http status 500-internal server error,什么原因,怎么解决
  • 工业设计的四个主要阶段,你都知道吗?优漫动游
  • 【DevOps视频笔记】4.Build 阶段 - Maven安装配置
  • linux非GUI模式执行带有jpgc线程组jmeter脚本报错
  • mysql处理json格式的字段,一文搞懂mysql解析json数据
  • 测试数据生成
  • 网安周报|国防承包商Belcan泄露了带有漏洞列表的管理员密码
  • Vue3语法系统进阶 - 全面掌握Vue3特性
  • 第9天----【位运算进阶之----按位取反(~)】(附补码,原码讲解)