sid_bond

Reputation: 135

Spark SQL - Check for a value in multiple columns

I have a status dataset like below:

[image: a status table with an ID column and five status columns, Status1 through Status5, holding SUCCESS/FAILURE values]

I want to select all the rows from this dataset which have "FAILURE" in any of these 5 status columns.

So, I want the result to contain only IDs 1, 2 and 4, as they have FAILURE in at least one of the Status columns.

I guess in SQL we can do something like below:

SELECT * FROM status WHERE 'FAILURE' IN (Status1, Status2, Status3, Status4, Status5);
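
I believe that query could be run as-is through Spark SQL by registering the dataset as a temporary view; a rough sketch of what I mean (assuming the data sits in a DataFrame called status):

// Sketch only: register the DataFrame as a view and run the SQL directly.
// Assumes a SparkSession `spark` and a DataFrame `status` holding the data.
status.createOrReplaceTempView("status")

spark.sql(
  """SELECT *
    |FROM status
    |WHERE 'FAILURE' IN (Status1, Status2, Status3, Status4, Status5)
  """.stripMargin
).show()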

In Spark, I know I can do a filter by comparing each Status column with "FAILURE":

status.filter(s => {s.Status1.equals("FAILURE") || s.Status2.equals("FAILURE") ... and so on..})
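
Written with the untyped DataFrame API instead of a typed lambda, the same enumerated check would look roughly like this (a sketch, assuming the data is in a DataFrame named status):

import org.apache.spark.sql.functions.col

// Sketch: spell out the five conditions explicitly and OR them together.
status.filter(
  col("Status1") === "FAILURE" ||
  col("Status2") === "FAILURE" ||
  col("Status3") === "FAILURE" ||
  col("Status4") === "FAILURE" ||
  col("Status5") === "FAILURE"
).show()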

But I would like to know if there is a smarter way of doing this in Spark SQL.

Thanks in advance!

Upvotes: 1

Views: 2010

Answers (3)

vaquar khan

Reputation: 11479

        scala> import org.apache.spark.sql.functions._
        import org.apache.spark.sql.functions._

        scala> import spark.implicits._
        import spark.implicits._

        scala> val df = Seq(
             |     ("Prop1", "SUCCESS", "SUCCESS", "SUCCESS", "FAILURE" ,"SUCCESS"),
             |     ("Prop2", "SUCCESS", "FAILURE", "SUCCESS", "FAILURE", "SUCCESS"),
             |     ("Prop3", "SUCCESS", "SUCCESS", "SUCCESS", "SUCCESS", "SUCCESS" ),
             |     ("Prop4", "SUCCESS", "FAILURE", "SUCCESS", "FAILURE", "SUCCESS"),
             |     ("Prop5", "SUCCESS", "SUCCESS", "SUCCESS", "SUCCESS","SUCCESS")
             |    ).toDF("Name", "Status1", "Status2", "Status3", "Status4","Status5")
        df: org.apache.spark.sql.DataFrame = [Name: string, Status1: string ... 4 more fields]


        scala> df.show
        +-----+-------+-------+-------+-------+-------+
        | Name|Status1|Status2|Status3|Status4|Status5|
        +-----+-------+-------+-------+-------+-------+
        |Prop1|SUCCESS|SUCCESS|SUCCESS|FAILURE|SUCCESS|
        |Prop2|SUCCESS|FAILURE|SUCCESS|FAILURE|SUCCESS|
        |Prop3|SUCCESS|SUCCESS|SUCCESS|SUCCESS|SUCCESS|
        |Prop4|SUCCESS|FAILURE|SUCCESS|FAILURE|SUCCESS|
        |Prop5|SUCCESS|SUCCESS|SUCCESS|SUCCESS|SUCCESS|
        +-----+-------+-------+-------+-------+-------+


        scala> df.where($"Name".isin("Prop1","Prop4") and $"Status1".isin("SUCCESS","FAILURE")).show
        +-----+-------+-------+-------+-------+-------+
        | Name|Status1|Status2|Status3|Status4|Status5|
        +-----+-------+-------+-------+-------+-------+
        |Prop1|SUCCESS|SUCCESS|SUCCESS|FAILURE|SUCCESS|
        |Prop4|SUCCESS|FAILURE|SUCCESS|FAILURE|SUCCESS|
        +-----+-------+-------+-------+-------+-------+

Upvotes: 0

Leo C

Reputation: 22449

In case there are many columns to be examined, consider a recursive function that short-circuits upon the first match, as shown below:

val df = Seq(
  (1, "T", "F", "T", "F"),
  (2, "T", "T", "T", "T"),
  (3, "T", "T", "F", "T")
).toDF("id", "c1", "c2", "c3", "c4")

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, when}

// Builds a Column that is false as soon as one of the given columns equals `elem`,
// and true only if none of them do.
def checkFor(elem: Column, cols: List[Column]): Column = cols match {
  case Nil =>
    lit(true)
  case h :: tail =>
    when(h === elem, lit(false)).otherwise(checkFor(elem, tail))
}

val cols = df.columns.filter(_.startsWith("c")).map(col).toList

df.where(checkFor(lit("F"), cols)).show

// +---+---+---+---+---+
// | id| c1| c2| c3| c4|
// +---+---+---+---+---+
// |  2|  T|  T|  T|  T|
// +---+---+---+---+---+

Upvotes: 1

Ged

Reputation: 18108

Here is a similar example you can modify, filtering on the new column that is added. I leave the adaptation to you; this version checks for zeroes in every column except the first:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = sc.parallelize(Seq(
    ("r1", 0.0, 0.0, 0.0, 0.0),
    ("r2", 6.4, 4.9, 6.3, 7.1),
    ("r3", 4.2, 0.0, 7.2, 8.4),
    ("r4", 1.0, 2.0, 0.0, 0.0)
)).toDF("ID", "a", "b", "c", "d")

// For each numeric column, emit 1 when the value is 0.0, else 0, and sum these into a per-row count.
val count_some_val = df.columns.tail.map(x => when(col(x) === 0.0, 1).otherwise(0)).reduce(_ + _)

val df2 = df.withColumn("some_val_count", count_some_val)
df2.filter(col("some_val_count") > 0).show(false)
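
Adapted to the original FAILURE question, the same counting idea might look roughly like this (a sketch; the column names come from the question, while the DataFrame name status is an assumption):

// Count how many Status columns equal "FAILURE" per row, then keep rows with at least one.
// Uses when/col from org.apache.spark.sql.functions._ imported above.
val failureCount = Seq("Status1", "Status2", "Status3", "Status4", "Status5")
  .map(c => when(col(c) === "FAILURE", 1).otherwise(0))
  .reduce(_ + _)

status.withColumn("failure_count", failureCount)
  .filter(col("failure_count") > 0)
  .show(false)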

Afaik it is not easy to stop as soon as the first match is found, but I do remember someone smarter than me showing me this approach with a lazy exists, which I believe does stop at the first match it encounters. It is a different approach, but one I like:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = sc.parallelize(Seq(
    ("r1", 0.0, 0.0, 0.0, 0.0),
    ("r2", 6.0, 4.9, 6.3, 7.1),
    ("r3", 4.2, 0.0, 7.2, 8.4),
    ("r4", 1.0, 2.0, 0.0, 0.0)
)).toDF("ID", "a", "b", "c", "d")

// For each row, keep the ID and a flag that is true if any numeric column is zero;
// exists evaluates lazily, so it stops at the first zero it finds.
df.map { r => (r.getString(0), r.toSeq.tail.exists(c => c.asInstanceOf[Double] == 0)) }
  .toDF("ID", "ones")
  .show()

Upvotes: 0
