Zach B

Reputation: 97

Scala/Apache Spark Converting DataFrame column values and type, multiple when otherwise

I have a primary SQL table that I am reading into Spark and modifying before writing it to Cassandra. I currently have a working implementation that converts a gender code from an integer (0, 1, 2, 3) to a string ("Male", "Female", "Trans", etc.). Although the method below works, it seems very inefficient to build a separate Array of those mappings, turn it into a DataFrame, join it to the main table/DataFrame, and then drop and rename columns.

I have seen:

.withColumn("gender", when(col("gender") === 1, "male").otherwise("female"))

which would let me keep method chaining on the primary table, but I have not been able to get it working with more than two options. Is there a way to do this? About 10 columns on this table each need their own custom conversion. Since this code will be processing TBs of data, is there a less repetitive and more efficient way to accomplish this? Thanks in advance for any help!

import org.apache.spark.sql.{DataFrame, SparkSession}

case class Gender(tmpid: Int, tmpgender: String)

private def createGenderDf(spark: SparkSession): DataFrame = {
  import spark.implicits._
  Seq(
    Gender(1, "Male"),
    Gender(2, "Female"),
    Gender(777, "Prefer not to answer")
  ).toDF
}


private def createPersonsDf(spark: SparkSession): DataFrame = {
  val genderDf = createGenderDf(spark)
  genderDf.show()

  val personsDf: DataFrame = spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .load(dataPath + "people.csv")
    .withColumnRenamed("ID", "id")
    .withColumnRenamed("name_first", "firstname")

  val personsDf1: DataFrame = personsDf
    .join(genderDf, personsDf("gender") === genderDf("tmpid"), "leftouter")

  val personsDf2: DataFrame = personsDf1
    .drop("gender")
    .drop("tmpid")
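    .withColumnRenamed("tmpgender", "gender")

  // Return the converted DataFrame; without this the method body ends in a val and does not compile.
  personsDf2
}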

Upvotes: 2

Views: 1727

Answers (1)

Ramesh Maharjan

Reputation: 41987

You can use nested when functions, which eliminates the need to create genderDf and then join, drop, and rename. For your example, you can do the following:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
personsDf.withColumn(
  "gender",
  when(col("gender") === 1, "male")
    .otherwise(
      when(col("gender") === 2, "female")
        .otherwise("Prefer not to answer"))
    .cast(StringType))

You can add more when calls to the nested structure above, and repeat the same approach for the other 10 columns.
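To cut the repetition across many columns, the when chain could be built from a lookup table instead of being written out by hand. The following is only a sketch: the names CodeMapping, mapCodes, and convertAll, as well as the "Unknown" default used in the usage note, are hypothetical and not from the question. It relies on Column.when, the chained equivalent of nesting when inside otherwise, so each column becomes a single CASE WHEN expression evaluated row by row, with no join.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit, when}

// Hypothetical helper object; the names here are illustrative only.
object CodeMapping {

  // Builds CASE WHEN c = code1 THEN label1 WHEN c = code2 THEN label2 ... ELSE default END.
  def mapCodes(c: Column, mapping: Seq[(Int, String)], default: String): Column =
    if (mapping.isEmpty) {
      // No codes to map: every row gets the default label.
      lit(default)
    } else {
      val (firstCode, firstLabel) = mapping.head
      mapping.tail
        .foldLeft(when(c === firstCode, firstLabel)) {
          case (acc, (code, label)) => acc.when(c === code, label)
        }
        .otherwise(default)
    }

  // Replaces each listed column with its string label, one withColumn per column.
  def convertAll(
      df: DataFrame,
      mappings: Map[String, Seq[(Int, String)]],
      default: String): DataFrame =
    mappings.foldLeft(df) {
      case (acc, (name, mapping)) =>
        acc.withColumn(name, mapCodes(col(name), mapping, default))
    }
}
```

With the gender codes from the question, a call might look like CodeMapping.convertAll(personsDf, Map("gender" -> Seq(1 -> "Male", 2 -> "Female", 777 -> "Prefer not to answer")), "Unknown"). Unlike the left outer join, which leaves null for unmapped codes, this fills unmapped codes with the default.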

Upvotes: 2
