Reputation: 11
I have a dataframe, df1:
+------+--------+--------+--------+
| Name | value1 | value2 | value3 |
+------+--------+--------+--------+
| A | 100 | null | 200 |
| B | 10000 | 300 | 10 |
| c | null | 10 | 100 |
+------+--------+--------+--------+
And a second dataframe, df2:
+------+------+
| Col1 | col2 |
+------+------+
| X | 1000 |
| Y | 2002 |
| Z | 3000 |
+------+------+
I want to read value1, value2 and value3 from df1 and use them in conditions that add new columns to df2:
cond1: when Name = A and col2 > value1, flag Y, otherwise N
cond2: when Name = B and col2 > value2, flag Y, otherwise N
cond3: when Name = c and col2 > value1 and col2 > value3, flag Y, otherwise N
Source code:
// value1, value2 and value3 are meant to be the values read from df1
df2.withColumn("cond1", when($"col2" > value1, lit("Y")).otherwise(lit("N")))
df2.withColumn("cond2", when($"col2" > value2, lit("Y")).otherwise(lit("N")))
df2.withColumn("cond3", when($"col2" > value1 && $"col2" > value3, lit("Y")).otherwise(lit("N")))
Expected output:
+------+------+-------+-------+-------+
| Col1 | col2 | cond1 | cond2 | cond3 |
+------+------+-------+-------+-------+
| X | 1000 | Y | Y | Y |
| Y | 2002 | N | Y | Y |
| Z | 3000 | Y | Y | Y |
+------+------+-------+-------+-------+
Upvotes: 1
Views: 2162
Reputation: 22449
If I understand your question correctly, you can join the two dataframes and create the condition columns as shown below. A couple of notes:
1) With the described conditions, null in df1 is replaced with Int.MinValue for simplified integer comparison
2) Since df1 is small, broadcast join is used to minimize sorting/shuffling for better performance
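// Needed outside spark-shell; assumes the SparkSession is named `spark`
import org.apache.spark.sql.functions.{broadcast, when}
import spark.implicits._
val df1 = Seq(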
("A", 100, Int.MinValue, 200),
("B", 10000, 300, 10),
("C", Int.MinValue, 10, 100)
).toDF("Name", "value1", "value2", "value3")
val df2 = Seq(
("A", 1000),
("B", 2002),
("C", 3000),
("A", 5000),
("A", 150),
("B", 250),
("B", 12000),
("C", 50)
).toDF("Col1", "col2")
val df3 = df2.join(broadcast(df1), df2("Col1") === df1("Name")).select(
df2("Col1"),
df2("col2"),
when(df2("col2") > df1("value1"), "Y").otherwise("N").as("cond1"),
when(df2("col2") > df1("value2"), "Y").otherwise("N").as("cond2"),
when(df2("col2") > df1("value1") && df2("col2") > df1("value3"), "Y").otherwise("N").as("cond3")
)
df3.show
+----+-----+-----+-----+-----+
|Col1| col2|cond1|cond2|cond3|
+----+-----+-----+-----+-----+
| A| 1000| Y| Y| Y|
| B| 2002| N| Y| N|
| C| 3000| Y| Y| Y|
| A| 5000| Y| Y| Y|
| A| 150| Y| Y| N|
| B| 250| N| N| N|
| B|12000| Y| Y| Y|
| C| 50| Y| Y| N|
+----+-----+-----+-----+-----+
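The example above hardcodes Int.MinValue where the question's df1 has null. If df1 arrives with real nulls, one way to get the same substitution is na.fill. This is a minimal sketch, assuming a local SparkSession and nullable integer columns; the names df1WithNulls and df1Filled are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell, getOrCreate returns the existing one
val spark = SparkSession.builder().master("local[*]").appName("null-fill-sketch").getOrCreate()
import spark.implicits._

// df1 from the question, with Option[Int] so that None becomes a SQL null
val df1WithNulls = Seq[(String, Option[Int], Option[Int], Option[Int])](
  ("A", Some(100), None, Some(200)),
  ("B", Some(10000), Some(300), Some(10)),
  ("C", None, Some(10), Some(100))
).toDF("Name", "value1", "value2", "value3")

// Replace nulls in the three value columns with Int.MinValue
val df1Filled = df1WithNulls.na.fill(Int.MinValue.toLong, Seq("value1", "value2", "value3"))

df1Filled.show()
```

df1Filled can then take the place of df1 in the broadcast join. Note that this only works as a sentinel because every real col2 value is greater than Int.MinValue.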
Upvotes: 1
Reputation: 41987
You can create a rowNo column in both dataframes as below
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val tempdf1 = df1.withColumn("rowNo", row_number().over(Window.orderBy("Name")))
val tempdf2 = df2.withColumn("rowNo", row_number().over(Window.orderBy("Col1")))
Then you can join them with the created column as below
val joinedDF = tempdf2.join(tempdf1, Seq("rowNo"), "left")
Finally you can use the select and when functions to get the final dataframe
joinedDF.select($"Col1",
$"col2",
when($"col2">$"value1" || $"value1".isNull, "Y").otherwise("N").as("cond1"),
when($"col2">$"value2" || $"value2".isNull, "Y").otherwise("N").as("cond2"),
when(($"col2">$"value1" && $"col2">$"value3") || $"value3".isNull, "Y").otherwise("N").as("cond3"))
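With the sample data from the question, this select produces the dataframe below. cond3 is N for row Y because 2002 is not greater than value1 = 10000, and N for row Z because value1 is null there and cond3 only null-checks value3.
+----+----+-----+-----+-----+
|Col1|col2|cond1|cond2|cond3|
+----+----+-----+-----+-----+
|X |1000|Y |Y |Y |
|Y |2002|N |Y |N |
|Z |3000|Y |Y |N |
+----+----+-----+-----+-----+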
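The isNull checks in the select above matter because a comparison against null in Spark SQL evaluates to null rather than true or false, and when treats a null condition as not matched. A minimal sketch of that behaviour, assuming a local SparkSession; the dataframe demo and the column names plain and nullAware are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.when

// Assumption: a local session; in spark-shell, getOrCreate returns the existing one
val spark = SparkSession.builder().master("local[*]").appName("null-compare-sketch").getOrCreate()
import spark.implicits._

// Two rows with the same col2; value1 is null in the first row only
val demo = Seq[(Int, Option[Int])](
  (3000, None),
  (3000, Some(10))
).toDF("col2", "value1")

val result = demo.select(
  $"col2",
  $"value1",
  // col2 > null is null, so the first row falls through to otherwise("N")
  when($"col2" > $"value1", "Y").otherwise("N").as("plain"),
  // the explicit isNull check turns the null case into "Y"
  when($"col2" > $"value1" || $"value1".isNull, "Y").otherwise("N").as("nullAware")
)

result.show()
// first row:  plain = N, nullAware = Y
// second row: plain = Y, nullAware = Y
```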
I hope the answer is helpful
Upvotes: 0