Haha

Reputation: 1009

How to get the number of lines resulting from a join in Spark

Consider these two Dataframes:

+---+
|id |
+---+
|1  |
|2  |
|3  |
+---+


+---+-----+
|idz|word |
+---+-----+
|1  |bat  |
|1  |mouse|
|2  |horse|
+---+-----+

I am doing a left join on id = idz:

val r = df1.join(df2, df1("id") === df2("idz"), "left_outer")
  .withColumn("ID_EMPLOYE_VENDEUR", when(col("word") =!= "null", col("word")).otherwise(null))
  .drop("word")

r.show(false)

+---+----+------------------+
|id |idz |ID_EMPLOYE_VENDEUR|
+---+----+------------------+
|1  |1   |mouse             |
|1  |1   |bat               |
|2  |2   |horse             |
|3  |null|null              |
+---+----+------------------+

But what if I want to keep only the lines whose id matches exactly one idz? Otherwise, I would like to have null in ID_EMPLOYE_VENDEUR. The desired output is:

+---+----+------------------+
|id |idz |ID_EMPLOYE_VENDEUR|
+---+----+------------------+
|1  |1   |null              | -- because the join produced two different lines
|2  |2   |horse             |
|3  |null|null              |
+---+----+------------------+

I should point out that I am working on a large DataFrame, so the solution should not be too expensive in time.

Thank you

Upvotes: 1

Views: 159

Answers (2)

ebonnal

Reputation: 1167

You can easily retrieve the information that more than one idz from df2 matched a single id from df1 with a groupBy and a join:

import org.apache.spark.sql.functions.expr
import spark.implicits._   // for the $"..." column syntax (already in scope in spark-shell)

r.join(
    r.groupBy("id").count().as("g"),   // number of matched rows per id
    $"g.id" === r("id")
  )
  .withColumn(
    "ID_EMPLOYE_VENDEUR",
    expr("if(count != 1, null, ID_EMPLOYE_VENDEUR)")   // null out ids matched more than once
  )
  .drop($"g.id").drop("count")
  .distinct()   // collapse the duplicated rows that are now identical
  .show()

Note: neither the groupBy nor the join triggers an additional exchange step (shuffle over the network), because the dataframe r is already partitioned on id (it is the result of a join on id).
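If you want to verify this claim on your own data, one way (not part of the original answer) is to print the physical plan and look for Exchange nodes, which mark shuffles; a minimal sketch, assuming the chained expression above is bound to a value named result:

val result = r.join(
    r.groupBy("id").count().as("g"),
    $"g.id" === r("id")
  )
result.explain()   // look for "Exchange hashpartitioning(...)" nodes in the printed plan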

Upvotes: 1

Neo-coder

Reputation: 7840

As you mentioned, your data is very large, so a groupBy followed by a join is not a good option; group with a window function instead, as below:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val windowSpec = Window.partitionBy("idz")

// count matches per idz in df2 and null out `word` when there is more than one
val newDF = df2
  .withColumn("count", count("idz").over(windowSpec))
  .dropDuplicates("idz")
  .withColumn("word", when(col("count") >= 2, lit(null)).otherwise(col("word")))
  .drop("count")

val r = df1.join(newDF, df1("id") === newDF("idz"), "left_outer")
  .withColumn("ID_EMPLOYE_VENDEUR", when(col("word") =!= "null", col("word")).otherwise(null))
  .drop("word")

r.show()
+---+----+------------------+
| id| idz|ID_EMPLOYE_VENDEUR|
+---+----+------------------+
|  1|   1|              null|
|  3|null|              null|
|  2|   2|             horse|
+---+----+------------------+
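For comparison, here is a sketch (not from either answer) that applies the same window idea directly to the joined result r from the question, so no second join or extra dataframe is needed; the names byId, cnt and deduped are just illustrative:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val byId = Window.partitionBy("id")

val deduped = r
  .withColumn("cnt", count("idz").over(byId))   // how many idz matched this id
  .withColumn("ID_EMPLOYE_VENDEUR",
    when(col("cnt") > 1, lit(null)).otherwise(col("ID_EMPLOYE_VENDEUR")))
  .drop("cnt")
  .distinct()   // collapse the now-identical duplicated rows

deduped.show()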

Upvotes: 1
