cph_sto

Reputation: 7607

Creating a DataFrame from RDD while specifying DateType() in schema

I am creating a DataFrame from an RDD, and one of the values is a date. I don't know how to specify DateType() in the schema.

Let me illustrate the problem at hand -

One way to load the date into the DataFrame is to first declare it as a string and then convert it to a proper date using the to_date() function.

from pyspark.sql.types import Row, StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import col, to_date
values = sc.parallelize([(3, '2012-02-02'), (5, '2018-08-08')])
rdd = values.map(lambda t: Row(A=t[0], date=t[1]))

# Declare the date column as a string in the schema
schema = StructType([StructField('A', IntegerType(), True), StructField('date', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

# Finally convert the string into a date using the to_date() function.
df = df.withColumn('date', to_date(col('date'), 'yyyy-MM-dd'))
df.show()
+---+----------+
|  A|      date|
+---+----------+
|  3|2012-02-02|
|  5|2018-08-08|
+---+----------+

df.printSchema()
root
 |-- A: integer (nullable = true)
 |-- date: date (nullable = true)

Is there a way to use DateType() in the schema directly and avoid having to convert the string to a date explicitly?

Something like this -

values = sc.parallelize([(3, '2012-02-02'), (5, '2018-08-08')])
rdd = values.map(lambda t: Row(A=t[0], date=t[1]))
# Somewhere we would need to specify the date format 'yyyy-MM-dd' too, but I don't know where.
schema = StructType([StructField('A', IntegerType(), True), StructField('date', DateType(), True)])

UPDATE: As suggested by @user10465355, the following code works -

import datetime
schema = StructType([
  StructField('A', IntegerType(), True),
  StructField('date', DateType(), True)
])
# strptime returns a datetime.datetime; .date() gives the datetime.date that DateType expects
rdd = values.map(lambda t: Row(A=t[0], date=datetime.datetime.strptime(t[1], "%Y-%m-%d").date()))
df = sqlContext.createDataFrame(rdd, schema)
df.show()
+---+----------+
|  A|      date|
+---+----------+
|  3|2012-02-02|
|  5|2018-08-08|
+---+----------+
df.printSchema()
root
 |-- A: integer (nullable = true)
 |-- date: date (nullable = true)
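The parsing step in that map is plain Python, independent of Spark: strptime returns a datetime.datetime, and taking .date() (or using datetime.date.fromisoformat, available since Python 3.7) yields the datetime.date that DateType expects. A minimal sketch outside Spark:

```python
import datetime

s = "2012-02-02"

# strptime returns a datetime.datetime; .date() drops the time part
d1 = datetime.datetime.strptime(s, "%Y-%m-%d").date()

# fromisoformat (Python 3.7+) parses ISO dates directly into a datetime.date
d2 = datetime.date.fromisoformat(s)

print(d1 == d2 == datetime.date(2012, 2, 2))  # True
```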

Upvotes: 8

Views: 7828

Answers (1)

10465355

Reputation: 4631

Long story short, a schema used with an RDD of external objects is not intended to be used that way: the declared types should reflect the actual state of the data, not the desired one.

In other words, to allow:

schema = StructType([
  StructField('A', IntegerType(), True),
  StructField('date', DateType(), True)
])

the data corresponding to the date field should use datetime.date. So, for example, with your RDD[Tuple[int, str]]:

import datetime

spark.createDataFrame(
    # Since the values from the question are two-element tuples
    # we can use mapValues to transform the "value";
    # in the general case you'll need map.
    # datetime.date.fromisoformat requires Python 3.7+
    values.mapValues(datetime.date.fromisoformat),
    schema
)
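mapValues here just applies the function to the second element of each pair while keeping the key. Without a Spark context, the equivalent transformation over plain tuples is, as a sketch:

```python
import datetime

values = [(3, '2012-02-02'), (5, '2018-08-08')]

# Equivalent of values.mapValues(datetime.date.fromisoformat):
# keep the first element, parse the second into a datetime.date
converted = [(k, datetime.date.fromisoformat(v)) for k, v in values]

print(converted)
# [(3, datetime.date(2012, 2, 2)), (5, datetime.date(2018, 8, 8))]
```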

The closest you can get to the desired behavior is to convert the data (RDD[Row]) with the JSON reader, using dicts:

from pyspark.sql import Row

spark.read.schema(schema).json(rdd.map(Row.asDict))

or, better, with explicit JSON dumps:

import json
spark.read.schema(schema).json(rdd.map(Row.asDict).map(json.dumps))
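In that variant the JSON reader receives one JSON document per record, and because the schema declares the field as DateType, the ISO-formatted string is parsed into a date on read. A plain-Python sketch of the payload, where the dict stands in for one Row.asDict() result:

```python
import json

# Stand-in for one record passing through rdd.map(Row.asDict).map(json.dumps)
rec = {'A': 3, 'date': '2012-02-02'}
line = json.dumps(rec)

print(line)  # {"A": 3, "date": "2012-02-02"}
```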

but that's of course much more expensive than explicit casting which, by the way, is easy to automate in simple cases like the one you describe:

from pyspark.sql.functions import col

(spark
    .createDataFrame(values, ("a", "date"))
    .select([col(f.name).cast(f.dataType) for f in schema]))

Upvotes: 7
