本教程中的代码分为 3 个部分:
SalesMapper类的说明
在本节中,我们将了解 SalesMapper 类的实现。
我们首先指定类的包名称。%uA0SalesCountry%uA0就是这个示例中使用的包名。请注意编译的输出,SalesMapper.class%uA0将进入目录并命名这个软件包名称:SalesCountry.
其次,我们导入库软件包。
以下快照显示实现 SalesMapper 类%uA0
代码解释:
1. SalesMapper 类定义
public class SalesMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {...}
每一个 mapper 类必须从 MapReduceBase 类进行扩展,它必须实现 Mapper 接口。
2. 定义 &aposmap&apos 函数
1
2
3
4
|
publicvoidmap(LongWritable key,
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter)%uA0throwsIOException
|
Mapper类的主要部分是接受四个参数的 “map()” 方法。
每次调用%uA0&aposmap()&apos%uA0方法, 一个键值%uA0key-value%uA0对 (&aposkey&apos%uA0和%uA0&aposvalue&apos%uA0在代码里) 被传递。
&aposmap()&apos%uA0方法开始被接受拆分输入文本作为一个参数,并使用分词来拆分这些行成词。
1
2
|
String valueString = value.toString()
String[] SingleCountryData = valueString.split(",")
|
这里,“,” 被用作分隔符。
在这之后,使用记录在数组%uA0%uA0&aposSingleCountryData&apos%uA0中的第七索引,其值为%uA0&apos1&apos.
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0output.collect(new Text(SingleCountryData[7]), one)
我们在选择第7索引记录,因为我们需要的国家数据,它位于数组%uA0&aposSingleCountryData&apos 的第七索引。
请注意,我们输入的数据是下面的格式 (Country%uA0在索引的位置为:7, %uA00 是开始的索引)-
Transaction_date,Product,Price,Payment_Type,Name,City,State,Country,Account_Created,Last_Login,Latitude,Longitude
mapper的输出使用的是%uA0&aposOutputCollector&apos%uA0的%uA0&aposcollect()&apos 方法的键值对.
SalesCountryReducer 类的说明
在本节中,我们将了解 SalesCountryReducer 类的实现。
1. 我们首先为类指定包的名称。SalesCountry 是包的名称。请注意编译的输出,%uA0SalesCountryReducer.class%uA0将进入命名这个软件包名称目录:%uA0SalesCountry.
其次,我们导入库软件包。
以下快照显示实现 SalesCountryReducer 类
代码解释:
1. SalesCountryReducer 类定义 -
public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
此处,前两个数据类型,%uA0&aposText&apos%uA0和%uA0&aposIntWritable&apos%uA0是输入键值的数据类型到reducer。
映射器的输出的形式<CountryName1, 1>, <CountryName2, 1>.映射器的输出被输入到reducer。所以,以配合其数据类型,%uA0Text%uA0和%uA0IntWritable%uA0数据在这里输入被使用。
最后两个数据类型,&aposText&apos 和 &aposIntWritable&apos 是由 reducer 的键 - 值对的形式生成的输出的数据类型。
每个 reducer 类必须从MapReduceBase类进行扩展,它必须实现 Reducer 接口。
2. Defining &aposreduce&apos function-
1
2
3
|
publicvoidreduce( Text t_key,
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0Iterator<IntWritable> values,%uA0%uA0OutputCollector<Text,IntWritable> output,
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0Reporter reporter)%uA0throwsIOException {
|
输入到 reduce() 方法是在具有多个值的列表中选择一个键。
例如,在我们的示例中,这将是 -
<United Arab Emirates, 1>, <United Arab Emirates, 1>, <United Arab Emirates, 1>,<United Arab Emirates, 1>, <United Arab Emirates, 1>, <United Arab Emirates, 1>.
这赋予 reducer 作为%uA0<United Arab Emirates, {1,1,1,1,1,1}>
因此,接受这种形式参数,前两个数据类型的使用,即 Text 和%uA0Iterator<IntWritable>.%uA0Text是一个数据类型的键 和%uA0Iterator<IntWritable>为对于键的值的列表的数据类型。
接下来的参数的类型是%uA0OutputCollector<Text,IntWritable>%uA0它收集 reducer 阶段的输出。
reduce()%uA0方法开始通过复制键值和初始化频率计数为0。
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0Text key = t_key
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0int frequencyForCountry = 0
然后,使用 “while” 循环,我们通过与键关联的值列表循环,并通过总结所有计算的值。
1
2
3
4
5
6
|
while(values.hasNext()) {
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0// replace type of value with the actual type of our value
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0IntWritable value = (IntWritable) values.next()
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0frequencyForCountry += value.get()
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0
%uA0%uA0%uA0%uA0%uA0%uA0%uA0}
|
现在,结果中的键得到的频率计数输出到收集器。
下面的代码执行这个 -
%uA0%uA0%uA0%uA0%uA0%uA0%uA0%uA0output.collect(key, new IntWritable(frequencyForCountry))
SalesCountryDriver类的说明
在本节中,我们将了解 SalesCountryDriver 类实现。
1. 我们首先为类指定包的名称。%uA0SalesCountry%uA0是这里使用的包名。请注意编译的输出,%uA0SalesCountryDriver.class%uA0将进入命名这个包名称的目录:%uA0SalesCountry.
这里一行指定是包名称后面的代码是导入库软件包。
2. 定义一个用于创建一个新的客户端工作,配置 Mapper及Reducer 类对象驱动程序类。
该驱动程序类负责设置我们的 MapReduce 作业在 Hadoop 运行。 在这个类中,我们指定作业名称,输入/输出,mapper 和 reducer 类名称的数据类型。
3. 在下面的代码片段中,我们设置这是用来输入数据集消费和生产输出,分别输入和输出目录。
arg[0]%uA0和%uA0arg[1]%uA0是通过 MapReduce 的实际操作,也就是赋予在命令行参数执行命令,
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
4. 触发我们的作业
下面的代码开始执行 MapReduce 作业
try{ // Run the job JobClient.runJob(job_conf) } catch(Exception e) { e.printStackTrace() }