DB-iceberg
更新时间:2026-07-03
Iceberg 数据源支持离线库表写入(Iceberg-batch)和实时库表写入(Iceberg-streaming)两类 sink 插件,是 DataBuilder 数据湖仓的核心写入目标。本文以 MySQL 作为源端示例。
DB-Iceberg 插件类型
| 插件 | plugin_name | _db_pluginId | 作业类型 | 说明 |
|---|---|---|---|---|
| Iceberg 离线写入 | Iceberg |
Iceberg-batch |
batch |
离线批量写入 Iceberg 表,支持追加/覆盖写入和 Upsert 模式 |
| Iceberg 流式写入 | Iceberg |
Iceberg-streaming |
streaming |
准实时 CDC 变更数据写入 Iceberg 表,支持 INSERT/UPDATE/DELETE 事件处理 |
脚本 Demo 与参数说明
一、离线任务:MySQL →DB-Iceberg
作业类型为 batch,source 使用 Jdbc-Mysql(参数详见 MySQL 数据源文档),sink 使用 Iceberg-batch。
JSON
1{
2 "_db_jobMeta": {
3 "name": "mysql_to_iceberg_batch",
4 "description": "MySQL orders 表离线同步到 Iceberg",
5 "parentFolderId": "project_91cc_799bd26eee94",
6 "type": "batch",
7 "mode": "script"
8 },
9 "env": {
10 "job.mode": "BATCH",
11 "parallelism": 1,
12 "checkpoint.interval": 30000
13 },
14 "source": [
15 {
16 "plugin_name": "Jdbc",
17 "_db_pluginId": "Jdbc-Mysql",
18 "plugin_output": "source_table",
19 "_db_connectionId": "mysql-conn-001",
20 "_db_sourceDatabase": "sales_db",
21 "_db_sourceTable": "orders"
22 }
23 ],
24 "sink": [
25 {
26 "plugin_name": "Iceberg",
27 "_db_pluginId": "Iceberg-batch",
28 "plugin_input": "source_table",
29 "_db_sinkPath": "lake.default",
30 "_db_catalog": "lake",
31 "_db_schema": "default",
32 "iceberg.table.upsert-mode-enabled": false,
33 "data_save_mode": "APPEND_DATA",
34 "_db_isAutoCreated": true,
35 "_db_sinkTableType": "MANAGED",
36 "_db_sinkNameRule": "SAME",
37 "_db_comment": "订单表"
38 }
39 ]
40}
Writer 参数:Iceberg-batch
| 参数 | 描述 | 是否必选 | 默认值 |
|---|---|---|---|
plugin_name |
SeaTunnel 插件名,固定填 Iceberg |
是 | 无 |
_db_pluginId |
DataBuilder 侧插件 ID,固定填 Iceberg-batch |
是 | 无 |
plugin_input |
对应 source 的 plugin_output 值 |
是 | 无 |
_db_sinkPath |
目标 Catalog 和 Schema,格式为 {catalog}.{schema},如 lake.default |
是 | 无 |
_db_catalog |
目标 Catalog 名称 | 是 | 无 |
_db_schema |
目标 Schema 名称 | 是 | 无 |
iceberg.table.upsert-mode-enabled |
是否启用 Upsert 写入模式(基于主键去重更新) | 否 | false |
data_save_mode |
写入模式:APPEND_DATA(追加)/ DROP_DATA(覆盖) |
否 | APPEND_DATA |
_db_isAutoCreated |
是否自动建表:true(自动建表)/ false(选择已有表) |
否 | true |
_db_sinkTableType |
表类型,当前固定为 MANAGED(托管表) |
否 | MANAGED |
_db_sinkNameRule |
目标表命名规则:SAME(与源表同名)/ ADD_PREFIX / ADD_SUFFIX / ADD_PREFIX_AND_SUFFIX / CUSTOM |
否 | SAME |
_db_prefix |
表名前缀,_db_sinkNameRule 含 ADD_PREFIX 时生效 |
否 | 无 |
_db_suffix |
表名后缀,_db_sinkNameRule 含 ADD_SUFFIX 时生效 |
否 | 无 |
_db_sinkTableName |
自定义表名,_db_sinkNameRule=CUSTOM 时使用 |
否 | 无 |
_db_sinkTable |
选择已有目标表,_db_isAutoCreated=false 时使用 |
否 | 无 |
_db_comment |
目标表描述(可选) | 否 | 无 |
二、实时任务:MySQL-CDC → DB-Iceberg
作业类型为 streaming,source 使用 MySQL-CDC(参数详见 MySQL 数据源文档),sink 使用 Iceberg-streaming。
JSON
1{
2 "_db_jobMeta": {
3 "name": "mysql_cdc_to_iceberg_streaming",
4 "description": "MySQL orders 表 CDC 同步到 Iceberg",
5 "parentFolderId": "folder_af42_bccdb16a1d00",
6 "type": "streaming",
7 "mode": "script",
8 "syncMode": "BASE_INCREMENT"
9 },
10 "env": {
11 "job.mode": "STREAMING",
12 "parallelism": 1,
13 "_db_enableCheckpoint": false
14 },
15 "source": [
16 {
17 "plugin_name": "MySQL-CDC",
18 "_db_pluginId": "MySQL-CDC",
19 "plugin_output": "source_table",
20 "_db_connectionId": "mysql-cdc-conn-001",
21 "_db_sourceDatabase": "sales_db",
22 "_db_syncMode": "BASE_INCREMENT"
23 }
24 ],
25 "sink": [
26 {
27 "plugin_name": "Iceberg",
28 "_db_pluginId": "Iceberg-streaming",
29 "plugin_input": "source_table",
30 "_db_sinkPath": "lake.default",
31 "_db_catalog": "lake",
32 "_db_schema": "default",
33 "_db_sinkTableType": "MANAGED"
34 }
35 ],
36 "_db_tableConfigs": [
37 {
38 "tableIdentity": {
39 "sourceTable": "orders",
40 "sinkTable": "ods_orders"
41 },
42 "sink": {
43 "_db_isAutoCreated": true,
44 "_db_sinkNameRule": "SAME",
45 "_db_dmlConfig": {
46 "insert": "INSERT",
47 "update": "UPDATE",
48 "delete": "DELETE",
49 "logicalDeleteTag": ""
50 }
51 }
52 }
53 ]
54}
Writer 参数:Iceberg-streaming
sink[0] 全局参数
| 参数 | 描述 | 是否必选 | 默认值 |
|---|---|---|---|
plugin_name |
SeaTunnel 插件名,固定填 Iceberg |
是 | 无 |
_db_pluginId |
DataBuilder 侧插件 ID,固定填 Iceberg-streaming |
是 | 无 |
plugin_input |
对应 source 的 plugin_output 值 |
是 | 无 |
_db_sinkPath |
目标 Catalog 和 Schema,格式为 {catalog}.{schema},如 lake.default |
是 | 无 |
_db_catalog |
目标 Catalog 名称 | 是 | 无 |
_db_schema |
目标 Schema 名称 | 是 | 无 |
_db_sinkTableType |
表类型,当前固定为 MANAGED(托管表) |
否 | MANAGED |
表级配置(写入 _db_tableConfigs[i].sink)
| 参数 | 描述 | 可选值 | 默认值 |
|---|---|---|---|
_db_isAutoCreated |
是否自动建表 | true / false |
true |
_db_sinkNameRule |
目标表命名规则 | SAME / ADD_PREFIX / ADD_SUFFIX / ADD_PREFIX_AND_SUFFIX / CUSTOM |
SAME |
_db_prefix |
表名前缀 | 任意字符串 | 无 |
_db_suffix |
表名后缀 | 任意字符串 | 无 |
_db_sinkTable |
选择已有目标表,_db_isAutoCreated=false 时使用 |
任意字符串 | 无 |
_db_comment |
目标表描述(可选) | 任意字符串 | 无 |
_db_dmlConfig.insert |
INSERT 事件处理策略 | INSERT / IGNORE |
INSERT |
_db_dmlConfig.update |
UPDATE 事件处理策略 | UPDATE / IGNORE |
UPDATE |
_db_dmlConfig.delete |
DELETE 事件处理策略;逻辑删除会写入 logical_delete_tag 字段 |
DELETE / IGNORE / LOGICAL_DELETE |
DELETE |
配置编写注意事项
- plugin_name 相同,_db_pluginId 不同:
Iceberg-batch和Iceberg-streaming的plugin_name均为Iceberg,通过_db_pluginId区分。 -
离线 vs 流式的选型原则:
- 离线全量/条件过滤增量 → 作业类型
batch,sink 用Iceberg-batch - CDC 实时同步 → 作业类型
streaming,sink 用Iceberg-streaming
- 离线全量/条件过滤增量 → 作业类型
- Upsert 模式:
Iceberg-batch支持iceberg.table.upsert-mode-enabled=true,在源端数据有重复主键时执行插入或更新,避免重复写入。 - 覆盖写入:
data_save_mode=DROP_DATA会先清空目标表再写入,适用于全量覆盖场景;日常增量同步使用APPEND_DATA。 - Streaming 表级配置:流式写入的建表方式、命名规则、DML 策略均在
_db_tableConfigs[i].sink中配置,不在sink[0]中配置。
评价此篇文章
