当前位置: 首页 > news >正文

Flink消费kafka出现空指针异常

文章目录

      • 出现场景:
      • 表现:
      • 问题:
      • 解决:

tombstone : Kafka中提供了一个墓碑消息(tombstone)的概念,如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息.

出现场景:

双流join时,采用的是left join的方式,众所周知该方式会产生回撤流,下游kafka连接器使用的是upsert-kafka,在产生回撤流时,kafka会删除未join上的消息,填充join后的消息进去。

表现:

在这里插入图片描述

问题:

此时消费该topic的flink程序会出现,空指针异常

DataStream Api会出现,Table Api 未发现

解决:

自定义kafka反序列化器过滤Null值,flink1.14.4
代码:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("").setTopics("test").setGroupId("gid").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new MySimpleStringSchema()).setProperty("auto.offset.commit", "false").build();DataStreamSource<String> kfkDs = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk");kfkDs.print();env.execute();}// 自定义反序列化器static class MySimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String>{@Overridepublic String deserialize(byte[] message) {if (message != null) return new String(message, StandardCharsets.UTF_8);else{return deserialize(new byte[1]); // 返回空 不是Null}}@Overridepublic boolean isEndOfStream(String nextElement) {return false;}@Overridepublic byte[] serialize(String element) {return element.getBytes(StandardCharsets.UTF_8);}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}
http://www.lryc.cn/news/94443.html

相关文章:

  • 【探索 Kubernetes|作业管理篇 系列 9】Pod 的服务对象
  • 多种拖拽= =自用留档
  • 贝叶斯与认知——读《贝叶斯的博弈》有感
  • MySQL安装失败starting the sever
  • 合并文件夹中所有文件,并输出重复的条形码值
  • P3089 [USACO13NOV] Pogo-Cow S 弹簧踩高跷
  • 计算机网络 - 第一章(下)
  • 【Uniapp】小程序携带Token请求接口+无感知登录方案2.0
  • Ubuntu常用命令
  • ERP重构-SLA子分类账-分布式实现方案
  • IP路由协议(RIP、IGRP、OSPF、IS-IS、BGP)
  • 互斥锁、自旋锁、读写锁、悲观锁、乐观锁的应用场景
  • Python WSGI 与 Web 开发框架
  • [洛谷]P6464 [传智杯 #2 决赛] 传送门
  • Http协议和RestTemplate协议有什么区别?
  • 基于SpringBoot+微信小程序的医院预约叫号小程序
  • springboot整合RabbitMQ 消费端处理数据
  • 计算机中CPU、内存、缓存的关系
  • 【Linux实验】构造一个简单的 shell
  • 【电路原理学习笔记】第2章:电压、电流和电阻:2.6 电路
  • 基于深度学习的人脸检测技术
  • 【linux kernel】一文总结linux内核通知链
  • kafka入门,Kafka 副本(十三)
  • 利用PPT制作简单的矢量图
  • 18-Linux 常用命令
  • 2024考研408-计算机组成原理第六章-总线学习笔记
  • uni_app 微信小程序 苹果手机 边框显示不全
  • vue 访问第三方 跨域, 配置vue.config.js
  • 使用gradio库的File模块实现文件上传和展示
  • 网络安全进阶学习第四课——SSRF服务器请求伪造