          Kafka as DTS Destination

          1. Applicable Scenarios

          This document describes how to use Baidu AI Cloud's Data Transmission Service (DTS) to migrate data from any DTS-supported data source to a Kafka destination.

          2. Restrictions on Using Kafka as DTS destination

          • Incremental synchronization cannot synchronize DDL statements from a relational database.
          • DTS delivers messages to Kafka with at-least-once semantics: no message is lost, but a message may be delivered more than once. Restarting a task, among other actions, can produce duplicate data downstream of Kafka. It is recommended to remove duplicates using the GLOBAL_ID of each record.
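The deduplication recommended above can be sketched on the consumer side as follows. This is a minimal illustration, not part of DTS: the class name `GlobalIdDeduplicator`, the helper `unique_records`, and the cache capacity are hypothetical, and the sketch assumes each message value is the JSON array described in section 5 and that every record carries a GLOBAL_ID.

```python
import json
from collections import OrderedDict


class GlobalIdDeduplicator:
    """Remembers recently seen GLOBAL_ID values in a bounded cache (hypothetical helper)."""

    def __init__(self, capacity=100_000):
        self.capacity = capacity
        self._seen = OrderedDict()

    def is_new(self, global_id):
        """Return True the first time an ID is seen, False for a repeat."""
        if global_id in self._seen:
            return False
        self._seen[global_id] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the oldest remembered ID
        return True


def unique_records(message_value, dedup):
    """Yield the records of one Kafka message value, skipping repeated GLOBAL_IDs."""
    for record in json.loads(message_value):
        if dedup.is_new(record["GLOBAL_ID"]):
            yield record
```

An in-memory cache like this is lost when the consumer restarts and only catches repeats within its capacity window. If duplicates must never reach downstream storage, the seen IDs would need to be kept in a durable store, or the downstream write made idempotent on GLOBAL_ID.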

          3. Prerequisites for Kafka as DTS destination

          3.1 Environment Requirements

          You have created a Kafka cluster or a Baidu Messaging System (BMS) topic as the migration destination. A self-built Kafka cluster must run version 0.9 or 0.10.

          3.2 Privilege requirements

          The account you provide must have permission to write data to the designated Kafka topic.

          3.3.1 Destination is a Baidu Messaging System (BMS) topic

          You can configure the DTS task directly; no additional setup is required. For the steps, see section 4.1, Task Configuration.

          3.3.2 Destination is a self-built Kafka cluster

          Because the DTS control nodes are network-isolated from your self-built Kafka cluster, you need to configure access routing rules for the cluster. Choose the access method that fits your needs and configure the cluster as described below.

          3.3.2.1 Access your Kafka cluster via the public network

          To let DTS reach your Kafka cluster over the public network, configure public network access on every machine in the cluster. Suppose the destination cluster has three brokers with public IPs 106.0.0.1, 106.0.0.2, and 106.0.0.3 and private IPs 172.16.0.1, 172.16.0.2, and 172.16.0.3. Edit the server.properties file of each broker. The following uses broker1 as an example (public IP: 106.0.0.1, private IP: 172.16.0.1):

          listeners
          listeners=INTERNAL://172.16.0.1:9092,PUBLIC://172.16.0.1:19092

          listeners defines the addresses the broker listens on. The address under the INTERNAL tag (172.16.0.1:9092) is used for communication between brokers. It is set to the private IP (172.16.0.1), so brokers communicate with each other over the private network. To have brokers communicate over the public network instead, change it to the public IP (106.0.0.1).
          The address under the PUBLIC tag (172.16.0.1:19092) is used for traffic from the public network. Note: It must use the same IP as the INTERNAL tag but a different port.

          advertised.listeners
          advertised.listeners=INTERNAL://172.16.0.1:9092,PUBLIC://106.0.0.1:19092

          advertised.listeners is the listener information the broker publishes to ZooKeeper for clients and other brokers to look up. When advertised.listeners is set, the values in listeners are not published to ZooKeeper.
          The address under the INTERNAL tag (172.16.0.1:9092) matches the listeners setting, but the address under the PUBLIC tag must use the public IP (106.0.0.1:19092).

          listener.security.protocol.map
          listener.security.protocol.map=INTERNAL:PLAINTEXT,PUBLIC:PLAINTEXT

          listener.security.protocol.map sets the security protocol of each listener. You can assign different security protocols to different listeners as needed. In the example, INTERNAL and PUBLIC both use PLAINTEXT, which provides no access control.

          inter.broker.listener.name
          inter.broker.listener.name=INTERNAL

          inter.broker.listener.name names the tag of the listener used for communication between brokers in the Kafka cluster. In the example it is set to INTERNAL, so brokers communicate with each other over the private network.

          Start broker

          After setting the four parameters, save the file, return to the Kafka root directory, and restart broker1. Then repeat the same steps to configure and restart broker2 and broker3.
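Since the four settings differ between brokers only in their IPs, they can be generated rather than typed by hand. The following is a minimal sketch under that assumption; the function name `listener_settings` and its parameters are hypothetical, and the ports and tags are the ones used in the example above.

```python
def listener_settings(private_ip, external_ip, external_tag="PUBLIC",
                      internal_port=9092, external_port=19092):
    """Build the four server.properties lines for one broker (hypothetical helper)."""
    if internal_port == external_port:
        raise ValueError("the two listeners must use different ports")
    return "\n".join([
        # Both listeners bind to the private IP, on different ports.
        f"listeners=INTERNAL://{private_ip}:{internal_port},"
        f"{external_tag}://{private_ip}:{external_port}",
        # Only the externally advertised address uses the external IP.
        f"advertised.listeners=INTERNAL://{private_ip}:{internal_port},"
        f"{external_tag}://{external_ip}:{external_port}",
        f"listener.security.protocol.map=INTERNAL:PLAINTEXT,{external_tag}:PLAINTEXT",
        "inter.broker.listener.name=INTERNAL",
    ])


# (private IP, public IP) pairs of the three example brokers.
BROKERS = [
    ("172.16.0.1", "106.0.0.1"),
    ("172.16.0.2", "106.0.0.2"),
    ("172.16.0.3", "106.0.0.3"),
]

if __name__ == "__main__":
    for private_ip, public_ip in BROKERS:
        print(f"# broker at {private_ip}")
        print(listener_settings(private_ip, public_ip))
        print()
```

The output for each broker can then be merged into that broker's server.properties before the restart.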

          3.3.2.2 Access your Kafka cluster via Baidu AI Cloud’s intranet

          In addition to self-built clusters reached over the public network, DTS supports Kafka clusters that you build on BBC or BCC instances. Because such a cluster runs on Baidu AI Cloud, you can either bind an EIP so that DTS accesses the cluster over the public network, or have DTS access it directly over Baidu AI Cloud's private network.

          For public network access, see the previous section. This section describes how to configure a self-built Kafka cluster on BBC/BCC so that DTS can access it over Baidu AI Cloud's private network.

          Query PNET IP

          In Baidu AI Cloud's private network, a PNET IP uniquely identifies a virtual machine instance. DTS uses the PNET IP to reach your Kafka cluster within the private network. Run the following command on your BBC/BCC instance to obtain its PNET IP.

          curl http://169.254.169.254/2009-04-04/meta-data/public-ipv4

          Taking broker1 as an example (PNET IP: 10.0.0.1, intranet IP: 192.168.0.1), modify the four network settings in server.properties. For the meaning of each setting, see section 3.3.2.1, Access your Kafka cluster via the public network.

          listeners
          listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://192.168.0.1:19092

          The IP configured by INTERNAL here is the Baidu Cloud VPC’s private IP. You can query the instance private IP on the BCC or BBC instance details page.

          The listener under the EXTERNAL tag is the address used to reach the broker via its PNET IP. Note: It must use the same IP as the INTERNAL tag but a different port.

          advertised.listeners
          advertised.listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://10.0.0.1:19092

          Under the EXTERNAL tag, advertised.listeners is set to the PNET IP and the listening port. The INTERNAL entry is the same as in listeners.

          listener.security.protocol.map
          listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

          You can assign different security protocols to different listeners as needed. In the example, INTERNAL and EXTERNAL both use PLAINTEXT, which provides no access control.

          inter.broker.listener.name
          inter.broker.listener.name=INTERNAL

          In the example, the field is set to INTERNAL, so brokers communicate with each other over the Baidu Cloud VPC subnet.
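Before restarting a broker, the consistency rules stated for these four settings can be checked mechanically. The sketch below is an illustrative helper, not a DTS or Kafka tool: the function names are hypothetical, it only handles simple `key=value` lines, and it checks only the rules described in this document (matching tags, distinct listener ports, a protocol for every tag, and a valid inter-broker tag).

```python
SAMPLE = """\
listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://192.168.0.1:19092
advertised.listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://10.0.0.1:19092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL
"""

REQUIRED = ("listeners", "advertised.listeners",
            "listener.security.protocol.map", "inter.broker.listener.name")


def parse_properties(text):
    """Parse simple key=value lines, ignoring blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, value = line.split("=", 1)
            props[key.strip()] = value.strip()
    return props


def parse_listeners(value):
    """Map each listener tag to its (host, port) pair."""
    result = {}
    for entry in value.split(","):
        tag, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        result[tag] = (host, int(port))
    return result


def check_listener_config(text):
    """Return a list of problems found; an empty list means the checks passed."""
    props = parse_properties(text)
    missing = [key for key in REQUIRED if key not in props]
    if missing:
        return [f"missing setting: {key}" for key in missing]

    listeners = parse_listeners(props["listeners"])
    advertised = parse_listeners(props["advertised.listeners"])
    protocols = dict(item.strip().split(":", 1)
                     for item in props["listener.security.protocol.map"].split(","))

    problems = []
    if set(listeners) != set(advertised):
        problems.append("listeners and advertised.listeners define different tags")
    ports = [port for _, port in listeners.values()]
    if len(ports) != len(set(ports)):
        problems.append("listeners reuse a port")
    for tag in listeners:
        if tag not in protocols:
            problems.append(f"no security protocol configured for {tag}")
    if props["inter.broker.listener.name"] not in listeners:
        problems.append("inter.broker.listener.name is not a defined listener tag")
    return problems
```

Calling `check_listener_config(SAMPLE)` on the example configuration above returns an empty list.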

          4. Application of Kafka as DTS destination

          For creating, pre-checking, starting, pausing, and terminating a task with Kafka as the destination, see the Operation Guide. Only Task Configuration and Object Mapping differ from other data sources.

          4.1 Task Configuration

          Go to the Task Connection Configuration page. This example uses Baidu AI Cloud's RDS for MySQL as the source, so select a source instance.

          When you configure the destination connection information, select the access type that matches how the destination Kafka cluster is reached.

          If the destination is a Baidu Messaging System (BMS) topic, choose Baidu Messaging System (BMS) as the Access Type, and then select the Region and Topic ID.

          If the destination is a self-built Kafka cluster, choose either public network access or Baidu AI Cloud intranet access as the access type.

          Note: For Baidu AI Cloud intranet access, each IP in the broker list must be a PNET IP.

          Then enter the remaining information as required. Note: DTS supports only Kafka versions 0.9 and 0.10, and SASL access control can be configured only for version 0.10 clusters.

          Click “Authorize Whitelist and Enter Next”, and then select the objects to migrate from the source instance.

          4.2 Object Mapping

          After you select the Migration Type and Migration Object, click Save and Pre-check to create the task. Then check the task status in the task list.

          • If the status column displays “Pre-check Passed”, you can select the migration task and start it. After the task starts, you can view the migration progress in the task progress column.
          • If the status column displays “Pre-check Failed”, click the button next to it to view the cause of the failure and correct it. You must run the pre-check again and pass it before you can start the migration task.

          For pre-check details, see Operation Guides for Data Migration-Pre-check.

          5. Kafka Data Format

          After the task starts, DTS pulls full or incremental change data from the source database instance and writes it to the designated topic of the destination Kafka cluster in a fixed format, as follows:

          //json structure
          [   //Outermost array
          {   //The first line of record. One message may contain 1 or more record lines.
              "TIME":"20180831165311",          //The timestamp
              "GLOBAL_ID":"xxxxxxxx",           //The global and unique ID. The customer can use this ID to remove the duplicates.
              "DATABASE":"db1",                 //The database name
              "SCHEMA":"schema1",               //The SCHEMA name. It exists only when the upstream node is PostgreSQL or SQLServer
              "TABLE":"tbl1",                   //The table name
              "TYPE":"U",                       //The change type. I indicates insert, U indicates update, and D indicates delete.
              "OLD_VALUES":{                    //The "Column name":"Column value" group of each column before the change. When the change type is I, there is no OLD_VALUES.
                  "key1":"old-value1",
                  "key2":"old-value2",
                  ...
              },
              "NEW_VALUES":{                    //The "Column name":"Column value" group of each column after the change. When the change type is D, there is no NEW_VALUES.
                  "key1":"new-value1",
                  "key2":"new-value2",
                  ...
              },
              "OFFSET":{                        //The position information of the record. It has values only in the incremental migration stage when the upstream source is MySQL. Consumers can ignore this field.
                  "BINLOG_NAME":"mysql-bin.xxx",
                  "BINLOG_POS":"xxx",
                  "GTID":"server_id:transaction_id"   //This field exists only when the synchronization is done by the GTID method.
              }
          },  
          {   //The second line of record
              "TIME":"20180831165311",
              "DATABASE":"db1",
              ...
          }
          ... //More record lines
          ]
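
A consumer can turn one message in this format into a list of change events along the following lines. This is a minimal sketch, not an official client: the function name `parse_message` and the output keys are hypothetical, real message values are assumed to be plain JSON without the explanatory comments shown above, and the TIME layout (`%Y%m%d%H%M%S`) is inferred from the sample value 20180831165311.

```python
import json
from datetime import datetime

# Change type codes as described in the format above.
CHANGE_TYPES = {"I": "insert", "U": "update", "D": "delete"}


def parse_message(value):
    """Convert one Kafka message value (a JSON array of records) into change events."""
    changes = []
    for record in json.loads(value):
        # SCHEMA is present only for some sources, so skip empty name parts.
        name_parts = (record.get("DATABASE"), record.get("SCHEMA"), record.get("TABLE"))
        changes.append({
            "global_id": record.get("GLOBAL_ID"),
            # Assumed layout yyyyMMddHHmmss, inferred from the sample value.
            "time": datetime.strptime(record["TIME"], "%Y%m%d%H%M%S"),
            "table": ".".join(part for part in name_parts if part),
            "operation": CHANGE_TYPES[record["TYPE"]],
            # OLD_VALUES or NEW_VALUES may be absent depending on the change type.
            "before": record.get("OLD_VALUES", {}),
            "after": record.get("NEW_VALUES", {}),
        })
    return changes
```

For an update record on table tbl1 in database db1, the resulting event has `"table": "db1.tbl1"` and `"operation": "update"`, with the old and new column values under `"before"` and `"after"`.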