沉沙
2018-10-08
来源 :
阅读 1466
评论 0
摘要:本篇教程探讨了大数据技术学习笔记(9)Mapreduce的高级特性(B),希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
本篇教程探讨了大数据技术学习笔记(9)Mapreduce的高级特性(B),希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
<
二.排序
对象排序
员工数据 Employee.java ----> 作为key2输出
需求:按照部门和薪水升序排列
Employee.java
复制代码
package mr.object;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
//员工数据: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
/**
 * An employee record used as a MapReduce map-output key (k2).
 *
 * <p>All fields are serialized by {@link #write(DataOutput)} and restored by
 * {@link #readFields(DataInput)} in the same fixed order. Keys sort by department
 * number, then salary (both ascending), then employee number as a tie-breaker.
 */
public class Employee implements WritableComparable<Employee> {
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

    /**
     * Orders employees by department number, then salary, both ascending.
     *
     * <p>The employee number is a final tie-breaker. Two distinct employees with the
     * same department and salary therefore still compare as different, assuming
     * empno is unique (TODO confirm against the data). The method also satisfies the
     * {@link Comparable} contract: it returns 0 only when all three sort fields are
     * equal, and {@code a.compareTo(b)} always has the opposite sign of
     * {@code b.compareTo(a)}.
     */
    @Override
    public int compareTo(Employee o) {
        int byDept = Integer.compare(this.deptno, o.getDeptno());
        if (byDept != 0) {
            return byDept;
        }
        int bySal = Integer.compare(this.sal, o.getSal());
        if (bySal != 0) {
            return bySal;
        }
        return Integer.compare(this.empno, o.getEmpno());
    }

    /** Equality consistent with {@link #compareTo}: same deptno, sal and empno. */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof Employee)) {
            return false;
        }
        Employee that = (Employee) other;
        return this.deptno == that.deptno
                && this.sal == that.sal
                && this.empno == that.empno;
    }

    /** Hash over the same fields used by {@link #equals} and {@link #compareTo}. */
    @Override
    public int hashCode() {
        int h = deptno;
        h = 31 * h + sal;
        h = 31 * h + empno;
        return h;
    }

    @Override
    public String toString() {
        return "[" + this.empno + "\t" + this.ename + "\t" + this.sal + "\t" + this.deptno + "]";
    }

    /**
     * Serializes all fields in a fixed order that must match {@link #readFields}.
     * {@code ename}, {@code job} and {@code hiredate} must be non-null because
     * {@code writeUTF} rejects null.
     */
    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    /** Restores all fields in the same order that {@link #write} emitted them. */
    @Override
    public void readFields(DataInput input) throws IOException {
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public int getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}
复制代码
EmployeeSortMapper.java
复制代码
package mr.object;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Key2
public class EmployeeSortMapper extends Mapper<LongWritable, Text, Employee, NullWritable> {

    /**
     * Parses one CSV employee line and emits the resulting {@link Employee} as the
     * map output key (k2) with a {@link NullWritable} value.
     *
     * <p>Expected layout (comma separated):
     * {@code empno,ename,job,mgr,hiredate,sal,comm,deptno}, for example
     * {@code 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30}.
     * An empty or non-numeric {@code mgr} field (employee without a manager) or
     * {@code comm} field (no commission) becomes 0. Blank lines are skipped.
     * Numeric fields are trimmed so that CRLF line endings or padding do not fail the task.
     */
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        String data = value1.toString();
        if (data.trim().isEmpty()) {
            // A blank line (e.g. a trailing newline) carries no record.
            return;
        }
        String[] words = data.split(",");

        Employee e = new Employee();
        e.setEmpno(Integer.parseInt(words[0].trim()));
        e.setEname(words[1]);
        e.setJob(words[2]);
        e.setMgr(parseIntOrZero(words[3]));
        e.setHiredate(words[4]);
        e.setSal(Integer.parseInt(words[5].trim()));
        e.setComm(parseIntOrZero(words[6]));
        e.setDeptno(Integer.parseInt(words[7].trim()));

