Reputation: 1322
I have an input file with the following structure:
col1, col2, col3
line1filed1,line1filed2.1\
line1filed2.2, line1filed3
line2filed1,line2filed2.1\
line2filed2.2, line2filed3
line3filed1, line3filed2, line3filed3
line4filed1,line4filed2,
line5filed1,,line5filed3
The output DataFrame has to be:
col1, col2, col3
[line1filed1,line1filed2.1 line1filed2.2, line1filed3]
[line2filed1,line2filed2.1 line2filed2.2, line2filed3]
[line3filed1, line3filed2, line3filed3]
[line4filed1,line4filed2, null]
[line5filed1, null, line5filed3]
This is what I'm trying:
spark
  .read
  .option("multiLine", "true")
  .option("escape", "\\")
  .csv("path to file")
Some solutions suggest using wholeTextFiles, but it is also mentioned that wholeTextFiles is not an optimal solution.
What would be the right way to do this?
P.S.: My production input file is 50 GB.
Upvotes: 2
Views: 775
Reputation: 2828
I tried the following code.
It can probably be improved, but it may give you some clues for solving your problem.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
  * Expected output:
  * col1, col2, col3
  * [line1filed1,line1filed2.1 line1filed2.2, line1filed3]
  * [line2filed1,line2filed2.1 line2filed2.2, line2filed3]
  * [line3filed1, line3filed2, line3filed3]
  * [line4filed1,line4filed2, null]
  * [line5filed1, null, line5filed3]
  */
object Multiline2 {

  val spark = SparkSession
    .builder()
    .appName("Multiline2")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") // Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "Multiline2") // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  val input = "/home/cloudera/files/tests/multiline2.csv"

  def main(args: Array[String]): Unit = {
    try {
      Logger.getRootLogger.setLevel(Level.ERROR)

      val data = sc.textFile(input)
      val header = data.first()
      val columns = header.split(",")

      import spark.implicits._

      // Holds the first part of a record whose line ended with a backslash.
      // Note: this relies on lines being processed in order within one partition;
      // a record split across a partition boundary will not be rejoined.
      var aux = ""

      val multiline = data
        .filter(line => !line.equals(header))
        .map(line => {
          if (line.contains("\\")) {
            // Continuation marker found: keep the text before it and emit nothing yet
            aux = line.substring(0, line.lastIndexOf("\\"))
            ""
          } else {
            // Complete the pending record (if any) with the current line
            val l = s"$aux $line"
            aux = ""
            l
          }
        })
        .filter(line => !line.equals(""))
        .map(line => line.split(","))
        .map(r => {
          r.length match {
            // split(",") drops a trailing empty field, so fill in the third column
            case 2 => (r(0).trim, r(1).trim, "")
            case _ => (r(0).trim, r(1).trim, r(2).trim)
          }
        })
        .toDF(columns(0).trim, columns(1).trim, columns(2).trim)

      multiline.show()

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped.")
      spark.stop()
      println("SparkSession stopped.")
    }
  }
}
+-----------+--------------------+-----------+
|       col1|                col2|       col3|
+-----------+--------------------+-----------+
|line1filed1|line1filed2.1 lin...|line1filed3|
|line2filed1|line2filed2.1 lin...|line2filed3|
|line3filed1|         line3filed2|line3filed3|
|line4filed1|         line4filed2|           |
|line5filed1|                    |line5filed3|
+-----------+--------------------+-----------+
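The line-joining step can also be written without shared mutable state, which makes it easier to test outside Spark. The following is only a sketch under a few assumptions: a record continues whenever a physical line ends with a backslash, every record has a fixed number of columns, and empty fields should become None. The names JoinContinuations, joinContinuations and parseRecord are made up for this example and are not part of Spark.

```scala
object JoinContinuations {

  /** Joins each physical line ending in a backslash with the line that follows it. */
  def joinContinuations(lines: Iterator[String]): Iterator[String] =
    new Iterator[String] {
      def hasNext: Boolean = lines.hasNext

      def next(): String = {
        val sb = new StringBuilder
        var line = lines.next()
        // Keep consuming lines while the current one ends with the continuation marker
        while (line.endsWith("\\") && lines.hasNext) {
          sb.append(line.dropRight(1)).append(' ')
          line = lines.next()
        }
        sb.append(line).toString
      }
    }

  /** Splits a joined record into `width` trimmed fields; empty fields become None. */
  def parseRecord(record: String, width: Int): List[Option[String]] =
    record
      .split(",", -1) // limit -1 keeps trailing empty fields
      .map(_.trim)
      .padTo(width, "")
      .take(width)
      .map(field => if (field.isEmpty) None else Some(field))
      .toList

  def main(args: Array[String]): Unit = {
    // Sample data from the question, without the header line
    val raw = Iterator(
      "line1filed1,line1filed2.1\\",
      "line1filed2.2, line1filed3",
      "line3filed1, line3filed2, line3filed3",
      "line4filed1,line4filed2,",
      "line5filed1,,line5filed3"
    )
    joinContinuations(raw).map(parseRecord(_, 3)).foreach(println)
  }
}
```

Since joinContinuations has the shape Iterator[String] => Iterator[String], it could be passed to mapPartitions on the RDD returned by sc.textFile. A record whose continuation line lands in the next partition would still be split, so this does not remove that limitation for a large input file.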
Upvotes: 1