Oracle CDC Connector

The Oracle CDC connector allows for reading snapshot data and incremental data from Oracle database. This document describes how to setup the Oracle CDC connector to run SQL queries against Oracle databases.

Dependencies

In order to setup the Oracle CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-oracle-cdc</artifactId>
  <!-- The dependency is available only for stable releases, SNAPSHOT dependencies need to be built based on master or release- branches by yourself. -->
  <version>3.0-SNAPSHOT</version>
</dependency>

SQL Client JAR

Download link is available only for stable releases.

Download flink-sql-connector-oracle-cdc-3.0-SNAPSHOT.jar and put it under <FLINK_HOME>/lib/.

Note: flink-sql-connector-oracle-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as flink-sql-connector-oracle-cdc-2.3.0.jar, the released version will be available in the Maven central warehouse.

Setup Oracle

You have to enable log archiving for Oracle database and define an Oracle user with appropriate permissions on all databases that the Debezium Oracle connector monitors.

For Non-CDB database

  1. Enable log archiving

    (1.1). Connect to the database as DBA

    ORACLE_SID=SID
    export ORACLE_SID
    sqlplus /nolog
      CONNECT sys/password AS SYSDBA
    

    (1.2). Enable log archiving

    alter system set db_recovery_file_dest_size = 10G;
    alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    shutdown immediate;
    startup mount;
    alter database archivelog;
    alter database open;
    

    Note:

    • Enable log archiving requires database restart, pay attention when try to do it

    • The archived logs will occupy a large amount of disk space, so consider clean the expired logs the periodically

    (1.3). Check whether log archiving is enabled

    -- Should now "Database log mode: Archive Mode"
    archive log list;
    

    Note:

    Supplemental logging must be enabled for captured tables or the database in order for data changes to capture the before state of changed database rows. The following illustrates how to configure this on the table/database level.

    -- Enable supplemental logging for a specific table:
    ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    
    -- Enable supplemental logging for database
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
    
  2. Create an Oracle user with permissions

    (2.1). Create Tablespace

    sqlplus sys/password@host:port/SID AS SYSDBA;
      CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
      exit;
    

    (2.2). Create a user and grant permissions

    sqlplus sys/password@host:port/SID AS SYSDBA;
      CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
      GRANT CREATE SESSION TO flinkuser;
      GRANT SET CONTAINER TO flinkuser;
      GRANT SELECT ON V_$DATABASE to flinkuser;
      GRANT FLASHBACK ANY TABLE TO flinkuser;
      GRANT SELECT ANY TABLE TO flinkuser;
      GRANT SELECT_CATALOG_ROLE TO flinkuser;
      GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
      GRANT SELECT ANY TRANSACTION TO flinkuser;
      GRANT LOGMINING TO flinkuser;
      GRANT ANALYZE ANY TO flinkuser;
    
      GRANT CREATE TABLE TO flinkuser;
      -- need not to execute if set scan.incremental.snapshot.enabled=true(default)
      GRANT LOCK ANY TABLE TO flinkuser;
      GRANT ALTER ANY TABLE TO flinkuser;
      GRANT CREATE SEQUENCE TO flinkuser;
    
      GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
      GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
    
      GRANT SELECT ON V_$LOG TO flinkuser;
      GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
      GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
      GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
      GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
      GRANT SELECT ON V_$LOGFILE TO flinkuser;
      GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
      GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
      exit;
    

For CDB database

Overall, the steps for configuring CDB database is quite similar to non-CDB database, but the commands may be different.

  1. Enable log archiving

    ORACLE_SID=ORCLCDB
    export ORACLE_SID
    sqlplus /nolog
      CONNECT sys/password AS SYSDBA
      alter system set db_recovery_file_dest_size = 10G;
      -- should exist
      alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
      shutdown immediate
      startup mount
      alter database archivelog;
      alter database open;
      -- Should show "Database log mode: Archive Mode"
      archive log list
      exit;
    

    Note: You can also use the following commands to enable supplemental logging:

    -- Enable supplemental logging for a specific table:
    ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    -- Enable supplemental logging for database
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
    
  2. Create an Oracle user with permissions

    sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
      CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
      exit
    
    sqlplus sys/password@//localhost:1521/ORCLPDB1 as sysdba
      CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
      exit
    
    sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
      CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
      GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
      GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
      GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
      GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
      GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
      GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
      GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
      GRANT LOGMINING TO flinkuser CONTAINER=ALL;
      GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
      -- need not to execute if set scan.incremental.snapshot.enabled=true(default)
      GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
      GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
    
      GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
      GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
    
      GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
      exit
    

