/ 中存储网

Hadoop mr数据流

2013-06-18 00:00:00 来源:中存储网

/*
符注:
()内为数据;[]内为处理;
{}内为框架模块;
()数据若无说明则为在内存;
->本机数据流;=>网络数据流;~>分布式-本地读写数据流;
/**/为标注;
*/
(分布式源文件)~>{JobTracker分配到各TaskTracker本机上}=>
-------------------------------- @TaskTracker(Map) machine
(split)->[InputFormat]->
[RecordReader]迭代begin
	(k1,v1)->[map]->
	(k2,v2)->[partition]->
[RecordReader]迭代end->
(k2,v2内存集合1)->[sort,merge]->
(k2,v2内存排序集合)->
[combine]迭代begin/*若有partition则按其规则*/
	(k2,iter(v2))->[combine]->
	(k3,v3)->
[combine]迭代end->
(k3,iter(v3)本地文件)->
-------------------------------- @TaskTracker(Map) machine
[shuffle]=>
-------------------------------- @TaskTracker(Reduce) machine
(k3,iter(v3)来自各mapper的子集)->[sort,merge]->
(k3,iter(v3)来自各mapper的合集)->[reduce]->
(k4,v4)->[OutputFormat]~>
-------------------------------- @TaskTracker(Reduce) machine
(分布式结果文件)


======================================
总结mapreduce数据处理流程:
所谓分布式计算在hadoop的实现里可表达为:
1.基于hdfs分布式存储的各存储节点的map运算过程;
2.之后的在少量(甚至唯一)节点上的reduce运算过程;
3.以及连接map运算输出和reduce输入的shuffle过程;
下图表达在某datanode机器节点进行map和shuffle的过程:


InputFomat:从block到split到(k1,v1)
hdfs中文件是按照配置大小(默认64M)分block存储n份(一般为)到n个独立datanode节点的;
当要解析某分布式文件,要执行map任务的TaskTracker将读取存储于本机的相关block进行本地文件解析;
个人理解是一个block可以被拆分为多个split,每个split作为一个map tasktracker的输入(但如何切分还没搞清楚),我所涉及的所有mr测试中都是一个block对应一个map任务.
这个split可以通过FSDataInputStream输入流读取,这是一个本地文件读取操作.
此过程中可以编写自己的InputFormat进行自定义的读取,此类功能的核心是返回一个RecordReader,RR是具体解析文件逻辑实现类:
对于hadoop0.20及以前版本,对应hadoop core jar里的 org.apache.hadoop.mapred包,RR我理解是一种被动模式,从其接口函数命名看,主要实现如下函数:
createKey()
createValue()
getPos()
next(Text key, Text value)

这个RecordReader接口具体实现类的调用者,应是先调用createKey和createValue来实例化key,value对象(这里是为初始化自定义对象考虑的),然后再调用next(key,value)来填充这两个对象而得到解析的结果.
对于hadoop1.0及之后的版本,对应hadoop core jar里的 org.apache.hadoop.mapreduce包,RR是一种主动模式,主要实现如下函数:
getCurrentKey()
getCurrentValue()
nextKeyValue()
getPos()
isSplitable(JobContext context, Path file)

此接口调用者,应是直接调用nextKeyValue()来获取key value实例,然后通过两个getCurrent方法由调用者获取其对应实例;
无论是next(Text key, Text value),还是nextKeyValue(),split被RR解析为一条的(k1,v1)
MapClass.map():从(k1,v1)到(k2,v2)
不多说.

Partition.partition():给(k2,v2)盖章(指定reduce)
经过此函数数据流中的k,v是不变的,函数相当于给此k,v对盖个章,指定其要去的reduce.
值得注意的是,经过RR出来的(k1,v1)是先顺序经过map和partition,而非全过完map之后再全过partition.

所有(k2,v2)写入membuffer.
经过InputFormat中RR的next/nextKeyValue迭代,形成的系列(k2,v2)会被写入到内存的buffer中,此buffer大小通过io.sort.mb参数指定.
另为了控制资源,会有另一个进程来监控此buffer容量,当实际容量达到/超过此buffer某百分比时,将发生spill操作,此百分比通过io.sort.spill.percent参数设定.
此监控进程与写入独立,所以不会影响写入速度.

