I have a dataframe (DF1) with two columns
+-------+------+
|words  |value |
+-------+------+
|ABC    |1.0   |
|XYZ    |2.0   |
|DEF    |3.0   |
|GHI    |4.0   |
+-------+------+
and another dataframe (DF2) like this
+-----------------------------+
|string                       |
+-----------------------------+
|ABC DEF GHI                  |
|XYZ ABC DEF                  |
+-----------------------------+
I have to replace the individual string values in DF2 with their corresponding values in DF1. For example, after the operation, I should get back this dataframe:
+-----------------------------+
|stringToDouble               |
+-----------------------------+
|1.0 3.0 4.0                  |
|2.0 1.0 3.0                  |
+-----------------------------+
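For reference, a minimal setup that reproduces the two input dataframes (assuming a SparkSession is in scope as spark):

import spark.implicits._

// sample data matching the tables above
val df1 = Seq(("ABC", 1.0), ("XYZ", 2.0), ("DEF", 3.0), ("GHI", 4.0)).toDF("words", "value")
val df2 = Seq("ABC DEF GHI", "XYZ ABC DEF").toDF("string")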
I have tried multiple ways but I cannot seem to figure out the solution.
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{Dataset, Row}

def createCorpus(conversationCorpus: Dataset[Row], dataDictionary: Dataset[Row]): Unit = {
  import spark.implicits._

  def getIndex(word: String): Double = {
    val idxRow = dataDictionary.selectExpr("index").where('words.like(word))
    val idx = idxRow.toString
    if (!idx.isEmpty) idx.trim.toDouble else 1.0
  }

  conversationCorpus.map { r => // Eclipse doesn't like this map here; it throws an error
    def row = {
      val arr = r.getString(0).toLowerCase.split(" ")
      val arrList = ArrayBuffer[Double]()
      arr.map { str =>
        val index = getIndex(str)
      }
      Row.fromSeq(arrList.toSeq)
    }
    row
  }
}
Combining multiple dataframes to create new columns requires a join. Looking at your two dataframes, we can join on the words column of df1 and the string column of df2, but the string column needs an explode and a later recombination (which can be done by assigning a unique id to each row before the explode). monotonically_increasing_id gives a unique id to each row in df2, and the split function turns the string column into an array for the explode. Then you can join them. The rest of the steps combine the exploded rows back into the original rows with a groupBy and an aggregation. Finally, the collected array column can be turned into the desired string column with a udf function.
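To make the intermediate step concrete, here is what df2 looks like right after the id, split and explode steps (a sketch with org.apache.spark.sql.functions._ in scope; the rowId values shown assume the sample data sits in a single partition, since monotonically_increasing_id only guarantees uniqueness, not consecutive values):

df2.withColumn("rowId", monotonically_increasing_id())
  .withColumn("string", explode(split(col("string"), " ")))
  .show()

+------+-----+
|string|rowId|
+------+-----+
|ABC   |0    |
|DEF   |0    |
|GHI   |0    |
|XYZ   |1    |
|ABC   |1    |
|DEF   |1    |
+------+-----+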
Long story short, the following solution should work for you:
import org.apache.spark.sql.functions._

def arrayToString = udf((array: Seq[Double]) => array.mkString(" "))

df2.withColumn("rowId", monotonically_increasing_id())         // unique id per original row
  .withColumn("string", explode(split(col("string"), " ")))    // one row per word
  .join(df1, col("string") === col("words"))                   // look up each word's value
  .groupBy("rowId")                                            // recombine the exploded rows
  .agg(collect_list("value").as("stringToDouble"))             // collect the values per original row
  .select(arrayToString(col("stringToDouble")).as("stringToDouble"))
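As a side note, collect_list does not guarantee that the values come back in the original word order once data is shuffled across partitions; if order matters on a larger dataset, posexplode can carry a position column through the join so you can sort within each group before collecting. And if you prefer to avoid the udf, the final select could instead use the built-in concat_ws after casting the array (a sketch, assuming your Spark version supports element-wise array casts):

.select(concat_ws(" ", col("stringToDouble").cast("array<string>")).as("stringToDouble"))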
which should give you
+--------------+
|stringToDouble|
+--------------+
|1.0 3.0 4.0   |
|2.0 1.0 3.0   |
+--------------+