Kakfa数据源
更新时间:2026-07-03
Kafka 数据源支持消息流采集(Kafka)一类 source 插件,以 streaming 作业类型运行,可写入 Iceberg-streaming 等目标端(各目标端插件参数详见各自文档)。本文以 Iceberg-streaming 作为目标端示例。
Kafka 插件类型
| 插件 | plugin_name | _db_pluginId | 作业类型 | 说明 |
|---|---|---|---|---|
| Kafka 消息采集 | Kafka |
Kafka |
streaming |
消费 Kafka Topic 消息,支持 JSON/Text/Canal JSON 等多种格式 |
脚本 Demo 与参数说明
一、准实时任务:Kafka → Iceberg
适用于消费 Kafka Topic 消息并写入 Iceberg 数据湖的场景。作业类型为 streaming,source 使用 Kafka,sink 使用 Iceberg-streaming。
JSON
1{
2 "_db_jobMeta": {
3 "name": "kafka_orders_to_iceberg_streaming",
4 "description": "Kafka topic_orders 消费后写入 Iceberg",
5 "parentFolderId": "folder_af42_bccdb16a1d00",
6 "type": "streaming",
7 "mode": "script"
8 },
9 "env": {
10 "job.mode": "STREAMING",
11 "parallelism": 1,
12 "_db_enableCheckpoint": false
13 },
14 "source": [
15 {
16 "plugin_name": "Kafka",
17 "_db_pluginId": "Kafka",
18 "plugin_output": "source_table",
19 "_db_connectionId": "kafka-conn-001",
20 "_db_syncMode": "INCREMENT",
21 "_db_startingOffset": "latest",
22 "_db_startTimestamp": "2026-01-01 00:00:00"
23 }
24 ],
25 "sink": [
26 {
27 "plugin_name": "Iceberg",
28 "_db_pluginId": "Iceberg-streaming",
29 "plugin_input": "source_table",
30 "_db_sinkPath": "lake.default",
31 "_db_catalog": "lake",
32 "_db_schema": "default",
33 "_db_sinkTableType": "MANAGED"
34 }
35 ],
36 "_db_tableConfigs": [
37 {
38 "tableIdentity": {
39 "sourceTable": "topic_orders",
40 "sinkTable": "ods_orders"
41 },
42 "source": {
43 "format": "json",
44 "_inter_jsonSample": "{\"id\":1,\"name\":\"demo\",\"amount\":100.0}"
45 },
46 "mapping": {
47 "sourceFields": [
48 {
49 "name": "id",
50 "type": "BIGINT"
51 },
52 {
53 "name": "name",
54 "type": "STRING"
55 },
56 {
57 "name": "amount",
58 "type": "DOUBLE"
59 }
60 ]
61 },
62 "sink": {
63 "_db_isAutoCreated": true,
64 "_db_sinkNameRule": "SAME",
65 "_db_dmlConfig": {
66 "insert": "INSERT",
67 "update": "UPDATE",
68 "delete": "DELETE",
69 "logicalDeleteTag": ""
70 }
71 }
72 }
73 ]
74}
Reader 参数:Kafka
source[0] 全局参数
| 参数 | 描述 | 是否必选 | 默认值 |
|---|---|---|---|
plugin_name |
SeaTunnel 插件名,固定填 Kafka |
是 | 无 |
_db_pluginId |
DataBuilder 侧插件 ID,固定填 Kafka |
是 | 无 |
plugin_output |
Source 输出流名称,需与 sink 的 plugin_input 保持一致 |
是 | 无 |
_db_connectionId |
Kafka 数据源连接 ID | 是 | 无 |
_db_startingOffset |
消费起始位置:latest(最新位点)/ earliest(最早位点)/ timestamp(指定时间) |
否 | latest |
_db_startTimestamp |
当 _db_startingOffset 为 timestamp 时,指定起始消费时间 |
否 | 无 |
表级配置(写入 _db_tableConfigs[i].source)
| 参数 | 描述 | 可选值 | 默认值 |
|---|---|---|---|
format |
消息序列化格式 | json / text / canal_json / debezium_json / maxwell_json / ogg_json |
json |
field_delimiter |
消息字段分隔符,format=text 时生效 |
任意字符 | , |
配置编写注意事项
- plugin_name 与 _db_pluginId 均为
Kafka:与 JDBC 类插件不同,Kafka 的plugin_name和_db_pluginId值相同,均为Kafka。 - 消息格式与字段映射:
format在_db_tableConfigs[i].source中按表配置。使用json格式时,需在mapping.sourceFields中声明字段名与类型;_inter_jsonSample仅为前端辅助字段,不写入后端配置。 -
消费起始位置的选择:
- 首次上线、对历史消息无需求 →
latest - 需从最早消息回刷 →
earliest - 需从特定时间点开始消费 →
timestamp,配合_db_startTimestamp使用
- 首次上线、对历史消息无需求 →
- CDC 格式消息:若 Kafka Topic 中的消息是由 Canal / Debezium / Maxwell / OGG 等工具产生的 CDC 格式,
format需对应填写canal_json/debezium_json/maxwell_json/ogg_json,DML 写入策略在_db_tableConfigs[i].sink._db_dmlConfig中配置。 - Kafka 仅支持 streaming 作业:不支持 batch 作业类型,
env.job.mode必须为STREAMING。
评价此篇文章
