Reputation: 7607
I am creating a DataFrame from an RDD, and one of the values is a date. I don't know how to specify DateType() in the schema.
Let me illustrate the problem at hand -
One way we can load the date into the DataFrame is by first specifying it as a string and then converting it to a proper date using the to_date() function.
from pyspark.sql.types import Row, StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import col, to_date
values = sc.parallelize([(3, '2012-02-02'), (5, '2018-08-08')])
rdd = values.map(lambda t: Row(A=t[0], date=t[1]))
# Importing date as String in Schema
schema = StructType([StructField('A', IntegerType(), True), StructField('date', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
# Finally converting the string into date using to_date() function.
df = df.withColumn('date',to_date(col('date'), 'yyyy-MM-dd'))
df.show()
+---+----------+
| A| date|
+---+----------+
| 3|2012-02-02|
| 5|2018-08-08|
+---+----------+
df.printSchema()
root
|-- A: integer (nullable = true)
|-- date: date (nullable = true)
Is there a way to use DateType() in the schema and avoid having to convert the string to a date explicitly?
Something like this -
values = sc.parallelize([(3, '2012-02-02'), (5, '2018-08-08')])
rdd = values.map(lambda t: Row(A=t[0], date=t[1]))
# Somewhere we would need to specify the date format 'yyyy-MM-dd' too; don't know where, though.
schema = StructType([StructField('A', IntegerType(), True), StructField('date', DateType(), True)])
UPDATE: As suggested by @user10465355, the following code works -
import datetime
schema = StructType([
    StructField('A', IntegerType(), True),
    StructField('date', DateType(), True)
])
rdd = values.map(lambda t: Row(A=t[0], date=datetime.datetime.strptime(t[1], "%Y-%m-%d")))
df = sqlContext.createDataFrame(rdd, schema)
df.show()
+---+----------+
| A| date|
+---+----------+
| 3|2012-02-02|
| 5|2018-08-08|
+---+----------+
df.printSchema()
root
|-- A: integer (nullable = true)
|-- date: date (nullable = true)
Upvotes: 8
Views: 7828
Reputation: 4631
Long story short, a schema used with an RDD of external objects is not intended to be used that way: the declared types should reflect the actual state of the data, not the desired one.
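For instance, with the schema above and the raw string dates from the question, createDataFrame fails during schema verification (a minimal sketch; verifySchema is True by default, and the exact message varies by Spark version):
# Dates are still plain strings here, so verification fails, roughly with:
# TypeError: field date: DateType can not accept object '2012-02-02' in type <class 'str'>
spark.createDataFrame(values, schema)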
In other words, to allow:
schema = StructType([
    StructField('A', IntegerType(), True),
    StructField('date', DateType(), True)
])
the data corresponding to the date field should use datetime.date. So, for example, with your RDD[Tuple[int, str]]:
import datetime

spark.createDataFrame(
    # Since values from the question are just two-element tuples,
    # we can use mapValues to transform the "value",
    # but in the general case you'll need map
    values.mapValues(datetime.date.fromisoformat),
    schema
)
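Note that datetime.date.fromisoformat is only available from Python 3.7 onwards; on older interpreters an equivalent parse (a small sketch) is:
import datetime

# Same conversion for Python < 3.7
values.mapValues(lambda s: datetime.datetime.strptime(s, "%Y-%m-%d").date())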
The closest you can get to the desired behavior is to convert the data (RDD[Row]) with the JSON reader, using dicts:
from pyspark.sql import Row
spark.read.schema(schema).json(rdd.map(Row.asDict))
or, better, with explicit json.dumps:
import json
spark.read.schema(schema).json(rdd.map(Row.asDict).map(json.dumps))
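The explicit json.dumps matters because the JSON reader expects an RDD of JSON strings, while Python's default string conversion of a dict produces single-quoted output that is not valid JSON. Roughly:
d = Row(A=3, date='2012-02-02').asDict()
str(d)         # "{'A': 3, 'date': '2012-02-02'}" - not valid JSON
json.dumps(d)  # '{"A": 3, "date": "2012-02-02"}' - valid JSON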
but that's of course much more expensive than explicit casting, which, by the way, is easy to automate in simple cases like the one you describe:
from pyspark.sql.functions import col

(spark
    .createDataFrame(values, ("a", "date"))
    .select([col(f.name).cast(f.dataType) for f in schema]))
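One caveat of the cast approach compared to strict schema verification: a string that cannot be parsed as a date is silently turned into null instead of raising an error. A small sketch (output shown as on Spark 2.x):
(spark
    .createDataFrame([("oops",)], ("date",))
    .select(col("date").cast("date"))
    .show())
# +----+
# |date|
# +----+
# |null|
# +----+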
Upvotes: 7