user2811630

Reputation: 475

How can I select a column depending on another column's content or on the column's name?

I have the following three DataFrames:

val df_1 = spark.sparkContext.parallelize(Seq(
  ("FIDs", "123456")
)).toDF("subComponentName", "FID_HardVer")

val df_2 = spark.sparkContext.parallelize(Seq(
  ("CLDs", "123456")
)).toDF("subComponentName", "CLD_HardVer")

val df_3 = spark.sparkContext.parallelize(Seq(
  ("ANYs", "123456")
)).toDF("subComponentName", "ANY_HardVer")

I want to write a function that returns a DataFrame with an added column named HardVer, containing the content of FID_HardVer, CLD_HardVer, or ANY_HardVer, whichever exists.

Example output would look like this:

df_1

+----------------+-----------+-------+
|subComponentName|FID_HardVer|HardVer|
+----------------+-----------+-------+
|            FIDs|     123456| 123456|
+----------------+-----------+-------+

df_2:

+----------------+-----------+-------+
|subComponentName|CLD_HardVer|HardVer|
+----------------+-----------+-------+
|            CLDs|     123456| 123456|
+----------------+-----------+-------+

This is the code that I have tried up until now, but it seems Spark can't handle this kind of request: the analyzer resolves every column referenced in the expression, even when the surrounding condition can never match.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{lit, when}

def addHardVer(spark: SparkSession, df: DataFrame): DataFrame = {
  import spark.implicits._
  df.withColumn("HardVer",
    when($"subComponentName" === "FIDs", $"FID_HardVer")
      .when($"subComponentName" === "CLDs", $"CLD_HardVer")
      .when($"subComponentName" === "ANYs", $"ANY_HardVer")
      .otherwise(lit("unknown")))
}

This throws an exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'CLD_HardVer' given input columns: [subComponentName, FID_HardVer];;
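The failure can be modeled outside Spark with plain Scala: referencing every candidate eagerly fails as soon as one is missing, while checking for existence first succeeds. This is only a hypothetical sketch, with a Map standing in for a DataFrame's columns (not real Spark API):

```scala
import scala.util.Try

// A Map stands in for one row of a DataFrame and its column names.
val row = Map("subComponentName" -> "CLDs", "CLD_HardVer" -> "123456")

// Eager resolution: row("FID_HardVer") throws immediately, just as
// Spark's analyzer rejects the unresolvable column reference inside
// `when`, regardless of whether that branch could ever be taken.
val eager = Try(Seq(row("FID_HardVer"), row("CLD_HardVer")))
// eager.isFailure == true

// Guarded resolution: check existence before referencing, mirroring
// a driver-side df.columns.contains(...) check.
val guarded = Seq("FID_HardVer", "CLD_HardVer", "ANY_HardVer")
  .collectFirst { case c if row.contains(c) => row(c) }
  .getOrElse("unknown")
// guarded == "123456"
```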

Upvotes: 1

Views: 32

Answers (2)

user2811630
user2811630

Reputation: 475

I figured out something that gives me the wanted output, but it does not seem like a very clean solution. Any comments on this would be helpful.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import scala.util.Try

def addHardVer2(spark: SparkSession, df: DataFrame): DataFrame = {
  import spark.implicits._
  if (hasColumn(df, "FID_HardVer")) {
    df.withColumn("HardVer", $"FID_HardVer")
  } else if (hasColumn(df, "CLD_HardVer")) {
    df.withColumn("HardVer", $"CLD_HardVer")
  } else if (hasColumn(df, "ANY_HardVer")) {
    df.withColumn("HardVer", $"ANY_HardVer")
  } else {
    df.withColumn("HardVer", lit("unknown"))
  }
}

def hasColumn(df: DataFrame, path: String): Boolean = Try(df(path)).isSuccess
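For what it's worth, the chain of if/else if branches can be collapsed by picking the first candidate column that exists. A minimal sketch of that lookup on plain collections (HardVerLookup and pickHardVerColumn are hypothetical names, not Spark API):

```scala
object HardVerLookup {
  private val candidates = Seq("FID_HardVer", "CLD_HardVer", "ANY_HardVer")

  // Return the first candidate column the frame actually has, or None
  // so the caller can fall back to lit("unknown").
  def pickHardVerColumn(columns: Seq[String]): Option[String] =
    candidates.collectFirst { case c if columns.contains(c) => c }
}
```

In the real function this would drive a single withColumn call, e.g. `pickHardVerColumn(df.columns).fold(df.withColumn("HardVer", lit("unknown")))(c => df.withColumn("HardVer", col(c)))`.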

Upvotes: 0

mck

Reputation: 42352

How about adding a check of whether the column exists in the DataFrame? The check runs on the driver, so the expression passed to `when` never references a column that is missing:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{lit, when}

def addHardVer(spark: SparkSession, df: DataFrame): DataFrame = {
  import spark.implicits._
  df.withColumn("HardVer",
    when($"subComponentName" === "FIDs",
      if (df.columns.contains("FID_HardVer")) $"FID_HardVer" else lit("unknown"))
    .when($"subComponentName" === "CLDs",
      if (df.columns.contains("CLD_HardVer")) $"CLD_HardVer" else lit("unknown"))
    .when($"subComponentName" === "ANYs",
      if (df.columns.contains("ANY_HardVer")) $"ANY_HardVer" else lit("unknown"))
    .otherwise(lit("unknown")))
}

Upvotes: 1
