沉沙
2018-10-10
来源 :
阅读 1940
评论 0
摘要:本篇教程探讨了大数据技术全面解读 MapReduce详解(五)mapJoin、GroupingComparator与更多MR实例,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
本篇教程探讨了大数据技术全面解读 MapReduce详解(五)mapJoin、GroupingComparator与更多MR实例,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
<
一、数据倾斜分析——mapJoin
1.背景
接上一个day的Join算法,我们的解决join的方式是:在reduce端通过pid进行串接,这样的话:
--order
1001,20150710,P0001,2
1002,20150710,P0001,3
1002,20150710,P0002,3
--product
P0001,小米5,1000,2
P0002,锤子T1,1000,3
例如订单中的小米5卖的比较好(截止博客时间,已经是米7将出的时候了。),这样的话大部分的数据都流向了P0001的这个reduce上,而P0002
的锤子的reduce确很轻松,这样,就产生了数据倾斜了!
更多的数据倾斜的介绍,参考://..net/u010039929/article/details/55044407
我们这里用的是比较简单的map端join!也就是不需要通过reduce来串接了。具体来说就是在map端就直接拼接好,不需要reduce来拼接;那我们就需要在map的阶段进行join连接,也就是map端就需要能够连接,那就是产品全表(字典表)需要在map端就有这个字典表,放在内存而不放在输入文件。这里
mapreduce给我们提供了一个很棒的解决方案:DistributedCache,了解这个,可以参考://..net/lzm1340458776/article/details/42971075
相关的分布式缓存的用法,参考://..net/qq1010885678/article/details/50751007
当然,首先应当查看的,应该是官方文档的介绍:点击查看
2.代码
package com.mr.mapjoin;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
/**
* mapper
*
* @author zcc ON 2018/2/5
**/
public class MapJoinMapper extends Mapper
Map
Text k = new Text();
/**
* 启动之前进行一些必要的初始化工作
* @param context 上下文
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt")));
String path = context.getCacheFiles()[0].getPath();
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
String line;
while (StringUtils.isNotEmpty(line = br.readLine())) {
String[] fields = line.split(",");
// 将字典加载进入map
infoMap.put(fields[0], fields[1]);
}
// 关闭流
br.close();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String orderLine = value.toString();
// 切分订单信息
String[] fields = orderLine.split("\t");
String pName = infoMap.get(fields[1]);
k.set(orderLine + "\t" + pName);
// 写出去
context.write(k, NullWritable.get());
}
}
package com.mr.mapjoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;
/**
* driver
*
* @author zcc ON 2018/2/5
**/
public class MapJoinDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 初始化Job
Job job = Job.getInstance(conf);
// job相关配置
job.setJarByClass(MapJoinDriver.class);
job.setMapperClass(MapJoinMapper.class);
// 这里直接省略reduce阶段(map端已经完成)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("F:/input"));
FileOutputFormat.setOutputPath(job, new Path("F:/output"));
// 指定缓存
/* job.addArchiveToClassPath(archive); */// 缓存jar包到task运行节点的classpath中
/* job.addFileToClassPath(file); */// 缓存普通文件到task运行节点的classpath中
/* job.addCacheArchive(uri); */// 缓存压缩包文件到task运行节点的工作目录
/* job.addCacheFile(uri) */// 缓存普通文件到task运行节点的工作目录
job.addCacheFile(new URI("file:/F:/pd.txt"));
// 指定不需要reduce
job.setNumReduceTasks(0);
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
更多,待补充
二、倒排索引建立
1.需求
需求:有大量的文本(文档、网页),需要建立搜索索引
// 对比与wordcount的不同
2.思路
把单词和文件作为key
3.代码
package com.mr.inverseindex;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
* mapper
*
* @author zcc ON 2018/2/5
**/
public class InverseIndexMapper extends Mapper
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(" ");
// 得到文件名
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String fileName = fileSplit.getPath().getName();
for (String field : fields) {
k.set(field + "--" + fileName);
context.write(k, v);
}
}
}
package com.mr.inverseindex;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* reducer
*
* @author zcc ON 2018/2/5
**/
public class InverseIndexReducer extends Reducer
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
v.set(count);
context.write(key, v);
}
}
package com.mr.inverseindex;
import com.mr.wordcount.WordCountDriver;
import com.mr.wordcount.WordCountMapper;
import com.mr.wordcount.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* driver
*
* @author zcc ON 2018/2/5
**/
public class InverseIndexDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 设置本程序jar包本地位置
job.setJarByClass(InverseIndexDriver.class);
// 指定本业务job要使用的mapper/reducer业务类
job.setMapperClass(InverseIndexMapper.class);
job.setReducerClass(InverseIndexReducer.class);
// 指定map输出的数据类型(由于可插拔的序列化机制导致)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 指定最终输出(reduce)的的数据类型(可选,因为有时候不需要reduce)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("F:/input"));
FileOutputFormat.setOutputPath(job, new Path("F:/output"));
// 提交(将job中的相关参数以及java类所在的jar包提交给yarn运行)
// job.submit();
// 反馈集群信息
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
输出结果:
hello--a.txt 3
hello--b.txt 2
hello--c.txt 2
jerry--a.txt 1
jerry--b.txt 3
jerry--c.txt 1
tom--a.txt 2
tom--b.txt 1
tom--c.txt 1
可以看到,很多时候是会出现一次无法解决的情况,需要配合多次mapreduce配合!
再次在结果上执行mapreduce:
package cn.itcast.bigdata.mr.inverindex;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class IndexStepTwo {
public static class IndexStepTwoMapper extends Mapper
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] files = line.split("--");
context.write(new Text(files[0]), new Text(files[1]));
}
}
public static class IndexStepTwoReducer extends Reducer
@Override
protected void reduce(Text key, Iterable
StringBuffer sb = new StringBuffer();
for (Text text : values) {
sb.append(text.toString().replace("\t", "-->") + "\t");
}
context.write(key, new Text(sb.toString()));
}
}
public static void main(String[] args) throws Exception {
if (args.length < 1 || args == null) {
args = new String[]{"D:/temp/out/part-r-00000", "D:/temp/out2"};
}
Configuration config = new Configuration();
Job job = Job.getInstance(config);
job.setMapperClass(IndexStepTwoMapper.class);
job.setReducerClass(IndexStepTwoReducer.class);
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 1:0);
}
}
所以,总结诸多示例来说,使用Mapreduce的最大的关键是确定什么作为key,因为key相同的会归并到reduce进行处理,接下来的示例也都是这个思路!
三、寻找共同好友
1.需求
以下是qq的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的)
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
2.分析
前面已经提到,遇到mapreduce问题,有时候一步不好解决,可以逐步逼近,多次求解!
#1.1求出哪些人都有好友c,也就是求出c是哪些人的共同好友
c --> a b d f g
#2.1得到有关c的共同好友的关系
a-b c
a-d c
a-g c
#1.2同理,求出d是哪些人的共同好友
d --> a e g h
#2.2同理,得到有关d的共同好友的关系
a-e d
a-g d
#3对第二步结果再进行mapreduce,则得到例如a-g c d这样的共同好友列表了!
3.阶段一代码:
package com.mr.fans;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* mapper
*
* @author zcc ON 2018/2/6
**/
public class SharedFriendsMapper extends Mapper
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(":");
// 切割用户和好友 A:B,C,D
String person = fields[0];
String[] friends = fields[1].split(",");
for (String friend : friends) {
k.set(friend);
v.set(person);
// 输出K-V对,<好友,用户>
context.write(k, v);
}
}
}
package com.mr.fans;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* reducer
*
* @author zcc ON 2018/2/6
**/
public class SharedFriendsReducer extends Reducer
Text v = new Text();
@Override
protected void reduce(Text key, Iterable
StringBuffer sb = new StringBuffer();
for (Text value : values) {
sb.append(value).append("-");
}
// 去除最后一个字符(,)
sb.deleteCharAt(sb.length() - 1);
v.set(sb.toString());
context.write(key, v);
}
}
package com.mr.fans;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* driver
*
* @author zcc ON 2018/2/6
**/
public class SharedFriendsDriver {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 设置本程序jar包本地位置
job.setJarByClass(SharedFriendsDriver.class);
// 指定本业务job要使用的mapper/reducer业务类
job.setMapperClass(SharedFriendsMapper.class);
job.setReducerClass(SharedFriendsReducer.class);
// 指定map输出的数据类型(由于可插拔的序列化机制导致)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 指定最终输出(reduce)的的数据类型(可选,因为有时候不需要reduce)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 指定job的原始输入/输出目录(可以改为由外面输入,而不必写死)
FileInputFormat.setInputPaths(job, new Path("F:/input"));
FileOutputFormat.setOutputPath(job, new Path("F:/output"));
// 提交(将job中的相关参数以及java类所在的jar包提交给yarn运行)
// job.submit();
// 反馈集群信息
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 :1);
}
}
阶段一结果:
A I-K-C-B-G-F-H-O-D
B A-F-J-E
C A-E-B-H-F-G-K
D G-C-K-A-L-F-E-H
E G-M-L-H-A-F-B-D
F L-M-D-C-G-A
G M
H O
I O-C
J O
K B
L D-E
M E-F
O A-H-I-J-F
这里提一下,经过查证,输出的k,v之间的默认分隔符是“\t”,我们也可以自己定义:
conf.set("mapred.textoutputformat.separator", ";");
更多自定义分隔符,参考://www.xuebuyuan.com/2132293.html
4.阶段二代码
package com.mr.fans;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Arrays;
/**
* mapper step2
*
* @author zcc ON 2018/2/6
**/
public class SharedFriendsMapper2 extends Mapper
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// A B-C-D-E-F(默认分隔符\t)
String line = value.toString();
String[] fields = line.split("\t");
// 切割用户和好友
String friend = fields[0];
String[] persons = fields[1].split("-");
// 使用工具排序
Arrays.sort(persons);
for (int i = 0; i < persons.length - 2; i++) {
for (int j = i + 1; j < persons.length -1; j++) {
k.set(persons[i] + "-" + persons[j]);
v.set(friend);
// 将B-C A进行写出!,这样相同的X-Y的对会归集到同一个reduce
context.write(k, v);
}
}
}
}
package com.mr.fans;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* reducer step2
*
* @author zcc ON 2018/2/6
**/
public class SharedFriendsReducer2 extends Reducer
Text v = new Text();
@Override
protected void reduce(Text key, Iterable
StringBuffer sb = new StringBuffer();
for (Text value : values) {
sb.append(value).append(",");
}
// 去除最后一个字符(,)
sb.deleteCharAt(sb.length() - 1);
v.set(sb.toString());
context.write(key, v);
}
}
package com.mr.fans;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* step2
*
* @author zcc ON 2018/2/6
**/
public class SharedFriendsDriver2 {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 设置本程序jar包本地位置
job.setJarByClass(SharedFriendsDriver2.class);
// 指定本业务job要使用的mapper/reducer业务类
job.setMapperClass(SharedFriendsMapper2.class);
job.setReducerClass(SharedFriendsReducer2.class);
// 指定map输出的数据类型(由于可插拔的序列化机制导致)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 指定最终输出(reduce)的的数据类型(可选,因为有时候不需要reduce)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 指定job的原始输入/输出目录(可以改为由外面输入,而不必写死)
FileInputFormat.setInputPaths(job, new Path("F:/output"));
FileOutputFormat.setOutputPath(job, new Path("F:/output2"));
// 提交(将job中的相关参数以及java类所在的jar包提交给yarn运行)
// job.submit();
// 反馈集群信息
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 :1);
}
}
部分结果示例:
A-B C,E
A-C F,D
A-D E,F
A-E B,C,D
A-F C,D,B,E,O
A-G D,E,F,C
A-H E,O,C,D
A-I O
A-K D
A-L F,E
四、web日志预处理
1.需求:
对web访问日志中的各字段识别切分
去除日志中不合法的记录
根据KPI统计需求,生成各类访问请求过滤数据
部分日志示例:
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
总的来说,就是根据业务逻辑进行日志的清洗
2.代码
这里暂不深入了。给出核心代码:
public class WeBean {
private String remote_addr;// 记录客户端的ip地址
private String remote_user;// 记录客户端用户名称,忽略属性"-"
private String time_local;// 记录访问时间与时区
private String request;// 记录请求的url与http协议
private String status;// 记录请求状态;成功是200
private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
private String http_referer;// 用来记录从那个页面链接访问过来的
private String http_user_agent;// 记录客户浏览器的相关信息
private boolean valid = true;// 判断数据是否合法
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getRemote_user() {
return remote_user;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public String getTime_local() {
return time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("\001").append(this.remote_addr);
sb.append("\001").append(this.remote_user);
sb.append("\001").append(this.time_local);
sb.append("\001").append(this.request);
sb.append("\001").append(this.status);
sb.append("\001").append(this.body_bytes_sent);
sb.append("\001").append(this.http_referer);
sb.append("\001").append(this.http_user_agent);
return sb.toString();
}
}
public class WeParser {
public static WeBean parser(String line) {
WeBean weBean = new WeBean();
String[] arr = line.split(" ");
if (arr.length > 11) {
weBean.setRemote_addr(arr[0]);
weBean.setRemote_user(arr[1]);
weBean.setTime_local(arr[3].substring(1));
weBean.setRequest(arr[6]);
weBean.setStatus(arr[8]);
weBean.setBody_bytes_sent(arr[9]);
weBean.setHttp_referer(arr[10]);
if (arr.length > 12) {
weBean.setHttp_user_agent(arr[11] + " " + arr[12]);
} else {
weBean.setHttp_user_agent(arr[11]);
}
if (Integer.parseInt(weBean.getStatus()) >= 400) {// 大于400,HTTP错误
weBean.setValid(false);
}
} else {
weBean.setValid(false);
}
return weBean;
}
public static String parserTime(String time) {
time.replace("/", "-");
return time;
}
}
public class WePreProcess {
static class WePreProcessMapper extends Mapper
Text k = new Text();
NullWritable v = NullWritable.get();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
WeBean weBean = WeParser.parser(line);
if (!weBean.isValid())
return;
k.set(weBean.toString());
context.write(k, v);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WePreProcess.class);
job.setMapperClass(WePreProcessMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
字符串、日期时间等的处理需要加强!
五、自定义GroupingComparator
1.需求
有如下订单:
现在需要求出每一个订单中成交金额最大的一笔交易
2.分析
1、利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce
2、在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值
注意,这不是一种常规的做法,只是我们刚好利用了它的一种机制!
默认的分组WritableComparator是通过文本的内容是否相同来决定是否是同一个Key,从而在reduce进行分组,我们只需要修改这个,即可!
3.代码
核心代码:
比较器:这里我们选择继承它的一个通用实现:WritableComparator
public class OrderComparator extends WritableComparator{
protected OrderComparator() {
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 强转为子类
OrderBean bean1 = (OrderBean) a;
OrderBean bean2 = (OrderBean) b;
// 将ID是否相同视为一组
return bean1.getItemId().compareTo(bean2.getItemId());
}
}
/**
* 自定义分组比较器
* @author 廖*民
* time : 2015年1月18日下午9:15:26
* @version
*/
class MyGroupComparator implements RawComparator
// 分组策略中,这个方法不是重点
public int compare(CombineKey o1, CombineKey o2) {
// TODO Auto-generated method stub
return 0;
}
/**
* b1 表示第一个参与比较的字节数组
* s1 表示第一个字节数组中开始比较的位置
* l1 表示第一个字节数组中参与比较的字节长度
* b2 表示第二个参与比较的字节数组
* s2 表示第二个字节数组中开始比较的位置
* l2 表示第二个字节数组参与比较的字节长度
*/
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
// 这里是按第CombineKey中的第一个元素进行分组,因为是long类型,所以是8个字节
return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
}
OrderBean的比较逻辑:
@Override
public int compareTo(OrderBean o) {
// 注意比较的逻辑,id相等,则比较amount,否则比较id即可,这样相同id就连在一起了
int cmp = this.getItemId().compareTo(o.getItemId());
if (0 == cmp) {
return -Double.compare(this.getAmount(), o.getAmount());
}
return cmp;
}
job中进行设置分组器:
// 关键步骤,设置分组器
job.setGroupingComparatorClass(OrderComparator.class);
完整代码:
package com.mr.group;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* mapper
*
* @author zcc ON 2018/2/6
**/
public class GroupMapper extends Mapper
OrderBean bean = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
String itemId = fields[0];
double amount = Double.parseDouble(fields[2]);
bean.set(itemId, amount);
context.write(bean, NullWritable.get());
}
}
package com.mr.group;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* reducer
*
* @author zcc ON 2018/2/6
**/
public class GroupReducer extends Reducer
@Override
protected void reduce(OrderBean key, Iterable
// 设置了自定义分组策略后,到这里就是ID相同则为一组了,取出第一个key则为结果
context.write(key, NullWritable.get());
}
}
package com.mr.group;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* compator
*
* @author zcc ON 2018/2/6
**/
public class OrderComparator extends WritableComparator{
protected OrderComparator() {
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 强转为子类
OrderBean bean1 = (OrderBean) a;
OrderBean bean2 = (OrderBean) b;
// 将ID是否相同视为一组
return bean1.getItemId().compareTo(bean2.getItemId());
}
}
package com.mr.group;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* bean
*
* @author zcc ON 2018/2/6
**/
public class OrderBean implements WritableComparable
private String itemId;
private Double amount;
public OrderBean() {
}
public OrderBean(String itemId, Double amount) {
this.itemId = itemId;
this.amount = amount;
}
public void set(String itemId, Double amount) {
this.itemId = itemId;
this.amount = amount;
}
public String getItemId() {
return itemId;
}
public void setItemId(String itemId) {
this.itemId = itemId;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
@Override
public int compareTo(OrderBean o) {
// 注意比较的逻辑,id相等,则比较amount,否则比较id即可,这样相同id就连在一起了
int cmp = this.getItemId().compareTo(o.getItemId());
if (0 == cmp) {
return -Double.compare(this.getAmount(), o.getAmount());
}
return cmp;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(itemId);
out.writeDouble(amount);
}
@Override
public void readFields(DataInput in) throws IOException {
this.itemId = in.readUTF();
this.amount = in.readDouble();
}
@Override
public String toString() {
return itemId + "\t" + amount;
}
}
package com.mr.group;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* driver
*
* @author zcc ON 2018/2/6
**/
public class GroupDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 设置本程序jar包本地位置
job.setJarByClass(GroupDriver.class);
// 指定本业务job要使用的mapper/reducer业务类
job.setMapperClass(GroupMapper.class);
job.setReducerClass(GroupReducer.class);
// 指定map输出的数据类型(由于可插拔的序列化机制导致)
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 指定最终输出(reduce)的的数据类型(可选,因为有时候不需要reduce)
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
// 关键步骤,设置分组器
job.setGroupingComparatorClass(OrderComparator.class);
FileInputFormat.setInputPaths(job, new Path("F:/input"));
FileOutputFormat.setOutputPath(job, new Path("F:/output"));
// 提交(将job中的相关参数以及java类所在的jar包提交给yarn运行)
// job.submit();
// 反馈集群信息
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!
喜欢 | 0
不喜欢 | 0
您输入的评论内容中包含违禁敏感词
我知道了

请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号