[Hadoop] MapReduce怎样读取关系数据库的数据_Hadoop,ERP及大数据讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  Hadoop,ERP及大数据讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 2907 | 回复: 0   主题: [Hadoop] MapReduce怎样读取关系数据库的数据        上一篇   下一篇 
可爱的龙八
注册用户
等级:中尉
经验:466
发帖:10
精华:2
注册:2015-10-9
状态:离线
发送短消息息给可爱的龙八 加好友    发送短消息息给可爱的龙八 发消息
发表于: IP:您无权察看 2015-12-4 9:32:53 | [全部帖] [楼主帖] 楼主

MapReduce怎样读取关系数据库的数据

介绍一下MapReduce怎样读取关系数据库的数据,选择的关系数据库为MySql,因为它是开源的软件,所以大家用的比较多。以前上学的时候就没有用过开源的软件,直接用盗版,也相当与免费,且比开源好用,例如向oracle,windows7等等。现在工作了,由于公司考虑成本的问题,所以都用成开源的ubuntu,mysql等,本人现在支持开源,特别像hadoop这样的东西,真的太好了,不但可以使用软件,也可以读到源代码,话不说多了。


hadoop技术推出一个曾遭到关系数据库研究者的挑衅和批评,认为MapReduce不具有关系数据库中的结构化数据存储和处理能力。为此,hadoop社区和研究人员做了多的努力,在hadoop0.19版支持MapReduce访问关系数据库,如:mysql,MySQL、PostgreSQL、Oracle 等几个数据库系统。


1. 从Mysql读出数据 


Hadoop访问关系数据库主要通过一下接口实现的:DBInputFormat类,包所在位置:org.apache.hadoop.mapred.lib.db 中。DBInputFormat 在 Hadoop 应用程序中通过数据库供应商提供的 JDBC接口来与数据库进行交互,并且可以使用标准的 SQL 来读取数据库中的记录。学习DBInputFormat首先必须知道二个条件。


在使用 DBInputFormat 之前,必须将要使用的 JDBC 驱动拷贝到分布式系统各个节点的$HADOOP_HOME/lib/目录下。


MapReduce访问关系数据库时,大量频繁的从MapReduce程序中查询和读取数据,这大大的增加了数据库的访问负载,因此,DBInputFormat接口仅仅适合读取小数据量的数据,而不适合处理数据仓库。要处理数据仓库的方法有:利用数据库的Dump工具将大量待分析的数据输出为文本,并上传的Hdfs中进行处理。 

 

DBInputFormat 类中包含以下三个内置类


protected class DBRecordReader implementsRecordReader<LongWritable, T>:用来从一张数据库表中读取一条条元组记录。 


public static class NullDBWritable implements DBWritable,Writable:主要用来实现 DBWritable 接口。DBWritable接口要实现二个函数,第一是

write,第二是readFileds,这二个函数都不难理解,一个是写,一个是读出所有字段。原型如下:



public void write(PreparedStatement statement) throwsSQLException; 
public void readFields(ResultSet resultSet) throws SQLException;

protected static class DBInputSplit implements InputSplit:主要用来描述输入元组集合的范围,包括 start 和 end 两个属性,start 用来表示第一条记录的索引号,end 表示最后一条记录的索引号. 


下面对怎样使用 DBInputFormat 读取数据库记录进行详细的介绍,具体步骤如下:


DBConfiguration.configureDB (JobConf job, StringdriverClass, String dbUrl, String userName, String passwd)函数,配置JDBC 驱动,数据源,以及数据库访问的用户名和密码。MySQL 数据库的 JDBC 的驱动为“com.mysql.jdbc.Driver”,数据源为“jdbc:mysql://localhost/testDB”,其中testDB为访问的数据库。useName一般为“root”,passwd是你数据库的密码。


DBInputFormat.setInput(JobConf job, Class<?extends DBWritable> inputClass, String tableName, String conditions,String orderBy, String... 

fieldNames),这个方法的参数很容易看懂,inputClass实现DBWritable接口。,string tableName表名, conditions表示查询的条件,orderby表示排序的条

