Step(作业)
更新时间:2022-04-15
概述
作业是和集群相关联的资源,对作业的操作需要指定相关集群的ID。
添加steps
BMR支持多种类型的作业,不同类型的作业有不同的配置项。如下代码可向指定的hadoop类型的集群添加Custom Jar、Streaming、Hive、Pig作业。
添加作业可以通过配置AddStepsRequest对象的clientToken属性来保证创建请求的幂等性。clientToken是一个长度不超过64位的ASCII字符串,配置AddStepsRequest对象的clientToken方法是:addStepsRequest.withClientToken(clientToken)
。
请求返回的AddStepsResponse对象包含了新创建作业ID的数组List<String>
,获取方法为response.getStepIds()
。
public void addSteps(BmrClient bmrClient, String clusterId) {
List<StepConfig> steps = new ArrayList<StepConfig>();
// Custom Jar作业
steps.add(
new JavaStepConfig()
.withName("java-step")
.withActionOnFailure("Continue")
.withJar("bos://benchmark/hadoop/hadoop-mapreduce-examples.jar")
.withMainClass("org.apache.hadoop.examples.WordCount")
.withArguments("bos://path/to/input bos://path/to/java_output")
);
// Streaming作业
steps.add(
new StreamingStepConfig()
.withName("streaming-step")
.withActionOnFailure("Continue")
.withInput("bos://path/to/input_streaming")
.withMapper("cat")
.withOutput("bos://path/to/output_streaming")
);
List<AdditionalFile> additionalFiles = new ArrayList<AdditionalFile>();
additionalFiles.add(new AdditionalFile().withRemote("bos://path/to/testA.jar").withLocal("testB.jar"));
// 使用附加文件的Streaming作业
steps.add(
new StreamingStepConfig()
.withName("streaming-step2")
.withActionOnFailure("Continue")
.withInput("bos://path/to/input_streaming2")
.withMapper("cat")
.withReducer("cat")
.withOutput("bos://path/to/output_streaming2")
.withAdditionalFiles(additionalFiles)
);
// Hive作业
steps.add(
new HiveStepConfig()
.withName("hive-step")
.withActionOnFailure("Continue")
.withScript("bos://path/to/hive/hql/hive_src.hql")
.withInput("bos://path/to/hive/data/hive_src.data")
.withOutput("bos://path/to/output_hive")
.withArguments("--hivevar LOCAT=bos://chy3/hive/tables/src")
);
// Pig作业
steps.add(
new PigStepConfig()
.withName("pig-step")
.withActionOnFailure("Continue")
.withScript("bos://path/to/pig/script/pig_grep.pig")
.withInput("bos://path/to/pig/data/pig_grep.data")
.withOutput("bos://path/to/output_pig")
);
//Spark作业
steps.add(
new SparkStepConfig()
.withName("spark-step")
.withActionOnFailure("Continue")
.withJar("bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar")
.withSubmitOptions("--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer")
.withArguments("bos://bmr-public-bj/data/log/accesslog-1k.log bos://tester01/sdk/output/out")
);
try {
AddStepsResponse response = bmrClient.addSteps(
new AddStepsRequest().withClusterId(clusterId)
.withSteps(steps)
);
// 输出各个添加的作业ID
for (String stepId : response.getStepIds()) {
System.out.println(stepId);
}
} catch (BceServiceException e) {
System.out.println("Add steps failed: " + e.getErrorMessage());
} catch (BceClientException e) {
System.out.println(e.getMessage());
}
}
列出全部steps
如下代码可以罗列出指定集群上的全部作业,用户可以通过配置查询参数pageNo和pageSize来限制每次请求返回的作业数目和查询记录的起点。
public void listSteps(BmrClient bmrClient, String clusterId) {
int maxKeys = 10;
try {
// 罗列指定集群ID相关的作业
ListStepsRequest request = new ListStepsRequest().withClusterId(clusterId);
ListStepsResponse response = bmrClient.listSteps(request);
// 输出各个作业的状态
for (Step step : response.getSteps()) {
System.out.println(step.getStatus().getState());
}
} catch (BceServiceException e) {
System.out.println("List steps failed: " + e.getErrorMessage());
}
}
请求返回的ListStepsResponse对象包含了相关的集群对象数组List<Step>
, 获取集群对象数组的方法为response.getSteps()
。作业对象Step的属性包括了作业相关的配置信息,每个属性均有对应的getter访问器方法。
public class Step {
private String id; // 作业ID
private String actionOnFailure; // 作业失败策略
private String type; // 作业类型
private Map<String, String> properties; // 作业描述
private String name; // 作业名称
private StepStatus status; // 作业状态
}
public class StepStatus {
private String createDateTime; // 作业提交时间
private String endDateTime; // 作业结束时间
private String startDateTime; // 作业开始执行的时间
private String state; // 作业状态字段
}
查询指定的step
如下代码可以查看指定作业的信息:
请求返回的GetStepResponse对象包含了获取作业属性的getter访问器方法,可以直接调用response的访问器方法来获得目标作业的属性信息。
public void getStep(BmrClient bmrClient, String clusterId, String stepId) {
try {
// 方法 1. 查询指定集群ID、作业ID对应作业的信息
GetStepResponse response1 = bmrClient.getStep(clusterId, stepId);
// 方法 2. 自定义GetStepRequest对象的查询请求
GetStepResponse response2 = bmrClient.getStep(
new GetStepRequest().withClusterId(clusterId).withStepId(stepId)
);
// 输出作业的状态信息
System.out.println(response1.getStatus().getState());
} catch (BceServiceException e) {
System.out.println("Describe steps failed: " + e.getErrorMessage());
}
}