Contents
I. Introduction to CDC
What is CDC?
CDC stands for Change Data Capture. The core idea is to monitor and capture database changes (including insertion, update, and deletion of data or data tables, etc.), record these changes in the order in which they occurred, and write them to the message middleware for other services to subscribe and consume.
Types of CDC
CDC implementations fall into two main categories: query-based and Binlog-based. The table below compares them:
| | Query-based CDC | Binlog-based CDC |
|---|---|---|
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | No | Yes |
| Latency | High | Low |
| Adds load to the database | Yes | No |
About Flink-CDC
The Flink community has developed the flink-cdc-connectors component. It is a source connector that reads both the full (snapshot) data and the incremental changes directly from databases such as MySQL and PostgreSQL.
The project is open source; its repository is https://github.com/ververica/flink-cdc-connectors
II. Flink CDC in practice
1. DataStream application
A. Importing dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <!--
      Flink CDC 1.x: groupId com.alibaba.ververica provides the com.alibaba.ververica.cdc.*
      packages (MySQLSource, StartupOptions, DebeziumSourceFunction) that the Java examples
      import, and the 1.2.x line targets Flink 1.12. CDC 2.x moved to groupId com.ververica,
      renamed the packages to com.ververica.cdc.* and requires Flink 1.13.
    -->
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Builds a fat jar (*-jar-with-dependencies.jar) that is submitted with bin/flink run. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
B. Writing code
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class FlinkCDC {
public static void main(String[] args) throws Exception {
//1. Creating the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
/2.Flink-CDC will read the binlog location information in the form of state stored in the CK, if you want to do continuous transmission, you need to start the program from Checkpoint or Savepoint.
//2.1 Start Checkpoint, do CK every 5 seconds.
env.enableCheckpointing(5000L);
//2.2 Conformance Semantics for Specifying CKs
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//2.3 Retaining the last CK data when setting up a task closure
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//2.4 Specifying the Automatic Reboot from CK Policy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
//2.5 Setup status backend
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
//2.6 Setting the Username for Accessing HDFS
System.setProperty("HADOOP_USER_NAME", "atguigu");
//Creating a Source for Flink-MySQL-CDC
//initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
//latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
//timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
//specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
.hostname("hadoop102")
.port(3306)
.username("root")
.password("000000")
.databaseList("gmall-flink")
.tableList("gmall-flink.z_user_info") //Optional configuration item, if you do not specify this parameter, it will read all the table data under the previous configuration, note: you need to use the "db.table" method when you specify.
.startupOptions(StartupOptions.initial())
.deserializer(new StringDebeziumDeserializationSchema())
.build();
//Reading Data from MySQL with CDC Source
DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
//5. Printing data
mysqlDS.print();
//6. Implementation of mandates
env.execute();
}
}
C. Case testing
(1) Package and upload to Linux
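(2) Enable the MySQL binlog and restart MySQL. The CDC source needs a row-based binlog. For example, set the following in `my.cnf`: `server-id=1`, `log-bin=mysql-bin`, `binlog_format=row`, `binlog-do-db=gmall-flink`.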
(3) Starting a Flink Cluster
[fancy@hadoop102 flink-standalone]$ bin/start-cluster.sh
(4) Starting an HDFS Cluster
[fancy@hadoop102 flink-standalone]$ start-dfs.sh
(5) Launching the program
[fancy@hadoop102 flink-standalone]$ bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
(6) Adding, Modifying or Deleting Data in MySQL’s gmall-flink.z_user_info Table
(7) Create a Savepoint for the current Flink program.
[fancy@hadoop102 flink-standalone]$ bin/flink savepoint <JobId> hdfs://hadoop102:8020/flink/save
(8) Cancel the job, then restart it from the savepoint (use the same main class as in step 5)
[fancy@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
2. FlinkSQL application
A. Code implementation
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Reads {@code gmall-flink.z_user_info} through the {@code mysql-cdc} SQL connector and prints
 * the changelog produced by {@code select * from user_info}.
 */
public class FlinkSQL_CDC {

    /**
     * Registers the CDC-backed table and prints the query result.
     *
     * <p>{@code TableEnvironment#executeSql} submits the query as its own job, and
     * {@code TableResult#print()} prints rows as they arrive. {@code env.execute()} is
     * deliberately not called. No DataStream operator is defined on {@code env}, so Flink
     * would reject that call with "No operators defined in streaming topology".
     *
     * @param args unused
     * @throws Exception if the table cannot be registered or the query fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Register a table backed by the Flink-MySQL-CDC source
        tableEnv.executeSql("CREATE TABLE user_info (" +
                " id INT," +
                " name STRING," +
                " phone_num STRING" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'hadoop102'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '000000'," +
                " 'database-name' = 'gmall-flink'," +
                " 'table-name' = 'z_user_info'" +
                ")"
        );

        // 3. Query the table and print its changelog; executeSql submits and runs the job itself
        tableEnv.executeSql("select * from user_info").print();
    }
}
3. Custom deserializers
A. Code implementation
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Properties;
public class Flink_CDCWithCustomerSchema {
public static void main(String[] args) throws Exception {
//1. Creating the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//Creating a Source for Flink-MySQL-CDC
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
.hostname("hadoop102")
.port(3306)
.username("root")
.password("000000")
.databaseList("gmall-flink")
.tableList("gmall-flink.z_user_info") //Optional configuration item, if you do not specify this parameter, it will read all the table data under the previous configuration, note: you need to specify the use of "db.table" way.
.startupOptions(StartupOptions.initial())
.deserializer(
new DebeziumDeserializationSchema<String>() { // custom data parser
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String>collector) throws Exception {
// Get subject information, including database and table names mysql_binlog_source.gmall-flink.z_user_info
String topic = sourceRecord.topic();
String[] arr = topic.split("\\.");
String db = arr[1];
String tableName = arr[2];
//Get Operation Type READ DELETE UPDATE CREATE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
// Get value information and convert to Struct type.
Struct value = (Struct) sourceRecord.value();
// Get the changed data
Struct after = value.getStruct("after");
// Create JSON objects to store data information.
JSONObject data = new JSONObject();
for (Field field : after.schema().fields()) {
Object o = after.get(field);
data.put(field.name(), o);
}
// Create a JSON object to encapsulate the final return value data information.
JSONObject result = new JSONObject();
result.put("operation", operation.toString().toLowerCase());
result.put("data", data);
result.put("database", db);
result.put("table", tableName);
//Send data downstream
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}).build();
//Read Data from MySQL using CDC Source
DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
//4. Printing data
mysqlDS.print();
//5. Implementation of mandates
env.execute();
}
}
III. Flink-CDC 2.0
1. Pain points of 1.x
2. Design Objectives
3. Design and implementation
A. Overall overview
For tables with a primary key in the initial (snapshot + binlog) startup mode, the overall process has five stages:
1. Chunk splitting;
2. Chunk assignment (enables parallel reading and per-Chunk checkpoints);
3. Chunk reading (enables lock-free reading);
4. Chunk reporting;
5. Binlog Chunk assignment.
B. Chunk Slicing
Following the lock-free algorithm from Netflix's DBLog paper, the target table is split into Chunks by primary key. Each Chunk covers a half-open range (left-closed/right-open, or left-open/right-closed). Adjacent Chunks therefore neither overlap nor leave gaps, which keeps the data continuous.
C. Chunk assignment
The Chunks are distributed to multiple SourceReaders. Each SourceReader reads part of the table's data, which achieves parallel reading.
In addition, a checkpoint can be taken as each Chunk is read. If reading a Chunk fails, only that Chunk has to be read again. In 1.x, by contrast, a failure meant restarting the whole read from the beginning.
As long as each SourceReader guarantees data consistency, the data of the entire table is consistent.
D. Chunk reading
Reading a Chunk proceeds in five steps:
(1) Before the SourceReader reads the table data, it records the current binlog position as the low watermark;
(2) The SourceReader queries the rows in its own Chunk range and puts them in a buffer;
(3) After the query completes, it records the current binlog position as the high watermark;
(4) It consumes the incremental binlog between the low watermark and the high watermark;
(5) Using the primary key, it applies those binlog changes to the buffered rows and outputs the corrected data.
These five steps ensure that each Chunk's final output is the Chunk's latest data as of the high watermark. At this stage, however, consistency is only guaranteed within a single Chunk.
E. Chunk reporting
After a Snapshot Chunk has been read, the SourceReader reports the Snapshot Chunk's completion information to the SourceEnumerator.
F. Binlog Chunk assignment
FlinkCDC supports full + incremental data synchronization. After the SourceEnumerator has received the completion information of all Snapshot Chunks, the incremental data (binlog) still has to be consumed. This is done by sending a single Binlog Chunk to one SourceReader, which reads it with a parallelism of one.
IV. Analysis of core principles
A. Source code: determining where the Binlog Chunk starts reading
MySqlHybridSplitAssigner
/**
 * Builds the single binlog split that is assigned once all snapshot splits have finished.
 *
 * <p>The split starts at the smallest finished offset (high watermark) among all snapshot
 * splits. No change made after any split's snapshot is therefore skipped. Each split's range
 * and high watermark are carried along as {@link FinishedSnapshotSplitInfo}, so the binlog
 * reader can decide per split which events to emit.
 *
 * @return an unbounded binlog split ({@code BinlogOffset.NO_STOPPING_OFFSET} as end offset)
 * @throws IllegalStateException if no snapshot split has been assigned, or an assigned split
 *     has no recorded finished offset
 */
private MySqlBinlogSplit createBinlogSplit() {
    final List<MySqlSnapshotSplit> assignedSnapshotSplit =
            snapshotSplitAssigner.getAssignedSplits().values().stream()
                    .sorted(Comparator.comparing(MySqlSplit::splitId))
                    .collect(Collectors.toList());
    if (assignedSnapshotSplit.isEmpty()) {
        // The split key type below is taken from the last snapshot split, so at least one is required.
        throw new IllegalStateException(
                "Cannot create the binlog split: no snapshot split has been assigned.");
    }

    Map<String, BinlogOffset> splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();
    final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
    final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();

    // Seed with null rather than BinlogOffset.INITIAL_OFFSET. An initial-offset seed presumably
    // sorts before every real offset, so "compareTo(min) < 0" would never fire and the binlog
    // split would start from the initial offset instead of the earliest high watermark.
    BinlogOffset minBinlogOffset = null;
    for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
        BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
        if (binlogOffset == null) {
            throw new IllegalStateException(
                    "No finished binlog offset recorded for snapshot split " + split.splitId());
        }
        // find the min binlog offset
        if (minBinlogOffset == null || binlogOffset.compareTo(minBinlogOffset) < 0) {
            minBinlogOffset = binlogOffset;
        }
        finishedSnapshotSplitInfos.add(
                new FinishedSnapshotSplitInfo(
                        split.getTableId(),
                        split.splitId(),
                        split.getSplitStart(),
                        split.getSplitEnd(),
                        binlogOffset));
        tableSchemas.putAll(split.getTableSchemas());
    }

    final MySqlSnapshotSplit lastSnapshotSplit =
            assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();
    return new MySqlBinlogSplit(
            BINLOG_SPLIT_ID,
            lastSnapshotSplit.getSplitKeyType(),
            minBinlogOffset,
            BinlogOffset.NO_STOPPING_OFFSET,
            finishedSnapshotSplitInfos,
            tableSchemas);
}
B. Source code: reading the binlog and filtering events by each Chunk's high watermark
BinlogSplitReader
/**
 * Returns whether the given record should be emitted downstream.
 *
 * <p>Watermark-based filtering: the binlog split reader only emits binlog events that
 * belong to one of its finished snapshot splits. For each snapshot split, a binlog event is
 * valid only when its offset is after that split's high watermark. Changes up to the high
 * watermark are already reflected in the split's snapshot output.
 *
 * <pre> E.g: the data input is :
 * snapshot-split-0 info : [0, 1024) highWatermark0
 * snapshot-split-1 info : [1024, 2048) highWatermark1
 * the data output is:
 * only the binlog event belong to [0, 1024) and offset is after highWatermark0 should send,
 * only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
 * </pre>
 *
 * <p>Records that are not data-change records (schema change events, signal events) are
 * always emitted.
 *
 * @param sourceRecord the Debezium record read from the binlog
 * @return {@code true} to emit the record, {@code false} to drop it
 */
private boolean shouldEmit(SourceRecord sourceRecord) {
    if (isDataChangeRecord(sourceRecord)) {
        TableId tableId = getTableId(sourceRecord);
        BinlogOffset position = getBinlogPosition(sourceRecord);
        // Aligned: all snapshot splits of this table have reached the max high watermark, so
        // no per-split filtering is needed.
        // NOTE(review): the Javadoc says events are valid when their offset is *after* a high
        // watermark, but this reads "position is at or before the max high watermark".
        // Confirm BinlogOffset#isAtOrBefore's real semantics (receiver vs. argument order);
        // if it means this <= that, the condition looks inverted.
        // NOTE(review): maxSplitHighWatermarkMap.get(tableId) may return null for a table
        // without snapshot splits; confirm isAtOrBefore tolerates that.
        if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
            return true;
        }
        // Split key of the changed row, used to find the snapshot split whose range contains it.
        Object[] key =
                getSplitKey(
                        currentBinlogSplit.getSplitKeyType(),
                        sourceRecord,
                        statefulTaskContext.getSchemaNameAdjuster()
                );
        // Emit only if the key falls inside a finished split's range and the offset passes that
        // split's high watermark (the isAtOrBefore note above applies here too).
        // NOTE(review): finishedSplitsInfo.get(tableId) returning null would throw a
        // NullPointerException in this loop; presumably every captured table has entries. Confirm.
        for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
            if (RecordUtils.splitKeyRangeContains(key, splitInfo.getSplitStart(), splitInfo.getSplitEnd()) && position.isAtOrBefore(splitInfo.getHighWatermark())) {
                return true;
            }
        }
        // not in the monitored splits scope, do not emit
        return false;
    }
    // Always emit schema change events and signal events:
    // they need to be recorded in Flink state.
    return true;
}