Reputation: 475
I have the following three DataFrames:
val df_1 = spark.sparkContext.parallelize(Seq(
("FIDs", "123456")
)).toDF("subComponentName", "FID_HardVer")
val df_2 = spark.sparkContext.parallelize(Seq(
("CLDs", "123456")
)).toDF("subComponentName", "CLD_HardVer")
val df_3 = spark.sparkContext.parallelize(Seq(
("ANYs", "123456")
)).toDF("subComponentName", "ANY_HardVer")
I want to write a function that returns a DataFrame with an added column named HardVer, containing the content of FID_HardVer, CLD_HardVer, or ANY_HardVer, whichever exists.
Example output would look like this:
df_1
+----------------+-----------+-------+
|subComponentName|FID_HardVer|HardVer|
+----------------+-----------+-------+
| FIDs| 123456| 123456|
+----------------+-----------+-------+
df_2:
+----------------+-----------+-------+
|subComponentName|CLD_HardVer|HardVer|
+----------------+-----------+-------+
| CLDs| 123456| 123456|
+----------------+-----------+-------+
This is the code I have tried so far, but it seems Spark can't handle this kind of request, since it validates every referenced column even when the condition can never match.
def addHardVer(spark: SparkSession, df: DataFrame) : DataFrame = {
import spark.implicits._
val df_withHardVer = df
.withColumn("HardVer",
when($"subComponentName" === "FIDs", $"FID_HardVer")
.when($"subComponentName" === "CLDs", $"CLD_HardVer")
.when($"subComponentName" === "ANYs", $"ANY_HardVer")
.otherwise(lit("unknown"))
)
df_withHardVer
}
This throws an exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`CLD_HardVer`' given input columns: [subComponentName, FID_HardVer];;
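The error appears before any data is processed: Spark analyzes the whole `when` chain eagerly against the input schema, so every referenced column must resolve even in branches the condition rules out for every row. A minimal sketch of that behavior (assuming a local SparkSession):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.when
import scala.util.Try

val spark = SparkSession.builder.master("local[1]").appName("demo").getOrCreate()
import spark.implicits._

val df = Seq(("FIDs", "123456")).toDF("subComponentName", "FID_HardVer")

// withColumn triggers analysis immediately; referencing the missing
// CLD_HardVer column fails even though no row has subComponentName == "CLDs".
val failsAtAnalysis = Try(
  df.withColumn("HardVer", when($"subComponentName" === "CLDs", $"CLD_HardVer"))
).isFailure
```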
Upvotes: 1
Views: 32
Reputation: 475
I figured out something that gives the wanted output, but it does not seem like a very nice solution. Any comments on this would be helpful.
def addHardVer2(spark: SparkSession, df: DataFrame): DataFrame = {
  import spark.implicits._
  if (hasColumn(df, "FID_HardVer")) {
    df.withColumn("HardVer", $"FID_HardVer")
  } else if (hasColumn(df, "CLD_HardVer")) {
    df.withColumn("HardVer", $"CLD_HardVer")
  } else if (hasColumn(df, "ANY_HardVer")) {
    df.withColumn("HardVer", $"ANY_HardVer")
  } else {
    df.withColumn("HardVer", lit("unknown"))
  }
}

import scala.util.Try
def hasColumn(df: DataFrame, path: String): Boolean = Try(df(path)).isSuccess
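A more compact variant of the same idea (a sketch; `addHardVer3` is a hypothetical name, not from the original code) is to scan a list of candidate column names and take the first one the DataFrame actually has:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// Hypothetical helper: picks the first *_HardVer column present in df,
// falling back to the literal "unknown" when none exists.
def addHardVer3(df: DataFrame): DataFrame = {
  val candidates = Seq("FID_HardVer", "CLD_HardVer", "ANY_HardVer")
  val chosen = candidates.collectFirst {
    case c if df.columns.contains(c) => df(c)
  }
  df.withColumn("HardVer", chosen.getOrElse(lit("unknown")))
}
```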
Upvotes: 0
Reputation: 42352
How about adding a check for whether the column exists in the DataFrame?
def addHardVer(spark: SparkSession, df: DataFrame): DataFrame = {
  import spark.implicits._
  // The if/else runs in plain Scala while the expression is being built,
  // so a missing column is replaced by lit("unknown") before Spark
  // ever tries to resolve it.
  val df_withHardVer = df
    .withColumn("HardVer",
      when($"subComponentName" === "FIDs", if (df.columns.contains("FID_HardVer")) $"FID_HardVer" else lit("unknown"))
      .when($"subComponentName" === "CLDs", if (df.columns.contains("CLD_HardVer")) $"CLD_HardVer" else lit("unknown"))
      .when($"subComponentName" === "ANYs", if (df.columns.contains("ANY_HardVer")) $"ANY_HardVer" else lit("unknown"))
      .otherwise(lit("unknown"))
    )
  df_withHardVer
}
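To illustrate, here is a self-contained sketch that applies this approach to frames shaped like the question's df_1 and df_2 (assuming a local SparkSession):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{lit, when}

val spark = SparkSession.builder.master("local[1]").appName("demo").getOrCreate()
import spark.implicits._

def addHardVer(spark: SparkSession, df: DataFrame): DataFrame =
  df.withColumn("HardVer",
    when($"subComponentName" === "FIDs",
      if (df.columns.contains("FID_HardVer")) $"FID_HardVer" else lit("unknown"))
    .when($"subComponentName" === "CLDs",
      if (df.columns.contains("CLD_HardVer")) $"CLD_HardVer" else lit("unknown"))
    .when($"subComponentName" === "ANYs",
      if (df.columns.contains("ANY_HardVer")) $"ANY_HardVer" else lit("unknown"))
    .otherwise(lit("unknown")))

val df_1 = Seq(("FIDs", "123456")).toDF("subComponentName", "FID_HardVer")
val df_2 = Seq(("CLDs", "123456")).toDF("subComponentName", "CLD_HardVer")

// Each frame resolves cleanly: the columns missing from each schema were
// swapped for lit("unknown") in Scala before Spark analyzed the expression.
addHardVer(spark, df_1).show()
addHardVer(spark, df_2).show()
```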
Upvotes: 1