当前位置: 首页 > news >正文

Flink 维表关联

1、实时查询维表

实时查询维表是指用户在 Flink 算子中直接访问外部数据库,比如用 MySQL 来进行关联,这种方式是同步方式,数据保证是最新的。但是,当我们的流计算数据过大,会对外 部系统带来巨大的访问压力,一旦出现比如连接失败、线程池满等情况,由于我们是同步调用,所以一般会导致线程阻塞、Task 等待数据返回,影响整体任务的吞吐量。而且这种方案对外部系统的 QPS 要求较高,在大数据实时计算场景下,QPS 远远高于普通的后台系统,峰值高达十万到几十万,整体作业瓶颈转移到外部系统


public class DimSync  extends RichMapFunction<fplOverview, String> {private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);private Connection conn = null;public void open(Configuration parameters) throws Exception {super.open(parameters);conn = DriverManager.getConnection("jdbc:test:3306/mysqldb?characterEncoding=UTF-8", "root", "qyllt1314#");}@Overridepublic String map(fplOverview fplOverview) throws Exception {JSONObject jsonObject = JSONObject.parseObject(fplOverview.toJson());String dp_id = jsonObject.getString("dp_id");//根据 dp_id 查询  上周的 fpl_amount,ywPreparedStatement pst = conn.prepareStatement("select  max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +"from fpl_overview \n" +"where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +"and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +"and dp_id = ? \n" +"group by dp_id");pst.setString(1,dp_id);ResultSet resultSet = pst.executeQuery();String fpl_amount = null;String yw = null ;while (resultSet.next()){fpl_amount = resultSet.getString(1);yw = resultSet.getString(2);}pst.close();jsonObject.put("lastweek_fpl_amount",fpl_amount);jsonObject.put("lastweek_yw",yw)return jsonObject.toString();}public void close() throws Exception {super.close();conn.close();}

2、LRU 缓存 (flink 异步Id)

利用 Flink 的 RichAsyncFunction 读取 mysql 的数据到缓存中,我们在关联维度表时先去查询缓存,如果缓存中不存在这条数据,就利用客户端去查询 mysql,然后插入到缓存中。


public class JDBCAsyncFunction  extends RichAsyncFunction<fplOverview, JsonObject> {private SQLClient client;@Overridepublic void open(Configuration parameters) throws Exception {Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(10).setEventLoopPoolSize(10));JsonObject config = new JsonObject().put("url", "jdbc:mysql://rm-bp161be65d56kbt4nzo.mysql.rds.aliyuncs.com:3306/mysqldb?characterEncoding=UTF-8;useSSL=false").put("driver_class", "com.mysql.cj.jdbc.Driver").put("max_pool_size", 10).put("user", "root").put("password", "");client = JDBCClient.createShared(vertx, config);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(fplOverview fplOverview, ResultFuture<JsonObject> resultFuture) throws Exception {client.getConnection(conn -> {if (conn.failed()) {return;}final SQLConnection connection = conn.result();// 执行sqlconnection.query("select  max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +"from fpl_overview \n" +"where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +"and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +"and dp_id = '" + fplOverview.getDp_id() + " ' " +"group by dp_id ", res2 -> {ResultSet rs = new ResultSet();if (res2.succeeded()) {rs = res2.result();}else{System.out.println("查询数据库出错");}List<JsonObject> stores = new ArrayList<>();for (JsonObject json : rs.getRows()) {stores.add(json);}connection.close();resultFuture.complete(stores);});});}}

3、预加载全量mysql数据

预加载全量mysql数据 使用 ScheduledExecutorService 每隔 5 分钟拉取一次维表数据,这种方式适用于那些实时场景不是很高,维表数据较小的场景

public class WholeLoad extends RichMapFunction<fplOverview,String> {private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);// 定义map的结果,key为关联字段private static Map<String,String> cache ;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);cache = new HashMap<>();ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);executor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {load();} catch (Exception e) {e.printStackTrace();}}},0,5, TimeUnit.MINUTES); //从现在开始每隔5分钟查询数据}@Overridepublic String map(fplOverview fplOverview) throws Exception {JSONObject jsonObject = JSONObject.parseObject(fplOverview.toJson());String dp_id = jsonObject.getString("dp_id");// 获取对应id的结果String rs = cache.get(dp_id);JSONObject rsObject = JSONObject.parseObject(rs);jsonObject.putAll(rsObject);return jsonObject.toString();}public   void  load() throws Exception {Class.forName("com.mysql.jdbc.Driver");Connection con = DriverManager.getConnection("jdbc:mysql://test:3306/mysqldb?characterEncoding=UTF-8", "root", "qyllt1314#");// 执行查询的SQLPreparedStatement statement = con.prepareStatement("select  dp_id,max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +"from fpl_overview \n" +"where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +"and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +"group by dp_id");ResultSet rs = statement.executeQuery();while (rs.next()) {// 查询结果放入缓存String dp_id = rs.getString("dp_id");String fpl_amount = rs.getString("fpl_amount");String yw = rs.getString("yw");JSONObject jsonObject = JSONObject.parseObject("{}");jsonObject.put("lastweek_fpl_amount",fpl_amount);jsonObject.put("lastweek_yw",yw);cache.put(dp_id,jsonObject.toString());}System.out.println("数据输出测试:"+cache.toString());con.close();}
}
http://www.lryc.cn/news/206359.html

相关文章:

  • 阳光蟹场小程序的盈利模式与思考深度
  • 2-Java进阶知识总结-7-UDP-TCP
  • C++数据结构X篇_19_排序基本概念及冒泡排序(重点是核心代码,冒泡是稳定的排序)
  • 工作:三菱伺服驱动器连接参数及其电机钢性参数配置与调整
  • 企事业单位/公司电脑文件透明加密保护 | 防泄密软件\系统!
  • [Leetcode] 0101. 对称二叉树
  • .NET、VUE利用RSA加密完成登录并且发放JWT令牌设置权限访问
  • go实现文件的读写
  • 基于 nodejs+vue购物网站设计系统mysql
  • Mysql数据库 4.SQL语言 DQL数据操纵语言 查询
  • threejs(3)-详解材质与纹理
  • 10月最新H5自适应樱花导航网站源码SEO增强版
  • 探索SOCKS5与SK5代理在现代网络环境中的应用
  • 有六家机器视觉公司今年11月份初放假到明年春节后,除夕不放假看住企业不跑路,不倒闭,明年大家日子会越来越甜
  • 【Linux】MAC帧协议 + ARP协议
  • 深入理解指针:【探索指针的高级概念和应用一】
  • Leetcode周赛365补题(3 / 3)
  • Python基础入门例程13-NP13 格式化输出(三)
  • Vue快速入门
  • MySQL - 如何判断一行扫描数?
  • 3682: 【C3】【递推】台阶问题
  • C++(Qt)软件调试---线程死锁调试(15)
  • HugeGraph Hubble 配置 https 协议的操作步骤
  • 大型应用的架构演进--spring家族在其中的作用
  • LinkedHashMap 简单实现LRU
  • mysql字符串函数
  • 【强烈推荐】视频转gif、图片拼gif,嘎嘎好用,免费免费真的免费,亲测有效,无效过来打我
  • C# Onnx Yolov8 Detect 印章 指纹捺印 检测
  • 0034【Edabit ★☆☆☆☆☆】【修改Bug4】Buggy Code (Part 4)
  • 第十五篇-推荐-Huggingface-镜像-2023-10