MySQL数据源
更新时间:2026-07-03
MySQL 数据源支持离线库表采集(Jdbc-Mysql)和实时库表采集(MySQL-CDC)两类 source 插件,可写入Iceberg、Doris、结构化数据集等目标端(各目标端插件参数详见各自文档)。本文以Iceberg作为目标端示例。
MySQL 插件类型
| 插件 | plugin_name | _db_pluginId | 作业类型 | 说明 |
|---|---|---|---|---|
| MySQL 离线采集 | Jdbc |
Jdbc-Mysql |
batch |
JDBC 读取 MySQL 表,适用于离线全量/条件增量同步 |
| MySQL 实时 CDC 采集 | MySQL-CDC |
MySQL-CDC |
streaming |
通过 binlog 实现全量+增量或仅增量同步 |
脚本 Demo 与参数说明
一、离线任务:MySQL → Iceberg
适用于离线全量同步或按条件增量同步(如按 updated_at 每日增量)。作业类型为 batch,source 使用 Jdbc-Mysql,sink 使用 Iceberg-batch。
JSON
1{
2 "_db_jobMeta": {
3 "name": "mysql_orders_to_iceberg_batch",
4 "description": "MySQL orders 表离线同步到 Iceberg",
5 "parentFolderId": "project_91cc_799bd26eee94",
6 "type": "batch",
7 "mode": "script"
8 },
9 "env": {
10 "job.mode": "BATCH",
11 "parallelism": 1,
12 "checkpoint.interval": 30000
13 },
14 "source": [
15 {
16 "plugin_name": "Jdbc",
17 "_db_pluginId": "Jdbc-Mysql",
18 "plugin_output": "source_table",
19 "_db_connectionId": "mysql-conn-001",
20 "_db_sourceDatabase": "sales_db",
21 "_db_sourceTable": "orders",
22 "_db_where": "updated_at >= ${logicTime(yyyy-MM-dd HH:mm:ss,-1d)}",
23 "_db_enableRateLimit": false,
24 "_db_rateLimit": {
25 "records": 10000,
26 "flow": 100
27 },
28 "partition_column": "id",
29 "_db_sourceChange": {
30 "onDeleteColumn": "SKIP",
31 "onAddColumn": "SKIP",
32 "onDeleteSource": "PAUSE"
33 }
34 }
35 ],
36 "sink": [
37 {
38 "plugin_name": "Iceberg",
39 "_db_pluginId": "Iceberg-batch",
40 "plugin_input": "source_table",
41 "_db_sinkPath": "lake.default",
42 "_db_catalog": "lake",
43 "_db_schema": "default",
44 "iceberg.table.upsert-mode-enabled": false,
45 "data_save_mode": "APPEND_DATA",
46 "_db_isAutoCreated": true,
47 "_db_sinkTableType": "MANAGED",
48 "_db_sinkNameRule": "SAME",
49 "_db_comment": "订单表"
50 }
51 ]
52}
Reader 参数:Jdbc-Mysql
| 参数 | 描述 | 是否必选 | 默认值 |
|---|---|---|---|
plugin_name |
SeaTunnel 插件名,固定填 Jdbc |
是 | 无 |
_db_pluginId |
DataBuilder 侧插件 ID,固定填 Jdbc-Mysql |
是 | 无 |
plugin_output |
Source 输出流名称,需与 sink 的 plugin_input 保持一致 |
是 | 无 |
_db_connectionId |
MySQL 数据源连接 ID | 是 | 无 |
_db_sourceDatabase |
MySQL 数据库名称 | 是 | 无 |
_db_sourceTable |
MySQL 数据表名称 | 是 | 无 |
partition_column |
分片字段,用于多并发采集,建议使用主键或唯一键 | 否 | 无 |
_db_where |
数据过滤条件,不含 WHERE 关键字,支持时间宏如 ${logicTime(yyyy-MM-dd HH:mm:ss,-1d)} |
否 | 无 |
_db_enableRateLimit |
是否启用源端限速 | 否 | false |
_db_rateLimit.records |
限速:每秒最大记录数 | 否 | 无 |
_db_rateLimit.flow |
限速:每秒最大流量(MB) | 否 | 无 |
_db_sourceChange.onDeleteColumn |
源端表字段被删除时的策略:SKIP 忽略 / PAUSE 终止任务 |
否 | SKIP |
_db_sourceChange.onAddColumn |
源端表新增字段时的策略:SKIP 忽略 / PAUSE 终止任务 |
否 | SKIP |
_db_sourceChange.onDeleteSource |
源端表被删除时的策略,当前仅支持 PAUSE |
否 | PAUSE |
二、实时任务:MySQL-CDC → Iceberg
适用于 MySQL 变更数据实时同步到 Iceberg 数据湖的场景。作业类型为 streaming,source 使用 MySQL-CDC,sink 使用 Iceberg-streaming。示例为全量+增量模式:先进行全量快照,完成后自动切换为 binlog 增量消费。
JSON
1{
2 "_db_jobMeta": {
3 "name": "mysql_orders_to_iceberg_streaming",
4 "description": "MySQL orders 表 CDC 实时同步到 Iceberg",
5 "parentFolderId": "folder_af42_bccdb16a1d00",
6 "type": "streaming",
7 "mode": "script",
8 "syncMode": "BASE_INCREMENT"
9 },
10 "env": {
11 "job.mode": "STREAMING",
12 "parallelism": 1,
13 "_db_enableCheckpoint": false
14 },
15 "source": [
16 {
17 "plugin_name": "MySQL-CDC",
18 "_db_pluginId": "MySQL-CDC",
19 "_db_connectionId": "mysql-cdc-conn-001",
20 "_db_sourceDatabase": "sales_db",
21 "_db_syncMode": "BASE_INCREMENT"
22 }
23 ],
24 "sink": [
25 {
26 "plugin_name": "Iceberg",
27 "_db_pluginId": "Iceberg-streaming",
28 "_db_sinkPath": "lake.default",
29 "_db_catalog": "lake",
30 "_db_schema": "default",
31 "_db_sinkTableType": "MANAGED"
32 }
33 ],
34 "_db_tableConfigs": [
35 {
36 "tableIdentity": {
37 "sourceTable": "orders",
38 "sinkTable": "ods_orders"
39 },
40 "source": {
41 "_db_sourceChange": {
42 "onDeleteColumn": "SKIP",
43 "onAddColumn": "SKIP",
44 "onDeleteSource": "PAUSE",
45 "onRenameColumn": "SKIP",
46 "onTruncateTable": "SKIP",
47 "onRenameTable": "SKIP",
48 "onChangeColumn": "SKIP",
49 "onChangeTableComment": "SKIP",
50 "onChangeColumnComment": "SKIP"
51 }
52 },
53 "sink": {
54 "_db_isAutoCreated": true,
55 "_db_sinkNameRule": "SAME",
56 "_db_dmlConfig": {
57 "insert": "INSERT",
58 "update": "UPDATE",
59 "delete": "DELETE",
60 "logicalDeleteTag": ""
61 }
62 }
63 }
64 ]
65}
Reader 参数:MySQL-CDC
source[0] 全局参数
| 参数 | 描述 | 是否必选 | 默认值 |
|---|---|---|---|
plugin_name |
SeaTunnel 插件名,固定填 MySQL-CDC |
是 | 无 |
_db_pluginId |
DataBuilder 侧插件 ID,固定填 MySQL-CDC |
是 | 无 |
plugin_output |
Source 输出流名称,需与 sink 的 plugin_input 保持一致 |
是 | 无 |
_db_connectionId |
MySQL CDC 数据源连接 ID | 是 | 无 |
_db_sourceDatabase |
MySQL 数据库名称 | 是 | 无 |
_db_syncMode |
同步类型:BASE_INCREMENT(全量+增量)/ INCREMENT(仅增量) |
否 | BASE_INCREMENT |
_db_incrementPosition.position |
增量场景下指定起始 binlog 位点值,参考如2026-06-16T16:00:00.000Z;需确保对应 binlog 未被清理 | 否 | 无 |
表级 DDL 变更策略(写入 _db_tableConfigs[i].source._db_sourceChange)
| 参数 | 描述 | 可选值 | 默认值 |
|---|---|---|---|
onDeleteColumn |
源端表字段被删除时的策略 | SKIP / PAUSE |
PAUSE |
onAddColumn |
源端表新增字段时的策略 | SKIP / PAUSE |
SKIP |
onDeleteSource |
源端表被删除时的策略 | PAUSE |
PAUSE |
onRenameColumn |
源端表字段被重命名时的策略 | SKIP / PAUSE |
SKIP |
onTruncateTable |
源端执行 TRUNCATE TABLE 时的策略 |
SKIP / PAUSE |
SKIP |
onRenameTable |
源端表被重命名时的策略 | SKIP / PAUSE |
SKIP |
onChangeColumn |
源端字段类型变更时的策略 | SKIP / PAUSE / SYNC |
SKIP |
onChangeTableComment |
源端表描述变更时的策略 | SKIP / PAUSE / SYNC |
SKIP |
onChangeColumnComment |
源端字段描述变更时的策略 | SKIP / PAUSE / SYNC |
SKIP |
配置编写注意事项
- plugin_name 与 _db_pluginId 的区别:
plugin_name是引擎层的真实插件名,_db_pluginId是 DataBuilder 平台侧标识。两者不同——MySQL 离线插件的plugin_name = Jdbc,_db_pluginId = Jdbc-Mysql;实时插件两者均为MySQL-CDC。 -
离线 vs 实时的选型原则:
- 离线全量/条件过滤增量 → 作业类型
batch,source 用Jdbc-Mysql - 准实时 CDC 同步 → 作业类型
streaming,source 用MySQL-CDC
- 离线全量/条件过滤增量 → 作业类型
-
全局配置 vs 表级配置:
- 离线(batch):属性写在
source[0]/sink[0]中 - 实时(streaming):表级策略(DDL 变更策略、DML 写入策略)写在
_db_tableConfigs[i].source/_db_tableConfigs[i].sink中
- 离线(batch):属性写在
- binlog 前提:使用
MySQL-CDC时,MySQL 必须已开启 binlog,且消费起点对应的 binlog 文件未被清理。
评价此篇文章
