当前位置: 首页 > news >正文

解决Flink读取kafka主题数据无报错无数据打印的重大发现(问题已解决)



        亦菲、彦祖们,今天使用idea开发的时候,运行flink程序(读取kafka主题数据)的时候,发现操作台什么数据都没有只有满屏红色日志输出,关键干嘛?一点报错都没有,一开始我觉得应该执行程序的姿势有问题,然后我重新执行了一次还是不行,我就一直等待,发现等了好久都没有数据来到,我就开始察觉不对了。

        下面是我排查的思路:

        1.kafka broker有没有数据:因为我是读取kafka主题数据,所以我屁颠屁颠的去kakfa查看我的消费主题是否有数据,查看没有问题!

        2.读取的主题是否出现问题:经过切换其他主题读取数据,发现也是没有数据出现在操作台,所以不是主题的问题

        3.查看flink与kafka 连接器的配置是否有问题:我就回去查看构建kafka连接器的builder是否问题,我尝试把偏移量改为从最早的偏移量开始读取,也是无动于衷呀!

        通过以上思路之后,我就彻底无语了,那到底是什么问题?

因为我是从flink连接kafka读取数据的,所以我觉得直接连接kafka读取主题数据试一试,这样就可以排除是不是flink有问题了,所以我就写了以下代码进行测试:

 // 配置 Kafka 消费者属性Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  "hadoop101:10092,hadoop102:10092,hadoop103:10092"); // Kafka 集群地址props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组 IDprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Key 反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Value 反序列化器props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的偏移量开始读取// 创建消费者、KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("AllData_topic_ods"));//拉取超时时间ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(6000));for (ConsumerRecord<String, String> record : poll) {System.out.printf("offset = %d,key= %s.value=%s%n",record.offset(),record.key(),record.value());}consumer.close();

         一开始当拉取超时时间为100ms的时候,我也是消费不到数据的,但是我就想是不是我拉取超时时间太短了,因为我网络io和电脑性能匹配不上的话,它拉取时间是需要进行网络io的。所以我尝试修改拉取时间为6000ms,就是6s啦!
        然后突然就消费到数据了,我了个豆,搞定了我感觉我已经!

        100ms

        6000ms 

        于是我就回去把我那个builder的参数也修改了,一执行flink程序,这次不负众望,成功消费到数据了!!!

return KafkaSource.<String>builder().setProperty("max.poll.interval.ms","10000") // 设置拉取超时时间为10s.setProperty("partition.discovery.interval.ms", "10000").setProperty("commit.offsets.on.checkpoint", "true").setProperty("isolation.level", "read_committed")//read_committed 只会读取事务型成功提交事务写入的消息;  read_uncommitted 默认值,能够读取到 Kafka 写入的任何消息.setBootstrapServers(bootstrapServers).setTopics(topicName).setGroupId(groupId).setClientIdPrefix(clientIdPrefix).setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));

        亦菲、彦祖们,搞定了!如果不是这个问题的话,也参照我上面的排查思路看看是哪里出现了问题!我能解决也是一个一个排查到,给点耐心。 

        如果帮到你,恭喜呀!如果解决不了,那当我没说,你去看别人的文章吧! 



 感谢各位的观看,创作不易,能不能给哥们来一个点赞呢!!!

好了,今天的分享就这么多了,有什么不清楚或者我写错的地方,请多多指教!

私信,评论我呗!!!!!! 

关注我下一篇不迷路哦!

http://www.lryc.cn/news/494254.html

相关文章:

  • python自动化测开面试题汇总(持续更新)
  • 1-1 Gerrit实用指南
  • docker如何安装redis
  • 省级新质生产力数据(蔡湘杰版本)2012-2022年
  • 【游资悟道】-作手新一悟道心法
  • Diffusion中的Unet (DIMP)
  • 编译以前项目更改在x64下面时报错:函数“PVOID GetCurrentFiber(void)”已有主体
  • 【AIGC】大模型面试高频考点-数据清洗篇
  • 当测试时间与测试资源有限时,你会如何优化测试策略?
  • 基于R语言森林生态系统结构、功能与稳定性分析与可视化
  • 如何使用 Python 实现插件式架构
  • 【北京迅为】iTOP-4412全能版使用手册-第二十章 搭建和测试NFS服务器
  • 【纯原生js】原生实现h5落地页面中的单选组件按钮及功能
  • 深入浅出:开发者如何快速上手Web3生态系统
  • 通过深度点图表示的隐式场实现肺树结构的高效解剖标注文献速递-生成式模型与transformer在医学影像中的应用
  • 数据结构 (17)广义表
  • 论文笔记 SliceGPT: Compress Large Language Models By Deleting Rows And Columns
  • 前端工具的选择和安装
  • Fantasy中定时器得驱动原理
  • 【反转链表】力扣 445. 两数相加 II
  • SpringBoot 项目中使用 spring-boot-starter-amqp 依赖实现 RabbitMQ
  • Uniapp 安装安卓、IOS模拟器并调试
  • JavaScript 中的原型和原型链
  • 数组变换(两倍)
  • GBN协议、SR协议
  • 三维扫描检测仪3d扫描测量尺寸-自动蓝光测量
  • 大模型翻译能力评测
  • MySQL隐式转换造成索引失效
  • SuperMap Objects组件式GIS开发技术浅析
  • 多组数输入a+b:JAVA