Reputation: 96
New to PySpark. I am trying to read a CSV file from a Data Lake blob using PySpark with a user-specified schema (StructType). Below is the code I tried.
from pyspark.sql.types import *
customschema = StructType([
    StructField("A", StringType(), True),
    StructField("B", DoubleType(), True),
    StructField("C", TimestampType(), True)
])
df_1 = spark.read.format("csv").options(header="true", schema=customschema, multiline="true", enforceSchema='true').load(destinationPath)
df_1.show()
Out:
+---------+------+--------------------+
| A| B| C|
+---------+------+--------------------+
|322849691|9547.0|2020-09-24 07:30:...|
|322847371| 492.0|2020-09-23 13:15:...|
|322329853|6661.0|2020-09-07 09:45:...|
|322283810| 500.0|2020-09-04 13:12:...|
|322319107| 251.0|2020-09-02 13:51:...|
|322319096| 254.0|2020-09-02 13:51:...|
+---------+------+--------------------+
But all the fields came back as string instead of the types I specified. I am not quite sure what I have done wrong.
df_1.printSchema()
Out:
root
|-- A: string (nullable = true)
|-- B: string (nullable = true)
|-- C: string (nullable = true)
Upvotes: 0
Views: 9552
Reputation: 32640
When you use the DataFrameReader load method, you should pass the schema using the schema method and not in the options:
df_1 = spark.read.format("csv") \
.options(header="true", multiline="true")\
.schema(customschema).load(destinationPath)
That's not the same as the API method spark.read.csv, which accepts schema as an argument:
df_1 = spark.read.csv(destinationPath, schema=customschema, header=True)
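With the schema passed this way, df_1.printSchema() should report the declared types instead of all strings (the output below is simply what the custom schema implies):
df_1.printSchema()
root
 |-- A: string (nullable = true)
 |-- B: double (nullable = true)
 |-- C: timestamp (nullable = true)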
Upvotes: 2
Reputation: 1238
It works with the following syntax:
customschema = StructType([
    StructField("A", StringType(), True),
    StructField("B", DoubleType(), True),
    StructField("C", TimestampType(), True)
])
df = spark.read.csv("test.csv", header=True, sep=";", schema=customschema)
df.show()
df.printSchema()
Or you can also use:
df = spark.read.load("test.csv", format="csv", sep=";", schema=customschema, header="true")
It is interesting that passing the schema inside read().options().load() does not work: schema is not a CSV data source option, so the reader ignores it and reads every column as string (which matches what you observed). A StructType schema has to be supplied through the .schema() method, or through the schema argument of spark.read.csv.
Another option would be to cast the data types afterwards:
import pyspark.sql.functions as f

df = (df
    .withColumn("A", f.col("A").cast("string"))
    .withColumn("B", f.col("B").cast("double"))
    .withColumn("C", f.col("C").cast("timestamp"))
)
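If the timestamp strings in C do not follow the default yyyy-MM-dd HH:mm:ss layout, a plain cast will produce nulls; in that case to_timestamp with an explicit pattern can be used instead (the pattern below is only an illustration, adjust it to your data):
df = df.withColumn("C", f.to_timestamp(f.col("C"), "yyyy-MM-dd HH:mm:ss"))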
Upvotes: 1