Shankar

Reputation: 8967

Spark SQL DataFrame join with filter is not working

I'm trying to filter df1 by joining it with df2 on a column and then removing some rows from df1 based on that join.

df1:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|            STS|     green|
|Rapid Cash Plus|     green|
|        DOTOPAL|     green|
|     RAPID CASH|     green|
+---------------+----------+

df2:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|            STS|      blue|
|Rapid Cash Plus|      blue|
|        DOTOPAL|      blue|
+---------------+----------+

The sample code is:

df1.join(df2, df1.col("channel") === df2.col("channel"), "leftouter")
      .filter(not(df1.col("rag_status") === "green"))
      .select(df1.col("channel"), df1.col("rag_status")).show

It's not returning any records.

I'm expecting the output below, which is df1 after filtering records on the channel and green-status condition: if the same channel is available in df2 and the df1 rag_status is green, then remove that record from df1 and return only the remaining records from df1.

Expected output is:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|     RAPID CASH|     green|
+---------------+----------+

Upvotes: 0

Views: 9940

Answers (2)

Sandeep Purohit

Reputation: 3692

You can use the code below to get the expected output:

df1.join(df2, Seq("channel"), "leftouter").filter(row => row(2) != "blue")

(After joining on Seq("channel") the result has three columns, so df2's rag_status sits at index 2; unmatched rows carry null there and survive the filter.)
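Referencing df2's status column by name rather than by position makes the intent clearer. A minimal sketch of the same idea, assuming Spark 2.x (the rag_status2 rename is my addition, to avoid the duplicate column name after the join):

// Rename df2's status column, left-join on channel, and keep rows that
// either found no match (null) or matched something other than "blue".
val result = df1
  .join(df2.withColumnRenamed("rag_status", "rag_status2"), Seq("channel"), "leftouter")
  .filter($"rag_status2".isNull || $"rag_status2" =!= "blue")
  .select("channel", "rag_status")
result.show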

Upvotes: 1

Shivansh

Reputation: 3544

You can do something like this:

// Build df1 and keep only the green rows
val df1 = sc.parallelize(Seq(("STS", "green"), ("Rapid Cash Plus", "green"), ("RAPID CASH", "green")))
  .toDF("channel", "rag_status").where($"rag_status" === "green")
// Rename df2's status column so the joined schema has unique names
val df2 = sc.parallelize(Seq(("STS", "blue"), ("Rapid Cash Plus", "blue"), ("DOTOPAL", "blue")))
  .toDF("channel", "rag_status").withColumnRenamed("rag_status", "rag_status2")
// The left join keeps every df1 row; the inner join keeps only the matched ones
val leftJoinResult = df1.join(df2, Array("channel"), "left")
val innerJoinResult = df1.join(df2, "channel")
// Their difference is exactly the df1 rows with no match in df2
val resultDF = leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF.show

Spark-shell Output:

scala> val df1=sc.parallelize(Seq(("STS","green"),("Rapid Cash Plus","green"),("RAPID CASH","green"))).toDF("channel","rag_status").where($"rag_status"==="green")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [channel: string, rag_status: string]

scala> val df2=sc.parallelize(Seq(("STS","blue"),("Rapid Cash Plus","blue"),("DOTOPAL","blue"))).toDF("channel","rag_status").withColumnRenamed("rag_status","rag_status2")
df2: org.apache.spark.sql.DataFrame = [channel: string, rag_status2: string]

scala> val leftJoinResult=df1.join(df2,Array("channel"),"left")
leftJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]

scala> val innerJoinResult=df1.join(df2,"channel")
innerJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]

scala> val resultDF=leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string]

scala> resultDF.show
+----------+----------+                                                         
|   channel|rag_status|
+----------+----------+
|RAPID CASH|     green|
+----------+----------+
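
On Spark 2.0+, a single anti join gives the same result in one step; a minimal sketch, assuming the left_anti join type is available and that df1 holds only green rows (as it does after the where above):

// An anti join keeps exactly the df1 rows whose channel has no match in df2.
val resultDF = df1.join(df2, Seq("channel"), "left_anti")
resultDF.show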

Upvotes: 4
