Sparky

Reputation: 717

How to implement a trait with a generic case class that creates a dataset in Scala

I want to create a Scala trait that is implemented with a case class T. The trait simply loads data and transforms it into a Spark Dataset of type T. I get an error saying that no encoder can be found for the type stored in the Dataset, which I think is because Scala does not know that T is a case class. How can I tell the compiler that? I've seen somewhere that I should mention Product, but no such class is defined. Feel free to suggest other ways to do this!

I have the following code, but it does not compile, failing with:

42: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._
.as[T]

I'm using Spark 1.6.1

Code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, SQLContext}

/**
  * A trait that moves data on Hadoop with Spark based on the location and the granularity of the data.
  */
trait Agent[T] {
  /** The subfolder the data lives in; defined by implementing classes. */
  protected def location: String

  /**
    * Load a DataFrame from the location and convert it into a Dataset
    * @return Dataset[T]
    */
  protected def load(): Dataset[T] = {
    // Read in the data
    SparkContextKeeper.sqlContext.read
      .format("com.databricks.spark.csv")
      .load("/myfolder/" + location + "/2016/10/01/")
      .as[T]
  }
}

Upvotes: 4

Views: 3208

Answers (2)

Tzach Zohar

Reputation: 37832

Your code is missing three things:

  • Indeed, you must let the compiler know that T is a subclass of Product (the superclass of all Scala case classes and tuples)
  • The compiler also requires the TypeTag and ClassTag of the actual case class; Spark uses these implicitly to overcome type erasure (see the sketch after this list)
  • An import of sqlContext.implicits._
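
As a hedged illustration of the second point: in Spark 1.6 the Product encoder that sqlContext.implicits._ brings into scope is derived from a TypeTag, roughly like this sketch (Encoders.product is the real Spark 1.6 API; productEncoder is just a name for the illustration):

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{Encoder, Encoders}

// Without a TypeTag for T, type erasure leaves Spark nothing to inspect
// at runtime, so it cannot derive an Encoder[T].
def productEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]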

Unfortunately, you can't add type parameters with context bounds in a trait (context bounds desugar into implicit constructor parameters, and traits can't take constructor parameters), so the simplest workaround would be to use an abstract class instead:

import scala.reflect.runtime.universe.TypeTag
import scala.reflect.ClassTag
import org.apache.spark.sql.{Dataset, SQLContext}

abstract class Agent[T <: Product : ClassTag : TypeTag] {
  protected def location: String

  protected def load(): Dataset[T] = {
    val sqlContext: SQLContext = SparkContextKeeper.sqlContext
    import sqlContext.implicits._ // brings the Product encoder for T into scope
    sqlContext.read // same as in the question
      .format("com.databricks.spark.csv")
      .load("/myfolder/" + location + "/2016/10/01/")
      .as[T]
  }
}
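
To see how the context bounds get satisfied, here is a hedged usage sketch continuing from the abstract class above; the Event case class and the "events" location are hypothetical stand-ins for your actual schema and path:

// Hypothetical case class: a Product, so Spark can derive its encoder
case class Event(id: String, timestamp: String)

// T is concrete here, so ClassTag[Event] and TypeTag[Event] resolve implicitly
class EventsAgent extends Agent[Event] {
  protected def location: String = "events" // hypothetical subfolder
  def events(): Dataset[Event] = load()     // expose the protected loader
}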

Obviously, this isn't equivalent to using a trait, and might suggest that this design isn't the best fit for the job. Another alternative is placing load in an object and moving the type parameter to the method:

object Agent {
  def load[T <: Product : ClassTag : TypeTag](location: String): Dataset[T] = {
    val sqlContext: SQLContext = SparkContextKeeper.sqlContext
    import sqlContext.implicits._
    sqlContext.read
      .format("com.databricks.spark.csv")
      .load("/myfolder/" + location + "/2016/10/01/")
      .as[T]
  }
}
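
And a hedged call-site sketch, reusing the hypothetical Event case class from above (note that load now takes the location as a parameter, since an object has no abstract members to override):

val events: Dataset[Event] = Agent.load[Event]("events")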

Which one is preferable mostly depends on where and how you're going to call load and what you plan to do with the result.

Upvotes: 7

C4stor

Reputation: 8026

You need to take two actions:

  1. Add import sqlContext.implicits._ to your imports
  2. Make your trait trait Agent[T <: Product]

Upvotes: 0
