当前位置: 首页 > news >正文

使用TCP方式拉取Canal数据

1 Canal对接Kafka联调

1.1 配置修改

canal.properties

修改 zk:

canal.zkServers = 10.51.50.219:2181

instance.properties

开启配置项:

canal.mq.dynamicTopic 是 Canal 的 MQ 动态 Topic 配置项:

  • test_javaedge_01 是kafka 的 topic
  • test_db.users 要监控的数据库、表
  • test_db.users 表发生变化时,Canal 将会把变化的数据推送到名为 test_javaedge_01:test_db.users 的 MQ Topic 中。
canal.mq.dynamicTopic=test_javaedge_01:test_db\\.users

开启一个消费者

[root@javaedge-kafka-dev bin]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_javaedge_01

datagrip 新增数据:

消费到该数据:

2 使用TCP方式拉取Canal数据

现在 serverMode 改回tcp。重启

javaedge@JavaEdgedeMac-mini deployer % jps
71002 CanalLauncher
javaedge@JavaEdgedeMac-mini deployer %

canal 同步程序

package com.javaedge.canal;import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.base.CaseFormat;import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;public class CanalClientApp {public static void main(String[] args) throws Exception {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111),"example",null, null);while (true) {connector.connect();connector.subscribe("test_db.users");Message message = connector.get(100);List<CanalEntry.Entry> entries = message.getEntries();if (entries.size()>0) {for (CanalEntry.Entry entry : entries) {String tableName = entry.getHeader().getTableName();CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == CanalEntry.EventType.INSERT) {for (CanalEntry.RowData rowData : rowDatasList) {List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();HashMap<Object, Object> map = new HashMap<>();for (CanalEntry.Column column : afterColumnsList) {String key = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());map.put(key, column.getValue());}System.out.println("tableName=" + tableName + "  map=" + JSON.toJSONString(map));}}}}}}
}

运行程序。操作 user 数据表,新增一行数据:

程序输出:

显然,后续不管你想把数据同步到哪儿去,都完全自由!

数据链路

MySQL -》canal server(tcp)-》canal client-》kafka。

http://www.lryc.cn/news/163739.html

相关文章:

  • Docker安装mysql实战说明
  • 前端DOM操作精解:基础概念、方法与最佳实践
  • python sorted函数详解2023.9.11
  • Spring Reactive:响应式编程与WebFlux的深度探索
  • Qt应用开发(基础篇)——工具按钮类 QToolButton
  • 【数据结构面试题】栈与队列的相互实现
  • 华为认证和红帽认证哪个比较好考呢
  • [Java]_[中级]_[使用okhttp3和HttpClient代理访问外部网络]
  • ubuntu 20.04 docker 安装 mysql
  • C++在C语言基础上的优化
  • 分享一个python实验室设备预约管理系统 实验室设备维修系统源码 lw 调试
  • 兵者多诡(HCTF2016)
  • 【JAVA-Day04】Java关键字和示例:深入了解常用关键字的用法
  • Android请求网络报错:not permitted by network security policy
  • python报错:ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1
  • 如何使用adb command来设置cpu频率和核数
  • 236. 二叉树的最近公共祖先
  • Git常见问题:git pull 和 git pull --rebase二者区别
  • 关于HarmonyOS元服务的主题演讲与合作签约
  • cache 学习
  • SSM - Springboot - MyBatis-Plus 全栈体系(六)
  • 【Flutter】引入网络图片时,提示:Failed host lookup: ‘[图片host]‘
  • Python基础教程:索引和切片
  • JVM基础面试题
  • 蓝桥杯官网填空题(平方末尾)
  • 深入探究数据结构与算法:构建强大编程基础
  • Android 自定义View之圆形进度条
  • 力扣(LeetCode)算法_C++——字母异位词分组
  • 【LeetCode-中等题】59. 螺旋矩阵 II
  • 错误: 找不到或无法加载主类 Main