Reputation: 35
I work on a project with PySpark on Databricks. I have a piece of code (below) that reformats a string based on a (French) date.
The existing code, besides being verbose, is causing some performance issues.
Only CSV files are used in this project (for reading and writing). No database is used.
I'm trying to handle the formatting task in a better way to avoid the performance and memory issues. Any suggestions?
Thanks a lot !
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Janvier 2020","XXX0120").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Fevrier 2020","XXX0220").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mars 2020","XXX0320").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Avril 2020","XXX0420").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mai 2020","XXX0520").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juin 2020","XXX0620").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juillet 2020","XXX0720").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Aout 2020","XXX0820").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Septembre 2020","XXX0920").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Octobre 2020","XXX1020").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Novembre 2020","XXX1120").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Decembre 2020","XXX1220").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Janvier 2021","XXX0121").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Fevrier 2021","XXX0221").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mars 2021","XXX0321").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Avril 2021","XXX0421").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mai 2021","XXX0521").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juin 2021","XXX0621").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juillet 2021","XXX0721").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Aout 2021","XXX0821").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Septembre 2021","XXX0921").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Octobre 2021","XXX1021").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Novembre 2021","XXX1121").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Decembre 2021","XXX1221").otherwise(courriers["Vague"]))
Upvotes: 2
Views: 3666
Reputation: 1300
Another solution could be to leverage a MapType column:
from pyspark.sql.functions import col, create_map, lit,split,concat
from itertools import chain
df = spark.createDataFrame([["XXX Fevrier 2021"], ["XXX Aout 2021"]],
schema="Vague string")
# Create a dict only for the given months
mapping = {
"Janvier":"01",
"Fevrier": "02",
"Mars": "03",
"Avril": "04",
"Mai": "05",
"Juin": "06",
"Juillet": "07",
"Aout": "08",
"Septembre": "09",
"Octobre": "10",
"Novembre": "11",
"Decembre": "12"}
# Create the mapping
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])
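# Split once: "XXX Fevrier 2021" -> ["XXX", "Fevrier", "2021"]
parts = split(col("Vague"), " ")
res = df.withColumn(
    "value",
    concat(
        parts[0],                # prefix, e.g. "XXX"
        mapping_expr[parts[1]],  # month name -> "02" (indexing instead of getItem, which is deprecated for Column keys)
        parts[2].substr(3, 2),   # last two digits of the year
    ),
)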
res.show()
which gives the expected result:
+----------------+-------+
| Vague| value|
+----------------+-------+
|XXX Fevrier 2021|XXX0221|
| XXX Aout 2021|XXX0821|
+----------------+-------+
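To make the create_map line less opaque: create_map takes its arguments as an alternating sequence key1, value1, key2, value2, and so on, and chain(*mapping.items()) produces exactly that flattening. A minimal plain-Python sketch of that step (no Spark needed; the shortened mapping here is only for illustration):

```python
from itertools import chain

# Shortened, illustrative version of the month mapping used above.
mapping = {"Janvier": "01", "Fevrier": "02", "Mars": "03"}

# items() yields (key, value) pairs; chain(*...) flattens them into
# key1, value1, key2, value2, ... which is the order create_map expects.
flat = list(chain(*mapping.items()))
print(flat)  # ['Janvier', '01', 'Fevrier', '02', 'Mars', '03']
```

In the answer's code each element of this flat list is then wrapped in lit() so that Spark treats it as a literal value rather than a column name.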
Upvotes: 1
Reputation: 87154
It's much easier to generate the full condition programmatically than to apply the conditions one by one. Each withColumn call adds a projection to the query plan, so chaining many of them can produce very large plans and is a well-known cause of poor performance.
The simplest way is to define a mapping and generate the condition from it, like this:
dates = {"XXX Janvier 2020":"XXX0120",
"XXX Fevrier 2020":"XXX0220",
"XXX Mars 2020":"XXX0320",
"XXX Avril 2020":"XXX0420",
"XXX Mai 2020":"XXX0520",
"XXX Juin 2020":"XXX0620",
"XXX Juillet 2020":"XXX0720",
"XXX Aout 2020":"XXX0820",
"XXX Septembre 2020":"XXX0920",
"XXX Octobre 2020":"XXX1020",
"XXX Novembre 2020":"XXX1120",
"XXX Decembre 2020":"XXX1220",
"XXX Janvier 2021":"XXX0121",
"XXX Fevrier 2021":"XXX0221",
"XXX Mars 2021":"XXX0321",
"XXX Avril 2021":"XXX0421",
"XXX Mai 2021":"XXX0521",
"XXX Juin 2021":"XXX0621",
"XXX Juillet 2021":"XXX0721",
"XXX Aout 2021":"XXX0821",
"XXX Septembre 2021":"XXX0921",
"XXX Octobre 2021":"XXX1021",
"XXX Novembre 2021":"XXX1121",
"XXX Decembre 2021":"XXX1221"
}
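If typing out every entry is a concern, the same dictionary can be generated rather than written by hand. A minimal sketch, assuming the prefix is always the literal XXX and the month spellings are exactly those in the question; MONTHS and build_dates are hypothetical names introduced here, not part of any library:

```python
# French month names as spelled in the question's data (no accents).
MONTHS = ["Janvier", "Fevrier", "Mars", "Avril", "Mai", "Juin",
          "Juillet", "Aout", "Septembre", "Octobre", "Novembre", "Decembre"]

def build_dates(years, prefix="XXX"):
    """Map e.g. 'XXX Fevrier 2021' to 'XXX0221' for every month of each year."""
    return {
        f"{prefix} {month} {year}": f"{prefix}{index:02d}{year % 100:02d}"
        for year in years
        for index, month in enumerate(MONTHS, start=1)
    }

dates = build_dates([2020, 2021])
print(len(dates))                 # 24
print(dates["XXX Fevrier 2021"])  # XXX0221
```

Adding another year then only means extending the list passed to build_dates.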
From this mapping we can generate a single condition covering all possible values:
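import pyspark.sql.functions as F

# Build one chained when(...).when(...)... expression from the mapping
cl = None
for k, v in dates.items():
    if cl is None:
        cl = F.when(F.col("Vague") == k, F.lit(v))
    else:
        cl = cl.when(F.col("Vague") == k, F.lit(v))
# Values that match no key are kept unchanged
cl = cl.otherwise(F.col("Vague")).alias("Vague")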
and it can be used as follows:
df = spark.createDataFrame([["XXX Fevrier 2021"], ["22332"]],
schema="Vague string")
df.select(cl).show()
giving us the expected result:
+-------+
| Vague|
+-------+
|XXX0221|
| 22332|
+-------+
Ideally, it could be generalized to work with any year, by using regular expressions, like this:
# Raw strings keep \d from being treated as a Python escape sequence
dates = {r"XXX Janvier 20(\d{2})": "XXX01$1",
         r"XXX Fevrier 20(\d{2})": "XXX02$1",
         r"XXX Mars 20(\d{2})": "XXX03$1",
         r"XXX Avril 20(\d{2})": "XXX04$1",
         r"XXX Mai 20(\d{2})": "XXX05$1",
         r"XXX Juin 20(\d{2})": "XXX06$1",
         r"XXX Juillet 20(\d{2})": "XXX07$1",
         r"XXX Aout 20(\d{2})": "XXX08$1",
         r"XXX Septembre 20(\d{2})": "XXX09$1",
         r"XXX Octobre 20(\d{2})": "XXX10$1",
         r"XXX Novembre 20(\d{2})": "XXX11$1",
         r"XXX Decembre 20(\d{2})": "XXX12$1",
}
# Start from the column itself and nest one regexp_replace per pattern
cl = F.col("Vague")
for k, v in dates.items():
    cl = F.regexp_replace(cl, k, v)
cl = cl.alias("Vague")
This gives the same result, but works for any year in the 21st century.
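The patterns can be sanity-checked locally before running them on a cluster. The sketch below is not Spark code: it uses Python's re module, whose replacement syntax is \1 where Spark's regexp_replace (Java regex) uses $1, and it covers only two months for brevity. to_code is a hypothetical helper name introduced for this check.

```python
import re

# Two of the patterns from above, with Python-style backreferences (\1)
# instead of the Java-style $1 that Spark's regexp_replace expects.
patterns = {
    r"XXX Fevrier 20(\d{2})": r"XXX02\1",
    r"XXX Aout 20(\d{2})": r"XXX08\1",
}

def to_code(value):
    """Apply every replacement in turn, mirroring the nested regexp_replace calls."""
    for pattern, replacement in patterns.items():
        value = re.sub(pattern, replacement, value)
    return value

print(to_code("XXX Fevrier 2035"))  # XXX0235
print(to_code("22332"))             # 22332 (no pattern matches, value unchanged)
```

Note that the patterns are not anchored, so a longer string that merely contains one of them would have only the matching part rewritten; adding ^ and $ would restrict the replacement to exact matches if that matters for the data.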
Upvotes: 2