Changelog JSON Format¶
Flink can emit changelogs in JSON format and read that output back as a changelog.
Dependencies¶
To set up the Changelog JSON format, use the dependency information below: the Maven dependency for projects using a build automation tool (such as Maven or SBT), or the SQL JAR bundle for SQL Client.
Maven dependency¶
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <version>2.0.2</version>
</dependency>
SQL Client JAR¶
Download flink-format-changelog-json-2.0.2.jar and put it under <FLINK_HOME>/lib/.
How to use Changelog JSON format¶
-- assuming we have a user_behavior log
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka', -- using kafka connector
  'topic' = 'user_behavior', -- kafka topic
  'scan.startup.mode' = 'earliest-offset', -- reading from the beginning
  'properties.bootstrap.servers' = 'localhost:9092', -- kafka broker address
  'format' = 'json' -- the data format is json
);

-- we want to store the UV aggregation result in kafka using changelog-json format
CREATE TABLE day_uv (
  day_str STRING,
  uv BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'day_uv',
  'scan.startup.mode' = 'earliest-offset', -- reading from the beginning
  'properties.bootstrap.servers' = 'localhost:9092', -- kafka broker address
  'format' = 'changelog-json' -- the data format is changelog-json
);

-- write the UV results into kafka using changelog-json format
INSERT INTO day_uv
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') AS day_str, COUNT(DISTINCT user_id) AS uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');

-- reading the changelog back again
SELECT * FROM day_uv;
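To make "reading the changelog back" more concrete, the sketch below replays a few changelog records into a current view of day_uv, outside of Flink. It is an illustration only, not the format's implementation. It assumes each Kafka message is a JSON object with a `data` field holding the row and an `op` field holding the change kind (`+I` insert, `-U` update-before, `+U` update-after, `-D` delete); this page does not spell out that wire layout, so treat the field names, the op codes, and the sample messages as assumptions. The helper name `apply_changelog` and the use of day_str as the key are hypothetical.

```python
import json


def apply_changelog(lines):
    """Replay changelog records into a dict keyed by day_str.

    Assumed (not confirmed by this page) message layout:
    {"data": {<row fields>}, "op": "+I" | "-U" | "+U" | "-D"}
    """
    table = {}
    for line in lines:
        record = json.loads(line)
        row, op = record["data"], record["op"]
        key = row["day_str"]
        if op in ("+I", "+U"):
            # Insert or update-after: the row becomes the current value.
            table[key] = row
        elif op in ("-U", "-D"):
            # Update-before or delete: retract the row if it is still current.
            if table.get(key) == row:
                del table[key]
        else:
            raise ValueError(f"unknown op: {op!r}")
    return table


# Hypothetical messages, shaped like what the INSERT INTO day_uv job might emit
# when a second distinct user shows up on the same day.
messages = [
    '{"data": {"day_str": "2020-06-18", "uv": 1}, "op": "+I"}',
    '{"data": {"day_str": "2020-06-18", "uv": 1}, "op": "-U"}',
    '{"data": {"day_str": "2020-06-18", "uv": 2}, "op": "+U"}',
    '{"data": {"day_str": "2020-06-19", "uv": 1}, "op": "+I"}',
]

result = apply_changelog(messages)
for day, row in sorted(result.items()):
    print(day, row["uv"])
```

After the replay, each day maps to its latest uv value. A plain JSON reader that ignored `op` would instead see three separate rows for 2020-06-18.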
Format Options¶
Option | Required | Default | Type | Description |
---|---|---|---|---|
format | required | (none) | String | Specifies which format to use; here it must be 'changelog-json'. |
changelog-json.ignore-parse-errors | optional | false | Boolean | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
changelog-json.timestamp-format.standard | optional | 'SQL' | String | Specify the input and output timestamp format. Currently supported values are 'SQL' and 'ISO-8601'. |
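As a rough illustration of the two timestamp standards, the sketch below parses and formats the same instant in both layouts. It is plain Python, not Flink code. It assumes that 'SQL' means a space-separated layout such as '2020-12-30 12:13:14.123' and that 'ISO-8601' means a 'T'-separated layout such as '2020-12-30T12:13:14.123', as in Flink's standard JSON format. The `LAYOUTS` mapping and the two helper functions are hypothetical names introduced only for this example.

```python
from datetime import datetime

# Assumed layouts for the two 'changelog-json.timestamp-format.standard' values.
LAYOUTS = {
    "SQL": "%Y-%m-%d %H:%M:%S.%f",
    "ISO-8601": "%Y-%m-%dT%H:%M:%S.%f",
}


def parse_timestamp(text, standard="SQL"):
    """Parse a timestamp string written in the given standard."""
    return datetime.strptime(text, LAYOUTS[standard])


def format_timestamp(value, standard="SQL"):
    """Format a datetime in the given standard with millisecond precision."""
    # strftime emits six fractional digits; drop the last three for TIMESTAMP(3).
    return value.strftime(LAYOUTS[standard])[:-3]


sql_ts = parse_timestamp("2020-12-30 12:13:14.123", "SQL")
iso_ts = parse_timestamp("2020-12-30T12:13:14.123", "ISO-8601")

print(sql_ts == iso_ts)
print(format_timestamp(sql_ts, "ISO-8601"))
```

The option changes only how a timestamp is written as text, not the instant it represents, so the producer and every consumer of a topic should be configured with the same value.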
Data Type Mapping¶
Currently, the Changelog JSON format uses the JSON format for serialization and deserialization. Please refer to the JSON format documentation for more details about the data type mapping.