StarRocks Pipeline 连接器¶
StarRocks Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入StarRocks。 本文档介绍如何设置 StarRocks Pipeline 连接器。
连接器的功能¶
自动建表
表结构变更同步
数据实时同步
如何创建 Pipeline¶
从 MySQL 读取数据同步到 StarRocks 的 Pipeline 可以定义如下:
source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404
sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass
pipeline:
   name: MySQL to StarRocks Pipeline
   parallelism: 2
Pipeline 连接器配置项¶
| Option | Required | Default | Type | Description | 
|---|---|---|---|---|
| type | required | (none) | String | 指定要使用的连接器, 这里需要设置成 'starrocks'. | 
    
| name | optional | (none) | String | Sink 的名称. | 
| jdbc-url | required | (none) | String | 用于访问 FE 节点上的 MySQL 服务器。多个地址用英文逗号(,)分隔。格式:`jdbc:mysql://fe_host1:fe_query_port1,fe_host2:fe_query_port2`。 | 
| load-url | required | (none) | String | 用于访问 FE 节点上的 HTTP 服务器。多个地址用英文分号(;)分隔。格式:`fe_host1:fe_http_port1;fe_host2:fe_http_port2`。 | 
| username | required | (none) | String | StarRocks 集群的用户名。 | 
| password | required | (none) | String | StarRocks 集群的用户密码。 | 
| sink.label-prefix | optional | (none) | String | 指定 Stream Load 使用的 label 前缀。 | 
| sink.connect.timeout-ms | optional | 30000 | String | 与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。 | 
| sink.wait-for-continue.timeout-ms | optional | 30000 | String | 等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。 | 
| sink.buffer-flush.max-bytes | optional | 157286400 | Long | 内存中缓冲的数据量大小,缓冲区由所有导入的表共享,达到阈值后将选择一个或多个表的数据写入到StarRocks。 达到阈值后取值范围:[64MB, 10GB]。 | 
| sink.buffer-flush.interval-ms | optional | 300000 | Long | 每个表缓冲数据发送的间隔,用于控制数据写入 StarRocks 的延迟。单位是毫秒,取值范围:[1000, 3600000]。 | 
| sink.scan-frequency.ms | optional | 50 | Long | 连接器会定期检查每个表是否到达发送间隔,该配置控制检查频率,单位为毫秒。 | 
| sink.io.thread-count | optional | 2 | Integer | 用来执行 Stream Load 的线程数,不同表之间的导入可以并发执行。 | 
| sink.at-least-once.use-transaction-stream-load | optional | true | Boolean | at-least-once 下是否使用 transaction stream load。 | 
| sink.properties.* | optional | (none) | String | Stream Load 的参数,控制 Stream Load 导入行为。例如 参数 `sink.properties.timeout` 用来控制导入的超时时间。 全部参数和解释请参考 STREAM LOAD。 | 
| table.create.num-buckets | optional | (none) | Integer | 自动创建 StarRocks 表时使用的桶数。对于 StarRocks 2.5 及之后的版本可以不设置,StarRocks 将会 自动设置分桶数量;对于 StarRocks 2.5 之前的版本必须设置。 | 
| table.create.properties.* | optional | (none) | String | 自动创建 StarRocks 表时使用的属性。比如: 如果使用 StarRocks 3.2 及之后的版本,'table.create.properties.fast_schema_evolution' = 'true'
          将会打开 fast schema evolution 功能。 更多信息请参考 
          主键模型。 |  
    
| table.schema-change.timeout | optional | 30min | Duration | StarRocks 侧执行 schema change 的超时时间,必须是秒的整数倍。超时后 StarRocks 将会取消 schema change,从而导致作业失败。 | 
使用说明¶
只支持主键表,因此源表必须有主键
暂不支持 exactly-once,连接器 通过 at-least-once 和主键表实现幂等写
对于自动建表
分桶键和主键相同
没有分区键
分桶数由
table.create.num-buckets控制。如果使用的 StarRocks 2.5 及之后的版本可以不设置,StarRocks 能够 自动设置分桶数量。对于 StarRocks 2.5 之前的版本必须设置,否则无法自动创建表。
对于表结构变更同步
只支持增删列
新增列只能添加到最后一列
如果使用 StarRocks 3.2 及之后版本,并且通过连接器来自动建表, 可以通过配置
table.create.properties.fast_schema_evolution为true来加速 StarRocks 执行变更。
对于数据同步,pipeline 连接器使用 StarRocks Sink 连接器 将数据写入 StarRocks,具体可以参考 Sink 文档。
数据类型映射¶
| CDC type | StarRocks type | NOTE | 
|---|---|---|
| TINYINT | TINYINT | |
| SMALLINT | SMALLINT | |
| INT | INT | |
| BIGINT | BIGINT | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| DECIMAL(p, s) | DECIMAL(p, s) | |
| BOOLEAN | BOOLEAN | |
| DATE | DATE | |
| TIMESTAMP | DATETIME | |
| TIMESTAMP_LTZ | DATETIME | |
| CHAR(n) where n <= 85 | CHAR(n * 3) | CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以只有当 CDC 中长度不超过85时,才将 CDC CHAR 映射到 StarRocks CHAR。 | 
| CHAR(n) where n > 85 | VARCHAR(n * 3) | CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以当 CDC 中长度超过85时,才将 CDC CHAR 映射到 StarRocks VARCHAR。 | 
| VARCHAR(n) | VARCHAR(n * 3) | CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。 |