Step (Job)

          Add steps

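          A step (job) is a resource associated with a cluster, so every step operation requires a cluster ID.

          BMR supports several step types, each with its own configuration options. The following code adds Custom Jar, Streaming, Hive, and Pig steps to a specified cluster of type hadoop; the sample also includes a Spark step. Note: when adapting this sample, change the BOS paths in the step parameters to BOS paths that your account can access.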

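          steps = [
              # Each step: type, action on failure, type-specific properties, step name
              # Custom JAR step: JAR path, main class, arguments (input and output paths)
              BmrClient.step(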
                      'Java',
                      'Continue',
                      BmrClient.java_step_properties(
                          'bos://benchmark/hadoop/hadoop-mapreduce-examples.jar',
                          'org.apache.hadoop.examples.WordCount',
                          'bos://helloworld/input/install.log bos://tester01/sdk/output_java/out1'
                          ),
                      'sdk-job-01'
                      ),
              # Streaming step: input path, output path, mapper command
              BmrClient.step(
                      'Streaming',
                      'Continue',
                      BmrClient.streaming_step_properties(
                          'bos://helloworld/input/install.log',
                          'bos://tester01/sdk/output_streaming/out1',
                          'cat'),
                      'sdk-job-02'
                      ),
              # Hive step: script path, arguments, input path, output path
              BmrClient.step(
                      'Hive',
                      'Continue',
                      BmrClient.hive_step_properties(
                          'bos://chy3/hive/hql/hive_src.hql',
                          '--hivevar LOCAT=bos://chy3/hive/tables/src',
                          'bos://chy3/hive/data/hive_src.data',
                          'bos://tester01/sdk/output_hive/out1'
                          ),
                      'sdk-job-03'
                      ),
              # Pig step: script path, with input and output paths passed by keyword
              BmrClient.step(
                      'Pig',
                      'Continue',
                      BmrClient.pig_step_properties(
                          'bos://chy3/pig/script/pig_grep.pig',
                          input='bos://chy3/pig/data/pig_grep.data',
                          output='bos://tester01/sdk/output_pig/out1'
                          ),
                      'sdk-job-04'
                      ),
              # Spark step: JAR path, submit options, arguments (input and output paths)
              BmrClient.step(
                      'Spark',
                      'Continue',
                      BmrClient.spark_step_properties(
                          'bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar',
                          '--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer',
                          'bos://bmr-public-bj/data/log/accesslog-1k.log bos://tester01/sdk/output/out'
                          ),
                      'sdk-job-05'
                      ),
              # Streaming step with a mapper, a reducer, extra arguments, and an additional file
              BmrClient.step(
                      'Streaming',
                      'Continue',
                      BmrClient.streaming_step_properties(
                          'bos://helloworld/input/install.log',
                          'bos://tester01/sdk/output_streaming/out2',
                          'cat',
                          'cat',
                          '-libjars testB.jar'
                          ),
                      'sdk-job-06',
                      [BmrClient.additional_file("bos://path/to/testA.jar", "testB.jar")]
                      )
          ]
          
          try:
              response = bmr_client.add_steps(cluster_id, steps)
              LOG.debug('add steps response: %s' % response)
          except BceHttpClientError as e:
              if isinstance(e.last_error, BceServerError):
                  LOG.error('add_steps failed. Response %s, code: %s, msg: %s'
                            % (e.last_error.status_code, e.last_error.code, e.last_error.message))
              else:
                  LOG.error('add_steps failed. Unknown exception: %s' % e)
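The sample above hard-codes buckets such as `helloworld`, `tester01`, and `chy3`, which have to be replaced with a bucket your account can use. As a minimal sketch, the helper below rewrites the bucket in every `bos://` path found in a string, including paths embedded in arguments such as `--hivevar LOCAT=bos://...`. The name `rebase_bos_paths` is hypothetical and not part of the SDK, and the sketch assumes that all paths should move to one bucket with the object keys left unchanged.

```python
import re

# Matches the scheme and bucket part of a BOS path, e.g. 'bos://helloworld'.
_BOS_BUCKET = re.compile(r'bos://[^/\s]+')


def rebase_bos_paths(text, bucket):
    """Return `text` with the bucket of every bos:// path replaced by `bucket`.

    Hypothetical helper for adapting the sample; it is not an SDK function.
    Object keys (everything after the bucket) are left unchanged.
    """
    return _BOS_BUCKET.sub(lambda match: 'bos://' + bucket, text)


# Example: adapt the Custom JAR step arguments to a bucket named 'my-bucket'.
java_args = rebase_bos_paths(
    'bos://helloworld/input/install.log bos://tester01/sdk/output_java/out1',
    'my-bucket')
```

Because the helper rewrites every bucket it sees, it would also redirect paths that point at public sample buckets such as `bmr-public-bj`, so apply it only to the paths you actually want to move.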

          List all steps

          The following code lists all steps on a specified cluster. Use max_keys to limit the maximum number of steps returned per request:

          try:
              response = bmr_client.list_steps(cluster_id, max_keys=50)
              for step in response.steps:
                  LOG.debug('list step %s: %s' % (step.id, step))
          except BceHttpClientError as e:
              if isinstance(e.last_error, BceServerError):
                  LOG.error('list_steps failed. Response %s, code: %s, msg: %s'
                            % (e.last_error.status_code, e.last_error.code, e.last_error.message))
              else:
                  LOG.error('list_steps failed. Unknown exception: %s' % e)

          The SDK also supports paginated queries:

          try:
              page = 1
              next_marker = None
              max_keys = 5
              is_truncated = True
              while is_truncated:
                  response = bmr_client.list_steps(cluster_id, marker=next_marker, max_keys=max_keys)
                  LOG.debug('list %s steps on Page %s' % (len(response.steps), page))
                  page += 1
                  is_truncated = response.is_truncated
                  next_marker = response.next_marker
          except BceHttpClientError as e:
              if isinstance(e.last_error, BceServerError):
                  LOG.error('list_steps failed. Response %s, code: %s, msg: %s'
                            % (e.last_error.status_code, e.last_error.code, e.last_error.message))
              else:
                  LOG.error('list_steps failed. Unknown exception: %s' % e)
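The paging loop above can also be wrapped in a generator so that callers iterate over steps without tracking markers themselves. This is only a sketch: `iter_steps` is a hypothetical name, and it assumes the callable passed in accepts `marker` and `max_keys` keyword arguments and returns a response with `steps`, `is_truncated`, and `next_marker`, as `bmr_client.list_steps` does in the example above.

```python
def iter_steps(list_steps, cluster_id, max_keys=50):
    """Yield every step of a cluster, fetching one page at a time.

    `list_steps` is any callable that behaves like `bmr_client.list_steps`
    in the paging example: it takes `marker` and `max_keys` keyword
    arguments and returns an object with `steps`, `is_truncated`, and
    `next_marker` attributes.
    """
    marker = None
    while True:
        response = list_steps(cluster_id, marker=marker, max_keys=max_keys)
        for step in response.steps:
            yield step
        # Stop once the service reports that no further pages remain.
        if not response.is_truncated:
            break
        marker = response.next_marker
```

A call might then look like `for step in iter_steps(bmr_client.list_steps, cluster_id, max_keys=5): ...`, placed inside the same `try`/`except BceHttpClientError` block as the other examples, since each page is fetched lazily while iterating.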

          Get a specific step

          The following code retrieves information about a specified step:

          try:
              response = bmr_client.get_step(cluster_id, step_id)
              LOG.debug('describe step response: %s' % response)
          except BceHttpClientError as e:
              if isinstance(e.last_error, BceServerError):
                  LOG.error('get_step failed. Response %s, code: %s, msg: %s'
                            % (e.last_error.status_code, e.last_error.code, e.last_error.message))
              else:
                  LOG.error('get_step failed. Unknown exception: %s' % e)
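A common use of `get_step` is to poll a step until it finishes. The sketch below keeps the SDK call outside the helper: `wait_for_step` is a hypothetical function that repeatedly calls a caller-supplied `get_state` callable and returns once the state is in `terminal_states`. Both how the state is read from the response and which state names count as terminal are assumptions that this page does not confirm, so check them against the response you actually receive.

```python
import time


def wait_for_step(get_state, terminal_states, interval=10.0, timeout=3600.0,
                  sleep=time.sleep, clock=time.monotonic):
    """Poll `get_state()` until it returns a terminal state or time runs out.

    Hypothetical helper, not part of the SDK. `get_state` is a zero-argument
    callable returning the current state string; `terminal_states` is the
    set of states that end the wait. `sleep` and `clock` are injectable so
    the loop can be tested without real delays.
    """
    deadline = clock() + timeout
    while True:
        state = get_state()
        if state in terminal_states:
            return state
        if clock() >= deadline:
            raise TimeoutError(
                'step still in state %r after %s seconds' % (state, timeout))
        sleep(interval)
```

For example, `get_state` could be `lambda: bmr_client.get_step(cluster_id, step_id).status.state`, with `terminal_states` set to something like `{'COMPLETED', 'FAILED', 'CANCELLED'}`; both the attribute path and the state names are assumptions here.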