I am trying to read a CSV file into Spark using the spark_read_csv function. I get an exception while the schema is being inferred, i.e. whenever infer_schema = TRUE:
spark_read_csv(sc,"myDf",DatasetUrl)
This is the exception I get:
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 90.0 failed 1 times, most recent failure: Lost task 0.0 in stage 90.0 (TID 151, localhost): java.text.ParseException: Unparseable number: "cr1_fd_dttm" at java.text.NumberFormat.parse(NumberFormat.java:385) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$4.apply$mcD$sp(CSVInferSchema.scala:259)
However, when I set infer_schema = FALSE, everything is read as chr type, as expected.
This is what the data in the cr1_fd_dttm column looks like:
cr1_fd_dttm
<chr>
1 0.0
2 1.45679112E12
3 1.45679166E12
4 1.45679154E12
5 1.45679274E12
6 0.0
7 0.0
8 0.0
9 0.0
10 1.45679118E12
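A quick base-R check (a sketch added for illustration, not part of the original post) suggests the values themselves are fine: scientific notation such as "1.45679112E12" parses as a double, while the token named in the stack trace is the column name cr1_fd_dttm itself. That hints the header text is being treated as a data value during inference. The check also shows that these magnitudes exceed R's 32-bit integer range, so if the column is coerced manually, a double (as.double/as.numeric) is likely the safer target than an integer. Spark's own casting rules may differ from R's, so treat this only as a rough guide.

```r
# Sample values copied from the cr1_fd_dttm column shown above
vals <- c("0.0", "1.45679112E12", "1.45679166E12", "1.45679154E12",
          "1.45679274E12", "1.45679118E12")

# Scientific notation such as "1.45679112E12" parses fine as a double
as_dbl <- as.numeric(vals)

# The values are far above .Machine$integer.max (2147483647), so an
# integer conversion yields NA (the coercion warning is suppressed here)
as_int <- suppressWarnings(as.integer(vals))

# The token in the stack trace is the column name itself, not a data value
header_as_dbl <- suppressWarnings(as.numeric("cr1_fd_dttm"))

print(as_dbl)
print(as_int)
print(header_as_dbl)
```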
Can someone help me with this?
Thanks
I read the file without loading it into memory right away, coerce the fields to numeric types, and then load the results into memory. The keys are to set memory = FALSE and infer_schema = FALSE, pass a list of column types via columns, coerce the columns with mutate(), and then use compute() to save the results into Spark memory. Here's a long-winded but working example:
library(sparklyr)
library(dplyr)

# Assumes an existing Spark connection `sc`.
# Read lazily (memory = FALSE) with every column typed as character,
# so Spark does not try to infer the schema.
mapped_flights <- spark_read_csv(sc, "mapped_flights",
  path = "s3a://flights-data/full",
  memory = FALSE,
  infer_schema = FALSE,
  columns = list(
    Year = "character",
    Month = "character",
    DayofMonth = "character",
    DayOfWeek = "character",
    DepTime = "character",
    CRSDepTime = "character",
    ArrTime = "character",
    CRSArrTime = "character",
    UniqueCarrier = "character",
    FlightNum = "character",
    TailNum = "character",
    ActualElapsedTime = "character",
    CRSElapsedTime = "character",
    AirTime = "character",
    ArrDelay = "character",
    DepDelay = "character",
    Origin = "character",
    Dest = "character",
    Distance = "character",
    TaxiIn = "character",
    TaxiOut = "character",
    Cancelled = "character",
    CancellationCode = "character",
    Diverted = "character",
    CarrierDelay = "character",
    WeatherDelay = "character",
    NASDelay = "character",
    SecurityDelay = "character",
    LateAircraftDelay = "character")
)

# Coerce the columns to numeric types, then cache the result in Spark memory
flights <- mapped_flights %>% mutate(
  Year = as.integer(Year),
  Month = as.integer(Month),
  DayofMonth = as.integer(DayofMonth),
  DayOfWeek = as.integer(DayOfWeek),
  DepTime = as.integer(DepTime),
  CRSDepTime = as.integer(CRSDepTime),
  CRSArrTime = as.integer(CRSArrTime),
  ArrTime = as.integer(ArrTime),
  ActualElapsedTime = as.integer(ActualElapsedTime),
  CRSElapsedTime = as.integer(CRSElapsedTime),
  AirTime = as.integer(AirTime),
  ArrDelay = as.double(ArrDelay),
  DepDelay = as.double(DepDelay),
  Distance = as.integer(Distance),
  TaxiIn = as.integer(TaxiIn),
  TaxiOut = as.integer(TaxiOut),
  Cancelled = as.integer(Cancelled),
  Diverted = as.integer(Diverted),
  CarrierDelay = as.integer(CarrierDelay),
  WeatherDelay = as.integer(WeatherDelay),
  NASDelay = as.integer(NASDelay),
  SecurityDelay = as.integer(SecurityDelay),
  LateAircraftDelay = as.integer(LateAircraftDelay)) %>% compute("flights")
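If typing out every column by hand is tedious, the all-"character" columns list can be built programmatically. This is a minimal base-R sketch: col_names is a hypothetical vector that would normally hold the CSV's header fields (here just a subset of the flights columns), and the resulting named list has the same shape as the hand-written columns argument above.

```r
# Hypothetical: the CSV's column names (a subset of the flights header)
col_names <- c("Year", "Month", "DayofMonth", "DepTime", "ArrDelay")

# Build a named list mapping every column to "character",
# the same shape as the hand-written `columns` argument above
columns <- setNames(as.list(rep("character", length(col_names))), col_names)

str(columns)
```

The resulting list could then be passed as columns = columns to spark_read_csv(), leaving the per-column type coercion to mutate() as before.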