See more about the Setting up Oracle

How to create an Oracle CDC table

The Oracle CDC table can be defined as following:

-- register an Oracle table 'products' in Flink SQL
Flink SQL> CREATE TABLE products (
     ID INT NOT NULL,
     NAME STRING,
     DESCRIPTION STRING,
     WEIGHT DECIMAL(10, 3),
     PRIMARY KEY(id) NOT ENFORCED
     ) WITH (
     'connector' = 'oracle-cdc',
     'hostname' = 'localhost',
     'port' = '1521',
     'username' = 'flinkuser',
     'password' = 'flinkpw',
     'database-name' = 'ORCLCDB',
     'schema-name' = 'inventory',
     'table-name' = 'products');
  
-- read snapshot and redo logs from products table
Flink SQL> SELECT * FROM products;

Note: When working with the CDB + PDB model, you are expected to add an extra option 'debezium.database.pdb.name' = 'xxx' in Flink DDL to specific the name of the PDB to connect to.

Note: While the connector might work with a variety of Oracle versions and editions, only Oracle 9i, 10g, 11g and 12c have been tested.

Connector Options

Option Required Default Type Description
connector required (none) String Specify what connector to use, here should be 'oracle-cdc'.
hostname optional (none) String IP address or hostname of the Oracle database server. If the url is not empty, hostname may not be configured, otherwise hostname can not be empty
username required (none) String Name of the Oracle database to use when connecting to the Oracle database server.
password required (none) String Password to use when connecting to the Oracle database server.
database-name required (none) String Database name of the Oracle server to monitor.
schema-name required (none) String Schema name of the Oracle database to monitor.
table-name required (none) String Table name of the Oracle database to monitor.
port optional 1521 Integer Integer port number of the Oracle database server.
url optional jdbc:oracle:thin:@{hostname}:{port}:{database-name} String JdbcUrl of the oracle database server . If the hostname and port parameter is configured, the URL is concatenated by hostname port database-name in SID format by default. Otherwise, you need to configure the URL parameter
scan.startup.mode optional initial String Optional startup mode for Oracle CDC consumer, valid enumerations are "initial" and "latest-offset". Please see Startup Reading Position section for more detailed information.
scan.incremental.snapshot.enabled optional true Boolean Incremental snapshot is a new mechanism to read snapshot of a table. Compared to the old snapshot mechanism, the incremental snapshot has many advantages, including: (1) source can be parallel during snapshot reading, (2) source can perform checkpoints in the chunk granularity during snapshot reading, (3) source doesn't need to acquire ROW SHARE MODE lock before snapshot reading.
scan.incremental.snapshot.chunk.size optional 8096 Integer The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table.
scan.snapshot.fetch.size optional 1024 Integer The maximum fetch size for per poll when read table snapshot.
connect.max-retries optional 3 Integer The max retry times that the connector should retry to build Oracle database server connection.
connection.pool.size optional 20 Integer The connection pool size.
debezium.* optional (none) String Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from Oracle server. For example: 'debezium.snapshot.mode' = 'never'. See more about the Debezium's Oracle Connector properties
scan.incremental.close-idle-reader.enabled optional false Boolean Whether to close idle readers at the end of the snapshot phase.
The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.
If the flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true, so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
scan.incremental.snapshot.chunk.key-column optional (none) String The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table. By default, the chunk key is 'ROWID'. This column must be a column of the primary key.

Limitation

Can’t perform checkpoint during scanning snapshot of tables

During scanning snapshot of database tables, since there is no recoverable position, we can’t perform checkpoints. In order to not perform checkpoints, Oracle CDC source will keep the checkpoint waiting to timeout. The timeout checkpoint will be recognized as failed checkpoint, by default, this will trigger a failover for the Flink job. So if the database table is large, it is recommended to add following Flink configurations to avoid failover because of the timeout checkpoints:

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

Available Metadata

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

Key DataType Description
table_name STRING NOT NULL Name of the table that contain the row.
schema_name STRING NOT NULL Name of the schema that contain the row.
database_name STRING NOT NULL Name of the database that contain the row.
op_ts TIMESTAMP_LTZ(3) NOT NULL It indicates the time that the change was made in the database.
If the record is read from snapshot of the table instead of the change stream, the value is always 0.

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    schema_name STRING METADATA FROM 'schema_name' VIRTUAL, 
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    ID INT NOT NULL,
    NAME STRING,
    DESCRIPTION STRING,
    WEIGHT DECIMAL(10, 3),
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = 'localhost',
    'port' = '1521',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'ORCLCDB',
    'schema-name' = 'inventory',
    'table-name' = 'products',
    'debezium.log.mining.strategy' = 'online_catalog',
    'debezium.log.mining.continuous.mine' = 'true'
);

