[转帖]adoop Streaming框架使用_Hadoop,ERP及大数据讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  Hadoop,ERP及大数据讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 4824 | 回复: 0   主题: [转帖]adoop Streaming框架使用        下一篇 
刘伟
注册用户
等级:少校
经验:938
发帖:82
精华:0
注册:2013-6-24
状态:离线
发送短消息息给刘伟 加好友    发送短消息息给刘伟 发消息
发表于: IP:您无权察看 2013-6-27 14:44:58 | [全部帖] [楼主帖] 楼主

Hadoop Streaming框架使用(一)

Streaming简介 

Streaming框架允许任何程序语言实现的程序在Hadoop MapReduce中使用,方便已有程序向Hadoop平台移植。因此可以说对于hadoop的扩展性意义重大,今天简单说一下。

Streaming的原理是用Java实现一个包装用户程序的MapReduce程序,该程序负责调用MapReduce Java接口获取key/value对输入,创建一个新的进程启动包装的用户程序,将数据通过管道传递给包装的用户程序处理,然后调用MapReduce Java接口将用户程序的输出切分成key/value对输出。 

Streaming优点

1 开发效率高,便于移植

只要按照标准输入输出格式进行编程,就可以满足hadoop要求。因此单机程序稍加改动就可以在集群上进行使用。 同样便于测试

只要按照 cat input | mapper | sort | reducer > output 进行单机测试即可。

如果单机测试通过,大多数情况是可以在集群上成功运行的,只要控制好内存就好了。

    2 提高程序效率

有些程序对内存要求较高,如果用java控制内存毕竟不如C/C++。

Streaming不足

    1 Hadoop Streaming默认只能处理文本数据,无法直接对二进制数据进行处理 

    2 Streaming中的mapper和reducer默认只能向标准输出写数据,不能方便地处理多路输出 

具体参数介绍

-input    <path>  输入数据路径

-output   <path>  输出数据路径

-mapper  <cmd|JavaClassName>  mapper可���行程序或Java类

-reducer  <cmd|JavaClassName>  reducer可执行程序或Java类

-file            <file>        Optional 分发本地文件

-cacheFile       <file>        Optional 分发HDFS文件

-cacheArchive   <file>         Optional 分发HDFS压缩文件

-numReduceTasks  <num>    Optional reduce任务个数

-jobconf | -D NAME=VALUE    Optional   作业配置参数

-combiner <JavaClassName>   Optional  Combiner Java类

-partitioner <JavaClassName>  Optional  Partitioner Java类

-inputformat <JavaClassName> Optional  InputFormat Java类

-outputformat <JavaClassName>Optional  OutputFormat Java类

-inputreader <spec>            Optional  InputReader配置

-cmdenv   <n>=<v>           Optional  传给mapper和reducer的环境变量

-mapdebug <path>             Optional  mapper失败时运行的debug程序

-reducedebug <path>           Optional  reducer失败时运行的debug程序

-verbose                      Optional  详细输出模式

 下面是对各个参数的详细说明:

l -input <path>:指定作业输入,path可以是文件或者目录,可以使用*通配符,-input选项可以使用多次指定多个文件或目录作为输入。

l -output <path>:指定作业输出目录,path必须不存在,而且执行作业的用户必须有创建该目录的权限,-output只能使用一次��

l -mapper:指定mapper可执行程序或Java类,必须指定且唯一。

l -reducer:指定reducer可执行程序或Java类,必须指定且唯一。

l -file, -cacheFile, -cacheArchive:分别用于向计算节点分发本地文件、HDFS文件和HDFS压缩文件。

l -numReduceTasks:指定reducer的个数,如果设置-numReduceTasks 0或者-reducer NONE则没有reducer程序,mapper的输出直接作为整个作业的输出。

-jobconf | -D NAME=VALUE:指定作业参数,NAME是参数名,VALUE是参数值,可以指定的参数参考hadoop-default.xml。特别建议用-jobconf mapred.job.name='My Job Name'设置作业名,使用-jobconf mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW设置作业优先级,使用-jobconf mapred.job.map.capacity=M设置同时最多运行M个map任务,使用-jobconf mapred.job.reduce.capacity=N设置同时最多运行N个reduce任务。

常见的作业配置参数如下表所示: 

mapred.job.name         作业名

mapred.job.priority     作业优先级

mapred.job.map.capacity   最多同时运行map任务数

mapred.job.reduce.capacity 最多同时运行reduce任务数

hadoop.job.ugi             作业执行权限

mapred.map.tasks           map任务个数

mapred.reduce.tasks        reduce任务个数

mapred.job.groups         作业可运行的计算节点分组

mapred.task.timeout       任务没有响应(输入输出)的最大时间

mapred.compress.map.output     map的输出是否压缩

mapred.map.output.compression.codec    map的输出压缩方式

mapred.output.compress        reduce的输出是否压缩

mapred.output.compression.codec     reduce的输出压缩方式

stream.map.output.field.separator    map输出分隔符

 l -combiner:指定combiner Java类,对应的Java类文件打包成jar文件后用-file分发。

