Reputation: 842
I have a dataframe of products (DF), but some rows are missing a description. I also have an Excel file with descriptions for some of the products (loaded as Map). I would like to fill the missing values in DF with the values from Map, while leaving the rows that already have a description untouched, using PySpark.
DF
Id | Desc
01 | 'desc1'
02 | null
03 | 'desc3'
04 | null
Map
Key | Value
2 | 'desc2'
4 | 'desc4'
Output
Id | Desc
1 | 'desc1'
2 | 'desc2'
3 | 'desc3'
4 | 'desc4'
Thanks in advance
Upvotes: 0
Views: 1963
Reputation: 974
We can divide DF into two dataframes, operate on them separately, and then union them:
val df = Seq(
  (1, "desc1"),
  (2, null),
  (3, "desc3"),
  (4, null)
).toDF("Id", "Desc")

// Avoid naming this `Map`, which shadows scala.collection.Map
val mapDF = Seq(
  (2, "desc2"),
  (4, "desc4")
).toDF("Key", "Value")

val nullDF = df.where(df("Desc").isNull)
val nonNullDF = df.where(df("Desc").isNotNull)

// Left join keeps rows whose Id has no match in mapDF (their Desc stays null)
val joinedWithKeyDF = nullDF
  .drop("Desc")
  .join(mapDF, nullDF("Id") === mapDF("Key"), "left")
  .withColumnRenamed("Value", "Desc")
  .drop("Key")

val outputDF = joinedWithKeyDF.union(nonNullDF)
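Since the question asks for PySpark, the same split-and-union approach translates roughly as follows (a sketch, assuming dataframes named df and map_df with the same columns as above):

null_df = df.where(df["Desc"].isNull())
non_null_df = df.where(df["Desc"].isNotNull())

# Fill the missing descriptions from the lookup table, then stitch the
# untouched rows back on
filled_df = (null_df.drop("Desc")
             .join(map_df, null_df["Id"] == map_df["Key"], "left")
             .withColumnRenamed("Value", "Desc")
             .drop("Key"))
output_df = filled_df.union(non_null_df)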
Upvotes: 0
Reputation: 130
It's hard to know the cardinality of the datasets you've provided, and that can change what the right solution is (for example, whether the lookup table is small enough to broadcast). I've made some assumptions so that you can determine for yourself what the right approach is here. I'm using PySpark:

from pyspark.sql.functions import coalesce

DF = DF.na.drop()  # drop the rows with a missing Desc from the parent dataframe
Map = Map.toDF("Id", "Desc_map")  # align the key column name so the join lines up
DF_Output = DF.join(Map, on="Id", how="outer").select("Id", coalesce("Desc", "Desc_map").alias("Desc"))
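For instance, if the lookup table is small enough to fit on each executor, a broadcast hint changes the picture. A minimal sketch, assuming the original DF (nulls still in place) and Map renamed to columns Id / Desc_map as above:

from pyspark.sql.functions import broadcast, coalesce

# If Map fits in executor memory, broadcasting keeps the join shuffle-free;
# the left join preserves every row of DF and coalesce fills only the gaps
DF_Output = (DF.join(broadcast(Map), on="Id", how="left")
               .select("Id", coalesce("Desc", "Desc_map").alias("Desc")))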
Upvotes: 0
Reputation: 666
In PySpark, with the help of a UDF:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([StructField("Index", IntegerType(), True),
                     StructField("Desc", StringType(), True)])

DF = sc.parallelize([(1, "desc1"), (2, None), (3, "desc3"), (4, None)]).toDF(schema)

myMap = {
    2: "desc2",
    4: "desc4"
}

# Broadcast the dict once so every executor gets a read-only copy
myMapBroadcasted = sc.broadcast(myMap)

@udf(StringType())
def fillNone(Index, Desc):
    # Only consult the map when Desc is missing; known descriptions pass through
    if Desc is None:
        if Index in myMapBroadcasted.value:
            return myMapBroadcasted.value[Index]
    return Desc

DF.withColumn('Desc', fillNone(col('Index'), col('Desc'))).show()
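A native map-literal expression usually outperforms a Python UDF, since it avoids serializing rows out to the Python worker. A hedged alternative sketch reusing the same DF and myMap (the mapCol name is mine):

from itertools import chain
from pyspark.sql.functions import coalesce, col, create_map, lit

# Turn the Python dict into a literal map column; a lookup on a missing key
# yields null, so coalesce leaves unmatched rows untouched
mapCol = create_map(*chain.from_iterable((lit(k), lit(v)) for k, v in myMap.items()))
DF.withColumn('Desc', coalesce(col('Desc'), mapCol[col('Index')])).show()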
Upvotes: 0
Reputation: 2495
You'll want to make sure the DF.Id field and the Map.Key field have the same type and values (currently they don't look like it, given the leading 0 in Id), then do a left join, and then select the desired columns with a coalesce(). My PySpark is a bit rusty, so I'll provide the solution in Scala; the logic should be the same.
import org.apache.spark.sql.functions.coalesce

val df = Seq(
  (1, "desc1"),
  (2, null),
  (3, "desc3"),
  (4, null)
).toDF("Id", "Desc")

val map = Seq(
  (2, "desc2"),
  (4, "desc4")
).toDF("Key", "Value")

df.show()
map.show()

// Left join keeps every row of df; coalesce prefers the existing Desc
// and falls back to the mapped Value only where it's null
df.join(map, df("Id") === map("Key"), "left")
  .select(
    df("Id"),
    coalesce(df("Desc"), $"Value").as("Desc")
  )
  .show()
Yields:
+---+-----+
| Id| Desc|
+---+-----+
| 1|desc1|
| 2| null|
| 3|desc3|
| 4| null|
+---+-----+
+---+-----+
|Key|Value|
+---+-----+
| 2|desc2|
| 4|desc4|
+---+-----+
+---+-----+
| Id| Desc|
+---+-----+
| 1|desc1|
| 2|desc2|
| 3|desc3|
| 4|desc4|
+---+-----+
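For completeness, a rough PySpark equivalent of the same left join + coalesce (a sketch, assuming dataframes named df and map_df):

from pyspark.sql.functions import coalesce

# Left join keeps all of df; coalesce keeps an existing Desc and only
# falls back to the mapped Value where Desc is null
(df.join(map_df, df["Id"] == map_df["Key"], "left")
   .select(df["Id"], coalesce(df["Desc"], map_df["Value"]).alias("Desc"))
   .show())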
Upvotes: 1