导入本地数据
所有文档

          数据仓库 Palo

          导入本地数据

          Stream Load 用于将本地文件导入到 Palo 中。

          不同于其他命令的提交方式,Stream Load 是通过 HTTP 协议与 Palo 进行连接交互的。

          该方式中涉及 HOST:PORT 应为 HTTP 协议端口。

          • 公有云用户必须使用 Compute Node(BE)的 HTTP 协议端口,默认为 8040。
          • 私有化部署用户可以使用 Leader Node(FE)的 HTTP 协议端口,默认为 8030。但须保证客户端所在机器网络能够联通 Compute Node 所在机器。

          本文文档我们以 curl 命令为例演示如何进行数据导入。

          文档最后,我们给出一个使用 Java 导入数据的代码示例。

          导入数据

          Stream Load 的请求体如下:

          PUT /api/{db}/{table}/_stream_load
          1. 创建一张表

            通过 CREATE TABLE 命令创建一张表用于存储待导入的数据。具体的导入方式请查阅 CREATE TABLE 命令手册。示例如下:

            CREATE TABLE IF NOT EXISTS load_test
            (
                id INT,
                name VARCHAR(128)
            )
            DISTRIBUTED BY HASH(id) BUCKETS 8;
          2. 导入数据

            执行以下 curl 命令导入本地文件:

             curl -u user:passwd -H "label:example_label_1" -T /path/to/local/your_file.txt http://host:port/api/example_db/load_test/_stream_load
            • user:passwd 为在 Palo 中创建的用户。初始用户为 admin,密码为创建 Palo 集群时设置的密码。
            • host:port 为 Compute Node 的 HTTP 协议端口,默认是 8040,可以在智能云 Palo 集群详情页面查看。
            • label: 可以在 Header 中指定 Label 唯一标识这个导入任务。

            关于 Stream Load 命令的更多高级操作,请参阅 Stream Load 命令文档。

          3. 等待导入结果

            Stream Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:

            {
                "TxnId": 1003,
                "Label": "example_label_1",
                "Status": "Success",
                "Message": "OK",
                "NumberTotalRows": 1000000,
                "NumberLoadedRows": 1000000,
                "NumberFilteredRows": 1,
                "NumberUnselectedRows": 0,
                "LoadBytes": 40888898,
                "LoadTimeMs": 2144,
                "BeginTxnTimeMs": 1,
                "StreamLoadPutTimeMs": 2,
                "ReadDataTimeMs": 325,
                "WriteDataTimeMs": 1933,
                "CommitAndPublishTimeMs": 106,
                "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
            }
            • Status 字段状态为 Success 即表示导入成功。
            • 其他字段的详细介绍,请参阅 Stream Load 命令文档。

          使用建议

          • Stream Load 只能导入本地文件。
          • 建议一个导入请求的数据量控制在 1 GB 以内。如果有大量本地文件,可以分批并发提交。

          Java 代码示例

          这里通过一个简单的 JAVA 示例来执行 Stream Load:

          package demo.palo;
          
          import com.google.gson.Gson;
          import com.google.gson.reflect.TypeToken;
          
          import java.io.BufferedInputStream;
          import java.io.BufferedOutputStream;
          import java.io.BufferedReader;
          import java.io.File;
          import java.io.FileInputStream;
          import java.io.InputStream;
          import java.io.InputStreamReader;
          import java.lang.reflect.Type;
          import java.net.HttpURLConnection;
          import java.net.URL;
          import java.nio.charset.StandardCharsets;
          import java.util.Base64;
          
          public class PaloStreamLoadDemo {
              private final static String HOST = "127.0.0.1"; // Compute Node host
              private final static int PORT = 8040;   // Compute Node HTTP port
          
              private static final String STREAM_LOAD_URL_PATTERN = "http://%s:%d/api/%s/%s/_stream_load";
          
              private final static String DB = "example_db";
              private final static String TABLE = "example_tbl";
              private final static String USER = "user";
              private final static String PASSWD = "passwd";
              // local file to be loaded
              private final static String LOAD_FILE = "./data.txt";
          
              public static void main(String[] args) throws Exception {
                  streamLoad();
              }
          
              private static void streamLoad() throws Exception {
                  String loadUrlStr = String.format(STREAM_LOAD_URL_PATTERN, HOST, PORT, DB, TABLE);
                  URL loadUrl = new URL(loadUrlStr);
                  HttpURLConnection conn = (HttpURLConnection) loadUrl.openConnection();
                  conn.setRequestMethod("PUT");
                  String auth = String.format("%s:%s", USER, PASSWD);
                  String authEncoding = Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
                  conn.setRequestProperty("Authorization", "Basic " + authEncoding);
                  conn.addRequestProperty("Expect", "100-continue");
                  conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
                  // set header.
                  // your add add any other headers here.
                  conn.addRequestProperty("column_separator", ",");
                  conn.addRequestProperty("label", "example_label");
                  conn.setDoOutput(true);
                  conn.setDoInput(true);
          
                  // read and send file content
                  File loadFile = new File(LOAD_FILE);
                  try (BufferedOutputStream bos = new BufferedOutputStream(conn.getOutputStream());
                       BufferedInputStream bis = new BufferedInputStream(new FileInputStream(loadFile));) {
                      int i;
                      while ((i = bis.read()) > 0) {
                          bos.write(i);
                      }
                  }
          
                  // get response
                  int status = conn.getResponseCode();
                  String respMsg = conn.getResponseMessage();
                  System.out.println("get status: " + status + ", response msg: " + respMsg);
          
                  // parse the response json
                  InputStream stream = (InputStream) conn.getContent();
                  BufferedReader br = new BufferedReader(new InputStreamReader(stream));
                  StringBuilder sb = new StringBuilder();
                  String line;
                  while ((line = br.readLine()) != null) {
                      sb.append(line);
                  }
                  Type type = new TypeToken<SubmitResult>() {
                  }.getType();
                  SubmitResult result = new Gson().fromJson(sb.toString(), type);
          
                  System.out.println("Get result status: " + result.Status);
              }
          
              // The response json class
              public static class SubmitResult {
                  public String TxnId;
                  public String Label;
                  public String Status;
                  public String ExistingJobStatus;
                  public String Message;
                  public String NumberTotalRows;
                  public String NumberLoadedRows;
                  public String NumberFilteredRows;
                  public String NumberUnselectedRows;
                  public String LoadBytes;
                  public String LoadTimeMs;
                  public String BeginTxnTimeMs;
                  public String StreamLoadPutTimeMs;
                  public String ReadDataTimeMs;
                  public String WriteDataTimeMs;
                  public String CommitAndPublishTimeMs;
                  public String ErrorURL;
              }
          }
          上一篇
          导入总览
          下一篇
          导入BOS中的数据