MapReduce总结 _Android, Python及开发编程讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  Android, Python及开发编程讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 2059 | 回复: 0   主题: MapReduce总结         下一篇 
神月
注册用户
等级:列兵
经验:98
发帖:3
精华:0
注册:2013-3-13
状态:离线
发送短消息息给神月 加好友    发送短消息息给神月 发消息
发表于: IP:您无权察看 2015-11-5 17:39:03 | [全部帖] [楼主帖] 楼主

MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,MapReduce程序本质上是并行运行的,因此可以解决海量数据的计算问题.

MapReduce任务过程被分为两个处理阶段:map阶段和reduce阶段.每个阶段都以键值对作为输入和输出.用户只需要实现map()和reduce()两个函数即可实现分布

式计算.

执行步骤:

map任务处理:

1.读取输入文件内容,解析成键值对(key/value).对输入文件的每一行,解析成键值对(key/value).每一个键值对调用一次map函数

2.写自己的逻辑,对输入的键值对(key/value)处理,转换成新的键值对(key/value)输出.

3.对输出的键值对(key/value)进行分区.(partition)

4.对不同分区的数据,按照key进行排序,分组.相同的key/value放到一个集合中.(shuffle)

5.分组后的数据进行规约.(combiner,可选择的)

reduce任务处理: 

1.对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点.

2.对多个map任务的输出进行合并,排序.写reduce函数自己的逻辑,对输入的key/value处理,转换成新的key/value输出.

3.把reduce的输出保存到文件中(写入到hdfs中).

MapReduce作业流程:

1.代码编写

2.作业配置(输入输出路径,reduce数量等等)

3.作业提交

  3.1通过JobClient提交,与JobTracker通信得到一个jar的存储路径和JobId.

  3.2检查输入输出的路径

  3.3计算分片信息.

  3.4将作于所需要的资源(jar,配置文件,计算所得的输入分片)赋值到以作业命名的HDFS上

  3.5告知JobTracker作业准备执行.

4.作业初始化

当JobTracker接收到提交过来的作业后,会把次调用放入一个内部队列中,交由作业调度器进行调度,默认是(FIFO),并对其初始化.

初始化:创建一个表示正在运行作业的对象--分装任务和记录信息,以便跟踪任务的状态和进程.

为了创建任务列表,作业调度器首先从共享文件系统中获取已经计算好的输入分片信息.然后为每一个分片创建一个map任务,调度器创建相应数量的要运行的

reduce任务.此时,任务被指定ID.

5.任务分配

tasktracker运行一个简单的循环来定期发送"心跳"给JobTracker,心跳告知JobTracker,tasktracker是否还存活,同时指明tasktracker是否已经准备好运行新

的任务,如果是,JobTracker会分配给它一个任务.

6.任务执行

  tasktracker拿到任务后

  6.1会将所有的信息拷贝到本地(包括jar,代码,配置信息,分片信息等)

  6.2tasktracker为任务新建一个本地工作目录,并把jar文件中的内容解压到这个目录下.

  6.3tasktracker新建一个TaskRunner实例来运行该任务.TaskRunner会启动一个新的JVM来运行每个步骤.(防止其他软件影响到tasktracker,但是在不同的任 

务之间重用JVM是有可能的.

7.进度和状态的更新

task会定期向tasktracker汇报执行情况,tasktracker会定期收集所集群上的所有task信息,并想JobTracker汇报.JobTracker会根据所有tasktracker汇报上来

的信息进行汇总

8.作业完成

JobTracker是在接收到最后一个任务完成后,才将任务标记为"成功".并将数据结果写入到HDFS上.

PS:JobTracker职能:负责接收用户提交的作业,负责启动,跟踪任务执行

   tasktracker职能:负责执行任务

9.作业失败

  9.1JobTracker失败,这是最为严重的一种任务失败,失败机制--它是一个单节点故障,因此,作业注定失败.(hadoop2.0解决了)

  9.2tasktracker失败,tasktracker崩溃了会停止向jobt发送心跳信息,并且JobTracker会将tasktracker从等待的任务池中移除,将该任务转移到其他的地方执行.JobTracker会将tasktracker加入到黑名单.

  9.3task失败,map或reduce运行失败,会向tasktracker抛出异常,任务挂起.

 

MapReduce启动流程:

             start-mapred.sh  --> hadoop-daemon.sh --> hadoop -->org.apache.hadoop.mapred.JobTracker

Jobtracker调用顺序:

             main --> startTracker  --> new JobTracker 在其构造方法中首先创建 一个调度器,接着创建一个RPC的server(interTrackerServer)

             tasktracker会通过PRC接触与其通信, 然后调用offerService方法对外提供服务,在offerService方法中启动RPC server,初始化jobtracker,

             调用taskScheduler的start方法 --> eagerTaskInitializationListener,调用start方法,接着调用jobInitManagerThread的start方法,

             因为其是一个线程,会调用JobInitManager的run方法,随后jobInitQueue任务队列去取第一个任务,然后把它丢入线程池中,

             再调用-->InitJob的run方法,再然后调用jobTracker的initJob方法--> JobInProgress的initTasks 

             --> maps = new TaskInProgress[numMapTasks]和reduces = new TaskInProgress[numReduceTasks];

TaskTracker调用顺序:

             main --> new TaskTracker在其构造方法中调用了initialize方法,在initialize方法中调用RPC.waitForProxy,得到一个jobtracker的

            代理对象,接着TaskTracker调用了本身的run方法,--> offerService方法  --> transmitHeartBeat返回值是(HeartbeatResponse)是   

            jobTracker的指令,在transmitHeartBeat方法中InterTrackerProtocol调用了heartbeat将tasktracker的状态

            通过RPC机制发送给jobTracker,返回值就是JobTracker的指令,heartbeatResponse.getActions()得到具体的指令,然后判断指令

            的具体类型,开始执行任务,addToTaskQueue启动类型的指令加入到队列当中,TaskLauncher又把任务加入到任务队列当中,

            -->  TaskLauncher的run方法 --> startNewTask方法  --> localizeJob下载资源 --> launchTaskForJob开始加载任务

            --> launchTask  --> runner.start()启动线程; --> TaskRunner调用run方法 --> launchJvmAndWait启动java child进程

 

--转自



赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论