NewGuru

Reputation: 23

Select a literal based on a column value in Spark

I have a map:

val map = Map("A" -> 1, "B" -> 2)

And I have a DataFrame. One of its columns contains the keys of the map. I am trying to select, into a new DataFrame, a column that holds the map values looked up by that key:

val newDF = DfThatContainsTheKeyColumn.select(concat(col(SomeColumn), lit("|"),
    lit(map.get(col(ColumnWithKey).toString()).get) as newColumn)

But this is resulting in the following error:

java.lang.RuntimeException: Unsupported literal type class scala.None$ None

I made sure that the column ColumnWithKey contains only As and Bs and has no empty values in it.

Is there another way to get the result I am looking for? Any help would be appreciated.

Upvotes: 1

Views: 2531

Answers (4)

Helder Pereira

Reputation: 5756

I think a simpler option could be to use typedLit:

import org.apache.spark.sql.functions.{col, concat, lit, typedLit}

val map = typedLit(Map("A" -> 1, "B" -> 2))

val newDF = DfThatContainsTheKeyColumn.select(concat(col(SomeColumn), lit("|"),
    map(col(ColumnWithKey))) as newColumn)
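
For reference, a self-contained sketch of this approach; the sample data and column names below are my own placeholders, not taken from the question:

import org.apache.spark.sql.functions.{col, concat, lit, typedLit}
import spark.implicits._

val mapCol = typedLit(Map("A" -> 1, "B" -> 2))
val df = Seq("A", "B").toDF("myCol")

// mapCol(col("myCol")) extracts the map value for each row's key
df.select(concat(col("myCol"), lit("|"), mapCol(col("myCol"))) as "newColumn").show()
// +---------+
// |newColumn|
// +---------+
// |      A|1|
// |      B|2|
// +---------+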

Upvotes: 0

Raphael Roth

Reputation: 27373

The problem with this statement (besides the syntax issues)

val newDF = DfThatContainsTheKeyColumn.select(concat(col(SomeColumn), lit("|"),
    lit(map.get(col(ColumnWithKey).toString()).get) as newColumn)

is that col(ColumnWithKey) does not take the value of a specific row; it is only a column reference derived from the schema, i.e. it has a constant value, so the Map lookup happens once on the driver and never sees the per-row keys.
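
A minimal sketch of what that means (the column name is just a placeholder): the lookup key is the column's string representation, never one of the per-row values "A" or "B", so the driver-side Map lookup cannot succeed:

import org.apache.spark.sql.functions.col

val map = Map("A" -> 1, "B" -> 2)
// toString() on a column reference gives roughly the column's name, not a row value
map.get(col("ColumnWithKey").toString())   // None, regardless of the data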

In your case I would suggest joining your map to your DataFrame:

import org.apache.spark.sql.functions.{broadcast, concat, lit}
import spark.implicits._

val map = Map("A" -> 1, "B" -> 2)
val df_map = map.toSeq.toDF("key", "value")

val DfThatContainsTheKeyColumn = Seq(
  "A",
  "A",
  "B",
  "B"
).toDF("myCol")

DfThatContainsTheKeyColumn
  .join(broadcast(df_map), $"myCol" === $"key")
  .select(concat($"myCol", lit("|"), $"value").as("newColumn"))
  .show()

gives

+---------+
|newColumn|
+---------+
|      A|1|
|      A|1|
|      B|2|
|      B|2|
+---------+

Upvotes: 2

Salim

Reputation: 2178

You can look up a map using a key taken from a column as follows:

import spark.implicits._
import org.apache.spark.sql.functions.lit

val map = Map("A" -> 1, "B" -> 2)
val df = spark.createDataset(Seq("dummy"))
  .withColumn("key", lit("A"))

// look up the driver-side map inside a typed map over the rows
df.map { row =>
  val k = row.getAs[String]("key")
  val v = map.getOrElse(k, 0)
  (k, v)
}.toDF("key", "value").show(false)

Result -

+---+-----+
|key|value|
+---+-----+
|A  |1    |
+---+-----+

You can look up a map stored inside a column with a literal key using Column.getItem; see the example below.

import spark.implicits._
import org.apache.spark.sql.functions.{lit, map_from_arrays}

val mapKeys = Array("A", "B")
val mapValues = Array(1, 2)

// A DataFrame with a map column is created,
// then the map is looked up using a hard-coded String key.
val df = spark.createDataset(Seq("dummy"))
  .withColumn("key", lit("A"))
  .withColumn("keys", lit(mapKeys))
  .withColumn("values", lit(mapValues))
  .withColumn("map", map_from_arrays($"keys", $"values"))
  .withColumn("lookUpTheMap", $"map".getItem("A"))

df.show(false)

Result

+-----+---+------+------+----------------+------------+
|value|key|keys  |values|map             |lookUpTheMap|
+-----+---+------+------+----------------+------------+
|dummy|A  |[A, B]|[1, 2]|[A -> 1, B -> 2]|1           |
+-----+---+------+------+----------------+------------+

To look up a map present inside a column based on another column containing the key, you can use a UDF or use the map function on the DataFrame the way I am showing below; a sketch of the UDF option follows the result.

// A map column is looked up using a key from another column.
df.map { row =>
  val m = row.getAs[Map[String, Int]]("map")
  val k = row.getAs[String]("key")
  val v = m.getOrElse(k, 0)
  (m, k, v)
}.toDF("map", "key", "value").show(false)

Result

+----------------+---+-----+
|map             |key|value|
+----------------+---+-----+
|[A -> 1, B -> 2]|A  |1    |
+----------------+---+-----+
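
For completeness, here is a minimal sketch of the UDF option mentioned above; it is my own illustration (not part of the original answer) and reuses the df from the previous example:

import org.apache.spark.sql.functions.udf

// UDF that looks up a key in a map column, falling back to 0 for missing keys
val lookUpUdf = udf((m: Map[String, Int], k: String) => m.getOrElse(k, 0))

df.withColumn("value", lookUpUdf($"map", $"key"))
  .select("map", "key", "value")
  .show(false)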

Upvotes: 0

Cesar A. Mostacero

Reputation: 770

You can use case classes to make it easy. This is an example:

Given this input

val givenMap = Map("A" -> 1, "B" -> 2)

import spark.implicits._
val df = Seq(
  (1, "A"),
  (2, "A"),
  (3, "B"),
  (4, "B")
).toDF("col_a", "col_b")
df.show()

The above code produces:

+-----+-----+
|col_a|col_b|
+-----+-----+
|    1|    A|
|    2|    A|
|    3|    B|
|    4|    B|
+-----+-----+

givenMap: scala.collection.immutable.Map[String,Int] = Map(A -> 1, B -> 2)
import spark.implicits._
df: org.apache.spark.sql.DataFrame = [col_a: int, col_b: string]

The code that you need will look like:

case class MyInput(col_a: Int, col_b: String)
case class MyOutput(col_a: Int, col_b: String, new_column: Int)

df.as[MyInput].map(row => MyOutput(row.col_a, row.col_b, givenMap(row.col_b))).show()

With the case classes you can cast your df and use object notation to access your column values within a .map. The above code will output:

+-----+-----+----------+
|col_a|col_b|new_column|
+-----+-----+----------+
|    1|    A|         1|
|    2|    A|         1|
|    3|    B|         2|
|    4|    B|         2|
+-----+-----+----------+

defined class MyInput
defined class MyOutput
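
One caveat (my note, not part of the original answer): givenMap(row.col_b) throws a NoSuchElementException when a key is missing from the map; getOrElse with a default value is a safer variant:

df.as[MyInput]
  .map(row => MyOutput(row.col_a, row.col_b, givenMap.getOrElse(row.col_b, 0)))
  .show()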

Upvotes: 0
