mrammah

Reputation: 225

Convert Column of ArrayType(StringType()) to ArrayType(DateType()) in PySpark

I have a dataframe like the following, and I want to convert the date strings in each array to ISO-8601 format:

| production_date             | expiration_date             |
--------------------------------------------------------------
| ["20/05/1996","01/01/2018"] | ["15/01/1997","27/03/2019"] |
| ....                        | ....                        |
--------------------------------------------------------------

I want:

| good_prod_date          | good_exp_date           |
------------------------------------------------------
| [1996-05-20,2018-01-01] | [1997-01-15,2019-03-27] |
| ....                    | ....                    |
------------------------------------------------------

However, there are over 20 columns and millions of rows. I'm trying to avoid using UDFs since they are inefficient and, most of the time, a poor approach. I'm also avoiding exploding each column because that is:

  1. Inefficient (hundreds of millions of rows are unnecessarily created)
  2. Not an elegant solution
  3. I tried that and it doesn't work

So far I have the following:

import pyspark.sql.functions as sf

def explodeCols(df):
  return (df
          .withColumn("production_date", sf.explode("production_date"))
          .withColumn("expiration_date", sf.explode("expiration_date")))

def fixTypes(df):
  return (df
          .withColumn("production_date", sf.to_date("production_date", "dd/MM/yyyy"))
          .withColumn("expiration_date", sf.to_date("expiration_date", "dd/MM/yyyy")))

def consolidate(df):
  cols = ["production_date", "expiration_date"]
  return df.groupBy("id").agg(*[sf.collect_list(c) for c in cols])

historyDF = (df
             .transform(explodeCols)
             .transform(fixTypes)
             .transform(consolidate))

However, when I run this code on Databricks, the jobs never complete; in fact, they result in failed/dead executors (which isn't good).

Another solution I tried is the following:

df.withColumn("good_prod_date", col("production_date").cast(ArrayType(DateType())))

But the result I get is an array of nulls:

| production_date             | good_prod_date |
-------------------------------------------------
| ["20/05/1996","01/01/2018"] | [null,null]    |
| ....                        | ....           |
-------------------------------------------------

Upvotes: 1

Views: 1944

Answers (1)

s.polam

Reputation: 10382

Use the Spark SQL transform higher-order function instead of explode to convert each value inside the arrays in place.

from pyspark.sql import functions as F

(df
 .withColumn("production_date", F.expr("transform(production_date, v -> to_date(v, 'dd/MM/yyyy'))"))
 .withColumn("expiration_date", F.expr("transform(expiration_date, v -> to_date(v, 'dd/MM/yyyy'))"))
 .show())
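
Because the question mentions 20-plus such columns, the same expression can be applied in a loop rather than repeating withColumn for each one. A minimal sketch, assuming date_array_cols is a hypothetical list holding the names of the array columns to convert:

from pyspark.sql import functions as F

# Hypothetical list of every array-of-date-string column to convert.
date_array_cols = ["production_date", "expiration_date"]

for c in date_array_cols:
    # Rewrite each column in place as array<date>, parsing the dd/MM/yyyy strings.
    df = df.withColumn(c, F.expr(f"transform({c}, v -> to_date(v, 'dd/MM/yyyy'))"))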
df.withColumn("good_prod_date", col("production_date").cast(ArrayType(DateType())))

This will not work because production_date has different date format, if this column has date format like yyyy-MM-dd casting will work.

df.select("actual_date").printSchema()
root
 |-- actual_date: array (nullable = true)
 |    |-- element: string (containsNull = true)

df.select("actual_date").show(false)
+------------------------+
|actual_date             |
+------------------------+
|[1997-01-15, 2019-03-27]|
+------------------------+

df.select("actual_date").withColumn("actual_date", F.col("actual_date").cast("array<date>")).printSchema()
root
 |-- actual_date: array (nullable = true)
 |    |-- element: date (containsNull = true)

df.select("actual_date").withColumn("actual_date", F.col("actual_date").cast("array<date>")).show()
+------------------------+
|actual_date             |
+------------------------+
|[1997-01-15, 2019-03-27]|
+------------------------+
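
As a side note, on Spark 3.1 and later the same conversion can be written with the native pyspark.sql.functions.transform helper instead of an expr string. A minimal sketch, assuming Spark >= 3.1 and the same dd/MM/yyyy input format:

from pyspark.sql import functions as F

# Requires Spark >= 3.1, where functions.transform accepts a Python lambda over Columns.
converted = df.withColumn(
    "production_date",
    F.transform("production_date", lambda v: F.to_date(v, "dd/MM/yyyy")),
)
converted.printSchema()  # production_date: array (element: date)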

Upvotes: 1