Note : The Oracle dialect is case-sensitive, it converts field name to uppercase if the field name is not quoted, Flink SQL doesn’t convert the field name. Thus for physical columns from oracle database, we should use its converted field name in Oracle when define an oracle-cdc table in Flink SQL.

Features

Exactly-Once Processing

The Oracle CDC connector is a Flink Source connector which will read database snapshot first and then continues to read change events with exactly-once processing even failures happen. Please read How the connector works.

Startup Reading Position

The config option scan.startup.mode specifies the startup mode for Oracle CDC consumer. The valid enumerations are:

  • initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest redo log.

  • latest-offset: Never to perform a snapshot on the monitored database tables upon first startup, just read from the change since the connector was started.

Note: the mechanism of scan.startup.mode option relying on Debezium’s snapshot.mode configuration. So please do not use them together. If you specific both scan.startup.mode and debezium.snapshot.mode options in the table DDL, it may make scan.startup.mode doesn’t work.

Single Thread Reading

The Oracle CDC source can’t work in parallel reading, because there is only one task can receive change events.

DataStream Source

The Oracle CDC connector can also be a DataStream source. There are two modes for the DataStream source:

  • incremental snapshot based, which allows parallel reading

  • SourceFunction based, which only supports single thread reading

Incremental Snapshot based DataStream (Experimental)

import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class OracleParallelSourceExample {

    public static void main(String[] args) throws Exception {
        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("log.mining.strategy", "online_catalog");

        JdbcIncrementalSource<String> oracleChangeEventSource =
                new OracleSourceBuilder()
                        .hostname("host")
                        .port(1521)
                        .databaseList("ORCLCDB")
                        .schemaList("DEBEZIUM")
                        .tableList("DEBEZIUM.PRODUCTS")
                        .username("username")
                        .password("password")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .includeSchemaChanges(true) // output the schema changes as well
                        .startupOptions(StartupOptions.initial())
                        .debeziumProperties(debeziumProperties)
                        .splitSize(2)
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoint
        env.enableCheckpointing(3000L);
        // set the source parallelism to 4
        env.fromSource(
                        oracleChangeEventSource,
                        WatermarkStrategy.noWatermarks(),
                        "OracleParallelSource")
                .setParallelism(4)
                .print()
                .setParallelism(1);
        env.execute("Print Oracle Snapshot + RedoLog");
    }
}

SourceFunction-based DataStream

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.oracle.OracleSource;

public class OracleSourceExample {
  public static void main(String[] args) throws Exception {
     SourceFunction<String> sourceFunction = OracleSource.<String>builder()
             .url("jdbc:oracle:thin:@{hostname}:{port}:{database}")
             .port(1521)
             .database("ORCLCDB") // monitor XE database
             .schemaList("inventory") // monitor inventory schema
             .tableList("inventory.products") // monitor products table
             .username("flinkuser")
             .password("flinkpw")
             .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
             .build();

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

     env
        .addSource(sourceFunction)
        .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering   
     
     env.execute();
  }
}

Note: Please refer Deserialization for more details about the JSON deserialization.

Data Type Mapping

Oracle type Flink SQL type
NUMBER(p, s <= 0), p - s < 3 TINYINT
NUMBER(p, s <= 0), p - s < 5 SMALLINT
NUMBER(p, s <= 0), p - s < 10 INT
NUMBER(p, s <= 0), p - s < 19 BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38
DECIMAL(p - s, 0)
NUMBER(p, s > 0) DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38 STRING
FLOAT
BINARY_FLOAT
FLOAT
DOUBLE PRECISION
BINARY_DOUBLE
DOUBLE
NUMBER(1) BOOLEAN
DATE
TIMESTAMP [(p)]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] WITH TIME ZONE TIMESTAMP [(p)] WITH TIME ZONE
TIMESTAMP [(p)] WITH LOCAL TIME ZONE TIMESTAMP_LTZ [(p)]
CHAR(n)
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
NCLOB
XMLType
SYS.XMLTYPE
STRING
BLOB
ROWID
BYTES
INTERVAL DAY TO SECOND
INTERVAL YEAR TO MONTH
BIGINT

FAQ