PiFace


How to define methods to deal with Datasets with parametrized types?

I'm trying to define some functions that take a Dataset (a typed DataFrame) as input and produce another as output, and I want them to be flexible enough to deal with parametrized types. In this example, I need a column to represent the ID of users, but it doesn't matter to my functions whether that ID is an Int, a Long, a String, etc. That's why my case classes have the type parameter A.

At first I tried simply writing my function and using Dataset instead of DataFrame:

import org.apache.spark.sql.Dataset

case class InputT[A](id: A, data: Long)
case class OutputT[A](id: A, dataA: Long, dataB: Long)

def someFunction[A](ds: Dataset[InputT[A]]): Dataset[OutputT[A]] = {
  ds.select().as[OutputT[A]] // suppose there are some transformations here
}

... but I got this error:

Unable to find encoder for type OutputT[A]. An implicit Encoder[OutputT[A]] is needed
to store OutputT[A] instances in a Dataset. Primitive types (Int, String, etc) and
Product types (case classes) are supported by importing spark.implicits._  Support
for serializing other types will be added in future releases.

So I tried providing an Encoder for my type:

import org.apache.spark.sql.{Dataset, Encoder}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class InputT[A](id: A, data: Long)
case class OutputT[A](id: A, dataA: Long, dataB: Long)

implicit def enc[A]: Encoder[OutputT[A]] = implicitly(ExpressionEncoder[OutputT[A]])

def someFunction[A](ds: Dataset[InputT[A]]): Dataset[OutputT[A]] = {
  ds.select().as[OutputT[A]] // suppose there are some transformations here
}

And now I get this error:

No TypeTag available for OutputT[A]

If the code is the same as above, but without type parameters (e.g., using String instead of A), then there are no errors.

Avoiding the use of import spark.implicits._ magic if possible, what should I do to fix it? Is it even possible to achieve this level of flexibility with Dataset?

Upvotes: 1

Views: 243

Answers (1)

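If you check the Scaladoc, you will see that as[U] requires an implicit Encoder[U], so you only need to bring one into scope. The simplest way is to request it as an implicit parameter of your function, so that the caller, who knows the concrete type of A, provides it:

import org.apache.spark.sql.{Dataset, Encoder}

// The encoder for OutputT[A] is supplied by the caller instead of being built here.
def someFunction[A](ds: Dataset[InputT[A]])(implicit ev: Encoder[OutputT[A]]): Dataset[OutputT[A]] = {
  ds.select().as[OutputT[A]]
}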
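For illustration, here is a minimal sketch of what a call site could look like without import spark.implicits._, assuming A is String and a local SparkSession. The object name EncoderAtCallSite and the map body are made up for the example (the question only says "suppose there are some transformations here"); Encoders.product is the public Spark API for building a case class encoder when the concrete type is known.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Top-level case classes, as in the question.
case class InputT[A](id: A, data: Long)
case class OutputT[A](id: A, dataA: Long, dataB: Long)

object EncoderAtCallSite {
  // Same signature as in the answer. The map body is a hypothetical
  // placeholder transformation, not the asker's real logic.
  def someFunction[A](ds: Dataset[InputT[A]])(implicit ev: Encoder[OutputT[A]]): Dataset[OutputT[A]] =
    ds.map(in => OutputT(in.id, in.data, in.data * 2))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("encoder-sketch")
      .getOrCreate()

    // Here A is known to be String, so the compiler can supply the TypeTag
    // that Encoders.product needs. No spark.implicits._ import is involved.
    implicit val inEnc: Encoder[InputT[String]] = Encoders.product[InputT[String]]
    implicit val outEnc: Encoder[OutputT[String]] = Encoders.product[OutputT[String]]

    val input: Dataset[InputT[String]] =
      spark.createDataset(Seq(InputT("u1", 1L), InputT("u2", 2L)))

    // outEnc is picked up as the implicit ev parameter.
    val output: Dataset[OutputT[String]] = someFunction(input)
    output.show()

    spark.stop()
  }
}
```

The generic function never has to construct an encoder for an unknown A. That work is pushed to the one place where the concrete type is available.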

Also, you may want to take a look at "Where does Scala look for implicits?".
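As one possible application of those lookup rules, the encoder can live in the companion object of OutputT, which is part of the implicit scope for Encoder[OutputT[A]], so use sites need no import. This is only a sketch: the method name encoder and the demo object are hypothetical. The TypeTag context bound is what addresses the "No TypeTag available for OutputT[A]" error, because the compiler can only build a TypeTag for OutputT[A] if it is given one for A.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import scala.reflect.runtime.universe.TypeTag

case class OutputT[A](id: A, dataA: Long, dataB: Long)

object OutputT {
  // Found through the implicit scope of OutputT, with no import at the use site.
  // The A: TypeTag bound lets the compiler materialize TypeTag[OutputT[A]],
  // which Encoders.product requires.
  implicit def encoder[A: TypeTag]: Encoder[OutputT[A]] =
    Encoders.product[OutputT[A]]
}

object CompanionEncoderDemo {
  def main(args: Array[String]): Unit = {
    // Resolved via the companion object; TypeTag[String] is provided by the compiler.
    val enc: Encoder[OutputT[String]] = implicitly[Encoder[OutputT[String]]]
    println(enc.schema.fieldNames.mkString(", "))
  }
}
```

With this in place, a function that takes an implicit Encoder[OutputT[A]] can be called with any concrete A for which a TypeTag exists, and a generic caller can forward the requirement by declaring its own A: TypeTag bound.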

Upvotes: 1
