在MapReduce的计数器是用于收集关于 MapReduce 工作的统计信息的机制。这个信息在MapReduce的作业处理的问题的诊断是很有用的。 计数器类似于将在 map 或 reduce 在代码日志信息中。
通常情况下,这些计数器在一个程序(map 或 reduce)中定义,当一个特定事件或条件(特定于该计数器)发生执行期间递增。计数器是一个很好的应用来从输入数据集跟踪有效和无效的记录。
有两种类型的计数器:
1.%uA0Hadoop%uA0内置计数器:%uA0有一些内置计数器存在每个作业中。下面是内置计数器组:
- MapReduce任务计数器%uA0- 收集任务的具体信息(例如,输入记录的数量)在它的执行期间。
- 文件系统计数器%uA0- 收集信息像由一个任务读取或写入的字节数
- FileInputFormat计数器%uA0- 收集通过FileInputFormat读取的字节数的信息
- FileOutputFormat计数器%uA0- 收集的字节数量的信息通过 FileOutputFormat 写入
- Job%uA0计数器-%uA0这些计数器使用 JobTracker。它们收集统计数据包括如,任务发起了作业的数量。
2. 用户定义的计数器
除了内置的计数器,用户可以定义自己的计数器,通过使用编程语言提供了类似的功能。 例如,在 Java 的枚举用于定义用户定义的计数器。
一个MapClass例子使用计数器计算缺失和无效值的数量:
%uA0
|
publicstaticclassMapClass
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0extendsMapReduceBase
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0implementsMapper<LongWritable, Text, Text, Text>
{
%uA0%uA0%uA0%uA0staticenumSalesCounters { MISSING, INVALID }
%uA0%uA0%uA0%uA0publicvoidmap ( LongWritable key, Text value,
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0OutputCollector<Text, Text> output,
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0Reporter reporter)%uA0throwsIOException
%uA0%uA0%uA0%uA0{
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0//Input string is split using &apos,&apos and stored in &aposfields&apos array
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0String fields[] = value.toString().split(",", -20)
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0//Value at 4th index is country. It is stored in &aposcountry&apos variable
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0String country = fields[4]
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0//Value at 8th index is sales data. It is stored in &apossales&apos variable
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0String sales = fields[8]
%uA0%uA0%uA0%uA0%uA0%uA0
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0if(country.length() ==%uA00) {
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0reporter.incrCounter(SalesCounters.MISSING,%uA01)
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0}%uA0elseif(sales.startsWith("\"")) {
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0reporter.incrCounter(SalesCounters.INVALID,%uA01)
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0}%uA0else{
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0output.collect(newText(country),%uA0newText(sales +%uA0",1"))
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0}
%uA0%uA0%uA0%uA0}
}
|
上面的代码片段显示在 Map Reduce 实现计数器的示例。
在这里,SalesCounters是用“枚举”定义的计数器。它被用来计算 MISSING 和 INVALID 的输入记录。
在代码段中,如果 “country” 字段的长度为零那么它的值丢失,因此相应的计数器 SalesCounters.MISSING 递增。
接下来,如果 “sales” 字段开头是符号 &apos&apos ,则记录被视为无效。这通过递增计数器 SalesCounters.INVALID 来表示。
MapReduce 连接
连接两个大的数据集可以使用 MapReduce Join 来实现。然而,这个过程需要编写大量的代码来执行实际的连接操作。
连接两个数据集开始是通过比较每个数据集的大小。如果因为相比其他数据集一个数据集小,那么小数据集被分布到集群中的每个数据节点。一旦分散,无论是 Mapper 或 Reducer 使用更小的数据集进行查找匹配的大型数据集的记录,然后结合这些记录,形成输出记录。
这取决于在实际连接进行的地方,这个连接分为:
1. 映射端连接 -%uA0当该联接是由映射器执行的,它称为映射端链接。在这种类型中,联结前的数据由映射函数实际来消耗的处理。它是强制性的,输入到每个映射是在分区中的形式,并且是按排序顺序。另外,必须有一个相等数目的分区,它必须由连接键进行排序。
2. Reduce端连接-%uA0当连接是通过减速器进行的,称为reduce端连接。没有必要在此连接有数据集中在以结构化形式(或分区)。
在这里,映射端的处理发出连接这两个表的关键字和对应的元组。作为该处理的效果,所有的元组相同连接键都落在相同的 reducer,然后使用相同的连接键连接记录。
整体处理流程示于下图。