当前位置: 首页 > news >正文

flink中使用外部定时器实现定时刷新

背景:

我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢?

外部定时器定时加载实现

1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,RichSink等,代码如下所示

package wikiedits.schedule;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class ScheduleRichMapFunction extends RichFlatMapFunction<String, String> {// 定时任务执行器private transient ScheduledExecutorService scheduledExecutorService;// 本地变量private int threshold;@Overridepublic void open(Configuration parameters) throws Exception {// 1.从db查询数据初始化本地变量
//        threshold = DBManager.SELECTSQL.getConfig("threshold");// 2.使用定时任务更新本地内存的配置信息以及更新本地变量threshold的值scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();
//            for(ConfigEntity entity : configList){ConfigEntityLocalCache.getInstance().update("key", "value");
//            }// 2.2 更新本地变量threshold的值
//            threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {}@Overridepublic void close() throws Exception {ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService);}}//本地缓存实现
package wikiedits.schedule;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 保存Config信息的本地缓存 ---定时同步DB配置表的数据*/
public class ConfigEntityLocalCache {private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache();/*** 获取本地缓存实例*/public static ConfigEntityLocalCache getInstance() {return instance;}/** 缓存内存配置项 */private static Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();/*** 更新本地缓存数据*/public boolean update(String key, String value){configCache.put(key, value);return true;}/*** 更新本地缓存数据*/public  String getByKey(String key){return configCache.getIfPresent(key);}}

2.在静态类中通过static语句块创建定时器并定时加载,代码如下

package wikiedits.schedule;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 静态类定时加载DB配置表到本地内存中*/
public class StaticLoadUtil {// 定时任务执行器private static transient ScheduledExecutorService scheduledExecutorService;public static final Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();// 通过定时执行器定时同步本地缓存和DB配置表static {scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();// for(ConfigEntity entity : configList){configCache.put("key", "value");// }// 2.2 更新本地变量threshold的值// threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}/*** 获取本地缓存*/public static Cache<String, String> getConfigCache() {return configCache;}}

总结:

1.外部定时器可以通过在富函数的open中进行初始化并开始定时执行

2.外部定时器也可以通过创建一个单独的静态类,然后在static模块中进行初始化并开始定时执行

http://www.lryc.cn/news/187405.html

相关文章:

  • Spring Cloud Pipelines 入门实践
  • G1 GC详解及设置
  • GitHub详细教程
  • 【小沐学Python】Python实现Web图表功能(Dash)
  • 【RabbitMQ】docker rabbitmq集群 docker搭建rabbitmq集群
  • Linux 网络驱动实验
  • 访问Apache Tomcat的虚拟主机管理页面
  • 【算法】排序——归并排序和计数排序
  • discuz封面设置失败的解决办法(centos系统+windows系统)
  • AI绘画-Stable Diffusion笔记
  • 中值滤波算法及例程
  • SpringBoot 如何使用 Ehcache 作为缓存
  • Stable Diffusion 图片换脸插件Roop保姆教程 附错误解决办法和API使用
  • 华为OD机试 - 组成最大数(Java 2023 B卷 100分)
  • 十一、2023.10.5.计算机网络(end).11
  • 基于SpringBoot的网上摄影工作室
  • Spring源码解析——IOC之bean 的初始化
  • 互联网摸鱼日报(2023-10-07)
  • 深入理解RBAC
  • uniapp微信小程序蓝牙连接与设备数据对接
  • HBase 计划外启动 Major Compaction 的原因
  • 设计模式-桥接模式
  • arcgis地形分析全流程
  • mapper.xml中的sql标签
  • 重启redis的步骤
  • 第二证券:如何选股票的龙头股?
  • 【华为OD机考B卷 | 100分】统计监控、需要打开多少监控器(JAVA题解——也许是全网最详)
  • Python Django 详解(基础)
  • C语言内存函数
  • 【Docker】Docker-compose及Consul多容器编排工具