kevin hiew

Reputation: 96

pyspark read csv with user specified schema - returned all StringType

New to pyspark. I am trying to read a CSV file from a Data Lake blob using pyspark with a user-specified schema (StructType). Below is the code I tried.

from pyspark.sql.types import *

customschema = StructType([
    StructField("A", StringType(), True),
    StructField("B", DoubleType(), True),
    StructField("C", TimestampType(), True)
])

df_1 = spark.read.format("csv").options(header="true", schema=customschema, multiline="true", enforceSchema='true').load(destinationPath)

df_1.show()

Out:

+---------+------+--------------------+
|        A|     B|                   C|
+---------+------+--------------------+
|322849691|9547.0|2020-09-24 07:30:...|
|322847371| 492.0|2020-09-23 13:15:...|
|322329853|6661.0|2020-09-07 09:45:...|
|322283810| 500.0|2020-09-04 13:12:...|
|322319107| 251.0|2020-09-02 13:51:...|
|322319096| 254.0|2020-09-02 13:51:...|
+---------+------+--------------------+

But all the fields came back as string instead. I am not quite sure what I have done wrong.

df_1.printSchema()

Out:

root
 |-- A: string (nullable = true)
 |-- B: string (nullable = true)
 |-- C: string (nullable = true)

Upvotes: 0

Views: 9552

Answers (2)

blackbishop

Reputation: 32640

When you use the DataFrameReader load method, you should pass the schema using the schema method and not in the options:

df_1 = spark.read.format("csv") \
    .options(header="true", multiline="true") \
    .schema(customschema) \
    .load(destinationPath)

That's not the same as the API method spark.read.csv, which accepts the schema as an argument:

df_1 = spark.read.csv(destinationPath, schema=customschema, header=True)
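
Either way, the declared types should then show up in the schema. A quick check, assuming the same customschema as in the question:

df_1.printSchema()

Out:

root
 |-- A: string (nullable = true)
 |-- B: double (nullable = true)
 |-- C: timestamp (nullable = true)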

Upvotes: 2

Alex Ortner

Reputation: 1238

It works with the following syntax:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

customschema = StructType([
    StructField("A", StringType(), True),
    StructField("B", DoubleType(), True),
    StructField("C", TimestampType(), True)
])

df = spark.read.csv("test.csv", header=True, sep=";", schema=customschema)

df.show()
df.printSchema()

Or you can use:

df = spark.read.load("test.csv", format="csv", sep=";", schema=customschema, header="true")

It is interesting that the read().option().load() syntax does not work for me either; I am not sure whether it works at all. At least according to the documentation, .options() is only used for write(), i.e. to export a DataFrame.
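
For comparison, here is a minimal sketch of the chained variant with the schema passed through .schema() rather than inside .options(), as the other answer shows:

df = (spark.read.format("csv")
      .option("header", "true")
      .option("sep", ";")
      .schema(customschema)
      .load("test.csv"))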

Another option would be to cast the data types afterwards:

import pyspark.sql.functions as f

df = (df
      .withColumn("A", f.col("A").cast("string"))
      .withColumn("B", f.col("B").cast("double"))
      .withColumn("C", f.col("C").cast("timestamp"))
     )
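
The same casts can also be written in one pass with select, which avoids chaining several withColumn calls. A minimal sketch, using the same column names and target types as above:

import pyspark.sql.functions as f

# map each column to its target type and cast everything in a single select
target_types = {"A": "string", "B": "double", "C": "timestamp"}
df = df.select([f.col(c).cast(t).alias(c) for c, t in target_types.items()])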

Upvotes: 1
