Reputation: 8967
I'm trying to filter df1 by joining it with df2 on a column and then removing some rows from df1 based on that join.
df1:
+---------------+----------+
| channel|rag_status|
+---------------+----------+
| STS| green|
|Rapid Cash Plus| green|
| DOTOPAL| green|
| RAPID CASH| green|
+---------------+----------+
df2:
+---------------+----------+
| channel|rag_status|
+---------------+----------+
| STS| blue|
|Rapid Cash Plus| blue|
| DOTOPAL| blue|
+---------------+----------+
Sample code is:
df1.join(df2, df1.col("channel") === df2.col("channel"), "leftouter")
.filter(not(df1.col("rag_status") === "green"))
.select(df1.col("channel"), df1.col("rag_status")).show
It's not returning any records.
I'm expecting the output below, which should be returned from df1 after filtering the records based on the channel and green status condition: if the same channel is present in df2 and the df1 rag_status is green, then remove that record from df1 and return only the remaining records from df1.
Expected output is:
+---------------+----------+
| channel|rag_status|
+---------------+----------+
| RAPID CASH| green|
+---------------+----------+
Upvotes: 0
Views: 9940
Reputation: 3692
You can use the code below to get the expected output. With the two-column inputs shown, the joined result has three columns (channel, df1's rag_status, df2's rag_status), so df2's status sits at index 2:
df1.join(df2, Seq("channel"), "leftouter").filter(row => row(2) != "blue")
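The one-liner above still carries df2's rag_status column, and a positional index is fragile. A slightly fuller sketch, assuming only the two-column schemas from the question and using a hypothetical helper column name rag_status2, renames the df2 column first and drops it at the end:
// Rename df2's status column so the joined columns are unambiguous (rag_status2 is a hypothetical name)
val joined = df1.join(df2.withColumnRenamed("rag_status", "rag_status2"), Seq("channel"), "leftouter")
// Keep rows with no "blue" match from df2 (unmatched rows have a null rag_status2), then drop the helper column
joined.filter(row => row.getAs[String]("rag_status2") != "blue")
  .drop("rag_status2")
  .show()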
Upvotes: 1
Reputation: 3544
You can do something like this:
// Build df1 restricted to the green rows, and df2 with its status column renamed to avoid a name clash
val df1 = sc.parallelize(Seq(("STS","green"),("Rapid Cash Plus","green"),("RAPID CASH","green"))).toDF("channel","rag_status").where($"rag_status" === "green")
val df2 = sc.parallelize(Seq(("STS","blue"),("Rapid Cash Plus","blue"),("DOTOPAL","blue"))).toDF("channel","rag_status").withColumnRenamed("rag_status","rag_status2")

// The left join keeps every df1 row; the inner join keeps only the rows whose channel also exists in df2
val leftJoinResult = df1.join(df2, Array("channel"), "left")
val innerJoinResult = df1.join(df2, "channel")

// except removes the matched rows, leaving only the df1 rows whose channel is absent from df2
val resultDF = leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF.show
Spark-shell Output:
scala> val df1=sc.parallelize(Seq(("STS","green"),("Rapid Cash Plus","green"),("RAPID CASH","green"))).toDF("channel","rag_status").where($"rag_status"==="green")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [channel: string, rag_status: string]
scala> val df2=sc.parallelize(Seq(("STS","blue"),("Rapid Cash Plus","blue"),("DOTOPAL","blue"))).toDF("channel","rag_status").withColumnRenamed("rag_status","rag_status2")
df2: org.apache.spark.sql.DataFrame = [channel: string, rag_status2: string]
scala> val leftJoinResult=df1.join(df2,Array("channel"),"left")
leftJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]
scala> val innerJoinResult=df1.join(df2,"channel")
innerJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]
scala> val resultDF=leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string]
scala> resultDF.show
+----------+----------+
| channel|rag_status|
+----------+----------+
|RAPID CASH| green|
+----------+----------+
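As a side note, the left join plus except pair is effectively an anti join, so on Spark 2.0+ the same result could likely be obtained in one step (a sketch, reusing the df1 and df2 defined above):
// Keep the df1 rows (already restricted to green) whose channel never appears in df2
val antiJoinResult = df1.join(df2, Seq("channel"), "left_anti")
antiJoinResult.show   // should print only the RAPID CASH / green row, as above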
Upvotes: 4