百度MapReduce BMR

    Step(作业)

    概述

    作业是和集群相关联的资源,对作业的操作需要指定相关集群的ID。

    添加steps

    BMR支持多种类型的作业,不同类型的作业有不同的配置项。如下代码可向指定的hadoop类型的集群添加Custom Jar、Streaming、Hive、Pig作业。

    添加作业可以通过配置AddStepsRequest对象的clientToken属性来保证创建请求的幂等性。clientToken是一个长度不超过64位的ASCII字符串,配置AddStepsRequest对象的clientToken方法是:addStepsRequest.withClientToken(clientToken)

    请求返回的AddStepsResponse对象包含了新创建作业ID的数组List<String>,获取方法为response.getStepIds()

    public void addSteps(BmrClient bmrClient, String clusterId) {
        List<StepConfig> steps = new ArrayList<StepConfig>();
        // Custom Jar作业
        steps.add(
                new JavaStepConfig()
                        .withName("java-step")
                        .withActionOnFailure("Continue")
                        .withJar("bos://benchmark/hadoop/hadoop-mapreduce-examples.jar")
                        .withMainClass("org.apache.hadoop.examples.WordCount")
                        .withArguments("bos://path/to/input bos://path/to/java_output")
        );
    
    	 // Streaming作业
        steps.add(
                new StreamingStepConfig()
                        .withName("streaming-step")
                        .withActionOnFailure("Continue")
                        .withInput("bos://path/to/input_streaming")
                        .withMapper("cat")
                        .withOutput("bos://path/to/output_streaming")
        );
    
    	// 使用附加文件的Streaming作业
        steps.add(
            new StreamingStepConfig()
                    .withName("streaming-step2")
                    .withActionOnFailure("Continue")
                    .withInput("bos://path/to/input_streaming2")
                    .withMapper("cat")
                    .withReducer("cat")
                    .withOutput("bos://path/to/output_streaming2")
                    .withAdditionalFile("bos://path/to/testA.jar", "testB.jar")
                    .withArguments("-libjars testB.jar")
        );
    
        // Hive作业
        steps.add(
                new HiveStepConfig()
                        .withName("hive-step")
                        .withActionOnFailure("Continue")
                        .withScript("bos://path/to/hive/hql/hive_src.hql")
                        .withInput("bos://path/to/hive/data/hive_src.data")
                        .withOutput("bos://path/to/output_hive")
                        .withArguments("--hivevar LOCAT=bos://chy3/hive/tables/src")
        );
    
        // Pig作业
        steps.add(
                new PigStepConfig()
                        .withName("pig-step")
                        .withActionOnFailure("Continue")
                        .withScript("bos://path/to/pig/script/pig_grep.pig")
                        .withInput("bos://path/to/pig/data/pig_grep.data")
                        .withOutput("bos://path/to/output_pig")
        );
        
        //Spark作业
        steps.add(
                new SparkStepConfig()
                        .withName("spark-step")
                        .withActionOnFailure("Continue")
                        .withJar("bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar")
                        .withSubmitOptions("--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer")
                        .withArguments("bos://bmr-public-bj/data/log/accesslog-1k.log bos://tester01/sdk/output/out")
        );
        }
    
        try {
            AddStepsResponse response = bmrClient.addSteps(
                    new AddStepsRequest().withClusterId(clusterId)
                            .withSteps(steps)
            );
            // 输出各个添加的作业ID
            for (String stepId : response.getStepIds()) {
                System.out.println(stepId);
            }
        } catch (BceServiceException e) {
            System.out.println("Add steps failed: " + e.getErrorMessage());
        } catch (BceClientException e) {
    	    System.out.println(e.getMessage());
        }
    }

    列出全部steps

    如下代码可以罗列出指定集群上的全部作业,用户可以通过配置查询参数maxKeys来限制每次请求返回的作业数目,并且通过配置有效的查询参数marker来指定查询记录的起点。marker参数值是由BMR系统生成并返回的,因而初次查询请求中不需要配置该参数,它是在多次循环查询请求的后继请求中进行使用的。

    public void listSteps(BmrClient bmrClient, String clusterId) {
        int maxKeys = 10;
        try {
            // 方法 1. 罗列指定集群ID相关的作业
            ListStepsResponse response1 = bmrClient.listSteps(clusterId);
    
            // 方法 2. 配置maxKeys的单次查询请求
            ListStepsResponse response2 = bmrClient.listSteps(clusterId,maxKeys);
    
            // 方法 3. 配置maxKeys和marker的循环查询请求
            boolean isTruncated = true;
            String marker = null;
            int page = 0;
            while (isTruncated) {
                ListStepsResponse response3 = bmrClient.listSteps(clusterId, marker, maxKeys);
                page++;
                System.out.format("Page %d: Step count: %d\n", page, response3.getSteps().size());
                isTruncated = response3.isTruncated();
                marker = response3.getNextMarker();
            }
    
            // 方法 4. 自定义ListStepsRequest对象的查询请求
            ListStepsResponse response4 = bmrClient.listSteps(
                    new ListStepsRequest().withClusterId(clusterId).withMaxKeys(maxKeys)
            );
            // 输出各个作业的状态
            for (Step step : response4.getSteps()) {
                System.out.println(step.getStatus().getState());
            }
        } catch (BceServiceException e) {
            System.out.println("List steps failed: " + e.getErrorMessage());
        }
    }

    请求返回的ListStepsResponse对象包含了相关的集群对象数组List<Step>, 获取集群对象数组的方法为response.getSteps()。作业对象Step的属性包括了作业相关的配置信息,每个属性均有对应的getter访问器方法。

    public class Step {
        private String id;                               // 作业ID
        private String actionOnFailure;                  // 作业失败策略
        private String type;                             // 作业类型
        private Map<String, String> properties;          // 作业描述
        private String name;                             // 作业名称
        private StepStatus status;                       // 作业状态
    	}
    
    public class StepStatus {
        private String createDateTime;                   // 作业提交时间
        private String endDateTime;                      // 作业结束时间
        private String startDateTime;                    // 作业开始执行的时间
        private String state;                            // 作业状态字段
    }

    查询指定的step

    如下代码可以查看指定作业的信息:

    请求返回的GetStepResponse对象包含了获取作业属性的getter访问器方法,可以直接调用response的访问器方法来获得目标作业的属性信息。

    public void getStep(BmrClient bmrClient, String clusterId, String stepId) {
        try {
            // 方法 1. 查询指定集群ID、作业ID对应作业的信息
            GetStepResponse response1 = bmrClient.getStep(clusterId, stepId);
    
            // 方法 2. 自定义GetStepRequest对象的查询请求
            GetStepResponse response2 = bmrClient.getStep(
                    new GetStepRequest().withClusterId(clusterId).withStepId(stepId)
            );
            // 输出作业的状态信息
            System.out.println(response1.getStatus().getState());
        } catch (BceServiceException e) {
            System.out.println("Describe steps failed: " + e.getErrorMessage());
        }
    }
    一篇
    Cluster(集群)
    一篇
    日志