件,fieldNames是字段,这相当与把sql语句拆分的结果。当然也可以用sql语句进行重载。etInput(JobConf job, Class<?extends DBWritable> inputClass, 

String inputQuery, StringinputCountQuery)。


编写MapReduce函数,包括Mapper 类、Reducer 类、输入输出文件格式等,然后调用JobClient.runJob(conf)。 


上面讲了理论,下面举个例子:假设 MySQL 数据库中有数据库student,假设数据库中的字段有“id”,“name”,“gender","number"。


第一步:要实现DBwrite和write数据接口。代码如下:


public class StudentRecord implements Writable, DBWritable{ 
 int id; 
 String name; 
 String gender; 
 String number; 

 public void readFields(DataInput in) throws IOException { 
 // TODO Auto-generated method stub 
 this.id = in.readInt(); 
 this.gender = Text.readString(in); 
 this.name = in.readString(); 
 this.number = in.readString(); 

 public void write(DataOutput out) throws IOException { 
 // TODO Auto-generated method stub 
 out.writeInt(this.id); 
 Text.writeString(out,this.name); 
 out.writeInt(this.gender); 
 out.writeInt(this.number); 

 public void readFields(ResultSet result) throws SQLException { 
 // TODO Auto-generated method stub 
 this.id = result.getInt(1); 
 this.name = result.getString(2); 
 this.gender = result.getString(3); 
 this.number = result.getString(4); 

 public void write(PreparedStatement stmt) throws SQLException{ 
 // TODO Auto-generated method stub 
 stmt.setInt(1, this.id); 
 stmt.setString(2, this.name); 
 stmt.setString(3, this.gender); 
 stmt.setString(4, this.number); 

 public String toString() { 
 // TODO Auto-generated method stub 
 return new String(this.name + " " + this.gender + " " +this.number);

第二步:实现Map和Reduce类


public class DBAccessMapper extends MapReduceBase implements 
 Mapper<LongWritable, TeacherRecord, LongWritable, Text> { 

 public void map(LongWritable key, TeacherRecord value, 
 OutputCollector<LongWritable, Text> collector, Reporter reporter) 
 throws IOException { 
 // TODO Auto-generated method stub 
 new collector.collect(new LongWritable(value.id), new Text(value 
 .toString()));

第三步:主函数的实现,函数



public class DBAccessReader { 
 public static void main(String[] args) throws IOException { 
 JobConf conf = new JobConf(DBAccessReader.class); 
 conf.setOutputKeyClass(LongWritable.class); 
 conf.setOutputValueClass(Text.class); 
 conf.setInputFormat(DBInputFormat.class); 
 FileOutputFormat.setOutputPath(conf, new Path("dboutput")); 
 DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver", 
 "jdbc:mysql://localhost/school","root","123456"); 
 String [] fields = {"id", "name", "gender", "number"}; 
 DBInputFormat.setInput(conf, StudentRecord.class,"Student",null "id", fields); 
 conf.setMapperClass(DBAccessMapper.class); 
 conf.setReducerClass(IdentityReducer.class); 
 JobClient.runJob(conf);

2.写数据 


往往对于数据处理的结果的数据量一般不会太大,可能适合hadoop直接写入数据库中。hadoop提供了相应的数据库直接输出的计算发结果。


DBOutFormat: 提供数据库写入接口。 

DBRecordWriter:提供向数据库中写入的数据记录的接口。 

DBConfiguration:提供数据库配置和创建链接的接口。 

DBOutFormat提供一个静态方法setOutput(job,String table,String ...filedNames);


该方法的参数很容易看懂。假设要插入一个Student的数据,其代码为


public static void main(String[] args) throws IOException 
{   
Configuration conf = new Configuration();   
JobConf conf = new JobConf();   
conf.setOutputFormat(DBOutputFormat.class);   
DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver",   
"jdbc:mysql://localhost/school","root","123456");   
DBOutputFormat.setOutput(conf,"Student", 
456, 
"liqizhou", "man", "20004154578");   
JobClient.runJob(conf);




该贴被蜀山战纪编辑于2015-12-4 9:47:20



赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论