Baidu MapReduce (BMR)

    Step (job)

    Add steps

    A step (job) is a resource that belongs to a cluster, so every step operation requires the cluster ID.

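    BMR supports several step types, and each type has its own configuration options. The code below adds Custom JAR (Java), Streaming, Hive, Pig, and Spark steps to the specified Hadoop-type cluster; the last Streaming step also shows how to attach an additional file to a step. Note: when adapting this sample, replace the BOS paths in the step parameters with BOS paths that your account can access.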

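    # Assumes bmr_client (an initialized client), cluster_id, LOG and BmrClient
    # are already set up; the exception classes come from baidubce.exception.
    from baidubce.exception import BceHttpClientError, BceServerError

    # BmrClient.step(...) arguments: step type, action on failure,
    # type-specific properties, step name (and optionally additional files).
    steps = [
        # Custom JAR step: JAR path, main class, program arguments
        BmrClient.step(
                'Java',
                'Continue',
                BmrClient.java_step_properties(
                    'bos://benchmark/hadoop/hadoop-mapreduce-examples.jar',
                    'org.apache.hadoop.examples.WordCount',
                    'bos://helloworld/input/install.log bos://tester01/sdk/output_java/out1'
                    ),
                'sdk-job-01'
                ),
        # Streaming step: input path, output path, mapper command
        BmrClient.step(
                'Streaming',
                'Continue',
                BmrClient.streaming_step_properties(
                    'bos://helloworld/input/install.log',
                    'bos://tester01/sdk/output_streaming/out1',
                    'cat'),
                'sdk-job-02'
                ),
        # Hive step: HQL script, script arguments, input path, output path
        BmrClient.step(
                'Hive',
                'Continue',
                BmrClient.hive_step_properties(
                    'bos://chy3/hive/hql/hive_src.hql',
                    '--hivevar LOCAT=bos://chy3/hive/tables/src',
                    'bos://chy3/hive/data/hive_src.data',
                    'bos://tester01/sdk/output_hive/out1'
                    ),
                'sdk-job-03'
                ),
        # Pig step: Pig script, with input and output passed as keyword arguments
        BmrClient.step(
                'Pig',
                'Continue',
                BmrClient.pig_step_properties(
                    'bos://chy3/pig/script/pig_grep.pig',
                    input='bos://chy3/pig/data/pig_grep.data',
                    output='bos://tester01/sdk/output_pig/out1'
                    ),
                'sdk-job-04'
                ),
        # Spark step: application JAR, spark-submit options, application arguments
        BmrClient.step(
                'Spark',
                'Continue',
                BmrClient.spark_step_properties(
                    'bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar',
                    '--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer',
                    'bos://bmr-public-bj/data/log/accesslog-1k.log bos://tester01/sdk/output/out'
                    ),
                'sdk-job-05'
                ),
        # Streaming step with mapper, reducer and extra arguments. The additional
        # file maps bos://path/to/testA.jar to the local name testB.jar, which is
        # the name -libjars refers to. It writes to its own output directory (out2)
        # because Hadoop jobs fail if the output directory already exists.
        BmrClient.step(
                'Streaming',
                'Continue',
                BmrClient.streaming_step_properties(
                    'bos://helloworld/input/install.log',
                    'bos://tester01/sdk/output_streaming/out2',
                    'cat',
                    'cat',
                    '-libjars testB.jar'
                    ),
                'sdk-job-06',
                [BmrClient.additional_file("bos://path/to/testA.jar", "testB.jar")]
                )
    ]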
    
    try:
        response = bmr_client.add_steps(cluster_id, steps)
        LOG.debug('add steps response: %s' % response)
    except BceHttpClientError as e:
        if isinstance(e.last_error, BceServerError):
            LOG.error('add_steps failed. Response %s, code: %s, msg: %s'
                      % (e.last_error.status_code, e.last_error.code, e.last_error.message))
        else:
            LOG.error('add_steps failed. Unknown exception: %s' % e)
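
    Because a step list like the one above is assembled by hand, a quick local check before calling add_steps can catch path mistakes early. The sketch below is a hypothetical helper, not part of the BMR SDK: it assumes you describe each planned step as a (step name, output path) pair, flags output paths that do not use the bos:// scheme, and reports any output directory shared by several steps, since Hadoop MapReduce jobs normally fail when their output directory already exists.

```python
from collections import defaultdict


def check_output_paths(planned):
    """Check (step_name, output_path) pairs before submitting steps.

    Hypothetical helper, not part of the BMR SDK. Returns a list of
    human-readable problems; an empty list means nothing was found.
    """
    problems = []
    users = defaultdict(list)  # normalized output path -> step names writing to it
    for name, path in planned:
        if not path.startswith('bos://'):
            problems.append('%s: output %r is not a bos:// path' % (name, path))
        # Drop a trailing slash so 'out1' and 'out1/' count as the same directory.
        users[path.rstrip('/')].append(name)
    for path, names in sorted(users.items()):
        if len(names) > 1:
            problems.append('%s: shared by %s' % (path, ', '.join(names)))
    return problems


# Illustrative input: two hypothetical Streaming steps writing to the same directory.
planned = [
    ('streaming-a', 'bos://tester01/sdk/output_streaming/out1'),
    ('streaming-b', 'bos://tester01/sdk/output_streaming/out1'),
]
for problem in check_output_paths(planned):
    print(problem)
# prints: bos://tester01/sdk/output_streaming/out1: shared by streaming-a, streaming-b
```

    Only output paths are checked here because input paths may legitimately be shared; if the helper returns no problems, the list can be passed on to add_steps as usual.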

    List all steps

    The following code lists all steps on the specified cluster. Use max_keys to limit the maximum number of steps returned by a single request:

    try:
        response = bmr_client.list_steps(cluster_id, max_keys=50)
        for step in response.steps:
            LOG.debug('list step %s: %s' % (step.id, step))
    except BceHttpClientError as e:
        if isinstance(e.last_error, BceServerError):
            LOG.error('list_steps failed. Response %s, code: %s, msg: %s'
                      % (e.last_error.status_code, e.last_error.code, e.last_error.message))
        else:
            LOG.error('list_steps failed. Unknown exception: %s' % e)

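    The SDK also supports paginated listing: pass the next_marker from each response as the marker of the next request, and stop once is_truncated is False: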

    try:
        page = 1
        next_marker = None
        max_keys = 5
        is_truncated = True
        while is_truncated:
            response = bmr_client.list_steps(cluster_id, marker=next_marker, max_keys=max_keys)
            LOG.debug('list %s steps on Page %s' % (len(response.steps), page))
            page += 1
            is_truncated = response.is_truncated
            next_marker = response.next_marker
    except BceHttpClientError as e:
        if isinstance(e.last_error, BceServerError):
            LOG.error('list_steps failed. Response %s, code: %s, msg: %s'
                      % (e.last_error.status_code, e.last_error.code, e.last_error.message))
        else:
            LOG.error('list_steps failed. Unknown exception: %s' % e)
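
    The marker loop above can also be wrapped in a generator, so that callers iterate over steps without tracking pages themselves. This is a sketch rather than an SDK feature: iter_steps and fake_list_steps are hypothetical names, and the generator relies only on the list_steps keywords (marker, max_keys) and response attributes (steps, is_truncated, next_marker) used in the loop above. In real code you would pass bmr_client.list_steps; the stand-in here serves fake step IDs so the example runs on its own, and its marker semantics are invented for the demo.

```python
from types import SimpleNamespace


def iter_steps(list_steps, cluster_id, max_keys=5):
    """Yield every step on a cluster, following next_marker page by page.

    list_steps is any callable accepting the same arguments as
    bmr_client.list_steps (cluster_id, marker=..., max_keys=...).
    """
    marker = None
    while True:
        response = list_steps(cluster_id, marker=marker, max_keys=max_keys)
        for step in response.steps:
            yield step
        if not response.is_truncated:
            break
        marker = response.next_marker


# Hypothetical stand-in for bmr_client.list_steps that pages over 7 fake step IDs.
def fake_list_steps(cluster_id, marker=None, max_keys=5):
    ids = ['s-%d' % i for i in range(7)]
    start = 0 if marker is None else ids.index(marker)
    truncated = start + max_keys < len(ids)
    return SimpleNamespace(
        steps=ids[start:start + max_keys],
        is_truncated=truncated,
        next_marker=ids[start + max_keys] if truncated else None,
    )


print(list(iter_steps(fake_list_steps, 'c-123', max_keys=3)))
# prints: ['s-0', 's-1', 's-2', 's-3', 's-4', 's-5', 's-6']
```

    Because the generator is lazy, a caller that stops iterating early (for example after finding a step by name) never requests the remaining pages.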

    Get a specific step

    The following code retrieves the details of the specified step:

    try:
        response = bmr_client.get_step(cluster_id, step_id)
        LOG.debug('describe step response: %s' % response)
    except BceHttpClientError as e:
        if isinstance(e.last_error, BceServerError):
            LOG.error('get_step failed. Response %s, code: %s, msg: %s'
                      % (e.last_error.status_code, e.last_error.code, e.last_error.message))
        else:
            LOG.error('get_step failed. Unknown exception: %s' % e)
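
    get_step is often used to wait for a step to finish. The sketch below polls at a fixed interval until the step reaches a terminal state or a timeout expires. It rests on stated assumptions: wait_for_step and fake_get_step are hypothetical names, and because this section does not show the fields of the get_step response, the caller supplies get_state (how to read the state from a response) and terminal_states (which states end the wait). The state names in the example are made up; check the BMR API reference for the real field and state names. In real code you would pass bmr_client.get_step as the first argument.

```python
import time


def wait_for_step(get_step, cluster_id, step_id, get_state, terminal_states,
                  interval=10.0, timeout=3600.0, sleep=time.sleep):
    """Poll get_step until get_state(response) is in terminal_states.

    Returns the last response; raises TimeoutError once `timeout` seconds
    have passed. get_state and terminal_states encode caller assumptions
    about the response format, not confirmed SDK field names.
    """
    deadline = time.monotonic() + timeout
    while True:
        response = get_step(cluster_id, step_id)
        state = get_state(response)
        if state in terminal_states:
            return response
        if time.monotonic() >= deadline:
            raise TimeoutError('step %s still in state %r after %.0f s'
                               % (step_id, state, timeout))
        sleep(interval)


# Hypothetical stand-in for bmr_client.get_step: the made-up state advances per call.
_states = iter(['Pending', 'Running', 'Completed'])


def fake_get_step(cluster_id, step_id):
    return {'state': next(_states)}


result = wait_for_step(fake_get_step, 'c-123', 's-456',
                       get_state=lambda r: r['state'],
                       terminal_states={'Completed', 'Failed'},
                       sleep=lambda seconds: None)
print(result)  # prints: {'state': 'Completed'}
```

    Passing sleep as a parameter keeps the example fast and testable; with the default time.sleep, each poll waits interval seconds before asking again.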