当前位置: 首页 > news >正文

Kafka与Flink的整合 -- sink、source

1、首先导入依赖:
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.2</version></dependency>
2、 source:Flink从Kafka中读取数据
public class Demo01KafkaSource {public static void main(String[] args) throws Exception{//构建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//构建kafka source 环境KafkaSource<String> source = KafkaSource.<String>builder()//指定broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定topic.setTopics("bigdata")//指定消费组.setGroupId("my-group")//指定数据的读取的位置,earliest指的是读取最早的数据,latest:指定的读取的是最新的数据.setStartingOffsets(OffsetsInitializer.earliest())//读取数据格式:.setValueOnlyDeserializer(new SimpleStringSchema()).build();//使用kafka数据源DataStreamSource<String> kafkaSourceDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");kafkaSourceDS.print();//启动flinkenv.execute();}
}
        启动生产kafka:
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
3、sink:Flink向Kafka中写入数据
public class Demo02KafkaSink {public static void main(String[] args) throws Exception{//构建flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//读取数据文件:DataStreamSource<String> studentDS = env.readTextFile("flink/data/students.txt");//创建kafka sinkKafkaSink<String> sink = KafkaSink.<String>builder()//指定flink broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定数据的格式:.setRecordSerializer(KafkaRecordSerializationSchema.builder()//指定topic,如果topic不存在就会自动的创建一个分区是1个副本是1个的topic.setTopic("student")//指定数据的格式.setValueSerializationSchema(new SimpleStringSchema()).build())//指定数据处理的语义:.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();//执行flinkstudentDS.sinkTo(sink);//构建flink环境env.execute();}
}
        启动消费kafka:
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic student

http://www.lryc.cn/news/221773.html

相关文章:

  • 小鱼ROS
  • 简单讲讲RISC-V跳转指令基于具体场景的实现
  • 第13章 Java IO流处理(一) File类
  • 测试面试题集锦(四)| Linux 与 Python 编程篇(附答案)
  • pytorch中的矩阵乘法
  • Java--Stream流详解
  • [PHP]ShopXO企业级B2C免费开源商城系统 v2.3.1
  • Python基础入门系列详解20篇
  • P02项目(学习)
  • pandas 笔记:get_dummies分类变量one-hot化
  • PTE作文练习(一)
  • 如何做到一套FPGA工程无缝兼容两款不同的板卡?
  • VSCode修改主题为Eclipse 绿色护眼模式
  • conan和cmake编译器版本不匹配问题解决
  • float单精度浮点数如何在计算机中存储
  • 机器视觉在虚拟现实与增强现实中的作用
  • 红黑数原理及存在原因
  • Ansible入门—安装部署及各个模块应用案例(超详细)
  • Spring Boot 3系列之-启动类详解
  • muduo源码剖析之Timer定时器
  • CocosCreator:背景滚动 、背景循环滚动
  • 中远麒麟堡垒机SQL注入漏洞复现
  • ActiveMq学习⑨__基于zookeeper和LevelDB搭建ActiveMQ集群
  • Ansible概述以及模块
  • Cannot run program “D:\c\IntelliJ IDEA 2021.1.3\jbr\bin\java.exe“
  • 案例-注册页面(css)
  • Ansible--playbook 剧本
  • Vue3.0路由拦截
  • EtherCAT转EtherNET/IP协议网关控制EtherCAT伺服驱动器的方法
  • 钉钉内嵌H5遇到的一些问题