Abstract: This tutorial is part six of a comprehensive walkthrough of MapReduce for big data, covering the remaining supplementary topics (custom input/output formats, counters, multi-job chaining, compression, and tuning parameters). We hope reading it deepens your understanding of big data technology.
I. Custom InputFormat / OutputFormat
1. Requirements
We have some raw logs that need enhanced (enriched) parsing. The workflow:
1) Read data from the raw log files.
2) For each record, use a URL field in the log to look up information in an external knowledge base and append it to the raw record.
3) If the enrichment succeeds, write the record to the enhanced-results directory; if it fails, extract the URL field from the raw record and write it to the to-crawl directory.
Sample raw log lines:
1374609560.11 1374609560.16 1374609560.16 1374609560.16 110 5 8615038208365 460023383869133 8696420056841778 2 460 0 14615 54941 10.188.77.252 61.145.116.27 35020 80 6 cmnet 1 221.177.218.34 221.177.217.161 221.177.218.34 221.177.217.167 ad.veegao.com //ad.veegao.com/veegao/iris.action Apache-HttpClient/UNAVAILABLE (java 1.4) POST 200 593 310 4 3 0 0 4 3 0 0 0 0 //ad.veegao.com/veegao/iris.action 5903903079251243019 5903903103500771339 5980728
1374609558.91 1374609558.97 1374609558.97 1374609559.31 112 461 8615038208365 460023383869133 8696420056841778 2 460 0 14615 54941 10.188.77.252 101.226.76.175 37293 80 6 cmnet 1 221.177.218.34 221.177.217.161 221.177.218.34 221.177.217.167 short.weixin.qq.com //short.weixin.qq.com/cgi-bin/micromsg-bin/getdns Android QQMail HTTP Client POST 200 543 563 2 3 0 0 2 3 0 0 0 0 //short.weixin.qq.com/cgi-bin/micromsg-bin/getdns 5903903079251243019 5903903097240039435 5980728
1374609514.70 1374609514.75 1374609514.75 1374609515.58 110 5 8613674976196 460004901700207 8623350100353878 2 460 0 14694 58793 10.184.80.32 111.13.13.222 36181 80 6 cmnet 1 221.177.156.4 221.177.217.145 221.177.156.4 221.177.217.156 retype.wenku.bdimg.com //retype.wenku.bdimg.com/img/97308d2b7375a417866f8f09 AMB_400 GET 200 345 4183 5 5 0 0 5 5 0 0 0 0 //retype.wenku.bdimg.com/img/97308d2b7375a417866f8f09 5903900710696611851 5903902908140003339 5937307
1374609511.98 1374609512.02 1374609512.02 1374609512.48 110 362 8613674976196 460004901700207 8623350100353878 2 460 0 14694 58793 10.184.80.32 120.204.207.160 33548 80 6 cmnet 1 221.177.156.4 221.177.217.145 221.177.156.4 221.177.217.156 t4.qpic.cn //t4.qpic.cn/mpic/217cf24d43f1f19255e2/120 AMB_400 GET 200 346 3184 4 4 0 0 4 4 0 0 0 0 //t4.qpic.cn/mpic/217cf24d43f1f19255e2/120 5903900710696611851 5903902896317288459 5937307
1374609518.14 1374609518.24 1374609518.24 1374609518.72 110 362 8613674976196 460004901700207 8623350100353878 2 460 0 14694 58793 10.184.80.32 120.204.207.160 33548 80 6 cmnet 1 221.177.156.4 221.177.217.145 221.177.156.4 221.177.217.156 t4.qpic.cn //t4.qpic.cn/mpic/96e02ad781c9be6f5ad2/120 AMB_400 GET 200 346 3328 4 4 0 0 4 4 0 0 0 0 //t4.qpic.cn/mpic/96e02ad781c9be6f5ad2/120 5903900710696611851 5903902896317288459 5937307
2. Analysis
The key point is that a single MapReduce program must write two categories of results to different directories depending on the data. This kind of flexible output requirement can be implemented with a custom OutputFormat.
What differs from the earlier examples is that we need to pull information from a database; the sample code below uses plain JDBC. To keep things simpler, you could use Apache Commons DbUtils instead and initialize the lookup map in the mapper's setup() method, as in the sketch below.
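A minimal sketch of that DbUtils approach (not part of the original code): the table and column names (url_rule, url, content) match the DBLoader class shown later, while the class name, JDBC URL, and credentials are placeholders.
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.List;
import java.util.Map;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapListHandler;
public class RuleLoader {
    // Loads url -> content_tag pairs into ruleMap; call this from Mapper#setup().
    public static void load(Map<String, String> ruleMap) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        // Connection details are placeholders; adjust to your environment.
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/urldb", "root", "root");
        try {
            QueryRunner runner = new QueryRunner();
            List<Map<String, Object>> rows = runner.query(
                    conn, "select url, content from url_rule", new MapListHandler());
            for (Map<String, Object> row : rows) {
                ruleMap.put((String) row.get("url"), (String) row.get("content"));
            }
        } finally {
            DbUtils.closeQuietly(conn); // closes the connection, swallowing any SQLException
        }
    }
}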
3. Code
To keep this short I did not actually set up a database and test against it; the key point is the custom OutputFormat.
By default MapReduce uses TextOutputFormat, so before writing our own it is worth reading through that default implementation:
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.*;
/** An {@link OutputFormat} that writes plain text files. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
protected static class LineRecordWriter<K, V>
extends RecordWriter<K, V> {
private static final String utf8 = "UTF-8";
private static final byte[] newline;
static {
try {
newline = "\n".getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
protected DataOutputStream out;
private final byte[] keyValueSeparator;
public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
try {
this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
public LineRecordWriter(DataOutputStream out) {
this(out, "\t");
}
/**
* Write the object to the byte stream, handling Text as a special
* case.
* @param o the object to print
* @throws IOException if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
}
public synchronized void write(K key, V value)
throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
}
public synchronized
void close(TaskAttemptContext context) throws IOException {
out.close();
}
}
public RecordWriter<K, V>
getRecordWriter(TaskAttemptContext job
) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get(SEPERATOR, "\t");
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(new DataOutputStream
(codec.createOutputStream(fileOut)),
keyValueSeparator);
}
}
}
Some references and examples: https://www.cns.com/liuming1992/p/4758504.html
//..net/woshisap/article/details/42320129
//chengjianxiaoxue.iteye.com//2163284 --> recommended
4. Custom InputFormat
// Imports needed by the two classes below (Hadoop new API)
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeFileInputFormat extends
FileInputFormat<NullWritable, BytesWritable> {
// Make each small file non-splittable so that every file produces exactly one key-value pair
@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
}
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public NullWritable getCurrentKey() throws IOException,
InterruptedException {
return NullWritable.get();
}
@Override
public BytesWritable getCurrentValue() throws IOException,
InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException {
return processed ? 1.0f : 0.0f;
}
@Override
public void close() throws IOException {
// do nothing
}
}
References: //..net/woshixuye/article/details/53557487
//irwenqiang.iteye.com//1448164
//m635674608.iteye.com//2243076
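A minimal driver sketch (the driver class and paths are assumed, not from the original post) showing how WholeFileInputFormat is typically wired up, packing many small files into a single SequenceFile with a map-only job:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class SmallFileMergeDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "small-file-merge");
        job.setJarByClass(SmallFileMergeDriver.class);
        // Use the custom InputFormat so each small file yields exactly one (NullWritable, BytesWritable) record
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(BytesWritable.class);
        FileInputFormat.setInputPaths(job, new Path("D:/srcdata/smallfiles/")); // assumed input dir
        FileOutputFormat.setOutputPath(job, new Path("D:/temp/merged/"));       // assumed output dir
        job.setNumReduceTasks(0); // map-only: the default identity mapper forwards each file's bytes
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}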
Full code:
package cn.itcast.bigdata.mr.logenhance;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
public class DBLoader {
public static void dbLoader(Map<String, String> ruleMap) throws Exception {
Connection conn = null;
Statement st = null;
ResultSet res = null;
try {
Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/urldb", "root", "root");
st = conn.createStatement();
res = st.executeQuery("select url,content from url_rule");
while (res.next()) {
ruleMap.put(res.getString(1), res.getString(2));
}
} finally {
try{
if(res!=null){
res.close();
}
if(st!=null){
st.close();
}
if(conn!=null){
conn.close();
}
}catch(Exception e){
e.printStackTrace();
}
}
}
}
package cn.itcast.bigdata.mr.logenhance;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogEnhance {
static class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Map<String, String> ruleMap = new HashMap<String, String>();
Text k = new Text();
NullWritable v = NullWritable.get();
// Load the rule data from the database into ruleMap
@Override
protected void setup(Context context) throws IOException, InterruptedException {
try {
DBLoader.dbLoader(ruleMap);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Get a counter for recording malformed log lines (arguments: group name, counter name)
Counter counter = context.getCounter("malformed", "malformedline");
String line = value.toString();
String[] fields = StringUtils.split(line, "\t");
try {
String url = fields[26];
String content_tag = ruleMap.get(url);
// If the content tag is null, output only the URL to the to-crawl list; otherwise output the full line plus the tag to the enhanced log
if (content_tag == null) {
k.set(url + "\t" + "tocrawl" + "\n");
context.write(k, v);
} else {
k.set(line + "\t" + content_tag + "\n");
context.write(k, v);
}
} catch (Exception exception) {
counter.increment(1);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(LogEnhance.class);
job.setMapperClass(LogEnhanceMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// To route different content to different target paths, use a custom OutputFormat
job.setOutputFormatClass(LogEnhanceOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path("D:/srcdata/weinput/"));
// Although we use a custom OutputFormat, it extends FileOutputFormat,
// and FileOutputFormat must write a _SUCCESS file, so an output path still has to be set here
FileOutputFormat.setOutputPath(job, new Path("D:/temp/output/"));
// No reducer needed
job.setNumReduceTasks(0);
job.waitForCompletion(true);
System.exit(0);
}
}
package cn.itcast.bigdata.mr.logenhance;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * When a map task or reduce task writes its final output, it first calls the
 * OutputFormat's getRecordWriter() method to obtain a RecordWriter, and then
 * calls RecordWriter.write(k, v) to write the data out.
 */
public class LogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
Path enhancePath = new Path("D:/temp/en/log.dat");
Path tocrawlPath = new Path("D:/temp/crw/url.dat");
FSDataOutputStream enhancedOs = fs.create(enhancePath);
FSDataOutputStream tocrawlOs = fs.create(tocrawlPath);
return new EnhanceRecordWriter(enhancedOs, tocrawlOs);
}
/**
 * A custom RecordWriter that routes records to two different output files.
 */
static class EnhanceRecordWriter extends RecordWriter<Text, NullWritable> {
FSDataOutputStream enhancedOs = null;
FSDataOutputStream tocrawlOs = null;
public EnhanceRecordWriter(FSDataOutputStream enhancedOs, FSDataOutputStream tocrawlOs) {
super();
this.enhancedOs = enhancedOs;
this.tocrawlOs = tocrawlOs;
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
String result = key.toString();
// If the record is a to-crawl URL, write it to the to-crawl list file, e.g. /logenhance/tocrawl/url.dat
if (result.contains("tocrawl")) {
tocrawlOs.write(result.getBytes());
} else {
// Otherwise it is an enhanced log line; write it to the enhanced log file, e.g. /logenhance/enhancedlog/log.dat
enhancedOs.write(result.getBytes());
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if (tocrawlOs != null) {
tocrawlOs.close();
}
if (enhancedOs != null) {
enhancedOs.close();
}
}
}
}
Other items: to be added.
II. Counters and Multi-Job Chaining
1. Counters
MapReduce counters (Counter) give us a window into all kinds of detailed runtime data of a MapReduce job. They are very helpful for performance tuning; most of the evaluation in MapReduce performance optimization is based on the values these counters report. They can also be used to record global statistics of your own. A small driver-side example follows the references below.
Related introductions and references: //..net/xw_classmate/article/details/50954384
https://www.cns.com/codeOfLife/p/5521356.html
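Counter values can also be read back in the driver after the job completes. A minimal sketch, assuming the same "malformed"/"malformedline" group and counter names used in the LogEnhance mapper above:
// Requires org.apache.hadoop.mapreduce.Counters; 'job' is the Job object from the driver.
if (job.waitForCompletion(true)) {
    Counters counters = job.getCounters();
    long malformed = counters.findCounter("malformed", "malformedline").getValue();
    System.out.println("Malformed log lines skipped: " + malformed);
}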
2. Multi-Job Chaining
A slightly more complex processing flow often requires several MapReduce programs chained together. Multi-job chaining can be implemented with the framework's JobControl class.
(In practice this is seldom used, because hard-coding the chain inside the jobs is inflexible; controlling the sequence from a shell script is usually recommended.)
A hand-rolled implementation can distinguish the jobs by jobName and control submission and dependencies yourself. A "dependency" simply means that the output of one M/R job is the input of another, as in the sketch below.
For an example of a hand-rolled implementation, see: https://www.cns.com/yjmyzz/p/4540469.html
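A minimal sketch of such a hand-rolled chain (job setup and paths are illustrative, not taken from the referenced article): job2 consumes the directory that job1 wrote, and is submitted only if job1 succeeds.
Configuration conf = new Configuration();
Path input   = new Path("/data/input");
Path interim = new Path("/data/interim");  // job1 output == job2 input
Path output  = new Path("/data/output");
Job job1 = Job.getInstance(conf, "step1-clean");
// job1.setJarByClass(...), setMapperClass(...), key/value classes omitted for brevity
FileInputFormat.setInputPaths(job1, input);
FileOutputFormat.setOutputPath(job1, interim);
if (!job1.waitForCompletion(true)) {   // block until job1 finishes; bail out on failure
    System.exit(1);
}
Job job2 = Job.getInstance(conf, "step2-aggregate");
// job2 mapper/reducer configuration omitted for brevity
FileInputFormat.setInputPaths(job2, interim);
FileOutputFormat.setOutputPath(job2, output);
System.exit(job2.waitForCompletion(true) ? 0 : 1);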
Controlling job dependencies with JobControl:
Core code:
ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());
cJob1.setJob(job1);
cJob2.setJob(job2);
cJob3.setJob(job3);
// Set up the dependencies between the jobs
cJob2.addDependingJob(cJob1);
cJob3.addDependingJob(cJob2);
JobControl jobControl = new JobControl("RecommendationJob");
jobControl.addJob(cJob1);
jobControl.addJob(cJob2);
jobControl.addJob(cJob3);
// Start a new thread to run the jobs added to the JobControl, then wait for them all to finish
Thread jobControlThread = new Thread(jobControl);
jobControlThread.start();
while (!jobControl.allFinished()) {
Thread.sleep(500);
}
jobControl.stop();
return 0;
More complete examples: //..net/sven119/article/details/78806380
//mntms.iteye.com//2096456 --> recommended
III. Data Compression
Typical usage:
Compressing mapper output:
new API:
Configuration conf = new Configuration();
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
Job job = new Job(conf);
old API:
conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.class);
Compressing reducer output:
Via configuration (the relevant properties, shown with their default values):
mapreduce.output.fileoutputformat.compress=false
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
mapreduce.output.fileoutputformat.compress.type=RECORD
Via code:
// Compress the reduce output files
FileOutputFormat.setCompressOutput(job, true); // enable output compression for the job
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // set the compression codec
More references: https://www.cns.com/ggjucheng/archive/2012/04/22/2465580.html
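The same options can also be set through plain Configuration keys; a minimal sketch (using the standard Hadoop 2.x property names) that enables both map-side and reduce-side Gzip compression:
Configuration conf = new Configuration();
// Compress intermediate map output (reduces shuffle I/O)
conf.setBoolean("mapreduce.map.output.compress", true);
conf.set("mapreduce.map.output.compress.codec",
         "org.apache.hadoop.io.compress.GzipCodec");
// Compress the final job output
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
conf.set("mapreduce.output.fileoutputformat.compress.codec",
         "org.apache.hadoop.io.compress.GzipCodec");
Job job = Job.getInstance(conf, "compressed-job");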
IV. Common MR Configuration Tuning Parameters
1. Resource-related parameters
// The following parameters take effect when configured in the user's own MR application (a short driver-side example follows this list)
(1) mapreduce.map.memory.mb: upper memory limit for one map task (in MB), default 1024. If a map task actually uses more than this, it is killed.
(2) mapreduce.reduce.memory.mb: upper memory limit for one reduce task (in MB), default 1024. If a reduce task actually uses more than this, it is killed.
(3) mapreduce.map.java.opts: JVM options for map tasks, where you can set the Java heap size and so on, e.g.
"-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc" (@taskid@ is replaced by the framework with the actual task ID). Default: ""
(4) mapreduce.reduce.java.opts: JVM options for reduce tasks, where you can set the Java heap size and so on, e.g.
"-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc". Default: ""
(5) mapreduce.map.cpu.vcores: maximum number of CPU cores per map task. Default: 1
(6) mapreduce.reduce.cpu.vcores: maximum number of CPU cores per reduce task. Default: 1
// The following must be set in the server-side configuration files before YARN starts in order to take effect
(7) yarn.scheduler.minimum-allocation-mb 1024 (minimum memory allocated to an application container)
(8) yarn.scheduler.maximum-allocation-mb 8192 (maximum memory allocated to an application container)
(9) yarn.scheduler.minimum-allocation-vcores 1
(10)yarn.scheduler.maximum-allocation-vcores 32
(11)yarn.nodemanager.resource.memory-mb 8192
// Key parameters for shuffle performance tuning; configure these before YARN starts as well
(12) mapreduce.task.io.sort.mb 100 // size of the circular sort buffer used in the shuffle, default 100 MB
(13) mapreduce.map.sort.spill.percent 0.8 // spill threshold of the circular buffer, default 80%
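A minimal driver-side sketch of the per-job resource parameters above (the values are only illustrative; the heap size in java.opts is usually set somewhat below the container memory):
Configuration conf = new Configuration();
conf.setInt("mapreduce.map.memory.mb", 2048);      // map container limit
conf.set("mapreduce.map.java.opts", "-Xmx1638m");  // roughly 80% of the container memory
conf.setInt("mapreduce.reduce.memory.mb", 4096);   // reduce container limit
conf.set("mapreduce.reduce.java.opts", "-Xmx3276m");
conf.setInt("mapreduce.map.cpu.vcores", 1);
conf.setInt("mapreduce.reduce.cpu.vcores", 2);
Job job = Job.getInstance(conf, "resource-tuned-job");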
2. Fault-tolerance parameters
(1) mapreduce.map.maxattempts: maximum number of attempts for each map task; once the attempt count exceeds this value, the map task is considered failed. Default: 4.
(2) mapreduce.reduce.maxattempts: maximum number of attempts for each reduce task; once the attempt count exceeds this value, the reduce task is considered failed. Default: 4.
(3) mapreduce.map.failures.maxpercent: if the fraction of failed map tasks exceeds this value, the whole job fails. Default: 0. If your application can tolerate dropping part of the input data, set it to a value greater than 0, e.g. 5, meaning that as long as fewer than 5% of map tasks fail (a map task is considered failed once it exceeds mapreduce.map.maxattempts attempts, and its input then produces no output), the job is still considered successful.
(4) mapreduce.reduce.failures.maxpercent: if the fraction of failed reduce tasks exceeds this value, the whole job fails. Default: 0.
(5) mapreduce.task.timeout: task timeout, a parameter that often needs adjusting. It means: if a task makes no progress for this long, i.e. it neither reads new input nor produces any output, it is considered blocked, possibly stuck forever. To keep a hung user program from blocking indefinitely, the framework enforces this timeout (in milliseconds). Default: 300000. If your program takes a long time per input record (for example it queries a database or pulls data over the network), increase this value. A typical error when it is too small is: "AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secs Container killed by the ApplicationMaster."
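These can likewise be set per job; a short sketch with illustrative values, e.g. raising the task timeout when each record triggers a database or network call:
// 'conf' is the job's Configuration object
conf.setInt("mapreduce.map.maxattempts", 4);     // default is 4; raise it for flaky external dependencies
conf.setInt("mapreduce.reduce.maxattempts", 4);
conf.setInt("mapreduce.task.timeout", 600000);   // 10 minutes instead of the default 5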
3. Local-mode job parameters
Set the following parameters:
mapreduce.framework.name=local
mapreduce.jobtracker.address=local
fs.defaultFS=local
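For a quick local run (e.g. from the IDE) these can also be set in code. Note the list above gives fs.defaultFS=local; on newer Hadoop releases file:/// is the usual value for the local filesystem:
Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local");  // run map and reduce in a single local JVM
conf.set("fs.defaultFS", "file:///");           // local filesystem ("local" on older releases)
Job job = Job.getInstance(conf, "local-debug-job");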
4. Efficiency and stability parameters
(1) mapreduce.map.speculative: whether to enable speculative execution for map tasks. Default: true
(2) mapreduce.reduce.speculative: whether to enable speculative execution for reduce tasks. Default: true
(3) mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence: when the same class appears both in the user jar and in the Hadoop jars, which one takes precedence. Default: false, i.e. the class in the Hadoop jars is used first.
(4) mapreduce.input.fileinputformat.split.minsize: minimum split size used by FileInputFormat when computing splits.
(5) mapreduce.input.fileinputformat.split.maxsize: maximum split size used by FileInputFormat when computing splits.
(The default split size equals the block size, i.e. 134217728 bytes = 128 MB.)
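FileInputFormat also exposes these two split-size knobs as static helpers; a minimal sketch that forces splits smaller than one 128 MB block:
// 'job' is the Job object; helpers live on org.apache.hadoop.mapreduce.lib.input.FileInputFormat
FileInputFormat.setMinInputSplitSize(job, 1L);
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024); // 64 MB -> roughly two splits per block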