当前位置: 首页 > news >正文

Flink 使用示例(Demo)

1、添加不同数据源

package com.baidu.keyue.deepsight.memory.test;import com.baidu.keyue.deepsight.memory.WordCount;
import com.baidu.keyue.deepsight.memory.WordCountData;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class EnvDemo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.set(RestOptions.BIND_PORT, "8082");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 设置流处理还是批处理env.setRuntimeMode(RuntimeExecutionMode.STREAMING);env.setParallelism(1);// 从不同数据源读取数据
//        DataStream<String> text = env.fromElements(WordCountData.WORDS);
//        DataStream<String> text = env.fromCollection(Arrays.asList("hello world", "hello flink"));
//        DataStream<String> text = env.fromSource(crateFileSource(), WatermarkStrategy.noWatermarks(), "file source");
//        DataStream<String> text = env.fromSource(crateKafkaSource(), WatermarkStrategy.noWatermarks(), "kafka source");DataStream<String> text = env.fromSource(createDataGeneratorSource(), WatermarkStrategy.noWatermarks(), "datagen source");// 处理逻辑DataStream<Tuple2<String, Integer>> counts =text.flatMap(new WordCount.Tokenizer()).keyBy(0).sum(1);counts.print();env.execute("WordCount");}public static FileSource crateFileSource() {FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path("input/word.txt")).build();return fileSource;}public static KafkaSource<String> crateKafkaSource() {KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("ip-port").setTopics("topic").setGroupId("groupId").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();return kafkaSource;}public static DataGeneratorSource<String> createDataGeneratorSource() {DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>((GeneratorFunction<Long, String>) value -> "hello" + value,100, // 一共100条数据RateLimiterStrategy.perSecond(5), // 每秒5条Types.STRING);return dataGeneratorSource;}
}

2、数据处理

package com.baidu.keyue.deepsight.memory.test;import com.baidu.keyue.deepsight.memory.WordCount;
import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class DataProcessDemo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.set(RestOptions.BIND_PORT, "8082");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 设置流处理还是批处理env.setRuntimeMode(RuntimeExecutionMode.STREAMING);env.setParallelism(1);// 读取数据DataStream<WaterSensor> sensorDs = env.fromElements(new WaterSensor("s1", 1L, 1),new WaterSensor("s1", 100L, 100),new WaterSensor("s1", 1000L, 1000),new WaterSensor("s3", 3L, 3));// map算子 : 对数据流中的每个元素执行转换操作,一进一出SingleOutputStreamOperator<String> map = sensorDs.map(new MapFunction<WaterSensor, String>() {@Overridepublic String map(WaterSensor waterSensor) throws Exception {return waterSensor.getId() + " : " + waterSensor.getVc();}});
//        map.print();// filter算子 : 对数据流中的每个元素执行过滤操作SingleOutputStreamOperator<WaterSensor> filter = sensorDs.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor waterSensor) throws Exception {return waterSensor.getVc() > 1; // 只保留vc>1的元素}});
//        filter.print();// flatMap算子 : 扁平映射,一个可以有多个输出,在collector里面,然后将其平铺返回SingleOutputStreamOperator<String> flatMap = sensorDs.flatMap(new FlatMapFunction<WaterSensor, String>() {@Overridepublic void flatMap(WaterSensor waterSensor, Collector<String> collector) throws Exception {// collector里面是输出的数据if ("s1".equals(waterSensor.getId())) {collector.collect(waterSensor.getId());} else {collector.collect(waterSensor.getId());collector.collect(waterSensor.getVc().toString());}}});
//        flatMap.print();// keyBy 相同key的数据分到同一个分区,用于海量数据聚合操作来提升效率,不对数据进行转换,只是分区KeyedStream<WaterSensor, String> keyBy = sensorDs.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor waterSensor) throws Exception {return waterSensor.getId(); // 按照id分组}});
//        keyBy.print();// 在keyBy后可以使用聚合算子,求sum max min等
//        keyBy.sum("vc").print(); // 传实体类的属性名
//        keyBy.maxBy("vc").print(); // 传实体类的属性名// reduce算子 : 两两聚合,keyBy后才能操作keyBy.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor t2, WaterSensor t1) throws Exception {System.out.println("t1=" + t1);System.out.println("t2=" + t2);return new WaterSensor(t1.getId(), t1.getTs() + t2.getTs(), t1.getVc() + t2.getVc());}}).print();env.execute("WordCount");}
}

3、分流/合流

