TiDB CDC Connector

The TiDB CDC connector allows for reading snapshot data and incremental data from TiDB database. This document describes how to setup the TiDB CDC connector to run SQL queries against TiDB databases.

Dependencies

In order to setup the TiDB CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-tidb-cdc</artifactId>
  <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
  <version>2.4.0</version>
</dependency>

SQL Client JAR

Download link is available only for stable releases.

Download flink-sql-connector-tidb-cdc-2.4.0.jar and put it under <FLINK_HOME>/lib/.

Note: flink-sql-connector-tidb-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as flink-sql-connector-tidb-cdc-2.2.1.jar, the released version will be available in the Maven central warehouse.

How to create a TiDB CDC table

The TiDB CDC table can be defined as following:

-- checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';   

-- register a TiDB table 'orders' in Flink SQL
Flink SQL> CREATE TABLE orders (
     order_id INT,
     order_date TIMESTAMP(3),
     customer_name STRING,
     price DECIMAL(10, 5),
     product_id INT,
     order_status BOOLEAN,
     PRIMARY KEY(order_id) NOT ENFORCED
     ) WITH (
     'connector' = 'tidb-cdc',
     'tikv.grpc.timeout_in_ms' = '20000', 
     'pd-addresses' = 'localhost:2379',
     'database-name' = 'mydb',
     'table-name' = 'orders'
);
  
-- read snapshot and binlogs from orders table
Flink SQL> SELECT * FROM orders;

Connector Options

Option Required Default Type Description
connector required (none) String Specify what connector to use, here should be 'tidb-cdc'.
database-name required (none) String Database name of the TiDB server to monitor.
table-name required (none) String Table name of the TiDB database to monitor.
scan.startup.mode optional initial String Optional startup mode for TiDB CDC consumer, valid enumerations are "initial" and "latest-offset".
pd-addresses required (none) String TiKV cluster's PD address.
tikv.grpc.timeout_in_ms optional (none) Long TiKV GRPC timeout in ms.
tikv.grpc.scan_timeout_in_ms optional (none) Long TiKV GRPC scan timeout in ms.
tikv.batch_get_concurrency optional 20 Integer TiKV GRPC batch get concurrency.
tikv.* optional (none) String Pass-through TiDB client's properties.

Available Metadata

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

Key DataType Description
table_name STRING NOT NULL Name of the table that contain the row.
database_name STRING NOT NULL Name of the database that contain the row.
op_ts TIMESTAMP_LTZ(3) NOT NULL It indicates the time that the change was made in the database.
If the record is read from snapshot of the table instead of the binlog, the value is always 0.

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = 'localhost:2379',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

Features

Exactly-Once Processing

The TiDB CDC connector is a Flink Source connector which will read database snapshot first and then continues to read change events with exactly-once processing even failures happen.

Startup Reading Position

The config option scan.startup.mode specifies the startup mode for TiDB CDC consumer. The valid enumerations are:

  • initial (default): Takes a snapshot of structure and data of captured tables; useful if you want fetch a complete representation of the data from the captured tables.

  • latest-offset: Takes a snapshot of the structure of captured tables only; useful if only changes happening from now onwards should be fetched.

Multi Thread Reading

The TiDB CDC source can work in parallel reading, because there is multiple tasks can receive change events.

DataStream Source

The TiDB CDC connector can also be a DataStream source. You can create a SourceFunction as the following shows:

DataStream Source

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import com.ververica.cdc.connectors.tidb.TDBSourceOptions;
import com.ververica.cdc.connectors.tidb.TiDBSource;
import com.ververica.cdc.connectors.tidb.TiKVChangeEventDeserializationSchema;
import com.ververica.cdc.connectors.tidb.TiKVSnapshotEventDeserializationSchema;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Kvrpcpb;

import java.util.HashMap;

public class TiDBSourceExample {

    public static void main(String[] args) throws Exception {

        SourceFunction<String> tidbSource =
            TiDBSource.<String>builder()
                .database("mydb") // set captured database
                .tableName("products") // set captured table
                .tiConf(
                    TDBSourceOptions.getTiConfiguration(
                        "localhost:2399", new HashMap<>()))
                .snapshotEventDeserializer(
                    new TiKVSnapshotEventDeserializationSchema<String>() {
                        @Override
                        public void deserialize(
                            Kvrpcpb.KvPair record, Collector<String> out)
                            throws Exception {
                            out.collect(record.toString());
                        }

                        @Override
                        public TypeInformation<String> getProducedType() {
                            return BasicTypeInfo.STRING_TYPE_INFO;
                        }
                    })
                .changeEventDeserializer(
                    new TiKVChangeEventDeserializationSchema<String>() {
                        @Override
                        public void deserialize(
                            Cdcpb.Event.Row record, Collector<String> out)
                            throws Exception {
                            out.collect(record.toString());
                        }

                        @Override
                        public TypeInformation<String> getProducedType() {
                            return BasicTypeInfo.STRING_TYPE_INFO;
                        }
                    })
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);
        env.addSource(tidbSource).print().setParallelism(1);

        env.execute("Print TiDB Snapshot + Binlog");
    }
}

Data Type Mapping

TiDB type Flink SQL type NOTE
TINYINT TINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT
MEDIUMINT
SMALLINT UNSIGNED
INT
BIGINT
INT UNSIGNED
BIGINT
BIGINT UNSIGNED DECIMAL(20, 0)
FLOAT
FLOAT
REAL
DOUBLE
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
where p <= 38
DECIMAL(p, s)
NUMERIC(p, s)
DECIMAL(p, s)
where 38 < p <= 65
STRING The precision for DECIMAL data type is up to 65 in TiDB, but the precision for DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss.
BOOLEAN
TINYINT(1)
BIT(1)
BOOLEAN
DATE DATE
TIME [(p)] TIME [(p)]
TIMESTAMP [(p)] TIMESTAMP_LTZ [(p)]
DATETIME [(p)] TIMESTAMP [(p)]
CHAR(n) CHAR(n)
VARCHAR(n) VARCHAR(n)
BIT(n) BINARY(⌈n/8⌉)
BINARY(n) BINARY(n)
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
STRING
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BYTES Currently, for BLOB data type in TiDB, only the blob whose length isn't greater than 2,147,483,647(2 ** 31 - 1) is supported.
YEAR INT
ENUM STRING
JSON STRING The JSON data type will be converted into STRING with JSON format in Flink.
SET ARRAY<STRING> As the SET data type in TiDB is a string object that can have zero or more values, it should always be mapped to an array of string