Reputation: 346
I have a CSV file formatted like this:
574,REF009,3213,16384,3258,111,512,2013-12-07 21:03:12.567+01,2013-12-07 21:03:12.567+01,2013-12-31 23:33:15.821+01,/data/ath/athdisk/ro/user/bas/b6/c0
48,REF010,456,32768,3258,111,2175850,2018-07-10 04:37:06.495+02,2018-07-10 04:37:06.459+02,2018-07-10 04:37:06.648+02,/data/ath/athdisk/ro/mc15/b9/dc/lo.log.tgz.1
1758,REF011,123,32768,3258,111,31691926,2017-04-21 22:29:30.315+02,2017-10-20 05:55:03.959+02,2017-04-21 22:29:31+02,/data/ath/athdisk/ro/dataV/1f/00/D0293.pool.root
When trying to import this massive file (11B lines long), I got around 4M lines full of null values. I realized my file had issues, so I tried to run the import with the FAILFAST option, like this:
val inodes_schema = StructType(
  Array(
    StructField("testID", LongType, false),
    StructField("ref", StringType, false),
    StructField("iref", IntegerType, false),
    StructField("flag", IntegerType, false),
    StructField("iuid", IntegerType, false),
    StructField("igid", IntegerType, false),
    StructField("isize", LongType, false),
    StructField("icrtime", TimestampType, false),
    StructField("iatime", TimestampType, false),
    StructField("ictime", TimestampType, false),
    StructField("path", StringType, false)
  )
)
val inodes_table = spark.read.option("mode", "FAILFAST")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSX")
  .schema(inodes_schema)
  .option("delimiter", ",")
  .option("header", false)
  .csv("/my/csv/file.csv")
This allowed me to identify that a line containing 59+02 was causing the issue. Since many lines contain 59+02, I finally managed to identify one that is not properly imported when I use the usual PERMISSIVE mode:
1758,REF011,123,32768,3258,111,31691926,2017-04-21 22:29:30.315+02,2017-10-20 05:55:03.959+02,2017-04-21 22:29:31+02,/data/ath/athdisk/ro/dataV/1f/00/D0293.pool.root
I don't understand why this line is not properly parsed by Spark. The 05:55:03.959+02 time format is correct with respect to my timestampFormat, yet the line is not properly imported, and probably many others are not either.
Upvotes: 0
Views: 314
Reputation: 22439
The problem appears to be caused by having multiple timestamp formats in your data: some timestamps carry fractional seconds (e.g. 22:29:30.315+02) while others do not (e.g. 22:29:31+02), so a single timestampFormat pattern like yyyy-MM-dd HH:mm:ss.SSSX cannot parse them all. A workaround is to read the TimestampType columns as StringType when loading the CSV and cast them back to TimestampType afterward:
// /path/to/csvfile:
1,2017-04-21 22:29:30.315+02
2,2017-10-20 05:55:03.959+02
3,2017-04-21 22:29:31+02
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("id", IntegerType, false),
  StructField("dt", StringType, false)
))
val df = spark.read.
  option("mode", "FAILFAST").
  option("delimiter", ",").
  option("header", false).
  schema(schema).
  csv("/path/to/csvfile")

df.select($"id", $"dt".cast(TimestampType).as("dt")).
  show(false)
// +---+-----------------------+
// |id |dt |
// +---+-----------------------+
// |1 |2017-04-21 13:29:30.315|
// |2 |2017-10-19 20:55:03.959|
// |3 |2017-04-21 13:29:31 |
// +---+-----------------------+
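For the full file from the question, the same idea applies: read the three timestamp columns as StringType and cast them after the load. A minimal sketch, reusing the schema and file path from the question:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val inodes_schema = StructType(Array(
  StructField("testID", LongType, false),
  StructField("ref", StringType, false),
  StructField("iref", IntegerType, false),
  StructField("flag", IntegerType, false),
  StructField("iuid", IntegerType, false),
  StructField("igid", IntegerType, false),
  StructField("isize", LongType, false),
  StructField("icrtime", StringType, false),  // read timestamps as plain strings first
  StructField("iatime", StringType, false),
  StructField("ictime", StringType, false),
  StructField("path", StringType, false)
))

val inodes_table = spark.read.
  option("mode", "FAILFAST").
  option("delimiter", ",").
  option("header", false).
  schema(inodes_schema).
  csv("/my/csv/file.csv").
  withColumn("icrtime", $"icrtime".cast(TimestampType)).  // cast handles both formats
  withColumn("iatime", $"iatime".cast(TimestampType)).
  withColumn("ictime", $"ictime".cast(TimestampType))
With this approach FAILFAST only flags rows that are truly malformed at the CSV level, while the cast handles timestamps both with and without fractional seconds, as shown above.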
Upvotes: 1