karoma

Reputation: 1558

Scala - ListBuffer emptying itself after every add in loop

I'm trying to add a number of objects to a Scala ListBuffer in a loop, but every time I add one it disappears on the next iteration of the loop.

When I print the contents of the ListBuffer before and after adding a new entry, I get the following output:

Before add: ListBuffer()
After add: ListBuffer(com.me.FeatureV2@20d953ba)
Before add: ListBuffer()
After add: ListBuffer(com.me.FeatureV2@6b768ce7)
Before add: ListBuffer()
After add: ListBuffer(com.me.FeatureV2@123f42d5)

Code:

def generateStatistics(df: DataFrame): List[FeatureV2] = {
  var features = ListBuffer[FeatureV2]()
  val dataColumn = "data"
  for (field <- df.schema.fieldNames) {
    val columnType: String = df.select(field).dtypes(0)._2

    if (columnType == StringType.toString) {
      val statsDf: DataFrame = getStats(df, field, dataColumn)
      for (row <- statsDf) {
        println("Before add: " + features)
        val feature = new FeatureV2()
        feature.element = row.getString(0)
        feature.count = row.getLong(1)
        feature.sum = row.getDouble(2)
        feature.max = row.getDouble(3)
        feature.min = row.getDouble(4)
        feature.feature = field
        features += feature
        println("After add: " + features)
      }
    }
  }
  features.toList
}

On occasion, however, I get the following:

Before add: ListBuffer()
After add: ListBuffer(com.me.FeatureV2@1433183c)
Before add: ListBuffer(com.me.FeatureV2@1433183c)
After add: ListBuffer(com.me.FeatureV2@1433183c, com.me.FeatureV2@4b0df9e5)
Before add: ListBuffer()
After add: ListBuffer(com.me.FeatureV2@1e201b19)

This looks like the ListBuffer actually is being populated at times, but then cleared out. Could it be something to do with garbage collection?

Upvotes: 1

Views: 1430

Answers (3)

pedrorijo91

Reputation: 7845

Is the collection mutable?

Also, when using Scala one should make an effort to write in a functional style. Something like

df.schema.fieldNames.map {...}

would probably do the work you need. And since you have an if, a collect might be more appropriate.
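For example, a rough sketch of that shape (a hypothetical rewrite, reusing the question's getStats helper and FeatureV2 class, with collect and a partial function to keep only the string columns, and Spark's collect() to bring each stats DataFrame back to the driver):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StringType

def generateStatistics(df: DataFrame): List[FeatureV2] = {
  val dataColumn = "data"
  df.schema.fields
    .collect { case f if f.dataType == StringType => f.name } // keep only the string columns
    .flatMap { field =>
      getStats(df, field, dataColumn).collect().map { row =>  // collect() pulls the rows to the driver
        val feature = new FeatureV2()
        feature.element = row.getString(0)
        feature.count = row.getLong(1)
        feature.sum = row.getDouble(2)
        feature.max = row.getDouble(3)
        feature.min = row.getDouble(4)
        feature.feature = field
        feature
      }
    }
    .toList
}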

Upvotes: 0

David Griffin

Reputation: 13927

A Spark application is made up of a driver and executors. You control and create things from the driver; the executors get copies of the variables that were in scope when the closure was serialized. So the executors get copies of the ListBuffer, append to their own copies, and those copies are lost when the task completes.

You could use collect() to pull the data into the driver to append to the ListBuffer there, or use broadcast variables.
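As a tiny illustration of the copy behaviour (a hypothetical example, assuming a SparkSession in scope as spark):

import scala.collection.mutable.ListBuffer

val buf = ListBuffer[Int]()

spark.range(5).foreach(n => buf += n.toInt) // closure is serialized; each task appends to its own copy
println(buf)                                // ListBuffer() -- the driver's buffer is untouched

spark.range(5).collect().foreach(n => buf += n.toInt) // collect() returns a local Array, so this runs on the driver
println(buf)                                          // ListBuffer(0, 1, 2, 3, 4)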

See the documentation for a discussion.

Upvotes: 2

Mike Park

Reputation: 10931

Try changing for (row <- statsDf) to for (row <- statsDf.collect()).

If this fixes your problem, then your issues may be caused by the fact that foreach is running in one or more threads.

for (row <- statsDf) is actually calling DataFrame.foreach(f: Row => Unit), which is a distributed foreach where f may run on any number of threads or machines, depending on your Spark master.
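In other words (a sketch against the question's statsDf):

import org.apache.spark.sql.Row

// what for (row <- statsDf) compiles to: the body runs on the executors
statsDf.foreach((row: Row) => println(row))

// with collect(), the loop is over a plain local Array[Row] on the driver
statsDf.collect().foreach((row: Row) => println(row))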

Upvotes: 3
