当前位置: 首页 > news >正文

Flink 热存储维表 使用 Guava Cache 减轻访问压力

目录

背景

Guava Cache 简介

实现方案

1. 项目依赖

(1) 定义 Cache

(2) 使用 Cache 优化维表查询

3. 应用运行效果

(1) 维表查询逻辑优化

(2) 减少存储压力

Guava Cache 配置优化

总结


背景

在实时计算场景中,Flink 应用中经常需要通过维表进行维度数据的关联。为了保证关联的实时性,常将维表数据存储在 Redis 或数据库中。然而,这种方案可能会因高频访问导致存储压力过大,甚至出现性能瓶颈。

为了解决这个问题,可以在 Flink 中引入本地缓存。本文介绍如何通过 Google 的开源库 Guava Cache,实现对热存储维表访问的优化。


Guava Cache 简介

Guava Cache 是 Google 开发的一个 Java 缓存工具库,具有以下优点:

  1. 支持本地缓存,提升查询性能。
  2. 提供缓存淘汰策略(如基于时间或容量)。
  3. 线程安全,适合高并发场景。
  4. 提供监听机制,可在缓存失效时触发回调。

实现方案

1. 项目依赖

在 Maven 项目中引入 Guava 依赖:

<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.1-jre</version>
</dependency>

以下是一个典型的实现步骤:

(1) 定义 Cache

使用 Guava 提供的 CacheBuilder 创建一个本地缓存:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;import java.util.concurrent.TimeUnit;public class CacheUtil {private static final Cache<String, String> DIM_CACHE = CacheBuilder.newBuilder().maximumSize(10000) // 最大缓存数量.expireAfterWrite(10, TimeUnit.MINUTES) // 缓存过期时间.build();public static String getFromCache(String key) {return DIM_CACHE.getIfPresent(key);}public static void putToCache(String key, String value) {DIM_CACHE.put(key, value);}
}
(2) 使用 Cache 优化维表查询

在自定义的 RichFlatMapFunction 中使用缓存查询维表数据:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;public class DimensionJoinFunction extends RichFlatMapFunction<String, String> {@Overridepublic void open(Configuration parameters) throws Exception {// 初始化连接到 Redis 或其他外部存储}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String dimKey = extractKey(value);// 1. 先查询缓存String dimValue = CacheUtil.getFromCache(dimKey);// 2. 如果缓存未命中,再查询外部存储if (dimValue == null) {dimValue = queryFromExternalStorage(dimKey);if (dimValue != null) {CacheUtil.putToCache(dimKey, dimValue); // 写入缓存}}// 3. 关联维度数据if (dimValue != null) {String result = enrichData(value, dimValue);out.collect(result);}}private String extractKey(String value) {// 从输入数据中提取维表关联键return value.split(",")[0];}private String queryFromExternalStorage(String key) {// 模拟查询 Redis 或数据库return "mock_value_for_" + key;}private String enrichData(String input, String dimValue) {// 组合维度数据return input + "," + dimValue;}
}

3. 应用运行效果

(1) 维表查询逻辑优化
  • 缓存命中时:直接返回缓存数据,访问延迟为纳秒级。
  • 缓存未命中时:查询外部存储,并将结果写入缓存,后续重复访问相同的 Key 时不再查询外部存储。
(2) 减少存储压力

Guava Cache 本地缓存避免了大量高频查询直接命中外部存储,降低了 Redis、MySQL 等服务的负载。


Guava Cache 配置优化

  1. 缓存淘汰策略

    • expireAfterWrite:基于写入时间自动过期。
    • expireAfterAccess:基于访问时间自动过期。
    • maximumSize:限制最大缓存数量,避免内存占用过高。
  2. 异步加载机制: 如果需要异步加载数据,可以使用 CacheLoader,在缓存未命中时自动加载:

    Cache<String, String> cache = CacheBuilder.newBuilder().maximumSize(10000).build(new CacheLoader<String, String>() {@Overridepublic String load(String key) throws Exception {return queryFromExternalStorage(key);}});
  3. 监控与统计: 使用 Cache.stats() 查看缓存命中率等统计数据,便于优化缓存策略。


总结

通过在 Flink 中引入 Guava Cache,可以显著降低热存储维表的访问压力,提升系统性能。
这种方案适用于维表数据更新频率较低,且查询热点相对集中的场景

http://www.lryc.cn/news/492800.html

相关文章:

  • 深入探索SenseVoiceSmall:高效多语言语音识别与处理模型
  • Flink--API 之Transformation-转换算子的使用解析
  • 每日十题八股-2024年11月27日
  • OpenCV截取指定图片区域
  • Java部分新特性
  • 【SpringBoot】28 API接口防刷(Redis + 拦截器)
  • IT运维专家给年轻人一些职业上的建议
  • Django基础之路由
  • Python实例化中默认值的行为及应用
  • 【WRF后处理】WRF模拟效果评价及可视化:MB、RMSE、IOA、R
  • ShenNiusModularity项目源码学习(4:身份认证)
  • python+django自动化部署日志采用‌WebSocket前端实时展示
  • flink学习(6)——自定义source和kafka
  • 开发常见问题及解决
  • python excel接口自动化测试框架!
  • mybatis:You have an error in your SQL syntax;
  • 使用 Maven 开发 IntelliJ IDEA 插件
  • Windows修复SSL/TLS协议信息泄露漏洞(CVE-2016-2183) --亲测
  • uniapp生命周期:应用生命周期和页面生命周期
  • 基于SSM的婴幼儿用品商城系统+LW示例参考
  • 【工具变量】城市供应链创新试点数据(2007-2023年)
  • 【carla生成车辆时遇到的问题】carla显示的坐标和carlaworld中提取的坐标y值相反
  • Jira使用笔记二 ScriptRunner 验证问题创建角色
  • Java线程的使用
  • 自动化测试工具Ranorex Studio(四十三)-RANOREXPATH编辑器5
  • 超高流量多级缓存架构设计!
  • 数据结构(Java)—— ArrayList
  • 实习冲刺第三十三天
  • Uniapp开发下拉刷新功能onPullDownRefresh/onReachBottom
  • 什么是 C++ 中的函数对象?函数对象与普通函数有什么区别?如何定义和使用函数对象?