Markus

Reputation: 3782

Spark java.lang.UnsupportedOperationException: empty collection

When I run the following code, I get an empty collection error in some cases:

    val result = df
                  .filter(col("channel_pk") === "abc")
                  .groupBy("member_PK")
                  .agg(sum(col("price") * col("quantityOrdered")) as "totalSum")
                  .select("totalSum")
                  .rdd.map(_(0).asInstanceOf[Double]).reduce(_ + _)

The error happens at this line:

    .rdd.map(_(0).asInstanceOf[Double]).reduce(_ + _)

When the collection is empty, I want the result to be 0. How can I do that?

Upvotes: 0

Views: 4684

Answers (2)

Ramesh Maharjan

Reputation: 41957

When collection is empty, I want result to be equal to 0. How can I do it?

Before you do the aggregation, just check whether the dataframe has any rows:

    val result = if (df.take(1).isEmpty) 0.0 else df
      .filter(col("channel_pk") === "abc")
      .groupBy("member_PK")
      .agg(sum(col("price") * col("quantityOrdered")) as "totalSum")
      .select("totalSum")
      .rdd.map(_(0).asInstanceOf[Double]).reduce(_ + _)

Or you can use count instead:

    val result = if (df.count() == 0) 0.0 else df
      .filter(col("channel_pk") === "abc")
      .groupBy("member_PK")
      .agg(sum(col("price") * col("quantityOrdered")) as "totalSum")
      .select("totalSum")
      .rdd.map(_(0).asInstanceOf[Double]).reduce(_ + _)
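
As a side note, reduce is what throws on an empty RDD, while fold takes a zero value and returns it when nothing survives the filter. A minimal sketch of that variant, reusing the question's df, columns, and imports, needs no separate emptiness check:

    // fold returns the zero value (0.0) on an empty RDD instead of
    // throwing UnsupportedOperationException the way reduce does
    val result = df
      .filter(col("channel_pk") === "abc")
      .groupBy("member_PK")
      .agg(sum(col("price") * col("quantityOrdered")) as "totalSum")
      .select("totalSum")
      .rdd.map(_(0).asInstanceOf[Double])
      .fold(0.0)(_ + _)

Unlike count(), this also avoids a full extra pass over df.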

Upvotes: 1

Noam Shaish

Reputation: 1623

The error appears only at that line because that is the first time you trigger an action; before that point, Spark doesn't execute anything (laziness). Your df is simply empty. You can verify that by adding the following beforehand:

    assert(!df.take(1).isEmpty)
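
If df itself turns out to be non-empty, the filter may still be what empties the data; a minimal sketch of the same check applied after the filter, assuming the question's column and value:

    // take(1) only fetches a single row, so this check is cheap
    val filtered = df.filter(col("channel_pk") === "abc")
    assert(!filtered.take(1).isEmpty, "no rows match channel_pk = abc")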

Upvotes: 1
