当前位置: 首页 > news >正文

flink学习(6)——自定义source和kafka

概述

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

——Rich 字样代表富有,在编程中,富有代表可以调用的方法很多,功能很全的意思。

 基础案例

package com.bigdata.day02;//1、SourceFunction
// public class ZidingyiSource implements SourceFunction<Student> {
//2、RichSourceFunction
// public class ZidingyiSource extends RichSourceFunction<Student> {
//3、ParallelSourceFunction
//public class ZidingyiSource implements ParallelSourceFunction<Student> {
//4、RichParallelSourceFunction
//public class ZidingyiSource extends RichParallelSourceFunction<Student> {
// 推荐的
public class ZidingyiSource extends RichParallelSourceFunction<Student> {// ctrl + oprivate final Random random = new Random();private boolean flag = true;// 现在不用@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("实现一些资源的开启");}// 现在不用@Overridepublic void close() throws Exception {System.out.println("实现一些资源的关闭");}@Overridepublic void run(SourceContext<Student> sourceContext) throws Exception {while (flag){String stu_id = UUID.randomUUID().toString();String stu_name = "Student_"+stu_id;int stu_age = random.nextInt(8)+10;long stu_timestamp = System.currentTimeMillis();Student student = new Student(stu_id,stu_name,stu_age,stu_timestamp);sourceContext.collect(student);Thread.sleep(1000);}}// 具体什么时候 会调用还不知道@Overridepublic void cancel() {flag = false;System.out.println("停止运行");}
}//调用
public class ZiDingYi {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// add + new DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());int parallelism = studentDataStreamSource.getParallelism();System.out.println(parallelism);// print之前与之后的并行度是不同的studentDataStreamSource.print().setParallelism(1);env.execute();}
}

cancel+open+close的调用时机

package com.bigdata.day02;import java.util.Objects;/*
* 1、这几个方法都会按照并行度调用多次 调度的次数 按照studentDataStreamSource的并行度
*
*/public class ZiDingYi {public static void main(String[] args) throws Exception {// 在上面案例的基础上实现StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());// 此时就只会调用一次了studentDataStreamSource.setParallelism(1);// 此时打印也会有多个并行度(8个cpu)studentDataStreamSource.print();// 异步调用 此时会调用open方法JobExecutionResult execute = env.execute();JobClient flink_job = env.executeAsync("Flink Job");Thread.sleep(3000);// 此时会调用 cancel 和 close flink_job.cancel();}
}

 kafkaSource

package com.bigdata.day02;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception{//envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// properties Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");// consumerFlinkKafkaConsumer<String> consumer= new FlinkKafkaConsumer<String>("yhedu",new SimpleStringSchema(),properties);// sourceDataStreamSource<String> dataStreamSource = env.addSource(consumer);dataStreamSource.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String s) throws Exception {return s.contains("success");}}).print();env.execute();}
}

http://www.lryc.cn/news/492786.html

相关文章:

  • 开发常见问题及解决
  • python excel接口自动化测试框架!
  • mybatis:You have an error in your SQL syntax;
  • 使用 Maven 开发 IntelliJ IDEA 插件
  • Windows修复SSL/TLS协议信息泄露漏洞(CVE-2016-2183) --亲测
  • uniapp生命周期:应用生命周期和页面生命周期
  • 基于SSM的婴幼儿用品商城系统+LW示例参考
  • 【工具变量】城市供应链创新试点数据(2007-2023年)
  • 【carla生成车辆时遇到的问题】carla显示的坐标和carlaworld中提取的坐标y值相反
  • Jira使用笔记二 ScriptRunner 验证问题创建角色
  • Java线程的使用
  • 自动化测试工具Ranorex Studio(四十三)-RANOREXPATH编辑器5
  • 超高流量多级缓存架构设计!
  • 数据结构(Java)—— ArrayList
  • 实习冲刺第三十三天
  • Uniapp开发下拉刷新功能onPullDownRefresh/onReachBottom
  • 什么是 C++ 中的函数对象?函数对象与普通函数有什么区别?如何定义和使用函数对象?
  • PointNet++论文复现
  • 【VUE】el-table表格内输入框或者其他控件规则校验实现
  • django开发中html继承模板样式
  • MT6769/MTK6769核心板规格参数_联发科安卓主板开发板方案
  • 鸿蒙进阶篇-状态管理之@Provide与@Consume
  • java集合及源码
  • GraphRAG访问模式和知识图谱建模
  • TCP/IP协议攻击与防范
  • Java基于 SpringBoot+Vue的口腔管理平台(附源码+lw+部署)
  • 11.26深度学习_神经网络-数据处理
  • 【人工智能】Python常用库-TensorFlow常用方法教程
  • 微信小程序按字母顺序渲染城市 功能实现详细讲解
  • 23省赛区块链应用与维护(房屋租凭【下】)