Markus

Reputation: 3782

How to search through struct in Spark 2?

I have the following two DataFrames in Spark 2.2.0 and Scala 2.11.8.

df1 =

+----------+-------------------------------+
|item      |        other_items            |
+----------+-------------------------------+
|  111     |[[444,1.0],[333,0.5],[666,0.4]]|
|  222     |[[444,1.0],[333,0.5]]          |
|  333     |[]                             |
|  444     |[[111,2.0],[555,0.5],[777,0.2]]|
+----------+-------------------------------+

printSchema gives the following output:

 |-- item: string (nullable = true)
 |-- other_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item: string (nullable = true)
 |    |    |-- rank: double (nullable = true)

And:

df2 = 

+----------+-------------+
|itemA     | itemB       |
+----------+-------------+
|  111     | 333         |
|  222     | 444         |
|  333     | 555         |
|  444     | 777         |
+----------+-------------+

For each pair in df2, I want to find the rank from df1. To do this, I need to find the matching entry in df1, where df1.item equals df2.itemA and the item field of one of the structs in other_items equals df2.itemB. If no such entry is found, the rank should be 0.

The expected result is:

+----------+-------------+-------------+
|itemA     | itemB       |  rank       |
+----------+-------------+-------------+
|  111     | 333         |   0.5       |
|  222     | 444         |   1.0       |
|  333     | 555         |   0.0       |
|  444     | 777         |   0.2       |
+----------+-------------+-------------+

How can I do it?

Upvotes: 0

Views: 252

Answers (2)

Ramesh Maharjan

Reputation: 41957

You can do this by defining a udf and calling it after joining the two DataFrames:

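import scala.collection.mutable
import org.apache.spark.sql.functions._

// Look up itemB in the parallel item/rank arrays and default to 0.0 when it is absent,
// or when the right join found no matching row in df1 (both arrays are then null).
val findRank = udf((items: mutable.WrappedArray[String], ranks: mutable.WrappedArray[Double], itemB: String) => {
  if (items == null || ranks == null) 0.0
  else {
    val index = items.indexOf(itemB)
    if (index != -1) ranks(index) else 0.0
  }
})

df1.join(df2, df1("item") === df2("itemA"), "right")
  .select(df2("itemA"), df2("itemB"), findRank(df1("other_items.item"), df1("other_items.rank"), df2("itemB")).as("rank"))
  .show(false)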

You should get the following DataFrame:

+-----+-----+----+
|itemA|itemB|rank|
+-----+-----+----+
|111  |333  |0.5 |
|222  |444  |1.0 |
|333  |555  |0.0 |
|444  |777  |0.2 |
+-----+-----+----+
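
The lookup done by the udf can be checked without Spark. The following is only a minimal sketch on plain Scala collections: the helper name rankOf, the df1Rows map and the pairs sequence are hypothetical stand-ins for the udf body, df1 and df2, and the null check models rows of df2 that have no match in df1 after the right join.

```scala
// Hypothetical helper mirroring the udf body on plain Scala collections.
// `items` and `ranks` are the parallel arrays that other_items.item and
// other_items.rank yield for a single row of df1.
def rankOf(items: Seq[String], ranks: Seq[Double], itemB: String): Double = {
  if (items == null || ranks == null) 0.0 // no matching row in df1
  else {
    val index = items.indexOf(itemB)
    if (index != -1) ranks(index) else 0.0 // itemB not listed: default rank
  }
}

// Stand-in for df1 from the question, keyed by item.
val df1Rows: Map[String, (Seq[String], Seq[Double])] = Map(
  ("111", (Seq("444", "333", "666"), Seq(1.0, 0.5, 0.4))),
  ("222", (Seq("444", "333"), Seq(1.0, 0.5))),
  ("333", (Seq.empty[String], Seq.empty[Double])),
  ("444", (Seq("111", "555", "777"), Seq(2.0, 0.5, 0.2)))
)

// Stand-in for df2 from the question: (itemA, itemB) pairs.
val pairs: Seq[(String, String)] =
  Seq(("111", "333"), ("222", "444"), ("333", "555"), ("444", "777"))

// For each pair, fetch the arrays for itemA (null if absent) and look up itemB.
val result: Seq[(String, String, Double)] = pairs.map { case (a, b) =>
  val (items, ranks) = df1Rows.getOrElse(a, (null, null))
  (a, b, rankOf(items, ranks, b))
}

result.foreach(println)
```

The ranks this produces for the four pairs are 0.5, 1.0, 0.0 and 0.2, which matches the table in the question.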

Upvotes: 1

Raphael Roth

Reputation: 27373

This should do what you want. The trick is to explode other_items before the join:

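import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." column syntax; assumes a SparkSession named spark

df2.as("df2")
  .join(
    // one row per (item, other_items element), so the struct fields can be used in the join condition
    df1.select($"item", explode($"other_items").as("other_items")).as("df1"),
    $"df2.itemA" === $"df1.item" and $"df2.itemB" === $"df1.other_items.item",
    "left"
  )
  // pairs without a match have a null rank after the left join; replace it with 0.0
  .select($"itemA", $"itemB", coalesce($"df1.other_items.rank", lit(0.0)).as("rank"))
  .show()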

Upvotes: 1
