user8623021

Spark Scala Dataset map works in main but not in function

I have two datasets:

implicit val spark: SparkSession = SparkSession
  .builder()
  .appName("app").master("local[1]")
  .config("spark.executor.memory", "1g")
  .getOrCreate()


import spark.implicits._
val ds1 = /*read csv file*/.as[caseClass1]   
val ds2 = /*read csv file*/.as[caseClass2]  

Then I join and map them like this:

val ds3 = ds1
  .joinWith(ds2, ds1("id") === ds2("id"))
  .map { case (left, right) => (left, Option(right)) }

This gives the expected result.

The problem is that I'm trying to implement a RichDataset with this and some other functions, like the following:

object Extentions {

  implicit class RichDataset[T <: Product](leftDs: Dataset[T]) {

    def leftJoinWith[V <: Product](rightDs: Dataset[V], condition: Column)
                                  (implicit spark: SparkSession): Dataset[(T, Option[V])] = {
      import spark.implicits._

      leftDs.joinWith(rightDs, condition, "left")
        .map{case(left, right) => (left, Option(right))}
    }
  }
}

In main, with import Extentions._ in scope, the call to leftJoinWith fails to compile:

Error:(15, 13) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. .map{case(left, right) => (left, Option(right))}

Error:(15, 13) not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[(T, Option[V])])org.apache.spark.sql.Dataset[(T, Option[V])]. Unspecified value parameter evidence$6. .map{case(left, right) => (left, Option(right))}

But spark.implicits._ is imported inside the function!

If the function returns just the join, without the map, it works both in main and in the function.

scalaVersion := "2.11.8", sparkVersion := "2.2.0"

Thanks in advance!

Upvotes: 0

Views: 1024

Answers (1)

Raphael Roth

Reputation: 27373

If you add a TypeTag context bound to the generic type parameters, it works (I saw this in Spark's source code):

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{Column, Dataset, SparkSession}


object Extentions {

  implicit class RichDataset[T <: Product : TypeTag](leftDs: Dataset[T]) {

    def leftJoinWith[V <: Product : TypeTag](rightDs: Dataset[V], condition: Column)
                                            (implicit spark: SparkSession): Dataset[(T, Option[V])] = {
      import spark.implicits._

      leftDs.joinWith(rightDs, condition, "left")
        .map{case(left, right) => (left, Option(right))}
    }
  }
}
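
As far as I can tell, the reason is that the product encoder in spark.implicits._ (newProductEncoder) itself requires a TypeTag for the element type. In main the types are concrete case classes, so the compiler can materialize that tag; inside the generic method T and V are abstract, so it cannot, unless their tags are passed in through context bounds. The sketch below reproduces the mechanism in plain Scala, without Spark. The names TypeTagSketch, describe, withBounds and withoutBounds are made up for illustration, and describe is only a stand-in for an implicit that demands a TypeTag.

```scala
import scala.reflect.runtime.universe.{Type, TypeTag, typeOf}

object TypeTagSketch {
  // Stand-in for an API that, like Spark's product encoder, needs a TypeTag[A].
  def describe[A: TypeTag]: Type = typeOf[A]

  // Compiles: the tag for (T, Option[V]) is assembled from the tags of T and V,
  // which the context bounds bring into implicit scope.
  def withBounds[T: TypeTag, V: TypeTag]: Type = describe[(T, Option[V])]

  // Fails to compile if uncommented, with an error along the lines of
  // "No TypeTag available for (T, Option[V])", because T and V carry no tags:
  // def withoutBounds[T, V]: Type = describe[(T, Option[V])]

  def main(args: Array[String]): Unit =
    println(withBounds[Int, String])
}
```

This also fits the observation that returning only the join works: joinWith does not ask the caller for an implicit encoder of the result type, while map does.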

Upvotes: 2