spill:mem buffer中的(k2,v2)落地为local disk的文件
当buffer实际容量超过门槛限制,或者split所有数据经RR next/nextKeyValue迭代完成时,均触发spill操作,所以每个map任务流中至少会有一次spill.
spill作用是将buffer里的kv对sort到本地磁盘文件.
sort:针对spill kv数据的排序
在spill时,sort会参考partition时给数据所盖的章,即reduce号,另在reduce号相同的情况下再按照k2排序.这个排序至少有两个目的:
1.使得在有可能的combine中进行同key的迭代变的很容易;
2.使得shuffle后的reduce大流程中基于同key合并变得相对容易;

combine:从(k2,iter(v2))到(k3,v3)
combine类建立的初衷是尽量减少spill发生的磁盘写操作的量(拿网上通用的说法就是本地的reduce,也确实是实现了Reduce的接口/父类)
combine基于业务规则处理key及同key的所有value.这(k2,iter(v2))经处理后,仅返回一个(k3,v3),减少了spill磁盘写入量.
combine很有意思,特别是当它跟partition联合来使用时,在此我在理解上了也颇费了翻功夫:
partition给(k2,v2)盖章,决定了这个(k2,v2)具体要到哪个reduce去.而其后的combine处理,我完全随便写逻辑来处理同key的(k2,iter(v2)),大家是否也有跟我一样的疑问:假设我在partition里是根据value的某种规则来决定reduce号,但到了combine中,某个(k2,v2)却要跟其他很多(k2,v2')进行处理,而其他(k2,v2')并不一定会在partition根据规则归到同一个reduce号上,那combine却笼统的返回了一个(k3,v3),那么这个(k3,v3)到底是会给哪个reduce呢?
我理解hadoop在这里的机制是,在spill时,首先按照partition的reduce号来将分到同reduce的(k2,v2)放到一起,在此前提下,再进行同key的sort,最终写入到一个与reduce号相关的spill file中(位于local disk).我还没有去看具体代码,暂时把这个理解写这里.
另对于这个问题,网上有文章(http://langyu.iteye.com/blog/992916)提及:"Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。"
这位作者文章很精彩,我很赞同以上的说法,但,实际上我认为运用spill的这种特性,可以完成二级(或者多级)统计的功能(见后面一个测试的例子).

merge and sort:将多个map的spill文件按找partition分配的reduce号归并
首先是按照reduce号进行分组,分组内将按照key(即k3)排序.

shuffle:洗牌,不如称之为摸牌
当一个TaskTracker执行完了map任务,他将完成状态汇报给JobTracker.这时(我认为)JobTracker会让分配reduce的TaskTracker来获取这个map生成的文件,每个TaskTracker获取它所执行reduce对应的那份.

merge and sort:reduce端的
当TaskTracker从各map机上取得属于自己的文件后,要执行merge和sort过程,即按照k3进行排序,这样有利于通过一次遍历就可以达到输入(k3,iter(v3))的效果.

reduce:从(k3,iter(v3))到(k4,v4)并写入至分布式文件系统
每个reduce会生成一个分布式文件,放置于任务目录下.





附:一个测试的例子:
...经过map的处理到partition,
partition的输入(k2,v2)其中k2的含义是书籍出版的年份m个(比如,1999/2010/2012/...),v2的含义是书的分类n个(比如,技术/文学/艺术/...)
partition的处理是根据value来映射到reduce,即每中书籍分类让一个reduce处理.
combine的输入(k2,iter(v2)),输出(k3,v3)其中k3=k2;v3=count(iter(v2));
reduce的输入是(k3,iter(v3)),输出(k4,v4),其中k4=k3;v4=sum(iter(v2));
整个mr执行完之后,按书籍分类输出n个文件,每个文件里根据年份输出该年份(该分类)的书籍总数;
达到做二级分类统计的目的