沉沙
2018-10-08
来源 :
阅读 1867
评论 0
摘要:本篇教程探讨了大数据技术学习笔记(8)Mapreduce的高级特性(A),希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
本篇教程探讨了大数据技术学习笔记(8)Mapreduce的高级特性(A),希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
<
一.序列化
类似于Java的序列化:将对象——>文件
如果一个类实现了Serializable接口,这个类的对象就可以输出为文件
同理,如果一个类实现了的Hadoop的序列化机制(接口:Writable),这个类的对象就可以作为输入和输出的值
例子:使用序列化 求每个部门的工资总额
数据:在map阶段输出k2部门号 v2是Employee对象
reduce阶段:k4部门号 v3.getSal()得到薪水求和——>v4
Employee.java:封装的员工属性
复制代码
package saltotal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
//定义员工的属性: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements Writable{
private int empno;//员工号
private String ename;//员工姓名
private String job;//ְ职位
private int mgr;//经理的员工号
private String hiredate;//入职日期
private int sal;//月薪
private int comm;//奖金
private int deptno;// 部门号
@Override
public String toString() {
return "["+this.empno+"\t"+this.ename+"\t"+this.sal+"\t"+this.deptno+"]";
}
@Override
public void write(DataOutput output) throws IOException {
// 代表序列化过程:输出
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}
@Override
public void readFields(DataInput input) throws IOException {
// 代表反序列化:输入
//注意:序列化和反序列化的顺序要一致
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}
public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
}
复制代码
EmployeeMapper.java
复制代码
package saltotal;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import saltotal.Employee;
//k2 部门号 v2 员工对象
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, Employee>{
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// 数据:MARTIN,SALEsMAN,7698,1981/9/28,1250,1400,30
String data = v1.toString();
//分词
String[] words = data.split(",");
//创建员工的对象
Employee e = new Employee();
//设置员工号
e.setEmpno(Integer.parseInt(words[0]));
//姓名
e.setEname(words[1]);
//职位
e.setJob(words[2]);
//经理号:有些没有
try{
e.setMgr(Integer.parseInt(words[3]));
}catch(Exception ex){
//空值设0
e.setMgr(0);
}
//入职日期
e.setHiredate(words[4]);
//月薪
e.setSal(Integer.parseInt(words[5]));
//奖金:有的没有
try{
e.setComm(Integer.parseInt(words[6]));
}catch(Exception ex){
e.setComm(0);
}
//部门
e.setDeptno(Integer.parseInt(words[7]));
//输出 部门号 员工对象
context.write(new IntWritable(e.getDeptno()), e);
}
}
复制代码
SalaryTotalReducer.java
复制代码
package saltotal;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import saltotal.Employee;
// k3 部门号 v3员工对象 k4部门号 v4 工资总额
public class SalaryTotalReducer extends Reducer<IntWritable, Employee, IntWritable, IntWritable>{
@Override
protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)
throws IOException, InterruptedException {
//对v3求和
int total = 0;
for (Employee e : v3) {
total = total + e.getSal();
}
//输出
context.write(k3, new IntWritable(total));
}
}
复制代码
SalaryTotalMain.java
复制代码
package saltotal;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SalaryTotalMain {
public static void main(String[] args) throws Exception {
//创建一个job = map + reduce
Job job = Job.getInstance(new Configuration());
//ָ指定任务的入口
job.setJarByClass(SalaryTotalMain.class);
//ָ指定任务的Mapper和输出的数据类型k2 v2
job.setMapperClass(SalaryTotalMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Employee.class);
//ָ指定任务的Reducer和输出的数据类型k4 v4
job.setReducerClass(SalaryTotalReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//ָ指定输入输出的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//执行任务
job.waitForCompletion(true);
}
}
复制代码
输出jar文件,传到Linux上temp文件夹下,然后执行任务:
hadoop jar temp/s3.jar /scott/emp.csv /output/day0301/s3
二.排序
1.数字的排序
默认:按照key2进行升序排序
现在HDFS上有一个文件,里面的数据如下:
开发MapReduce程序进行排序:
NumberMapper.java
复制代码
package mr.number;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class NumberMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
//数字:10
String data = value1.toString().trim();
//输出:把数字作为k2
context.write(new LongWritable(Long.parseLong(data)), NullWritable.get());
}
}
复制代码
NumberMain.java
复制代码
package mr.number;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class NumberMain {
public static void main(String[] args) throws Exception {
// 创建一个job = map + reduce
Job job = Job.getInstance(new Configuration());
//ָ指定任务入口
job.setJarByClass(NumberMain.class);
//ָ指定mapper和输出的数据类型:k2 v2
job.setMapperClass(NumberMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(NullWritable.class);
//job.setSortComparatorClass(MyNumberComparator.class);
//ָ指定输入和输出的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//执行任务
job.waitForCompletion(true);
}
}
复制代码
执行任务后看到结果:
如果要改变默认的排序规则,需要创建一个自己的比较器
定义一个降序比较器类 MyNumberComparator.java
复制代码
package mr.number;
import org.apache.hadoop.io.LongWritable;
//自己定义的比较器
public class MyNumberComparator extends LongWritable.Comparator{
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
// 使用降序排序
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
复制代码
将NumberMain.java的这句话放开:
job.setSortComparatorClass(MyNumberComparator.class);
然后重新打包执行任务之后可看到如下结果:
本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!
喜欢 | 0
不喜欢 | 0
您输入的评论内容中包含违禁敏感词
我知道了

请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号