l -partitioner:指定partitioner Java类,Streaming提供了一些实用的partitioner实现,参考KeyBasedFiledPartitoner和IntHashPartitioner。

l -inputformat, -outputformat:指定inputformat和outputformat Java类,用于读取输入数据和写入输出数据,分别要实现InputFormat和OutputFormat接口。如果不指定,默认使用TextInputFormat和TextOutputFormat。

l -cmdenv NAME=VALUE:给mapper和reducer程序传递额外的环境变量,NAME是变量名,VALUE是变量值。

l -mapdebug, -reducedebug:分别指定mapper和reducer程序失败时运行的debug程序。

l -verbose:指定输出详细信息,例如分发哪些文件,实际作业配置参数值等,可以用于调试。

Hadoop Streaming框架使用(二)

上一篇文章介绍了Streaming的各种参数,本文具体介绍使用方法。

提交hadoop任务示例: 

$HADOOP_HOME/bin/hadoop streaming \
-input /user/test/input -output /user/test/output \
-mapper “mymapper.sh” -reducer “myreducer.sh” \
-file/home/work/mymapper.sh \
-file /home/work/myreducer.sh \
-jobconf mapred.job.name=”file-demo”


上面的命令提交了一个hadoop任务,输出和输入分别为 /user/test/output 和/user/test/input。 map程序为 mymapper.sh,reduce程序为myreducer.sh。这里需要注意一定要将这两��文件用-file分发到集群的节点上。最后一行指定了任务的名字。

还有一些较为复杂的使用,比如需要指定任务个数等,可以使用

-jobconf mapred.job.map.capacity=m -jobconf mapred.job.reduce.capacity=n


上面的命令设置最多同时运行m个map任务,n个reduce任务,如果m或n为0或者没有指定,则对应的capacity没有限制,默认配置就是0没有限制。建议在运行作业时都设置map和reduce capacity,防止作业占用过多资源。 

当然,这里只是简单介绍了最基本的用法,hadoop streaming还有很多高级使用方法,可一些很强大的排序指定功能,这里不再过多介绍,有需要的朋友可以给我留言进行询问,只要我遇到过的问题一定给出解决方案。如果运行时出现错误,可以参见我的另一篇文章——hadoop错误码

Hadoop Streaming框架使用(三)

  前两篇文章介绍了Hadoop Streaming框架的使用方法。由于篇幅所限,并没有介绍其中的高级使用方法,但是有一些用法还是相当常见的。今天对一些高级用法进行一个简单的说明,希望能给大家一些启发。

  1 使用cacheFile分发文件

  如果文件(如字典文件)存放在HDFS中,希望计算时在每个计算节点上将文件当作本地文件处理,,可以使用-cacheFile hdfs://host:port/path/to/file#linkname选项在计算节点缓存文件,Streaming程序通过./linkname访问文件。

  例如:

hadoop = `which hadoop`
$hadoop streaming \
-input /user/test/input -output /user/test/output \
-mapper mymapper.sh -reducer myreducer.sh \
-file /home/work/mymapper.sh \
-file /home/work/myreducer.sh \
-cacheFile hdfs://namenode:port/user/test/dict.data#dictlink \
-jobconf mapred.job.name=”cache-file-demo”


  mymapper.sh和myreducer.sh可以通过./dictlink直接访问字典文件hdfs://user/test/dict.data,而且是从本地读取文件。

  2 用cacheArchive分发压缩包

  有时要分发的文件有一定的目录结构,可以先将整个目录打包,然后整体进行上传。使用-cacheArchive hdfs://host:port/path/to/archivefile#linkname分发压缩包。

例如在本地有一个目录为app,里面有mapper.pl, reducer.pl, dict/dict.txt这些子目录和文件,mapper.pl和reducer.pl要读取./dict/dict.txt文件,希望在任务执行时不需要修改程序和目录结构, 可以按照下面的方式分发app目录:

$ tar app.tar.gz –C app .  #本地打包
$ $HADOOP_HOME/bin/hadoop fs –put app.tar.gz /user/test/app.tar.gz   #包上传到HDFS
$ $HADOOP_HOME/bin/hadoop streaming \
-input /user/test/input -output /user/test/output \
-mapper “perl app/mapper.pl” -reducer “perl app/reducer.pl” \
-cacheArchive hdfs://namenode:port/user/test/ app.tar.gz #app \
-jobconf mapred.job.name=”cache-archive-demo”


首先将本地app目录中的所有文件和目录打包压缩,然后上传到HDFS的/user/test/app.tar.gz,启动streaming任务时使用-cacheArchive选项将app.tar.gz分发到计算节点并解压到app目录,然后在当前工作目录创建到app目录的链接,-mapper选项指定app/mapper.pl为mapper程序,-reducer选项指定app/reducer.pl为reducer程序,它们都可以读取./dict/dict.txt文件。本地打包时要进入目录app而不是在app的上层目录打包,否则要通过app/app/mapper.pl才能访问到mapper.pl文件。

