John

Reputation: 1591

How to create a when expression in Spark with loops

Hi, I have a requirement to create a column based on multiple when statements.

df.withColumn("new colume",when(col("col1").isin("list of item1"),"item1")
                           .when(col("col1").isin("list of item2"),"item2")
                           ....)

Is there any way I can create a chain of when conditions using a loop? My plan is to put the input and output of each when in a map like

Map("item1" -> "list of item1","item2" -> "list of item2")

Upvotes: 2

Views: 2140

Answers (1)

Alper t. Turker

Reputation: 35249

Just use when / otherwise with foldLeft:

import org.apache.spark.sql.functions._


val conditions = Map(
  "item1" -> Seq("foo", "bar"), "item2" -> Seq("foobar")
)

conditions.foldLeft(lit(null)) {
  // Each step wraps the accumulated expression as the otherwise branch of a new when
  case (acc, (v, ls)) => when(col("col1").isin(ls: _*), v).otherwise(acc)
}

which will create a nested CASE WHEN expression of the form (note that iteration order over a plain Map is not guaranteed, so if the order of the checks matters, fold over a Seq of pairs or a ListMap instead):

CASE WHEN (col1 IN (foobar)) THEN item2 
     ELSE CASE WHEN (col1 IN (foo, bar)) THEN item1 
          ELSE NULL 
     END 
END

You can replace lit(null) with another value that you want to use as the default outcome (if no value matches).
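For completeness, a minimal usage sketch (the df and the new column name here are illustrative, not part of the original question): assign the folded expression to a val and pass it to withColumn:

val newColumn = conditions.foldLeft(lit(null)) {
  case (acc, (v, ls)) => when(col("col1").isin(ls: _*), v).otherwise(acc)
}

df.withColumn("new_column", newColumn)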

You can also generate

CASE 
  WHEN (col1 IN (foo, bar)) THEN item1 
  WHEN (col1 IN (foobar)) THEN item2 
END

using a recursive function like this:

import org.apache.spark.sql.Column

def mergeConditions(conditions: Map[String, Seq[String]], c: Column): Column = {
  // Chain each remaining condition onto the accumulated when expression
  def mergeConditionsR(conditions: Seq[(String, Seq[String])], acc: Column): Column = conditions match {
    case (v, ls) :: t => mergeConditionsR(t, acc.when(c.isin(ls: _*), v))
    case Nil          => acc
  }

  conditions.toList match {
    // Seed the recursion with the first when, then fold in the tail
    case (v, ls) :: t => mergeConditionsR(t, when(c.isin(ls: _*), v))
    case Nil          => lit(null)
  }
}

mergeConditions(conditions, col("col1"))

but it shouldn't make much difference.
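As a sketch of the same idea without explicit recursion (assuming conditions is non-empty; head and tail would throw on an empty map), the flat variant can also be produced by seeding foldLeft with the first when and chaining the rest:

val flatCondition = conditions.tail.foldLeft(
  when(col("col1").isin(conditions.head._2: _*), conditions.head._1)
) {
  // Column.when can be chained onto a column previously created by when()
  case (acc, (v, ls)) => acc.when(col("col1").isin(ls: _*), v)
}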

With a simple isin it is of course possible to skip when completely:

import org.apache.spark.sql.functions.typedLit

// Invert the map so each matched value points at its label
val conditionsCol = typedLit(conditions.flatMap {
  case (k, vs) => vs.map { v => (v, k) }
}.toMap)

// $ requires import spark.implicits._ (in scope by default in spark-shell)
df.withColumn("value", conditionsCol($"col1"))

or by converting conditions to a DataFrame and joining.

conditions.toSeq.toDF("value", "col1")
  .withColumn("col1", explode($"col1"))  // one row per (match, label) pair
  .join(df, Seq("col1"), "rightouter")   // keep all rows of df, null value where unmatched
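With the example conditions and an illustrative df containing foo, foobar and baz in col1, both the map literal and the join should produce something like (the rightouter join keeps unmatched rows with a null value):

+------+-----+
|  col1|value|
+------+-----+
|   foo|item1|
|foobar|item2|
|   baz| null|
+------+-----+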

Upvotes: 2
