MySQL CDC 连接器¶
MySQL CDC 连接器允许从 MySQL 数据库读取快照数据和增量数据。本文描述了如何设置 MySQL CDC 连接器来对 MySQL 数据库运行 SQL 查询。
支持的数据库¶
Connector |
Database |
Driver |
---|---|---|
JDBC Driver: 8.0.27 |
依赖¶
为了设置 MySQL CDC 连接器,下表提供了使用构建自动化工具(如 Maven 或 SBT )和带有 SQL JAR 包的 SQL 客户端的两个项目的依赖关系信息。
Maven dependency¶
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<!-- 请使用已发布的版本依赖,snapshot版本的依赖需要本地自行编译。 -->
<version>3.0-SNAPSHOT</version>
</dependency>
SQL Client JAR¶
下载链接仅在已发布版本可用,请在文档网站左下角选择浏览已发布的版本。
下载 flink-sql-connector-mysql-cdc-3.0-SNAPSHOT.jar 到 <FLINK_HOME>/lib/
目录下。
注意: flink-sql-connector-mysql-cdc-XXX-SNAPSHOT 版本是开发分支release-XXX
对应的快照版本,快照版本用户需要下载源代码并编译相应的 jar。用户应使用已经发布的版本,例如 flink-sql-connector-mysql-cdc-2.2.1.jar 当前已发布的所有版本都可以在 Maven 中央仓库获取。
配置 MySQL 服务器¶
你必须定义一个 MySQL 用户,该用户对 MySQL CDC 连接器监视的所有数据库都应该具有所需的权限。
创建 MySQL 用户:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
向用户授予所需的权限:
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
注意: 在 scan.incremental.snapshot.enabled
参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。
刷新用户权限:
mysql> FLUSH PRIVILEGES;
查看更多用户权限问题请参考 权限说明.
注意事项¶
为每个 Reader 设置不同的 Server id¶
每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id,称为 Server id。 MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。 因此,如果不同的作业共享相同的 Server id, 则可能导致从错误的 binlog 位置读取数据。
因此,建议通过为每个 Reader 设置不同的 Server id SQL Hints,
假设 Source 并行度为 4, 我们可以使用 SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;
来为 4 个 Source readers 中的每一个分配唯一的 Server id。
设置 MySQL 会话超时时间¶
当为大型数据库创建初始一致快照时,你建立的连接可能会在读取表时碰到超时问题。你可以通过在 MySQL 侧配置 interactive_timeout 和 wait_timeout 来缓解此类问题。
interactive_timeout
: 服务器在关闭交互连接之前等待活动的秒数。 更多信息请参考 MySQL documentations.wait_timeout
: 服务器在关闭非交互连接之前等待活动的秒数。 更多信息请参考 MySQL documentations.
如何创建 MySQL CDC 表¶
MySQL CDC 表可以定义如下:
-- 每 3 秒做一次 checkpoint,用于测试,生产配置建议5到10分钟
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
-- 在 Flink SQL中注册 MySQL 表 'orders'
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders');
-- 从订单表读取全量数据(快照)和增量数据(binlog)
Flink SQL> SELECT * FROM orders;
连接器选项¶
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | 指定要使用的连接器, 这里应该是 'mysql-cdc' . |
hostname | required | (none) | String | MySQL 数据库服务器的 IP 地址或主机名。 |
username | required | (none) | String | 连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。 |
password | required | (none) | String | 连接 MySQL 数据库服务器时使用的密码。 |
database-name | required | (none) | String | 要监视的 MySQL 服务器的数据库名称。数据库名称还支持正则表达式,以监视多个与正则表达式匹配的表。 |
table-name | required | (none) | String | 需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。注意:MySQL CDC 连接器在正则匹配表名时,会把用户填写的 database-name, table-name 通过字符串 `\\.` 连接成一个全路径的正则表达式,然后使用该正则表达式和 MySQL 数据库中表的全限定名进行正则匹配。 |
port | optional | 3306 | Integer | MySQL 数据库服务器的整数端口号。 |
server-id | optional | (none) | String | 读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408', 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。 |
scan.incremental.snapshot.enabled | optional | true | Boolean | 增量快照是一种读取表快照的新机制,与旧的快照机制相比, 增量快照有许多优点,包括: (1)在快照读取期间,Source 支持并发读取, (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint, (3)在快照读取之前,Source 不需要数据库锁权限。 如果希望 Source 并行运行,则每个并行 Readers 都应该具有唯一的 Server id,所以 Server id 必须是类似 `5400-6400` 的范围,并且该范围必须大于并行度。 请查阅 增量快照读取 章节了解更多详细信息。 |
scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | 表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。 |
scan.snapshot.fetch.size | optional | 1024 | Integer | 读取表快照时每次读取数据的最大条数。 |
scan.startup.mode | optional | initial | String | MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp"。 请查阅 启动模式 章节了解更多详细信息。 |
scan.startup.specific-offset.file | optional | (none) | String | 在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。 |
scan.startup.specific-offset.pos | optional | (none) | Long | 在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。 |
scan.startup.specific-offset.gtid-set | optional | (none) | String | 在 "specific-offset" 启动模式下,启动位点的 GTID 集合。 |
scan.startup.specific-offset.skip-events | optional | (none) | Long | 在指定的启动位点后需要跳过的事件数量。 |
scan.startup.specific-offset.skip-rows | optional | (none) | Long | 在指定的启动位点后需要跳过的数据行数量。 |
server-time-zone | optional | (none) | String | 数据库服务器中的会话时区, 例如: "Asia/Shanghai". 它控制 MYSQL 中的时间戳类型如何转换为字符串。 更多请参考 这里. 如果没有设置,则使用ZoneId.systemDefault()来确定服务器时区。 |
debezium.min.row. count.to.stream.result | optional | 1000 | Integer | 在快照操作期间,连接器将查询每个包含的表,以生成该表中所有行的读取事件。 此参数确定 MySQL 连接是否将表的所有结果拉入内存(速度很快,但需要大量内存), 或者结果是否需要流式传输(传输速度可能较慢,但适用于非常大的表)。 该值指定了在连接器对结果进行流式处理之前,表必须包含的最小行数,默认值为1000。将此参数设置为`0`以跳过所有表大小检查,并始终在快照期间对所有结果进行流式处理。 |
connect.timeout | optional | 30s | Duration | 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。 |
connect.max-retries | optional | 3 | Integer | 连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。 |
connection.pool.size | optional | 20 | Integer | 连接池大小。 |
jdbc.properties.* | optional | 20 | String | 传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'. |
heartbeat.interval | optional | 30s | Duration | 用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。 |
debezium.* | optional | (none) | String | 将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。
例如: 'debezium.snapshot.mode' = 'never' .
查看更多关于 Debezium 的 MySQL 连接器属性 |
scan.incremental.close-idle-reader.enabled | optional | false | Boolean | 是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。 若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。 |
debezium.binary.handling.mode | optional | (none) | String | debezium.binary.handling.mode 参数可以设置为以下值: none:不进行任何处理,直接将二进制数据类型作为字节数组(byte array)传输。 base64:将二进制数据类型转换为 Base64 编码的字符串,然后传输。 hex:将二进制数据类型转换为十六进制编码的字符串,然后传输。 默认值为 none。根据您的需求和数据类型,您可以选择合适的处理模式。如果您的数据库中包含大量二进制数据类型,建议使用 base64 或 hex 模式,以便在传输过程中更容易处理。 |
支持的元数据¶
下表中的元数据可以在 DDL 中作为只读(虚拟)meta 列声明。
Key | DataType | Description |
---|---|---|
table_name | STRING NOT NULL | 当前记录所属的表名称。 |
database_name | STRING NOT NULL | 当前记录所属的库名称。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 当前记录表在数据库中更新的时间。 如果从表的快照而不是 binlog 读取记录,该值将始终为0。 |
row_kind | STRING NOT NULL | 当前记录对应的 changelog 类型。注意:当 Source 算子选择为每条记录输出 row_kind 字段后,下游 SQL 算子在处理消息撤回时会因为这个字段不同而比对失败,
建议只在简单的同步作业中引用该元数据列。 '+I' 表示 INSERT 数据,'-D' 表示 DELETE 数据,'-U' 表示 UPDATE_BEFORE 数据,'+U' 表示 UPDATE_AFTER 数据。 |
下述创建表示例展示元数据列的用法:
CREATE TABLE products
(
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'row_kind' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
下述创建表示例展示使用正则表达式匹配多张库表的用法:
CREATE TABLE products
(
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'row_kind' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})',
'table-name' = '(t[5-8]|tt)'
);
匹配示例 | 表达式 | 描述 |
---|---|---|
前缀匹配 | ^(test).* | 匹配前缀为test的数据库名或表名,例如test1、test2等。 |
后缀匹配 | .*[p$] | 匹配后缀为p的数据库名或表名,例如cdcp、edcp等。 |
特定匹配 | txc | 匹配具体的数据库名或表名。 |
进行库表匹配时,会使用正则表达式 database-name\\.table-name
来与MySQL表的全限定名做匹配,所以该例子使用 (^(test).*|^(tpc).*|txc|.*[p$]|t{2})\\.(t[5-8]|tt)
,可以匹配到表 txc.tt、test2.test5。
支持的特性¶
增量快照读取¶
增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括:
(1)在快照读取期间,Source 支持并发读取,
(2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint,
(3)在快照读取之前,Source 不需要数据库锁权限。
如果希望 source 并行运行,则每个并行 reader 都应该具有唯一的 server id,因此server id
的范围必须类似于 5400-6400
,
且范围必须大于并行度。在增量快照读取过程中,MySQL CDC Source 首先通过表的主键将表划分成多个块(chunk),
然后 MySQL CDC Source 将多个块分配给多个 reader 以并行读取表的数据。
并发读取¶
增量快照读取提供了并行读取快照数据的能力。
你可以通过设置作业并行度的方式来控制 Source 的并行度 parallelism.default
. For example, in SQL CLI:
Flink SQL> SET 'parallelism.default' = 8;
全量阶段支持 checkpoint¶
增量快照读取提供了在区块级别执行检查点的能力。它使用新的快照读取机制解决了以前版本中的检查点超时问题。
无锁算法¶
MySQL CDC source 使用 增量快照算法, 避免了数据库锁的使用,因此不需要 “RELOAD” 权限。
MySQL高可用性支持¶
mysql cdc
连接器通过使用 GTID 提供 MySQL 高可用集群的高可用性信息。为了获得高可用性, MySQL集群需要启用 GTID 模式,MySQL 配置文件中的 GTID 模式应该包含以下设置:
gtid_mode = on
enforce_gtid_consistency = on
如果监控的MySQL服务器地址包含从实例,则需要对MySQL配置文件设置以下设置。设置 log slave updates=1
允许从实例也将从主实例同步的数据写入其binlog, 这确保了mysql cdc
连接器可以使用从实例中的全部数据。
gtid_mode = on
enforce_gtid_consistency = on
log-slave-updates = 1
MySQL 集群中你监控的服务器出现故障后, 你只需将受监视的服务器地址更改为其他可用服务器,然后从最新的检查点/保存点重新启动作业, 作业将从 checkpoint/savepoint 恢复,不会丢失任何记录。
建议为 MySQL 集群配置 DNS(域名服务)或 VIP(虚拟 IP 地址), 使用mysql cdc
连接器的 DNS 或 VIP 地址, DNS或VIP将自动将网络请求路由到活动MySQL服务器。 这样,你就不再需要修改地址和重新启动管道。
MySQL心跳事件支持¶
如果表不经常更新,则 binlog 文件或 GTID 集可能已在其最后提交的 binlog 位置被清理。
在这种情况下,CDC 作业可能会重新启动失败。因此心跳事件将帮助更新 binlog 位置。 默认情况下,MySQL CDC Source 启用心跳事件,间隔设置为30秒。 可以使用表选项heartbeat
指定间隔。或将选项设置为0s
以禁用心跳事件。
增量快照读取的工作原理¶
当 MySQL CDC Source 启动时,它并行读取表的快照,然后以单并行度的方式读取表的 binlog。
在快照阶段,根据表的主键和表行的大小将快照切割成多个快照块。 快照块被分配给多个快照读取器。每个快照读取器使用 区块读取算法 并将读取的数据发送到下游。 Source 会管理块的进程状态(完成或未完成),因此快照阶段的 Source 可以支持块级别的 checkpoint。 如果发生故障,可以恢复 Source 并继续从最后完成的块中读取块。
所有快照块完成后,Source 将继续在单个任务中读取 binlog。 为了保证快照记录和 binlog 记录的全局数据顺序,binlog reader 将开始读取数据直到快照块完成后并有一个完整的 checkpoint,以确保所有快照数据已被下游消费。 binlog reader 在状态中跟踪所使用的 binlog 位置,因此 binlog 阶段的 Source 可以支持行级别的 checkpoint。
Flink 定期为 Source 执行 checkpoint,在故障转移的情况下,作业将重新启动并从最后一个成功的 checkpoint 状态恢复,并保证只执行一次语义。
全量阶段分片算法¶
在执行增量快照读取时,MySQL CDC source 需要一个用于分片的的算法。
MySQL CDC Source 使用主键列将表划分为多个分片(chunk)。 默认情况下,MySQL CDC source 会识别表的主键列,并使用主键中的第一列作为用作分片列。
如果表中没有主键, 增量快照读取将失败,你可以禁用 scan.incremental.snapshot.enabled
来回退到旧的快照读取机制。
对于数值和自动增量拆分列,MySQL CDC Source 按固定步长高效地拆分块。
例如,如果你有一个主键列为id
的表,它是自动增量 BIGINT 类型,最小值为0
,最大值为100
,
和表选项 scan.incremental.snapshot.chunk.size
大小 value
为25
,表将被拆分为以下块:
(-∞, 25),
[25, 50),
[50, 75),
[75, 100),
[100, +∞)
对于其他主键列类型, MySQL CDC Source 将以下形式执行语句: SELECT MAX(STR_ID) AS chunk_high FROM (SELECT * FROM TestTable WHERE STR_ID > 'uuid-001' limit 25)
来获得每个区块的低值和高值,
分割块集如下所示:
(-∞, 'uuid-001'),
['uuid-001', 'uuid-009'),
['uuid-009', 'uuid-abc'),
['uuid-abc', 'uuid-def'),
[uuid-def, +∞).
Chunk 读取算法¶
对于上面的示例MyTable
,如果 MySQL CDC Source 并行度设置为 4,MySQL CDC Source 将在每一个 executes 运行 4 个 Readers 通过偏移信号算法
获取快照区块的最终一致输出。 偏移信号算法简单描述如下:
(1) 将当前 binlog 位置记录为
LOW
偏移量(2) 通过执行语句读取并缓冲快照区块记录
SELECT * FROM MyTable WHERE id > chunk_low AND id <= chunk_high
(3) 将当前 binlog 位置记录为
HIGH
偏移量(4) 从
LOW
偏移量到HIGH
偏移量读取属于快照区块的 binlog 记录(5) 将读取的 binlog 记录向上插入缓冲区块记录,并发出缓冲区中的所有记录作为快照区块的最终输出(全部作为插入记录)
(6) 继续读取并发出属于 单个 binlog reader 中
HIGH
偏移量之后的区块的 binlog 记录。
该算法的是基于 DBLog Paper 并结合 Flink 的一个变种, 请参考它了解更多详细信息。
注意: 如果主键的实际值在其范围内分布不均匀,则在增量快照读取时可能会导致任务不平衡。
Exactly-Once 处理¶
MySQL CDC 连接器是一个 Flink Source 连接器,它将首先读取表快照块,然后继续读取 binlog, 无论是在快照阶段还是读取 binlog 阶段,MySQL CDC 连接器都会在处理时准确读取数据,即使任务出现了故障。
启动模式¶
配置选项scan.startup.mode
指定 MySQL CDC 使用者的启动模式。有效枚举包括:
initial
(默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。earliest-offset
:跳过快照阶段,从可读取的最早 binlog 位点开始读取latest-offset
:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。specific-offset
:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。timestamp
:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
例如使用 DataStream API:
MySQLSource.builder()
.startupOptions(StartupOptions.earliest()) // 从最早位点启动
.startupOptions(StartupOptions.latest()) // 从最晚位点启动
.startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L) // 从指定 binlog 文件名和位置启动
.startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // 从 GTID 集合启动
.startupOptions(StartupOptions.timestamp(1667232000000L) // 从时间戳启动
...
.build()
使用 SQL:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
'scan.startup.mode' = 'earliest-offset', -- 从最早位点启动
'scan.startup.mode' = 'latest-offset', -- 从最晚位点启动
'scan.startup.mode' = 'specific-offset', -- 从特定位点启动
'scan.startup.mode' = 'timestamp', -- 从特定位点启动
'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特定位点启动模式下指定 binlog 文件名
'scan.startup.specific-offset.pos' = '4', -- 在特定位点启动模式下指定 binlog 位置
'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特定位点启动模式下指定 GTID 集合
'scan.startup.timestamp-millis' = '1667232000000' -- 在时间戳启动模式下指定启动时间戳
...
)
注意:
MySQL source 会在 checkpoint 时将当前位点以 INFO 级别打印到日志中,日志前缀为 “Binlog offset on checkpoint {checkpoint-id}”。 该日志可以帮助将作业从某个 checkpoint 的位点开始启动的场景。
如果捕获变更的表曾经发生过表结构变化,从最早位点、特定位点或时间戳启动可能会发生错误,因为 Debezium 读取器会在内部保存当前的最新表结构,结构不匹配的早期数据无法被正确解析。
DataStream Source¶
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // 设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 ".*".
.tableList("yourDatabaseName.yourTableName") // 设置捕获的表
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置 3s 的 checkpoint 间隔
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// 设置 source 节点的并行度为 4
.setParallelism(4)
.print().setParallelism(1); // 设置 sink 节点并行度为 1
env.execute("Print MySQL Snapshot + Binlog");
}
}
注意: 请参考 Deserialization 有关 JSON 反序列化的更多详细信息。
动态加表¶
扫描新添加的表功能使你可以添加新表到正在运行的作业中,新添加的表将首先读取其快照数据,然后自动读取其变更日志。
想象一下这个场景:一开始, Flink 作业监控表 [product, user, address]
, 但几天后,我们希望这个作业还可以监控表 [order, custom]
,这些表包含历史数据,我们需要作业仍然可以复用作业的已有状态,动态加表功能可以优雅地解决此问题。
以下操作显示了如何启用此功能来解决上述场景。 使用现有的 Flink CDC Source 作业,如下:
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.scanNewlyAddedTableEnabled(true) // 启用扫描新添加的表功能
.databaseList("db") // 设置捕获的数据库
.tableList("db.product, db.user, db.address") // 设置捕获的表 [product, user, address]
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串
.build();
// 你的业务代码
如果我们想添加新表 [order, custom]
对于现有的 Flink 作业,只需更新 tableList()
将新增表 [order, custom]
加入并从已有的 savepoint 恢复作业。
Step 1: 使用 savepoint 停止现有的 Flink 作业。
$ ./bin/flink stop $Existing_Flink_JOB_ID
Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
Step 2: 更新现有 Flink 作业的表列表选项。
更新
tableList()
参数.编译更新后的作业,示例如下:
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.scanNewlyAddedTableEnabled(true)
.databaseList("db")
.tableList("db.product, db.user, db.address, db.order, db.custom") // 设置捕获的表 [product, user, address ,order, custom]
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串
.build();
// 你的业务代码
Step 3: 从 savepoint 还原更新后的 Flink 作业。
$ ./bin/flink run \
--detached \
--fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
./FlinkCDCExample.jar
注意: 请参考文档 Restore the job from previous savepoint 了解更多详细信息。
关于无主键表¶
从2.4.0 版本开始支持无主键表,使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column
,且只能选择非空类型的一个字段。
在使用无主键表时,需要注意以下两种情况。
配置
scan.incremental.snapshot.chunk.key-column
时,如果表中存在索引,请尽量使用索引中的列来加快 select 速度。无主键表的处理语义由
scan.incremental.snapshot.chunk.key-column
指定的列的行为决定:
如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。
关于二进制类型数据转换为base64编码数据¶
CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
binary_data STRING,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test_db',
'table-name' = 'test_tb',
'debezium.binary.handling.mode' = 'base64'
);
binary_data
字段, 在数据库中的类型是VARBINARY(N),我们在有些场景需要将二进制数据转换为base64编码的字符串数据,可以通过添加参数’debezium.binary.handling.mode’ = ‘base64’来开启这个功能,
添加此参数的情况下,我们就可以在flink sql中将该字段类型映射为STRING
,从而获取base64编码的字符串数据。
数据类型映射¶
MySQL type | Flink SQL type | NOTE |
---|---|---|
TINYINT | TINYINT | |
SMALLINT TINYINT UNSIGNED TINYINT UNSIGNED ZEROFILL |
SMALLINT | |
INT MEDIUMINT SMALLINT UNSIGNED SMALLINT UNSIGNED ZEROFILL |
INT | |
BIGINT INT UNSIGNED INT UNSIGNED ZEROFILL MEDIUMINT UNSIGNED MEDIUMINT UNSIGNED ZEROFILL |
BIGINT | |
BIGINT UNSIGNED BIGINT UNSIGNED ZEROFILL SERIAL |
DECIMAL(20, 0) | |
FLOAT FLOAT UNSIGNED FLOAT UNSIGNED ZEROFILL |
FLOAT | |
REAL REAL UNSIGNED REAL UNSIGNED ZEROFILL DOUBLE DOUBLE UNSIGNED DOUBLE UNSIGNED ZEROFILL DOUBLE PRECISION DOUBLE PRECISION UNSIGNED DOUBLE PRECISION UNSIGNED ZEROFILL |
DOUBLE | |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where p <= 38 |
DECIMAL(p, s) | |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where 38 < p <= 65 |
STRING | 在 MySQL 中,十进制数据类型的精度高达 65,但在 Flink 中,十进制数据类型的精度仅限于 38。所以,如果定义精度大于 38 的十进制列,则应将其映射到字符串以避免精度损失。 |
BOOLEAN TINYINT(1) BIT(1) |
BOOLEAN | |
DATE | DATE | |
TIME [(p)] | TIME [(p)] | |
TIMESTAMP [(p)] DATETIME [(p)] |
TIMESTAMP [(p)] | |
CHAR(n) | CHAR(n) | |
VARCHAR(n) | VARCHAR(n) | |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) | |
BINARY(n) | BINARY(n) | |
VARBINARY(N) | VARBINARY(N) | |
TINYTEXT TEXT MEDIUMTEXT LONGTEXT |
STRING | |
TINYBLOB BLOB MEDIUMBLOB LONGBLOB |
BYTES | 目前,对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2147483647(2**31-1)的 blob。 |
YEAR | INT | |
ENUM | STRING | |
JSON | STRING | JSON 数据类型将在 Flink 中转换为 JSON 格式的字符串。 |
SET | ARRAY<STRING> | 因为 MySQL 中的 SET 数据类型是一个字符串对象,可以有零个或多个值 它应该始终映射到字符串数组。 |
GEOMETRY POINT LINESTRING POLYGON MULTIPOINT MULTILINESTRING MULTIPOLYGON GEOMETRYCOLLECTION |
STRING | MySQL 中的空间数据类型将转换为具有固定 Json 格式的字符串。 请参考 MySQL 空间数据类型映射 章节了解更多详细信息。 |
空间数据类型映射¶
MySQL中除GEOMETRYCOLLECTION
之外的空间数据类型都会转换为 Json 字符串,格式固定,如:
{"srid": 0 , "type": "xxx", "coordinates": [0, 0]}
字段srid
标识定义几何体的 SRS,如果未指定 SRID,则 SRID 0 是新几何体值的默认值。
由于 MySQL 8+ 在定义空间数据类型时只支持特定的 SRID,因此在版本较低的MySQL中,字段srid
将始终为 0。
字段type
标识空间数据类型,例如POINT
/LINESTRING
/POLYGON
。
字段coordinates
表示空间数据的坐标
。
对于GEOMETRYCOLLECTION
,它将转换为 Json 字符串,格式固定,如:
{"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}
Geometrics
字段是一个包含所有空间数据的数组。
不同空间数据类型映射的示例如下:
Spatial data in MySQL | Json String converted in Flink |
---|---|
POINT(1 1) | {"coordinates":[1,1],"type":"Point","srid":0} |
LINESTRING(3 0, 3 3, 3 5) | {"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0} |
POLYGON((1 1, 2 1, 2 2, 1 2, 1 1)) | {"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0} |
MULTIPOINT((1 1),(2 2)) | {"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0} |
MultiLineString((1 1,2 2,3 3),(4 4,5 5)) | {"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0} |
MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5))) | {"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0} |
GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20)) | {"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0} |