Reputation: 3782
I have the following two DataFrames in Spark 2.2.0 and Scala 2.11.8.
df1 =
+----------+-------------------------------+
|item | other_items |
+----------+-------------------------------+
| 111 |[[444,1.0],[333,0.5],[666,0.4]]|
| 222 |[[444,1.0],[333,0.5]] |
| 333 |[] |
| 444 |[[111,2.0],[555,0.5],[777,0.2]]|
+----------+-------------------------------+
printSchema gives the following output:
|-- item: string (nullable = true)
|-- other_items: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- item: string (nullable = true)
| | |-- rank: double (nullable = true)
And:
df2 =
+----------+-------------+
|itemA | itemB |
+----------+-------------+
| 111 | 333 |
| 222 | 444 |
| 333 | 555 |
| 444 | 777 |
+----------+-------------+
For each pair in df2, I want to find rank from df1. To do this, I should find the same pair in df1, so that df1.item is equal to df2.itemA and other_items.struct.[item] is equal to df2.itemB. If such a pair cannot be found, the rank should be 0.
The result should be this one:
+----------+-------------+-------------+
|itemA | itemB | rank |
+----------+-------------+-------------+
| 111 | 333 | 0.5 |
| 222 | 444 | 1.0 |
| 333 | 555 | 0.0 |
| 444 | 777 | 0.2 |
+----------+-------------+-------------+
How can I do it?
Upvotes: 0
Views: 252
Reputation: 41957
You can achieve this by defining a udf function and calling it after you join both DataFrames:
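import org.apache.spark.sql.functions._
import scala.collection.mutable

// Returns the rank stored at the position of itemB in items, or 0.0 if itemB is absent.
val findRank = udf((items: mutable.WrappedArray[String], ranks: mutable.WrappedArray[Double], itemB: String) => {
  // items is null when the right join finds no df1 row for this itemA
  val index = if (items == null) -1 else items.indexOf(itemB)
  if (index != -1) ranks(index) else 0.0
})

// The right join keeps every row of df2, including those with no match in df1.
df1.join(df2, df1("item") === df2("itemA"), "right")
  .select(df2("itemA"), df2("itemB"), findRank(df1("other_items.item"), df1("other_items.rank"), df2("itemB")).as("rank"))
  .show(false)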
You should get the following DataFrame:
+-----+-----+----+
|itemA|itemB|rank|
+-----+-----+----+
|111 |333 |0.5 |
|222 |444 |1.0 |
|333 |555 |0.0 |
|444 |777 |0.2 |
+-----+-----+----+
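The lookup that the udf performs can be checked without Spark. The following is only a sketch on plain Scala collections: the object name FindRankCheck and the helper findRank are hypothetical, and Seq stands in for the WrappedArray that Spark passes to a udf. The null check covers the case where the join finds no df1 row for an itemA, so the array columns arrive as null.

```scala
object FindRankCheck {
  // Same index-based lookup as the udf body, written against plain Seq.
  // items and ranks are parallel sequences: ranks(i) belongs to items(i).
  def findRank(items: Seq[String], ranks: Seq[Double], itemB: String): Double = {
    // Treat a null array (unmatched join row) like "not found".
    val index = if (items == null) -1 else items.indexOf(itemB)
    if (index != -1) ranks(index) else 0.0
  }

  def main(args: Array[String]): Unit = {
    // Row 111 from df1, looking up 333.
    println(findRank(Seq("444", "333", "666"), Seq(1.0, 0.5, 0.4), "333"))
    // Row 333 from df1 has an empty array, so the default applies.
    println(findRank(Seq.empty[String], Seq.empty[Double], "555"))
    // No matching row at all: both arrays are null.
    println(findRank(null, null, "999"))
  }
}
```

The three calls correspond to a found pair, an empty other_items array, and an itemA that is missing from df1 altogether.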
Upvotes: 1
Reputation: 27373
This should do what you want. The trick is to explode other_items before the join:
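// Assumes a SparkSession named spark is in scope (as in spark-shell).
import org.apache.spark.sql.functions._
import spark.implicits._

df2.as("df2").join(
  df1.select($"item", explode($"other_items").as("other_items")).as("df1"),
  $"df2.itemA" === $"df1.item" and $"df2.itemB" === $"df1.other_items.item",
  "left"
)
.select($"itemA", $"itemB", coalesce($"df1.other_items.rank", lit(0.0)).as("rank"))
.show()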
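To see why explode followed by a left join and coalesce gives the requested result, here is a rough model of the same three steps on plain Scala collections, without Spark. It is a sketch under assumptions: the object name ExplodeJoinSketch and the tuple layout are made up for illustration, the data is copied from the question, and using a Map assumes each (item, other item) pair occurs at most once in df1.

```scala
object ExplodeJoinSketch {
  // df1 rows modelled as (item, other_items); each array element is (item, rank).
  val df1: Seq[(String, Seq[(String, Double)])] = Seq(
    ("111", Seq(("444", 1.0), ("333", 0.5), ("666", 0.4))),
    ("222", Seq(("444", 1.0), ("333", 0.5))),
    ("333", Seq.empty[(String, Double)]),
    ("444", Seq(("111", 2.0), ("555", 0.5), ("777", 0.2)))
  )

  // df2 rows modelled as (itemA, itemB).
  val df2: Seq[(String, String)] =
    Seq(("111", "333"), ("222", "444"), ("333", "555"), ("444", "777"))

  // "explode": one entry per array element; rows with an empty array contribute nothing.
  val exploded: Map[(String, String), Double] =
    df1.flatMap { case (item, others) =>
      others.map { case (other, rank) => ((item, other), rank) }
    }.toMap

  // "left join" plus "coalesce": every df2 pair is kept, and a missing match becomes 0.0.
  val result: Seq[(String, String, Double)] =
    df2.map { case (a, b) => (a, b, exploded.getOrElse((a, b), 0.0)) }

  def main(args: Array[String]): Unit = result.foreach(println)
}
```

The pair (333, 555) has no exploded entry because item 333 has an empty array, so it falls back to 0.0, which is the role coalesce plays in the Spark version.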
Upvotes: 1