Reputation: 1357
I have the following Apache Spark UDF in Scala:
val myFunc = udf {
  (userBias: Float, otherBiases: Map[Long, Float],
   userFactors: Seq[Float], context: Seq[String]) =>
    var result = Float.NaN
    if (userFactors != null) {
      var contexBias = 0f
      for (cc <- context) {
        contexBias += otherBiases(contextMapping(cc))
      }
      // definition of result
      // ...
    }
    result
}
Now I want to pass parameters to this function, but I always get the message Not Applicable because of the parameter context. I know that user-defined functions take their inputs row by row, and the function runs if I remove context. How can I solve this? Why doesn't it read rows from Array[Seq[String]], i.e. from context? Alternatively, it would be acceptable to pass context as a DataFrame or something similar.
// context is Array[Seq[String]]
val a = sc.parallelize(Seq((1,2),(3,4))).toDF("a", "b")
val context = a.collect.map(_.toSeq.map(_.toString))
// userBias("bias"), otherBias("biases") and userFactors("features")
// have a type Column, while userBias... are DataFrames
myDataframe.select(dataset("*"),
  myFunc(userBias("bias"),
    otherBias("biases"),
    userFactors("features"),
    context)
  .as($(newCol)))
UPDATE:
I tried the solution from zero323's answer, but there is still a small issue with context: Array[Seq[String]]. The problem is in the loop over this array, for (cc <- context) { contexBias += otherBiases(contextMapping(cc)) }: contextMapping expects a String, but each cc is a Seq[String]:
def myFunc(context: Array[Seq[String]]) = udf {
  (userBias: Float, otherBiases: Map[Long, Float],
   userFactors: Seq[Float]) =>
    var result = Float.NaN
    if (userFactors != null) {
      var contexBias = 0f
      for (cc <- context) {
        contexBias += otherBiases(contextMapping(cc))
      }
      // estimation of result
    }
    result
}
Now I call it as follows:
myDataframe.select(dataset("*"),
  myFunc(context)(userBias("bias"),
    otherBias("biases"),
    userFactors("features"))
  .as($(newCol)))
Upvotes: 3
Views: 3671
Reputation: 1796
An alternative is to pass the values as a struct column, which the UDF receives as a Row:
val seq: Seq[String] = ...
val func: UserDefinedFunction = udf((seq: Row) => ...)
val seqStruct = struct(seq.map(v => lit(v)): _*)
func(seqStruct)
Upvotes: 0
Reputation: 330163
Spark 2.2+
You can use the typedLit function:
import org.apache.spark.sql.functions.typedLit
myFunc(..., typedLit(context))
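As a rough illustration of the typedLit route, here is a minimal sketch. It assumes Spark 2.2+ running locally; the object name TypedLitSketch, the sample data, and the computation (bias plus the number of context entries) are made up for the example and are not the asker's actual logic. The point it shows is that an array column reaches a Scala UDF as a Seq, so the UDF parameter is declared as Seq[Seq[String]] rather than Array[Seq[String]].

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, typedLit, udf}

object TypedLitSketch {
  // Hypothetical helper: builds a tiny DataFrame, applies the UDF, collects the results.
  def run(spark: SparkSession): Array[Float] = {
    import spark.implicits._

    // Made-up constant context, same shape as in the question.
    val context: Seq[Seq[String]] = Seq(Seq("1", "2"), Seq("3", "4"))

    // Array columns arrive in a Scala UDF as Seq, hence Seq[Seq[String]].
    // Placeholder computation: bias plus the number of context entries.
    val withContext = udf { (bias: Float, ctx: Seq[Seq[String]]) =>
      bias + ctx.flatten.size
    }

    Seq(0.5f, 1.5f).toDF("bias")
      .select(withContext(col("bias"), typedLit(context)).as("result"))
      .as[Float]
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("typedLit-sketch")
      .getOrCreate()
    try println(run(spark).mkString(", ")) finally spark.stop()
  }
}
```

The literal is evaluated once per row like any other column, so this suits small constants; for a large context the closure approach avoids embedding the value in the query plan.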
Spark < 2.2
Any argument that is passed directly to the UDF has to be a Column, so if you want to pass a constant array you'll have to convert it to a column literal:
import org.apache.spark.sql.functions.{array, lit}
val myFunc: org.apache.spark.sql.UserDefinedFunction = ???
myFunc(
  userBias("bias"),
  otherBias("biases"),
  userFactors("features"),
  // org.apache.spark.sql.Column
  array(context.map(xs => array(xs.map(lit _): _*)): _*)
)
Non-Column objects can be passed only indirectly using a closure, for example like this:
def myFunc(context: Array[Seq[String]]) = udf {
  (userBias: Float, otherBiases: Map[Long, Float], userFactors: Seq[Float]) =>
    ???
}
myFunc(context)(userBias("bias"), otherBias("biases"), userFactors("features"))
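Regarding the loop issue from the question's update: each element of context is a Seq[String], so one option is to flatten it before looking up the biases. The sketch below shows only the plain function that would go inside udf { ... }, so it runs without Spark. The object name ContextBiasSketch, the method biasFor, and the contents of contextMapping (assumed here to be a Map[String, Long]) are hypothetical, and userBias + contextBias is a placeholder for the result computation that the question elides.

```scala
object ContextBiasSketch {
  // Hypothetical stand-in for the question's contextMapping,
  // assumed to map a context label to a key of otherBiases.
  val contextMapping: Map[String, Long] =
    Map("1" -> 1L, "2" -> 2L, "3" -> 3L, "4" -> 4L)

  // Captures context in a closure and returns the function
  // that would be wrapped in udf { ... }.
  def biasFor(context: Array[Seq[String]]): (Float, Map[Long, Float], Seq[Float]) => Float =
    (userBias, otherBiases, userFactors) =>
      if (userFactors == null) Float.NaN
      else {
        // flatten turns Array[Seq[String]] into Array[String],
        // so contextMapping receives one String at a time.
        val contextBias = context.flatten.map(cc => otherBiases(contextMapping(cc))).sum
        // Placeholder for the elided result computation.
        userBias + contextBias
      }
}
```

With this shape, the UDF from the answer would be built as udf(ContextBiasSketch.biasFor(context)) and applied to the three columns as before. Note that otherBiases(...) throws if a key is missing; otherBiases.getOrElse(key, 0f) is one way to tolerate that, if a default of zero is acceptable.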
Upvotes: 1