Rahul Kumar

Reputation: 109

Map individual values in one dataframe with values in another dataframe

I have a dataframe (DF1) with two columns

+-------+------+
|words  |value |
+-------+------+
|ABC    |1.0   |
|XYZ    |2.0   |
|DEF    |3.0   |
|GHI    |4.0   |
+-------+------+

and another dataframe (DF2) like this

+-----------------------------+
|string                       |
+-----------------------------+
|ABC DEF GHI                  |
|XYZ ABC DEF                  |
+-----------------------------+

I have to replace each individual token in DF2's string column with its corresponding value from DF1. For example, after the operation I should get back this dataframe:

+-----------------------------+
|stringToDouble               |
+-----------------------------+
|1.0 3.0 4.0                  |
|2.0 1.0 3.0                  |
+-----------------------------+

I have tried multiple approaches but cannot figure out a working solution. Here is my latest attempt:

 def createCorpus(conversationCorpus: Dataset[Row], dataDictionary: Dataset[Row]): Unit = {
   import spark.implicits._

   def getIndex(word: String): Double = {
     val idxRow = dataDictionary.selectExpr("index").where('words.like(word))
     val idx = idxRow.toString
     if (!idx.isEmpty) idx.trim.toDouble else 1.0
   }

   conversationCorpus.map { // Eclipse doesn't like this map here; it throws an error
     r =>
       def row = {
         val arr = r.getString(0).toLowerCase.split(" ")
         val arrList = ArrayBuffer[Double]()
         arr.map { str =>
           val index = getIndex(str)
         }
         Row.fromSeq(arrList.toSeq)
       }
       row
   }
 }

Upvotes: 1

Views: 1142

Answers (1)

Ramesh Maharjan

Reputation: 41957

Combining multiple dataframes to create new columns requires a join. Looking at your two dataframes, we can join on the words column of df1 and the string column of df2, but the string column first needs an explode, and the exploded rows must be recombined afterwards (which is possible if each row is given a unique id before the explode). monotonically_increasing_id assigns a unique id to each row of df2, and the split function turns the string column into an array for explode to work on. After the join, the remaining steps combine the exploded rows back into the original rows with a groupBy and an aggregation.
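For reference, here is a minimal sketch of how the two sample dataframes could be constructed for testing (the SparkSession value spark and the use of toDF here are assumptions for this sketch, not part of the original question; the column names come from the question's tables):

import spark.implicits._

// Hypothetical test data mirroring DF1 and DF2 from the question
val df1 = Seq(("ABC", 1.0), ("XYZ", 2.0), ("DEF", 3.0), ("GHI", 4.0)).toDF("words", "value")
val df2 = Seq("ABC DEF GHI", "XYZ ABC DEF").toDF("string")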

Finally, the collected array column can be converted to the desired string column with a udf function.

Long story short, the following solution should work for you:

import org.apache.spark.sql.functions._

// udf to turn the collected array of doubles back into a space-separated string
def arrayToString = udf((array: Seq[Double]) => array.mkString(" "))

df2.withColumn("rowId", monotonically_increasing_id())        // unique id per original row
  .withColumn("string", explode(split(col("string"), " ")))   // one row per word
  .join(df1, col("string") === col("words"))                  // look up each word's value
  .groupBy("rowId")                                           // regroup the exploded rows
  .agg(collect_list("value").as("stringToDouble"))            // collect the values into an array
  .select(arrayToString(col("stringToDouble")).as("stringToDouble"))

which, displayed with .show(false) to avoid column truncation, should give you

+--------------+
|stringToDouble|
+--------------+
|1.0 3.0 4.0   |
|2.0 1.0 3.0   |
+--------------+
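As a note on the design choice: the udf can be avoided entirely by casting each value to a string before collecting and letting the built-in concat_ws function do the joining. A sketch of that variant (same assumed df1/df2 as above, not tested against your original data):

df2.withColumn("rowId", monotonically_increasing_id())
  .withColumn("string", explode(split(col("string"), " ")))
  .join(df1, col("string") === col("words"))
  .groupBy("rowId")
  .agg(concat_ws(" ", collect_list(col("value").cast("string"))).as("stringToDouble"))
  .select("stringToDouble")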

Upvotes: 3
