当前位置: 首页 > news >正文

flinkjar开发 自定义函数

编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。

import org.apache.flink.table.functions.ScalarFunction;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;public class AESUtil extends ScalarFunction {private static String DEFAULT_CIPHER_ALGORITHM = "SHA1PRNG";private static String KEY_ALGORITHM = "AES";private static String key = "AD42F6697B035B75";//必须有这个方法,在这个方法里实现业务逻辑public String eval(String str) {return encrypt(str);}/*** 加密** @param key* @param messBytes* @return*/private static byte[] encrypt(Key key, byte[] messBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.ENCRYPT_MODE, key);return cipher.doFinal(messBytes);}return null;}/*** AES(256)解密** @param key* @param cipherBytes* @return*/private static byte[] decrypt(Key key, byte[] cipherBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.DECRYPT_MODE, key);return cipher.doFinal(cipherBytes);}return null;}/*** 生成加密秘钥** @return* @throws NoSuchAlgorithmException*/private static KeyGenerator getKeyGenerator() {KeyGenerator keygen = null;try {keygen = KeyGenerator.getInstance(KEY_ALGORITHM);SecureRandom secureRandom = SecureRandom.getInstance(DEFAULT_CIPHER_ALGORITHM);secureRandom.setSeed(key.getBytes());keygen.init(128, secureRandom);} catch (NoSuchAlgorithmException e) {}return keygen;}public static String encrypt(String message) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return Base64.getEncoder().encodeToString(encrypt(secretKey, message.getBytes(StandardCharsets.UTF_8)));} catch (Exception e) {}return null;}public static String decrypt(String ciphertext) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return new String(decrypt(secretKey, Base64.getDecoder().decode(ciphertext)), StandardCharsets.UTF_8);} catch (Exception e) {}return null;}

FlinkCDC mysql到mysql 业务代码


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.util.AESUtil;public class FlinkMysqlToMysql {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));env.enableCheckpointing(5000);env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 注册源表和目标表tEnv.executeSql("create table sourceTable(id bigint,test VARCHAR, PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表连接器一定得是mysql-cdc"'connector' = 'mysql-cdc'," +"'hostname' = 'localhost',\n" +" 'port' = '3306',\n" +" 'database-name' = 'testdb',\n" +" 'table-name' = 'flinktest',\n" +" 'username' = 'root',\n" +" 'password' = 'admin'\n" +")");
//这里注册加密函数tEnv.createTemporarySystemFunction("encrypt", new AESUtil());
//sql里面使用自定义函数加密Table result = tEnv.sqlQuery("SELECT id,encrypt(test) FROM sourceTable");tEnv.registerTable("sourceTable", result);//创建skink表tEnv.executeSql("create table targetTable(id bigint,test VARCHAR ,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目标表连接器是jdbc"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +" 'table-name' = 'flinktest2',\n" +" 'username' = 'root',\n" +" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +" 'password' = 'admin'\n" +")");
// 执行CDC过程String query = "INSERT INTO targetTable SELECT * FROM sourceTable";tEnv.executeSql(query).print();}
}

运行结果,加密成功

http://www.lryc.cn/news/295176.html

相关文章:

  • Golang 学习(一)基础知识
  • C++学习:string的了解
  • Webpack源码浅析
  • Hadoop:HDFS学习巩固——基础习题及编程实战
  • SASS 官方文档速通
  • 《动手学深度学习(PyTorch版)》笔记7.4
  • 关于自动驾驶概念的学习和一些理解
  • C++ dfs搜索枚举(四十八)【第八篇】
  • 【优先级队列(大顶堆 小顶堆)】【遍历哈希表键值对】Leetcode 347 前K个高频元素
  • Java设计模式-模板方法模式(14)
  • 【C++ 二维前缀和】约会
  • 基于Springboot的社区疫情防控平台
  • JAVA中的类方法
  • rust嵌入式开发之RTICvsEmbassy
  • Bug地狱 #1 突然宕机,企业级应用到底怎么了
  • 使用 Python、Elasticsearch 和 Kibana 分析波士顿凯尔特人队
  • 探索C语言结构体:编程中的利器与艺术
  • Git介绍与常用命令总结
  • 机器学习 | 探索朴素贝叶斯算法的应用
  • 【无刷电机学习】电流采样电路硬件方案
  • 对于协同过滤算法我自己的一些总结和看法
  • 数据库管理phpmyadmin
  • Oracle数据表ID自增操作
  • npm WARN deprecated uuid@3.4.0: Please upgrade to version 7 or higher
  • 第2节、让电机转起来【51单片机+L298N步进电机系列教程】
  • 1154: 第多少天
  • 【C语言初阶-const作用详解】const修饰变量、const修饰指针(图文详解版)
  • 线程协作工具类【CountDownLatch倒数门闩、Semaphore信号量、CyclicBarrier循环栏栅、Condition接口】
  • Python 函数式编程进阶:map、filter、reduce
  • 大模型|基础_word2vec