Michael

Reputation: 2566

Doing pattern matching on typed dataset

I am trying to apply different logic depending on the type of a Spark Dataset. Depending on which case class is passed to doWork (Customer or Worker), I have to apply a different type of aggregation. How can I do that?

import org.apache.spark.sql.{Dataset, SparkSession}

object SparkSql extends App {

  val spark = SparkSession
    .builder()
    .appName("Simple app")
    .config("spark.master", "local")
    .getOrCreate()

  import spark.implicits._

  sealed trait Person {
    def name: String
  }

  final case class Customer(override val name: String, email: String)                extends Person
  final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person

  val workers: Dataset[Worker] = Seq(
    Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
    Worker("Sam", id = 1, skills = Array("self-motivation"))
  ).toDS

  def doWork(persons: Dataset[Person]): Unit = {
    persons match {
      case ... // Dataset[Customer] ... do something
      case ... // Dataset[Worker] ... do something else
    }
  }

}

Upvotes: 1

Views: 555

Answers (4)

Michael

Reputation: 2566

I found a solution to my own question, however I want to give credit to Someshwar Kale's answer as it does what is requested. In this version, I am using implicits to create a converter that I can extend as needed.

import org.apache.spark.sql.{Dataset, SparkSession}

object TempProject extends App {

  val spark = SparkSession
    .builder()
    .appName("Simple app")
    .config("spark.master", "local")
    .getOrCreate()

  import spark.implicits._

  sealed trait Person {
    def name: String
  }
  final case class Customer(override val name: String, email: String)                extends Person
  final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person

  trait CustomDataProcessor[T] {
    def doSomethingCool(dataset: Dataset[T]): Dataset[T]
  }

  implicit object CustomerDataProcessor extends CustomDataProcessor[Customer] {

    override def doSomethingCool(dataset: Dataset[Customer]): Dataset[Customer] =
      dataset.filter(_.name.contains("B"))
  }

  implicit object WorkerDataProcessor extends CustomDataProcessor[Worker] {

    override def doSomethingCool(dataset: Dataset[Worker]): Dataset[Worker] =
      dataset.filter(_.id == 2)
  }

  def doWork[T](persons: Dataset[T])(implicit processor: CustomDataProcessor[T]): Unit = {
    // Dataset transformations are lazy, so trigger an action to make the work observable
    processor.doSomethingCool(persons).show()
  }

  val workers: Dataset[Worker] = Seq(
    Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
    Worker("Sam", id = 1, skills = Array("self-motivation"))
  ).toDS

  val customers: Dataset[Customer] = Seq(
    Customer("Bob", "bob@email"),
    Customer("Jack", "jack@email")
  ).toDS

  doWork(workers)
  doWork(customers)
}
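
Because dispatch happens through implicit resolution, supporting a new Person subtype only requires adding a case class and a matching processor inside the same object. A minimal sketch, assuming a hypothetical Manager type:

final case class Manager(override val name: String, reports: Int) extends Person

implicit object ManagerDataProcessor extends CustomDataProcessor[Manager] {
  // Hypothetical rule: keep only managers who have direct reports
  override def doSomethingCool(dataset: Dataset[Manager]): Dataset[Manager] =
    dataset.filter(_.reports > 0)
}

doWork(Seq(Manager("Ann", reports = 3)).toDS) // the compiler picks ManagerDataProcessor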

Upvotes: 1

Som

Reputation: 6338

Try this-


sealed trait Person {
  def name: String
}

final case class Customer(override val name: String, email: String)                extends Person
final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person

Test case-

  @Test
  def test62262873(): Unit = {

    val workers: Dataset[Worker] = Seq(
      Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
      Worker("Sam", id = 2, skills = Array("self-motivation"))
    ).toDS

    import scala.reflect.runtime.universe._

    // TypeTag carries T to runtime, so we can dispatch on it despite JVM type erasure
    def doWork[T: TypeTag](persons: Dataset[T]): Unit = {
      typeOf[T] match {
        case t if t =:= typeOf[Worker] => println("I'm worker")
          persons.as[Worker].filter(_.id == 2).show(false)
        case t if t =:= typeOf[Customer] => println("I'm Customer")
          persons.as[Customer].filter(_.name.contains("B")).show(false)

      }
    }
    doWork(workers)

    /**
      * I'm worker
      * +----+---+-----------------+
      * |name|id |skills           |
      * +----+---+-----------------+
      * |Sam |2  |[self-motivation]|
      * +----+---+-----------------+
      */
  }
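
The same method dispatches on Customer as well; a minimal sketch, assuming a small customers dataset:

  val customers: Dataset[Customer] = Seq(
    Customer("Bob", "bob@email"),
    Customer("Jack", "jack@email")
  ).toDS

  doWork(customers) // prints "I'm Customer" and shows the rows whose name contains "B"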

Upvotes: 2

QuickSilver

Reputation: 4045

Modify your method to accept [T <: parent] and extract the bean class name from the ClassTag of Dataset.toJavaRDD, as below:

import org.apache.spark.sql.{Dataset, SparkSession}

object InheritDataframe {


  private def matcherDef[T <: parent](dfb: Dataset[T]): Unit = {

    // ClassTag.toString returns the runtime class name of T
    dfb.toJavaRDD.classTag.toString() match {
      case "child1" => println("child1")
      case "child2" => println("child2")
      case _ => println("Unknown")
    }

  }

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("Simple app")
      .config("spark.master", "local")
      .getOrCreate()

    import spark.implicits._

    val dfB  = List(child1(1)).toDS()
    val dfC  = List(child2(1)).toDS()

    matcherDef(dfB)
    matcherDef(dfC)

  }


}

case class child1(i: Int) extends parent(i)

case class child2(i: Int) extends parent(i)

class parent(j: Int)
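
Note: ClassTag.toString returns the fully qualified runtime class name, so if child1 and child2 live inside a package the match strings would need the package prefix (for example "com.example.child1").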


Upvotes: 0

Chema

Reputation: 2838

With case classes you can do pattern matching. Case classes are Scala’s way to allow pattern matching on objects without requiring a large amount of boilerplate. Generally, all you need to do is add a single case keyword to each class that you want to be pattern matchable.

As an example:

abstract class Expr
case class Var(name: String) extends Expr
case class Number(num: Double) extends Expr
case class UnOp(operator: String, arg: Expr) extends Expr
case class BinOp(operator: String, left: Expr, right: Expr) extends Expr

def simplifyTop(expr: Expr): Expr = expr match {
  case UnOp("",UnOp("",e)) => e // Double negation
  case BinOp("+", e, Number(0)) => e // Adding zero
  case BinOp("*", e, Number(1)) => e // Multiplying by one
  case _ => expr
}

With your example, I would try this:

  def doWork(person: Person): Unit = {
    person match {
      case Customer(name, email)    => // ... do something
      case Worker(name, id, skills) => // ... do something else
    }
  }

dataset.foreach(doWork) // map would need an Encoder for the result type; foreach just runs the side effect
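
Note that this dispatches on each Person element at runtime, which works for a mixed Dataset[Person]; unlike the TypeTag and type-class approaches above, it does not distinguish a Dataset[Customer] from a Dataset[Worker] at the type level.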

Upvotes: 0
