大数据技术 CacheManager运行原理流程图和源码详解
沉沙 2018-09-21 来源 : 阅读 969 评论 0

摘要:本篇教程探讨了大数据技术 CacheManager运行原理流程图和源码详解,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术 CacheManager运行原理流程图和源码详解,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

Spark出色的原因:

1、Spark基于RDD构成了一体化、多元化的大数据处理中心(不需要再处理多种范式来部署多种框架,只要Spark!!!降低成本投入获得更高的产出);

2、迭代,因为在计算的时候迭代,在构建复杂算法的时候非常方便(图计算、机器学习、数据仓库),而CacheManager 在多重迭代的时候非常重要;

==========CacheManager分析============

1、CacheManager管理的是缓存,而缓存可以是基于内存的缓存,也可以是基于磁盘的缓存;

2、CacheManager需要通过BlockManager来操作数据;

3、每当Task运行的时候,会调用RDD的conpute方法,而compute方法会调用iterator方法,从下面代码中可以看到默认的RDD是基于内存的,计算一次后基本从CacheManager获得;

/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ‘‘not‘‘ be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

==========CacheManager源码详解============

1、Cache在工作的时候会最大化的保留数据,但是数据不一定绝对完整,因为当前的计算如果需要内存空间的话,那么Cache在内存中的数据必须让出空间,此时如果在RDD持久化的时候同时指定了可以把数据放在disk上,那么部分cache的数据就可以从内存转入磁盘,否则的话,数据就会丢失。所以Cache不一定可靠,所以必须得用getOrCompute来确定数据能取到!!!

/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
    rdd: RDD[T],
    partition: Partition,
    context: TaskContext,
    storageLevel: StorageLevel): Iterator[T] = {

  val key = RDDBlockId(rdd.id, partition.index)
  logDebug(s"Looking for partition $key")
  blockManager.get(key) match {
    case Some(blockResult) =>
      // Partition is already materialized, so just return its values
      val existingMetrics = context.taskMetrics
        .getInputMetricsForReadMethod(blockResult.readMethod)
      existingMetrics.incBytesRead(blockResult.bytes)

      val iter = blockResult.data.asInstanceOf[Iterator[T]]
      new InterruptibleIterator[T](context, iter) {
        override def next(): T = {
          existingMetrics.incRecordsRead(1)
          delegate.next()
        }
      }
    case None =>
      // Acquire a lock for loading this partition
      // If another thread already holds the lock, wait for it to finish return its results
      val storedValues = acquireLockForPartition[T](key)
      if (storedValues.isDefined) {
        return new InterruptibleIterator[T](context, storedValues.get)
      }

      // Otherwise, we have to load the partition ourselves
      try {
        logInfo(s"Partition $key not found, computing it")
        val computedValues = rdd.computeOrReadCheckpoint(partition, context)

        // If the task is running locally, do not persist the result
        if (context.isRunningLocally) {
          return computedValues
        }

        // Otherwise, cache the values and keep track of any updates in block statuses
        val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
        val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
        val metrics = context.taskMetrics
        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
        new InterruptibleIterator(context, cachedValues)

      } finally {
        loading.synchronized {
          loading.remove(key)
          loading.notifyAll()
        }
      }
  }
}

2、具体CacheManager在获得缓存数据的时候,首先会通过BlockManager来抓到数据(其中getLocal和getRemote在上一讲有提及);

/**
 * Get a block from the block manager (either local or remote).
 */
def get(blockId: BlockId): Option[BlockResult] = {
  val local = getLocal(blockId)
  if (local.isDefined) {
    logInfo(s"Found block $blockId locally")
    return local
  }
  val remote = getRemote(blockId)
  if (remote.isDefined) {
    logInfo(s"Found block $blockId remotely")
    return remote
  }
  None
}

3、缓存没有数据算的时候,先要锁数据,这里还是从blockManager中获得数据(一般走到这里从这里也取不到的);

/**
 * Acquire a loading lock for the partition identified by the given block ID.
 *
 * If the lock is free, just acquire it and return None. Otherwise, another thread is already
 * loading the partition, so we wait for it to finish and return the values loaded by the thread.
 */
private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
  loading.synchronized {
    if (!loading.contains(id)) {
      // If the partition is free, acquire its lock to compute its value
      loading.add(id)
      None
    } else {
      // Otherwise, wait for another thread to finish and return its result
      logInfo(s"Another thread is loading $id, waiting for it to finish...")
      while (loading.contains(id)) {
        try {
          loading.wait()
        } catch {
          case e: Exception =>
            logWarning(s"Exception while waiting for another thread to load $id", e)
        }
      }
      logInfo(s"Finished waiting for $id")
      val values = blockManager.get(id)
      if (!values.isDefined) {
        /* The block is not guaranteed to exist even after the other thread has finished.
         * For instance, the block could be evicted after it was put, but before our get.
         * In this case, we still need to load the partition ourselves. */
        logInfo(s"Whoever was loading $id failed; we‘ll try it ourselves")
        loading.add(id)
      }
      values.map(_.data.asInstanceOf[Iterator[T]])
    }
  }
}

4、如果CacheManager没有通过BlockManager获得缓存内容的话,此时会通过BlockManager的RDD的如下方法来获得数据:

val computedValues = rdd.computeOrReadCheckpoint(partition, context)

/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

上述方法首先会查看当前的RDD是否进行了checkpoint,如果进行了的话,就直接读取checkpoint的数据,否则的话,就必须进行计算,计算之后会通过putInBlockManager会把数据按照StorageLevel重新缓存起来;

备注:所以如果多步骤迭代的话,有了checkpoint,就极大提升效率了!

5、缓存的时候如果需要放在内存中,内存足够的情况下,看到一点内存就放一下,看到一点内存就放一下,一点一点放,实在放不完,就放disk;

private def putInBlockManager[T](
    key: BlockId,
    values: Iterator[T],
    level: StorageLevel,
    updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
    effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

  val putLevel = effectiveStorageLevel.getOrElse(level)
  if (!putLevel.useMemory) {
    /*
     * This RDD is not to be cached in memory, so we can just pass the computed values as an
     * iterator directly to the BlockManager rather than first fully unrolling it in memory.
     */
    updatedBlocks ++=
      blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
    blockManager.get(key) match {
      case Some(v) => v.data.asInstanceOf[Iterator[T]]
      case None =>
        logInfo(s"Failure to store $key")
        throw new BlockException(key, s"Block manager failed to return cached value for $key!")
    }
  } else {
    /*
     * This RDD is to be cached in memory. In this case we cannot pass the computed values
     * to the BlockManager as an iterator and expect to read it back later. This is because
     * we may end up dropping a partition from memory store before getting it back.
     *
     * In addition, we must be careful to not unroll the entire partition in memory at once.
     * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
     * single partition. Instead, we unroll the values cautiously, potentially aborting and
     * dropping the partition to disk if applicable.
     */
    blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
      case Left(arr) =>
        // We have successfully unrolled the entire partition, so cache it in memory
        updatedBlocks ++=
          blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
        arr.iterator.asInstanceOf[Iterator[T]]
      case Right(it) =>
        // There is not enough space to cache this partition in memory
        val returnValues = it.asInstanceOf[Iterator[T]]
        if (putLevel.useDisk) {
          logWarning(s"Persisting partition $key to disk instead.")
          val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
            useOffHeap = false, deserialized = false, putLevel.replication)
          putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
        } else {
          returnValues
        }
    }
  }
}    

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程