StarRocks Pipeline 连接器

StarRocks Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入StarRocks。 本文档介绍如何设置 StarRocks Pipeline 连接器。

连接器的功能

  • 自动建表

  • 表结构变更同步

  • 数据实时同步

如何创建 Pipeline

从 MySQL 读取数据同步到 StarRocks 的 Pipeline 可以定义如下:

source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
   name: MySQL to StarRocks Pipeline
   parallelism: 2

Pipeline 连接器配置项

Option Required Default Type Description
type required (none) String 指定要使用的连接器, 这里需要设置成 'starrocks'.
name optional (none) String Sink 的名称.
jdbc-url required (none) String 用于访问 FE 节点上的 MySQL 服务器。多个地址用英文逗号(,)分隔。格式:`jdbc:mysql://fe_host1:fe_query_port1,fe_host2:fe_query_port2`。
load-url required (none) String 用于访问 FE 节点上的 HTTP 服务器。多个地址用英文分号(;)分隔。格式:`fe_host1:fe_http_port1;fe_host2:fe_http_port2`。
username required (none) String StarRocks 集群的用户名。
password required (none) String StarRocks 集群的用户密码。
sink.label-prefix optional (none) String 指定 Stream Load 使用的 label 前缀。
sink.connect.timeout-ms optional 30000 String 与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。
sink.wait-for-continue.timeout-ms optional 30000 String 等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。
sink.buffer-flush.max-bytes optional 157286400 Long 内存中缓冲的数据量大小,缓冲区由所有导入的表共享,达到阈值后将选择一个或多个表的数据写入到StarRocks。 达到阈值后取值范围:[64MB, 10GB]。
sink.buffer-flush.interval-ms optional 300000 Long 每个表缓冲数据发送的间隔,用于控制数据写入 StarRocks 的延迟。单位是毫秒,取值范围:[1000, 3600000]。
sink.scan-frequency.ms optional 50 Long 连接器会定期检查每个表是否到达发送间隔,该配置控制检查频率,单位为毫秒。
sink.io.thread-count optional 2 Integer 用来执行 Stream Load 的线程数,不同表之间的导入可以并发执行。
sink.at-least-once.use-transaction-stream-load optional true Boolean at-least-once 下是否使用 transaction stream load。
sink.properties.* optional (none) String Stream Load 的参数,控制 Stream Load 导入行为。例如 参数 `sink.properties.timeout` 用来控制导入的超时时间。 全部参数和解释请参考 STREAM LOAD
table.create.num-buckets optional (none) Integer 自动创建 StarRocks 表时使用的桶数。对于 StarRocks 2.5 及之后的版本可以不设置,StarRocks 将会 自动设置分桶数量;对于 StarRocks 2.5 之前的版本必须设置。
table.create.properties.* optional (none) String 自动创建 StarRocks 表时使用的属性。比如: 如果使用 StarRocks 3.2 及之后的版本,'table.create.properties.fast_schema_evolution' = 'true' 将会打开 fast schema evolution 功能。 更多信息请参考 主键模型
table.schema-change.timeout optional 30min Duration StarRocks 侧执行 schema change 的超时时间,必须是秒的整数倍。超时后 StarRocks 将会取消 schema change,从而导致作业失败。

使用说明

  • 只支持主键表,因此源表必须有主键

  • 暂不支持 exactly-once,连接器 通过 at-least-once 和主键表实现幂等写

  • 对于自动建表

    • 分桶键和主键相同

    • 没有分区键

    • 分桶数由 table.create.num-buckets 控制。如果使用的 StarRocks 2.5 及之后的版本可以不设置,StarRocks 能够 自动设置分桶数量。对于 StarRocks 2.5 之前的版本必须设置,否则无法自动创建表。

  • 对于表结构变更同步

    • 只支持增删列

    • 新增列只能添加到最后一列

    • 如果使用 StarRocks 3.2 及之后版本,并且通过连接器来自动建表, 可以通过配置 table.create.properties.fast_schema_evolutiontrue 来加速 StarRocks 执行变更。

  • 对于数据同步,pipeline 连接器使用 StarRocks Sink 连接器 将数据写入 StarRocks,具体可以参考 Sink 文档

数据类型映射

CDC type StarRocks type NOTE
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
BOOLEAN BOOLEAN
DATE DATE
TIMESTAMP DATETIME
TIMESTAMP_LTZ DATETIME
CHAR(n) where n <= 85 CHAR(n * 3) CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以只有当 CDC 中长度不超过85时,才将 CDC CHAR 映射到 StarRocks CHAR。
CHAR(n) where n > 85 VARCHAR(n * 3) CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以当 CDC 中长度超过85时,才将 CDC CHAR 映射到 StarRocks VARCHAR。
VARCHAR(n) VARCHAR(n * 3) CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。

FAQ