当前位置: 首页 > news >正文

Flink CDC -Sqlserver to Sqlserver java 模版编写

1.基本环境

     <flink.version>1.17.0</flink.version>

2. 类文件

package com.flink.tablesql;import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.io.File;
import java.util.List;public class Main2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);String path = "E:\\test\\flinktestsql\\orders.sql";
//        String path = "/data/flink/flinksql/orders.sql";List<String> list = FileUtils.readLines(new File(path),"UTF-8");StringBuilder stringBuilder = new StringBuilder("");String sql = "";for(String var : list){if(StringUtils.isNotBlank(var)){stringBuilder.append(var);if(var.contains("$")){sql = stringBuilder.toString().replace("$","");System.out.println(sql);System.out.println("end-----");tabEnv.executeSql(sql);stringBuilder = new StringBuilder("");}else{stringBuilder.append("\n");}}}}
}

3.pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinktestsql</artifactId><version>1.0-SNAPSHOT</version><!--    &lt;!&ndash; 指定仓库位置,依次为aliyun、apache和cloudera仓库 &ndash;&gt;--><repositories><repository><id>aliyun</id><url>https://maven.aliyun.com/repository/public</url></repository><repository><id>apache</id><url>https://maven.aliyun.com/repository/apache-snapshots</url></repository><repository><id>cloudera</id><url>https://maven.aliyun.com/repository/gradle-plugin</url></repository><repository><id>central</id><url>https://maven.aliyun.com/repository/central</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><!--版本--><properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.17.0</flink.version><slf4j.version>2.0.5</slf4j.version><scala.binary.version>2.11</scala.binary.version><scala.version>2.11.12</scala.version></properties><dependencies><!-- Flink相关依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.microsoft.sqlserver</groupId><artifactId>mssql-jdbc</artifactId><version>11.2.1.jre8</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.11.0</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-sqlserver-cdc</artifactId><version>2.4.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-test-utils</artifactId><version>1.17.1</version><scope>test</scope></dependency><!-- 日志管理相关依赖 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.19.0</version></dependency></dependencies></project>

4.后续只需要修改增加 .sql文件即可

       String path = "E:\\test\\flinktestsql\\orders.sql";

        // String path = "/data/flink/flinksql/orders.sql";

5.以上在本地运行未通过报错,在flink web 界面配置运行可以。我看网上可能是需要修改本地jdk配置,没有修改。

Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。错误:“sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target”。 ClientConnectionId:d5fb789f-e4e9-416f-b47c-57dc09429ebfat com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:3806)at com.microsoft.sqlserver.jdbc.TDSChannel.enableSSL(IOBuffer.java:1906)

6.flink web 端配置如下:只上传jar不行,还需要配置配置类,才可以

7.以上遇到很多问题,总算解决了,

如增加:

<mirror>
<id>aliyunmaven</id>
<mirrorOf>*</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>

http://www.lryc.cn/news/245811.html

相关文章:

  • 4.前端--HTML标签-表格列表表单【2023.11.25】
  • MySQL的Redo Log跟Binlog
  • 定制手机套餐---python序列
  • 线性分类器--数据处理
  • 一些可能被忽视的 Vue3 API 附带案例
  • Linux git
  • 136. 只出现一次的数字
  • redis的性能管理及集群架构(主从复制、哨兵模式)
  • 【自然语言处理】正向最大匹配算法(FMM),反向最大匹配算法(BMM)和双向最大匹配算法(BM)原理及实现
  • 数据结构 | 堆排序
  • 编程语言发展史:Go语言的设计和特点
  • FinGPT:金融垂类大模型架构
  • 24. 深度学习进阶 - 矩阵运算的维度和激活函数
  • 杰发科技AC7801——keil工程移植到IAR
  • Word怎么看字数?简单教程分享!
  • 万字解析设计模式之观察者模式、中介者模式、访问者模式
  • 【MySQL | TCP】宝塔面板结合内网穿透实现公网远程访问
  • Python break用法详解
  • 【C++初阶】STL详解(五)List的介绍与使用
  • MySQL特点和基本语句
  • Gin 学习笔记03-参数绑定
  • 【100天精通Python】Day73:python机器学习入门算法详解与代码示例
  • Node.js入门指南(四)
  • Java LeetCode篇-深入了解关于数组的经典解法
  • LeeCode前端算法基础100题(4)- 无重复字符的最长子串
  • Axios简单使用与配置安装-Vue
  • 【初始前后端交互+原生Ajax+Fetch+axios+同源策略+解决跨域】
  • C语言--每日选择题--Day24
  • 记一次简单的PHP反序列化字符串溢出
  • 找工作面试技巧