Kafka as DTS Destination
1. Applicable Scenarios
This document applies to scenarios where you migrate from a data source supported by DTS to a Kafka destination using Baidu AI Cloud's Data Transmission Service (DTS).
2. Restrictions on Using Kafka as a DTS Destination
- The incremental synchronization feature cannot synchronize DDL statements of a relational database.
- Messages sent by DTS to Kafka are delivered with at-least-once semantics (no message is lost, but messages may be duplicated). Task restarts and similar behaviors may therefore produce duplicate data downstream of Kafka; it is recommended to deduplicate using the GLOBAL_ID of each record, as shown in the sketch after this list.
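A minimal deduplication sketch in Python, assuming the JSON record format described in Section 5 (Kafka Data Format); the in-memory set and the apply_record() helper are illustrative only, not part of DTS:

```python
# A minimal GLOBAL_ID deduplication sketch. The in-memory set and the
# apply_record() helper are illustrative; a real consumer would use a
# bounded or persistent store for the seen IDs.
import json

seen_global_ids = set()

def handle_message(message_value):
    """Process one Kafka message produced by DTS (a JSON array of records)."""
    records = json.loads(message_value)
    for record in records:
        gid = record["GLOBAL_ID"]
        if gid in seen_global_ids:
            continue  # duplicate delivered after a task restart/retry
        seen_global_ids.add(gid)
        apply_record(record)

def apply_record(record):
    # Placeholder for your downstream processing.
    print(record["TYPE"], record["DATABASE"], record["TABLE"])
```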
3. Prerequisites for Kafka as a DTS Destination
3.1 Environment Requirements
You have created a Kafka cluster or a Baidu Messaging System (BMS) topic as the migration destination. Self-built Kafka clusters must be version 0.9 or 0.10.
3.2 Privilege Requirements
The account used must have the privilege to write data to the designated Kafka topic.
3.3 Recommended Configuration of the Destination Kafka
3.3.1 Destination as a Baidu Messaging System (BMS) Topic
You can configure a DTS task directly without additional configuration. For steps, see Destination Kafka Task Configuration.
3.3.2 Destination as a Self-Built Kafka Cluster
Because the DTS service control node is network-isolated from a self-built destination Kafka cluster, you need to configure access routing rules for the cluster. Select an access method as required and configure your Kafka cluster step by step.
3.3.2.1 Access Your Kafka Cluster via the Public Network
If you want DTS to access your Kafka cluster over a public network link, you need to configure public network access for each machine in the cluster. Suppose the destination Kafka cluster has three brokers whose public IPs are 106.0.0.1, 106.0.0.2, and 106.0.0.3, and whose private IPs are 172.16.0.1, 172.16.0.2, and 172.16.0.3. You need to edit the configuration file server.properties of each broker. Take broker1 as an example (public IP: 106.0.0.1, private IP: 172.16.0.1):
listeners
listeners=INTERNAL://172.16.0.1:9092,PUBLIC://172.16.0.1:19092
The listeners item defines the broker's listeners. The connection information under the INTERNAL tag (172.16.0.1:9092) is used for internal communication between brokers. A private IP (172.16.0.1) is configured here, meaning that brokers communicate with each other over the private network. If you want brokers to communicate over the public network instead, change it to the public IP (106.0.0.1).
The connection information under the PUBLIC tag (172.16.0.1:19092) is used for communication over the public network. Note: the configured IP should match the IP under the INTERNAL tag, but the ports must differ.
advertised.listeners
advertised.listeners=INTERNAL://172.16.0.1:9092,PUBLIC://106.0.0.1:19092
The advertised.listeners item publishes the broker's listener information to ZooKeeper so that clients and other brokers can query it. If advertised.listeners is configured, the information configured by listeners is not published to ZooKeeper.
The connection information under the INTERNAL tag (172.16.0.1:9092) matches the listeners configuration; however, the connection information under the PUBLIC tag (106.0.0.1:19092) must use the public IP.
listener.security.protocol.map
listener.security.protocol.map=INTERNAL:PLAINTEXT,PUBLIC:PLAINTEXT
The listener.security.protocol.map item configures each listener's security protocol. You can configure different security protocols for different connection methods according to your needs. In the example, both INTERNAL and PUBLIC use the default PLAINTEXT security protocol, which has no access control.
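For example, if you want access control on the public listener (DTS supports SASL only for Kafka clusters of version 0.10; see Section 4.1), the map could instead be configured along these lines:

```
listener.security.protocol.map=INTERNAL:PLAINTEXT,PUBLIC:SASL_PLAINTEXT
```

Note that enabling SASL also requires additional broker-side SASL configuration beyond this map.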
inter.broker.listener.name
inter.broker.listener.name=INTERNAL
The inter.broker.listener.name item specifies which tag names the internal listener; the listener identified by this tag is used exclusively for communication between brokers in the Kafka cluster. In the example, the field is set to INTERNAL, meaning that brokers communicate over the private network.
Start broker
After configuring the four parameters, save the modifications, exit to the Kafka root directory, and restart broker1. Then follow the same steps to configure and restart broker2 and broker3.
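For reference, here are the four items combined in broker1's server.properties (public IP 106.0.0.1, private IP 172.16.0.1); broker2 and broker3 follow the same pattern with their own IPs:

```
listeners=INTERNAL://172.16.0.1:9092,PUBLIC://172.16.0.1:19092
advertised.listeners=INTERNAL://172.16.0.1:9092,PUBLIC://106.0.0.1:19092
listener.security.protocol.map=INTERNAL:PLAINTEXT,PUBLIC:PLAINTEXT
inter.broker.listener.name=INTERNAL
```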
3.3.2.2 Access Your Kafka Cluster via Baidu AI Cloud's Intranet
In addition to public network self-built instances, DTS supports self-built Kafka clusters on BBC or BCC. Because such a cluster is deployed on Baidu AI Cloud, you can either bind an EIP so that DTS accesses the Kafka cluster over the public network, or let DTS access the cluster directly over Baidu AI Cloud's private network.
For public network access, see the previous section. This section describes how to configure a BBC/BCC self-built Kafka cluster so that DTS can access it over Baidu AI Cloud's private network.
Query PNET IP
In Baidu AI Cloud's private network, the PNET IP uniquely identifies a virtual machine instance, and DTS uses it to reach your Kafka cluster there. Run the following command on each BBC/BCC instance to obtain its PNET IP.
curl http://169.254.169.254/2009-04-04/meta-data/public-ipv4
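On broker1 in the example below (PNET IP: 10.0.0.1), this command would simply print:

```
10.0.0.1
```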
Taking broker1 as an example (PNET IP: 10.0.0.1, intranet IP: 192.168.0.1), modify the four network communication configuration items in server.properties. For the meaning of each configuration item, see the section Access Your Kafka Cluster via the Public Network.
listeners
listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://192.168.0.1:19092
The IP configured under the INTERNAL tag here is the Baidu Cloud VPC private IP; you can find an instance's private IP on the BCC or BBC instance details page.
The listener under the EXTERNAL tag carries the connection information for accessing the broker via its PNET IP. Note: the configured IP should match the IP under the INTERNAL tag, but the ports must differ.
advertised.listeners
advertised.listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://10.0.0.1:19092
Under the EXTERNAL tag, advertised.listeners is configured as PNET IP:listening port. The content under the INTERNAL tag is consistent with the listeners configuration item.
listener.security.protocol.map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
You can configure different security protocols for different connection methods according to your needs. In the example, both INTERNAL and EXTERNAL use the default PLAINTEXT security protocol, which has no access control.
inter.broker.listener.name
inter.broker.listener.name=INTERNAL
In the example, the field is set to INTERNAL, meaning that brokers communicate with each other through the Baidu Cloud VPC subnet.
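For reference, here are broker1's four items combined (PNET IP 10.0.0.1, VPC private IP 192.168.0.1):

```
listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://192.168.0.1:19092
advertised.listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://10.0.0.1:19092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL
```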
4. Application of Kafka as a DTS Destination
For the operation flow of task creation, pre-check, task start, task pause, and task termination when using Kafka as the destination, see the Operation Guide. Task Configuration and Object Mapping differ from those of other data sources.
4.1 Task Configuration
Go to the Task Connection Configuration page. Taking Baidu AI Cloud's RDS for MySQL as the source as an example, select a source instance.
When you configure destination connection information, you need to select the access type based on the access mode of the destination Kafka cluster.
If the destination is a Baidu Messaging System (BMS) topic, choose Baidu Messaging System (BMS) as the Access Type, and then select the Region and Topic ID.
If the destination is a self-built Kafka cluster, select either public network access or Baidu AI Cloud intranet access as the access type.
Note: For Baidu AI Cloud intranet access, the IPs in the broker list must be PNET IPs.
Then enter the remaining information as required. Note: DTS only supports Kafka clusters of versions 0.9 and 0.10, and SASL access control can only be configured for 0.10 clusters.
Click “Authorize Whitelist and Enter Next”, and then select the migration objects of the source instance.
4.2 Object Mapping
After you finish selecting the Migration Type and Migration Object, click Save and Pre-check to create the task. Then view the task status in the task list.
- If the status column displays "Pre-check Passed", you can select and start the migration task. After you start the task, you can view the migration progress in the task progress column.
- If the status column displays “Pre-check Failed”, you can click the button next to it to view the cause of the failure and fix it. The task must pass another pre-check before it can be started.
For pre-check details, see Operation Guides for Data Migration - Pre-check.
5. Kafka Data Format
After you start the task, DTS pulls full (baseline) data or incremental change data from the source database instance and writes it to the designated topic of the destination Kafka cluster in a fixed format. The format is as follows:
//json structure
[ //Outermost array
  { //The first record. One message may contain one or more records.
    "TIME":"20180831165311", //The timestamp
    "GLOBAL_ID":"xxxxxxxx", //The globally unique ID. Consumers can use this ID to deduplicate.
    "DATABASE":"db1", //The database name
    "SCHEMA":"schema1", //The schema name. It exists only when the upstream is PostgreSQL or SQL Server.
    "TABLE":"tbl1", //The table name
    "TYPE":"U", //The change type. I indicates insert, U indicates update, and D indicates delete.
    "OLD_VALUES":{ //The "column name":"column value" pairs of each column before the change. When the change type is I, OLD_VALUES does not exist.
        "key1":"old-value1",
        "key2":"old-value2",
        ...
    },
    "NEW_VALUES":{ //The "column name":"column value" pairs of each column after the change. When the change type is D, NEW_VALUES does not exist.
        "key1":"new-value1",
        "key2":"new-value2",
        ...
    },
    "OFFSET":{ //The position information of the record. It has values only in the incremental migration stage and only when the upstream is MySQL. Consumers can ignore this field.
        "BINLOG_NAME":"mysql-bin.xxx",
        "BINLOG_POS":"xxx",
        "GTID":"server_id:transaction_id" //This field exists only when synchronization uses the GTID method.
    }
  },
  { //The second record
    "TIME":"20180831165311",
    "DATABASE":"db1",
    ...
  }
  ... //More records
]
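A minimal consumer sketch for this format, assuming the kafka-python client; the topic name "dts-demo" is hypothetical, and the broker address is the public listener of broker1 from the earlier example:

```python
# A minimal sketch of consuming DTS change records, assuming kafka-python
# ("pip install kafka-python") and the message format documented above.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "dts-demo",                             # hypothetical destination topic
    bootstrap_servers=["106.0.0.1:19092"],  # broker list from your task config
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    for record in message.value:            # one message = one JSON array of records
        rtype = record["TYPE"]              # I = insert, U = update, D = delete
        table = "%s.%s" % (record["DATABASE"], record["TABLE"])
        if rtype == "I":
            print("insert into", table, record["NEW_VALUES"])
        elif rtype == "U":
            print("update", table, record["OLD_VALUES"], "->", record["NEW_VALUES"])
        elif rtype == "D":
            print("delete from", table, record["OLD_VALUES"])
```

In real use, the GLOBAL_ID deduplication shown in Section 2 would be applied to each record before processing it.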