MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,MapReduce程序本质上是并行运行的,因此可以解决海量数据的计算问题.
MapReduce任务过程被分为两个处理阶段:map阶段和reduce阶段.每个阶段都以键值对作为输入和输出.用户只需要实现map()和reduce()两个函数即可实现分布
式计算.
执行步骤:
map任务处理:
1.读取输入文件内容,解析成键值对(key/value).对输入文件的每一行,解析成键值对(key/value).每一个键值对调用一次map函数
2.写自己的逻辑,对输入的键值对(key/value)处理,转换成新的键值对(key/value)输出.
3.对输出的键值对(key/value)进行分区.(partition)
4.对不同分区的数据,按照key进行排序,分组.相同的key/value放到一个集合中.(shuffle)
5.分组后的数据进行规约.(combiner,可选择的)
reduce任务处理:
1.对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点.
2.对多个map任务的输出进行合并,排序.写reduce函数自己的逻辑,对输入的key/value处理,转换成新的key/value输出.
3.把reduce的输出保存到文件中(写入到hdfs中).
MapReduce作业流程:
1.代码编写
2.作业配置(输入输出路径,reduce数量等等)
3.作业提交
3.1通过JobClient提交,与JobTracker通信得到一个jar的存储路径和JobId.
3.2检查输入输出的路径
3.3计算分片信息.
3.4将作于所需要的资源(jar,配置文件,计算所得的输入分片)赋值到以作业命名的HDFS上
3.5告知JobTracker作业准备执行.
4.作业初始化
当JobTracker接收到提交过来的作业后,会把次调用放入一个内部队列中,交由作业调度器进行调度,默认是(FIFO),并对其初始化.
初始化:创建一个表示正在运行作业的对象--分装任务和记录信息,以便跟踪任务的状态和进程.
为了创建任务列表,作业调度器首先从共享文件系统中获取已经计算好的输入分片信息.然后为每一个分片创建一个map任务,调度器创建相应数量的要运行的
reduce任务.此时,任务被指定ID.
5.任务分配
tasktracker运行一个简单的循环来定期发送"心跳"给JobTracker,心跳告知JobTracker,tasktracker是否还存活,同时指明tasktracker是否已经准备好运行新
的任务,如果是,JobTracker会分配给它一个任务.
6.任务执行
tasktracker拿到任务后
6.1会将所有的信息拷贝到本地(包括jar,代码,配置信息,分片信息等)
6.2tasktracker为任务新建一个本地工作目录,并把jar文件中的内容解压到这个目录下.
6.3tasktracker新建一个TaskRunner实例来运行该任务.TaskRunner会启动一个新的JVM来运行每个步骤.(防止其他软件影响到tasktracker,但是在不同的任
务之间重用JVM是有可能的.
7.进度和状态的更新
task会定期向tasktracker汇报执行情况,tasktracker会定期收集所集群上的所有task信息,并想JobTracker汇报.JobTracker会根据所有tasktracker汇报上来
的信息进行汇总
8.作业完成
JobTracker是在接收到最后一个任务完成后,才将任务标记为"成功".并将数据结果写入到HDFS上.
PS:JobTracker职能:负责接收用户提交的作业,负责启动,跟踪任务执行
tasktracker职能:负责执行任务
9.作业失败
9.1JobTracker失败,这是最为严重的一种任务失败,失败机制--它是一个单节点故障,因此,作业注定失败.(hadoop2.0解决了)
9.2tasktracker失败,tasktracker崩溃了会停止向jobt发送心跳信息,并且JobTracker会将tasktracker从等待的任务池中移除,将该任务转移到其他的地方执行.JobTracker会将tasktracker加入到黑名单.
9.3task失败,map或reduce运行失败,会向tasktracker抛出异常,任务挂起.
MapReduce启动流程:
start-mapred.sh --> hadoop-daemon.sh --> hadoop -->org.apache.hadoop.mapred.JobTracker
Jobtracker调用顺序:
main --> startTracker --> new JobTracker 在其构造方法中首先创建 一个调度器,接着创建一个RPC的server(interTrackerServer)
tasktracker会通过PRC接触与其通信, 然后调用offerService方法对外提供服务,在offerService方法中启动RPC server,初始化jobtracker,
调用taskScheduler的start方法 --> eagerTaskInitializationListener,调用start方法,接着调用jobInitManagerThread的start方法,
因为其是一个线程,会调用JobInitManager的run方法,随后jobInitQueue任务队列去取第一个任务,然后把它丢入线程池中,
再调用-->InitJob的run方法,再然后调用jobTracker的initJob方法--> JobInProgress的initTasks
--> maps = new TaskInProgress[numMapTasks]和reduces = new TaskInProgress[numReduceTasks];
TaskTracker调用顺序:
main --> new TaskTracker在其构造方法中调用了initialize方法,在initialize方法中调用RPC.waitForProxy,得到一个jobtracker的
代理对象,接着TaskTracker调用了本身的run方法,--> offerService方法 --> transmitHeartBeat返回值是(HeartbeatResponse)是
jobTracker的指令,在transmitHeartBeat方法中InterTrackerProtocol调用了heartbeat将tasktracker的状态
通过RPC机制发送给jobTracker,返回值就是JobTracker的指令,heartbeatResponse.getActions()得到具体的指令,然后判断指令
的具体类型,开始执行任务,addToTaskQueue启动类型的指令加入到队列当中,TaskLauncher又把任务加入到任务队列当中,
--> TaskLauncher的run方法 --> startNewTask方法 --> localizeJob下载资源 --> launchTaskForJob开始加载任务
--> launchTask --> runner.start()启动线程; --> TaskRunner调用run方法 --> launchJvmAndWait启动java child进程
--转自