        context.write(e, NullWritable.get());
    }

    /** Parses an optional integer field; returns 0 if it is empty or not a number. */
    private static int parseIntOrZero(String field) {
        try {
            return Integer.parseInt(field.trim());
        } catch (NumberFormatException ex) {
            return 0;
        }
    }
}
复制代码
EmployeeSortMain.java
复制代码
package mr.object;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class EmployeeSortMain {

    /**
     * Configures and runs the employee sort job. Records are sorted by the
     * {@code Employee} map output key.
     *
     * <p>Exits with status 0 on success, 1 if the job fails, and 2 on a usage error.
     *
     * @param args {@code args[0]} = input path, {@code args[1]} = output path
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: EmployeeSortMain <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        // Locate the job jar via this class.
        job.setJarByClass(EmployeeSortMain.class);

        job.setMapperClass(EmployeeSortMapper.class);
        job.setMapOutputKeyClass(Employee.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(EmployeeSortReducer.class);
        // NOTE(review): confirm these match EmployeeSortReducer's declared output types
        // (the reducer is not visible here).
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate the job result so scripts/schedulers can detect failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
复制代码
结果:
三.分区:Partition
根据Map的输出(k2 v2)进行分区
默认情况下,MapReduce只有一个分区(只有一个输出文件)
作用:提高查询的效率
建立分区:根据条件的不同
需求:按照员工的部门号进行分区,相同部门号的员工输出到一个分区中
EmpPartionMapper.java
复制代码
package demo.partion;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
//k2部门号 v2 员工对象
public class EmpPartionMapper extends Mapper<LongWritable, Text, LongWritable, Employee> {

    /**
     * Parses one CSV employee line and emits (department number, employee).
     *
     * <p>Expected layout (comma separated):
     * {@code empno,ename,job,mgr,hiredate,sal,comm,deptno}, for example
     * {@code 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30}.
     * An empty or non-numeric {@code mgr} or {@code comm} field becomes 0.
     * Blank lines are skipped. Numeric fields are trimmed so that CRLF line
     * endings or padding do not fail the task.
     */
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        String data = value1.toString();
        if (data.trim().isEmpty()) {
            // A blank line (e.g. a trailing newline) carries no record.
            return;
        }
        String[] words = data.split(",");

        Employee e = new Employee();
        e.setEmpno(Integer.parseInt(words[0].trim()));
        e.setEname(words[1]);
        e.setJob(words[2]);
        e.setMgr(parseIntOrZero(words[3]));
        e.setHiredate(words[4]);
        e.setSal(Integer.parseInt(words[5].trim()));
        e.setComm(parseIntOrZero(words[6]));
        e.setDeptno(Integer.parseInt(words[7].trim()));

        // k2 = department number, v2 = the employee record
        context.write(new LongWritable(e.getDeptno()), e);
    }

    /** Parses an optional integer field; returns 0 if it is empty or not a number. */
    private static int parseIntOrZero(String field) {
        try {
            return Integer.parseInt(field.trim());
        } catch (NumberFormatException ex) {
            return 0;
        }
    }
}
复制代码
EmpPartionReducer.java
复制代码
package demo.partion;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
//把相同部门的员工输出到HDFS K4: 部门号 v4: 员工对象
public class EmpPartionReducer extends Reducer<LongWritable, Employee, LongWritable, Employee> {

    /**
     * Passes every employee of one department through unchanged, keyed by the
     * department number.
     *
     * @param k3 department number (the map output key)
     * @param v3 all employees that share that department number
     * @param context sink for the (department number, employee) output pairs
     */
    @Override
    protected void reduce(LongWritable k3, Iterable<Employee> v3, Context context)
            throws IOException, InterruptedException {
        java.util.Iterator<Employee> employees = v3.iterator();
        while (employees.hasNext()) {
            Employee current = employees.next();
            context.write(k3, current);
        }
    }
}
复制代码
MyEmployeePartitioner.java
复制代码
package demo.partion;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;
//分区规则:根据Map的输出建立分区 k2 v2
public class MyEmployeePartitioner extends Partitioner<LongWritable, Employee> {

