大数据技术 MapReduce操作
沉沙
2018-10-11
来源 :
阅读 1885
评论 0
摘要:本篇教程探讨了大数据技术 MapReduce操作,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
本篇教程探讨了大数据技术 MapReduce操作,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
<
本章通过几个案例详细讲解MapReduce程序的编写与运行。
5.1 案例分析:单词计数
假如有这样一个例子,需要统计过去10年计算机论文中出现次数最多的几个单词,以分析当前的热点研究议题是什么。那么,在将论文样本收集完毕之后,接下来应该怎样做呢?
这一经典的单词计数案例可以采用MapReduce处理。MapReduce中已经自带了一个单词计数程序WordCount,如同Java中的经典程序“Hello World”一样,WordCount是MapReduce中统计单词出现次数的Java类,是MapReduce的入门程序。该程序要求计算出文件中单词的出现次数,并将输出结果输出到HDFS文件系统中,且按照单词的字母顺序进行排序,每个单词和其出现次数占一行,单词与出现次之间有间隔。
例如,输入内容如下的文件:
hello world
hello hadoop
bye hadoop
其符合要求的输出结果如下:
bye 1
hadoop 2
hello 2
world 1
下面进一步对上述WordCount程序进行分析。
1.设计思路
WordCount对于单词计数问题的解决方案很直接:先将文件内容切分成单词,然后将所有相同的单词聚集到一起,最后计算各个单词出现的次数,将计算结果排序输出。
根据MapReduce并行设计的原则可知:解决方案中的内容切分步骤和内容不相关,可以并行化处理,每个拿到原始数据的节点只需要将输入数据切分成单词就可以了,因此可由Mapping阶段完成单词切分的任务;另外,不同单词之间的频数也不相关,所以对相同单词频数的计算也可以并行化处理,将相同的单词交由同一节点来计算频数,然后输出最终结果,该任务可由Reduce阶段完成;至于将Mapping阶段的输出结果根据不同单词进行分组,然后再发送给Reduce节点的任务,可由MapReduce中的Shuffle阶段完成。
由于MapReduce中传递的数据都是键值对形式的,而且Shuffle的排序、聚集和分发也是按照键值进行的,因此,可将Map的输出结果设置为以单词作为键,1作为值的形式,表示某单词出现了1次(输入Map的数据则采用Hadoop默认的输入格式,即文件的一行作为值,行号作为键)。由于Reduce的输入是Map的输出聚集后的结果,因此格式为,也就是;Reduce的输出则可由设置成与Map输出相同的形式,只是后面的数值不再是固定的1,而是具体计算出的某单词所对应的频数。
WordCount程序的执行流程如下图:
2.程序源代码
WordCount程序类的源代码如下所示:
import java.io.IOException;
import java.io.PrintStream;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount
{
public static void main(String[] args)
throws Exception
{
//初始化Configuration类
Configuration conf = new Configuration();
//通过实例化对象GenericOptionsParser可以获得程序执行所传入的参数
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount [...] ");
System.exit(2);
}
//构建任务对象
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
//设置输出结果的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; i++) {
//设置需要统计的文件的输入路径
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
//设置统计结果的输出路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
//提交任务给Hadoop集群
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class IntSumReducer extends Reducer
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values, Reducer.Context context)
throws IOException, InterruptedException
{
//统计单词总数
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
this.result.set(sum);
//输出统计结果
context.write(key, this.result);
}
}
public static class TokenizerMapper extends Mapper
本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!
本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢
快给朋友分享吧~
评论(0)