vicky bangre

Reputation: 3

Scala Broadcast + UDF

I am trying to broadcast a List and pass the broadcast variable to a UDF (the Scala code lives in a separate file), but I am running into issues.

val Lookup_BroadCast = SC.broadcast(lookup_data)

UDF creation with 3 arguments

val Call_Sub_Pgm = udf(foo(_: String, Lookup_BroadCast: org.apache.spark.broadcast.Broadcast[List[String]], Trace: String))

Calling the UDF using "withColumn"

Out_DF = Out_DF.withColumn("Col-1", Call_Sub_Pgm(col(Col-1),Lookup_BroadCast,lit(Trace)))

I get a compilation error for the code above: "found broadcast variable, required Sql Column"

If I remove the "Lookup_BroadCast" variable from the call above

Out_DF = Out_DF.withColumn("Col-1", Call_Sub_Pgm(col(Col-1),lit(Trace)))

then I get below error:

java.lang.ClassCastException: org.spark.masking.ExtractData$$anonfun$7 cannot be cast to scala.Function0

Upvotes: 0

Views: 380

Answers (1)

pasha701

Reputation: 7207

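A UDF can only be called with Column arguments, so the broadcast variable cannot be passed at call time. Instead, a serializable wrapper class can be created for the function, taking the Broadcast in its constructor: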

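import org.apache.spark.broadcast.Broadcast

class Wrapper(Lookup_BroadCast: Broadcast[List[String]]) extends Serializable {
  // v and s are the two column values supplied when the UDF is called
  def foo(v: String, s: String): String = {
    // usage example: read the broadcast value on the executor
    Lookup_BroadCast.value.head
  }
}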

It is then used like this:

val wrapper = new Wrapper(Lookup_BroadCast)
val Call_Sub_Pgm = udf(wrapper.foo(_: String, _: String))
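To tie this back to the withColumn call in the question, here is a minimal end-to-end sketch. It assumes a local SparkSession; the names MaskingWrapper, lookupBroadcast and callSubPgm, the sample data, and the masking rule inside foo are all hypothetical and only stand in for the real lookup logic. The point it illustrates is that the UDF is called with Columns only, while the broadcast travels inside the wrapper instance.

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, udf}

// Hypothetical wrapper: same shape as Wrapper above, with a made-up masking rule.
class MaskingWrapper(lookup: Broadcast[List[String]]) extends Serializable {
  // v is the column value, trace is the literal passed via lit(...)
  def foo(v: String, trace: String): String =
    if (lookup.value.contains(v)) "***" else v
}

object BroadcastUdfSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session, used here only for illustration.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("broadcast-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    // Broadcast the lookup list once; the wrapper carries the handle.
    val lookupBroadcast = spark.sparkContext.broadcast(List("secret"))
    val wrapper = new MaskingWrapper(lookupBroadcast)

    // The UDF takes two String arguments; the broadcast is not one of them.
    val callSubPgm = udf(wrapper.foo(_: String, _: String))

    val df = Seq("secret", "public").toDF("Col-1")

    // Only Columns are passed at call time: col(...) and lit(...).
    val out = df.withColumn("Col-1", callSubPgm(col("Col-1"), lit("trace-on")))
    out.show()

    spark.stop()
  }
}
```

Note that the column name is passed to col as a string, and the trace flag is wrapped in lit so that it becomes a Column as well.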

Upvotes: 2
