Hadoop-Streaming
Hadoop Streaming简介
本文以分析Web日志统计每日请求量为例,介绍如何在百度智能云平台使用Hadoop Streaming。
在BMR集群中,您可以使用python、shell、C++等任何您熟悉的编程语言开发Hadoop Streaming作业。Hadoop Streaming是Hadoop提供的编程工具,允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,它将Mapper和Reducer文件封装成MapReduce作业并提交运行,让您无须打包即可快速实现MapReduce功能,满足复杂的业务需求。
程序准备
Mapper.py程序:
#!/usr/bin/env python
import sys
import re
separator = "\\s{1}"
ipAddr = "(\\S+)"
remoteUser = "(.*?)"
timeLocal = "\\[(.*?)\\]"
request = "(.*?)"
status = "(\\d{3})"
bodyBytesSent = "(\\S+)"
httpReferer = "(.*?)"
httpCookie = "(.*?)"
httpUserAgent = "(.*?)"
requestTime = "(\\S+)"
host = "(\\S+)"
msec = "(\\S+)"
p = ipAddr + separator + "-" + separator + timeLocal\
+ separator + request + separator + status + separator\
+ bodyBytesSent + separator + httpReferer + separator + httpCookie\
+ separator + remoteUser + separator + httpUserAgent + separator\
+ requestTime + separator + host + separator + msec
for line in sys.stdin:
line = line.strip()
m = re.match(p, line)
if m is not None:
print '%s\t%s' % (m.group(2).split(":")[0], 1)
Reducer.py程序:
#!/usr/bin/env python
import sys
from operator import itemgetter
sum_result = {}
for line in sys.stdin:
line = line.strip()
key, value = line.split()
try:
value = int(value)
sum_result[key] = sum_result.get(key, 0) + value
except ValueError:
pass
sorted_sum_result = sorted(sum_result.items(), key=itemgetter(0))
for key, value in sorted_sum_result:
print '%s\t%s'% (key, value)
集群准备
- 准备数据,请参考数据准备;
- 百度智能云环境准备;
-
登录控制台,选择“产品服务->MapReduce BMR”,点击“创建集群”,进入集群创建页,并做如下配置:
- 设置集群名称
- 设置管理员密码
- 关闭日志开关
- 选择集群版本“BMR1.6.0”
- 选择集群类型“hadoop”
- 请保持集群的其他默认配置不变,点击“完成”可在集群列表页查看已创建的集群,当集群状态由“初始化中”变为“空闲中”时,集群创建成功。
运行作业
- 在“产品服务>MapReduce>MapReduce-作业列表”页中,点击“创建作业”,进入创建作业页。
-
配置Streaming作业参数:
- 作业类型:选择“Streaming作业”;
- 作业名称:输入作业名称,长度不可超过255个字符;
- Mapper:若使用您自己的代码,请上传程序至BOS或者您本地的HDFS中,并在此输入程序路径;您也可直接使用百度智能云提供的样例程序,路径如下:
华北-北京区域的BMR集群对应的样例程序路径:
bos://bmr-public-bj/sample/streaming-1.0-mapper.py
华南-广州区域的BMR集群对应的样例程序路径:bos://bmr-public-gz/sample/streaming-1.0-mapper.py
- Reducer:若使用您自己的代码,请上传程序至BOS或者您本地的HDFS中,并在此输入程序路径;您也可直接使用百度智能云提供的样例程序,路径如下:
华北-北京区域的BMR集群对应的样例程序路径:
bos://bmr-public-bj/sample/streaming-1.0-reducer.py
华南-广州区域的BMR集群对应的样例程序路径:bos://bmr-public-gz/sample/streaming-1.0-reducer.py
- BOS输入地址:
bos://bmr-public-bj/data/log/accesslog-1k.log
- BOS输出地址:输出路径必须具有写权限且该路径不能已存在,
bos://{your-bucket}/output
- 失败后操作:继续;
- 应用程序参数:无。
- 在“集群适配”区,选择适配的集群。
- 点击“完成”,则作业创建成功;运行中的作业状态会由“等待中”更新为“运行中”,当作业运行完毕后,状态会更新为“已完成”,即可查看到结果了。
查看结果
您可以到您所选的存储系统(BOS或HDFS)中查看输出结果,以下是在BOS中查看输出结果的说明:
用户到bos://{your-bucket}/output路径下查看输出,如果使用系统提供的输入数据和程序,可以打开输出结果看到如下内容:
03/Oct/2015 139
04/Oct/2015 375
05/Oct/2015 372
06/Oct/2015 114
分布式缓存导入数据
分布式缓存是Hadoop提供的文件缓存工具,它能够自动将指定的文件分发到运行Map或Reduce任务的各个节点上,并缓存到本地,供用户程序读取使用。在Map或Reduce时,如需访问通用数据,利用分布式缓存可显著提高访问效率。下面以Streaming作业为例说明如何使用Hadoop分布式缓存。
应用场景
分布式缓存通常用在当MapReduce需要依赖一些特定的版本库,或者MapReduce需要访问第三方库或文件时,若将这些库或文件安装到集群各个机器上通常比较麻烦,这时可以使用分布式缓存将其分发到集群各个节点上,程序运行完后,Hadoop自动将其删除。
例如下述的场景,运用分布式缓存会很好的解决问题:
- 分发字典文件:Map或者Reduce需要用到一些外部字典时,可使用分发字典文件,比如黑白名单、词表等。
- 自动化软件部署:MapReduce需依赖于特定版本的库时,可使用自动化软件部署,比如依赖于某个版本的PHP解释器,可以运用分布式缓存进行上传分发。
支持的文件类型
分布式缓存支持单个文件和打包文件,支持以下压缩格式:
* zip
* tgz
* tar.gz
* tar
* jar
操作步骤
- 您需要先将文件上传到BOS,可参考BOS上传Object;
-
提交作业时通过指定参数,为缓存文件指定一个软链接,在Mapper和Reducer中通过调用软连接即可访问实际的文件内容;
- -files:通常用来上传单个文件,可将指定文件分发到各个Task的工作目录下,不做其他处理;如需上传多个文件,文件之间用逗号隔开;
- -archives:通常用来上传打包文件,可将指定文件分发到各个Task的工作目录下,并对后缀名为“.jar”、“.zip”,“.tar.gz”、“.tgz”的文件自动解压,默认情况下,解压后的内容存放到工作目录下名称为解压前文件名的目录中;如需上传多个打包文件,文件之间用逗号隔开。
文件类型 | 操作 | 说明 |
---|---|---|
添加单个文件 | 输入参数:-files+文件在BOS中的位置+井号(#)+文件在缓存中的软链接名称 | -files bos://bucket_name/file_name#file_name |
添加打包文件 | 输入参数:-archives+文件在BOS中的位置+井号(#)+文件在缓存中的软链接名称 | -archives bos://bucket_name/archive_name#archive_name |
例如:
- 对于某一个文件mydata.txt,将其上传到BOS文件系统,路径为bos://mybucket/mydata.txt,提交MapReduce Streaming作业时在参数中指定 -files bos://mybucket/mydata.txt#data,在MapTask和ReduceTask中就可以通过 ./data 来访问所需的文件了;
- 对于某一个文件libA.so,将其打包到mylib.tar.gz文件中,并上传到BOS文件系统,路径为bos://mybucket/mylib.tar.gz,提交MapReduce Streaming作业时在参数中指定 -archives bos://mybucket/mylib.tar.gz#libs,在MapTask和ReduceTask中就可以通过./libs/libA.so来访问所需的文件了。