solistice

Reputation: 63

How can I further reduce my Apache Spark task size?

I'm trying to run the following code in Scala on Spark, but I get an extremely large task size (about 8 MB).

tidRDD:RDD[ItemSet]
mh:MineHelper
x:ItemSet
broadcast_tid:Broadcast[Array[ItemSet]]
count:Int

tidRDD.flatMap(x => mh.mineFreqSets(x, broadcast_tid.value, count)).collect()

The reason I added the MineHelper class was to make it serialisable, and it contains only the given method. An ItemSet is a class with 3 private members and a few getter/setter methods, nothing out of the ordinary. I feel that this is the correct way to approach this problem, but Spark thinks otherwise. Am I making some gaping error, or is it something small that's wrong?

Here's the warning:

WARN TaskSetManager: Stage 1 contains a task of very large size (8301 KB). The maximum recommended task size is 100 KB.

Upvotes: 3

Views: 3302

Answers (1)

Lomig Mégard

Reputation: 1828

You're probably closing over this, forcing the whole enclosing object to be serialized.

You probably have something like the following:

class Foo {
  val outer = ??? 
  def f(rdd: RDD[ItemSet]): RDD[ItemSet] = {
    rdd.map(x => outer.g(x))
  }
}

In this case, Spark needs the instance of the enclosing Foo when it serializes the task, because a reference to outer inside the closure really means this.outer. Every field of that Foo instance is then shipped along with the task.

A simple fix is to put your outer variables in local ones:

class Foo {
  val outer = ??? 
  def f(rdd: RDD[ItemSet]): RDD[ItemSet] = {
    val _outer = outer         // local variable
    rdd.map(x => _outer.g(x))  // no reference to `this`
  }
}
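
To check whether this is what inflates the task, one option is to compare the serialized size of the two kinds of closure outside Spark. The following is only a sketch under a few assumptions: it uses plain Java serialization (Spark's default for closures), it assumes Scala 2.12 or later (where function literals are serializable), and Foo, big, offset and serializedSize are hypothetical names made up for the illustration, not part of the question's code.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ClosureSizeDemo {
  // Java-serialize a value and return the number of bytes written.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  // Hypothetical stand-in for the enclosing class.
  class Foo extends Serializable {
    // Unrelated 8 MB field that the closure never uses.
    val big: Array[Byte] = new Array[Byte](8 * 1024 * 1024)
    val offset: Int = 1

    // `offset` really means `this.offset`, so the closure captures `this`.
    def capturing: Int => Int = x => x + offset

    // The local copy is captured instead, so `this` stays out of the closure.
    def local: Int => Int = {
      val _offset = offset
      x => x + _offset
    }
  }

  def main(args: Array[String]): Unit = {
    val foo = new Foo
    println(s"closing over this: ${serializedSize(foo.capturing)} bytes")
    println(s"local copy:        ${serializedSize(foo.local)} bytes")
  }
}
```

The first closure should come out larger than the 8 MB array, because the whole Foo travels with it, while the second should stay small. If the enclosing class were not Serializable, the first call would fail with a NotSerializableException instead, which is the other common symptom of the same problem.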

Upvotes: 1
