大数据技术 Spark Core(1)什么是RDD的Transformation和Actions以及Dependency?
沉沙 2018-10-08 来源 : 阅读 2090 评论 0

摘要:本篇教程探讨了大数据技术 Spark Core(1)什么是RDD的Transformation和Actions以及Dependency?,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术 Spark Core(1)什么是RDD的Transformation和Actions以及Dependency?,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

1. Spark的RDD
RDD(Resilient Distributed Datasets),弹性分布式数据集,是对分布式数据集的一种抽象。
RDD所具备5个主要特性:

    一组分区列表
    计算每一个数据分片的函数
    RDD上的一组依赖
    对于Key Value 对的RDD,会有一个Partitioner, 这是数据的分区器,控制数据分区策略和数量
    一组Preferred Location信息(如HDFS 上的数据块地址)


上图是一个简单的CoGroupedRDD满足了RDD 5个特性

2. RDD的两种操作

2.1 Transformation
Transformation: 转换,从现有的数据集创建一个新的数据集,从一个RDD转换成另一个RDD,transformation的操作是延迟计算的,在Driver层就构建好RDD之间的关系,数据分区策略,但并不提交计算。
Transformations 按照数据类型纬度分为:Value数据类型和Key-Value的数据类型的Transformation
2.1.1 Value型Transformation
针对以Value为输入值的RDD,常见的Map, FlatMap....,而输出值并不一定是value,也有可能是Key,Value的数据类型
以输入分区和输出分区的数据关系类型

    输入分区和输出分区1对1 例如 map
    输入分区和输出分区多对1 例如 union
    输入分区和输出分区多对多 例如 groupBy
    输入分区包含输出分区 例如 filter

2.1.2 Key-Value型Transformation
针对Key,Value的输入类型,进行聚集,连接等操作
Spark 里处理Key,Value的输入类型有个专门的类来处理

    class PairRDDFunctions[K, V](self: RDD[(K, V)])
        (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
      extends Logging with Serializable {
    }

2.1.2.1 RDD 转 PairRDDFunctions
会不会很奇怪,并没有继承RDD,也就是说严格意义上来说,K-V的算子并不是RDD,先看看一个例子:

    line.flatMap(_.split(" "))
                             .map((_, 2))
                                 .reduceByKey(_+_).collect().foreach(println)


reduceByKey是一个Key-Value的算子 

    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
      }

在rdd.scala中map函数中,返回的类型是MapPartitionsRDD,并不是PairRDDFunctions,如何转换的?
在scala语言里有个语法叫做“scala implicit method”,在隐式转化里我们看到了定义

      implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
        (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
        new PairRDDFunctions(rdd)
      }


将rdd自动转为PairRDDFunctions,最后调用了算子reduceByKey

2.1.2.1 PairRDDFunctions 转 RDD
Spark的核心抽象RDD是各个组件交互的核心,也是API里的主要接口,显然不能使用对象PairRDDFunctions作为RDD之间的交互。
PairRDDFunctions的初始化的时候会带入一个RDD,这是父类的RDD

    @Experimental
      def combineByKeyWithClassTag[C](
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          partitioner: Partitioner,
          mapSideCombine: Boolean = true,
          serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
        require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
        if (keyClass.isArray) {
          if (mapSideCombine) {
            throw new SparkException("Cannot use map-side combining with array keys.")
          }
          if (partitioner.isInstanceOf[HashPartitioner]) {
            throw new SparkException("HashPartitioner cannot partition array keys.")
          }
        }
        val aggregator = new Aggregator[K, V, C](
          self.context.clean(createCombiner),
          self.context.clean(mergeValue),
          self.context.clean(mergeCombiners))
        if (self.partitioner == Some(partitioner)) {
          self.mapPartitions(iter => {
            val context = TaskContext.get()
            new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
          }, preservesPartitioning = true)
        } else {
          new ShuffledRDD[K, V, C](self, partitioner)
            .setSerializer(serializer)
            .setAggregator(aggregator)
            .setMapSideCombine(mapSideCombine)
        }
      }


当调用K-V算子的时候,可以单独指定分区器,否则算子会自己构建一个HashPartitioner的分区器而分区策略依赖输入的分片块, 通过判断数据的分区器是否和父RDD的一致,构建ShuffledRDD,MapPartitionsRDD

2.2 Action
 在前面谈到Transformation都是延迟计算的,原因也很简单,所有的计算都需要最后的结果展现,如果我不想获取结果,用于计算、保存,那么计算就没有意义了,也就不需要计算了,所以用于最后需要计算的前提是需要有Action,结果展现。
比较常见的:

    无输出 foreach
    输出到文件或者HDFS
    Scala的集合等数据类型 collect, count

在Action中,比如collect

      def collect(): Array[T] = withScope {
        val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
        Array.concat(results: _*)
      }

调用SparkContext进行运行Job

      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          resultHandler: (Int, U) => Unit): Unit = {
        if (stopped.get()) {
          throw new IllegalStateException("SparkContext has been shutdown")
        }
        val callSite = getCallSite
        val cleanedFunc = clean(func)
        logInfo("Starting job: " + callSite.shortForm)
        if (conf.getBoolean("spark.logLineage", false)) {
          logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
        }
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
        progressBar.foreach(_.finishAll())
        rdd.doCheckpoint()
      }

SparkContext运行Job,最终就是调用了DAG 进行job的调度,关于 DAG的具体会在后面一篇讲到

3. RDD的依赖关系



protected def getDependencies: Seq[Dependency[_]] = deps

RDD可以通过getDependencies获取到依赖的数组

    @DeveloperApi
    abstract class Dependency[T] extends Serializable {
      def rdd: RDD[T]
    }


对Dependency来说会保存Parent 的RDD, 可以通过RDD的Dependency来获取双亲的RDD,这样就能溯源

依赖上整体分为Narrow 和Shuffle 两类,也有人叫窄依赖,宽依赖

NarrowDependency 分为三类

    1对1  OneToOneDependency:  常见MapRDD
    多对1 RangDependency: UnionRDD
    1 对部分 PruneDependency: 裁剪

ShuffleDependency 多对多,对应的是ShuffleRDD

只有Transformation的RDD之间才会有Dependency,而对Action来说是并不存在Dependency

整个RDD的分析,构建依赖,数据分片,最后通过Action提交到DAG调度,都是在Driver的主线程完成,这时候并没有构建好Job。    

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved