大数据技术 Spark数据读取与保存
沉沙 2018-09-29 来源 : 阅读 1439 评论 0

摘要:本篇教程探讨了大数据技术 Spark数据读取与保存,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术 Spark数据读取与保存,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

Spark对很多种文件格式的读取和保存方式都很简单。Spark会根据文件扩展名选择对应的处理方式。

Spark支持的一些常见文件格式如下:

     文本文件

   使用文件路径作为参数调用SparkContext中的textFile()函数,就可以读取一个文本文件。也可以指定minPartitions控制分区数。传递目录作为参数,会把目录中的各部分都读取到RDD中。例如:

val input = sc.textFile("E:\\share\\new\\chapter5")
input.foreach(println)

 chapter目录有三个txt文件,内容如下:

 

输出结果:

用SparkContext.wholeTextFiles()也可以处理多个文件,该方法返回一个pair RDD,其中键是输入文件的文件名。

例如:

val input = sc.wholeTextFiles("E:\\share\\new\\chapter5")
input.foreach(println)

  输出结果:

保存文本文件用saveAsTextFile(outputFile)

     JSON

JSON是一种使用较广的半结构化数据格式,这里使用json4s来解析JSON文件。

如下:
+ View Code

 json文件内容:

输出结果:

保存JSON文件用saveASTextFile(outputFile)即可

如下:

val datasave = input.map { myrecord =>
      implicit val formats = DefaultFormats
      val jsonObj = parse(myrecord)
      jsonObj.extract[Person]
    }
datasave.saveAsTextFile("E:\\share\\spark\\savejson")

输出结果:

    CSV文件

 读取CSV文件和读取JSON数据相似,都需要先把文件当作普通文本文件来读取数据,再对数据进行处理。

如下:

import org.apache.spark.{SparkConf, SparkContext}
import java.io.StringReader
 
import au.com.bytecode.opencsv.CSVReader
 
object DataReadAndSave {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("CSV")
    val sc = new SparkContext(conf)
 
    val input = sc.textFile("E:\\share\\spark\\test.csv")
    input.foreach(println)
    val result = input.map{
      line =>
        val reader = new CSVReader(new StringReader(line))
        reader.readNext()
    }
    for(res <- result){
      for(r <- res){
        println(r)
      }
    }
  }
 
}

test.csv内容:

输出结果:

 保存csv

如下:

val inputRDD = sc.parallelize(List(Person("Mike", "yes")))
        inputRDD.map(person  => List(person.name,person.favoriteAnimal).toArray)
        .mapPartitions { people =>
          val stringWriter = new StringWriter()
          val csvWriter = new CSVWriter(stringWriter)
          csvWriter.writeAll(people.toList)
          Iterator(stringWriter.toString)
        }.saveAsTextFile("E:\\share\\spark\\savecsv")

 

    SequenceFile

SequenceFile是由没有相对关系结构的键值对文件组成的常用Hadoop格式。是由实现Hadoop的Writable接口的元素组成,常见的数据类型以及它们对应的Writable类如下:

读取SequenceFile

调用sequenceFile(path , keyClass , valueClass , minPartitions)

保存SequenceFile

调用saveAsSequenceFile(outputFile)

 

    对象文件

对象文件使用Java序列化写出,允许存储只包含值的RDD。对象文件通常用于Spark作业间的通信。

保存对象文件调用 saveAsObjectFile    读取对象文件用SparkContext的objectFile()函数接受一个路径,返回对应的RDD

 

    Hadoop输入输出格式

 Spark可以与任何Hadoop支持的格式交互。

读取其他Hadoop输入格式,使用newAPIHadoopFile接收一个路径以及三个类,第一个类是格式类,代表输入格式,第二个类是键的类,最后一个类是值的类。

hadoopFile()函数用于使用旧的API实现的Hadoop输入格式。

KeyValueTextInputFormat 是最简单的 Hadoop 输入格式之一,可以用于从文本文件中读取键值对数据。每一行都会被独立处理,键和值之间用制表符隔开。

 例子:

 
import org.apache.hadoop.io.{IntWritable, LongWritable, MapWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
object HadoopFile {
 
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("hadoopfile").setMaster("local")
    val sc = new SparkContext(conf)
 
    
    val job = new Job()
    val data = sc.newAPIHadoopFile("E:\\share\\spark\\test.json" ,
      classOf[KeyValueTextInputFormat],
      classOf[Text],
      classOf[Text],
      job.getConfiguration)
    data.foreach(println)
 
    data.saveAsNewAPIHadoopFile(
      "E:\\share\\spark\\savehadoop",
      classOf[Text],
      classOf[Text],
      classOf[TextOutputFormat[Text,Text]],
      job.getConfiguration)
 
  }
}

  输出结果:

读取

保存

若使用旧API如下:

 
val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat]("E:\\share\\spark\\test.json

 
").map { case (x, y) => (x.toString, y.toString) } input.foreach(println)

  

    文件压缩

对数据进行压缩可以节省存储空间和网络传输开销,Spark原生的输入方式(textFile和sequenFile)可以自动处理一些类型的压缩。在读取压缩后的数据时,一些压缩编解码器可以推测压缩类型。

 

    文件系统

Spark支持读写很多种文件系统,可以使用任何我们想要的文件格式。包括:

  1、本地文件系统 

要求文件在集群中所有节点的相同路径下都可以找到。 本地文件系统路径使用 例如:val rdd = sc.textFile("file:///home/holden/happypandas.gz")。

  2、Amazon S3

将一个以s3n://开头的路径以s3n://bucket/path-within-bucket的形式传给Spark的输入方法。

  3、HDFS

在Spark中使用HDFS只需要将输入路径输出路径指定为hdfs://master:port/path就可以了

 

    Apache Hive

Apache Hive是Hadoop上一中常见的结构化数据源。Hive可以在HDFS内或者在其他存储系统上存储多种格式的表。SparkSQL可以读取Hive支持的任何表。

将Spark SQL连接到已有的Hive上,创建出HiveContext对象也就是Spark SQL入口,然后就可以使用Hive查询语言来对你的表进行查询,并以由行组成的RDD形式返回数据。

使用HiveContext.jsonFile方法可以从整个文件中获取Row对象组成的RDD。例子:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
 
object Sparksql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkSQL")
        val sc = new SparkContext(conf)
        val sql = new HiveContext(sc)
        val input = sql.jsonFile("E:\\share\\spark\\tweets.json")
        input.registerTempTable("tweets")
        val topTweets = sql.sql("select user.name,text from tweets")
        topTweets.foreach(println)
  }
 
}

使用数据:

 

输出结果:

 

    数据库

 Spark可以从任何支持Java数据库连接(JDBC)的关系型数据库中读取数据,包括MySQL,Postgre等系统。

Spark连接JDBC,通过创建SQLContext对象进行连接,设置连接参数,然后就可以使用sql语句进行查询,结果返回一个jdbcRDD。如下:

首先在MySQL里面建立名为info的数据库,建表及导入数据:

sql查询数据:

使用Spark连接JDBC查询,Scala代码如下:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
 
object JDBC {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkSQL")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val mysql = sqlContext.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/info").
      option("dbtable","student").option("driver","com.mysql.jdbc.Driver").
      option("user","root").option("password","********").load()
    mysql.registerTempTable("student")
    mysql.sqlContext.sql("select * from student where sage >= 20").collect().foreach(println)
  }
 
}

  输出结果:

   

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程