当前位置: 首页 > news >正文

flink写入到kafka 大坑解析。

1.kafka能不能发送null消息?

   能!

2 flink能不能发送null消息到kafka?

不能!

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"9.135.68.201:9092" );FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>( "cc_test",new SimpleStringSchema(),properties);env.fromCollection(Lists.newArrayList("111", "222", "333")).map(s->{return s.equals("222")?null:s;}).addSink(flinkKafkaProducer);env.execute("ContractLabelJob");
}

 

 

这里就报了java的最常见错误 空指针,原因就是flink要把kafka的消息getbytes。所以flink不能发送null到kafka。

这种问题会造成什么后果?

flink直接挂掉。

如果我们采取了失败重试机制会怎样?

env.setRestartStrategy(  RestartStrategies.fixedDelayRestart(3, Time.seconds(5))  );

数据重复或者丢失。

还有此时kafka的offset由flink在管理, 消费的offset 一直没有被commit,所以一直重复消费。

来个demo

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(6000);
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);Properties properties = new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"9.135.68.201:9092" );FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>( "cc_test2",new SimpleStringSchema(),properties);KafkaSource<String> source = KafkaSource.<String>builder().setTopics("cc_test").setGroupId("cc_test1234").setBootstrapServers("9.135.68.201:9092").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.earliest()).build();DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka");
//        stringDataStreamSource.print("kafka msg");stringDataStreamSource.addSink(sink);env.execute("test");

 从topic cc_test消费 然后写到cc_test2里面去

cc_test里的数据

 cc_test_2里写入的数据

 可以看到一个null 报错了,然后它分区的333就会一直被提交。

总之大家小心这个问题。

不加检查点 flink报错后就会直接停掉。。

加了检查点env.enableCheckpointing(6000); flink失败后会一直重试

加了重试机制 env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5000, TimeUnit.SECONDS),Time.of(5000,TimeUnit.SECONDS))); 失败的任务只会重试几次。

还是得熟悉源码呀。

http://www.lryc.cn/news/99635.html

相关文章:

  • MATLAB算法实战应用案例精讲-【深度学习】预训练模型-Subword
  • 【HarmonyOS】实现从视频提取音频并保存到pcm文件功能(API6 Java)
  • Linux:shell命令运行原理和权限的概念
  • Javascript -- 数组prototype方法探究
  • android stduio 打开工程后直接报Connection refused解决
  • 搜索与图论(一)
  • 百题千解计划【CSDN每日一练】“小明投篮,罚球线投球可得一分”(附解析+多种实现方法:Python、Java、C、C++、C#、Go、JavaScript)
  • lemon框架开发笔记
  • Spark SQL快速入门
  • linux+Jenkins+飞书机器人发送通知(带签名)
  • react hooks
  • 一起学数据结构(1)——复杂度
  • <el-date-picker>组件选择开始时间,结束时间自动延长30min
  • eslint-webpack-plugin
  • logback中文一直是乱码,logback中文问号
  • C++之文件操作
  • CentOS 7.6安装 MongoDB 5.0.2
  • Windows下安装python3教程
  • opencv-27 阈值处理 cv2.threshold()
  • AAOS 音频焦点请求
  • 订单系统中的幂等实现
  • 三个常用查询:根据用户名 / token查询用户信息+链表分页条件查询
  • 列表、张量、向量和矩阵的关系
  • 华为数通HCIP-ISIS高级
  • CorelDraw怎么做立体字效果?CorelDraw制作漂亮的3d立体字教程
  • 大致了解Redis
  • javaweb会话技术
  • android app控制ros机器人三(android登录界面)
  • Android版本的发展4-13
  • 【2023.7.29】浅谈手办——新人入坑指南