当前位置: 首页 > news >正文

保姆级Debezium抽取SQL Server同步kafka

前言:

Debezium SQL Server连接器捕获SQL Server数据库模式中发生的行级更改。

官方2.0文档:

Debezium connector for SQL Server :: Debezium Documentation

有关与此连接器兼容的SQL Server版本的信息,请参阅

SQL Server

Database: 2017, 2019


 

JDBC Driver: 10.2.1.jre8


 

Database: 2017, 2019


 

JDBC Driver: 9.4.1.jre8

Debezium SQL Server连接器首次连接到SQL Server数据库或集群时,它会对数据库中的模式进行一致的快照。初始快照完成后,连接器会连续捕获提交到CDC启用的SQL Server数据库的INSERT、UPDATE或DELETE操作的行级更改。该连接器为每个数据更改操作生成事件,并将其流式传输到Kafka主题。该连接器将表的所有事件流式传输到一个专门的Kafka主题。然后,应用程序和服务可以消耗该主题的数据更改事件记录。

要使Debezium SQL Server连接器捕获数据库操作的更改事件记录,您必须首先在SQL Server数据库上启用更改数据捕获。必须在数据库和要捕获的每个表上启用CDC。在源数据库上设置CDC后,连接器可以捕获数据库中发生的行级INSERT、UPDATE和DELETE操作。该连接器将每个源表的事件记录写入专门用于该表的Kafka主题。每个捕获的表都有一个主题。客户端应用程序读取他们遵循的数据库表的Kafka主题,并可以响应他们从这些主题中消耗的行级事件。

连接器首次连接到SQL Server数据库或集群时,它需要对其配置为捕获更改的所有表的模式进行一致的快照,并将此状态流式传输到Kafka。快照完成后,连接器会持续捕获随后发生的行级更改。通过首先建立所有数据的一致视图,连接器可以继续读取,而不会丢失快照发生时所做的任何更改。

Debezium SQL Server连接器可以容忍故障。当连接器读取更改并生成事件时,它会定期记录事件在数据库日志中的位置(LSN/日志序列号)。如果连接器因任何原因(包括通信故障、网络问题或崩溃)而停止,则在重新启动后,连接器将从读取的最后一点恢复读取SQL Server CDC表。

抵消定期提交。它们不是在更改事件发生时提交的。因此,在中断后,可能会生成重复的事件。

容错也适用于快照。也就是说,如果连接器在快照期间停止,则连接器在重新启动时开始新的快照。


 

安装sqlserver linux

1,环境centos7.2以上,2 GB+ 的内存

2,rpm安装包下载:Index of /rhel/7/mssql-server-2017/

3,下载并上传到linux环境下

wget https://packages.microsoft.com/rhel/7/mssql-server-2017/mssql-server-14.0.1000.169-2.x86_64.rpm 
rpm -ivh mssql-server-14.0.1000.169-2.x86_64.rpm

提示缺少依赖包 把包补上

rpm -ivh继续安装,根据提示执行sudo /opt/mssql/bin/mssql-conf setup

[root@node01 opt]# sudo /opt/mssql/bin/mssql-conf setup 
选择 SQL Server 的一个版本: 1) Evaluation (免费,无生产许可,180 天限制) 2) Developer (免费,无生产许可) 3) Express (免费) 4) Web (付费版) 5) Standard (付费版) 6) Enterprise (付费版) 7) Enterprise Core (付费版) 8) 我通过零售渠道购买了许可证并具有要输入的产品密钥。 可在以下位置找到有关版本的详细信息: 
https://go.microsoft.com/fwlink/?LinkId=852748&clcid=0x804 使用此软件的付费版本需要通过以下途径获取单独授权 
Microsoft 批量许可计划。 
选择付费版本即表示你具有适用的 
要安装和运行此软件的就地许可证数量。 输入版本(1-8):1 
可以在以下位置找到此产品的许可条款: 
/usr/share/doc/mssql-server 或从以下位置下载: 
https://go.microsoft.com/fwlink/?LinkId=855864&clcid=0x804 可以从以下位置查看隐私声明: 
https://go.microsoft.com/fwlink/?LinkId=853010&clcid=0x804 接受此许可条款吗? [Yes/No]:yes 选择 SQL Server 的语言: 
(1) English 
(2) Deutsch 
(3) Español 
(4) Français 
(5) Italiano 
(6) 日本語 
(7) 한국어 
(8) Português 
(9) Русский 
(10) 中文 – 简体 
(11) 中文 (繁体) 
输入选项 1-11:10 
输入 SQL Server 系统管理员密码:DTstack@123 
确认 SQL Server 系统管理员密码:DTstack@123 
正在配置 SQL Server… 
Created symlink from /etc/systemd/system/multi-user.target.wants/mssql-server.service to /usr/lib/systemd/system/mssql-server.service. 
安装程序已成功完成。SQL Server 正在启动。 
[root@node01 opt]#

