Big Data Study Notes (14): HBase Filters and MapReduce
沉沙
2018-10-08
Abstract: Part 14 of these big data study notes covers HBase filters and running MapReduce over HBase tables. Hopefully reading it deepens your understanding of the technology.
I. HBase Filters
1. Column value filter
2. Column prefix filter
3. Multiple column prefix filter
4. Row key filter
5. Combined filters
All five are demonstrated in the test class below.
package demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

public class TestHBaseFilter {

    /**
     * Column value filter: SingleColumnValueFilter
     */
    @Test
    public void testSingleColumnValueFilter() throws Exception {
        // Find employees whose salary equals 3000:
        // select * from emp where sal = 3000
        // Point the client at ZooKeeper
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.153.11");
        // Get an HTable client
        HTable client = new HTable(conf, "emp");
        // Define a column value filter
        SingleColumnValueFilter filter = new SingleColumnValueFilter(
                Bytes.toBytes("empinfo"),   // column family
                Bytes.toBytes("sal"),       // salary column
                CompareOp.EQUAL,            // =
                Bytes.toBytes("3000"));     // value
        // Define a scanner
        Scan scan = new Scan();
        scan.setFilter(filter);
        // Query through the filter
        ResultScanner rs = client.getScanner(scan);
        for (Result result : rs) {
            String name = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            System.out.println(name);
        }
        client.close();
    }

    /**
     * Column prefix filter: ColumnPrefixFilter
     */
    @Test
    public void testColumnPrefixFilter() throws Exception {
        // Column prefix filter:
        // select ename from emp
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.153.11");
        HTable client = new HTable(conf, "emp");
        // Keep only the columns whose name starts with "ename"
        ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("ename"));
        Scan scan = new Scan();
        scan.setFilter(filter);
        ResultScanner rs = client.getScanner(scan);
        for (Result result : rs) {
            String name = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            System.out.println(name);
        }
        client.close();
    }

    /**
     * Multiple column prefix filter: MultipleColumnPrefixFilter
     */
    @Test
    public void testMultipleColumnPrefixFilter() throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.153.11");
        HTable client = new HTable(conf, "emp");
        // Employee name and salary
        byte[][] names = {Bytes.toBytes("ename"), Bytes.toBytes("sal")};
        MultipleColumnPrefixFilter filter = new MultipleColumnPrefixFilter(names);
        Scan scan = new Scan();
        scan.setFilter(filter);
        ResultScanner rs = client.getScanner(scan);
        for (Result result : rs) {
            String name = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            String sal = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));
            System.out.println(name + "\t" + sal);
        }
        client.close();
    }

    /**
     * Row key filter: RowFilter
     */
    @Test
    public void testRowFilter() throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.153.11");
        HTable client = new HTable(conf, "emp");
        // Define a row key filter matching the row whose key is 7839
        RowFilter filter = new RowFilter(
                CompareOp.EQUAL,                     // =
                new RegexStringComparator("7839"));  // regex applied to the row key
        Scan scan = new Scan();
        scan.setFilter(filter);
        ResultScanner rs = client.getScanner(scan);
        for (Result result : rs) {
            String name = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            String sal = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));
            System.out.println(name + "\t" + sal);
        }
        client.close();
    }

    /**
     * Combined filters: FilterList
     */
    @Test
    public void testFilter() throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.153.11");
        HTable client = new HTable(conf, "emp");
        // sal = 3000
        SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("empinfo"),
                Bytes.toBytes("sal"), CompareOp.EQUAL, Bytes.toBytes("3000"));
        // ename column
        ColumnPrefixFilter filter2 = new ColumnPrefixFilter(Bytes.toBytes("ename"));
        // MUST_PASS_ALL: a row must pass every filter in the list (logical AND)
        FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
        filterList.addFilter(filter1);
        filterList.addFilter(filter2);
        Scan scan = new Scan();
        scan.setFilter(filterList);
        ResultScanner rs = client.getScanner(scan);
        for (Result result : rs) {
            String name = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            String sal = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));
            System.out.println(name + "\t" + sal);
        }
        client.close();
    }
}
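The combined-filter test above uses Operator.MUST_PASS_ALL, which ANDs the filters together; FilterList also supports OR semantics via MUST_PASS_ONE. A minimal sketch, written as another @Test method body in the same class and reusing its imports (the 5000 salary value is just an assumed example, not from the original post):

        // Sketch: OR semantics -- match rows where sal = 3000 OR sal = 5000
        SingleColumnValueFilter sal3000 = new SingleColumnValueFilter(Bytes.toBytes("empinfo"),
                Bytes.toBytes("sal"), CompareOp.EQUAL, Bytes.toBytes("3000"));
        SingleColumnValueFilter sal5000 = new SingleColumnValueFilter(Bytes.toBytes("empinfo"),
                Bytes.toBytes("sal"), CompareOp.EQUAL, Bytes.toBytes("5000"));
        // By default SingleColumnValueFilter also lets through rows that lack the
        // sal column entirely; setFilterIfMissing(true) skips such rows instead.
        sal3000.setFilterIfMissing(true);
        sal5000.setFilterIfMissing(true);
        FilterList orList = new FilterList(Operator.MUST_PASS_ONE); // logical OR
        orList.addFilter(sal3000);
        orList.addFilter(sal5000);
        Scan scan = new Scan();
        scan.setFilter(orList);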
II. MapReduce on HBase
Create the input and output tables in the HBase shell:
create 'word','content'
put 'word','1','content:info','I love Beijing'
put 'word','2','content:info','I love China'
put 'word','3','content:info','Beijing is the capital of China'
create 'stat','content'
Note: before submitting the job, Hadoop needs the HBase jars on its classpath:
export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:$CLASSPATH
WordCountMapper.java
package wc;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// The type parameters are k2/v2, the mapper's output. There is no k1/v1 pair
// to declare because the input is one table row: TableMapper fixes the input
// types as <ImmutableBytesWritable, Result>.
public class WordCountMapper extends TableMapper<Text, IntWritable> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // key and value together represent one row read from the table:
        // key is the row key, value holds the row's data
        String data = Bytes.toString(value.getValue(Bytes.toBytes("content"), Bytes.toBytes("info")));
        // Split the sentence into words
        String[] words = data.split(" ");
        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}
WordCountReducer.java
package wc;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// k3/v3 come from the mapper; each reduce call writes one row to the output table.
public class WordCountReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3, Context context)
            throws IOException, InterruptedException {
        // Sum the counts for this word
        int total = 0;
        for (IntWritable v : v3) {
            total = total + v.get();
        }
        // Build a Put for the output table
        // (put.add is the older API; on HBase 1.0+ prefer put.addColumn)
        Put put = new Put(Bytes.toBytes(k3.toString()));
        put.add(Bytes.toBytes("content"),               // column family
                Bytes.toBytes("result"),                // column
                Bytes.toBytes(String.valueOf(total)));  // value
        // Emit: the word becomes the output row key, the Put becomes the row's data
        context.write(new ImmutableBytesWritable(Bytes.toBytes(k3.toString())), put);
    }
}
WordCountMain.java
package wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class WordCountMain {

    public static void main(String[] args) throws Exception {
        // Point the job at ZooKeeper so it can locate HBase
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.153.11");
        // Create the job
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountMain.class);
        // Define a scan that reads only content:info
        // (a Filter could also be set on this scan)
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info"));
        // Wire up the mapper with the utility class
        TableMapReduceUtil.initTableMapperJob(
                Bytes.toBytes("word"),  // input table
                scan,                   // scan: read only the data to be processed
                WordCountMapper.class,
                Text.class,             // mapper output key
                IntWritable.class,      // mapper output value
                job);
        // Wire up the reducer: write results into the stat table
        TableMapReduceUtil.initTableReducerJob("stat", WordCountReducer.class, job);
        job.waitForCompletion(true);
    }
}
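A sketch of how the job might be packaged and submitted (the jar name wc.jar is an assumption; use whatever your build produces):

# package the wc classes into a jar, e.g. wc.jar, then:
export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:$CLASSPATH
hadoop jar wc.jar wc.WordCountMain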
Results:
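The original post showed the result as a screenshot, which did not survive. Assuming the three rows inserted above, scanning the stat table should produce counts along these lines (timestamps omitted):

hbase> scan 'stat'
ROW        COLUMN+CELL
 Beijing   column=content:result, value=2
 China     column=content:result, value=2
 I         column=content:result, value=2
 capital   column=content:result, value=1
 is        column=content:result, value=1
 love      column=content:result, value=2
 of        column=content:result, value=1
 the       column=content:result, value=1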