1. CDC 概述

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛:

  • 数据同步:用于备份,容灾;
  • 数据分发:一个数据源分发给多个下游系统;
  • 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。

CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:

  • 基于查询的 CDC:
    • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;
    • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
    • 不保障实时性,基于离线调度存在天然的延迟。
  • 基于日志的 CDC:
    • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
    • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
    • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。

常见开源 CDC 方案对比:

  • 对比增量同步能力,
    • 基于日志的方式,可以很好的做到增量同步;
    • 而基于查询的方式是很难做到增量同步的。
  • 对比全量同步能力,基于查询或者日志的 CDC 方案基本都支持,除了 Canal。
  • 而对比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。
  • 从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。
  • 在数据转换 / 数据清洗能力上,当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合?
    • 在 Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些数据;
    • 但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。
  • 另外,在生态方面,这里指的是下游的一些数据库或者数据源的支持。Flink CDC 下游有丰富的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些系统,也支持各种自定义 connector。

2.1. 简介

Flink CDC (CDC,全称是 Change Data Capture),是基于Apache Flink 生态的数据源连接器,目的是为了用于监控和捕获数据变更,以便下游针对变更进行相应处理。其底层封装了 Debezium ,Debezium 支持全量同步,也支持增量同步,也支持全量+增量的同步,非常灵活,同时基于日志的 CDC 技术使得 Exactly—Once 成为可能。

Debezium 同步一张表分为两个阶段:

  • 全量阶段:查询当前表中的所有记录
  • 增量阶段:从 binlog 消费变更数据
Flink CDC 1.xFlink CDC 2.xFlink CDC 3.0
一致性通过加锁保证(默认是全局锁):加锁是发生在全量阶段,目的是为了确定全量阶段的初始位点,保证增量 + 全量实现一条不多,一条不少,从而保证数据一致性。全局锁可能导致数据库hang住,表级锁会锁住表的读,DBA 一般不给表权限全程无锁,不对线上业务产生锁的风险从捕获数据变更的 Flink 数据源正式迈向为以 Flink 为基础的端到端流式 ELT 数据集成框架(CDC Streaming ELT Framework)。在该版本中,社区首先支持实时同步 MySQL 数据至 Apache Doris 和 StarRocks 两条链路,未来会提供更多开箱即用的能力
不支持水平扩展:1.x版本只支持单并发,在全量读取阶段,如果表非常大(亿级别),读取时间都在小时级别全量数据的读取可以水平扩展
全量阶段不支持 checkpoint:失败后要重新读取断点续传,支持全量阶段的 checkpoint(chunk粒度)

加锁可能会造成的后果:

关于 Flink CDC 2.0 的实现原理,会在后续的文章中再做介绍。

3.1. CDC 版本

3.2. MySQL binlog 设置

  • 确保 MySQL binlog 已开启:
1
show variables like '%log_bin%';
  • 确保 MySQL binlog 格式为 ROW:
1
show variables like '%binlog_format%';

3.3. Demo

环境:

MySQL:8.1.0

Flink:1.17.0

flink-connector-mysql-cdc:2.4.2

Java:1.8

Scala:2.12.8

DataStream API:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
... // 创建环境、设置 checkpoint

Properties properties = new Properties();
properties.setProperty("snapshot.locking.mode", "none");

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("cdc_test") // set captured database
                .tableList("cdc_test.tableA") // set captured table
                .username("root")
                .password("")
                .serverId("5400-6000") // 设置为区间才能并发运行
                .serverTimeZone("Asia/Shanghai")
                .debeziumProperties(properties)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
//                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

SQL:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
... // 创建执行环境、设置 checkpoint

// 创建Table环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,EnvironmentSettings.newInstance().inStreamingMode().build());

String mysqlSourceDDL = "create table mysql_source(\n" +
                "id String,\n" +
                "name String,\n" +
                "age Int,\n" +
                "PRIMARY KEY (id) NOT ENFORCED \n" +
                ") WITH (\n" +
                "'connector' = 'mysql-cdc', \n" +
                "'hostname' = 'localhost',\n" +
                "'port' = '3306',\n" +
                "'username' = 'root',\n" +
                "'password' = '', \n" +
                "'server-id' = '5401-6000', \n" +
                "'database-name' = 'cdc_test',\n" +
                "'table-name' = 'tableA',\n" +
                "'server-time-zone' = 'Asia/Shanghai',\n" +
                "'scan.startup.mode' = 'initial', \n" +
//                "'scan.startup.mode' = 'latest-offset', \n" +
                "'debezium.snapshot.locking.mode'='none'\n" +
                ")"
                ;
System.out.println(mysqlSourceDDL);
tEnv.executeSql(mysqlSourceDDL);
tEnv.executeSql("select  * from mysql_source").print();

Print data: