Reputation: 3782
When I run this code, I get an "empty collection" error in some cases.
val result = df
.filter(col("channel_pk") === "abc")
.groupBy("member_PK")
.agg(sum(col("price") * col("quantityOrdered")) as "totalSum")
.select("totalSum")
.rdd.map(_ (0).asInstanceOf[Double]).reduce(_ + _)
The error happens at this line:
.rdd.map(_ (0).asInstanceOf[Double]).reduce(_ + _)
When the collection is empty, I want the result
to be 0. How can I do that?
Upvotes: 0
Views: 4684
Reputation: 41957
When the collection is empty, I want the result to be 0. How can I do that?
Before you run the final action, check whether the dataframe still has rows. Note that the check has to happen after the filter; an input df that is non-empty but whose filtered result is empty would still fail:
val filtered = df.filter(col("channel_pk") === "abc")
val result = if (filtered.take(1).isEmpty) 0.0 else filtered
  .groupBy("member_PK")
  .agg(sum(col("price") * col("quantityOrdered")) as "totalSum")
  .select("totalSum")
  .rdd.map(_(0).asInstanceOf[Double]).reduce(_ + _)
Or you can use count
too (though take(1) is cheaper on large data, since count scans every row):
val filtered = df.filter(col("channel_pk") === "abc")
val result = if (filtered.count() == 0) 0.0 else filtered
  .groupBy("member_PK")
  .agg(sum(col("price") * col("quantityOrdered")) as "totalSum")
  .select("totalSum")
  .rdd.map(_(0).asInstanceOf[Double]).reduce(_ + _)
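As an aside, reduce throws on an empty collection because it has no starting value, while fold takes a zero element and simply returns it when there is nothing to combine. Spark's RDD also has fold(zeroValue)(op) with the same contract, so ending the pipeline in .fold(0.0)(_ + _) is an alternative to a pre-check. A minimal sketch of the difference on a plain Scala collection:

```scala
// reduce has no starting value, so it throws on an empty collection;
// fold takes a zero element and returns it when there is nothing to combine.
val empty = Seq.empty[Double]

val folded  = empty.fold(0.0)(_ + _)               // safe: yields the zero element, 0.0
val reduced = scala.util.Try(empty.reduce(_ + _))  // Failure(UnsupportedOperationException)

println(folded)
println(reduced.isFailure)
```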
Upvotes: 1
Reputation: 1623
The error appears only at that line because that is the first time you trigger an action; before that, Spark doesn't execute anything (laziness). Your df is simply empty.
You can verify it by adding this before the reduce:
assert(!df.take(1).isEmpty)
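The laziness point can be mimicked with a plain Scala view (purely illustrative, no Spark needed): building the lazy pipeline raises no error, and the empty-collection failure only surfaces when a terminal call forces evaluation, just like a Spark action does:

```scala
// Nothing runs when the lazy pipeline is defined, like Spark transformations;
// the empty-collection failure only surfaces at the terminal call (the "action").
val pipeline = Seq.empty[Double].view.map(_ * 2)    // lazily defined, no error here

val forced = scala.util.Try(pipeline.reduce(_ + _)) // the error appears only now
println(forced.isFailure)
```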
Upvotes: 1