验证是否启动

systemctl status mssql-server

安装 SQL Server 命令行工具

若要创建数据库,需要使用一个能够在 SQL Server 上运行 Transact-SQL 语句的工具进行连接。 以下步骤安装 SQL Server 命令行工具: sqlcmd和bcp。

下载 Microsoft Red Hat 存储库配置文件。 
sudo curl -o /etc/yum.repos.d/msprod.repo https://packages.microsoft.com/config/rhel/7/prod.repo 
————— 
运行以下命令以安装 mssql-tools 和 unixODBC 开发人员包。 
sudo yum install -y mssql-tools unixODBC-devel 
为方便起见,请将 /opt/mssql-tools/bin/ 添加到 PATH 环境变量。 这样就可以在运行工具时不指定完整路径。 请运行以下命令,以便修改登录会话和交互/非登录会话的 PATH: 
echo 'export PATH="$PATH:/opt/mssql-tools/bin"' >> ~/.bash_profile 
echo 'export PATH="$PATH:/opt/mssql-tools/bin"' >> ~/.bashrc 
source ~/.bashrc

SQLServer基本命令

本地连接

以下步骤使用 sqlcmd 本地连接到新的 SQL Server 实例。

使用 SQL Server 名称 (-S),用户名 (-U) 和密码 (-P) 的参数运行 sqlcmd。 在本教程中,用户进行本地连接,因此服务器名称为 localhost。 用户名为 SA,密码是在安装过程中为 SA 帐户提供的密码。

sqlcmd -S localhost -U SA -P 密码 # 用命令行连接

如果成功,应会显示 sqlcmd 命令提示符:1>。

提示 可以在命令行上省略密码,以收到密码输入提示。 如果以后决定进行远程连接,请指定 -S 参数的计算机名称或 IP 地址,并确保防火墙上的端口 1433 已打开。

SQLServer基本命令

(1) 建库

> create database TestDB 
> go

(2) 看当前数据库列表

> select * from SysDatabases 
> go

(3) 看当前数据表

> use 库名 
> select * from sysobjects where xtype='u' 
> go

(4) 看表的内容

> select * from 表名; 
> go

插入数据

接下来创建一个新表 mgmg,然后插入两个新行。

在 sqlcmd 命令提示符中,将上下文切换到新的 TestDB 数据库: 
USE TestDB

创建名为 mgmg 的新表:

CREATE TABLE mgmg (id INT, name NVARCHAR(50), quantity INT)

将数据插入新表:

INSERT INTO mgmg VALUES (1, 'banana', 150); INSERT INTO mgmg VALUES (2, 'orange', 154);

要执行上述命令的类型 GO:

GO

在 SQL Server 数据库上启用 CDC


 

在为表启用 CDC 之前,您必须为 SQL Server 数据库启用它。 SQL Server 管理员通过运行系统存储过程来启用 CDC。系统存储过程可以使用 SQL Server Management Studio 或 Transact-SQL 运行。

先决条件

您是 SQL Server 的 sysadmin 固定服务器角色的成员。

您是数据库的 db_owner。

SQL Server 代理正在运行。

注意:

SQL Server CDC 功能仅处理用户创建的表中发生的更改。您不能在 SQL Server 主数据库上启用 CDC。

程序

1.从 SQL Server Management Studio 的“查看”菜单中,单击“模板资源管理器”。

2.在模板浏览器中,展开 SQL Server 模板。

3.展开更改数据捕获 > 配置,然后单击为 CDC 启用数据库。

4.在模板中,将 USE 语句中的数据库名称替换为要为 CDC 启用的数据库名称。

5.运行存储过程 sys.sp_cdc_enable_db 为 CDC 启用数据库。

为 CDC 启用数据库后,将创建名为 cdc 的模式,以及 CDC 用户、元数据表和其他系统对象。

以下示例显示如何为

数据库 TestDB 启用 CDC:

USE TestDB 
GO 
EXEC sys.sp_cdc_enable_db 
GO

在 SQL Server 表上启用 CDC

SQL Server 管理员必须在您希望 Debezium 捕获的源表上启用更改数据捕获。数据库必须已为 CDC 启用。要在表上启用 CDC,SQL Server 管理员为该表运行存储过程 sys.sp_cdc_enable_table。存储过程可以使用 SQL Server Management Studio 或 Transact-SQL 运行。必须为要捕获的每个表启用 SQL Server CDC。

先决条件:

CDC 在 SQL Server 数据库上启用。

SQL Server 代理正在运行。

您是数据库的 db_owner 固定数据库角色的成员。

程序

