大数据技术 Spark Core(4)用LogQuery的例子来说明Executor是如何运算RDD的算子
沉沙 2018-10-08 来源 : 阅读 1285 评论 0

摘要:本篇教程探讨了大数据技术 Spark Core(4)用LogQuery的例子来说明Executor是如何运算RDD的算子,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术 Spark Core(4)用LogQuery的例子来说明Executor是如何运算RDD的算子,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

1. 究竟是怎么运行的?
很多的博客里大量的讲了什么是RDD, Dependency, Shuffle... 但是究竟那些Executor是怎么运行你提交的代码段的?
下面是一个日志分析的例子,来自Spark的example

      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("Log Query")
        val sc = new SparkContext(sparkConf)
        val dataSet =
          if (args.length == 1) sc.textFile(args(0)) else sc.parallelize(exampleApacheLogs)
        // scalastyle:off
        val apacheLogRegex =
          """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
        // scalastyle:on
        /** Tracks the total query count and number of aggregate bytes for a particular group. */
        class Stats(val count: Int, val numBytes: Int) extends Serializable {
          def merge(other: Stats): Stats = {
            new Stats(count + other.count, numBytes + other.numBytes)
          }
          override def toString: String = "bytes=%s\tn=%s".format(numBytes, count)
        }
     
        def extractKey(line: String): (String, String, String) = {
          apacheLogRegex.findFirstIn(line) match {
            case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
              if (user != "\"-\"") (ip, user, query)
              else (null, null, null)
            case _ => (null, null, null)
          }
        }
     
        def extractStats(line: String): Stats = {
          apacheLogRegex.findFirstIn(line) match {
            case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
              new Stats(1, bytes.toInt)
            case _ => new Stats(1, 0)
          }
        }
     
        dataSet.map(line => (extractKey(line), extractStats(line)))
          .reduceByKey((c, d) => c.merge(d))
          .collect().foreach{
            case (user, query) => println("%s\t%s".format(user, query))}
     
        sc.stop()
      }


在map的RDD算子里,自定义了extractKey, extractStats函数,而在reduceByKey的RDD又自定义了一个相同的key的merge函数
这些函数是如何被传递到executor里并且进行运算的呢?
1.1 RDD,ShuffleDependency
在前面的博文(Executor上是如何launch task的)中,已经讨论过如何获取到Driver的RDD, Dependency, 那么RDD如何能够运行这些函数呢?

Execute获取的DAG里提交的ShuffleMapTask是在TaskDecription中serializedTask中反序列化出来
ShuffleMapTask的RunTask的方法

     override def runTask(context: TaskContext): MapStatus = {
        // Deserialize the RDD using the broadcast variable.
        val threadMXBean = ManagementFactory.getThreadMXBean
        val deserializeStartTime = System.currentTimeMillis()
        val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
          ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
        _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
        _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
        } else 0L
     
        var writer: ShuffleWriter[Any, Any] = null
        try {
          val manager = SparkEnv.get.shuffleManager
          writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
          writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
          writer.stop(success = true).get
        } catch {
          case e: Exception =>
            try {
              if (writer != null) {
                writer.stop(success = false)
              }
            } catch {
              case e: Exception =>
                log.debug("Could not stop writer", e)
            }
            throw e
        }
      }

看到了通过shufflewrite去写迭代的rdd数据
1.1.1 ShuffleWrite
ShuffleWrite的构建是通过shuffleManager来获取的,在SortShuffleManager.scala中

     /** Get a writer for a given partition. Called on executors by map tasks. */
      override def getWriter[K, V](
          handle: ShuffleHandle,
          mapId: Int,
          context: TaskContext): ShuffleWriter[K, V] = {
        numMapsForShuffle.putIfAbsent(
          handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
        val env = SparkEnv.get
        handle match {
          case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
            new UnsafeShuffleWriter(
              env.blockManager,
              shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
              context.taskMemoryManager(),
              unsafeShuffleHandle,
              mapId,
              context,
              env.conf)
          case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
            new BypassMergeSortShuffleWriter(
              env.blockManager,
              shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
              bypassMergeSortHandle,
              mapId,
              context,
              env.conf)
          case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
            new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
        }
      }

在ShuffleDependency中保存着ShuffleHandle, ShuffleHandle中也保存着Dependency

    在Driver DAG 中registerShuffle中dependency决定着使用什么ShuffleHandle
    在Executor的shuffleManager中是由dependency中的ShuffleHandle来决定什么ShuffleWrite

题外话:Dependency本身就可以直接决定shuffleWrite,整个ShuffleHandle只是在SortShuffleWriter的时候用于获取了dependency, Executor端SortShuffleWriter本身就能获取到Dependency,ShuffleHandle感觉就是一个鸡肋。

在日志分析的这个代码案例中,返回的是SortShuffleWriter

1.1.2 RDD.iterator
在ShuffleMapTask中的runTask方法

writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])


writer在调用的write函数中传递了rdd.iterator,也就是通过rdd构造的迭代器

     final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
        if (storageLevel != StorageLevel.NONE) {
          getOrCompute(split, context)
        } else {
          computeOrReadCheckpoint(split, context)
        }
      }

Map的rdd的构造迭代器MapPartitionsRDD,MapPartitionsRDD并没有设置缓存或者存储,StorageLevel是NONE,调用computerOrReadCheckpoint方法

      /**
       * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
       */
      private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
      {
        if (isCheckpointedAndMaterialized) {
          firstParent[T].iterator(split, context)
        } else {
          compute(split, context)
        }
      }

也没有做过checkpointed ,调用compute方法

      override def compute(split: Partition, context: TaskContext): Iterator[U] =
        f(context, split.index, firstParent[T].iterator(split, context))

先来看fistParent

      /** Returns the first parent RDD */
      protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
        dependencies.head.rdd.asInstanceOf[RDD[U]]
      }

每个RDD都会保存一个Dependency的数组,Dependency里有RDD的属性,而Dependency数组的头一个dependency的RDD,就是处理数据的首个RDD,也就是如下的代码里的dataSet

    val dataSet =
          if (args.length == 1) sc.textFile(args(0)) else sc.parallelize(exampleApacheLogs)

我们以parallelize为例子,所对应的RDD就是ParallelCollectionRDD
回到

firstParent[T].iterator(split, context))

iterator函数就是前面的RDD函数,StorageLevel依然是NONE,也没有做过checkpointed,依然还是调用compute的方法

      override def compute(s: Partition, context: TaskContext): Iterator[T] = {
        new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
      }

生成了一个InterruptibleIterator迭代器,迭代器本质只是一个代理的迭代器

    @DeveloperApi
    class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
      extends Iterator[T] {
     
      def hasNext: Boolean = {
        // TODO(aarondav/rxin): Check Thread.interrupted instead of context.interrupted if interrupt
        // is allowed. The assumption is that Thread.interrupted does not have a memory fence in read
        // (just a volatile field in C), while context.interrupted is a volatile in the JVM, which
        // introduces an expensive read fence.
        if (context.isInterrupted) {
          throw new TaskKilledException
        } else {
          delegate.hasNext
        }
      }
     
      def next(): T = delegate.next()
    }

当发现有打断命令的时候,直接抛出TaskKilledException的异常,其所代理的iterator 是

s.asInstanceOf[ParallelCollectionPartition[T]].iterator

ParallelCollectionRDD的Partition就是ParallelCollectionPartition

    private[spark] class ParallelCollectionPartition[T: ClassTag](
        var rddId: Long,
        var slice: Int,
        var values: Seq[T]
      ) extends Partition with Serializable {
     
      def iterator: Iterator[T] = values.iterator
       .......
    }

Values是需要支持序列化的数组,在Driver端ParallelCollectionRDD中将数据Data进行了ParallelCollectionPartition的分片,分片的数据Values被保存在了ParallelCollectionPartition里,数据并没有被保存在ParallelCollectionRDD中,所以进行计算的数据并不是通过RDD传递过来的,而是通过反序列化ShuffleMapTask获得的,走的是直接的rpc通道

    private[spark] class ShuffleMapTask(
        stageId: Int,
        stageAttemptId: Int,
        taskBinary: Broadcast[Array[Byte]],
        partition: Partition,
        @transient private var locs: Seq[TaskLocation],
        metrics: TaskMetrics,
        localProperties: Properties,
        jobId: Option[Int] = None,
        appId: Option[String] = None,
        appAttemptId: Option[String] = None)
      extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
        appId, appAttemptId)


回到MapPartitionsRDD原来的函数中去:

      override def compute(split: Partition, context: TaskContext): Iterator[U] =
        f(context, split.index, firstParent[T].iterator(split, context))

要看看f是什么?RDD.map函数

      def map[U: ClassTag](f: T => U): RDD[U] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
      }


我们在看看我们是如何调用map函数的:

dataSet.map(line => (extractKey(line), extractStats(line)))


f(context, split.index, firstParent[T].iterator(split, context))就是调用了(context, pid,iter) =>iter.map(cleanF) 关键的是iter.map函数这是scala的基本函数,查看scala代码Iterator.scala

     def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
        def hasNext = self.hasNext
        def next() = f(self.next())
      }

返回的可以简单的认为AbstractIterator,self 指向的是InterruptibleIterator,f 就是 line => (extractKey(line), extractStats(line))
我们来看ExternalSorter.scala通过迭代器获取Partiton的数据并进行运算的代码

    while (records.hasNext) {
            addElementsRead()
            kv = records.next()
            map.changeValue((getPartition(kv._1), kv._1), update)
            maybeSpillCollection(usingMap = true)
          }


    AbstractIterator.hasNext -> InterruptibleIterator.hasNext ->  Elements( Seq.interator).hasNext -> def hasNext: Boolean = index < end
    AbstractIterator.next() -> InterruptibleIterator.next() -> Elements( Seq.interator).next(). -> f(InterruptibleIterator.next()) ->(extractKey(InterruptibleIterator.next()), extractStats(InterruptibleIterator.next()))

运算extractKey, extractStats后返回的是一个Product2[Tuple3(String,String,String),Stats] KV值

还记得executor会loadDriver的jar么?虽然在scala里所定义函数都默认支持反序列化,但是在运行方法并不需要反序列化,只要加载jar包,classload 这个我们写的driver的类就可以了。
1.1.3 reduceByKey算子
在LogQuery中

.reduceByKey((c, d) => c.merge(d))

我们来看PairRDDFunction.scala中的reduceByKey,为什么PairRDDFunction不是RDD在前面的博客已经描述过

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
        combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
      }

combineByKeyWithClassTag函数中

    def combineByKeyWithClassTag[C](
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          partitioner: Partitioner,
          mapSideCombine: Boolean = true,
          serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
        require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
        if (keyClass.isArray) {
          if (mapSideCombine) {
            throw new SparkException("Cannot use map-side combining with array keys.")
          }
          if (partitioner.isInstanceOf[HashPartitioner]) {
            throw new SparkException("HashPartitioner cannot partition array keys.")
          }
        }
        val aggregator = new Aggregator[K, V, C](
          self.context.clean(createCombiner),
          self.context.clean(mergeValue),
          self.context.clean(mergeCombiners))
        if (self.partitioner == Some(partitioner)) {
          self.mapPartitions(iter => {
            val context = TaskContext.get()
            new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
          }, preservesPartitioning = true)
        } else {
          new ShuffledRDD[K, V, C](self, partitioner)
            .setSerializer(serializer)
            .setAggregator(aggregator)
            .setMapSideCombine(mapSideCombine)
        }
      }


在以前都没有介绍过Aggregator,我们来介绍一下这个Aggregator,Aggregator有三个关键函数

    createCombiner: 通过Map获得的新KV, 在Key不存在的情况下将V转化为C
    mergeValue: 通过Map获得的新KV, 在已经存在相同的Key情况下,将新获得的V聚合到C
    mergeCombiners: 分布式计算的时候,最后要每个RDD的分区最后汇总,汇总的时候对相同的Key,已经聚合的C和另一个分区已经聚合的C再次聚合

在logquery的例子中,mergeValue, mergeCombiners 就是 (c,d)  =>c.merge(d)            createCombiner就是 stats不变
还是回到ExternalSorter.scala的insertAll中

    val mergeValue = aggregator.get.mergeValue
          val createCombiner = aggregator.get.createCombiner
          var kv: Product2[K, V] = null
          val update = (hadValue: Boolean, oldValue: C) => {
            if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
          }
          while (records.hasNext) {
            addElementsRead()
            kv = records.next()
            map.changeValue((getPartition(kv._1), kv._1), update)
            maybeSpillCollection(usingMap = true)
          }

我们看到在map.changeValue的时候,通过update的方法更新相同的key

    val update = (hadValue: Boolean, oldValue: C) => {
            if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
          }

mergeValue,createCombiner就是从Aggregator中获取到的,而Aggregator被保存在ShuffledRDD和ShuffledDependency中,ShuffledDependency是通过Driver RPC传递给Executor的,所以可以从ShuffledDependency获取到Aggregator,通过Aggregator里指定的算法进行KV的操作,而mergeValue就是Driver中的c.merge(d),因为c 是stats 对象

        class Stats(val count: Int, val numBytes: Int) extends Serializable {
          def merge(other: Stats): Stats = {
            new Stats(count + other.count, numBytes + other.numBytes)
          }
          override def toString: String = "bytes=%s\tn=%s".format(numBytes, count)
        }

调用了Stats.merge的方法

2. 总结

    通过反序列化RDD(不是ShuffleRDD),通过Dependency的列表获的最初获取数据的RDD的迭代器A
    Map算子对迭代器A重新封装AbstractIterator,在迭代器A获取结果后进行Map算子里的函数调用line => (extractKey(line), extractStats(line)),返回KV的结果
    reduceByKey算子里的函数传递是通过ShuffledDependency里的aggregator进行传递
    Executor 只要对迭代器AbstractIterator进行迭代获取KV,调用aggregator里的方法进行相同的K对V进行操作,完成Driver里面的main函数定义的RDD运算。    

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程