Reputation: 563
I would like to read a CSV file into a DataFrame in Spark using Scala. The first record of my CSV file has three columns and the remaining records have 5 columns. The file does not come with column names; I have added them here for understanding.
Ex:
Idtype date recordsCount
0 13-02-2015 300
Idtype date type location locationCode
1 13-02-2015 R USA Us
1 13-02-2015 T London Lon
My question is: how can I read this file into a DataFrame, given that the first row and the remaining rows have different numbers of columns? The solution I tried is to read the file as an RDD, filter out the header record, and then convert the remaining records into a DataFrame. Is there a better solution? Please help me.
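Roughly, what I tried looks like the sketch below (assuming Spark 2.x with a SparkSession and comma-separated fields; the column names are taken from the sample above):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Read as plain text, keep only the 5-column records, then convert to a DataFrame
val raw = spark.sparkContext.textFile("path/file.csv")
val data = raw
  .map(_.split(","))
  .filter(_.length == 5)
  .map(a => (a(0), a(1), a(2), a(3), a(4)))

val df = data.toDF("dtype", "date", "type", "location", "locationCode")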
Upvotes: 3
Views: 4892
Reputation: 4125
You can load the files as raw text, and then use case classes, Either instances, and pattern matching to sort out what goes where. An example of that is below.
import scala.util.{Try, Success, Failure}
import org.apache.spark.rdd.RDD

case class Col3(c1: Int, c2: String, c3: Int)
case class Col5(c1: Int, c2: String, c5_col3: String, c4: String, c5: String)
case class Header(value: String)

type C3 = Either[Header, Col3]
type C5 = Either[Header, Col5]

// assume sqlC (SQLContext) & sc (SparkContext) are already created
val path = "tmp.tsv"

val rdd = sc.textFile(path)

// Classify each line by its number of fields; rows that fail to parse are kept as headers
val eitherRdd: RDD[Either[C3, C5]] = rdd.map { s =>
  val spl = s.split("\t")
  spl.length match {
    case 3 =>
      val res = Try {
        Col3(spl(0).toInt, spl(1), spl(2).toInt)
      }
      res match {
        case Success(c3) => Left(Right(c3))
        case Failure(_)  => Left(Left(Header(s)))
      }
    case 5 =>
      val res = Try {
        Col5(spl(0).toInt, spl(1), spl(2), spl(3), spl(4))
      }
      res match {
        case Success(c5) => Right(Right(c5))
        case Failure(_)  => Right(Left(Header(s)))
      }
    case _ => throw new Exception("fail")
  }
}

// 3-column records: header on the Left, data on the Right
val rdd3 = eitherRdd.flatMap(_.left.toOption)
val rdd3Header = rdd3.flatMap(_.left.toOption).collect().head
val df3 = sqlC.createDataFrame(rdd3.flatMap(_.right.toOption))

// 5-column records: header on the Left, data on the Right
val rdd5 = eitherRdd.flatMap(_.right.toOption)
val rdd5Header = rdd5.flatMap(_.left.toOption).collect().head
val df5 = sqlC.createDataFrame(rdd5.flatMap(_.right.toOption))

df3.show()
df5.show()
Tested with the simple tsv below:
col1 col2 col3
0 sfd 300
1 asfd 400
col1 col2 col4 col5 col6
2 pljdsfn R USA Us
3 sad T London Lon
which gives the output
+---+----+---+
| c1| c2| c3|
+---+----+---+
| 0| sfd|300|
| 1|asfd|400|
+---+----+---+
+---+-------+-------+------+---+
| c1| c2|c5_col3| c4| c5|
+---+-------+-------+------+---+
| 2|pljdsfn| R| USA| Us|
| 3| sad| T|London|Lon|
+---+-------+-------+------+---+
For simplicity's sake, I have ignored the date formatting and simply stored those fields as Strings. However, it would not be much more complicated to add a date parser to get you a proper column type.
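For instance, a rough sketch of what the 3-column parse might look like with a date column (assuming a dd-MM-yyyy pattern; adjust it to the actual file format):
import java.sql.Date
import java.text.SimpleDateFormat
import scala.util.Try

// Assumed date pattern dd-MM-yyyy; change it if the file uses something else
val dateFmt = new SimpleDateFormat("dd-MM-yyyy")

case class Col3Dated(c1: Int, c2: Date, c3: Int)

// Same Try-based parse as above, but converting the second field to a java.sql.Date
def parseCol3(spl: Array[String]): Try[Col3Dated] = Try {
  Col3Dated(spl(0).toInt, new Date(dateFmt.parse(spl(1)).getTime), spl(2).toInt)
}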
Likewise, I have relied on a parsing failure to indicate a header row. You may substitute different logic if the parsing would not fail, or if a more complicated determination must be made. Similarly, more complicated logic would be needed to differentiate between different record types of the same length, or records which may contain (escaped) split characters.
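As one hypothetical alternative, a header could be detected by content rather than by parse failure, e.g. by checking whether the first field is numeric:
import scala.util.Try

// Treat a row as a header when its first field does not parse as an Int
def isHeader(line: String): Boolean =
  Try(line.split("\t")(0).toInt).isFailure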
Upvotes: 1
Reputation: 10406
It's a bit of a hack, but here is a solution that ignores the first line of the file.
import org.apache.spark.sql.types.{StructField, StructType, StringType}

val cols = Array("dtype", "date", "type", "location", "locationCode")
val schema = new StructType(cols.map(n => StructField(n, StringType, nullable = true)))

spark.read
  .schema(schema)          // we specify the schema
  .option("header", true)  // and tell Spark that there is a header
  .csv("path/file.csv")
The first line is treated as the header, but since the schema is specified explicitly it is not needed for anything; the first line is thus ignored.
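If typed columns are wanted afterwards, a possible follow-up (a sketch reusing the schema above, assuming Spark 2.2+ for the two-argument to_date and the dd-MM-yyyy pattern from the question) is to cast the string columns once the DataFrame is loaded:
import org.apache.spark.sql.functions.{col, to_date}

val df = spark.read
  .schema(schema)
  .option("header", true)
  .csv("path/file.csv")

// Cast the all-string columns to more specific types after loading
val typed = df
  .withColumn("dtype", col("dtype").cast("int"))
  .withColumn("date", to_date(col("date"), "dd-MM-yyyy"))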
Upvotes: 1