Flink CDC in Detail


I. Introduction to CDC

What is CDC?

CDC stands for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of data or tables), record those changes in the order in which they occurred, and write them to message middleware so that other services can subscribe to and consume them.

Types of CDC

CDC comes in two main types, query-based and Binlog-based. The differences between them are:

                            Query-based CDC             Binlog-based CDC
Open source products        Sqoop, Kafka JDBC Source    Canal, Maxwell, Debezium
Execution mode              Batch                       Streaming
Captures all data changes   No                          Yes
Latency                     High                        Low
Increases database load     Yes                         No
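
Why a query-based approach cannot capture every change can be seen in a small simulation. This is only a sketch, not connector code: the class PollingVsLog, the in-memory table, and the list standing in for the Binlog are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy comparison of query-based and log-based change capture (all names are hypothetical). */
final class PollingVsLog {
    /** Current table contents: primary key -> value. */
    private final Map<Integer, String> table = new HashMap<>();
    /** Append-only change log, standing in for the Binlog. */
    private final List<String> log = new ArrayList<>();

    void upsert(int id, String value) {
        String op = table.containsKey(id) ? "UPDATE" : "INSERT";
        table.put(id, value);
        log.add(op + " " + id + "=" + value);
    }

    void delete(int id) {
        if (table.remove(id) != null) {
            log.add("DELETE " + id);
        }
    }

    /** Query-based capture: a periodic SELECT only sees the rows that exist right now. */
    Map<Integer, String> poll() {
        return new HashMap<>(table);
    }

    /** Log-based capture: every change, in the order it happened. */
    List<String> readLog() {
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        PollingVsLog db = new PollingVsLog();
        // Three changes happen between two polls.
        db.upsert(1, "a");
        db.upsert(1, "b");
        db.delete(1);
        System.out.println("poll sees: " + db.poll());
        System.out.println("log sees:  " + db.readLog());
    }
}
```

In this run the poll finds an empty table, so the insert, the update, and the delete are all invisible to it, while the log still holds all three events in order.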

About Flink-CDC

The Flink community has developed the flink-cdc-connectors component, a source component that reads full and incremental change data directly from databases such as MySQL and PostgreSQL.

It is open source and available at: https://github.com/ververica/flink-cdc-connectors

II. Flink CDC in practice

1. DataStream application

A. Importing dependencies


B. Writing code

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
	public static void main(String[] args) throws Exception {
		//1. Create the execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		//2. Flink-CDC stores the binlog position as state in the checkpoint (CK).
		//   To resume where the job left off, restart it from a Checkpoint or Savepoint.
		//2.1 Enable checkpointing, one CK every 5 seconds
		env.enableCheckpointing(5000L);
		//2.2 Set the consistency semantics of the CK
		env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
		//2.3 Retain the last CK when the job is cancelled
		env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
		//2.4 Set the strategy for restarting automatically from the CK
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
		//2.5 Set the state backend
		env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
		//2.6 Set the user name for accessing HDFS
		System.setProperty("HADOOP_USER_NAME", "atguigu");
		//3. Create the Flink-MySQL-CDC source. Startup modes:
		//initial (default): takes an initial snapshot of the monitored tables on first startup, then continues reading the latest binlog.
		//latest-offset: never takes a snapshot on first startup; reads from the end of the binlog, so only changes made after the connector started are seen.
		//timestamp: never takes a snapshot on first startup; reads the binlog from the specified timestamp. The consumer traverses the binlog from the beginning and ignores change events whose timestamp is smaller than the specified one.
		//specific-offset: never takes a snapshot on first startup; reads the binlog directly from the specified offset.
		DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
			.hostname("hadoop102")
			.port(3306)
			.username("root")
			.password("000000")
			.databaseList("gmall-flink")
			.tableList("gmall-flink.z_user_info") //Optional. If omitted, all tables in the databases configured above are read. Note: tables must be given in "db.table" form.
			.startupOptions(StartupOptions.initial())
			.deserializer(new StringDebeziumDeserializationSchema())
			.build();
		//4. Read data from MySQL with the CDC source
		DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
		//5. Print the data
		mysqlDS.print();
		//6. Execute the job
		env.execute();
	}
}

C. Case testing

(1) Package and upload to Linux

(2) Enable MySQL Binlog and Restart MySQL

(3) Starting a Flink Cluster

[fancy@hadoop102 flink-standalone]$ bin/start-cluster.sh

(4) Starting an HDFS Cluster

[fancy@hadoop102 flink-standalone]$ start-dfs.sh

(5) Launching the program

[fancy@hadoop102 flink-standalone]$ bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar

(6) Adding, Modifying or Deleting Data in MySQL’s gmall-flink.z_user_info Table

(7) Create a Savepoint for the current Flink program.

[fancy@hadoop102 flink-standalone]$ bin/flink savepoint JobId 

(8) Stop the program, then restart it from the Savepoint

[fancy@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar

2. FlinkSQL application

A. Code implementation

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQL_CDC {
	public static void main(String[] args) throws Exception {
		//1. Create the execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
		//2. Create the Flink-MySQL-CDC source table
		tableEnv.executeSql("CREATE TABLE user_info (" +
			" id INT," +
			" name STRING," +
			" phone_num STRING" +
			") WITH (" +
			" 'connector' = 'mysql-cdc'," +
			" 'hostname' = 'hadoop102'," +
			" 'port' = '3306'," +
			" 'username' = 'root'," +
			" 'password' = '000000'," +
			" 'database-name' = 'gmall-flink'," +
			" 'table-name' = 'z_user_info'" +
			")");
		//3. Query the table and print the change stream
		tableEnv.executeSql("select * from user_info").print();
	}
}

3. Custom deserializers

A. Code implementation

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class Flink_CDCWithCustomerSchema {
	public static void main(String[] args) throws Exception {
		//1. Create the execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		//2. Create the Flink-MySQL-CDC source
		DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
			.hostname("hadoop102")
			.port(3306)
			.username("root")
			.password("000000")
			.databaseList("gmall-flink")
			.tableList("gmall-flink.z_user_info") //Optional. If omitted, all tables in the databases configured above are read. Note: tables must be given in "db.table" form.
			.deserializer(new DebeziumDeserializationSchema<String>() { //Custom data parser
				@Override
				public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
					//The topic carries the database and table names, e.g. mysql_binlog_source.gmall-flink.z_user_info
					String topic = sourceRecord.topic();
					String[] arr = topic.split("\\.");
					String db = arr[1];
					String tableName = arr[2];
					//Get the operation type: READ, DELETE, UPDATE or CREATE
					Envelope.Operation operation = Envelope.operationFor(sourceRecord);
					//Get the value and cast it to a Struct
					Struct value = (Struct) sourceRecord.value();
					//Get the row data after the change ("after" is null for DELETE events)
					Struct after = value.getStruct("after");
					//Copy the row's fields into a JSON object
					JSONObject data = new JSONObject();
					if (after != null) {
						for (Field field : after.schema().fields()) {
							data.put(field.name(), after.get(field));
						}
					}
					//Wrap everything in the JSON object that is emitted
					JSONObject result = new JSONObject();
					result.put("operation", operation.toString().toLowerCase());
					result.put("data", data);
					result.put("database", db);
					result.put("table", tableName);
					//Send the data downstream
					collector.collect(result.toJSONString());
				}

				@Override
				public TypeInformation<String> getProducedType() {
					return TypeInformation.of(String.class);
				}
			})
			.build();
		//3. Read data from MySQL with the CDC source
		DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
		//4. Print the data
		mysqlDS.print();
		//5. Execute the job
		env.execute();
	}
}

III. Flink-CDC 2.0

1. Pain points of 1.x

2. Design Objectives

3. Design and implementation

A. Overview

For a table with a primary key read in initial mode, the overall process is divided into five stages:

1. Chunk slicing;
2. Chunk allocation (enables parallel reading and per-Chunk CheckPoints);
3. Chunk reading (lock-free reads);
4. Chunk reporting;
5. Chunk allocation (of the Binlog Chunk, for the incremental phase).

B. Chunk Slicing

Following the lock-free algorithm in the Netflix DBLog paper, the target table is sliced by primary key. The interval of each slice is either left-closed and right-open or left-open and right-closed, so that consecutive slices cover the data with no gaps and no overlaps.
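
The slicing idea can be sketched as follows. This is a minimal illustration, assuming a numeric primary key and a fixed chunk size; ChunkSplitter, Chunk, and split are hypothetical names, not the connector's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of primary-key chunk slicing (hypothetical names; assumes a numeric key). */
final class ChunkSplitter {

    /** A left-closed, right-open key range [start, end). */
    static final class Chunk {
        final long start; // inclusive
        final long end;   // exclusive

        Chunk(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(long key) {
            return key >= start && key < end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /**
     * Slices the key range [minKey, maxKey] into consecutive chunks of at most chunkSize keys.
     * Assumes the keys are far enough below Long.MAX_VALUE that the arithmetic cannot overflow.
     */
    static List<Chunk> split(long minKey, long maxKey, long chunkSize) {
        if (chunkSize <= 0 || minKey > maxKey) {
            throw new IllegalArgumentException("invalid range or chunk size");
        }
        List<Chunk> chunks = new ArrayList<>();
        long start = minKey;
        while (start <= maxKey) {
            // The end of one chunk is the start of the next, so no key is skipped or read twice.
            long end = Math.min(start + chunkSize, maxKey + 1);
            chunks.add(new Chunk(start, end));
            start = end;
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(split(0, 2047, 1024));
    }
}
```

With keys 0 to 2047 and a chunk size of 1024, the sketch produces the two chunks [0, 1024) and [1024, 2048). Because every interval is closed on the left and open on the right, the boundary key 1024 belongs to exactly one chunk.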

C. Chunk allocation

The resulting Chunks are distributed to multiple SourceReaders. Each SourceReader reads part of the table's data, so the table is read in parallel.

Each Chunk can also be checkpointed separately while it is being read. If reading a Chunk fails, only that Chunk's task needs to be rerun, whereas in 1.x a failure meant reading everything again from the beginning.

If every SourceReader guarantees consistency for the data it reads, consistency is guaranteed for the whole table.

D. Chunk Read

Reading a Chunk can be divided into five stages:

(1) Before the SourceReader reads the table data, it records the current Binlog position as the low point;
(2) The SourceReader queries the data in its own interval and places it in a buffer;
(3) After the query completes, it records the current Binlog position as the high point;
(4) In the incremental part, it consumes the Binlog from the low point to the high point;
(5) Using the primary key, it corrects the data in the buffer with those Binlog events and outputs the result.

These five stages ensure that the final output of each Chunk is the latest data in that Chunk as of the high point. At this stage, however, consistency is guaranteed only within a single Chunk.
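
The five stages above can be sketched as a small in-memory simulation. It is an illustration under simplifying assumptions, not the connector's code: Binlog positions are plain long values, keys are integers, events arrive in position order, and ChunkReadSketch, Change, and correct are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of correcting a chunk snapshot with the Binlog between the low and high points. */
final class ChunkReadSketch {

    /** A simplified Binlog event; a null value means the row was deleted. */
    static final class Change {
        final long offset;
        final int key;
        final String value;

        Change(long offset, int key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Replays the events with low < offset <= high that fall inside [chunkStart, chunkEnd)
     * on top of the buffered snapshot. Replaying by primary key is idempotent, so it does
     * not matter whether the snapshot query had already seen some of these events.
     */
    static Map<Integer, String> correct(Map<Integer, String> snapshot, List<Change> binlog,
                                        long low, long high, int chunkStart, int chunkEnd) {
        Map<Integer, String> buffer = new TreeMap<>(snapshot);
        for (Change c : binlog) {
            boolean inWindow = c.offset > low && c.offset <= high;
            boolean inChunk = c.key >= chunkStart && c.key < chunkEnd;
            if (!inWindow || !inChunk) {
                continue;
            }
            if (c.value == null) {
                buffer.remove(c.key);       // delete
            } else {
                buffer.put(c.key, c.value); // insert or update
            }
        }
        return buffer;
    }

    public static void main(String[] args) {
        // Snapshot of chunk [0, 1024), queried after the event at position 101 was committed.
        Map<Integer, String> snapshot = new HashMap<>();
        snapshot.put(1, "a");
        snapshot.put(2, "b2");
        snapshot.put(3, "c");
        List<Change> binlog = Arrays.asList(
                new Change(101, 2, "b2"),       // already visible in the snapshot
                new Change(102, 3, null),       // row deleted after the query
                new Change(103, 5000, "other"), // belongs to a different chunk
                new Change(104, 4, "d"),        // row inserted after the query
                new Change(105, 1, "late"));    // after the high point, left to the Binlog phase
        System.out.println(correct(snapshot, binlog, 100, 104, 0, 1024));
    }
}
```

The corrected buffer holds rows 1, 2 and 4, which is the state of the chunk at the high point (position 104). The event at position 105 is not applied here; it is delivered later, during incremental reading.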

E. Chunk reporting

After a Snapshot Chunk has been read, there is a reporting step: the SourceReader reports the Snapshot Chunk's completion information to the SourceEnumerator.

F. Binlog Chunk allocation

Flink CDC supports full plus incremental data synchronization. After the SourceEnumerator has received the completion information for all Snapshot Chunks, one task remains: consuming the incremental data (Binlog). This is done by sending a Binlog Chunk to any one SourceReader, which reads it with a parallelism of one.

IV. Analysis of core principles

A. Source code for the position at which the Binlog Chunk starts reading


private MySqlBinlogSplit createBinlogSplit() {
	final List<MySqlSnapshotSplit> assignedSnapshotSplit = snapshotSplitAssigner.getAssignedSplits().values().stream()
			// ...
	Map<String, BinlogOffset> splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();
	final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
	final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
	BinlogOffset minBinlogOffset = BinlogOffset.INITIAL_OFFSET;
	for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
		// find the min binlog offset
		BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
		if (binlogOffset.compareTo(minBinlogOffset) < 0) {
			minBinlogOffset = binlogOffset;
		}
		new FinishedSnapshotSplitInfo(
				// ...
	}
	final MySqlSnapshotSplit lastSnapshotSplit = assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();
	return new MySqlBinlogSplit(
			// ...
}
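
The "find the min binlog offset" step in the excerpt can be isolated in a few lines. This is a simplified sketch, assuming Binlog offsets can be modeled as plain long values (the real BinlogOffset is a richer type); BinlogStartSketch and startingOffset are hypothetical names.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: choosing where the Binlog Chunk starts reading (offsets modeled as long values). */
final class BinlogStartSketch {

    /**
     * Returns the smallest offset at which any Snapshot Chunk finished. Starting the
     * Binlog Chunk there means no finished chunk can miss a change made after its own
     * finishing offset.
     */
    static long startingOffset(Map<String, Long> splitFinishedOffsets) {
        if (splitFinishedOffsets.isEmpty()) {
            throw new IllegalArgumentException("no finished snapshot chunks");
        }
        return Collections.min(splitFinishedOffsets.values());
    }

    public static void main(String[] args) {
        // Hypothetical split ids and finishing offsets.
        Map<String, Long> finished = new LinkedHashMap<>();
        finished.put("z_user_info:0", 120L);
        finished.put("z_user_info:1", 95L);
        finished.put("z_user_info:2", 140L);
        System.out.println(startingOffset(finished));
    }
}
```

Starting from the minimum means that chunks which finished later will see some events they have already absorbed, which is why the reader also needs a per-chunk filter when it emits Binlog events.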

B. Read Binlog between low and high points


/**
 * Returns the record should emit or not.
 * <p>The watermark signal algorithm is the binlog split reader only sends the binlog event that
 * belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
 * since the offset is after its high watermark.
 * <pre> E.g: the data input is :
 * snapshot-split-0 info : [0, 1024) highWatermark0
 * snapshot-split-1 info : [1024, 2048) highWatermark1
 * the data output is:
 * only the binlog event belong to [0, 1024) and offset is after highWatermark0 should send,
 * only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
 * </pre>
 */
private boolean shouldEmit(SourceRecord sourceRecord) {
	if (isDataChangeRecord(sourceRecord)) {
		TableId tableId = getTableId(sourceRecord);
		BinlogOffset position = getBinlogPosition(sourceRecord);
		// aligned, all snapshot splits of the table has reached max highWatermark
		if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
			return true;
		}
		Object[] key =
				// ...
		for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
			if (RecordUtils.splitKeyRangeContains(key, splitInfo.getSplitStart(), splitInfo.getSplitEnd()) && position.isAtOrBefore(splitInfo.getHighWatermark())) {
				return true;
			}
		}
		// not in the monitored splits scope, do not emit
		return false;
	}
	// always send the schema change event and signal event
	// we need record them to state of Flink
	return true;
}
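
The rule described in the Javadoc comment can be tried out in isolation. The sketch below follows the wording of that comment (emit only events whose key falls in a finished split and whose offset is after that split's high watermark) and leaves out the shortcut for the table-wide maximum high watermark. It assumes numeric keys and long offsets; EmitFilterSketch and FinishedSplit are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of the per-split emit filter described in the Javadoc above. */
final class EmitFilterSketch {

    /** A finished snapshot split: key range [start, end) and its high watermark. */
    static final class FinishedSplit {
        final long start;
        final long end;
        final long highWatermark;

        FinishedSplit(long start, long end, long highWatermark) {
            this.start = start;
            this.end = end;
            this.highWatermark = highWatermark;
        }
    }

    /** Emits a Binlog event only if its split has not already absorbed it during the snapshot phase. */
    static boolean shouldEmit(long key, long offset, List<FinishedSplit> splits) {
        for (FinishedSplit split : splits) {
            boolean inRange = key >= split.start && key < split.end;
            if (inRange && offset > split.highWatermark) {
                return true;
            }
        }
        // Either outside every monitored split, or already covered by the snapshot output.
        return false;
    }

    public static void main(String[] args) {
        List<FinishedSplit> splits = Arrays.asList(
                new FinishedSplit(0, 1024, 50),     // snapshot-split-0, highWatermark0 = 50
                new FinishedSplit(1024, 2048, 80)); // snapshot-split-1, highWatermark1 = 80
        System.out.println(shouldEmit(10, 60, splits));   // after highWatermark0
        System.out.println(shouldEmit(1500, 60, splits)); // before highWatermark1
        System.out.println(shouldEmit(1500, 90, splits)); // after highWatermark1
        System.out.println(shouldEmit(5000, 90, splits)); // not in any split
    }
}
```

The second call returns false because split 1 already reflects every change up to offset 80 in its snapshot output; emitting the event at offset 60 again would duplicate it.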

