大数据技术 Spark连接HBase详解
沉沙 2018-09-28 来源 : 阅读 989 评论 0

摘要:本篇教程探讨了大数据技术 Spark连接HBase详解,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术 Spark连接HBase详解,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

(一)、Spark读取HBase中的数据
hbase中的数据

 

 1 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
 2 import org.apache.hadoop.hbase.client.HBaseAdmin
 3 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 4 import org.apache.spark._
 5 import org.apache.hadoop.hbase.util.Bytes
 6 
 7 /**
 8   * Created by *** on 2018/2/12.
 9   *
10   * 从hbase读取数据转化成RDD
11   */
12 object SparkReadHBase {
13 
14   def main(args: Array[String]): Unit = {
15     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
16     val sc = new SparkContext(sparkConf)
17 
18     val tablename = "account"
19     val conf = HBaseConfiguration.create()
20     //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
21     conf.set("hbase.zookeeper.quorum","node02,node03,node04")
22     //设置zookeeper连接端口,默认2181
23     conf.set("hbase.zookeeper.property.clientPort", "2181")
24     conf.set(TableInputFormat.INPUT_TABLE, tablename)
25 
26     // 如果表不存在则创建表
27     val admin = new HBaseAdmin(conf)
28     if (!admin.isTableAvailable(tablename)) {
29       val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
30       admin.createTable(tableDesc)
31     }
32 
33     //读取数据并转化成rdd
34     val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
35       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
36       classOf[org.apache.hadoop.hbase.client.Result])
37 
38     val count = hBaseRDD.count()
39     println(count)
40     hBaseRDD.foreach{case (_,result) =>{
41       //获取行键
42       val key = Bytes.toString(result.getRow)
43       //通过列族和列名获取列
44       val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
45       val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
46       println("Row key:"+key+" Name:"+name+" Age:"+age)
47     }}
48 
49     sc.stop()
50     admin.close()
51   }
52 
53 }

(二)、Spark写HBase
  1.第一种方式:

 1 import org.apache.hadoop.hbase.HBaseConfiguration
 2 import org.apache.hadoop.hbase.client.Put
 3 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 4 import org.apache.hadoop.hbase.mapred.TableOutputFormat
 5 import org.apache.hadoop.hbase.util.Bytes
 6 import org.apache.hadoop.mapred.JobConf
 7 import org.apache.spark.{SparkConf, SparkContext}
 8 import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
 9 /**
10   * Created by *** on 2018/2/12.
11   *
12   * 使用saveAsHadoopDataset写入数据
13   */
14 object SparkWriteHBaseOne {
15   def main(args: Array[String]): Unit = {
16     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
17     val sc = new SparkContext(sparkConf)
18 
19     val conf = HBaseConfiguration.create()
20     //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
21     conf.set("hbase.zookeeper.quorum","node02,node03,node04")
22     //设置zookeeper连接端口,默认2181
23     conf.set("hbase.zookeeper.property.clientPort", "2181")
24 
25     val tablename = "account"
26 
27     //初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
28     val jobConf = new JobConf(conf)
29     jobConf.setOutputFormat(classOf[TableOutputFormat])
30     jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
31 
32     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
33 
34 
35     val rdd = indataRDD.map(_.split(',')).map{arr=>{
36       /*一个Put对象就是一行记录,在构造方法中指定主键
37        * 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
38        * Put.add方法接收三个参数:列族,列名,数据
39        */
40       val put = new Put(Bytes.toBytes(arr(0).toInt))
41       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
42       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
43       //转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
44       (new ImmutableBytesWritable, put)
45     }}
46 
47     rdd.saveAsHadoopDataset(jobConf)
48 
49     sc.stop()
50   }
51 }

  2.第二种方式:

 1 import org.apache.hadoop.hbase.client.{Put, Result}
 2 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 3 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
 4 import org.apache.hadoop.hbase.util.Bytes
 5 import org.apache.hadoop.mapreduce.Job
 6 import org.apache.spark._
 7 /**
 8   * Created by *** on 2018/2/12.
 9   *
10   * 使用saveAsNewAPIHadoopDataset写入数据
11   */
12 object SparkWriteHBaseTwo {
13   def main(args: Array[String]): Unit = {
14     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
15     val sc = new SparkContext(sparkConf)
16 
17     val tablename = "account"
18 
19     sc.hadoopConfiguration.set("hbase.zookeeper.quorum","node02,node03,node04")
20     sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
21     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
22 
23     val job = new Job(sc.hadoopConfiguration)
24     job.setOutputKeyClass(classOf[ImmutableBytesWritable])
25     job.setOutputValueClass(classOf[Result])
26     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
27 
28     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
29     val rdd = indataRDD.map(_.split(',')).map{arr=>{
30       val put = new Put(Bytes.toBytes(arr(0)))
31       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
32       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
33       (new ImmutableBytesWritable, put)
34     }}
35 
36     rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
37   }
38 }

     

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程