[Hadoop] MapReduce操作HBase_Hadoop,ERP及大数据讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  Hadoop,ERP及大数据讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 3006 | 回复: 0   主题: [Hadoop] MapReduce操作HBase        上一篇   下一篇 
云云飞飞
注册用户
等级:列兵
经验:108
发帖:2
精华:0
注册:2015-8-27
状态:离线
发送短消息息给云云飞飞 加好友    发送短消息息给云云飞飞 发消息
发表于: IP:您无权察看 2015-12-3 17:32:32 | [全部帖] [楼主帖] 楼主

运行HBase时常会遇到个错误,我就有这样的经历。 

ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times

检查日志:org.apache.hadoop.ipc.RPC$VersionMismatch: Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version mismatch. (client = 42, server = 41)


如果是这个错误,说明RPC协议不一致所造成的,解决方法:将hbase/lib目录下的hadoop-core的jar文件删除,将hadoop目录下的hadoop-0.20.2-core.jar拷贝

到hbase/lib下面,然后重新启动hbase即可。第二种错误是:没有启动hadoop,先启用hadoop,再启用hbase。


在Eclipse开发中,需要加入hadoop所有的jar包以及HBase二个jar包(hbase,zooKooper)。


建表,通过HBaseAdmin类中的create静态方法来创建表。

HTable类是操作表,例如,静态方法put可以插入数据,该类初始化时可以传递一个行键,静态方法getScanner()可以获得某一列上的所有数据,返回Result

类,Result类中有个静态方法getFamilyMap()可以获得以列名为key,值为value,这刚好与hadoop中map结果是一样的。


package test;
import java.io.IOException;import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;

public class Htable {    /**
 * @param args     */
    public static void main(String[] args) throws IOException {        
    // TODO Auto-generated method stub
        Configuration hbaseConf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(hbaseConf);
        HTableDescriptor htableDescriptor = new HTableDescriptor("table"
                .getBytes());  //set the name of table
        htableDescriptor.addFamily(new HColumnDescriptor("fam1")); //set the name of column clusters
        admin.createTable(htableDescriptor); //create a table 
        HTable table = new HTable(hbaseConf, "table"); //get instance of table.
        for (int i = 0; i < 3; i++) {   //for is number of rows
            Put putRow = new Put(("row" + i).getBytes()); //the ith row
            putRow.add("fam1".getBytes(), "col1".getBytes(), "vaule1"
                    .getBytes());  //set the name of column and value.
            putRow.add("fam1".getBytes(), "col2".getBytes(), "vaule2"
                    .getBytes());
            putRow.add("fam1".getBytes(), "col3".getBytes(), "vaule3"
                    .getBytes());
            table.put(putRow);
        }        for(Result result: table.getScanner("fam1".getBytes())){//get data of column clusters 
            for(Map.Entry<byte[], byte[]> entry : result.getFamilyMap("fam1".getBytes()).entrySet()){//get collection of result
                String column = new String(entry.getKey());
                String value = new String(entry.getValue());
                System.out.println(column+","+value);
            }
        }
        admin.disableTable("table".getBytes()); //disable the table
        admin.deleteTable("table".getBytes());  //drop the tbale    }
}


以上代码不难看懂。下面介绍一下,用mapreduce怎样操作HBase,主要对HBase中的数据进行读取。

现在有一些大的文件,需要存入HBase中,其思想是先把文件传到HDFS上,利用map阶段读取<key,value>对,可在reduce把这些键值对上传到HBase中。


package test;
import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperClass extends Mapper<LongWritable,Text,Text,Text>{        
public void map(LongWritable key,Text value,Context context)thorws IOException{
            String[] items = value.toString().split(" ");
            String k = items[0];
            String v = items[1];         
            context.write(new Text(k), new Text(v));
    }

}


Reduce类,主要是将键值传到HBase表中


package test;
import java.io.IOException;import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
public class ReducerClass extends TableReducer<Text,Text,ImmutableBytesWritable>{   
 public void reduce(Text key,Iterable<Text> values,Context context){
        String k = key.toString();
        StringBuffer str=null;        
        for(Text value: values){
            str.append(value.toString());
        }
        String v = new String(str); 
        Put putrow = new Put(k.getBytes());
        putrow.add("fam1".getBytes(), "name".getBytes(), v.getBytes());     
    }
}


由上面可知ReducerClass继承TableReduce,在hadoop里面ReducerClass继承Reducer类。它的原型为:TableReducer<KeyIn,Values,KeyOut>可以看出,HBase里

面是读出的Key类型是ImmutableBytesWritable。

Map,Reduce,以及Job的配置分离,比较清晰,mahout也是采用这种构架。


package test;
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
public class Driver extends Configured implements Tool{

    @Override    public static void run(String[] arg0) throws Exception {        // TODO Auto-generated method stub
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum.", "localhost");  
        
        Job job = new Job(conf,"Hbase");
        job.setJarByClass(TxtHbase.class);
        
        Path in = new Path(arg0[0]);
        
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, in);
        
        job.setMapperClass(MapperClass.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job);
        
       job.waitForCompletion(true);
    }
    
}


Driver中job配置的时候没有设置 job.setReduceClass(); 而是用 TableMapReduceUtil.initTableReducerJob("tab1", THReducer.class, job); 来执行

reduce类。

主函数


package test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class TxtHbase {    
public static void main(String [] args) throws Exception{

        Driver.run(new Configuration(),new THDriver(),args); 

    } 
}


 

读取数据时比较简单,编写Mapper函数,读取<key,value>值就行了。


package test;
import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapperClass extends MapReduceBase implements
        TableMap<Text, Text> {    
        static final String NAME = "GetDataFromHbaseTest";    
        private Configuration conf;    
        public void map(ImmutableBytesWritable row, Result values,
            OutputCollector<Text, Text> output, Reporter reporter)            
            throws IOException {
        StringBuilder sb = new StringBuilder();        
        for (Entry<byte[], byte[]> value : values.getFamilyMap(                
        "fam1".getBytes()).entrySet()) {
            String cell = value.getValue().toString();            
            if (cell != null) {
                sb.append(new String(value.getKey())).append(new String(cell));
            }
        }
        output.collect(new Text(row.get()), new Text(sb.toString()));
    }


要实现这个方法 initTableMapJob(String table, String columns, Class<? extends TableMap> mapper, Class<? extends org.apache.hadoop.io.WritableComparable> outputKeyClass, Class<? extends org.apache.hadoop.io.Writable> outputValueClass, org.apache.hadoop.mapred.JobConf job, boolean addDependencyJars)。


package test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;

public class Driver extends Configured implements Tool{

    @Override    
public static void run(String[] arg0) throws Exception {        
// TODO Auto-generated method stub
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum.", "localhost");  
        Job job = new Job(conf,"Hbase");
        job.setJarByClass(TxtHbase.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        TableMapReduceUtilinitTableMapperJob("table", args0[0],MapperClass.class, job); 
        job.waitForCompletion(true); }
 }


主函数


package test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
public class TxtHbase {    
public static void main(String [] args) throws Exception{

        Driver.run(new Configuration(),new THDriver(),args); 

    } 
}





                                                                                                                         --转自

该贴被蜀山战纪编辑于2015-12-4 9:47:54



赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论