Vitess CDC Connector

The Vitess CDC connector reads incremental change data from a Vitess cluster. The connector does not currently support the snapshot feature. This document describes how to set up the Vitess CDC connector to run SQL queries against Vitess databases. See also the Debezium documentation for the Vitess connector.

Dependencies

The following provides the dependency information needed to set up the Vitess CDC connector, both for projects that use a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

Maven dependency

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-vitess-cdc</artifactId>
  <version>3.0-SNAPSHOT</version>
</dependency>

SQL Client JAR

Download flink-sql-connector-vitess-cdc-3.0-SNAPSHOT.jar and put it under <FLINK_HOME>/lib/.

Setup Vitess server

You can follow the Local Install via Docker guide or the Vitess Operator for Kubernetes guide to install Vitess. No special setup is needed to support the Vitess connector.

Checklist

  • Make sure that the VTGate host and its gRPC port (default: 15991) are accessible from the machine where the Vitess connector is installed.
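
One way to check the item above is to attempt a plain TCP connection to the VTGate gRPC port from the machine that will run the connector. The following is a minimal sketch: the class name VtgateReachability and the 3-second timeout are illustrative assumptions, not part of the connector. A successful connection only shows that the port is open, not that a healthy gRPC service is listening behind it.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for the checklist; not part of the Vitess CDC connector.
class VtgateReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the checklist: VTGate on localhost, gRPC port 15991.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 15991;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 3000));
    }
}
```

Run it from the host where Flink is deployed, passing the VTGate hostname and port as arguments.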

gRPC authentication

Because the Vitess connector reads change events from the VTGate VStream gRPC server, it does not need to connect directly to MySQL instances. Therefore, no special database user or permissions are needed. At the moment, the Vitess connector only supports unauthenticated access to the VTGate gRPC server.

How to create a Vitess CDC table

The Vitess CDC table can be defined as follows:

-- checkpoint every 3000 milliseconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';   

-- register a Vitess table 'orders' in Flink SQL
Flink SQL> CREATE TABLE orders (
     order_id INT,
     order_date TIMESTAMP(0),
     customer_name STRING,
     price DECIMAL(10, 5),
     product_id INT,
     order_status BOOLEAN,
     PRIMARY KEY(order_id) NOT ENFORCED
     ) WITH (
     'connector' = 'vitess-cdc',
     'hostname' = 'localhost',
     'port' = '15991',
     'keyspace' = 'mydb',
     'table-name' = 'orders');

-- read change events from the orders table
Flink SQL> SELECT * FROM orders;

Connector Options

Option | Required | Default | Type | Description
connector | required | (none) | String | The connector to use; must be 'vitess-cdc'.
hostname | required | (none) | String | IP address or hostname of the Vitess database server (VTGate).
keyspace | required | (none) | String | The name of the keyspace from which to stream the changes.
username | optional | (none) | String | An optional username for the Vitess database server (VTGate). If not configured, unauthenticated VTGate gRPC is used.
password | optional | (none) | String | An optional password for the Vitess database server (VTGate). If not configured, unauthenticated VTGate gRPC is used.
shard | optional | (none) | String | An optional name of the shard from which to stream the changes. If not configured, the connector streams changes from the only shard of an unsharded keyspace, or from all shards of a sharded keyspace.
gtid | optional | current | String | An optional GTID position for a shard to stream from.
stopOnReshard | optional | false | Boolean | Controls the Vitess flag stop_on_reshard.
tombstonesOnDelete | optional | true | Boolean | Controls whether a delete event is followed by a tombstone event.
schemaNameAdjustmentMode | optional | avro | String | Specifies how schema names should be adjusted for compatibility with the message converter used by the connector.
table-name | required | (none) | String | Name of the table in the MySQL database to monitor.
tablet.type | optional | RDONLY | String | The type of Tablet (hence MySQL) from which to stream the changes. MASTER streams from the master MySQL instance; REPLICA streams from the replica MySQL instance; RDONLY streams from the read-only replica MySQL instance.
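
The options above end up as quoted key/value pairs in the WITH clause of the table definition. As a rough illustration of how required and optional options combine, the sketch below renders a map of options into such a clause. The class VitessTableDdl and its method are hypothetical helpers written for this example, and the option values are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper; it only formats text and does not talk to Flink or Vitess.
class VitessTableDdl {

    /** Renders connector options as a Flink SQL WITH (...) clause, keeping insertion order. */
    static String withClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(",\n  ", "WITH (\n  ", "\n)");
        for (Map.Entry<String, String> option : options.entrySet()) {
            // Single quotes inside a value are doubled, as in SQL string literals.
            String value = option.getValue().replace("'", "''");
            joiner.add("'" + option.getKey() + "' = '" + value + "'");
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "vitess-cdc");   // required
        options.put("hostname", "localhost");     // required
        options.put("keyspace", "mydb");          // required
        options.put("table-name", "orders");      // required
        options.put("tablet.type", "REPLICA");    // optional, overrides the RDONLY default
        System.out.println(withClause(options));
    }
}
```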

Features

Incremental Reading

The Vitess connector spends all its time streaming changes from the VTGate’s VStream gRPC service to which it is subscribed. The client receives changes from VStream as they are committed in the underlying MySQL server’s binlog at certain positions, which are referred to as VGTID.

The VGTID in Vitess is the equivalent of the GTID in MySQL: it describes the position in the VStream at which a change event happens. Typically, a VGTID contains multiple shard GTIDs. Each shard GTID is a tuple of (Keyspace, Shard, GTID) that describes the GTID position of a given shard.
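
To make the structure concrete, the sketch below models a VGTID as a list of (Keyspace, Shard, GTID) tuples. The class names are hypothetical and are not the connector's own types; the shard names and GTID strings are placeholders chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical data model for illustration only.
class VgtidSketch {

    /** One (keyspace, shard, gtid) tuple: the GTID position of a single shard. */
    static final class ShardGtid {
        final String keyspace;
        final String shard;
        final String gtid;

        ShardGtid(String keyspace, String shard, String gtid) {
            this.keyspace = keyspace;
            this.shard = shard;
            this.gtid = gtid;
        }

        @Override
        public String toString() {
            return "(" + keyspace + ", " + shard + ", " + gtid + ")";
        }
    }

    /** A VGTID: the stream position expressed as one shard GTID per shard. */
    static final class Vgtid {
        final List<ShardGtid> shardGtids;

        Vgtid(List<ShardGtid> shardGtids) {
            this.shardGtids = Collections.unmodifiableList(new ArrayList<>(shardGtids));
        }

        /** Returns the GTID recorded for the given shard, or null if the shard is absent. */
        String gtidOf(String shard) {
            for (ShardGtid shardGtid : shardGtids) {
                if (shardGtid.shard.equals(shard)) {
                    return shardGtid.gtid;
                }
            }
            return null;
        }
    }

    public static void main(String[] args) {
        // Two shards of one keyspace; the GTID values are placeholders.
        Vgtid position = new Vgtid(Arrays.asList(
                new ShardGtid("mydb", "-80", "gtid-placeholder-a"),
                new ShardGtid("mydb", "80-", "gtid-placeholder-b")));
        System.out.println(position.shardGtids);
        System.out.println(position.gtidOf("80-"));
    }
}
```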

When subscribing to a VStream service, the connector needs to provide a VGTID and a Tablet Type (e.g. MASTER, REPLICA). The VGTID describes the position from which VStream should start sending change events; the Tablet Type describes which underlying MySQL instance (master or replica) in each shard the change events are read from.

The first time the connector connects to a Vitess cluster, it gets and provides the current VGTID to VStream.
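
The choice of starting position can be sketched as follows. This is an illustration under assumptions, not the connector's code: it assumes that a position restored from an earlier run takes precedence, and that otherwise the stream starts at the special position "current" (the default of the gtid option). All class and method names here are hypothetical.

```java
import java.util.Optional;

// Hypothetical illustration; these types are not the connector's API.
class StartPositionSketch {

    /** Tablet types named in the connector options. */
    enum TabletType { MASTER, REPLICA, RDONLY }

    /** Special position meaning "start from the current VGTID". */
    static final String CURRENT = "current";

    /**
     * Chooses the position to send to VStream. Assumption: a restored position
     * wins; without one, the subscription starts at "current".
     */
    static String startPosition(Optional<String> restoredVgtid) {
        return restoredVgtid.orElse(CURRENT);
    }

    /** Formats the pieces of a subscription for display. */
    static String describe(String keyspace, String position, TabletType tabletType) {
        return "keyspace=" + keyspace + ", position=" + position + ", tabletType=" + tabletType;
    }

    public static void main(String[] args) {
        // First connection: nothing restored, so the stream starts at "current".
        System.out.println(describe("mydb", startPosition(Optional.empty()), TabletType.RDONLY));
        // Later connection: a previously recorded position (placeholder) is reused.
        System.out.println(describe("mydb", startPosition(Optional.of("restored-vgtid-placeholder")), TabletType.RDONLY));
    }
}
```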

The Debezium Vitess connector acts as a gRPC client of VStream. When the connector receives changes it transforms the events into Debezium create, update, or delete events that include the VGTID of the event. The Vitess connector forwards these change events in records to the Kafka Connect framework, which is running in the same process. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic.

Checkpoint

Incremental snapshot reading makes it possible to checkpoint at the chunk level. This resolves the checkpoint timeout problem of the old snapshot reading mechanism in previous versions.

Exactly-Once Processing

The Vitess CDC connector is a Flink Source connector that first reads table snapshot chunks and then continues to read the binlog. In both the snapshot phase and the binlog phase, the connector reads with exactly-once processing, even when failures occur.
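
The general idea behind exactly-once processing with a replayable source can be illustrated with a small simulation: the source position and the downstream state are saved together at a checkpoint, and after a failure both are rolled back to that checkpoint before reading resumes. The sketch below is a conceptual model only; it is neither Flink's checkpointing implementation nor the connector's code, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual simulation of checkpoint-and-replay; not Flink's implementation.
class ExactlyOnceSketch {

    /** A source whose events can be re-read from any earlier offset. */
    static final class ReplayableSource {
        private final List<String> log;
        private int offset;

        ReplayableSource(List<String> log) {
            this.log = log;
        }

        boolean hasNext() { return offset < log.size(); }
        String next() { return log.get(offset++); }
        int offset() { return offset; }
        void restore(int checkpointedOffset) { offset = checkpointedOffset; }
    }

    /**
     * Processes the log, checkpointing whenever the offset is a multiple of
     * checkpointEvery, and simulating one crash after failAfter processed events
     * (0 or a negative value means no crash). Returns the final state.
     */
    static List<String> run(List<String> log, int checkpointEvery, int failAfter) {
        ReplayableSource source = new ReplayableSource(log);
        List<String> state = new ArrayList<>();
        int checkpointedOffset = 0;
        List<String> checkpointedState = new ArrayList<>();
        boolean failed = false;
        int processed = 0;

        while (source.hasNext()) {
            state.add(source.next());
            processed++;
            if (!failed && processed == failAfter) {
                // Crash: roll back the source offset and the state together.
                failed = true;
                source.restore(checkpointedOffset);
                state = new ArrayList<>(checkpointedState);
                continue;
            }
            if (source.offset() % checkpointEvery == 0) {
                // Checkpoint: persist the offset and the state atomically.
                checkpointedOffset = source.offset();
                checkpointedState = new ArrayList<>(state);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("e1", "e2", "e3", "e4", "e5");
        // Crash after the third event; the state still holds each event exactly once.
        System.out.println(run(log, 2, 3));
    }
}
```

Because the offset and the state are restored as a pair, events read after the last checkpoint are replayed into a state that has not yet seen them, so nothing is lost or counted twice.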

DataStream Source

The Incremental Reading feature of the Vitess CDC Source is currently only exposed in SQL. If you are using DataStream, please use the Vitess Source:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.vitess.VitessSource;

public class VitessSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = VitessSource.<String>builder()
      .hostname("localhost")
      .port(15991)
      .keyspace("inventory")
      .username("flinkuser")
      .password("flinkpw")
      .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute();
  }
}

Note: Please refer to Deserialization for more details about the JSON deserialization.

Data Type Mapping

MySQL type | Flink SQL type
TINYINT | TINYINT
SMALLINT, TINYINT UNSIGNED | SMALLINT
INT, MEDIUMINT, SMALLINT UNSIGNED | INT
BIGINT, INT UNSIGNED | BIGINT
BIGINT UNSIGNED | DECIMAL(20, 0)
FLOAT | FLOAT
DOUBLE, DOUBLE PRECISION | DOUBLE
NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s)
BOOLEAN, TINYINT(1) | BOOLEAN
CHAR(n), VARCHAR(n), TEXT | STRING
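
When deriving a Flink table definition from an existing MySQL schema, the mapping above can be applied mechanically. The following sketch encodes only the rows listed in the table; the class VitessTypeMapping is a hypothetical helper, not part of the connector, and it rejects any type the table does not list instead of guessing.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical lookup helper built from the mapping table; not part of the connector.
class VitessTypeMapping {

    private static final Map<String, String> SIMPLE_TYPES = new HashMap<>();

    static {
        SIMPLE_TYPES.put("TINYINT", "TINYINT");
        SIMPLE_TYPES.put("SMALLINT", "SMALLINT");
        SIMPLE_TYPES.put("TINYINT UNSIGNED", "SMALLINT");
        SIMPLE_TYPES.put("INT", "INT");
        SIMPLE_TYPES.put("MEDIUMINT", "INT");
        SIMPLE_TYPES.put("SMALLINT UNSIGNED", "INT");
        SIMPLE_TYPES.put("BIGINT", "BIGINT");
        SIMPLE_TYPES.put("INT UNSIGNED", "BIGINT");
        SIMPLE_TYPES.put("BIGINT UNSIGNED", "DECIMAL(20, 0)");
        SIMPLE_TYPES.put("FLOAT", "FLOAT");
        SIMPLE_TYPES.put("DOUBLE", "DOUBLE");
        SIMPLE_TYPES.put("DOUBLE PRECISION", "DOUBLE");
        SIMPLE_TYPES.put("BOOLEAN", "BOOLEAN");
        SIMPLE_TYPES.put("TEXT", "STRING");
    }

    /** Maps a MySQL column type to the Flink SQL type listed in the table. */
    static String toFlinkType(String mysqlType) {
        String type = mysqlType.trim().toUpperCase(Locale.ROOT).replaceAll("\\s+", " ");
        if (type.equals("TINYINT(1)")) {
            return "BOOLEAN";
        }
        if (type.startsWith("NUMERIC(") || type.startsWith("DECIMAL(")) {
            // Precision and scale carry over unchanged.
            return "DECIMAL" + type.substring(type.indexOf('('));
        }
        if (type.startsWith("CHAR(") || type.startsWith("VARCHAR(")) {
            return "STRING";
        }
        String flinkType = SIMPLE_TYPES.get(type);
        if (flinkType == null) {
            throw new IllegalArgumentException("No mapping listed for type: " + mysqlType);
        }
        return flinkType;
    }

    public static void main(String[] args) {
        for (String type : new String[] {"int unsigned", "VARCHAR(255)", "numeric(10, 5)", "TINYINT(1)"}) {
            System.out.println(type + " -> " + toFlinkType(type));
        }
    }
}
```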