[转帖]自定义Hadoop Map/Reduce输入文件切割InputFormat_Hadoop,ERP及大数据讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  Hadoop,ERP及大数据讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 3128 | 回复: 0   主题: [转帖]自定义Hadoop Map/Reduce输入文件切割InputFormat        下一篇 
huizai
注册用户
等级:少校
经验:933
发帖:83
精华:0
注册:2013-6-18
状态:离线
发送短消息息给huizai 加好友    发送短消息息给huizai 发消息
发表于: IP:您无权察看 2013-6-24 9:42:15 | [全部帖] [楼主帖] 楼主

Hadoop会对原始输入文件进行文件切割,然后把每个split传入mapper程序中进行处理,FileInputFormat是所有以文件作 为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方 法。至于获得记录的方法是有不同的子类进行实现的。

那么,FileInputFormat是怎样将他们划分成splits的呢?FileInputFormat只划分比HDFS block大的文件,所以如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。

hadoop默认的InputFormat是TextInputFormat,重写了FileInputFormat中的createRecordReader和isSplitable方法。该类使用的reader是LineRecordReader,即以回车键(CR = 13)或换行符(LF = 10)为行分隔符。

但大多数情况下,回车键或换行符作为输入文件的行分隔符并不能满足我们的需求,通常用户很有可能会输入回车键、换行符,所以通常我们会定义不可见字符(即用户无法输入的字符)为行分隔符,这种情况下,就需要新写一个InputFormat。

又或者,一条记录的分隔符不是字符,而是字符串,这种情况相对麻烦;还有一种情况,输入文件的主键key已经是排好序的了,需要hadoop做的只是把相 同的key作为一个数据块进行逻辑处理,这种情况更麻烦,相当于免去了mapper的过程,直接进去reduce,那么InputFormat的逻辑就相 对较为复杂了,但并不是不能实现。

1、改变一条记录的分隔符,不用默认的回车或换行符作为记录分隔符,甚至可以采用字符串作为记录分隔符
1)自定义一个InputFormat,继承FileInputFormat,重写createRecordReader方法,如果不需要分片或者需要改变分片的方式,则重写isSplitable方法,具体代码如下:

public class FileInputFormatB extends FileInputFormat<LongWritable, Text> {
      @Override
      public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) {
            return new SearchRecordReader("\b");
      }
      @Override
      protected boolean isSplitable(FileSystem fs, Path filename) {
            // 输入文件不分片
            return false;
      }
}


2)关键在于定义一个新的SearchRecordReader继承RecordReader,支持自定义的行分隔符,即一条记录的分隔符。标红的地方为与hadoop默认的LineRecordReader不同的地方。

public class IsearchRecordReader extends RecordReader<LongWritable, Text> {
      private static final Log LOG = LogFactory.getLog(IsearchRecordReader.class);
      private CompressionCodecFactory compressionCodecs = null;
      private long start;
      private long pos;
      private long end;
      private LineReader in;
      private int maxLineLength;
      private LongWritable key = null;
      private Text value = null;
      //行分隔符,即一条记录的分隔符
private byte[] separator = {'\b'};
      private int sepLength = 1;
      ‍ public IsearchRecordReader(){
      }
      public IsearchRecordReader(String seps){
            this.separator = seps.getBytes();
            sepLength = separator.length;
      }
      public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
            this.start = split.getStart();
            this.end = (this.start + split.getLength());
            Path file = split.getPath();
            this.compressionCodecs = new CompressionCodecFactory(job);
            CompressionCodec codec = this.compressionCodecs.getCodec(file);
            // open the file and seek to the start of the split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            boolean skipFirstLine = false;
            if (codec != null) {
                  this.in = new LineReader(codec.createInputStream(fileIn), job);
                  this.end = Long.MAX_VALUE;
            } else {
            if (this.start != 0L) {
                  skipFirstLine = true;
                  this.start -= sepLength;
                  fileIn.seek(this.start);
            }
            this.in = new LineReader(fileIn, job);
      }
      if (skipFirstLine) { // skip first line and re-establish "start".
            int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start));
            if(newSize > 0){
                  start += newSize;
            }
      }
      this.pos = this.start;
}
public boolean nextKeyValue() throws IOException {
      if (this.key == null) {
            this.key = new LongWritable();
      }
      this.key.set(this.pos);
      if (this.value == null) {
            this.value = new Text();
      }
      int newSize = 0;
      while (this.pos < this.end) {
            newSize = this.in.readLine(this.value, this.maxLineLength, Math.max(
            (int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength));
            if (newSize == 0) {
                  break;
            }
            this.pos += newSize;
            if (newSize < this.maxLineLength) {
                  break;
            }
            LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize));
      }
      if (newSize == 0) {
            //读下一个buffer
            this.key = null;
            this.value = null;
            return false;
      }
      //读同一个buffer的下一个记录
      return true;
}
public LongWritable getCurrentKey() {
      return this.key;
}
public Text getCurrentValue() {
      return this.value;
}
public float getProgress() {
      if (this.start == this.end) {
            return 0.0F;
      }
      return Math.min(1.0F, (float) (this.pos - this.start) / (float) (this.end - this.start));
}
public synchronized void close() throws IOException {
      if (this.in != null)
      this.in.close();
}
}


3)重写SearchRecordReader需要的LineReader,可作为SearchRecordReader内部类。特别需要注意的地方就 是,读取文件的方式是按指定大小的buffer来读,必定就会遇到一条完整的记录被切成两半,甚至如果分隔符大于1个字符时分隔符也会被切成两半的情况, 这种情况一定要加以拼接处理。

