当前位置: 首页 > news >正文

Debezium发布历史87

原文地址: https://debezium.io/blog/2020/03/19/integration-testing-for-change-data-capture-with-testcontainers/

欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.

使用 Testcontainer 进行变更数据捕获的集成测试
三月 19, 2020 作者: Gunnar Morling
讨论 testcontainers postgres
使用 Debezium 设置变更数据捕获 (CDC) 管道通常只是配置问题,无需涉及任何编程。对 CDC 设置进行自动化测试仍然是一个非常好的主意,确保一切配置正确并且 Debezium 连接器按预期设置。

涉及两个主要组件,其配置需要考虑:

源数据库:必须对其进行设置,以便 Debezium 可以连接到它并检索更改事件;详细信息取决于具体的数据库,例如对于 MySQL,binlog 必须为“行”模式,对于 Postgres,必须安装支持的逻辑解码插件之一等。

Debezium 连接器:必须使用正确的数据库主机和凭据进行配置,可能使用 SSL、应用表和列过滤器、可能一个或多个单一消息转换 (SMT) 等。

这就是新添加的 Debezium对与Testcontainers的集成测试的支持的用武之地。它允许使用 Linux 容器映像设置所有必需的组件(Apache Kafka、Kafka Connect 等)、配置和部署 Debezium 连接器并对生成的运行断言更改数据事件。

让我们看看它是如何完成的。

项目设立
假设您正在使用 Apache Maven 进行依赖项管理,请将以下依赖项添加到您的pom.xml中,引入 Debezium Testcontainers 集成和Apache Kafka 的 Testcontainers 模块:

io.debezium debezium-testing-testcontainers 1.1.0.CR1 test org.testcontainers kafka test 还要为您的数据库添加 Testcontainers 依赖项,例如 Postgres: org.testcontainers postgresql test 您可以在 GitHub 上的debezium-examples存储库中找到具有完整配置的示例项目。

初始化测试容器
声明了所有必需的依赖项后,就可以编写 CDC 集成测试了。通过 Testcontainers,集成测试是使用 Linux 容器和 Docker 实现的。它提供了一个 Java API,用于启动和管理测试所需的资源。我们可以用它来启动 Apache Kafka、Kafka Connect 和 Postgres 数据库:

public class CdcTest {

private static Network network = Network.newNetwork();

private static KafkaContainer kafkaContainer = new KafkaContainer()
.withNetwork(network);

public static PostgreSQLContainer<?> postgresContainer =
new PostgreSQLContainer<>(“debezium/postgres:11”)
.withNetwork(network)
.withNetworkAliases(“postgres”);

public static DebeziumContainer debeziumContainer =
new DebeziumContainer(“1.1.0.CR1”)
.withNetwork(network)
.withKafka(kafkaContainer)
.dependsOn(kafkaContainer);

@BeforeClass
public static void startContainers() {
Startables.deepStart(Stream.of(
kafkaContainer, postgresContainer, debeziumContainer))
.join();
}
}
定义供所有服务使用的 Docker 网络
为 Apache Kafka 设置容器
为 Postgres 11 设置容器(使用 Debezium 的 Postgres 容器映像)
使用 Debezium 为 Kafka Connect 设置容器
@BeforeClass在一个方法中启动所有三个容器
请注意,您需要安装 Docker 才能使用 Testcontainers。

测试实施
所需的基础设施到位后,我们可以为 CDC 设置编写测试。测试的总体流程是这样的:

为 Postgres 数据库配置 Debezium 连接器

执行几条SQL语句来改变一些数据

使用 Kafka 消费者从相应的 Kafka 主题检索生成的更改数据事件

针对这些事件运行一些断言

这是测试的 shell:

@Test
public void canObtainChangeEventsFromPostgres() throws Exception {
try (Connection connection = getConnection(postgresContainer);
Statement statement = connection.createStatement();
KafkaConsumer<String, String> consumer =
getConsumer(kafkaContainer)) {

  // TODO ...

}
}
数据库连接的凭据可以从通过 Testcontainers 启动的 Postgres 容器中获取,很好地避免了任何冗余:

private Connection getConnection(PostgreSQLContainer<?> postgresContainer)
throws SQLException {

return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
postgresContainer.getUsername(),
postgresContainer.getPassword());
}
Kafka 消费者也是如此:

private KafkaConsumer<String, String> getConsumer(
KafkaContainer kafkaContainer) {

return new KafkaConsumer<>(
ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers(),

      ConsumerConfig.GROUP_ID_CONFIG,"tc-" + UUID.randomUUID(),ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"),new StringDeserializer(),new StringDeserializer());

}
现在我们来实现实际的测试逻辑:

statement.execute(“create schema todo”);
statement.execute(“create table todo.Todo (” +
"id int8 not null, " +
"title varchar(255), " +
“primary key (id))”);
statement.execute(“alter table todo.Todo replica identity full”);
statement.execute(“insert into todo.Todo values (1, ‘Learn CDC’)”);
statement.execute(“insert into todo.Todo values (2, ‘Learn Debezium’)”);

ConnectorConfiguration connector = ConnectorConfiguration
.forJdbcContainer(postgresContainer)
.with(“database.server.name”, “dbserver1”);

debeziumContainer.registerConnector(“my-connector”,
connector);

consumer.subscribe(Arrays.asList(“dbserver1.todo.todo”));

List<ConsumerRecord<String, String>> changeEvents =
drain(consumer, 2);

ConsumerRecord<String, String> changeEvent = changeEvents.get(0);
assertThat(JsonPath. read(changeEvent.key(), “ . i d " ) ) . i s E q u a l T o ( 1 ) ; a s s e r t T h a t ( J s o n P a t h . < S t r i n g > r e a d ( c h a n g e E v e n t . v a l u e ( ) , " .id")) .isEqualTo(1); assertThat(JsonPath.<String> read(changeEvent.value(), " .id")).isEqualTo(1);assertThat(JsonPath.<String>read(changeEvent.value(),".op”))
.isEqualTo(“r”);
assertThat(JsonPath. read(changeEvent.value(), “$.after.title”))
.isEqualTo(“Learn CDC”);

changeEvent = changeEvents.get(1);
assertThat(JsonPath. read(changeEvent.key(), “ . i d " ) ) . i s E q u a l T o ( 2 ) ; a s s e r t T h a t ( J s o n P a t h . < S t r i n g > r e a d ( c h a n g e E v e n t . v a l u e ( ) , " .id")) .isEqualTo(2); assertThat(JsonPath.<String> read(changeEvent.value(), " .id")).isEqualTo(2);assertThat(JsonPath.<String>read(changeEvent.value(),".op”))
.isEqualTo(“r”);
assertThat(JsonPath. read(changeEvent.value(), “$.after.title”))
.isEqualTo(“Learn Debezium”);

consumer.unsubscribe();
在Postgres数据库中创建一个表并插入两条记录
注册 Debezium Postgres 连接器的实例
从 Kafka 中的更改事件主题读取两条记录并断言它们的属性
请注意 Debezium 的 Testcontainers 支持如何允许从数据库容器播种连接器配置,从而避免显式给出数据库连接属性的需要。仅必须给出唯一的database.server.name,当然您可以应用其他配置选项,例如表或列过滤器、SMT 等。

drain()为了简洁起见,省略了从 Kafka 主题读取给定数量记录的方法的源代码。您可以在 GitHub 上的完整示例中找到它。

基于 JsonPath 的断言可以方便地断言预期数据更改事件的属性,但当然您也可以使用任何其他 JSON API 来完成这项工作。当使用 Apache Avro 而不是 JSON 作为序列化格式时,您必须改用 Avro API。

包起来
Testcontainers 和 Debezium 对它的支持使得为 CDC 设置编写自动化集成测试变得相当容易。

本文讨论的测试方法可以通过多种方式进行扩展。例如,可能需要将连接器配置置于修订控制之下(以便您可以管理和跟踪任何配置更改)并使用这些配置文件驱动测试。您还可以更进一步并测试整个数据流管道。为此,您不仅必须部署 Debezium 连接器,还必须部署接收器连接器,例如用于数据仓库或搜索服务器的连接器。然后,您可以对这些接收器系统中的数据运行断言,确保数据管道端到端的正确性。

您对测试 CDC 设置和管道有何看法?请在下面的评论中告诉我们!

http://www.lryc.cn/news/288225.html

相关文章:

  • Leetcode131.分割回文串-Palindrome Patitioning-Python-回溯法
  • Java面试——基础篇
  • C++——结构体
  • C++技术要点总结, 面试必备, 收藏起来慢慢看
  • VR数字展厅,平面静态跨越到3D立体化时代
  • Linux中LVM实验
  • 外包干了一个月,技术退步明显。。。。。
  • gitlab.rb主要配置
  • 网络协议基础
  • Mac使用adb调试安卓手机
  • Web 开发 1: Flask 框架介绍和使用
  • Centos7.6之禅道开源版17.6.1安装记录
  • 有趣的代码(简单)
  • Java和Redis实现一个简单的热搜功能
  • 超越传统,想修哪里就修哪里,SUPIR如何通过文本提示实现智能图像修复
  • 《如何画好架构图》学习笔记
  • redis整合
  • 开循环低温样品架节约液氦操作技巧
  • 年薪30W+,待遇翻倍,我的经历值得每个测试人借鉴
  • DEB方式安装elastic search7以及使用
  • [Tomcat] [最全] 目录和文件详解
  • 微信小程序元素/文字在横向和纵向实现居中对齐、两端对齐、左右对齐、上下对齐
  • 兼容树莓派扩展模块,专注工业产品开发的瑞米派强势来袭
  • 云原生 - 微信小程序 COS 对象存储图片缓存强制更新解决方案
  • 设计公司设计ppt的优势—南京梵构广告
  • gitlab设置/修改克隆clone地址端口
  • Jellyfin影音服务本地部署并结合内网穿透实现公网访问本地资源
  • 笨蛋学设计模式行为型模式-责任链模式【18】
  • 【.NET Core】深入理解任务并行库 (TPL)
  • win10安装redis并配置加自启动(采用官方推荐unix子系统)