Reputation: 135
I have a status dataset like below:
I want to select all the rows from this dataset which have "FAILURE" in any of these 5 status columns.
So, I want the result to contain only IDs 1,2,4 as they have FAILURE in one of the Status columns.
I guess in SQL we can do something like below:
SELECT * FROM status WHERE "FAILURE" IN (Status1, Status2, Status3, Status4, Status5);
In spark, I know I can do a filter by comparing each Status column with "FAILURE"
status.filter(s => {s.Status1.equals(FAILURE) || s.Status2.equals(FAILURE) ... and so on..})
But I would like to know if there is a smarter way of doing this in Spark SQL.
Thanks in advance!
Upvotes: 1
Views: 2010
Reputation: 11479
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> import spark.implicits._
import spark.implicits._
scala> val df = Seq(
| ("Prop1", "SUCCESS", "SUCCESS", "SUCCESS", "FAILURE" ,"SUCCESS"),
| ("Prop2", "SUCCESS", "FAILURE", "SUCCESS", "FAILURE", "SUCCESS"),
| ("Prop3", "SUCCESS", "SUCCESS", "SUCCESS", "SUCCESS", "SUCCESS" ),
| ("Prop4", "SUCCESS", "FAILURE", "SUCCESS", "FAILURE", "SUCCESS"),
| ("Prop5", "SUCCESS", "SUCCESS", "SUCCESS", "SUCCESS","SUCCESS")
| ).toDF("Name", "Status1", "Status2", "Status3", "Status4","Status5")
df: org.apache.spark.sql.DataFrame = [Name: string, Status1: string ... 4 more fields]
scala> df.show
+-----+-------+-------+-------+-------+-------+
| Name|Status1|Status2|Status3|Status4|Status5|
+-----+-------+-------+-------+-------+-------+
|Prop1|SUCCESS|SUCCESS|SUCCESS|FAILURE|SUCCESS|
|Prop2|SUCCESS|FAILURE|SUCCESS|FAILURE|SUCCESS|
|Prop3|SUCCESS|SUCCESS|SUCCESS|SUCCESS|SUCCESS|
|Prop4|SUCCESS|FAILURE|SUCCESS|FAILURE|SUCCESS|
|Prop5|SUCCESS|SUCCESS|SUCCESS|SUCCESS|SUCCESS|
+-----+-------+-------+-------+-------+-------+
scala> df.where($"Name".isin("Prop1","Prop4") and $"Status1".isin("SUCCESS","FAILURE")).show
+-----+-------+-------+-------+-------+-------+
| Name|Status1|Status2|Status3|Status4|Status5|
+-----+-------+-------+-------+-------+-------+
|Prop1|SUCCESS|SUCCESS|SUCCESS|FAILURE|SUCCESS|
|Prop4|SUCCESS|FAILURE|SUCCESS|FAILURE|SUCCESS|
+-----+-------+-------+-------+-------+-------+
Upvotes: 0
Reputation: 22449
In case there are many columns to be examined, consider a recursive function that short-circuits upon the first match, as shown below:
val df = Seq(
(1, "T", "F", "T", "F"),
(2, "T", "T", "T", "T"),
(3, "T", "T", "F", "T")
).toDF("id", "c1", "c2", "c3", "c4")
import org.apache.spark.sql.Column
def checkFor(elem: Column, cols: List[Column]): Column = cols match {
case Nil =>
lit(true)
case h :: tail =>
when(h === elem, lit(false)).otherwise(checkFor(elem, tail))
}
val cols = df.columns.filter(_.startsWith("c")).map(col).toList
df.where(checkFor(lit("F"), cols)).show
// +---+---+---+---+---+
// | id| c1| c2| c3| c4|
// +---+---+---+---+---+
// | 2| T| T| T| T|
// +---+---+---+---+---+
Upvotes: 1
Reputation: 18108
A similar example you can modify and filter on the new column added. I leave that to you, here checking for zeroes excluding first col:
import org.apache.spark.sql.functions._
import spark.implicits._
val df = sc.parallelize(Seq(
("r1", 0.0, 0.0, 0.0, 0.0),
("r2", 6.4, 4.9, 6.3, 7.1),
("r3", 4.2, 0.0, 7.2, 8.4),
("r4", 1.0, 2.0, 0.0, 0.0)
)).toDF("ID", "a", "b", "c", "d")
val count_some_val = df.columns.tail.map(x => when(col(x) === 0.0, 1).otherwise(0)).reduce(_ + _)
val df2 = df.withColumn("some_val_count", count_some_val)
df2.filter(col("some_val_count") > 0).show(false)
Afaik not possible to stop when first match found easily, but I do remember a smarter person than myself showing me this approach with lazy exists which I think does stop at first encounter of a match. Like this then, but a different approach, that I like:
import org.apache.spark.sql.functions._
import spark.implicits._
val df = sc.parallelize(Seq(
("r1", 0.0, 0.0, 0.0, 0.0),
("r2", 6.0, 4.9, 6.3, 7.1),
("r3", 4.2, 0.0, 7.2, 8.4),
("r4", 1.0, 2.0, 0.0, 0.0)
)).toDF("ID", "a", "b", "c", "d")
df.map{r => (r.getString(0),r.toSeq.tail.exists(c =>
c.asInstanceOf[Double]==0))}
.toDF("ID","ones")
.show()
Upvotes: 0