Step(作业)
所有文档

          百度MapReduce BMR

          Step(作业)

          概述

          作业是和集群相关联的资源,对作业的操作需要指定相关集群的ID。

          添加steps

          BMR支持多种类型的作业,不同类型的作业有不同的配置项。如下代码可向指定的hadoop类型的集群添加Custom Jar、Streaming、Hive、Pig作业。

          添加作业可以通过配置AddStepsRequest对象的clientToken属性来保证创建请求的幂等性。clientToken是一个长度不超过64位的ASCII字符串,配置AddStepsRequest对象的clientToken方法是:addStepsRequest.withClientToken(clientToken)

          请求返回的AddStepsResponse对象包含了新创建作业ID的数组List<String>,获取方法为response.getStepIds()

          public void addSteps(BmrClient bmrClient, String clusterId) {
              List<StepConfig> steps = new ArrayList<StepConfig>();
              // Custom Jar作业
              steps.add(
                      new JavaStepConfig()
                              .withName("java-step")
                              .withActionOnFailure("Continue")
                              .withJar("bos://benchmark/hadoop/hadoop-mapreduce-examples.jar")
                              .withMainClass("org.apache.hadoop.examples.WordCount")
                              .withArguments("bos://path/to/input bos://path/to/java_output")
              );
          
          	 // Streaming作业
              steps.add(
                      new StreamingStepConfig()
                              .withName("streaming-step")
                              .withActionOnFailure("Continue")
                              .withInput("bos://path/to/input_streaming")
                              .withMapper("cat")
                              .withOutput("bos://path/to/output_streaming")
              );
          
          	// 使用附加文件的Streaming作业
              steps.add(
                  new StreamingStepConfig()
                          .withName("streaming-step2")
                          .withActionOnFailure("Continue")
                          .withInput("bos://path/to/input_streaming2")
                          .withMapper("cat")
                          .withReducer("cat")
                          .withOutput("bos://path/to/output_streaming2")
                          .withAdditionalFile("bos://path/to/testA.jar", "testB.jar")
                          .withArguments("-libjars testB.jar")
              );
          
              // Hive作业
              steps.add(
                      new HiveStepConfig()
                              .withName("hive-step")
                              .withActionOnFailure("Continue")
                              .withScript("bos://path/to/hive/hql/hive_src.hql")
                              .withInput("bos://path/to/hive/data/hive_src.data")
                              .withOutput("bos://path/to/output_hive")
                              .withArguments("--hivevar LOCAT=bos://chy3/hive/tables/src")
              );
          
              // Pig作业
              steps.add(
                      new PigStepConfig()
                              .withName("pig-step")
                              .withActionOnFailure("Continue")
                              .withScript("bos://path/to/pig/script/pig_grep.pig")
                              .withInput("bos://path/to/pig/data/pig_grep.data")
                              .withOutput("bos://path/to/output_pig")
              );
              
              //Spark作业
              steps.add(
                      new SparkStepConfig()
                              .withName("spark-step")
                              .withActionOnFailure("Continue")
                              .withJar("bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar")
                              .withSubmitOptions("--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer")
                              .withArguments("bos://bmr-public-bj/data/log/accesslog-1k.log bos://tester01/sdk/output/out")
              );
              }
          
              try {
                  AddStepsResponse response = bmrClient.addSteps(
                          new AddStepsRequest().withClusterId(clusterId)
                                  .withSteps(steps)
                  );
                  // 输出各个添加的作业ID
                  for (String stepId : response.getStepIds()) {
                      System.out.println(stepId);
                  }
              } catch (BceServiceException e) {
                  System.out.println("Add steps failed: " + e.getErrorMessage());
              } catch (BceClientException e) {
          	    System.out.println(e.getMessage());
              }
          }

          列出全部steps

          如下代码可以罗列出指定集群上的全部作业,用户可以通过配置查询参数maxKeys来限制每次请求返回的作业数目,并且通过配置有效的查询参数marker来指定查询记录的起点。marker参数值是由BMR系统生成并返回的,因而初次查询请求中不需要配置该参数,它是在多次循环查询请求的后继请求中进行使用的。

          public void listSteps(BmrClient bmrClient, String clusterId) {
              int maxKeys = 10;
              try {
                  // 方法 1. 罗列指定集群ID相关的作业
                  ListStepsResponse response1 = bmrClient.listSteps(clusterId);
          
                  // 方法 2. 配置maxKeys的单次查询请求
                  ListStepsResponse response2 = bmrClient.listSteps(clusterId,maxKeys);
          
                  // 方法 3. 配置maxKeys和marker的循环查询请求
                  boolean isTruncated = true;
                  String marker = null;
                  int page = 0;
                  while (isTruncated) {
                      ListStepsResponse response3 = bmrClient.listSteps(clusterId, marker, maxKeys);
                      page++;
                      System.out.format("Page %d: Step count: %d\n", page, response3.getSteps().size());
                      isTruncated = response3.isTruncated();
                      marker = response3.getNextMarker();
                  }
          
                  // 方法 4. 自定义ListStepsRequest对象的查询请求
                  ListStepsResponse response4 = bmrClient.listSteps(
                          new ListStepsRequest().withClusterId(clusterId).withMaxKeys(maxKeys)
                  );
                  // 输出各个作业的状态
                  for (Step step : response4.getSteps()) {
                      System.out.println(step.getStatus().getState());
                  }
              } catch (BceServiceException e) {
                  System.out.println("List steps failed: " + e.getErrorMessage());
              }
          }

          请求返回的ListStepsResponse对象包含了相关的集群对象数组List<Step>, 获取集群对象数组的方法为response.getSteps()。作业对象Step的属性包括了作业相关的配置信息,每个属性均有对应的getter访问器方法。

          public class Step {
              private String id;                               // 作业ID
              private String actionOnFailure;                  // 作业失败策略
              private String type;                             // 作业类型
              private Map<String, String> properties;          // 作业描述
              private String name;                             // 作业名称
              private StepStatus status;                       // 作业状态
          	}
          
          public class StepStatus {
              private String createDateTime;                   // 作业提交时间
              private String endDateTime;                      // 作业结束时间
              private String startDateTime;                    // 作业开始执行的时间
              private String state;                            // 作业状态字段
          }

          查询指定的step

          如下代码可以查看指定作业的信息:

          请求返回的GetStepResponse对象包含了获取作业属性的getter访问器方法,可以直接调用response的访问器方法来获得目标作业的属性信息。

          public void getStep(BmrClient bmrClient, String clusterId, String stepId) {
              try {
                  // 方法 1. 查询指定集群ID、作业ID对应作业的信息
                  GetStepResponse response1 = bmrClient.getStep(clusterId, stepId);
          
                  // 方法 2. 自定义GetStepRequest对象的查询请求
                  GetStepResponse response2 = bmrClient.getStep(
                          new GetStepRequest().withClusterId(clusterId).withStepId(stepId)
                  );
                  // 输出作业的状态信息
                  System.out.println(response1.getStatus().getState());
              } catch (BceServiceException e) {
                  System.out.println("Describe steps failed: " + e.getErrorMessage());
              }
          }
          上一篇
          Cluster(集群)
          下一篇
          日志