Yash

Reputation: 1090

Pass case class to Spark UDF

I have a Scala 2.11 function which creates a case class from a Map, based on the provided class type.

import scala.reflect.runtime.universe._

def createCaseClass[T: TypeTag, A](someMap: Map[String, A]): T = {

    val rMirror = runtimeMirror(getClass.getClassLoader)
    val myClass = typeOf[T].typeSymbol.asClass
    val cMirror = rMirror.reflectClass(myClass)

    // The primary constructor is the first one
    val ctor = typeOf[T].decl(termNames.CONSTRUCTOR).asTerm.alternatives.head.asMethod
    val argList = ctor.paramLists.flatten.map(param => someMap(param.name.toString))

    cMirror.reflectConstructor(ctor)(argList: _*).asInstanceOf[T]
  }
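For context, the helper itself works fine outside Spark. A self-contained sketch (the `Point` case class and the sample map are illustrations, not from the original code):

```scala
import scala.reflect.runtime.universe._

// Hypothetical case class used only to exercise the helper
case class Point(x: Long, y: Long)

def createCaseClass[T: TypeTag, A](someMap: Map[String, A]): T = {
  val rMirror = runtimeMirror(getClass.getClassLoader)
  val myClass = typeOf[T].typeSymbol.asClass
  val cMirror = rMirror.reflectClass(myClass)

  // The primary constructor is the first alternative
  val ctor = typeOf[T].decl(termNames.CONSTRUCTOR).asTerm.alternatives.head.asMethod
  val argList = ctor.paramLists.flatten.map(param => someMap(param.name.toString))

  cMirror.reflectConstructor(ctor)(argList: _*).asInstanceOf[T]
}

val p = createCaseClass[Point, Long](Map("x" -> 1L, "y" -> 2L))
println(p) // Point(1,2)
```

The map keys must match the constructor parameter names exactly, and every value must be usable for its parameter; a missing key throws `NoSuchElementException` at runtime.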

I'm trying to use this in the context of a Spark DataFrame as a UDF, but I'm not sure what's the best way to pass the case class. The approach below doesn't seem to work.

def myUDF[T: TypeTag] = udf { (inMap: Map[String, Long]) =>
    createCaseClass[T](inMap)
  }

I'm looking for something like this-

case class MyType(c1: String, c2: Long)

val myUDF = udf{(MyType, inMap) => createCaseClass[MyType](inMap)}

Thoughts and suggestions on how to resolve this are appreciated.

Upvotes: 5

Views: 4740

Answers (2)

user2232395

Reputation: 491

From trial and error I learned that any data structure stored in a DataFrame or Dataset is represented using org.apache.spark.sql.types.

You can see this with:

df.schema.toString

Basic types like Int and Double are stored as:

StructField(fieldname,IntegerType,true),StructField(fieldname,DoubleType,true)

Complex types like case classes are transformed into a combination of nested types:

StructType(StructField(..),StructField(..),StructType(..))

Sample code:

case class range(min: Double, max: Double)
org.apache.spark.sql.Encoders.product[range].schema

// Output:
// org.apache.spark.sql.types.StructType = StructType(StructField(min,DoubleType,false), StructField(max,DoubleType,false))

The UDF parameter type in this case is Row, or Seq[Row] when you store an array of case classes.
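So inside a UDF you read the struct's fields through the Row API rather than through the case class. A minimal sketch (the local SparkSession setup and the `r` column name are assumptions for illustration):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, struct, udf}

val spark = SparkSession.builder.master("local[*]").appName("row-udf").getOrCreate()
import spark.implicits._

case class range(min: Double, max: Double)

// Pack the case class fields into a struct column named "r"
val df = Seq(range(0.0, 1.0), range(2.0, 5.0)).toDF()
  .select(struct(col("min"), col("max")).as("r"))

// The struct arrives in the UDF as a Row; fields are read by name
val width = udf((r: Row) => r.getAs[Double]("max") - r.getAs[Double]("min"))

df.select(width(col("r")).as("width")).show()
```

Field names from the case class survive inside the struct, which is why `getAs[Double]("max")` works here.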

A basic debugging technique is to print the schema as a string:

val myUdf = udf((r: Row) => r.schema.toString)

then, to see what happens:

df.take(1).foreach(println)

Upvotes: 2

zero323

Reputation: 330353

However, I'm not sure what's the best way to pass the case class

It is not possible to use case classes as arguments for user defined functions. SQL StructTypes are mapped to dynamically typed (for lack of a better word) Row objects.

If you want to operate on statically typed objects please use statically typed Dataset.
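For example, with a statically typed Dataset the case class is available directly in transformations, so no reflection-based construction is needed (the `MyType` sample values below are illustrative):

```scala
import org.apache.spark.sql.SparkSession

case class MyType(c1: String, c2: Long)

val spark = SparkSession.builder.master("local[*]").appName("typed-ds").getOrCreate()
import spark.implicits._

val ds = spark.createDataset(Seq(MyType("a", 1L), MyType("b", 2L)))

// map receives MyType itself, not a Row
val doubled = ds.map(m => m.copy(c2 = m.c2 * 2))
```

The encoder derived by `spark.implicits` handles the conversion between the SQL struct representation and the case class in both directions.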

Upvotes: 4
