大数据技术之spark java.lang.StackOverflowError
沉沙 2018-12-26 来源 : 阅读 1873 评论 0

摘要:本篇教程探讨了大数据技术之spark java.lang.StackOverflowError,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术之spark java.lang.StackOverflowError,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

大数据技术之spark java.lang.StackOverflowError

在工作中使用spark的一个主要内容就是从多个路径下搜集数据并进行处理。常用的代码大致如下:

    val paths = obtainPaths()    val rdd = paths.map(readData).reduce(_ ++ _)    val arr = rdd.collect()

在readData方法中调用SparkContext的sequenceFile方法读取文件创建RDD集合。而后调用RDD的reduce和union(即“++”)方法将多个RDD集合进行合并。

这里的代码通常是不会报错的。但是执行合并后的RDD集合的Action算子(这里是collect方法)的时候偶尔会遇到StackOverflowError。异常信息如下:

18/06/29 17:23:14 ERROR executor.Executor: Exception in task 28.0 in stage 0.0 (TID 28)java.lang.StackOverflowError        at java.lang.Exception.<init>(Exception.java:102)        at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:89)        at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)        at sun.reflect.GeneratedSerializationConstructorAccessor13.newInstance(Unknown Source)        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)        at java.io.ObjectStreamClass.newInstance(ObjectStreamClass.java:967)        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

从异常信息上来看,是在使用java.io.ObjectInputStream执行序列化的时候出现了递归或者死循环,因为栈空间不足导致的这个问题。

继续跟踪调试,发现主要是因为处理的文件过多导致的——一般处理的文件数量超过450(大致数值)以后就会遇到这个问题。查了一些资料,了解到根本原因是RDD Lineage过长:代码中每执行一次RDD.union操作就会增加一次RDD Lineage的步长。

 解决问题 

现在根据问题的特征和根源,找到的解决思路大致上有这么几个:

增加执行时的栈空间;

避免一次处理过多文件;

定期削减RDD Lineage的长度;

避免创建太长的Lineage。

接下来逐个解释下上面的思路。

 增加栈空间 

从异常信息中可以看到StatckOverflowError是在Executor中抛出的,所以要调整Executor栈空间,可以在job提交参数中添加如下内容:

--conf spark.executor.extraJavaOptions="-Xss80M "

这项配置将Executor的栈空间设置为80M。

然后我们测试一下,测试目标是一次处理720个文件。执行结果OK。

再次测试,仍然是80M栈空间,目标一次处理4320个文件。仍然能执行成功。

可知这个方案在一定程度上是可行的,至少可以用来做任务优化。

 避免一次处理过多的文件 

这个思路是最简单的:既然一次处理太多文件会报错,那么就分成多个批次来处理好了。

调整后的代码如下:

    val paths = obtainPaths()     val pathItr = paths.grouped(360)     val buffer = ListBuffer[String]()        for (p <- pathItr) {      val list =  paths.map(readData).reduce(_ ++ _).collect().toList      buffer ++= list    }

这种方式肯定是可行的,但是用起来多少有点儿麻烦:需要将中间结果集临时存储起来而后再一起使用。中间结果集要是比较小的话还好说,一个变量就足够了;中间结果集要是太大了就得先保存到HDFS上,而后再做二次处理。

 削减RDD Lineage的长度 

既然问题是因为RDD Lineage长度过长导致的,那么就需要在RDD Lineage变得太长之前,将之削减掉一部分。做法是对合并出的RDD结果集定期做checkpoint,并随意执行一个Action算子。

代码如下:

    val paths = obtainPaths()     val count = new AtomicInteger(0)    val arr = paths.map(readData).fold(sc.emptyRDD[String]) {      (a, b) => {        if(count.getAndIncrement() % 360 == 0){          a.checkpoint()          a.collect()        }        a ++ b      }    }.collect()

我一度依赖过这个方案。但是这个方案有一个很大的缺点:就是执行效率太低——比上一种方案效率还低,可以说是执行时间最慢的一种方式了。checkpoint操作实际上是将每个rdd都存储到了硬盘上,其效率可想而知。

 避免创建太长的Lineage 

前面说的第二种方式也可以说是这种思路的实现。对这个问题来说,推荐得比较多的还是使用SparkContext.union方法来替换RDD.union方法。代码大致如下:

    val paths = obtainPaths()     val arr = sc.union(paths.map(readData)).collect()

这个方式也是我最初寄望最多的一个方案。

本来希望能通过这个方案一劳永逸地解决这个问题。可是在测试的时候遇到了些问题:这个方案也不能处理路径太多(800个以上)的问题,但是也没有立即报错,而是阻塞住了。执行两三个小时后提示任务执行失败。在日志中可以找到如下错误提示:

18/07/01 14:48:02 INFO storage.BlockManagerMaster: Registered BlockManager18/07/01 15:00:33 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(2,[Lscala.Tuple2;@40fe6823,BlockManagerId(2, 172.18.12.197, 44839))] in 1 attemptsorg.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:449)        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:470)        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:470)        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:470)

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved