Step(作业)

概述

作业是和集群相关联的资源,对作业的操作需要指定相关集群的ID。

添加steps

BMR支持多种类型的作业,不同类型的作业有不同的配置项。如下代码可向指定的hadoop类型的集群添加Custom Jar、Streaming、Hive、Pig作业。

添加作业可以通过配置AddStepsRequest对象的clientToken属性来保证创建请求的幂等性。clientToken是一个长度不超过64位的ASCII字符串,配置AddStepsRequest对象的clientToken方法是:addStepsRequest.withClientToken(clientToken)

请求返回的AddStepsResponse对象包含了新创建作业ID的数组List<String>,获取方法为response.getStepIds()

public void addSteps(BmrClient bmrClient, String clusterId) {
    List<StepConfig> steps = new ArrayList<StepConfig>();
    // Custom Jar作业
    steps.add(
            new JavaStepConfig()
                    .withName("java-step")
                    .withActionOnFailure("Continue")
                    .withJar("bos://benchmark/hadoop/hadoop-mapreduce-examples.jar")
                    .withMainClass("org.apache.hadoop.examples.WordCount")
                    .withArguments("bos://path/to/input bos://path/to/java_output")
    );

     // Streaming作业
    steps.add(
            new StreamingStepConfig()
                    .withName("streaming-step")
                    .withActionOnFailure("Continue")
                    .withInput("bos://path/to/input_streaming")
                    .withMapper("cat")
                    .withOutput("bos://path/to/output_streaming")
    );

    // 使用附加文件的Streaming作业
    steps.add(
        new StreamingStepConfig()
                .withName("streaming-step2")
                .withActionOnFailure("Continue")
                .withInput("bos://path/to/input_streaming2")
                .withMapper("cat")
                .withReducer("cat")
                .withOutput("bos://path/to/output_streaming2")
                .withAdditionalFile("bos://path/to/testA.jar", "testB.jar")
                .withArguments("-libjars testB.jar")
    );

    // Hive作业
    steps.add(
            new HiveStepConfig()
                    .withName("hive-step")
                    .withActionOnFailure("Continue")
                    .withScript("bos://path/to/hive/hql/hive_src.hql")
                    .withInput("bos://path/to/hive/data/hive_src.data")
                    .withOutput("bos://path/to/output_hive")
                    .withArguments("--hivevar LOCAT=bos://chy3/hive/tables/src")
    );

    // Pig作业
    steps.add(
            new PigStepConfig()
                    .withName("pig-step")
                    .withActionOnFailure("Continue")
                    .withScript("bos://path/to/pig/script/pig_grep.pig")
                    .withInput("bos://path/to/pig/data/pig_grep.data")
                    .withOutput("bos://path/to/output_pig")
    );

    //Spark作业
    steps.add(
            new SparkStepConfig()
                    .withName("spark-step")
                    .withActionOnFailure("Continue")
                    .withJar("bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar")
                    .withSubmitOptions("--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer")
                    .withArguments("bos://bmr-public-bj/data/log/accesslog-1k.log bos://tester01/sdk/output/out")
    );
    }

    try {
        AddStepsResponse response = bmrClient.addSteps(
                new AddStepsRequest().withClusterId(clusterId)
                        .withSteps(steps)
        );
        // 输出各个添加的作业ID
        for (String stepId : response.getStepIds()) {
            System.out.println(stepId);
        }
    } catch (BceServiceException e) {
        System.out.println("Add steps failed: " + e.getErrorMessage());
    } catch (BceClientException e) {
        System.out.println(e.getMessage());
    }
}

列出全部steps

如下代码可以罗列出指定集群上的全部作业,用户可以通过配置查询参数maxKeys来限制每次请求返回的作业数目,并且通过配置有效的查询参数marker来指定查询记录的起点。marker参数值是由BMR系统生成并返回的,因而初次查询请求中不需要配置该参数,它是在多次循环查询请求的后继请求中进行使用的。

public void listSteps(BmrClient bmrClient, String clusterId) {
    int maxKeys = 10;
    try {
        // 方法 1. 罗列指定集群ID相关的作业
        ListStepsResponse response1 = bmrClient.listSteps(clusterId);

        // 方法 2. 配置maxKeys的单次查询请求
        ListStepsResponse response2 = bmrClient.listSteps(clusterId,maxKeys);

        // 方法 3. 配置maxKeys和marker的循环查询请求
        boolean isTruncated = true;
        String marker = null;
        int page = 0;
        while (isTruncated) {
            ListStepsResponse response3 = bmrClient.listSteps(clusterId, marker, maxKeys);
            page++;
            System.out.format("Page %d: Step count: %d\n", page, response3.getSteps().size());
            isTruncated = response3.isTruncated();
            marker = response3.getNextMarker();
        }

        // 方法 4. 自定义ListStepsRequest对象的查询请求
        ListStepsResponse response4 = bmrClient.listSteps(
                new ListStepsRequest().withClusterId(clusterId).withMaxKeys(maxKeys)
        );
        // 输出各个作业的状态
        for (Step step : response4.getSteps()) {
            System.out.println(step.getStatus().getState());
        }
    } catch (BceServiceException e) {
        System.out.println("List steps failed: " + e.getErrorMessage());
    }
}

请求返回的ListStepsResponse对象包含了相关的集群对象数组List<Step>, 获取集群对象数组的方法为response.getSteps()。作业对象Step的属性包括了作业相关的配置信息,每个属性均有对应的getter访问器方法。

public class Step {
    private String id;                               // 作业ID
    private String actionOnFailure;                  // 作业失败策略
    private String type;                             // 作业类型
    private Map<String, String> properties;          // 作业描述
    private String name;                             // 作业名称
    private StepStatus status;                       // 作业状态
    }

public class StepStatus {
    private String createDateTime;                   // 作业提交时间
    private String endDateTime;                      // 作业结束时间
    private String startDateTime;                    // 作业开始执行的时间
    private String state;                            // 作业状态字段
}

查询指定的step

如下代码可以查看指定作业的信息:

请求返回的GetStepResponse对象包含了获取作业属性的getter访问器方法,可以直接调用response的访问器方法来获得目标作业的属性信息。

public void getStep(BmrClient bmrClient, String clusterId, String stepId) {
    try {
        // 方法 1. 查询指定集群ID、作业ID对应作业的信息
        GetStepResponse response1 = bmrClient.getStep(clusterId, stepId);

        // 方法 2. 自定义GetStepRequest对象的查询请求
        GetStepResponse response2 = bmrClient.getStep(
                new GetStepRequest().withClusterId(clusterId).withStepId(stepId)
        );
        // 输出作业的状态信息
        System.out.println(response1.getStatus().getState());
    } catch (BceServiceException e) {
        System.out.println("Describe steps failed: " + e.getErrorMessage());
    }
}