当前位置: 首页 > news >正文

利用java8 的 CompletableFuture 优化 Flink 程序,性能提升 50%

你好,我是 shengjk1,多年大厂经验,努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注!你会有如下收益:

  1. 了解大厂经验
  2. 拥有和大厂相匹配的技术等

希望看什么,评论或者私信告诉我!

文章目录

  • 一、前言
  • 二、Flink 代码优化
    • 2.0 问题发现
    • 2.1 原有代码
    • 2.2 CompletableFuture 优化
  • 三、avatorscript 使用的简单介绍
    • 3.1 自定义函数
    • 3.2 从 Map 中取值
    • 3.3 使用 Java 的工具类
    • 3.4 AviatorScript 函数
  • 四、总结


一、前言

目前 Flink 利用 avatorscript 脚本语言,来做到规则的自动化更新。avatorscript将表达式直接翻译成对应的 java 字节码执行,所以在大数据量的情况下,自然而然这里就成为了瓶颈

二、Flink 代码优化

2.0 问题发现

图片.png
通过 Flink UI 发现 window 算子是瓶颈,而 window 算子的核心就是 avatorscript 表达式

2.1 原有代码

xxx
AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
xxx

经过测试平均执行时间在1毫秒以内,但经不住数据量大,所以Flink QPS一直在 11w 左右

2.2 CompletableFuture 优化

xxx
List<CompletableFuture> executeFutures=new ArrayList<>();CompletableFuture<Object> executeFuture = CompletableFuture.supplyAsync(() -> {return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);});
executeFutures.add(executeFuture);for (int i = 0; i < executeFutures.size(); i++) {executeFutures.get(i).get()xxxx
}

修改完上线后,Flink QPS 有原来 11W 增加到 17W 左右

三、avatorscript 使用的简单介绍

为了让你更容易理解 avatorscript,这里我们也可以先简单的介绍一下:

3.1 自定义函数

class AddFunction extends AbstractFunction {@Overridepublic AviatorObject call(Map<String, Object> env,AviatorObject arg1, AviatorObject arg2) {Number left = FunctionUtils.getNumberValue(arg1, env);Number right = FunctionUtils.getNumberValue(arg2, env);return new AviatorDouble(left.intValue() + right.intValue());}public String getName() {return "add" ;}
}public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {//注册函数AviatorEvaluator.addFunction(new AddFunction());System.out.println(AviatorEvaluator.execute( "add(2,1)" ));
}

3.2 从 Map 中取值

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {//注册函数AviatorEvaluator.addFunction(new AddFunction());HashMap<String, Object> stringObjectHashMap = new HashMap<>();stringObjectHashMap.put( "testId1" , 1);stringObjectHashMap.put( "testId2" , 2);Object execute = AviatorEvaluator.execute( "add(testId1,testId2)" , stringObjectHashMap);

3.3 使用 Java 的工具类

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {HashMap<String, Object> stringObjectHashMap = new HashMap<>();stringObjectHashMap.put( "ip" , "a1111" );// stringObjectHashMap.put("result", "a&B&C&d");stringObjectHashMap.put( "voucher_endtime" , "2022.03.02 11:32" );stringObjectHashMap.put( "imei2" , "v1aaaaaa1" );stringObjectHashMap.put( "testId" , "v1ot_service_quality_1111" );stringObjectHashMap.put( "testId1" , "sku" );stringObjectHashMap.put( "a" , "123" );stringObjectHashMap.put( "a1" , "null" );stringObjectHashMap.put( "b1" , 123);AviatorEvaluator.addStaticFunctions( "doubleStatic" , Double.class);AviatorEvaluator.addInstanceFunctions( "doubleInstance" , Double.class)execute2 = AviatorEvaluator.execute( "(doubleStatic.valueOf(sys_net_bandwidth))" , stringObjectHashMap);System.out.println(execute2);execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) " , stringObjectHashMap);System.out.println( "###" + execute2);execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(str(voucher)))" , stringObjectHashMap);

3.4 AviatorScript 函数

## examples/function.av
fn add(x, y) {return x + y;
}
p(add(1,2))
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {String function = "## examples/function.av\n" +"\n" +"fn add(x, y) {\n" +"  return x + y;\n" +"}" ;AviatorEvaluator.defineFunction( "add" , function);System.out.println( "defineFunction6666================+" + AviatorEvaluator.execute( "add(1,2)" , stringObjectHashMap));
}

四、总结

本文主要介绍了 Flink 中使用 avatorscript 脚本语言的问题,以及如何通过 CompletableFuture 优化代码来提高 Flink QPS。同时,还介绍了 avatorscript 的使用方法,包括自定义函数、从 Map 中取值、使用 Java 工具类和 AviatorScript 函数。通过本文的介绍,读者可以更好地了解 Flink 中 avatorscript 的使用方法,以及如何优化代码来提高 Flink QPS。

http://www.lryc.cn/news/355010.html

相关文章:

  • 香橙派 AIpro综合体验及AI样例运行
  • 通过域名接口申请免费的ssl多域名证书
  • 【JAVA WEB实用与优化技巧】如何自己封装一个自定义UI的Swagger组件,包含Swagger如何处理JWT无状态鉴权自动TOKEN获取
  • 理解大语言模型(二)——从零开始实现GPT-2
  • SSH远程登录时常见问题解决
  • 工业级3D开发引擎HOOPS:创新与效率的融合!
  • IDEA创建Spring Boot项目
  • mysql实战——xtrabackup全量备份/增量备份及恢复
  • 探索演进:了解IPv4和IPv6之间的区别
  • Python 实现Word (DOC或DOCX)与TXT文本格式互转
  • anaconda install on CentOS 7
  • git管理Codeup云效平台
  • Pycharm最新安装教程(最新更新时间2024年5月27日)
  • 医院门诊互联电子病历|基于SSM+vue的医院门诊互联电子病历管理信息系统的设计与实现(源码+数据库+文档)
  • H3CNE-8-ARP工作原理
  • 上交提出TrustGAIN,提出6G网络中可信AIGC新模式!
  • 内存泄漏案例分享2-Fragment的内存泄漏
  • Selenium的百度高级搜索-自动化(未完成)
  • cs与msf权限传递,以及mimikatz抓取win2012明文密码
  • java欢迪迈手机商城设计与实现源码(springboot+vue+mysql)
  • 【FPGA】Verilog:2-bit 二进制比较器的实现(2-bit binary comparator)
  • RPA(机器人流程自动化)技术解读
  • Qt | QTabBar 类(选项卡栏)
  • 基于Pytorch框架的深度学习ShufflenetV2神经网络十七种猴子动物识别分类系统源码
  • Leetcode260
  • Webpack性能调优:从加载器到插件的全面优化
  • cin-getline缓存区
  • 牛客前端面试高频八股总结(1)(附文档)
  • 韦专家:广告投放方式和内容运营底层方法论逻辑上有什么关系?
  • 003 ++ --