大数据技术学习笔记(14)HBase的过滤器与Mapreduce
沉沙 2018-10-08 来源 : 阅读 1422 评论 0

摘要:本篇教程探讨了大数据技术学习笔记(14)HBase的过滤器与Mapreduce,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术学习笔记(14)HBase的过滤器与Mapreduce,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

一. HBase过滤器

1、列值过滤器

2、列名前缀过滤器

3、多个列名前缀过滤器

4、行键过滤器
5、组合过滤器
复制代码

package demo;

import javax.swing.RowFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import net.spy.memcached.ops.OperationErrorType;

public class TestHBaseFilter {

    /**
     * 列值过滤器:SingleColumnValueFilter
     */
    @Test
    public void testSingleColumnValueFilter() throws Exception{
        //查询工资等于3000的员工
        //select * from emp where sal=3000
        //配置ZK的地址信息
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.153.11");
        
        //得到HTable客户端
        HTable client  = new HTable(conf, "emp");
        //定义一个列值过滤器
        SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("empinfo"),//列族
                Bytes.toBytes("sal"), //工资
                CompareOp.EQUAL,       // =
                Bytes.toBytes("3000"));//ֵ
        
        //定义一个扫描器
        Scan scan = new Scan();
        scan.setFilter(filter);
        
        //通过过滤器查询数据
        ResultScanner rs = client.getScanner(scan);
        for (Result result : rs) {
            String name = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            System.out.println(name);
        }
        
        client.close();
    }
    
    /**
     * 列名前缀过滤器:ColumnPrefixFilter
     */
    @Test
    public void testColumnPrefixFilter() throws Exception{
        //列名前缀过滤器
        //select ename from emp
        //配置ZK的地址信息
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.153.11");
        
        //得到HTable客户端
        HTable client  = new HTable(conf, "emp");
        
        //定义一个列名前缀过滤器
        ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("ename"));
        
        //定义一个扫描器
        Scan scan = new Scan();
        scan.setFilter(filter);
        
        //通过过滤器查询数据
        ResultScanner rs = client.getScanner(scan);
        for (Result result : rs) {
            String name = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            System.out.println(name);
        }
        
        client.close();
    }
    
    /**
     * 多个列名前缀过滤器:MultipleColumnPrefixFilter
     */
    @Test
    public void testMultipleColumnPrefixFilter() throws Exception{

        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.153.11");
        
        HTable client  = new HTable(conf, "emp");
        //员工姓名 薪资
        byte[][] names = {Bytes.toBytes("ename"),Bytes.toBytes("sal")};
        
        MultipleColumnPrefixFilter filter = new MultipleColumnPrefixFilter(names);
    
        Scan scan = new Scan();
        scan.setFilter(filter);
        
        ResultScanner rs = client.getScanner(scan);
        for (Result result : rs) {
            String name = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            String sal = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));
            System.out.println(name+"\t"+sal);
        }
        
        client.close();
    }
    
    /**
     * 行键过滤器:RowFilter
     */
    @Test
    public void testRowFilter() throws Exception{
        
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.153.11");
        
        HTable client  = new HTable(conf, "emp");
        
        //定义一个行键过滤器
        org.apache.hadoop.hbase.filter.RowFilter filter = new org.apache.hadoop.hbase.filter.RowFilter(
                CompareOp.EQUAL, //=
                new RegexStringComparator("7839"));
        
        //定义一个扫描器
        Scan scan = new Scan();
        scan.setFilter(filter);
        
        //通过过滤器查询数据
        ResultScanner rs = client.getScanner(scan);
        for (Result result : rs) {
            String name = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            String sal = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));
            System.out.println(name+"\t"+sal);
        }
        
        client.close();
    }
    
    /**
     * 组合过滤器
     */
    @Test
    public void testFilter() throws Exception{

        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.153.11");
        
        HTable client  = new HTable(conf, "emp");
        
        //工资=3000
        SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("empinfo"), 
                Bytes.toBytes("sal"), CompareOp.EQUAL, Bytes.toBytes("3000"));
        //名字
        ColumnPrefixFilter filter2 = new ColumnPrefixFilter(Bytes.toBytes("ename"));
        
        FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
        filterList.addFilter(filter1);
        filterList.addFilter(filter2);
        
        Scan scan = new Scan();
        scan.setFilter(filterList);
        
        ResultScanner rs = client.getScanner(scan);
        for (Result result : rs) {
            String name = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            String sal = Bytes.toString(result.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));
            System.out.println(name+"\t"+sal);
        }
        
        client.close();
    }
}

复制代码

二. HDFS上的mapreduce

建立表

   create 'word','content'

   put 'word','1','content:info','I love Beijing'

   put 'word','2','content:info','I love China'

   put 'word','3','content:info','Beijing is the capital of China'

  create 'stat','content'

注意:export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:$CLASSPATH

WordCountMapper.java

复制代码

package wc;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

//K2 V2
//没有k1和v1,因为输入的就是表中一条记录
public class WordCountMapper extends TableMapper<Text, IntWritable>{

    @Override
    protected void map(ImmutableBytesWritable key, Result value,
            Context context)throws IOException, InterruptedException {
        //key和value代表从表中输入的一条记录
        //key:行键 value:数据
        String data = Bytes.toString(value.getValue(Bytes.toBytes("content"), Bytes.toBytes("info")));
        //分词
        String[] words = data.split(" ");
        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}

复制代码

WordCountReducer.java
复制代码

package wc;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
//k3 v3 代表输出一条记录
public class WordCountReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>{

    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3,Context context)
            throws IOException, InterruptedException {
        // 求和
        int total = 0;
        for (IntWritable v : v3) {
            total = total + v.get();
        }
        
        //构造一个put对象
        Put put = new Put(Bytes.toBytes(k3.toString()));
        put.add(Bytes.toBytes("content"),//列族
                Bytes.toBytes("result"),//列
                Bytes.toBytes(String.valueOf(total)));
        
        //输出
        context.write(new ImmutableBytesWritable(Bytes.toBytes(k3.toString())), //把这个单词作为key,就是输出的行键
                put);//表中的一条记录
    }
    
}

复制代码

WordCountMain.java
复制代码

package wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class WordCountMain {

    public static void main(String[] args) throws Exception {
        //获取ZK的地址
        //指定的配置信息:Zookeeper
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.153.11");
        
        //创建一个任务
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountMain.class);
        
        //定义一个扫描器读取:content:info
        Scan scan = new Scan();
        //可以使用filter
        scan.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info"));
        
        //使用工具类设置Mapper
        TableMapReduceUtil.initTableMapperJob(
                Bytes.toBytes("word"), //输入的表
                scan, //扫描器,只读取需要处理的数据
                WordCountMapper.class, 
                Text.class, //key
                IntWritable.class,//value 
                job);
        
        //使用工具类Reducer
        TableMapReduceUtil.initTableReducerJob("stat", WordCountReducer.class, job);
        
        job.waitForCompletion(true);
    }

}

复制代码

结果:

   

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved