Reputation: 621
I'm trying to define some functions that take `Dataset`s (typed `DataFrame`s) as input and produce another as output, and I want them to be flexible enough to deal with parametrized types. In this example, I need a column to represent the ID of users, but it doesn't matter to my functions whether that ID is an `Int`, a `Long`, a `String`, etc. That's why my case classes have this type parameter `A`.
I tried at first simply writing my function using `Dataset` instead of `DataFrame`:
```scala
import org.apache.spark.sql.Dataset

case class InputT[A](id: A, data: Long)
case class OutputT[A](id: A, dataA: Long, dataB: Long)

def someFunction[A](ds: Dataset[InputT[A]]): Dataset[OutputT[A]] = {
  ds.select().as[OutputT[A]] // suppose there are some transformations here
}
```
...but I got this error:

```
Unable to find encoder for type OutputT[A]. An implicit Encoder[OutputT[A]] is needed
to store OutputT[A] instances in a Dataset. Primitive types (Int, String, etc) and
Product types (case classes) are supported by importing spark.implicits._  Support
for serializing other types will be added in future releases.
```
So I tried providing an `Encoder` for my type:
```scala
import org.apache.spark.sql.{Dataset, Encoder}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class InputT[A](id: A, data: Long)
case class OutputT[A](id: A, dataA: Long, dataB: Long)

implicit def enc[A]: Encoder[InputT[A]] = implicitly(ExpressionEncoder[OutputT[A]])

def someFunction[A](ds: Dataset[InputT[A]]): Dataset[OutputT[A]] = {
  ds.select().as[OutputT[A]] // suppose there are some transformations here
}
```
And now I get this error:

```
No TypeTag available for OutputT[A]
```
If the code is the same as above but without type parameters (e.g., using `String` instead of `A`), then there are no errors.
Avoiding the use of the `import spark.implicits._` magic if possible, what should I do to fix it? Is it even possible to achieve this level of flexibility with `Dataset`?
Upvotes: 1
Views: 243
Reputation: 22840
If you check the Scaladoc, you will see that `as` requires an `Encoder`, so you only need to bring one into scope. Since `A` is unknown inside the function, ask the caller to supply the encoder as an implicit parameter:
```scala
def someFunction[A](ds: Dataset[InputT[A]])(implicit ev: Encoder[OutputT[A]]): Dataset[OutputT[A]] = {
  ds.select().as[OutputT[A]]
}
```
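To see why deferring the implicit to the call site works, here is a minimal, Spark-free sketch of the same pattern. The `Enc` typeclass and its `describe` method are hypothetical stand-ins for Spark's `Encoder`; the point is only that the instance is resolved where `A` is concrete, not inside the generic function:

```scala
// Hypothetical stand-in for Spark's Encoder typeclass.
trait Enc[T] { def describe: String }

object Enc {
  // Instances exist only for concrete types, mirroring how Spark can
  // derive encoders for concrete case classes but not for an abstract A.
  implicit val intEnc: Enc[Int] = new Enc[Int] { def describe = "Int" }
  implicit val strEnc: Enc[String] = new Enc[String] { def describe = "String" }
}

case class InputT[A](id: A, data: Long)
case class OutputT[A](id: A, dataA: Long, dataB: Long)

// Derive an instance for OutputT[A] from an instance for A.
implicit def outputEnc[A](implicit a: Enc[A]): Enc[OutputT[A]] =
  new Enc[OutputT[A]] { def describe = s"OutputT[${a.describe}]" }

// Instead of demanding the instance inside the body (where A is unknown),
// the caller supplies it, just like (implicit ev: Encoder[OutputT[A]]) above.
def someFunction[A](in: InputT[A])(implicit ev: Enc[OutputT[A]]): String =
  ev.describe

// At the call site A is concrete (Int), so implicit resolution succeeds.
someFunction(InputT(1, 10L)) // "OutputT[Int]"
```

In the real Spark version the caller provides the `Encoder[OutputT[A]]` the same way, either by having a concrete `A` with `spark.implicits._` in scope, or by passing an encoder explicitly.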
Also, you may want to take a look at *Where does Scala look for implicits*.
Upvotes: 1