Reputation: 2566
I am trying to apply different types of logic according to the type of spark dataset.
Depending on the type of case class that is passed to doWork
(Customer
or Worker
) I have to apply different types of aggregation.
How can I do that?
import org.apache.spark.sql.{Dataset, SparkSession}
object SparkSql extends App {
import spark.implicits._
val spark = SparkSession
.builder()
.appName("Simple app")
.config("spark.master", "local")
.getOrCreate()
sealed trait Person {
def name: String
}
final case class Customer(override val name: String, email: String) extends Person
final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person
val workers: Dataset[Worker] = Seq(
Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
Worker("Sam", id = 1, skills = Array("self-motivation"))
).toDS
def doWork(persons: Dataset[Person]): Unit = {
persons match {
case ... // Dataset[Customer] ... do something
case ... // Dataset[Worker] ... do something else
}
}
}
Upvotes: 1
Views: 555
Reputation: 2566
I found a solution to my own question however I want to give credit to Someshwar Kale'answer as it does what is requested. In this version, I am using implicit to created converter that I can extend as need be.
import org.apache.spark.sql.{Dataset, SparkSession}
object TempProject extends App {
import spark.implicits._
val spark = SparkSession
.builder()
.appName("Simple app")
.config("spark.master", "local")
.getOrCreate()
sealed trait Person {
def name: String
}
final case class Customer(override val name: String, email: String) extends Person
final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person
trait CustomDataProcessor[T] {
def doSomethingCool(dataset: Dataset[T]): Dataset[T]
}
implicit object CustomerDataProcessor extends CustomDataProcessor[Customer] {
override def doSomethingCool(dataset: Dataset[Customer]): Dataset[Customer] =
dataset.filter(_.name.contains("B"))
}
implicit object WorkerDataProcessor extends CustomDataProcessor[Worker] {
override def doSomethingCool(dataset: Dataset[Worker]): Dataset[Worker] =
dataset.filter(_.id == 2)
}
def doWork[T](person: Dataset[T])(implicit processor: CustomDataProcessor[T]): Unit = {
processor.doSomethingCool(person)
}
val workers: Dataset[Worker] = Seq(
Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
Worker("Sam", id = 1, skills = Array("self-motivation"))
).toDS
val customers: Dataset[Customer] = Seq(
Customer("Bob", "bob@email"),
Customer("Jack", "jack@email")
).toDS
doWork(workers)
doWork(customers)
}
Upvotes: 1
Reputation: 6338
Try this-
sealed trait Person {
def name: String
}
final case class Customer(override val name: String, email: String) extends Person
final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person
Test case-
@Test
def test62262873(): Unit = {
val workers: Dataset[Worker] = Seq(
Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
Worker("Sam", id = 2, skills = Array("self-motivation"))
).toDS
import scala.reflect.runtime.universe._
def doWork[T : TypeTag](persons: Dataset[T]): Unit = {
typeOf[T] match {
case t if t =:= typeOf[Worker] => println("I'm worker")
persons.as[Worker].filter(_.id == 2).show(false)
case t if t =:= typeOf[Customer] => println("I'm Customer")
persons.as[Customer].filter(_.name.contains("B")).show(false)
}
}
doWork(workers)
/**
* I'm worker
* +----+---+-----------------+
* |name|id |skills |
* +----+---+-----------------+
* |Sam |2 |[self-motivation]|
* +----+---+-----------------+
*/
}
Upvotes: 2
Reputation: 4045
Modify your method to accept [T <:parent]
and you extract bean class name from Dataset.javaRdd
as below
import org.apache.spark.sql.Dataset
object InheritDataframe {
private def matcherDef[T <:parent](dfb: Dataset[T]): Unit = {
dfb.toJavaRDD.classTag.toString() match {
case "child1" => println("child1")
case "child2" => println("child2")
case _ => println("Unkown")
}
}
def main(args: Array[String]): Unit = {
val spark = Constant.getSparkSess
import spark.implicits._
val dfB = List(child1(1)).toDS()
val dfC = List(child2(1)).toDS()
matcherDef(dfB)
matcherDef(dfC)
}
}
case class child1(i: Int) extends parent(i)
case class child2(i: Int) extends parent(i)
class parent(j: Int)
Upvotes: 0
Reputation: 2838
With case classes you can do pattern matching. Case classes are Scala’s way to allow pattern matching on objects without requiring a large amount of boilerplate. Generally, all you need to do is add a single case keyword to each class that you want to be pattern matchable.
As an example:
abstract class Expr
case class Var(name: String) extends Expr
case class Number(num: Double) extends Expr
case class UnOp(operator: String, arg: Expr) extends Expr
case class BinOp(operator: String,left: Expr, right: Expr) extends Expr
def simplifyTop(expr: Expr): Expr = expr match {
case UnOp("",UnOp("",e)) => e // Double negation
case BinOp("+", e, Number(0)) => e // Adding zero
case BinOp("*", e, Number(1)) => e // Multiplying by one
case _ => expr
}
with your example I would try this
def doWork(persons: Person): Unit = {
persons match {
case Customer => ... do something
case Worker ... do something else
}
}
dataset.map(doWork)
Upvotes: 0