从 SQL Server Management Studio 的“查看”菜单中,单击“模板资源管理器”。

在模板浏览器中,展开 SQL Server 模板。

展开更改数据捕获 > 配置,然后单击启用表指定文件组选项。

在模板中,将 USE 语句中的表名替换为您要捕获的表名。

运行存储过程 sys.sp_cdc_enable_table。

以下示例显示如何为表 mgmg 启用 CDC:

示例:为 SQL Server 表启用 CDC

USE TestDB 
GO EXEC sys.sp_cdc_enable_table 
@source_schema = N'dbo', 
@source_name   = N'mgmg', 
@role_name     = N'NULL',   
@filegroup_name = N'MyDB_CT', 
@supports_net_changes = 0 
GO


 

示例

USE MyDB 
GO EXEC sys.sp_cdc_enable_table 
@source_schema = N'dbo', 
@source_name   = N'MyTable', 
@role_name     = N'MyRole',   
@filegroup_name = N'MyDB_CT', 
@supports_net_changes = 0 
GO

指定要捕获的表的名称。

指定角色 MyRole,您可以向该角色添加要授予对源表的捕获列的 SELECT 权限的用户。具有 sysadmin 或 db_owner 角色的用户还可以访问指定的更改表。将 @role_name 的值设置为 NULL,以仅允许 sysadmin 或 db_owner 中的成员对捕获的信息具有完全访问权限。

指定 SQL Server 为捕获的表放置更改表的文件组。命名的文件组必须已经存在。最好不要将更改表放在用于源表的同一文件组中。

Ps文件组

1. 如何查看数据库中所有的文件组。 语法:sp_helpfilegroup 步骤: use 数据库 sp_helpfilegroup 
2. 如何找到文件组和文件的对应情况. sp_helpdb love 创建文件组。 
语法: alter database 数据库名 add filegroup 文件组名 
步骤: use 数据库名 alter database  数据库名  add filegroup 文件组名 
范例: 
use love 
alter database TestDB add filegroup 财务部


 

SQL Server 管理员可以运行系统存储过程来查询数据库或表以检索其 CDC 配置信息。存储过程可以使用 SQL Server Management Studio 或 Transact-SQL 运行。

先决条件

您对捕获实例的所有捕获列具有 SELECT 权限。 db_owner 数据库角色的成员可以查看所有已定义捕获实例的信息。

您拥有为查询包括的表信息定义的任何门控角色的成员资格。

程序

从 SQL Server Management Studio 的“查看”菜单中,单击“对象资源管理器”。

在对象资源管理器中,展开数据库,然后展开您的数据库对象,例如 MyDB。

展开可编程性 > 存储过程 > 系统存储过程。

运行 sys.sp_cdc_help_change_data_capture 存储过程来查询表。

查询不应返回空结果。

以下示例在数据库 TestDB 上运行存储过程 sys.sp_cdc_help_change_data_capture:

示例:

查询表以获取 CDC 配置信息

USE TestDB; 
GO 
EXEC sys.sp_cdc_help_change_data_capture 
GO

该查询返回数据库中每个表的配置信息,这些表为 CDC 启用并且包含调用者有权访问的更改数据。如果结果为空,请验证用户是否具有访问捕获实例和 CDC 表的权限。

部署Debezium SQL Server连接器

要部署Debezium SQL Server连接器,请安装Debezium SQL Server连接器存档,配置连接器,并通过将其配置添加到Kafka Connect来启动连接器。

先决条件

安装了Apache ZooKeeper、Apache Kafka和Kafka Connect。

SQL Server已安装,已配置为CDC,并准备与Debezium连接器一起使用。

程序

下载Debezium SQL Server连接器插件存档

Wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.9.7.Final/debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz

将文件提取到您的Kafka Connect环境中。

将带有JAR文件的目录添加到Kafka Connect的plugin.path中。

配置连接器并将配置添加到您的Kafka Connect集群中。

重新启动您的Kafka Connect流程以提取新的JAR文件。

解压插件包

tar -xvf debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz

配置kafka,让kafka知道这个插件.

[root@hhz02 config]# vi connect-distributed.properties

配置注意项:(单机模式启动连接器 connect-standlone.properties)

bootstrap.servers=172.16.120.17:9092
offset.storage.topic=connect-sqlserver-status
offset.storage.replication.factor=1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
status.storage.topic=connect-sqlserver-statu
status.storage.replication.factor=1
config.storage.topic=connect-sqlserver-config
config.storage.replication.factor=1
group.id=connect-sqlserver
offset.flush.interval.ms=10000
plugin.path=/opt/debezium-connector-sqlserver
rest.port=8083

Ps:
如果kafka开启kerberos 该配置文件还需要添加:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka还需要再启动脚本connect-distributed.sh中添加:
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/dtstack/Kafka/kafka/config/kafka_jaas.conf"

