大数据技术 Spark Storage(1)集群下的区块管理
沉沙 2018-10-08 来源 : 阅读 1192 评论 0

摘要:本篇教程探讨了大数据技术 Spark Storage(1)集群下的区块管理,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术 Spark Storage(1)集群下的区块管理,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

Storage模块

在Spark中提及最多的是RDD,而RDD所交互的数据是通过Storage来实现和管理
Storage模块整体架构
1. 存储层
在Spark里,单节点的Storage的管理是通过block来管理的,每个Block的存储可以在内存里或者在磁盘中,在BlockManager里既可以管理内存的存储,同时也管理硬盘的存储,存储的标识是通过块的ID来区分的。




2. 集群下的架构
2.1 架构
在集群下Spark的Block的管理架构使用Master-Slave模式

    Master : 拥有所有block的具体信息(本地和Slave节点)
    Slave : 通过master获取block的信息,并且汇报自己的信息

这里的Master并不是Spark集群中分配任务的Master,而是提交task的客户端Driver,这里并没有主备设计,因为Driver client是单点的,通常Driver client crash了,计算也没有结果了,在Storage 的集群管理中Master是由driver承担。

在Executor在运行task的时候,通过blockManager获取本地的block块,如果本地找不到,尝试通过master去获取远端的块

     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
          val pieceId = BroadcastBlockId(id, "piece" + pid)
          logDebug(s"Reading piece $pieceId of $broadcastId")
          // First try getLocalBytes because there is a chance that previous attempts to fetch the
          // broadcast blocks have already fetched some of the blocks. In that case, some blocks
          // would be available locally (on this executor).
          bm.getLocalBytes(pieceId) match {
            case Some(block) =>
              blocks(pid) = block
              releaseLock(pieceId)
            case None =>
              bm.getRemoteBytes(pieceId) match {
                case Some(b) =>
                  if (checksumEnabled) {
                    val sum = calcChecksum(b.chunks(0))
                    if (sum != checksums(pid)) {
                      throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
                        s" $sum != ${checksums(pid)}")
                    }
                  }
                  // We found the block from remote executors/driver's BlockManager, so put the block
                  // in this executor's BlockManager.
                  if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
                    throw new SparkException(
                      s"Failed to store $pieceId of $broadcastId in local BlockManager")
                  }
                  blocks(pid) = b
                case None =>
                  throw new SparkException(s"Failed to get $pieceId of $broadcastId")
              }
          }
        }

2.2 Executor获取块内容的位置


唯一的blockID: 
broadcast_0_piece0
请求Master获取该BlockID所在的 Location,也就是BlockManagerId的集合

    /** Get locations of the blockId from the driver */
      def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
        driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
      }


唯一的BlockManagerId

BlockManagerId(driver, 192.168.121.101, 55153, None)
Executor ID, executor ID, 对driver来说就是driver
Host: executor/driver IP
Port:    executor/driver Port

每一个executor, 和driver 都生成唯一的BlockManagerId
2.3 Executor获取块的内容

    def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
        logDebug(s"Getting remote block $blockId")
        require(blockId != null, "BlockId is null")
        var runningFailureCount = 0
        var totalFailureCount = 0
        val locations = getLocations(blockId)
        val maxFetchFailures = locations.size
        var locationIterator = locations.iterator
        while (locationIterator.hasNext) {
          val loc = locationIterator.next()
          logDebug(s"Getting remote block $blockId from $loc")
          val data = try {
            blockTransferService.fetchBlockSync(
              loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
          } catch {
            case NonFatal(e) =>
              runningFailureCount += 1
              totalFailureCount += 1
     
              if (totalFailureCount >= maxFetchFailures) {
                // Give up trying anymore locations. Either we've tried all of the original locations,
                // or we've refreshed the list of locations from the master, and have still
                // hit failures after trying locations from the refreshed list.
                logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
                  s"Most recent failure cause:", e)
                return None
              }
     
              logWarning(s"Failed to fetch remote block $blockId " +
                s"from $loc (failed attempt $runningFailureCount)", e)
     
              // If there is a large number of executors then locations list can contain a
              // large number of stale entries causing a large number of retries that may
              // take a significant amount of time. To get rid of these stale entries
              // we refresh the block locations after a certain number of fetch failures
              if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
                locationIterator = getLocations(blockId).iterator
                logDebug(s"Refreshed locations from the driver " +
                  s"after ${runningFailureCount} fetch failures.")
                runningFailureCount = 0
              }
     
              // This location failed, so we retry fetch from a different one by returning null here
              null
          }
     
          if (data != null) {
            return Some(new ChunkedByteBuffer(data))
          }
          logDebug(s"The value of block $blockId is null")
        }
        logDebug(s"Block $blockId not found")
        None
      }