hadoop支持zip, jar, tar.gz格式的压缩包,由于Java解压zip压缩包时会丢失文件权限信息而且遇到中文文件名会出错,所见建议采用tar.gz压缩包。

三种文件分发方式的区别:-file将客户端本地文件打成jar包上传到HDFS然后分发到计算节点,-cacheFile将HDFS文件分发到计算节点,-cacheArchive将HDFS压缩文件分发到计算节点并解压。

3输出数据分割

默认情况下Streaming框架将map输出的每一行第一个”\t”之前的部分作为key,之后的部分作为value,key\tvalue又作为reduce的输入。可以用-D stream.map.output.field.separator改变map输出中key和value的分隔符,用-D stream.num.map.output.key.fields设置分隔符的位置,该位置之前的部分作为key,之后的部分作为value。如下所示,其中-D stream.map. output.field.separator=:指定使用冒号”:”将map输出的一行分隔为key/value,-D stream.num.map.output.key.fields=2指定在第二个冒号处进行分隔,也就是第二个冒号之前的作为key,之后的作为value。如果没有冒号或冒号少于两个,则key为整行,value为空。 

$HADOOP_HOME/bin/hadoop streaming \
-D stream.map.output.field.separator=: \
-D stream.num.map.output.key.fields=2 \
-input /user/test/input -output /user/test/output \
-mapper mymapper.sh -reducer myreducer.sh \
-file /home/work/mymapper.sh \
-file /home/work/myreducer.sh \
-jobconf mapred.job.name=”output-sep-demo”


与map类似,对于reduce的输出,同样也可以用-D stream.reduce.output.field.separator和-D stream.num.reduce.output.key.fields定制key/value分隔方式。

4 二次排序

  KeyFieldBasedPartitioner是Hadoop库中的一个实用Partitioner,配置相应的参数就可以使用,通过KeyFieldBasedPartitioner可以方便地实现二次排序。 

$HADOOP_HOME/bin/hadoop streaming \
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D map.output.key.field.separator=. \
-D num.key.fields.for.partition=2 \
-input /user/test/input -output /user/test/output \
-mapper “mymapper.sh” -reducer “ myreducer.sh” \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-file /home/work/mymapper.sh \
-file /home/work/myreducer.sh \
-jobconf mapred.job.name=”key-partition-demo”


  其中-Dstream.map.output.field.separator=.和-D stream.num.map.output.key.fields=4与上面的定制输出数据分隔方式意义相同,指定map的输出行第4个英文句号”.”之前为key,后面为value。-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner指定使用KeyFieldBasedPartitioner,-D map.output.key.field.separator=.指定key的内部用英文句号”.”分隔,-D num.key.fields.for.partition=2指定将key分隔出来的前两个部分而不是整个key用于Partitioner做partition。

  以上就是我个人认为hadoop streaming中比较常用的技巧,希望对大家有所帮助,同时也多多补充。

关于最后一点的补充:

 我们知道,一个典型的Map-Reduce过程包括:Input->Map->Patition->Reduce->Output。Pation负责把Map任务输出的中间结果按key分发给不同的Reduce任务进行处理。Hadoop 提供了一个非常实用的partitioner类KeyFieldBasedPartitioner,通过配置相应的参数就可以使用。通过KeyFieldBasedPartitioner可以方便地实现二次排序。 

使用方法: 

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner


一般配合: 

      -D map.output.key.field.separator及-D num.key.fields.for.partition使用。 

map.output.key.field.separator指定key内部的分隔符 

      num.key.fields.for.partition指定对key分出来的前几部分做partition而不是整个key

示例: 

1. 编写map程序mapper.sh;reduce程序reducer.sh; 测试数据test.txt

[html] view plaincopy
mapper.sh:
#!/bin/sh
cat
reducer.sh:
#!/bin/sh
sort


test.txt内容:  

1,2,1,1,1
1,2,2,1,1
1,3,1,1,1
1,3,2,1,1
1,3,3,1,1
1,2,3,1,1
1,3,1,1,1
1,3,2,1,1
1,3,3,1,1


2. 测试数据test.txt放入hdfs,运行map-reduce程序

[html] view plaincopy
$ hadoop streaming /
-D stream.reduce.output.field.separator=, /
-D stream.num.reduce.output.key.fields=4 /
-D map.output.key.field.separator=, /
-D num.key.fields.for.partition=2 /
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner /
-input /app/test/test.txt  /
-output /app/test/test_result /
-mapper ./mapper.sh  /
-reducer ./reducer.sh /
-file mapper.sh /
-file reducer.sh /
-jobconf mapre.job.name="sep_test"
$ hadoop fs –cat /app/test/test_result/part-00003
1,2,1,1     1
1,2,2,1     1
1,2,3,1     1
$ hadoop fs –cat /app/test/test_result/part-00004
1,3,1,1     1
1,3,1,1     1
1,3,2,1     1
1,3,2,1     1
1,3,3,1     1
1,3,3,1     1


通过这种方式,就做到前4个字段是key,但是通过前两个字段进行partition的目的

该贴被刘伟编辑于2013-6-27 14:50:51




赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论