Changelog JSON Format

Flink supports to emit changelogs in JSON format and interpret the output back again.

Dependencies

In order to setup the Changelog JSON format, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <!-- the dependency is available only for stable releases. -->
  <version>2.1.1</version>
</dependency>

SQL Client JAR

Download link is available only for stable releases.

Download flink-format-changelog-json-2.1.1.jar and put it under <FLINK_HOME>/lib/.

How to use Changelog JSON format

-- assuming we have a user_behavior logs
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',  -- using kafka connector
    'topic' = 'user_behavior',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format' = 'json'  -- the data format is json
);

-- we want to store the the UV aggregation result in kafka using changelog-json format
create table day_uv (
    day_str STRING,
    uv BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'day_uv',
    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format' = 'changelog-json'  -- the data format is changelog-json
);

-- write the UV results into kafka using changelog-json format
INSERT INTO day_uv
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str, count(distinct user_id) as uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');

-- reading the changelog back again
SELECT * FROM day_uv;

Format Options

Option Required Default Type Description
format required (none) String Specify what format to use, here should be 'changelog-json'.
changelog-json.ignore-parse-errors optional false Boolean Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors.
changelog-json.timestamp-format.standard optional 'SQL' String Specify the input and output timestamp format. Currently supported values are 'SQL' and 'ISO-8601':
  • Option 'SQL' will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and output timestamp in the same format.
  • Option 'ISO-8601'will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.

Data Type Mapping

Currently, the Canal format uses JSON format for deserialization. Please refer to JSON format documentation for more details about the data type mapping.