当前位置: 首页 > news >正文

java 中 main 方法使用 KafkaConsumer 拉取 kafka 消息如何禁止输出 debug 日志

pom 依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.14.RELEASE</version>
</dependency>

 或者

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.5.1</version>
</dependency>

 ps:前面的 spring-kafka 依赖中已经包含了后面的 kafka-clients

KafkaConsumerDemo.java:


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.logging.Logger;public class KafkaConsumerDemo {static Map<String,Object> properties = new HashMap<String,Object>();private static KafkaConsumer kafkaConsumer = null;/**** windows 环境需要将下面 8 行添加到 "C:\Windows\System32\drivers\etc\hosts" 文件中:*      xxx.xxx.xxx.xxx1 xxx-data01*      xxx.xxx.xxx.xxx2 xxx-data02*      xxx.xxx.xxx.xxx3 xxx-data03*      xxx.xxx.xxx.xxx4 xxx-data04*      xxx.xxx.xxx.xxx5 xxx-data05*      xxx.xxx.xxx.xxx6 xxx-data06*      xxx.xxx.xxx.xxx7 xxx-data07*      xxx.xxx.xxx.xxx8 xxx-data08* @param args*/public static void main(String[] args) {// 禁止控制台输出一些 org.apache.kafka.xxx 相关的日志LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.ConsumerCoordinator").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.FetchSessionHandler").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.Fetcher").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.AbstractCoordinator").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.NetworkClient").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.common.network.Selector").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.Metadata").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.common.utils.AppInfoParser").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.ConsumerConfig").setLevel(Level.OFF);properties.put("bootstrap.servers","127.0.0.1:9192,127.0.0.1:9192,127.0.0.1:9192");  // 指定 Brokerproperties.put("group.id", "11111111111111111111111");              // 指定消费组群 ID,为防止自己启动拉取消息导致其他生产环境的消费者无法消费该消息,请设置一个绝对不重复的值,以起到隔离的作用properties.put("max.poll.records", "1000");// todo 设置可批量拉取???properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象kafkaConsumer = new KafkaConsumer<String, String>(properties);// List<String> topics = queryAllTopics( consumer );kafkaConsumer.subscribe( Collections.singletonList( "ods_carbon_rfid_device_record" ) );  // 订阅主题 order-eventsnew Thread(new Runnable() {@Overridepublic void run() {receiveMessage();}}).start();}/*** 查询全部的主题(topic)列表* @param kafkaConsumer* @return*/private static List<String> queryAllTopics(KafkaConsumer kafkaConsumer) {if( kafkaConsumer == null ){return null;}Map<String, List<PartitionInfo>> map = kafkaConsumer.listTopics();if( map == null ){return null;}return new ArrayList<String>( map.keySet() );}public static void receiveMessage() {try {while ( true ){synchronized (KafkaConsumerDemo.class) {// ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));// 30L 表示超时时间为 30秒,有消息立即返回,没消息最多等 30 秒后返回SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(30L));String date = sdf.format(new Date());if( records == null ){System.out.println( date + " 本次未拉取到任何消息" );}else {System.out.println( date + " 本次拉取到 " + records.count() + " 条消息" );int i = 1;for (ConsumerRecord<String,String> record: records) {String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]", record.topic(), record.partition(), record.offset(), record.key(), record.value());System.out.println( "第" + i + "条消息:" + info );i++;}kafkaConsumer.commitSync();}/*** 当你用 KafkaConsumer从Kafka里读取消息并且处理完后,commitSync 方法会帮你把这些消息的处理进度(也就是偏移量 offset )同步地告诉 Kafka 服务器。* 这样,Kafka 就知道你已经处理到哪儿了。如果消费者(也就是读取消息的程序)突然崩溃或者重启,Kafka 就能根据最后一次提交的偏移量,让你从上一次处理* 完的地方继续开始,而不会漏掉或者重复处理消息。* 简单来说,commitSync 方 法就是用来“保存进度”的,确保消息处理的可靠性和顺序性。*/// Thread.sleep( 5000L );}}} catch (Exception e){e.printStackTrace();} finally {kafkaConsumer.close();}}
}

http://www.lryc.cn/news/519981.html

相关文章:

  • 【后端面试总结】Golang可能的内存泄漏场景及应对策略
  • Java 反射机制详解
  • 【k8s】scc权限 restricted、anyuid、privileged
  • 2025华数杯国际赛A题完整论文讲解(含每一问python代码+数据+可视化图)
  • ThreadLocal 的使用场景
  • 后端开发 Springboot整合Redis Spring Data Redis 模板
  • 代码随想录算法训练营第 4 天(链表 2)| 24. 两两交换链表中的节点19.删除链表的倒数第N个节点 -
  • 【RDMA学习笔记】1:RDMA(Remote Direct Memory Access)介绍
  • 网络安全常见的35个安全框架及模型
  • Elasticsearch介绍及使用
  • Leetocde516. 最长回文子序列 动态规划
  • iOS 逆向学习 - Inter-Process Communication:进程间通信
  • 高级生化大纲
  • YARN WebUI 服务
  • 【Unity3D】利用IJob、Burst优化处理切割物体
  • 【大前端】Vue3 工程化项目使用详解
  • 基于文件系统分布式锁原理
  • 简历整理YH
  • Kotlin 协程基础三 —— 结构化并发(二)
  • 微信小程序实现长按录音,点击播放等功能,CSS实现语音录制动画效果
  • 校园跑腿小程序---轮播图,导航栏开发
  • 详细全面讲解C++中重载、隐藏、覆盖的区别
  • 一文读懂单片机的串口
  • HTML5 网站模板
  • mybatis分页插件:PageHelper、mybatis-plus-jsqlparser(解决SQL_SERVER2005连接分页查询OFFSET问题)
  • uniapp中rpx和upx的区别
  • 什么是卷积网络中的平移不变性?平移shft在数据增强中的意义
  • java.net.SocketException: Connection reset 异常原因分析和解决方法
  • Maven 仓库的分类
  • 隧道网络:为数据传输开辟安全通道