[转帖]使用MapReduce进行排序_Android, Python及开发编程讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  Android, Python及开发编程讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 3715 | 回复: 0   主题: [转帖]使用MapReduce进行排序        下一篇 
Leon
注册用户
等级:少校
经验:1436
发帖:116
精华:7
注册:2013-1-4
状态:离线
发送短消息息给Leon 加好友    发送短消息息给Leon 发消息
发表于: IP:您无权察看 2013-1-6 10:10:42 | [全部帖] [楼主帖] 楼主

之前在工作中使用到过MapReduce的排序,当时对于这个平台的理解还比较浅显,选择的是一个最为简单的方式,就是只用一个Recude来做。因为Map之后到Reduce阶段,为了Merge的方便,MapReduce的实现会自己依据key值进行排序,这样得出的结果就是一个整体排序的结果。而如果使用超过一个Reduce任务的话,所得的结果是每个part内部有序,但是整体是需要进行merge才可以得到最终的全体有序的。今天读了《Hadoop权威指南》中的第8章,对使用Hadoop这一MapReduce的Java实现进行排序有所了解,在此进行简单的总结。

     首先我们来看一下Hadoop中内部Map和Reduce两个阶段所做的排序,可以使用下图来说明。北京联动北方科技有限公司

     对MapReduce或者Hadoop有所了解的人可能都知道,所谓对于key值的排序,其实是在Map阶段进行的,而Rduce阶段所做的工作是对各个Map任务的结果进行Merge工作,这样就能保证整体是有序的。如果想在使用多个Reduce任务的情况下保证结果有序,我们可以做的是在上图中的partition阶段进行控制,使分配到每个reduce task的数据块为数值区域独立的,即比如整体数据在0~50之间,划分为5个Reduce任务的话,可以0~10区间的数据到第一个Reduce Task,10~20之间的到第二个,以此类推。但是这样就存在一个问题,划分出的各个任务中的数据可能并不是均等的,这样某些Reduce Task处理了很多数据,而其他的处理了很少的数据。Hadoop提供了RandomSampler类(位于InputSampler类中)来进行随机取样,然后按照取样结果对值域进行划分。一个示例代码如下:

北京联动北方科技有限公司

 public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
implements Tool {
      @Override
      public int run(String[] args) throws Exception {
            JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
            if (conf == null) {
                  return -1;
            }
            conf.setInputFormat(SequenceFileInputFormat.class);
            conf.setOutputKeyClass(IntWritable.class);
            conf.setOutputFormat(SequenceFileOutputFormat.class);
            SequenceFileOutputFormat.setCompressOutput(conf, true);
            SequenceFileOutputFormat
            .setOutputCompressorClass(conf, GzipCodec.class);
            SequenceFileOutputFormat.setOutputCompressionType(conf,
            CompressionType.BLOCK);
            conf.setPartitionerClass(TotalOrderPartitioner.class);
            InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(
            0.1, 10000, 10);
            Path input = FileInputFormat.getInputPaths(conf)[0];
            input = input.makeQualified(input.getFileSystem(conf));
            Path partitionFile = new Path(input, "_partitions");
            TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
            InputSampler.writePartitionFile(conf, sampler);
            // Add to DistributedCache
            URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
            DistributedCache.addCacheFile(partitionUri, conf);
            DistributedCache.createSymlink(conf);
            JobClient.runJob(conf);
            return 0;
      }
      public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(
            new SortByTemperatureUsingTotalOrderPartitioner(), args);
            System.exit(exitCode);
      }
}


北京联动北方科技有限公司

    使用上述程序执行所得的结果会是多个划分,每个划分内部是有序的,而且第i个划分的key值会比i+1个划分的key值都要小。这样,就可以不需要进行再一步的merge,就可以得到整体的上有序结果。

    关于排序,一个更加有意思的应用是所谓的Secondary Sort,亦即在保证第一个key值有序的情况下,对第二个key值也要保证有序(可以是升序或者降序)。此处的一个实现方法是将这两个需要排序的部分都作为key值,使用IntPair进行存储,然后自己实现一个继承自WritableComparator的名为KeyComparator的用于比较的类,其代码如下:

北京联动北方科技有限公司

 public static class KeyComparator extends WritableComparator {
      protected KeyComparator() {
            super(IntPair.class, true);
      }
      @Override
      public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
            if (cmp != 0) {
                  return cmp;
            }
            return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); // reverse
      }
}


北京联动北方科技有限公司

     这里对于第二列是得到降序的结果,在conf设置的时候,可以使用conf.setOutputKeyComparatorClass(KeyComparator.class);语句进行设置。这样执行计算程序的话,会存在一个问题,因为将两个int型的值共同作为key值来处理,在Map阶段结束后进行Partition的划分的时候,就会同样依照这个总key值进行划分,我们想要两个值,比如(1900,20)和(1900,23)被放到同一个Reduce任务中进行处理就无法实现,于是我们需要实现自己的Partitioner接口,代码如下:

北京联动北方科技有限公司

 public static class FirstPartitioner implements
Partitioner<IntPair, NullWritable> {
      @Override
      public void configure(JobConf job) {
      }
      @Override
      public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            return Math.abs(key.getFirst() * 127) % numPartitions;
      }
}


北京联动北方科技有限公司

     同样在配置过程中使用conf.setPartitionerClass(FirstPartitioner.class);语句进行设置。除此之外,需要进行控制的还有一个Reduce中的Group by操作,方法是实现一个GroupComparator类,其中的比较只使用第一个键值即可,代码如下:

北京联动北方科技有限公司

 public static class GroupComparator extends WritableComparator {
      protected GroupComparator() {
            super(IntPair.class, true);
      }
      @Override
      public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            return IntPair.compare(ip1.getFirst(), ip2.getFirst());
      }
}


北京联动北方科技有限公司

     需要设置的是conf.setOutputValueGroupingComparator(GroupComparator.class);。这样就可以实现Secondary Sort过程了。




赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论