public class LineReader {
      //回车键(hadoop默认)
      //private static final byte CR = 13;
      //换行符(hadoop默认)
      //private static final byte LF = 10;
      //按buffer进行文件读取
      private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;
      private int bufferSize = DEFAULT_BUFFER_SIZE;
      private InputStream in;
      private byte[] buffer;
      private int bufferLength = 0;
      private int bufferPosn = 0;
      LineReader(InputStream in, int bufferSize) {
            this.bufferLength = 0;
            this.bufferPosn = 0;
            this.in = in;
            this.bufferSize = bufferSize;
            this.buffer = new byte[this.bufferSize];
      }
      public LineReader(InputStream in, Configuration conf) throws IOException {
            this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
      }
      public void close() throws IOException {
            in.close();
      }
      public int readLine(Text str, int maxLineLength) throws IOException {
            return readLine(str, maxLineLength, Integer.MAX_VALUE);
      }
      public int readLine(Text str) throws IOException {
            return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
      }
      //以下是需要改写的部分_start,核心代码
      public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{
            str.clear();
            Text record = new Text();
            int txtLength = 0;
            long bytesConsumed = 0L;
            boolean newline = false;
            int sepPosn = 0;
            do {
                  //已经读到buffer的末尾了,读下一个buffer
                  if (this.bufferPosn >= this.bufferLength) {
                        bufferPosn = 0;
                        bufferLength = in.read(buffer);
                        //读到文件末尾了,则跳出,进行下一���文件的读取
                        if (bufferLength <= 0) {
                              break;
                        }
                  }
                  int startPosn = this.bufferPosn;
                  for (; bufferPosn < bufferLength; bufferPosn ++) {
                        //处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)
                        if(sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]){
                              sepPosn = 0;
                        }
                        //遇到行分隔符的第一个字符
                        if (buffer[bufferPosn] == separator[sepPosn]) {
                              bufferPosn ++;
                              int i = 0;
                              //判断接下来的字符是否也是行分隔符中的字符
                              for(++ sepPosn; sepPosn < sepLength; i ++, sepPosn ++){
                                    //buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半
                                    if(bufferPosn + i >= bufferLength){
                                          bufferPosn += i - 1;
                                          break;
                                    }
                                    //一旦其中有一个字符不相同,就判定为不是分隔符
                                    if(this.buffer[this.bufferPosn + i] != separator[sepPosn]){
                                          sepPosn = 0;
                                          break;
                                    }
                              }
                              //的确遇到了行分隔符
                              if(sepPosn == sepLength){
                                    bufferPosn += i;
                                    newline = true;
                                    sepPosn = 0;
                                    break;
                              }
                        }
                  }
                  int readLength = this.bufferPosn - startPosn;
                  bytesConsumed += readLength;
                  //行分隔符不放入块中
                  //int appendLength = readLength - newlineLength;
                  if (readLength > maxLineLength - txtLength) {
                        readLength = maxLineLength - txtLength;
                  }
                  if (readLength > 0) {
                        record.append(this.buffer, startPosn, readLength);
                        txtLength += readLength;
                        //去掉记录的分隔符
                        if(newline){
                              str.set(record.getBytes(), 0, record.getLength() - sepLength);
                        }
                  }
            } while (!newline && (bytesConsumed < maxBytesToConsume));
            if (bytesConsumed > (long)Integer.MAX_VALUE) {
                  throw new IOException("Too many bytes before newline: " + bytesConsumed);
            }
            return (int) bytesConsumed;
      }
      //以下是需要改写的部分_end
      //以下是hadoop-core中LineReader的源码_start
      public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{
            str.clear();
            int txtLength = 0;
            int newlineLength = 0;
            boolean prevCharCR = false;
            long bytesConsumed = 0L;
            do {
                  int startPosn = this.bufferPosn;
                  if (this.bufferPosn >= this.bufferLength) {
                        startPosn = this.bufferPosn = 0;
                        if (prevCharCR) bytesConsumed ++;
                        this.bufferLength = this.in.read(this.buffer);
                        if (this.bufferLength <= 0) break;
                  }
                  for (; this.bufferPosn < this.bufferLength; this.bufferPosn ++) {
                        if (this.buffer[this.bufferPosn] == LF) {
                              newlineLength = (prevCharCR) ? 2 : 1;
                              this.bufferPosn ++;
                              break;
                        }
                        if (prevCharCR) {
                              newlineLength = 1;
                              break;
                        }
                        prevCharCR = this.buffer[this.bufferPosn] == CR;
                  }
                  int readLength = this.bufferPosn - startPosn;
                  if ((prevCharCR) && (newlineLength == 0))
                  --readLength;
                  bytesConsumed += readLength;
                  int appendLength = readLength - newlineLength;
                  if (appendLength > maxLineLength - txtLength) {
                        appendLength = maxLineLength - txtLength;
                  }
                  if (appendLength > 0) {
                        str.append(this.buffer, startPosn, appendLength);
                  txtLength += appendLength; }
            }
            while ((newlineLength == 0) && (bytesConsumed < maxBytesToConsume));
            if (bytesConsumed > (long)Integer.MAX_VALUE) throw new IOException("Too many bytes before newline: " + bytesConsumed);
            return (int)bytesConsumed;
      }
      //以下是hadoop-core中LineReader的源码_end
}


2、已经按主键key排好序了,并保证相同主键key一定是在一起的,假设每条记录的第一个字段为主键,那么如 果沿用上面的LineReader,需要在核心方法readLine中对前后两条记录的id进行equals判断,如果不同才进行split,如果相同继 续下一条记录的判断。代码就不再贴了,但需要注意的地方,依旧是前后两个buffer进行交接的时候,非常有可能一条记录被切成了两半,一半在前一个buffer中,一半在后一个buffer中。

这种方式的好处在于少去了reduce操作,会大大地提高效率,其实mapper的过程相当的快,费时的通常是reduce。




赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论