通过获取的BlockManagerId的集合列表,顺序的从列表中取出一个拥有该Block的服务器,通过

    blockTransferService.fetchBlockSync(
              loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()

同步的获取块的内容,如果该块不存在,则换下一个拥有该Block的服务器
2.4 BlockManager注册
Driver 初始化SparkContext.init 的时候,会初始化BlockManager.initialize

    val idFromMaster = master.registerBlockManager(
          id,
          maxMemory,
          slaveEndpoint)

会通过master 注册BlockManager

      def registerBlockManager(
          blockManagerId: BlockManagerId,
          maxMemSize: Long,
          slaveEndpoint: RpcEndpointRef): BlockManagerId = {
        logInfo(s"Registering BlockManager $blockManagerId")
        val updatedId = driverEndpoint.askWithRetry[BlockManagerId](
          RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
        logInfo(s"Registered BlockManager $updatedId")
        updatedId
      }


在BlockManagerMaster里,我们看到了endpoint是强制的driver,也就是默认是driver 是master
无论driver,还是executor都是初始化后BlockManager,发消息给driver master进行注册,唯一不同的是driver标识自己的ID是driver,而executor是按照executor id来标识自己的

2.5 Driver Master的endpoint
前面一节已经介绍过无论driver还是executor 都会发送消息到Driver的Master,在Driver 和Executor里SparkEnv.create的时候会初始化BlockManagerMaster

    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
          BlockManagerMaster.DRIVER_ENDPOINT_NAME,
          new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
          conf, isDriver)

注册一个lookup的endpoint

    def registerOrLookupEndpoint(
            name: String, endpointCreator: => RpcEndpoint):
          RpcEndpointRef = {
          if (isDriver) {
            logInfo("Registering " + name)
            rpcEnv.setupEndpoint(name, endpointCreator)
          } else {
            RpcUtils.makeDriverRef(name, conf, rpcEnv)
          }
        }


代码中可以看到只有isDriver的时候才会setup一个rpc的endpoint,默认是netty的rpc环境,命名为:BlockManagerMaster

spark://BlockManagerMaster@192.168.121.101:40978

所有的driver, executor都会向master 40978发消息

2.6 Master和Executor消息格式
下面的代码每个case都是master和executor的消息格式

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
          context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))
     
        case _updateBlockInfo @
            UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
          context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
          listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
     
        case GetLocations(blockId) =>
          context.reply(getLocations(blockId))
     
        case GetLocationsMultipleBlockIds(blockIds) =>
          context.reply(getLocationsMultipleBlockIds(blockIds))
     
        case GetPeers(blockManagerId) =>
          context.reply(getPeers(blockManagerId))
     
        case GetExecutorEndpointRef(executorId) =>
          context.reply(getExecutorEndpointRef(executorId))
     
        case GetMemoryStatus =>
          context.reply(memoryStatus)
     
        case GetStorageStatus =>
          context.reply(storageStatus)
     
        case GetBlockStatus(blockId, askSlaves) =>
          context.reply(blockStatus(blockId, askSlaves))
     
        case GetMatchingBlockIds(filter, askSlaves) =>
          context.reply(getMatchingBlockIds(filter, askSlaves))
     
        case RemoveRdd(rddId) =>
          context.reply(removeRdd(rddId))
     
        case RemoveShuffle(shuffleId) =>
          context.reply(removeShuffle(shuffleId))
     
        case RemoveBroadcast(broadcastId, removeFromDriver) =>
          context.reply(removeBroadcast(broadcastId, removeFromDriver))
     
        case RemoveBlock(blockId) =>
          removeBlockFromWorkers(blockId)
          context.reply(true)
     
        case RemoveExecutor(execId) =>
          removeExecutor(execId)
          context.reply(true)
     
        case StopBlockManagerMaster =>
          context.reply(true)
          stop()
     
        case BlockManagerHeartbeat(blockManagerId) =>
          context.reply(heartbeatReceived(blockManagerId))
     
        case HasCachedBlocks(executorId) =>
          blockManagerIdByExecutor.get(executorId) match {
            case Some(bm) =>
              if (blockManagerInfo.contains(bm)) {
                val bmInfo = blockManagerInfo(bm)
                context.reply(bmInfo.cachedBlocks.nonEmpty)
              } else {
                context.reply(false)
              }
            case None => context.reply(false)
          }
      }


2.7 Master结构关系



在Master上会保存每一个executor所对应的BlockManagerID和BlockManagerInfo,而在BlockManagerInfo中保存了每个block的状态
Executor通过心跳主动汇报自己的状态,Master更新EndPoint中Executor的状态
Executor 中的block的状态更新也会汇报给Master,只是跟新Master状态,但不会通知其他的Executor


在Executor和Master交互中是Executor主动推和获取数据的,Master只是管理executor的状态,以及Block的所在的Driver、Executor的位置及其状态,负载较小,Master没有考虑可用性,通常Master节点就是提交任务的Driver的节点。
   

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程