    /**
     * Routes a map output record to a reduce partition by department number.
     *
     * <p>Department 10 goes to partition 1 and department 20 to partition 2. Every
     * other department goes to partition 0. Each target index is reduced modulo
     * {@code numParts}, so a positive {@code numParts} always yields a valid index.
     * This holds even when the job runs with fewer than three reduce tasks.
     *
     * @param k2 map output key (the department number; not consulted here)
     * @param v2 the employee whose {@code deptno} selects the partition
     * @param numParts number of partitions (reduce tasks)
     * @return the partition index in {@code [0, numParts)}
     */
    @Override
    public int getPartition(LongWritable k2, Employee v2, int numParts) {
        final int target;
        switch (v2.getDeptno()) {
            case 10:
                target = 1;
                break;
            case 20:
                target = 2;
                break;
            default:
                // All remaining departments share partition 0.
                target = 0;
                break;
        }
        return target % numParts;
    }
}
复制代码
EmpPartitionMain.java
复制代码
package demo.partion;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class EmpPartitionMain {

    /**
     * Runs the partition job. The mapper keys each employee by department number,
     * {@code MyEmployeePartitioner} routes records to one of three reduce tasks,
     * and the reducer writes each partition's records out.
     *
     * <p>Exits with status 0 on success, 1 if the job fails, and 2 on a usage error.
     *
     * @param args {@code args[0]} = input path, {@code args[1]} = output path
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: EmpPartitionMain <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(EmpPartitionMain.class);

        job.setMapperClass(EmpPartionMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Employee.class);

        // Custom partitioning rule and the matching number of partitions (reduce tasks).
        job.setPartitionerClass(MyEmployeePartitioner.class);
        job.setNumReduceTasks(3);

        job.setReducerClass(EmpPartionReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate the job result so scripts/schedulers can detect failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
复制代码
结果:建立了三个分区
一号分区:
二号分区:
0号分区:
四.合并:Combiner
1、MapReduce的任务中,可以没有Combiner
2、Combiner是一种特殊的Reducer,是在Mapper端先做一次Reducer,用来减少Map的输出,从而提高效率。
3、注意事项:
(1)有些情况不能使用Combiner,例如求平均值:各个Mapper端局部平均值的平均值,一般不等于全部数据的平均值。
(2)无论是否引入Combiner,都一定不能改变原来的逻辑。(MapReduce编程案例:实现倒排索引)
WordCountMapper.java
复制代码
package demo.combiner;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /** Splits on runs of whitespace so repeated spaces or tabs never yield empty words. */
    private static final java.util.regex.Pattern WHITESPACE = java.util.regex.Pattern.compile("\\s+");

    /** Count emitted for every word occurrence; never mutated. */
    private static final LongWritable ONE = new LongWritable(1);

    /** Reused output key; context.write serializes it before the next set(). */
    private final Text word = new Text();

    /**
     * Emits (word, 1) for every whitespace-separated word in the input line,
     * e.g. {@code "I love beijing"} yields three pairs.
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        String data = v1.toString();
        for (String w : WHITESPACE.split(data)) {
            // Leading whitespace or an empty line produces an empty first token.
            if (w.isEmpty()) {
                continue;
            }
            word.set(w);
            context.write(word, ONE);
        }
    }
}
复制代码
WordCountReducer.java
复制代码
package demo.combiner;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * Adds up all counts received for one word and emits (word, total).
     *
     * <p>Input and output types are identical and addition is associative and
     * commutative. The class can therefore also be registered as a combiner, as
     * WordCountMain does.
     *
     * @param k3 the word
     * @param v3 partial counts for that word
     * @param context sink for the (word, total) pair
     */
    @Override
    protected void reduce(Text k3, Iterable<LongWritable> v3,
            Context context) throws IOException, InterruptedException {
        long sum = 0L;
        java.util.Iterator<LongWritable> counts = v3.iterator();
        while (counts.hasNext()) {
            sum += counts.next().get();
        }
        LongWritable total = new LongWritable(sum);
        context.write(k3, total);
    }
}
复制代码
WordCountMain.java:增加Combiner
复制代码
package demo.combiner;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountMain {

    /**
     * Runs word count with {@code WordCountReducer} used both as the combiner
     * (map-side pre-aggregation) and as the reducer.
     *
     * <p>Exits with status 0 on success, 1 if the job fails, and 2 on a usage error.
     *
     * @param args {@code args[0]} = input path, {@code args[1]} = output path
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: WordCountMain <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountMain.class);

        // Mapper: k2 = word, v2 = 1
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Combiner: summing partial counts on the map side does not change the final totals.
        job.setCombinerClass(WordCountReducer.class);

        // Reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job and propagate its result as the process exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!
喜欢 | 0
不喜欢 | 0
您输入的评论内容中包含违禁敏感词
我知道了

请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号