vikv

Reputation: 11

Read value from table and apply condition in Spark

I have a dataframe, df1:

+------+--------+--------+--------+
| Name | value1 | value2 | value3 |
+------+--------+--------+--------+
| A    | 100    | null   |    200 |
| B    | 10000  | 300    |     10 |
| c    | null   | 10     |    100 |
+------+--------+--------+--------+

And a second dataframe, df2:

+------+------+
| Col1 | col2 |
+------+------+
| X    | 1000 |
| Y    | 2002 |
| Z    | 3000 |
+------+------+

I want to read the values value1, value2 and value3 from df1 and apply these conditions to df2 as new columns:

cond1: when Name = A and col2 > value1, flag Y, otherwise N

cond2: when Name = B and col2 > value2, flag Y, otherwise N

cond3: when Name = c and col2 > value1 and col2 > value3, flag Y, otherwise N

My source code so far:

df2.withColumn("cond1",when($"col2")>value1,lit("Y)).otherwise(lit("N"))
df2.withColumn("cond2",when($"col2")>value2,lit("Y)).otherwise(lit("N"))
df2.withColumn("cond3",when($"col2")>value1 && when($"col2")>value3,lit("Y")).otherwise(lit("N"))

Expected output:

+------+------+-------+-------+-------+
| Col1 | col2 | cond1 | cond2 | cond3 |
+------+------+-------+-------+-------+
| X    | 1000 | Y     | Y     | Y     |
| Y    | 2002 | N     | Y     | Y     |
| Z    | 3000 | Y     | Y     | Y     |
+------+------+-------+-------+-------+

Upvotes: 1

Views: 2162

Answers (2)

Leo C

Reputation: 22449

If I understand your question correctly, you can join the two dataframes and create the condition columns as shown below. A couple of notes:

1) With the described conditions, null in df1 is replaced with Int.MinValue to simplify the integer comparisons

2) Since df1 is small, a broadcast join is used to minimize sorting/shuffling for better performance

import org.apache.spark.sql.functions._
import spark.implicits._

val df1 = Seq(
  ("A", 100, Int.MinValue, 200),
  ("B", 10000, 300, 10),
  ("C", Int.MinValue, 10, 100)
).toDF("Name", "value1", "value2", "value3")

val df2 = Seq(
  ("A", 1000),
  ("B", 2002),
  ("C", 3000),
  ("A", 5000),
  ("A", 150),
  ("B", 250),
  ("B", 12000),
  ("C", 50)
).toDF("Col1", "col2")

val df3 = df2.join(broadcast(df1), df2("Col1") === df1("Name")).select(
  df2("Col1"),
  df2("col2"),
  when(df2("col2") > df1("value1"), "Y").otherwise("N").as("cond1"),
  when(df2("col2") > df1("value2"), "Y").otherwise("N").as("cond2"),
  when(df2("col2") > df1("value1") && df2("col2") > df1("value3"), "Y").otherwise("N").as("cond3")
)

df3.show
+----+-----+-----+-----+-----+
|Col1| col2|cond1|cond2|cond3|
+----+-----+-----+-----+-----+
|   A| 1000|    Y|    Y|    Y|
|   B| 2002|    N|    Y|    N|
|   C| 3000|    Y|    Y|    Y|
|   A| 5000|    Y|    Y|    Y|
|   A|  150|    Y|    Y|    N|
|   B|  250|    N|    N|    N|
|   B|12000|    Y|    Y|    Y|
|   C|   50|    Y|    Y|    N|
+----+-----+-----+-----+-----+
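A note on the null handling above: if your real df1 contains actual nulls (as in the question) rather than pre-substituted Int.MinValue sentinels, here is a minimal sketch of the substitution, assuming value1-value3 are integer columns:

// Assumption: value1..value3 are nullable integer columns; na.fill swaps
// the nulls for the sentinel so the comparisons above work unchanged.
val df1Filled = df1.na.fill(Int.MinValue, Seq("value1", "value2", "value3"))

You can also sanity-check that the broadcast happened with df3.explain, which should list a BroadcastHashJoin in the physical plan.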

Upvotes: 1

Ramesh Maharjan

Reputation: 41987

You can create a rowNo column in both dataframes as below

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val tempdf1 = df1.withColumn("rowNo", row_number().over(Window.orderBy("Name")))
val tempdf2 = df2.withColumn("rowNo", row_number().over(Window.orderBy("Col1")))
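A caveat worth flagging (my addition, not part of the original answer): a Window with orderBy but no partitionBy moves every row to a single partition, and Spark logs a warning to that effect, so it only suits small dataframes like these. For larger data, here is a sketch of one alternative using RDD zipWithIndex; the helper name withRowNo is hypothetical, and it assumes the dataframe's current row order is the one you want (sort first if not):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField}

// Hypothetical helper: assigns a 1-based rowNo without a global Window.
def withRowNo(df: DataFrame): DataFrame = {
  val schema = df.schema.add(StructField("rowNo", LongType, nullable = false))
  val indexed = df.rdd.zipWithIndex.map { case (row, idx) =>
    Row.fromSeq(row.toSeq :+ (idx + 1))
  }
  df.sparkSession.createDataFrame(indexed, schema)
}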

Then you can join them on the created column as below

val joinedDF = tempdf2.join(tempdf1, Seq("rowNo"), "left")
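For reference (my reconstruction from the sample data, not output shown in the original answer), the positional join should pair the rows like this:

joinedDF.show(false)

+-----+----+----+----+------+------+------+
|rowNo|Col1|col2|Name|value1|value2|value3|
+-----+----+----+----+------+------+------+
|1    |X   |1000|A   |100   |null  |200   |
|2    |Y   |2002|B   |10000 |300   |10    |
|3    |Z   |3000|c   |null  |10    |100   |
+-----+----+----+----+------+------+------+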

Finally, you can use select and the when function to get the final dataframe

joinedDF.select($"Col1",
  $"col2",
  when($"col2">$"value1" || $"value1".isNull, "Y").otherwise("N").as("cond1"),
  when($"col2">$"value2" || $"value2".isNull, "Y").otherwise("N").as("cond2"),
  when(($"col2">$"value1" && $"col2">$"value3") || $"value3".isNull, "Y").otherwise("N").as("cond3"))

You should have your desired dataframe:

+----+----+-----+-----+-----+
|Col1|col2|cond1|cond2|cond3|
+----+----+-----+-----+-----+
|X   |1000|Y    |Y    |Y    |
|Y   |2002|N    |Y    |Y    |
|Z   |3000|Y    |Y    |Y    |
+----+----+-----+-----+-----+

I hope the answer is helpful.

Upvotes: 0
