Postgres CDC Connector

The Postgres CDC connector allows for reading snapshot data and incremental data from PostgreSQL database. This document describes how to setup the Postgres CDC connector to run SQL queries against PostgreSQL databases.

Dependencies

In order to setup the Postgres CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-postgres-cdc</artifactId>
  <version>2.0.2</version>
</dependency>

SQL Client JAR

Download flink-sql-connector-postgres-cdc-2.0.2.jar and put it under <FLINK_HOME>/lib/.

How to create a Postgres CDC table

The Postgres CDC table can be defined as following:

-- register a PostgreSQL table 'shipments' in Flink SQL
CREATE TABLE shipments (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments'
);

-- read snapshot and binlogs from shipments table
SELECT * FROM shipments;

Connector Options

Option Required Default Type Description
connector required (none) String Specify what connector to use, here should be 'postgres-cdc'.
hostname required (none) String IP address or hostname of the PostgreSQL database server.
username required (none) String Name of the PostgreSQL database to use when connecting to the PostgreSQL database server.
password required (none) String Password to use when connecting to the PostgreSQL database server.
database-name required (none) String Database name of the PostgreSQL server to monitor.
schema-name required (none) String Schema name of the PostgreSQL database to monitor.
table-name required (none) String Table name of the PostgreSQL database to monitor.
port optional 5432 Integer Integer port number of the PostgreSQL database server.
decoding.plugin.name optional decoderbufs String The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput.
slot.name optional flink String The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring.
Slot names must conform to PostgreSQL replication slot naming rules, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."
debezium.* optional (none) String Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from Postgres server. For example: 'debezium.snapshot.mode' = 'never'. See more about the Debezium's Postgres Connector properties

Note: slot.name is recommended to set for different tables to avoid the potential PSQLException: ERROR: replication slot "flink" is active for PID 974 error. See more here.

Features

Exactly-Once Processing

The Postgres CDC connector is a Flink Source connector which will read database snapshot first and then continues to read binlogs with exactly-once processing even failures happen. Please read How the connector works.

Single Thread Reading

The Postgres CDC source can’t work in parallel reading, because there is only one task can receive binlog events.

DataStream Source

The Postgres CDC connector can also be a DataStream source. You can create a SourceFunction as the following shows:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;

public class PostgreSQLSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
      .hostname("localhost")
      .port(5432)
      .database("postgres") // monitor postgres database
      .schemaList("inventory")  // monitor inventory schema
      .tableList("inventory.products") // monitor products table
      .username("flinkuser")
      .password("flinkpw")
      .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute();
  }
}

Data Type Mapping

PostgreSQL type Flink SQL type
TINYINT
SMALLINT
INT2
SMALLSERIAL
SERIAL2
SMALLINT
INTEGER
SERIAL
INT
BIGINT
BIGSERIAL
BIGINT
DECIMAL(20, 0)
BIGINT BIGINT
REAL
FLOAT4
FLOAT
FLOAT8
DOUBLE PRECISION
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN BOOLEAN
DATE DATE
TIME [(p)] [WITHOUT TIMEZONE] TIME [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] [WITHOUT TIMEZONE]
CHAR(n)
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
STRING
BYTEA BYTES