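Step (Job)
Last updated: 2022-05-18
Adding steps
A step (job) is a resource tied to a cluster, so every step operation requires a cluster ID.
BMR supports several step types, and each type has its own configuration properties. The code below adds Custom Jar, Streaming, Hive, and Pig steps to a given hadoop-type cluster; the sample list also contains a Spark step and a Streaming step that ships an additional file. Note: before running the sample, change the BOS paths in the step arguments to BOS paths that your account can access.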
# bmr_client, cluster_id and LOG are assumed to be set up beforehand.
steps = [
    # Custom Jar step: jar, main class, arguments
    BmrClient.step(
        'Java',
        'Continue',  # action on failure
        BmrClient.java_step_properties(
            'bos://benchmark/hadoop/hadoop-mapreduce-examples.jar',
            'org.apache.hadoop.examples.WordCount',
            'bos://helloworld/input/install.log bos://tester01/sdk/output_java/out1'
        ),
        'sdk-job-01'
    ),
    # Streaming step: input, output, mapper
    BmrClient.step(
        'Streaming',
        'Continue',
        BmrClient.streaming_step_properties(
            'bos://helloworld/input/install.log',
            'bos://tester01/sdk/output_streaming/out1',
            'cat'
        ),
        'sdk-job-02'
    ),
    # Hive step: script, arguments, input, output
    BmrClient.step(
        'Hive',
        'Continue',
        BmrClient.hive_step_properties(
            'bos://chy3/hive/hql/hive_src.hql',
            '--hivevar LOCAT=bos://chy3/hive/tables/src',
            'bos://chy3/hive/data/hive_src.data',
            'bos://tester01/sdk/output_hive/out1'
        ),
        'sdk-job-03'
    ),
    # Pig step: script, input, output
    BmrClient.step(
        'Pig',
        'Continue',
        BmrClient.pig_step_properties(
            'bos://chy3/pig/script/pig_grep.pig',
            input='bos://chy3/pig/data/pig_grep.data',
            output='bos://tester01/sdk/output_pig/out1'
        ),
        'sdk-job-04'
    ),
    # Spark step: jar, submit options, arguments
    BmrClient.step(
        'Spark',
        'Continue',
        BmrClient.spark_step_properties(
            'bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar',
            '--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer',
            'bos://bmr-public-bj/data/log/accesslog-1k.log bos://tester01/sdk/output/out'
        ),
        'sdk-job-05'
    ),
    # Streaming step with mapper, reducer, extra arguments and an additional file.
    # Note: this reuses the output path of sdk-job-02; change it if both steps run,
    # since Hadoop normally rejects an existing output directory.
    BmrClient.step(
        'Streaming',
        'Continue',
        BmrClient.streaming_step_properties(
            'bos://helloworld/input/install.log',
            'bos://tester01/sdk/output_streaming/out1',
            'cat',
            'cat',
            '-libjars testB.jar'
        ),
        'sdk-job-06',
        # The file at the BOS path is referenced by the step as testB.jar.
        [BmrClient.additional_file("bos://path/to/testA.jar", "testB.jar")]
    )
]
try:
    response = bmr_client.add_steps(cluster_id, steps)
    LOG.debug('add steps response: %s' % response)
except BceHttpClientError as e:
    if isinstance(e.last_error, BceServerError):
        LOG.error('add_steps failed. Response %s, code: %s, msg: %s'
                  % (e.last_error.status_code, e.last_error.code, e.last_error.args))
    else:
        LOG.error('add_steps failed. Unknown exception: %s' % e)
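The note about BOS paths applies to every argument string in the sample. As a minimal sketch (the helper `use_bucket` and the bucket name `my-bucket` are hypothetical and not part of the SDK), the bucket of each `bos://` path could be swapped with a regular expression before the steps are built:

```python
import re

# Matches the bucket part of a bos://bucket/key path.
_BOS_BUCKET = re.compile(r'bos://([^/\s]+)')


def use_bucket(text, bucket, keep=()):
    """Replace the bucket of every bos:// path in text.

    Buckets named in keep are left unchanged. This helper is an
    illustration only; it is not provided by the BMR SDK.
    """
    def swap(match):
        if match.group(1) in keep:
            return match.group(0)
        return 'bos://' + bucket
    return _BOS_BUCKET.sub(swap, text)


# 'my-bucket' is a placeholder for a bucket your account can access.
args = use_bucket(
    'bos://helloworld/input/install.log bos://tester01/sdk/output_java/out1',
    'my-bucket')
print(args)
```

The optional `keep` argument leaves the listed buckets untouched, which may help when some sample inputs should still be read from their original location.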
Listing all steps
The following code lists all steps on a given cluster. Use pageNo and pageSize to cap the number of steps returned by a single request:
try:
    response = bmr_client.list_steps(cluster_id, pageNo=1, pageSize=50)
    for step in response.steps:
        LOG.debug('list step %s: %s' % (step.id, step))
except BceHttpClientError as e:
    if isinstance(e.last_error, BceServerError):
        LOG.error('list_steps failed. Response %s, code: %s, msg: %s'
                  % (e.last_error.status_code, e.last_error.code, e.last_error.args))
    else:
        LOG.error('list_steps failed. Unknown exception: %s' % e)
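When a cluster holds more steps than fit in one page, the request can be repeated with an increasing pageNo. The generator below is a sketch under two assumptions: `fetch_page` is a hypothetical callable that wraps `list_steps` and returns the steps of one page, and a page shorter than pageSize marks the end of the list (this section does not state how the last page is signalled).

```python
def iter_all_steps(fetch_page, page_size=50):
    """Yield steps page by page until a short or empty page is returned.

    fetch_page(page_no, page_size) is a caller-supplied function; the
    stop condition is an assumption, not documented SDK behaviour.
    """
    page_no = 1
    while True:
        steps = list(fetch_page(page_no, page_size))
        for step in steps:
            yield step
        if len(steps) < page_size:
            return
        page_no += 1


# Stand-in for the service: 120 fake step ids served page by page.
FAKE_STEPS = ['step-%03d' % i for i in range(120)]


def fake_fetch(page_no, page_size):
    start = (page_no - 1) * page_size
    return FAKE_STEPS[start:start + page_size]


all_steps = list(iter_all_steps(fake_fetch, page_size=50))
print(len(all_steps))
```

With the client used in this section, `fetch_page` could be written as `lambda no, size: bmr_client.list_steps(cluster_id, pageNo=no, pageSize=size).steps`.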
Querying a specific step
The following code retrieves the details of a specific step:
try:
    response = bmr_client.get_step(cluster_id, step_id)
    LOG.debug('describe step response: %s' % response)
except BceHttpClientError as e:
    if isinstance(e.last_error, BceServerError):
        LOG.error('get_step failed. Response %s, code: %s, msg: %s'
                  % (e.last_error.status_code, e.last_error.code, e.last_error.args))
    else:
        LOG.error('get_step failed. Unknown exception: %s' % e)
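A submitted step may take a while to finish, so get_step is often called repeatedly. The sketch below polls until a terminal state is reached. `wait_for_state`, `fetch_state`, and the state names in the demo are illustrative assumptions: this section does not show the response layout or list the step states, so reading the state out of the get_step response is left to the caller.

```python
import time


def wait_for_state(fetch_state, terminal_states, interval=10.0, timeout=600.0,
                   sleep=time.sleep):
    """Call fetch_state() until it returns one of terminal_states.

    fetch_state is a caller-supplied function (hypothetical here) that
    returns the current state of a step. Raises TimeoutError if no
    terminal state is seen within timeout seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        state = fetch_state()
        if state in terminal_states:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                'step still in state %r after %s seconds' % (state, timeout))
        sleep(interval)


# Demo with canned states instead of a live cluster (state names are made up).
_states = iter(['Running', 'Running', 'Completed'])
final = wait_for_state(lambda: next(_states), {'Completed', 'Failed'},
                       interval=0, sleep=lambda seconds: None)
print(final)
```

Passing `sleep` as a parameter keeps the loop testable without real waiting; in normal use the default `time.sleep` applies.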