TiDB CDC Connector¶
The TiDB CDC connector reads snapshot data and incremental data from a TiDB database. This document describes how to set up the TiDB CDC connector to run SQL queries against TiDB databases.
Dependencies¶
To set up the TiDB CDC connector, use the dependency information below. It covers both projects that use a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles.
Maven dependency¶
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-tidb-cdc</artifactId>
  <!-- The dependency is available only for stable releases; SNAPSHOT dependencies must be built from source. -->
  <version>2.2.1</version>
</dependency>
SQL Client JAR¶
Download link is available only for stable releases.
Download flink-sql-connector-tidb-cdc-2.2.1.jar and put it under <FLINK_HOME>/lib/.
Note: flink-sql-connector-tidb-cdc-XXX-SNAPSHOT versions correspond to the code on the development branch. To use one, download the source code and compile the JAR yourself. Users should prefer a released version, such as flink-sql-connector-tidb-cdc-2.2.1.jar. Released versions are available in the Maven Central repository.
How to create a TiDB CDC table¶
A TiDB CDC table can be defined as follows:
-- checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';   
-- register a TiDB table 'orders' in Flink SQL
Flink SQL> CREATE TABLE orders (
     order_id INT,
     order_date TIMESTAMP(3),
     customer_name STRING,
     price DECIMAL(10, 5),
     product_id INT,
     order_status BOOLEAN,
     PRIMARY KEY(order_id) NOT ENFORCED
     ) WITH (
     'connector' = 'tidb-cdc',
     'tikv.grpc.timeout_in_ms' = '20000', 
     'pd-addresses' = 'localhost:2379',
     'database-name' = 'mydb',
     'table-name' = 'orders'
);
  
-- read snapshot and binlogs from orders table
Flink SQL> SELECT * FROM orders;
Connector Options¶
| Option | Required | Default | Type | Description | 
|---|---|---|---|---|
| connector | required | (none) | String | Specifies which connector to use; here it should be 'tidb-cdc'. | 
| database-name | required | (none) | String | Database name of the TiDB server to monitor. | 
| table-name | required | (none) | String | Table name of the TiDB database to monitor. | 
| scan.startup.mode | optional | initial | String | Startup mode for the TiDB CDC consumer. Valid values are "initial" and "latest-offset". | 
| pd-addresses | required | (none) | String | TiKV cluster's PD address. | 
| tikv.grpc.timeout_in_ms | optional | (none) | Long | TiKV gRPC timeout in milliseconds. | 
| tikv.grpc.scan_timeout_in_ms | optional | (none) | Long | TiKV gRPC scan timeout in milliseconds. | 
| tikv.batch_get_concurrency | optional | 20 | Integer | TiKV gRPC batch get concurrency. | 
| tikv.* | optional | (none) | String | Pass-through properties for the TiDB client. | 
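As a rough illustration of how the optional settings in the table combine with the required ones, the sketch below registers the orders table with explicit TiKV client tuning. The table name orders_tuned and the specific timeout and concurrency values are assumptions chosen for the example, not recommended settings.

```sql
-- Hypothetical table name; the option values are illustrative only.
CREATE TABLE orders_tuned (
    order_id INT,
    order_date TIMESTAMP(3),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    -- required options
    'connector' = 'tidb-cdc',
    'pd-addresses' = 'localhost:2379',
    'database-name' = 'mydb',
    'table-name' = 'orders',
    -- optional TiKV client options from the table above
    'tikv.grpc.timeout_in_ms' = '20000',
    'tikv.grpc.scan_timeout_in_ms' = '20000',
    'tikv.batch_get_concurrency' = '20'
);
```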
Available Metadata¶
The following metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
| Key | DataType | Description | 
|---|---|---|
| table_name | STRING NOT NULL | Name of the table that contains the row. | 
| database_name | STRING NOT NULL | Name of the database that contains the row. | 
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | Time at which the change was made in the database. If the record is read from the snapshot of the table instead of the binlog, the value is always 0. | 
The following extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = 'localhost:2379',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);
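Once such a table is registered, the metadata columns can be queried like any other column. The query below is a small sketch that assumes the products table defined above. The filter relies on the documented behavior that op_ts is 0 for snapshot rows, and it additionally assumes that 0 corresponds to the epoch (1970-01-01 00:00:00 UTC).

```sql
-- Keep only rows that originate from change events, skipping snapshot rows,
-- under the assumption that snapshot rows carry an op_ts equal to the epoch.
SELECT db_name, table_name, operation_ts, order_id, price
FROM products
WHERE operation_ts > TO_TIMESTAMP_LTZ(0, 3);
```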
Features¶
Exactly-Once Processing¶
The TiDB CDC connector is a Flink Source connector that first reads a database snapshot and then continues to read change events, with exactly-once processing even when failures happen.
Startup Reading Position¶
The config option scan.startup.mode specifies the startup mode for the TiDB CDC consumer. The valid values are:
- initial (default): Takes a snapshot of the structure and data of captured tables; useful if you want to fetch a complete representation of the data from the captured tables.
- latest-offset: Takes a snapshot of the structure of captured tables only; useful if only changes happening from now onwards should be fetched.
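The startup mode is set per table in the WITH clause. The sketch below assumes the same mydb.orders table used elsewhere in this document and a hypothetical Flink table name, orders_changes_only. It uses the 'latest-offset' spelling listed in the Connector Options table.

```sql
-- Hypothetical table that skips the data snapshot and reads only new changes.
CREATE TABLE orders_changes_only (
    order_id INT,
    order_date TIMESTAMP(3),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'tidb-cdc',
    'pd-addresses' = 'localhost:2379',
    'database-name' = 'mydb',
    'table-name' = 'orders',
    'scan.startup.mode' = 'latest-offset'
);
```

Omitting 'scan.startup.mode' is equivalent to setting it to 'initial', since that is the default.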
Multi Thread Reading¶
The TiDB CDC source supports parallel reading because multiple tasks can receive change events.
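In the SQL Client, one way to request more than one source task is to raise the default job parallelism before running the query. This is only a sketch: the value 4 is an arbitrary assumption, and how the work is actually distributed across tasks depends on the cluster and on the connector version.

```sql
-- Assumed value; pick a parallelism that matches the available task slots.
SET 'parallelism.default' = '4';

-- Subsequent queries on the TiDB CDC table run with the parallelism set above.
SELECT * FROM orders;
```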
DataStream Source¶
The TiDB CDC connector can also be used as a DataStream source. You can create a SourceFunction as follows:
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import com.ververica.cdc.connectors.tidb.TDBSourceOptions;
import com.ververica.cdc.connectors.tidb.TiDBSource;
import com.ververica.cdc.connectors.tidb.TiKVChangeEventDeserializationSchema;
import com.ververica.cdc.connectors.tidb.TiKVSnapshotEventDeserializationSchema;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Kvrpcpb;
import java.util.HashMap;
public class TiDBSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> tidbSource =
            TiDBSource.<String>builder()
                .database("mydb") // set captured database
                .tableName("products") // set captured table
                .tiConf(
                    TDBSourceOptions.getTiConfiguration(
                        "localhost:2379", new HashMap<>())) // PD address, matching the SQL examples
                .snapshotEventDeserializer(
                    new TiKVSnapshotEventDeserializationSchema<String>() {
                        @Override
                        public void deserialize(
                            Kvrpcpb.KvPair record, Collector<String> out)
                            throws Exception {
                            out.collect(record.toString());
                        }
                        @Override
                        public TypeInformation<String> getProducedType() {
                            return BasicTypeInfo.STRING_TYPE_INFO;
                        }
                    })
                .changeEventDeserializer(
                    new TiKVChangeEventDeserializationSchema<String>() {
                        @Override
                        public void deserialize(
                            Cdcpb.Event.Row record, Collector<String> out)
                            throws Exception {
                            out.collect(record.toString());
                        }
                        @Override
                        public TypeInformation<String> getProducedType() {
                            return BasicTypeInfo.STRING_TYPE_INFO;
                        }
                    })
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoint
        env.enableCheckpointing(3000);
        env.addSource(tidbSource).print().setParallelism(1);
        env.execute("Print TiDB Snapshot + Binlog");
    }
}