sergeda

Reputation: 2211

Spark error "org.apache.spark.sql.AnalysisException: Can't extract value need struct type but got decimal(38,18)"

I have this case class:

case class AllData(positionId: Long, warehouse: String, product: String, amount: BigDecimal, amountTime: Long)

and dataset:

val data: Dataset[AllData]

and this code runs fine:

 val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
  .agg(
   max($"amount").as("maxAmount").as[DecimalType])
 statisticForAmounts.show(5)

but when I do:

statisticForAmounts.collect()

I get this strange error:

org.apache.spark.sql.AnalysisException: Can't extract value from maxAmount#101: need struct type but got decimal(38,18)

Here is my schema:

root
 |-- value: string (nullable = true)
 |-- maxAmount: decimal(38,18) (nullable = true)

What's the cause of the problem, and how can it be fixed?

Upvotes: 1

Views: 3895

Answers (2)

sergeda

Reputation: 2211

After upgrading Spark from version 2.1 to version 2.4, I was able to replace this code:

val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
  .agg(
   max($"amount").as("maxAmount").as[DecimalType])

with:

val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
  .agg(
   max($"amount").as("maxAmount").as[BigDecimal])

Now everything runs smoothly. Before the upgrade, there was no Encoder for BigDecimal.
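A likely explanation of the original error (my reading of how Spark derives encoders, not something stated in this answer): DecimalType is Spark's description of a column's SQL type and is declared as a case class, DecimalType(precision: Int, scale: Int). So .as[DecimalType] appears to pick up the generic product encoder, which expects maxAmount to be a struct with precision and scale fields. show() only renders the rows, while collect() has to deserialize them into Scala objects, which is where the mismatch surfaces. Below is a minimal, self-contained sketch of the working version, assuming Spark 2.4 as in this answer and a local session. The object name MaxAmountExample, the method maxAmounts, and the sample rows are hypothetical.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.max

// Same fields as the case class in the question. Defined at top level so
// Spark can derive an Encoder for it.
case class AllData(positionId: Long, warehouse: String, product: String,
                   amount: BigDecimal, amountTime: Long)

// Hypothetical wrapper object; assumes Spark 2.4 on the classpath.
object MaxAmountExample {

  // Returns the maximum amount per "warehouse, product" key.
  def maxAmounts(spark: SparkSession): Map[String, BigDecimal] = {
    import spark.implicits._ // brings in $"..." and the BigDecimal/product encoders

    // Hypothetical sample rows.
    val data: Dataset[AllData] = Seq(
      AllData(1L, "WC", "FC", BigDecimal("12.11"), 123L),
      AllData(2L, "WC", "FC", BigDecimal("15.50"), 124L),
      AllData(3L, "NY", "FC", BigDecimal("7.25"), 125L)
    ).toDS()

    val statisticForAmounts: Dataset[(String, BigDecimal)] = data
      .groupByKey(record => record.warehouse + ", " + record.product)
      // Request scala.math.BigDecimal (a value type with an Encoder),
      // not DecimalType (Spark's description of the column type).
      .agg(max($"amount").as("maxAmount").as[BigDecimal])

    // collect() deserializes every row; this is the step that failed
    // with .as[DecimalType].
    statisticForAmounts.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // local session for illustration only
      .appName("max-amount-example")
      .getOrCreate()
    try {
      maxAmounts(spark).foreach { case (key, maxAmount) =>
        println(s"$key -> $maxAmount")
      }
    } finally {
      spark.stop()
    }
  }
}
```

The collected values keep the column type decimal(38,18), so they may print with trailing zeros (for example 15.500000000000000000). Scala's BigDecimal equality ignores scale, so comparing against BigDecimal("15.50") still works.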

Upvotes: 1

Goldie

Reputation: 164

This could be caused by a BigDecimal incompatibility between Spark and Scala. You may want to change the amount field to Double and try again:

case class AllData(positionId: Long, warehouse: String, product: String, amount: Double, amountTime: Long)

val data1 = AllData(1,"WC","FC",12.11,123)

val data = spark.createDataset(Seq(data1))

val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product).agg(max($"amount").as("maxAmount").as[Double])

scala> statisticForAmounts.show
+------+---------+                                                              
| value|maxAmount|
+------+---------+
|WC, FC|    12.11|
+------+---------+


scala> statisticForAmounts.collect
res36: Array[(String, Double)] = Array((WC, FC,12.11))
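One trade-off to keep in mind with this approach: Double is binary floating point, so many decimal amounts are stored only approximately, whereas BigDecimal keeps them exact. A minimal plain-Scala illustration (no Spark involved; the object name DoubleVsBigDecimal and the amounts are made up for the example):

```scala
// Hypothetical example: summing the same amounts as Double and as BigDecimal.
object DoubleVsBigDecimal {
  val amounts: Seq[String] = Seq("0.10", "0.20", "0.30")

  // Binary floating point: 0.1, 0.2 and 0.3 have no exact Double representation,
  // so the rounding errors accumulate in the sum.
  val doubleSum: Double = amounts.map(s => s.toDouble).sum

  // Decimal arithmetic: the sum is exactly 0.60.
  val decimalSum: BigDecimal = amounts.map(s => BigDecimal(s)).sum

  def main(args: Array[String]): Unit = {
    println(s"Double sum:     $doubleSum")
    println(s"BigDecimal sum: $decimalSum")
  }
}
```

Running main prints 0.6000000000000001 for the Double sum and 0.60 for the BigDecimal sum. Whether that matters depends on the data; for monetary amounts like those in the question, keeping BigDecimal with a proper encoder avoids the drift.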

Upvotes: 1
