rania miladi

Reputation: 35

Optimizing "withColumn when otherwise" performance in pyspark

I work on a project with PySpark on Databricks. Part of the code (below) reformats a string based on a French date.

The existing code, besides being verbose, is causing performance and memory issues.

Only CSV files are used in this project (for reading and writing); no database is involved.

I'm trying to handle the formatting task in a better way to avoid the performance and memory issues. Any suggestions?

Thanks a lot !

from pyspark.sql.functions import when

courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Janvier 2020","XXX0120").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Fevrier 2020","XXX0220").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mars 2020","XXX0320").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Avril 2020","XXX0420").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mai 2020","XXX0520").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juin 2020","XXX0620").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juillet 2020","XXX0720").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Aout 2020","XXX0820").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Septembre 2020","XXX0920").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Octobre 2020","XXX1020").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Novembre 2020","XXX1120").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Decembre 2020","XXX1220").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Janvier 2021","XXX0121").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Fevrier 2021","XXX0221").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mars 2021","XXX0321").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Avril 2021","XXX0421").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mai 2021","XXX0521").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juin 2021","XXX0621").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juillet 2021","XXX0721").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Aout 2021","XXX0821").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Septembre 2021","XXX0921").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Octobre 2021","XXX1021").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Novembre 2021","XXX1121").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Decembre 2021","XXX1221").otherwise(courriers["Vague"]))

Upvotes: 2

Views: 3666

Answers (2)

Axel R.

Reputation: 1300

Another solution could be to leverage a map column (MapType):

from pyspark.sql.functions import col, create_map, lit, split, concat
from itertools import chain

df = spark.createDataFrame([["XXX Fevrier 2021"], ["XXX Aout 2021"]],
                           schema="Vague string")

# Map each French month name to its two-digit number
mapping = {
    "Janvier":"01",
    "Fevrier": "02",
    "Mars": "03",
    "Avril": "04",
    "Mai": "05",
    "Juin": "06",
    "Juillet": "07",
    "Aout": "08",
    "Septembre": "09",
    "Octobre": "10",
    "Novembre": "11",
    "Decembre": "12"}

# Build a literal map column from the dict: {"Janvier" -> "01", ...}
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])

res = (
    df.withColumn("value", concat(
        split(col("Vague"), ' ')[0],                        # the "XXX" prefix
        mapping_expr.getItem(split(col("Vague"), ' ')[1]),  # month name -> "02"
        split(col("Vague"), ' ')[2][3:4]))                  # column slice -> substr(3, 4), i.e. "21" from "2021"
)

res.show()

which provides the expected result:

+----------------+-------+
|           Vague|  value|
+----------------+-------+
|XXX Fevrier 2021|XXX0221|
|   XXX Aout 2021|XXX0821|
+----------------+-------+
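If, as in the question, you need to overwrite Vague in place and leave non-matching rows untouched, one option is to wrap the expression in coalesce; a sketch reusing mapping_expr from above:

from pyspark.sql.functions import coalesce

res = df.withColumn("Vague", coalesce(
    concat(split(col("Vague"), ' ')[0],
           mapping_expr.getItem(split(col("Vague"), ' ')[1]),
           split(col("Vague"), ' ')[2][3:4]),
    # concat yields null when the month lookup or token access fails
    # (with default non-ANSI settings), so the original value is kept
    col("Vague")))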

Upvotes: 1

Alex Ott

Reputation: 87154

It's much easier to programmatically generate the full condition instead of applying the replacements one by one. Chaining many withColumn calls is well known for hurting performance: each call adds a projection to the query plan, and analyzing a plan with dozens of stacked projections gets expensive.
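The overhead is visible in the query plan itself; a minimal sketch (with a stand-in df and only three conditions) showing one projection stacked per withColumn call:

import pyspark.sql.functions as F

df = spark.createDataFrame([["XXX Mai 2020"]], schema="Vague string")
for month in ["Janvier", "Fevrier", "Mars"]:
    df = df.withColumn("Vague",
                       F.when(F.col("Vague") == f"XXX {month} 2020", "X")
                        .otherwise(F.col("Vague")))
# the analyzed plan shows one Project node per withColumn call
df.explain(True)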

The simplest way is to define a mapping and generate the condition from it, like this:

dates = {"XXX Janvier 2020":"XXX0120",
         "XXX Fevrier 2020":"XXX0220",
         "XXX Mars 2020":"XXX0320",
         "XXX Avril 2020":"XXX0420",
         "XXX Mai 2020":"XXX0520",
         "XXX Juin 2020":"XXX0620",
         "XXX Juillet 2020":"XXX0720",
         "XXX Aout 2020":"XXX0820",
         "XXX Septembre 2020":"XXX0920",
         "XXX Octobre 2020":"XXX1020",
         "XXX Novembre 2020":"XXX1120",
         "XXX Decembre 2020":"XXX1220",
         "XXX Janvier 2021":"XXX0121",
         "XXX Fevrier 2021":"XXX0221",
         "XXX Mars 2021":"XXX0321",
         "XXX Avril 2021":"XXX0421",
         "XXX Mai 2021":"XXX0521",
         "XXX Juin 2021":"XXX0621",
         "XXX Juillet 2021":"XXX0721",
         "XXX Aout 2021":"XXX0821",
         "XXX Septembre 2021":"XXX0921",
         "XXX Octobre 2021":"XXX1021",
         "XXX Novembre 2021":"XXX1121",
         "XXX Decembre 2021":"XXX1221"
         }

From it we can generate the condition for all possible values:

import pyspark.sql.functions as F

cl = None
for k, v in dates.items():
    if cl is None:
        # the first entry starts the when/otherwise chain
        cl = F.when(F.col("Vague") == k, F.lit(v))
    else:
        # every further entry extends the same chain
        cl = cl.when(F.col("Vague") == k, F.lit(v))

# values that match no key fall through unchanged
cl = cl.otherwise(F.col("Vague")).alias("Vague")
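The same chain can also be built more compactly with functools.reduce; a sketch equivalent to the loop above:

from functools import reduce
import pyspark.sql.functions as F

items = list(dates.items())
# seed the chain with the first entry, then fold in the rest
cl = reduce(lambda acc, kv: acc.when(F.col("Vague") == kv[0], F.lit(kv[1])),
            items[1:],
            F.when(F.col("Vague") == items[0][0], F.lit(items[0][1])))
cl = cl.otherwise(F.col("Vague")).alias("Vague")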

It can be used as follows:

df = spark.createDataFrame([["XXX Fevrier 2021"], ["22332"]],
                           schema="Vague string")
df.select(cl).show()

giving us the expected result:

+-------+
|  Vague|
+-------+
|XXX0221|
|  22332|
+-------+

Ideally, it could be generalized to work with any year by using regular expressions, like this:

dates = {"XXX Janvier 20(\d{2})":"XXX01$1",
         "XXX Fevrier 20(\d{2})":"XXX02$1",
         "XXX Mars 20(\d{2})":"XXX03$1",
         "XXX Avril 20(\d{2})":"XXX04$1",
         "XXX Mai 20(\d{2})":"XXX05$1",
         "XXX Juin 20(\d{2})":"XXX06$1",
         "XXX Juillet 20(\d{2})":"XXX07$1",
         "XXX Aout 20(\d{2})":"XXX08$1",
         "XXX Septembre 20(\d{2})":"XXX09$1",
         "XXX Octobre 20(\d{2})":"XXX10$1",
         "XXX Novembre 20(\d{2})":"XXX11$1",
         "XXX Decembre 20(\d{2})":"XXX12$1",
         }

cl = None
for k, v in dates.items():
    if cl is None:
        # the first pattern runs against the original column
        cl = F.regexp_replace(F.col("Vague"), k, v)
    else:
        # each further pattern is applied on top of the previous result
        cl = F.regexp_replace(cl, k, v)

cl = cl.alias("Vague")

and it will give the same result, but it will work with any year in the 21st century.
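For example, a quick check with an extra year (hypothetical sample rows, reusing cl from above):

df = spark.createDataFrame([["XXX Fevrier 2021"], ["XXX Mars 2035"], ["22332"]],
                           schema="Vague string")
df.select(cl).show()
# expected: XXX0221 and XXX0335; "22332" matches no pattern and is unchanged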

Upvotes: 2
