user3405300

Reputation: 71

Spark: PageRank example throws StackOverflowError when the number of iterations is too large

I am testing the default Spark PageRank example. When I set the number of iterations to 1024, it throws a StackOverflowError. I have hit the same problem in another program of mine. How can I solve it?

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SparkPageRank {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: PageRank <master> <file> <number_of_iterations>")
      System.exit(1)
    }
    val iters = args(2).toInt
    val ctx = new SparkContext(args(0), "PageRank", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    val lines = ctx.textFile(args(1), 1)

    // Parse "src dst" pairs, deduplicate, and group outgoing links per page.
    val links = lines.map { s =>
      val parts = s.split("\\s+")
      (parts(0), parts(1))
    }.distinct().groupByKey().cache()
    var ranks = links.mapValues(v => 1.0)

    for (i <- 1 to iters) {
      val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
    }

    val output = ranks.collect()
    output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))

    System.exit(0)
  }
}

The error is posted below:

    [spark-akka.actor.default-dispatcher-15] ERROR LocalActorRefProvider(akka://spark) - guardian failed, shutting down system
java.lang.StackOverflowError
    at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
    at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
    at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
    at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
    at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:312)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
    at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:326)

Upvotes: 7

Views: 3088

Answers (2)

viirya

Reputation: 674

This happens because the transformations inside the for-loop build up a very long dependency chain (lineage) on your RDD. When the job is submitted, the DAG scheduler visits that lineage recursively, which is what causes the StackOverflowError.

To solve the problem, call checkpoint() on your RDD. cache() alone will not evaluate the RDD immediately.

So every certain number of iterations, call cache() and checkpoint() on the intermediate RDD and then trigger an action on it manually, so that it is evaluated and its dependencies are cleared.
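A minimal sketch of this approach, reusing the loop from the question; the checkpoint directory ("/tmp/spark-checkpoint") and the 10-iteration interval are illustrative assumptions, not part of the original example:

// Hypothetical sketch: periodic checkpointing inside the PageRank loop.
// The checkpoint directory and the 10-iteration interval are assumptions.
ctx.setCheckpointDir("/tmp/spark-checkpoint")

for (i <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
    val size = urls.size
    urls.map(url => (url, rank / size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)

  if (i % 10 == 0) {
    ranks.cache()       // keep the partitions in memory so checkpointing does not recompute them
    ranks.checkpoint()  // mark the RDD for checkpointing; its lineage is truncated once it is saved
    ranks.count()       // run an action to force evaluation, so the checkpoint actually happens now
  }
}

Checkpointing writes the RDD to the checkpoint directory and replaces its lineage with a reference to the saved data, so the scheduler's recursive visit stays shallow no matter how many iterations you run.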

Upvotes: 4

Wildfire

Reputation: 6418

My guess is that the error occurs because the intermediate RDDs are not evaluated until collect(), and at that point they are evaluated recursively.

Try adding cache() to evaluate the RDDs on each iteration; it will probably help:

ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _).cache

Upvotes: 0
