Daniel Zapata

Reputation: 842

Replace null values with other Dataframe in PySpark

I have a dataframe of products (DF), but some rows have no description. I also have an Excel file with descriptions for some of the products (loaded as Map). Using PySpark, I would like to fill the missing values in DF with those from Map, leaving rows that already have a description untouched.

DF
Id | Desc
01 | 'desc1'
02 | null
03 | 'desc3'
04 | null

Map
Key | Value
2   | 'desc2'
4   | 'desc4'

Output
Id | Desc
1  | 'desc1'
2  | 'desc2'
3  | 'desc3'
4  | 'desc4'

Thanks in advance

Upvotes: 0

Views: 1963

Answers (4)

Amita

Reputation: 974

We can divide DF into two dataframes, operate on them separately, and then union them:

val df = Seq(
    (1, "desc1"),
    (2, null),
    (3, "desc3"),
    (4, null)
).toDF("Id", "Desc")

val Map = Seq(
    (2, "desc2"),
    (4, "desc4")
).toDF("Key", "Value")

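// Split DF into rows that need a description and rows that already have one
val nullDF = df.where(df("Desc").isNull)
val nonNullDF = df.where(df("Desc").isNotNull)

// Look up the missing descriptions in Map (inner join: Ids absent from Map are dropped)
val joinedWithKeyDF = nullDF.drop("Desc")
    .join(Map, nullDF("Id") === Map("Key"))
    .withColumnRenamed("Value", "Desc")
    .drop("Key")

// Both halves now have the columns (Id, Desc), so they can be unioned by position
val outputDF = joinedWithKeyDF.union(nonNullDF)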

Upvotes: 0

shadow_dev

Reputation: 130

It's hard to tell the cardinality of the datasets you've provided. Some ways it could change the solution:

  1. If "DF" and "Map" both have a description for the same ID, which table should take priority?
  2. Does the final dataframe need to include the full list of IDs or descriptions? Does either of these dataframes have the full list? This could also change the solution.

I've made some assumptions so that you can determine for yourself what is the right approach here:

  • I'm assuming that "DF" contains the whole list of IDs
  • I'm assuming that "Map" only has a subset of IDs and is not wholly inclusive of the broader set of IDs that exist within "DF"

I'm using PySpark here:

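DF = DF.na.drop(subset=["Desc"])  # keep only the rows that already have a description
Map = Map.withColumnRenamed("Key", "Id")  # the join key must have the same name in both dataframes
DF_Output = DF.join(Map, on="Id", how="outer").selectExpr("Id", "coalesce(Desc, Value) AS Desc")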

Upvotes: 0

LizardKing

Reputation: 666

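In PySpark, with the help of a UDF:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# sc is the active SparkContext (predefined in the pyspark shell)
schema = StructType([StructField("Index", IntegerType(), True),
                    StructField("Desc", StringType(), True)])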

DF = sc.parallelize([(1, "desc1"), (2,None), (3,"desc3"), (4, None)]).toDF(schema)

myMap = {
      2: "desc2",
      4 : "desc4"
    }

myMapBroadcasted = sc.broadcast(myMap)

@udf(StringType())
def fillNone(Index, Desc):
  if Desc is None:
    if Index in myMapBroadcasted.value:
      return myMapBroadcasted.value[Index]
  return Desc

DF.withColumn('Desc', fillNone(col('Index'), col('Desc'))).show()
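
Because the UDF body is ordinary Python, its row-level logic can be checked without a Spark session. A minimal sketch: `fill_none` and its explicit `mapping` parameter are hypothetical stand-ins for `fillNone` and the broadcast dictionary, and the extra row with Index 5 is added only to show the no-match case.

```python
def fill_none(index, desc, mapping):
    """Return desc unless it is None, in which case look index up in mapping."""
    if desc is None:
        # dict.get returns None when the index has no entry,
        # which matches the UDF falling through to `return Desc`
        return mapping.get(index)
    return desc


my_map = {2: "desc2", 4: "desc4"}
rows = [(1, "desc1"), (2, None), (3, "desc3"), (4, None), (5, None)]

filled = [(i, fill_none(i, d, my_map)) for i, d in rows]
print(filled)
```

Rows that already have a description pass through unchanged, and an Index with no entry in the dictionary stays None.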

Upvotes: 0

Travis Hegner

Reputation: 2495

You'll want to make sure the DF.Id and Map.Key fields have the same type and values (currently they don't appear to, given the leading 0 in DF.Id), then do a left join and select the desired columns with coalesce(). My PySpark is a bit rusty, so I'll provide the solution in Scala; the logic should be the same.

val df = Seq(
    (1, "desc1"),
    (2, null),
    (3, "desc3"),
    (4, null)
).toDF("Id", "Desc")

val map = Seq(
    (2, "desc2"),
    (4, "desc4")
).toDF("Key", "Value")

df.show()
map.show()

df.join(map, df("Id") === map("Key"), "left")
  .select(
      df("Id"),
      coalesce(df("Desc"), $"Value").as("Desc")
      )
  .show()

Yields:

+---+-----+
| Id| Desc|
+---+-----+
|  1|desc1|
|  2| null|
|  3|desc3|
|  4| null|
+---+-----+

+---+-----+
|Key|Value|
+---+-----+
|  2|desc2|
|  4|desc4|
+---+-----+

+---+-----+
| Id| Desc|
+---+-----+
|  1|desc1|
|  2|desc2|
|  3|desc3|
|  4|desc4|
+---+-----+

Upvotes: 1
