当前位置: 首页 > news >正文

Flink CDC系列之:Oracle CDC 导入 Elasticsearch

Flink CDC系列之:Oracle CDC 导入 Elasticsearch

  • 一、深入理解Flink Oracle CDC Connector
  • 二、创建docker-compose.yml文件
  • 三、启动容器
  • 四、下载Flink Oracle CDC的jar包
  • 五、启动 Flink 集群,再启动 SQL CLI
  • 六、检查 ElasticSearch 中的结果
  • 七、在 Oracle 制造一些变更,观察 ElasticSearch 中的结果

一、深入理解Flink Oracle CDC Connector

  • Flink CDC系列之:Oracle CDC Connector

二、创建docker-compose.yml文件

version: '2.1'
services:oracle:image: yuxialuo/oracle-xe-11g-r2-cdc-demo:v1.0ports:- "1521:1521"elasticsearch:image: elastic/elasticsearch:7.6.0environment:- cluster.name=docker-cluster- bootstrap.memory_lock=true- "ES_JAVA_OPTS=-Xms512m -Xmx512m"- discovery.type=single-nodeports:- "9200:9200"- "9300:9300"ulimits:memlock:soft: -1hard: -1nofile:soft: 65536hard: 65536kibana:image: elastic/kibana:7.6.0ports:- "5601:5601"volumes:- /var/run/docker.sock:/var/run/docker.sock

该 Docker Compose 中包含的容器有:

  • Oracle: Oracle 11g, 已经预先创建了 products 和 orders表,并插入了一些数据
  • Elasticsearch: orders 表将和 products 表进行join,join的结果写入Elasticsearch中
  • Kibana: 可视化 Elasticsearch 中的数据

三、启动容器

在 docker-compose.yml 所在目录下运行如下命令以启动所有容器:

docker-compose up -d

该命令会以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。 你可以通过 docker ps 来观察上述的容器是否正常启动了。 也可以访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。 另外可以通过如下命令停止所有的容器:

docker-compose down

四、下载Flink Oracle CDC的jar包

下载以下 jar 包到 <FLINK_HOME>/lib/:

  • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
  • flink-sql-connector-oracle-cdc-2.4.1.jar

五、启动 Flink 集群,再启动 SQL CLI

-- Flink SQL
-- checkpoint every 3000 milliseconds                       
Flink SQL> SET execution.checkpointing.interval = 3s;Flink SQL> CREATE TABLE products (ID INT,NAME STRING,DESCRIPTION STRING,PRIMARY KEY (ID) NOT ENFORCED) WITH ('connector' = 'oracle-cdc','hostname' = 'localhost','port' = '1521','username' = 'flinkuser','password' = 'flinkpw','database-name' = 'XE','schema-name' = 'flinkuser',  'table-name' = 'products');Flink SQL> CREATE TABLE orders (ORDER_ID INT,ORDER_DATE TIMESTAMP_LTZ(3),CUSTOMER_NAME STRING,PRICE DECIMAL(10, 5),PRODUCT_ID INT,ORDER_STATUS BOOLEAN) WITH ('connector' = 'oracle-cdc','hostname' = 'localhost','port' = '1521','username' = 'flinkuser','password' = 'flinkpw','database-name' = 'XE','schema-name' = 'flinkuser',  'table-name' = 'orders');

创建elasticsearch

Flink SQL> CREATE TABLE enriched_orders (ORDER_ID INT,ORDER_DATE TIMESTAMP_LTZ(3),CUSTOMER_NAME STRING,PRICE DECIMAL(10, 5),PRODUCT_ID INT,ORDER_STATUS BOOLEAN,PRODUCT_NAME STRING,PRODUCT_DESCRIPTION STRING,PRIMARY KEY (ORDER_ID) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://localhost:9200','index' = 'enriched_orders_1'

关联处理后,插入数据

Flink SQL> INSERT INTO enriched_ordersSELECT o.*, p.NAME, p.DESCRIPTIONFROM orders AS oLEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;

六、检查 ElasticSearch 中的结果

检查最终的结果是否写入ElasticSearch中, 可以在Kibana看到ElasticSearch中的数据

七、在 Oracle 制造一些变更,观察 ElasticSearch 中的结果

进入Oracle容器中并通过如下的SQL语句对Oracle数据库进行一些修改, 然后就可以看到每执行一条SQL语句,Elasticsearch中的数据都会实时更新。

docker-compose exec sqlplus flinkuser/flinkpw

插入更新数据

INSERT INTO flinkuser.orders VALUES (10004, to_date('2020-07-30 15:22:00', 'yyyy-mm-dd hh24:mi:ss'), 'Jark', 29.71, 104, 0);UPDATE flinkuser.orders SET ORDER_STATUS = 1 WHERE ORDER_ID = 10004;DELETE FROM flinkuser.orders WHERE ORDER_ID = 10004;
http://www.lryc.cn/news/131049.html

相关文章:

  • Linux忘记root密码解决方法
  • AR/VR眼镜转接器方案,实现同时传输视频快充方案
  • ASP.NET Core中路由规则匹配
  • IDEA:Error running,Command line is too long. 解决方法
  • 什么是反射机制?为什么反射慢?
  • list元素
  • OkHttp 源码浅析一
  • 【解决问题】远程仓库GitHub/GitLab添加了SSH Key之后依然无法clone的解决办法
  • 回归预测 | MATLAB实现SA-SVM模拟退火算法优化支持向量机多输入单输出回归预测(多指标,多图)
  • Spring事务和事务传播机制(1)
  • 如何快速在vscode中实现不同python文件的对比查看
  • 网络安全---Ring3下动态链接库.so函数劫持
  • leetcode283. 移动零
  • GuLi商城-前端基础Vue-生命周期和钩子函数
  • 输入输出+暴力模拟入门:魔法之树、染色の树、矩阵、字母加密、玫瑰鸭
  • ​Kubernetes的演变:从etcd到分布式SQL的过渡
  • 29、简单通过git把项目远程提交到gitee
  • 元宇宙之应用(04)沉浸式游戏
  • 浙大数据结构第八周之08-图7 公路村村通
  • SpringBoot 解决跨域问题
  • 2023 年牛客多校第十场题解
  • 韦东山老师 RTOS 入门课程(一)RTOS 介绍,熟悉裸机的汇编逻辑
  • WebRTC | SDP详解
  • Springboot 实践(9)springboot集成Oauth2.0授权包,5个接口文件配置详解
  • 最新AI系统ChatGPT程序源码/支持GPT4/自定义训练知识库/GPT联网/支持ai绘画(Midjourney)+Dall-E2绘画/支持MJ以图生图
  • 【高频面试题】 消息中间件
  • 物联网智慧安防实训综合实训基地建设方案
  • openGauss学习笔记-44 openGauss 高级数据管理-存储过程
  • 【Linux】进程信号篇Ⅲ:可重入函数、volatile关键字、SIGCHLD信号
  • 排序算法:冒泡排序