Klue

Reputation: 1357

How to pass Array[Seq[String]] to an Apache Spark UDF? (Error: Not Applicable)

I have the following Apache Spark UDF in Scala:

val myFunc = udf {
  (userBias: Float, otherBiases: Map[Long, Float],
    userFactors: Seq[Float], context: Seq[String]) => 
    var result = Float.NaN

    if (userFactors != null) {
      var contextBias = 0f

      for (cc <- context) {
        contextBias += otherBiases(contextMapping(cc))
      }

      // definition of result
      // ...
    }
    result
}

Now I want to pass parameters to this function, but I always get the error Not Applicable because of the parameter context. I know that user-defined functions take their inputs row by row, and the function runs if I delete context... How can I solve this issue? Why doesn't it read rows from Array[Seq[String]], i.e. from context? Alternatively, it would be acceptable to pass context as a DataFrame or something similar.

// context is Array[Seq[String]]
val a = sc.parallelize(Seq((1,2),(3,4))).toDF("a", "b")
val context = a.collect.map(_.toSeq.map(_.toString))

// userBias("bias"), otherBias("biases") and userFactors("features")
// are of type Column, while userBias, otherBias and userFactors are DataFrames
myDataframe.select(dataset("*"),
                   myFunc(userBias("bias"),
                          otherBias("biases"),
                          userFactors("features"),
                          context)
                   .as($(newCol)))

UPDATE:

I tried the solution indicated in zero323's answer, but there is still a small issue with context: Array[Seq[String]]. In particular, the problem is with looping over this array: for (cc <- context) { contextBias += otherBiases(contextMapping(cc)) }. I should pass a String to contextMapping, not a Seq[String]:

  def myFunc(context: Array[Seq[String]]) = udf {
    (userBias: Float, otherBiases: Map[Long, Float],
     userFactors: Seq[Float]) =>
      var result = Float.NaN

      if (userFactors != null) {
        var contextBias = 0f
        for (cc <- context) {
          contextBias += otherBiases(contextMapping(cc))
        }

        // estimation of result

      }
      result
  }

Now I call it as follows:

myDataframe.select(dataset("*"),
                   myFunc(context)(userBias("bias"),
                                   otherBias("biases"),
                                   userFactors("features"))
           .as($(newCol)))
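
Would flattening the loop, so that contextMapping receives a single String at a time, be the right approach? For example:

def myFunc(context: Array[Seq[String]]) = udf {
  (userBias: Float, otherBiases: Map[Long, Float],
   userFactors: Seq[Float]) =>
    var result = Float.NaN

    if (userFactors != null) {
      var contextBias = 0f
      // iterate over the inner Seq[String] too, so that each cc
      // handed to contextMapping is a single String
      for (ccs <- context; cc <- ccs) {
        contextBias += otherBiases(contextMapping(cc))
      }

      // estimation of result
      // ...
    }
    result
}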

Upvotes: 3

Views: 3671

Answers (2)

Vadim Zin4uk

Reputation: 1796

An alternative way is to use struct:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{lit, struct, udf}

val seq: Seq[String] = ...
val func: UserDefinedFunction = udf((seq: Row) => ...)
val seqStruct = struct(seq.map(v => lit(v)): _*)
func(seqStruct)
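
For example, a minimal sketch (df and the length-summing logic are hypothetical) showing how the struct's fields can be read from the Row inside the UDF:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{lit, struct, udf}

val seq: Seq[String] = Seq("a", "bb", "ccc")

// a struct column arrives in a Scala UDF as a Row;
// read its fields positionally
val totalLength = udf { (row: Row) =>
  (0 until row.length).map(i => row.getString(i).length).sum
}

val seqStruct = struct(seq.map(lit(_)): _*)
df.select(totalLength(seqStruct).as("total_length"))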

Upvotes: 0

zero323

Reputation: 330163

Spark 2.2+

You can use the typedLit function:

import org.apache.spark.sql.functions.typedLit

myFunc(..., typedLit(context))
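
typedLit handles Seq, Map and nested collections, so the whole Array[Seq[String]] can go in as a single literal column. A minimal sketch (df and countTokens are my names; inside the UDF the literal shows up as Seq[Seq[String]]):

import org.apache.spark.sql.functions.{typedLit, udf}

val context: Array[Seq[String]] = Array(Seq("1", "2"), Seq("3", "4"))

// the nested literal arrives in the UDF as Seq[Seq[String]]
val countTokens = udf((ctx: Seq[Seq[String]]) => ctx.map(_.size).sum)

df.select(countTokens(typedLit(context)).as("n_tokens"))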

Spark < 2.2

Any argument that is passed directly to the UDF has to be a Column, so if you want to pass a constant array you'll have to convert it to a column literal:

import org.apache.spark.sql.functions.{array, lit}

val myFunc: org.apache.spark.sql.expressions.UserDefinedFunction = ???

myFunc(
  userBias("bias"),
  otherBias("biases"),
  userFactors("features"),
  // org.apache.spark.sql.Column
  array(context.map(xs => array(xs.map(lit _): _*)): _*)  
)
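
For illustration, a self-contained sketch of the same conversion (df and the column names are mine); on the UDF side the nested literal arrives as Seq[Seq[String]]:

import org.apache.spark.sql.functions.{array, lit, udf}

val context: Array[Seq[String]] = Array(Seq("a", "b"), Seq("c"))

// array(array(lit(...), ...), ...) builds a Column of type array<array<string>>
val contextCol = array(context.map(xs => array(xs.map(lit(_)): _*)): _*)

// the UDF receives the nested literal as Seq[Seq[String]]
val flattenCtx = udf((ctx: Seq[Seq[String]]) => ctx.flatten)

df.select(flattenCtx(contextCol).as("flat_context"))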

Non-Column objects can be passed only indirectly, using a closure, for example like this:

def myFunc(context: Array[Seq[String]]) = udf {
  (userBias: Float, otherBiases: Map[Long, Float],  userFactors: Seq[Float]) => 
    ???
}

myFunc(context)(userBias("bias"), otherBias("biases"), userFactors("features"))
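
Note that context is captured by the closure, so it is serialized and shipped to the executors together with the UDF; this works best when the array is reasonably small.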

Upvotes: 1
