RSG

Reputation: 77

Spark dataframe replace values of specific columns in a row with Nulls

I am having trouble replacing the values of specific columns of a Spark dataframe with nulls. I have a dataframe with more than fifty columns, two of which are key columns. I want to create a new dataframe with the same schema that keeps the values of the key columns and has null values in all non-key columns. I tried the following, but ran into issues:

//old_df is the existing Dataframe 
val key_cols = List("id", "key_number")
val non_key_cols = old_df.columns.toList.filterNot(key_cols.contains(_))

val key_col_df = old_df.select(key_cols.head, key_cols.tail:_*)
val non_key_cols_df = old_df.select(non_key_cols.head, non_key_cols.tail:_*)
val list_cols = List.fill(non_key_cols_df.columns.size)("NULL")
val rdd_list_cols = spark.sparkContext.parallelize(Seq(list_cols)).map(l => Row(l:_*))
val list_df = spark.createDataFrame(rdd_list_cols, non_key_cols_df.schema)

val new_df = key_col_df.crossJoin(list_df)

This approach worked when old_df had only string-type columns. But some of my columns are of double and int type, and those throw an error because the RDD row is a list of "NULL" strings.

To avoid this, I tried making list_df an empty dataframe with the schema of non_key_cols_df, but the result of the crossJoin is an empty dataframe, which I believe is because one of the dataframes is empty.

What I need is non_key_cols as a single-row dataframe of nulls, so that I can crossJoin it with key_col_df to form the required new_df.

Any other, easier way to set all columns except the key columns of a dataframe to null would also resolve my issue. Thanks in advance.

Upvotes: 1

Views: 4297

Answers (2)

pasha701

Reputation: 7207

Shaido's answer has a small drawback: the original column types are lost. This can be fixed by using the schema, like this:

import org.apache.spark.sql.functions.lit
val keyCols = List("id", "key_number")
val nonKeyCols = df.schema.fields.filterNot(f => keyCols.contains(f.name))
val df2 = nonKeyCols.foldLeft(df)((acc, c) => acc.withColumn(c.name, lit(null).cast(c.dataType)))
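
As a possible variation, the same result can be built with a single select instead of one withColumn call per column, which may be worth considering with fifty-odd columns. This is only a sketch: the local SparkSession, the sample data, and the names projection and nulled are assumptions made for illustration, not part of the original question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

// Assumption: a local session just for this sketch.
val spark = SparkSession.builder().master("local[1]").appName("null-non-key-cols").getOrCreate()
import spark.implicits._

// Hypothetical sample data with two key columns and two non-key columns.
val df = Seq((1, 2, 3, 4.0), (5, 6, 7, 8.0), (9, 10, 11, 12.0))
  .toDF("id", "key_number", "c", "d")
val keyCols = Set("id", "key_number")

// One projection for the whole row: key columns pass through unchanged,
// every other column becomes a null literal cast to its original type
// and aliased back to its original name.
val projection = df.schema.fields.map { f =>
  if (keyCols.contains(f.name)) col(f.name)
  else lit(null).cast(f.dataType).as(f.name)
}
val nulled = df.select(projection: _*)
```

Column order and data types follow the original schema; the replaced columns are marked nullable even where the source columns were not.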

Upvotes: 0

Shaido

Reputation: 28422

crossJoin is an expensive operation, so you want to avoid it if possible. An easier solution is to iterate over all non-key columns and set each one to null with lit(null). Using foldLeft, this can be done as follows:

import org.apache.spark.sql.functions.lit

val keyCols = List("id", "key_number")
val nonKeyCols = df.columns.filterNot(keyCols.contains(_))

val df2 = nonKeyCols.foldLeft(df)((df, c) => df.withColumn(c, lit(null)))

Input example:

+---+----------+---+----+
| id|key_number|  c|   d|
+---+----------+---+----+
|  1|         2|  3| 4.0|
|  5|         6|  7| 8.0|
|  9|        10| 11|12.0|
+---+----------+---+----+

will give:

+---+----------+----+----+
| id|key_number|   c|   d|
+---+----------+----+----+
|  1|         2|null|null|
|  5|         6|null|null|
|  9|        10|null|null|
+---+----------+----+----+
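
The printed values look the same either way, so it can help to inspect the schema: a bare lit(null) column has NullType, while adding a cast keeps the original type. The snippet below is a minimal sketch of that check; the local SparkSession, the sample data, and the names untyped and typed are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{DoubleType, NullType}

// Assumption: a local session just for this sketch.
val spark = SparkSession.builder().master("local[1]").appName("null-type-check").getOrCreate()
import spark.implicits._

// Hypothetical sample data shaped like the input example.
val df = Seq((1, 2, 3, 4.0), (5, 6, 7, 8.0)).toDF("id", "key_number", "c", "d")

// Bare null literal: the column's data type becomes NullType.
val untyped = df.withColumn("d", lit(null))

// Null literal cast to the column's original type (DoubleType here).
val typed = df.withColumn("d", lit(null).cast(df.schema("d").dataType))

untyped.printSchema()
typed.printSchema()
```

If the new dataframe must have the same schema as the old one, for example to union the two or write to an existing table, the cast version is the safer choice.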

Upvotes: 2
