Step(作业)

添加steps

作业是和集群相关联的资源,对作业的操作需要指定集群ID。

BMR支持多种类型的作业,不同类型的作业有不同的配置项。如下代码可向指定的hadoop类型的集群添加Custom Jar、Streaming、Hive、Pig作业。请注意:参考下面样例代码时,需要修改作业参数指定的BOS路径为您的账户可用的BOS路径。

steps =  [
    BmrClient.step(
            'Java',
            'Continue',
            BmrClient.java_step_properties(
                'bos://benchmark/hadoop/hadoop-mapreduce-examples.jar',
                'org.apache.hadoop.examples.WordCount',
                'bos://helloworld/input/install.log bos://tester01/sdk/output_java/out1'
                ),
            'sdk-job-01'
            ),
    BmrClient.step(
            'Streaming',
            'Continue',
            BmrClient.streaming_step_properties(
                'bos://helloworld/input/install.log',
                'bos://tester01/sdk/output_streaming/out1',
                'cat'),
            'sdk-job-02'
            ),
    BmrClient.step(
            'Hive',
            'Continue',
            BmrClient.hive_step_properties(
                'bos://chy3/hive/hql/hive_src.hql',
                '--hivevar LOCAT=bos://chy3/hive/tables/src',
                'bos://chy3/hive/data/hive_src.data',
                'bos://tester01/sdk/output_hive/out1'
                ),
            'sdk-job-03'
            ),
    BmrClient.step(
            'Pig',
            'Continue',
            BmrClient.pig_step_properties(
                'bos://chy3/pig/script/pig_grep.pig',
                input='bos://chy3/pig/data/pig_grep.data',
                output='bos://tester01/sdk/output_pig/out1'
                ),
            'sdk-job-04'
            ),
    BmrClient.step(
            'Spark',
            'Continue',
            BmrClient.spark_step_properties(
                'bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar',
                '--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer',
                'bos://bmr-public-bj/data/log/accesslog-1k.log bos://tester01/sdk/output/out'
                ),
            'sdk-job-05'
            ),
    BmrClient.step(
            'Streaming',
            'Continue',
            BmrClient.streaming_step_properties(
                'bos://helloworld/input/install.log',
                'bos://tester01/sdk/output_streaming/out1',
                'cat',
                'cat',
                '-libjars testB.jar'
                ),
            'sdk-job-06',
            [BmrClient.additional_file("bos://path/to/testA.jar", "testB.jar")]
            )                   
]

try:
    response = bmr_client.add_steps(cluster_id, steps)
    LOG.debug('add steps response: %s' % response)
except BceHttpClientError as e:
    if isinstance(e.last_error, BceServerError):
        LOG.error('add_steps failed. Response %s, code: %s, msg: %s'
                  % (e.last_error.status_code, e.last_error.code, e.last_error.message))
    else:
        LOG.error('add_steps failed. Unknown exception: %s' % e)

列出全部steps

如下代码可以罗列出指定集群上的全部作业,用户可以通过指定max_keys来限制一次请求返回的最大作业数目:

try:
    response = bmr_client.list_steps(cluster_id, max_keys=50)
    for step in response.steps:
        LOG.debug('list step %s: %s' % (step.id, step))
except BceHttpClientError as e:
    if isinstance(e.last_error, BceServerError):
        LOG.error('list_steps failed. Response %s, code: %s, msg: %s'
                  % (e.last_error.status_code, e.last_error.code, e.last_error.message))
    else:
        LOG.error('list_steps failed. Unknown exception: %s' % e)

同样,SDK也提供了分页查询的调用方式:

try:
    page = 1
    next_marker = None
    max_keys = 5
    is_truncated = True
    while is_truncated:
        response = bmr_client.list_steps(cluster_id, marker=next_marker, max_keys=max_keys)
        LOG.debug('list %s steps on Page %s' % (len(response.steps), page))
        page += 1
        is_truncated = response.is_truncated
        next_marker = response.next_marker
except BceHttpClientError as e:
    if isinstance(e.last_error, BceServerError):
        LOG.error('list_steps failed. Response %s, code: %s, msg: %s'
                  % (e.last_error.status_code, e.last_error.code, e.last_error.message))
    else:
        LOG.error('list_steps failed. Unknown exception: %s' % e)

查询指定的step

如下代码可以查看指定作业的信息:

try:
    response = bmr_client.get_step(cluster_id, step_id)
    LOG.debug('describe step response: %s' % response)
except BceHttpClientError as e:
    if isinstance(e.last_error, BceServerError):
        LOG.error('get_step failed. Response %s, code: %s, msg: %s'
                  % (e.last_error.status_code, e.last_error.code, e.last_error.message))
    else:
        LOG.error('get_step failed. Unknown exception: %s' % e)