rgarc101

Reputation: 51

How to compose column name using another column's value for withColumn in Scala Spark

I'm trying to add a new column to a DataFrame. The value of this column is taken from another column, whose name depends on the values of other columns in the same DataFrame.

For instance, given this:

+---+---+----+----+
|  A|  B| A_1| B_2|
+---+---+----+----+
|  A|  1| 0.1| 0.3|
|  B|  2| 0.2| 0.4|
+---+---+----+----+

I'd like to obtain this:

+---+---+----+----+----+
|  A|  B| A_1| B_2|   C|
+---+---+----+----+----+
|  A|  1| 0.1| 0.3| 0.1|
|  B|  2| 0.2| 0.4| 0.4|
+---+---+----+----+----+

That is, I've added a column C whose value comes from either column A_1 or B_2. The name of the source column is obtained by concatenating the values of columns A and B (for example, A = "A" and B = 1 gives "A_1").
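Something like this builds the example above (treating A and B as strings and the value columns as doubles, purely for illustration):

import spark.implicits._

val df = Seq(
  ("A", "1", 0.1, 0.3),
  ("B", "2", 0.2, 0.4)
).toDF("A", "B", "A_1", "B_2")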

I know that I can add a new column based on another column and a constant, like this:

df.withColumn("C", $"B" + 1)

I also know that the name of the column can come from a variable like this:

val name = "A_1"
df.withColumn("C", col(name) + 1)

However, what I'd like to do is something like this:

df.withColumn("C", col(s"${col("A")}_${col("B")}"))

This doesn't work: col("A") and col("B") are Column expressions, not per-row values, so the interpolated string isn't the name of an existing column and the lookup can't change from row to row.

NOTE: I'm coding in Scala 2.11 and Spark 2.2.

Upvotes: 5

Views: 8107

Answers (2)

Ramesh Maharjan

Reputation: 41957

You can achieve your requirement by writing a udf function. I'm suggesting a udf because your requirement is to process the dataframe row by row, in contrast to the built-in functions, which work column by column.

But before that, you need the array of column names:

val columns = df.columns

Then write a udf function as

import scala.collection.mutable
import org.apache.spark.sql.functions._

// picks, from the array of all column values, the one whose column name is A + "_" + B
def getValue = udf((A: String, B: String, array: mutable.WrappedArray[String]) => array(columns.indexOf(A + "_" + B)))

where

A is the first column value
B is the second column value
array is the array of all the column values

Now just call the udf function using the withColumn API:

df.withColumn("C", getValue($"A", $"B", array(columns.map(col): _*))).show(false)

You should get your desired output dataframe.
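Putting the pieces together, the whole approach in one self-contained snippet (assuming A and B are string columns, as in the example data above):

import scala.collection.mutable
import org.apache.spark.sql.functions._

val columns = df.columns

// the udf receives the values of all columns as an array and returns the one
// whose column name is A + "_" + B
def getValue = udf((a: String, b: String, values: mutable.WrappedArray[String]) =>
  values(columns.indexOf(a + "_" + b)))

df.withColumn("C", getValue($"A", $"B", array(columns.map(col): _*))).show(false)

Note that if A_1 and B_2 are numeric, array(...) coerces all values to a common string type, so C ends up as a string rather than a double with this approach.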

Upvotes: 3

zero323

Reputation: 330193

You can select from a map. Define a map which translates column names to column values:

import org.apache.spark.sql.functions.{col, concat_ws, lit, map}

val dataMap = map(
  df.columns.diff(Seq("A", "B")).flatMap(c => lit(c) :: col(c) :: Nil): _*
)

df.select(dataMap).show(false)
+---------------------------+
|map(A_1, A_1, B_2, B_2)    |
+---------------------------+
|Map(A_1 -> 0.1, B_2 -> 0.3)|
|Map(A_1 -> 0.2, B_2 -> 0.4)|
+---------------------------+

and select from it with apply:

df.withColumn("C", dataMap(concat_ws("_", $"A", $"B"))).show
+---+---+---+---+---+
|  A|  B|A_1|B_2|  C|
+---+---+---+---+---+
|  A|  1|0.1|0.3|0.1|
|  B|  2|0.2|0.4|0.4|
+---+---+---+---+---+

You can also try mapping, but I suspect it won't perform well with very wide data:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val outputEncoder = RowEncoder(df.schema.add(StructField("C", DoubleType)))

df.map(row => {
   val a = row.getAs[String]("A")
   val b = row.getAs[String]("B")
   val key = s"${a}_${b}"
   Row.fromSeq(row.toSeq :+ row.getAs[Double](key))
})(outputEncoder).show
+---+---+---+---+---+
|  A|  B|A_1|B_2|  C|
+---+---+---+---+---+
|  A|  1|0.1|0.3|0.1|
|  B|  2|0.2|0.4|0.4|
+---+---+---+---+---+

and in general I wouldn't recommend this approach.

If the data comes from CSV, you might consider skipping the default CSV reader and using custom logic to push the column selection directly into the parsing process. In pseudocode:

spark.read.text(...).map { line => {
  val a = ???  // parse A
  val b = ???  // parse B
  val c = ???  // find c, based on a and b
  (a, b, c)
}}
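Purely as a rough illustration of that idea, assuming a hypothetical headerless comma-separated file whose column order A,B,A_1,B_2 is known up front:

import spark.implicits._

// hypothetical layout; the real parsing logic depends on the actual file format
val names = Seq("A", "B", "A_1", "B_2")

val result = spark.read.textFile("data.csv").map { line =>
  val fields = line.split(",")
  val a = fields(0)
  val b = fields(1)
  // select the field in the column named s"${a}_${b}", based on a and b
  val c = fields(names.indexOf(s"${a}_${b}")).toDouble
  (a, b, c)
}.toDF("A", "B", "C")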

Upvotes: 1
