Mansoor

sparklyr exception while reading a CSV file with schema inference: double

I am trying to read a CSV file into Spark using the spark_read_csv function. I get an exception while the schema is being inferred, i.e. when infer_schema = TRUE (the default, which this call relies on):

spark_read_csv(sc, "myDf", DatasetUrl)

I get the following exception:

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 90.0 failed 1 times, most recent failure: Lost task 0.0 in stage 90.0 (TID 151, localhost): java.text.ParseException: Unparseable number: "cr1_fd_dttm" at java.text.NumberFormat.parse(NumberFormat.java:385) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$4.apply$mcD$sp(CSVInferSchema.scala:259)

However, when I set infer_schema = FALSE, everything is read as chr, as expected.

This is what the data in the cr1_fd_dttm column looks like:

      cr1_fd_dttm
            <chr>
1             0.0
2   1.45679112E12
3   1.45679166E12
4   1.45679154E12
5   1.45679274E12
6             0.0
7             0.0
8             0.0
9             0.0
10  1.45679118E12

Can someone help me with this?

Thanks

Answers (1)

edgararuiz

I read the file without loading it into memory right away, coerce the fields to numeric types, and then load the results into memory. The keys are to set memory = FALSE and infer_schema = FALSE, pass a list of the columns, coerce them with mutate(), and then use compute() to save the results into Spark memory. Here's a long-winded but working example:

library(sparklyr)
library(dplyr)

# sc is an existing Spark connection created with spark_connect()
mapped_flights <- spark_read_csv(sc, "mapped_flights",
                      path =  "s3a://flights-data/full", 
                      memory = FALSE, 
                      infer_schema = FALSE,
                      columns = list(
                        Year = "character",
                        Month = "character",
                        DayofMonth = "character",
                        DayOfWeek = "character",
                        DepTime = "character",
                        CRSDepTime = "character",
                        ArrTime = "character",
                        CRSArrTime = "character",
                        UniqueCarrier = "character",
                        FlightNum = "character",
                        TailNum = "character",
                        ActualElapsedTime = "character",
                        CRSElapsedTime = "character",
                        AirTime = "character",
                        ArrDelay = "character",
                        DepDelay = "character",
                        Origin = "character",
                        Dest = "character",
                        Distance = "character",
                        TaxiIn = "character",
                        TaxiOut = "character",
                        Cancelled = "character",
                        CancellationCode = "character",
                        Diverted = "character",
                        CarrierDelay = "character",
                        WeatherDelay = "character",
                        NASDelay = "character",
                        SecurityDelay = "character",
                        LateAircraftDelay = "character")
                      )


# Coerce the character columns to numeric types inside Spark
flights <- mapped_flights %>%
  mutate(
    Year = as.integer(Year),
    Month = as.integer(Month),
    DayofMonth = as.integer(DayofMonth),
    DayOfWeek = as.integer(DayOfWeek),
    DepTime = as.integer(DepTime),
    CRSDepTime = as.integer(CRSDepTime),
    CRSArrTime = as.integer(CRSArrTime),
    ArrTime = as.integer(ArrTime),
    ActualElapsedTime = as.integer(ActualElapsedTime),
    CRSElapsedTime = as.integer(CRSElapsedTime),
    AirTime = as.integer(AirTime),
    ArrDelay = as.double(ArrDelay),
    DepDelay = as.double(DepDelay),
    Distance = as.integer(Distance),
    TaxiIn = as.integer(TaxiIn),
    TaxiOut = as.integer(TaxiOut),
    Cancelled = as.integer(Cancelled),
    Diverted = as.integer(Diverted),
    CarrierDelay = as.integer(CarrierDelay),
    WeatherDelay = as.integer(WeatherDelay),
    NASDelay = as.integer(NASDelay),
    SecurityDelay = as.integer(SecurityDelay),
    LateAircraftDelay = as.integer(LateAircraftDelay)
  ) %>%
  # compute() saves the coerced result into Spark memory as "flights"
  compute("flights")
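Applied to the question, the same recipe should mean reading the file with memory = FALSE, infer_schema = FALSE and every column listed as "character", then running mutate(cr1_fd_dttm = as.double(cr1_fd_dttm)) followed by compute(). Because that needs a live Spark connection, here is a minimal local sketch of the same idea in base R (not Spark): read every column as character with no type inference, then coerce explicitly. The inline sample is taken from the values shown in the question, and the names csv_text, raw and parsed are hypothetical.

```r
# Hypothetical sample: the header plus a few values from the question
csv_text <- "cr1_fd_dttm
0.0
1.45679112E12
1.45679166E12
0.0"

# Read with no type inference: every column stays character,
# analogous to infer_schema = FALSE with "character" columns in spark_read_csv()
raw <- read.csv(text = csv_text, colClasses = "character")
str(raw)

# Coerce explicitly afterwards, analogous to mutate(x = as.double(x)) in sparklyr;
# scientific notation such as 1.45679112E12 parses as a double
parsed <- transform(raw, cr1_fd_dttm = as.double(cr1_fd_dttm))
print(parsed$cr1_fd_dttm)

# The token in the exception message is the header text itself;
# in R, coercing it gives NA (the warning is suppressed here)
suppressWarnings(as.double("cr1_fd_dttm"))
```

Coercing after the read keeps the type decision in your hands, so a stray non-numeric value becomes a missing value instead of failing the whole read during inference.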