注意:集群模式kafka 需要注意配置分发。

重启kafka集群使配置生效。

启动Kafka connector

/opt/dtstack/DTBase/kafka/bin/connect-distributed.sh -daemon /opt/dtstack/DTBase/kafka/config/connect-sqlserver.properties

注意结尾这个文件 集群模式为:connect-distributed.properties,单机模式为:connect-standalone.properties

(如果启动有问题 观察/opt/app/kafka/logs/connectDistributed.out)

检测Kafka connector是否正常工作

1. 检测kafka连接器的服务状态

[root@node01 logs]# curl -H "Accept:application/json" node01:8083/ 
{"version":"1.1.1","commit":"8e07427ffb493498","kafka_cluster_id":"svXhhAFkSt6xLRPTBIdE1Q"}

2. 检查向 Kafka Connect 注册的连接器列表

[root@hdp01 kafka]# curl -H "Accept:application/json" node01:8083/connectors/ 
[]

返回空列表, 表示目前还没有注册的连接器

附:删除命令

curl -X DELETE node01:8083/connectors/db2-connector1

SQL Server 连接器配置示例

以下是连接器实例的配置示例,该实例在 172.16.120.17 的端口 1433 上从 SQL Server 服务器捕获数据,我们在逻辑上将其命名为 fullfillment。通常,您通过设置可用于连接器的配置属性,在 JSON 文件中配置 Debezium SQL Server 连接器。

注册连接器

Kafka Connect 服务的 API 提交POST针对/connectors资源的请求,其中包含描述新连接器(称为inventory-connector)的 JSON 文档。

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" node01:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "172.16.120.17", "database.port": "1433", "database.user": "SA", "database.password": "Dtstack@123", "database.dbname": "TestDB", "database.server.name": "fullfillment", "table.include.list": "dbo.mgmg", "database.history.kafka.bootstrap.servers": "node01:9092", "database.history.kafka.topic": "dbhistory.fullfillment" } 
}'

(如果注册有问题 观察/opt/app/kafka/logs/connectDistributed.out)

多看下日志 日志中都可以 写到 如果注册成功,也没有kafka数据 麻烦看下日志WARN中过滤问题。[sourceTableId=DB2INST1.TESTTB, changeTableId=ASNCDC.CDC_DB2INST1_TESTTB

]


 


 

查看kafka中的数据

/opt/dtstack/DTBase/kafka/bin/kafka-topics.sh -list -zookeeper localhost:2181/kafka 
/opt/dtstack/DTBase/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic fullfillment.dbo.mgmg


 


 


 


 


 


 


 

http://www.lryc.cn/news/626819.html

相关文章:

  • JSON::Value 功能详解:从三目运算符到高级用法
  • Pytest项目_day20(log日志)
  • PyTorch API 2
  • GPT-5 上线风波深度复盘:从口碑两极到策略调整,OpenAI 的变与不变
  • C++开发/Qt开发:单例模式介绍与应用
  • 拓扑排序判断环 P1347 排序题解
  • 第二十七天:游戏组队问题
  • 跨平台 RTSP/RTMP 播放器工程化实践:低延迟与高稳定性的挑战与突破
  • Redisson最新版本(3.50.0左右)启动时提示Netty的某些类找不到
  • pip 安装常见错误及实例化解决办法大全
  • Tomcat部署与HTTP协议详解
  • 凸问题-非凸问题-非凸模型
  • 第十四届“中国软件杯”大赛晋级现场总决赛名单公布
  • PyTorch API 6
  • 单片机通信协议核心关系梳理笔记(UART/USART/232/485/SPI/12C/LIN/BLE/WIFI)
  • Spring Boot 3.4.x 性能优化实战:用 Undertow 替换 Tomcat 全指南​
  • JavaScript 性能优化实战:从原理到落地的完整指南
  • 【OneAI】使用Rust构建的轻量AI网关
  • 【Axure高保真原型】拖拉拽画圆
  • JavaScript 性能优化实战(易懂版)
  • 实验8.20
  • LeetCode 刷题【47. 全排列 II】
  • 一种融合AI与OCR的施工许可证识别技术,提升工程监管效率,实现自动化、精准化处理。
  • 【解决方案】powershell自动连接夜神adb端口
  • 深入解析RAGFlow六阶段架构
  • 结合SAT-3D,运动+饮食双重养腰新方式
  • 十二,数据结构-链表
  • Linux用30秒部署Nginx+Tomcat+Mysql+Jdk1.8环境
  • 学习嵌入式的第二十二天——数据结构——双向链表
  • 为6G和超快光谱铺路,《Nature Communications》发布新型太赫兹光芯片,实现多通道信号操纵