rwxrwxr--

Reputation: 45

Selecting specific rows from different dataframes within a map scope

Hello, I am new to Spark and Scala, and I have three similar dataframes like the following:

df1:
+--------+-------+-------+-------+
| Country|1/22/20|1/23/20|1/24/20|
+--------+-------+-------+-------+
|    Chad|      1|      0|      5|
|Paraguay|      4|      6|      3|
|  Russia|      0|      0|      1|
+--------+-------+-------+-------+
df2 and df3 have exactly the same structure, just with different values.

I would like to apply a function to each row of df1 but I also need to select the same row (using the Country as key) from the other two dataframes because I need the selected rows as input arguments for the function I want to apply. I thought of using

df1.map{ r =>
  val selectedRowDf2 = selectRow using r at column "Country" ...
  val selectedRowDf3 = selectRow using r at column "Country" ...
  r.apply(functionToApply(r, selectedRowDf2, selectedRowDf3))
}

I also tried with map but I get an error as follows:

Error:(238, 23) not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[Unit])org.apache.spark.sql.Dataset[Unit].
Unspecified value parameter evidence$6.
    df1.map{

Upvotes: 0

Views: 86

Answers (1)

hagarwal

Reputation: 1163

A possible approach is to prefix the column names of each dataframe with a key that identifies which dataframe they came from, and then join all the dataframes into a single dataframe on the country column. The desired operation can then be performed on each row of the merged dataframe.

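import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Prefix every column name with `key` so the columns of each
// dataframe stay distinguishable after the join.
def appendColWithKey(df: DataFrame, key: String): DataFrame =
  df.columns.foldLeft(df) { (acc, name) =>
    acc.withColumnRenamed(name, s"$key$name")
  }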

val kdf1 = appendColWithKey(df1, "key1_")
val kdf2 = appendColWithKey(df2, "key2_")
val kdf3 = appendColWithKey(df3, "key3_")

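// Inner joins on the prefixed country columns. Spark resolves column names
// case-insensitively by default, so "key1_country" matches "key1_Country".
val tempdf1 = kdf1.join(kdf2, col("key1_country") === col("key2_country"))
val tempdf = tempdf1.join(kdf3, col("key1_country") === col("key3_country"))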

val finaldf = tempdf
  .drop("key2_country")
  .drop("key3_country")
  .withColumnRenamed("key1_country", "country")

finaldf.show(10)
//Output
+--------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
| country|key1_1/22/20|key1_1/23/20|key1_1/24/20|key2_1/22/20|key2_1/23/20|key2_1/24/20|key3_1/22/20|key3_1/23/20|key3_1/24/20|
+--------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|    Chad|           1|           0|           5|           1|           0|           5|           1|           0|           5|
|Paraguay|           4|           6|           3|           4|           6|           3|           4|           6|           3|
|  Russia|           0|           0|           1|           0|           0|           1|           0|           0|           1|
+--------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
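
Once everything is in one dataframe, the per-row function can be applied with map. The following is only a sketch under some assumptions: functionToApply here is a hypothetical stand-in that sums the value columns, applyPerCountry is a made-up helper name, and the value columns are assumed to be numeric. The Encoder[Unit] error in the question most likely appears because the map body returned Unit and no implicit encoder was in scope; importing spark.implicits._ and returning an encodable type such as a tuple should avoid it.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Hypothetical stand-in for the asker's function: sums every value
// column of one merged row. Assumes those columns hold numeric values.
def functionToApply(row: Row, valueCols: Seq[String]): Long =
  valueCols.map(c => row.getAs[Number](c).longValue).sum

// Hypothetical helper: applies functionToApply to each row of the merged
// dataframe and returns one (country, result) row per country.
def applyPerCountry(spark: SparkSession, finaldf: DataFrame): DataFrame = {
  import spark.implicits._ // provides the Encoder[(String, Long)] that map needs
  val valueCols = finaldf.columns.filterNot(_ == "country").toSeq
  finaldf
    .map(row => (row.getAs[String]("country"), functionToApply(row, valueCols)))
    .toDF("country", "result")
}
```

Calling applyPerCountry(spark, finaldf).show() would then print one result per country. Because the lambda returns a (String, Long) tuple rather than Unit, Spark can find an encoder for it.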

Upvotes: 1