package com.baidu.keyue.deepsight.memory.test;import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FenliuDemo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.set(RestOptions.BIND_PORT, "8082");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 设置流处理还是批处理env.setRuntimeMode(RuntimeExecutionMode.STREAMING);env.setParallelism(1);// 读取数据DataStream<WaterSensor> sensorDs = env.fromElements(new WaterSensor("s1", 1L, 1),new WaterSensor("s1", 100L, 100),new WaterSensor("s1", 1000L, 1000),new WaterSensor("s3", 3L, 3));SingleOutputStreamOperator<WaterSensor> oushu = sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 == 0);SingleOutputStreamOperator<WaterSensor> jishu = sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 == 1);oushu.print("偶数流");jishu.print("奇数流");// 偶数流和奇数流合并oushu.union(jishu).print("合并流");env.execute("WordCount");}
}

4、输出流 sink

package com.baidu.keyue.deepsight.memory.test;import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SinkDemo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.set(RestOptions.BIND_PORT, "8082");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 设置流处理还是批处理env.setRuntimeMode(RuntimeExecutionMode.STREAMING);env.setParallelism(1);// 读取数据DataStream<WaterSensor> sensorDs = env.fromElements(new WaterSensor("s1", 1L, 1),new WaterSensor("s1", 100L, 100),new WaterSensor("s1", 1000L, 1000),new WaterSensor("s3", 3L, 3));FileSink<WaterSensor> fileSink = FileSink.<WaterSensor>forRowFormat(new Path("/Users/chengyong03/Downloads/output/flink"),new SimpleStringEncoder<>("UTF-8")).build();sensorDs.sinkTo(fileSink);env.execute("WordCount");}
}

5、Flink 流表互转与 Flink SQL

package com.baidu.keyue.deepsight.memory.test;import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class SqlDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1. 创建表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<WaterSensor> sensorDs = env.fromElements(new WaterSensor("s1", 1L, 1),new WaterSensor("s1", 100L, 100),new WaterSensor("s1", 1000L, 1000),new WaterSensor("s3", 3L, 3));sensorDs.print();// 2.流转表Table sensorTable = tableEnv.fromDataStream(sensorDs);// 3.注册临时表tableEnv.createTemporaryView("sensorTable", sensorTable);Table resultTable = tableEnv.sqlQuery("select * from sensorTable where vc > 10");// 4. table转流DataStream<WaterSensor> waterSensorDataStream = tableEnv.toDataStream(resultTable, WaterSensor.class);waterSensorDataStream.print();env.execute();}
}
http://www.lryc.cn/news/542232.html

相关文章:

  • OpenCV(8):图像直方图
  • 力扣LeetCode:1656 设计有序流
  • NGINX配置TCP负载均衡
  • vm和centos
  • c#丰田PLC ToyoPuc TCP协议快速读写 to c# Toyota PLC ToyoPuc读写
  • 量子计算的数学基础:复数、矩阵和线性代数
  • 【JavaScript】《JavaScript高级程序设计 (第4版) 》笔记-Chapter22-处理 XML
  • 一个不错的API测试框架——Karate
  • 文字语音相互转换
  • DeepSeek-R1:通过强化学习激发大语言模型的推理能力
  • MATLAB中fft函数用法
  • 【SpringBoot】【JWT】使用JWT的claims()方法存入Integer类型数据自动转为Double类型
  • Crack SmartGit
  • 【备赛】在keil5里面创建新文件的方法+添加lcd驱动
  • Rk3568驱动开发_驱动实现流程以及本质_3
  • 【学习笔记】LLM+RL
  • 深入理解IP子网掩码子网划分{作用} 以及 不同网段之间的ping的原理 以及子网掩码的区域划分
  • rust 前端npm依赖工具rsup升级日志
  • 2.2 STM32F103C8T6最小系统板的四种有关固件的开发方式
  • 【C++】 stack和queue以及模拟实现
  • python与C系列语言的差异总结(2)
  • Linux之文件系统
  • LeetCode刷题 -- 23. 合并 K 个升序链表
  • DeepSeek在MATLAB上的部署与应用
  • mapbox基础,使用geojson加载fill-extrusion三维填充图层
  • 基于 SpringBoot 的 “电影交流平台小程序” 系统的设计与实现
  • 单片机裸机编程-时机管理
  • Flutter系列教程之(2)——Dart语言快速入门
  • pyecharts介绍
  • 前缀和相关题目记录(未完待续...)