大数据技术 使用bulkload向hbase中批量写入数据
沉沙 2018-10-11 来源 : 阅读 1676 评论 0

摘要:本篇教程探讨了大数据技术 使用bulkload向hbase中批量写入数据,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术 使用bulkload向hbase中批量写入数据,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

1、数据样式
写入之前,需要整理以下数据的格式,之后将数据保存到hdfs中,本例使用的样式如下(用tab分开):

row1 N
row2 M
row3 B
row4 V
row5 N
row6 M
row7 B

2、代码
假设要将以上样式的数据写入到hbase中,列族为cf,列名为colb,可以使用下面的代码(参考)

 1 package com.testdata;
 2 
 3 import java.io.IOException;
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.hbase.HBaseConfiguration;
 7 import org.apache.hadoop.hbase.client.HTable;
 8 import org.apache.hadoop.hbase.client.Put;
 9 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
11 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
12 import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
13 import org.apache.hadoop.hbase.util.Bytes;
14 import org.apache.hadoop.io.Text;
15 import org.apache.hadoop.mapreduce.Job;
16 import org.apache.hadoop.mapreduce.Mapper;
17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
19 
20 public class TestBulkLoad {
21     
22     public static class LoadMapper extends Mapper{
23         
24         @Override
25         protected void map(Object key, Text value, Context context)
26                 throws IOException, InterruptedException {
27             String[] values = value.toString().split("\t");
28             if(values.length ==2 ){
29                 byte[] rowkey = Bytes.toBytes(values[0]);
30                 byte[] col_value = Bytes.toBytes(values[1]);
31                 byte[] familly = Bytes.toBytes("cf");
32                 byte[] column = Bytes.toBytes("colb");
33                 ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(rowkey);
34                 Put testput = new Put(rowkey);
35                 testput.add(familly,column,col_value);
36                 context.write(rowkeyWritable, testput);    
37             }        
38             
39         }
40     }
41     public static void main(String[] args) throws Exception {
42         if(args.length !=4 ){
43             System.exit(0);
44         }
45         
46         String in = args[0];
47         String out = args[1];
48         int unitmb =Integer.valueOf(args[2]);                
49         String tbname = args[3];
50         
51         Configuration conf = new Configuration();                
52         conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(unitmb * 1024 * 1024));
53         conf.set("mapred.min.split.size", String.valueOf(unitmb * 1024 * 1024));
54         conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(unitmb * 1024 * 1024));
55         conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(unitmb * 1024 * 1024));
56                 
57         Job job = new Job(conf);        
58         FileInputFormat.addInputPath(job, new Path(in));
59         FileOutputFormat.setOutputPath(job, new Path(out));            
60         job.setMapperClass(LoadMapper.class); 
61         job.setReducerClass(PutSortReducer.class);     
62         job.setOutputFormatClass(HFileOutputFormat2.class);
63         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
64         job.setMapOutputValueClass(Put.class);        
65         job.setJarByClass(TestBulkLoad.class);
66         
67         Configuration hbaseconf = HBaseConfiguration.create();
68         HTable table = new HTable(hbaseconf,tbname);
69         HFileOutputFormat2.configureIncrementalLoad(job, table);     
70         
71         job.waitForCompletion(true);   
72         LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseconf);
73         loader.doBulkLoad(new Path(out), table);
74 
75     }
76 
77 }

这段代码使用mapreduce程序对数据做了进一步处理,之后调用相关的api将数据写入hbase中。PutSortReducer是一个自带的reducer类,不需要再进行编写。
3、执行
数据保存在TEXT文件中,上面代码导出的jar包为bulkload,hbase的数据表名称为testdata,注意,先指定以下HADOOP_CLASSPATH,避免出错。

1 export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:$HADOOP_CLASSPATH
2 hadoop jar ./Downloads/bulkload.jar com.testdata.TestBulkLoad Test hbasedata 64 testdata

4、结果

     

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程