MySQL CDC Pipeline 连接器¶
MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 本文描述了如何设置 MySQL CDC Pipeline 连接器。
如何创建 Pipeline¶
从 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404
sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass
pipeline:
   name: MySQL to Doris Pipeline
   parallelism: 4
Pipeline 连接器选项¶
| Option | Required | Default | Type | Description | 
|---|---|---|---|---|
| hostname | required | (none) | String | MySQL 数据库服务器的 IP 地址或主机名。 | 
| port | optional | 3306 | Integer | MySQL 数据库服务器的整数端口号。 | 
| username | required | (none) | String | 连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。 | 
| password | required | (none) | String | 连接 MySQL 数据库服务器时使用的密码。 | 
| tables | required | (none) | String | 需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。 需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。 例如,db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*  | 
    
| schema-change.enabled | optional | true | Boolean | 是否发送模式更改事件,下游 sink 可以响应模式变更事件实现表结构同步,默认为true。 | 
| server-id | optional | (none) | String | 读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408', 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。 | 
| scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | 表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。 | 
| scan.snapshot.fetch.size | optional | 1024 | Integer | 读取表快照时每次读取数据的最大条数。 | 
| scan.startup.mode | optional | initial | String | MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp"。 请查阅 启动模式 章节了解更多详细信息。 | 
| scan.startup.specific-offset.file | optional | (none) | String | 在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。 | 
| scan.startup.specific-offset.pos | optional | (none) | Long | 在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。 | 
| scan.startup.specific-offset.gtid-set | optional | (none) | String | 在 "specific-offset" 启动模式下,启动位点的 GTID 集合。 | 
| scan.startup.specific-offset.skip-events | optional | (none) | Long | 在指定的启动位点后需要跳过的事件数量。 | 
| scan.startup.specific-offset.skip-rows | optional | (none) | Long | 在指定的启动位点后需要跳过的数据行数量。 | 
| connect.timeout | optional | 30s | Duration | 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。 | 
| connect.max-retries | optional | 3 | Integer | 连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。 | 
| connection.pool.size | optional | 20 | Integer | 连接池大小。 | 
| jdbc.properties.* | optional | 20 | String | 传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'. | 
| heartbeat.interval | optional | 30s | Duration | 用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。 | 
| debezium.* | optional | (none) | String | 将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。
          例如: 'debezium.snapshot.mode' = 'never'.
          查看更多关于  Debezium 的  MySQL 连接器属性 |  
    
| scan.incremental.close-idle-reader.enabled | optional | false | Boolean | 是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。 若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。  | 
    
启动模式¶
配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:
initial(默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
数据类型映射¶
| MySQL type | CDC type | NOTE | 
|---|---|---|
| TINYINT(n) | TINYINT | |
| 
        SMALLINT TINYINT UNSIGNED TINYINT UNSIGNED ZEROFILL  | 
      SMALLINT | |
| 
        INT YEAR MEDIUMINT MEDIUMINT UNSIGNED MEDIUMINT UNSIGNED ZEROFILL SMALLINT UNSIGNED SMALLINT UNSIGNED ZEROFILL  | 
      INT | |
| 
        BIGINT INT UNSIGNED INT UNSIGNED ZEROFILL  | 
      BIGINT | |
| 
        BIGINT UNSIGNED BIGINT UNSIGNED ZEROFILL SERIAL  | 
      DECIMAL(20, 0) | |
| 
        FLOAT FLOAT UNSIGNED FLOAT UNSIGNED ZEROFILL  | 
      FLOAT | |
| 
        REAL REAL UNSIGNED REAL UNSIGNED ZEROFILL DOUBLE DOUBLE UNSIGNED DOUBLE UNSIGNED ZEROFILL DOUBLE PRECISION DOUBLE PRECISION UNSIGNED DOUBLE PRECISION UNSIGNED ZEROFILL  | 
      DOUBLE | |
| 
        NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where p <= 38  | 
      DECIMAL(p, s) | |
| 
        NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where 38 < p <= 65  | 
      STRING | 在 MySQL 中,十进制数据类型的精度高达 65,但在 Flink 中,十进制数据类型的精度仅限于 38。所以,如果定义精度大于 38 的十进制列,则应将其映射到字符串以避免精度损失。 | 
| 
        BOOLEAN TINYINT(1) BIT(1)  | 
      BOOLEAN | |
| DATE | DATE | |
| TIME [(p)] | TIME [(p)] | |
| TIMESTAMP [(p)] | TIMESTAMP_LTZ [(p)] | |
| DATETIME [(p)] | TIMESTAMP [(p)] | |
| CHAR(n) | CHAR(n) | |
| VARCHAR(n) | VARCHAR(n) | |
| BIT(n) | BINARY(⌈(n + 7) / 8⌉) | |
| BINARY(n) | BINARY(n) | |
| VARBINARY(N) | VARBINARY(N) | |
| 
        TINYTEXT TEXT MEDIUMTEXT LONGTEXT  | 
      STRING | |
| 
        TINYBLOB BLOB MEDIUMBLOB LONGBLOB  | 
      BYTES | 目前,对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2147483647(2**31-1)的 blob。 | 
| ENUM | STRING | |
| JSON | STRING | JSON 数据类型将在 Flink 中转换为 JSON 格式的字符串。 | 
| SET | - | 暂不支持 | 
| 
       GEOMETRY POINT LINESTRING POLYGON MULTIPOINT MULTILINESTRING MULTIPOLYGON GEOMETRYCOLLECTION  | 
      STRING | MySQL 中的空间数据类型将转换为具有固定 Json 格式的字符串。 请参考 MySQL 空间数据类型映射 章节了解更多详细信息。 | 
空间数据类型映射¶
MySQL中除GEOMETRYCOLLECTION之外的空间数据类型都会转换为 Json 字符串,格式固定,如:
{"srid": 0 , "type": "xxx", "coordinates": [0, 0]}
字段srid标识定义几何体的 SRS,如果未指定 SRID,则 SRID 0 是新几何体值的默认值。
由于 MySQL 8+ 在定义空间数据类型时只支持特定的 SRID,因此在版本较低的MySQL中,字段srid将始终为 0。
字段type标识空间数据类型,例如POINT/LINESTRING/POLYGON。
字段coordinates表示空间数据的坐标。
对于GEOMETRYCOLLECTION,它将转换为 Json 字符串,格式固定,如:
{"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}
Geometrics字段是一个包含所有空间数据的数组。
不同空间数据类型映射的示例如下:
| Spatial data in MySQL | Json String converted in Flink | 
|---|---|
| POINT(1 1) | {"coordinates":[1,1],"type":"Point","srid":0} | 
| LINESTRING(3 0, 3 3, 3 5) | {"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0} | 
| POLYGON((1 1, 2 1, 2 2, 1 2, 1 1)) | {"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0} | 
| MULTIPOINT((1 1),(2 2)) | {"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0} | 
| MultiLineString((1 1,2 2,3 3),(4 4,5 5)) | {"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0} | 
| MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5))) | {"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0} | 
| GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20)) | {"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0} |