跳到主要内容

深入浅出 Debezium 数据同步实战

简述

Debezium 是一个开源的数据订阅工具,它可以捕获数据库的 CDC 变更数据发送到 Kafka

为了实现将数据发送到其他数据库的目的,我们可以将 Kafka 中的数据,通过多种 Sink Connector 同步到 MySQL、Oracle、PostgreSQL、Starrocks 等。

debezium_01

本文以 MySQL -> Kafka -> Starrocks 为例,来演示 Debezium 的数据同步能力,并探讨如何构建一条稳定、高效的数据同步链路。

Debezium 环境准备

  • 相关资源一键部署(Docker) debezium-test.tar.gz
    • Kafka 集群 + Kafka UI(中间件)
    • Debezium(同步工具)
    • MySQL(源端)
    • Starrocks(目标端)
tar -xzvf debezium-test.tar.gz
sh install.sh

创建 MySQL Source Connector

  • 源端是 MySQL,通过下面的表进行创建。
CREATE DATABASE `inventory`;

CREATE TABLE `inventory`.`customer` (
`c_int` int NOT NULL,
`c_bigint` bigint NOT NULL,
`c_decimal` decimal(10,3) NOT NULL,
`c_date` date NOT NULL,
`c_datetime` datetime NOT NULL,
`c_timestamp` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`c_year` int NOT NULL,
`c_varchar` varchar(10) NOT NULL,
`c_text` text NOT NULL,
PRIMARY KEY (`c_int`)
);
  • 通过 Debezium 的 Api 接口创建 Connector 订阅 MySQL 的变更事件。
curl -i -X POST http://127.0.0.1:7750/connectors \
-H 'Content-Type: application/json' \
-d '{
"name": "connector-test-mx",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "112.124.38.87",
"database.port": "25000",
"database.user": "root",
"database.password": "123456",
"database.server.id": "1",
"database.server.name": "mx",
"database.include.list": "inventory",
"decimal.handling.mode": "string",
"binary.handling.mode": "hex",
"topic.prefix": "mx",
"table.include.list": "inventory.customer",
"snapshot.mode": "never",
"database.history.kafka.bootstrap.servers": "112.124.38.87:19092,112.124.38.87:29092,112.124.38.87:39092",
"schema.history.internal.kafka.bootstrap.servers": "112.124.38.87:19092,112.124.38.87:29092,112.124.38.87:39092",
"schema.history.internal.kafka.topic": "mx.schemahistory.customer",
"database.history.kafka.topic": "mx.mx_history_schema",
"include.schema.changes": "false",
"converters": "mysqltime",
"mysqltime.type": "io.debezium.converter.MySQLTimeConverter",
"mysqltime.format.date": "yyyy-MM-dd",
"mysqltime.format.time": "HH:mm:ss",
"mysqltime.format.datetime": "yyyy-MM-dd HH:mm:ss",
"mysqltime.format.timestamp": "yyyy-MM-dd HH:mm:ss",
"mysqltime.format.timestamp.zone": "UTC+8"
}
}'

  • 创建后,查看 Connetor 的状态。
curl -s http://127.0.0.1:7750/connectors/connector-test-mx/status

创建 Sink Connector For Starrocks

  • 目标端是 Starrocks,通过下面的表进行创建。
CREATE DATABASE `inventory`;

CREATE TABLE `inventory`.`customer` (
`c_int` int NOT NULL,
`c_bigint` bigint NOT NULL,
`c_decimal` decimal(10,3) NOT NULL,
`c_date` date NOT NULL,
`c_datetime` datetime NOT NULL,
`c_timestamp` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`c_year` int NOT NULL,
`c_varchar` varchar(10) NOT NULL,
`c_text` text NOT NULL
) ENGINE=OLAP
PRIMARY KEY(`c_int`)
DISTRIBUTED BY HASH(`c_int`) BUCKETS 4
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"compression" = "LZ4"
);
  • 通过 Debezium 的 Api 接口创建 Sink Connector,将 Kafka 变更数据写入到 Starrocks。
curl -i -X POST http://127.0.0.1:7750/connectors \
-H 'Content-Type: application/json' \
-d '{
"name":"jdbc-sink-starrocks",
"config":{
"connector.class":"io.debezium.connector.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://112.124.38.87:19030/inventory",
"connection.username": "root",
"connection.password": "123456",
"topics":"mx.inventory.customer",
"auto.create":"false",
"insert.mode": "insert",
"delete.enabled": "true",
"primary.key.mode":"record_key",
"primary.key.fields":"c_int",
"table.name.format": "inventory.customer"
}
}'
  • 查看连接器的状态
curl -s http://127.0.0.1:7750/connectors/jdbc-sink-mysql/status

数据同步测试

  • 将随机的增删改操作写入 MySQL 中,Debezium 会捕获 MySQL 的 CDC 变更数据并写入到 Kafka 中;Sink Connector 会将 Kafka 的数据写入到 Starrocks。 debezium_02 debezium_03

  • 进行数据对比测试,两边数据一致 debezium_04 debezium_04

  • 暂停 Debezium 的 Sink 任务

curl -i -X PUT  http://127.0.0.1:7750/connectors/jdbc-sink-starrocks/pause

总结

Debezium 整体数据同步使用下来还是比较流畅的,社区也支持比较多的插件,生态丰富。

我个人感觉比较适合开发来使用,并在 Debezium 的生态基础上接入到内部的业务系统中。

但是我感觉整个开发过程会比较重度,对 DBA、运维不友好,且有一些问题不得不面对:

  • 数据同步状态的把控(同步延迟、告警、监控)
  • 数据同步的高可用(容灾恢复)
  • 数据同步可视化(运维)
  • 数据一致性对比及修复(数据准确)

能处理好以上这些问题,整个数据同步才能有一个比较好的保障。

参考文档

Debezium:https://debezium.io/documentation/reference/1.3/connectors/mysql.html

Kafka:https://kafka.apache.org/

Confluent:https://docs.confluent.io/platform/current/connect/index.html

CloudCanal:https://www.clougence.com/cc-doc/quick/quick_start