Kishore Indraganti

Reputation: 1322

Reading a file in Spark with newline(\n) in fields, escaped with backslash(\) and not quoted

I have an input file that has following structure,

    col1, col2, col3
    line1filed1,line1filed2.1\
    line1filed2.2, line1filed3
    line2filed1,line2filed2.1\
    line2filed2.2, line2filed3
    line3filed1, line3filed2, line3filed3
    line4filed1,line4filed2,
    line5filed1,,line5filed3

The output dataframe has to be:

    col1, col2, col3
    [line1filed1,line1filed2.1 line1filed2.2, line1filed3]
    [line2filed1,line2filed2.1 line2filed2.2, line2filed3]
    [line3filed1, line3filed2, line3filed3]
    [line4filed1,line4filed2, null]
    [line5filed1, null, line5filed3]

I'm trying the following:

    spark
    .read
    .option("multiLine", "true")
    .option("escape", "\\")
    .csv("path to file")

Some solutions suggest using wholeTextFiles, but it is also mentioned that wholeTextFiles is not an optimal solution; a rough sketch of what those suggestions look like is below.
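For reference, this is roughly the suggested approach (illustrative only; `wholeTextFiles` materializes each file as a single in-memory string on one executor, which is why it won't scale to my input):

    // Suggested-but-suboptimal approach: load the whole file as one string,
    // glue the escaped line breaks back together, then split into records.
    val records = sc.wholeTextFiles("path to file")
      .flatMap { case (_, content) =>
        content.replace("\\\n", " ").split("\n")
      }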

What would be the right way to do this?

P.S.: My production input file is 50 GB.

Upvotes: 2

Views: 775

Answers (1)

Chema

Reputation: 2828

I've tried this piece of code.

I think it can be improved, but maybe it can give you some clues for resolving your problem.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
  * col1, col2, col3
  * [line1filed1,line1filed2.1 line1filed2.2, line1filed3]
  * [line2filed1,line2filed2.1 line2filed2.2, line2filed3]
  * [line3filed1, line3filed2, line3filed3]
  * [line4filed1,line4filed2, null]
  * [line5filed1, null, line5filed3]
  */

object Multiline2 {

  val spark = SparkSession
    .builder()
    .appName("Multiline2")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "Multiline2")  // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  val input = "/home/cloudera/files/tests/multiline2.csv"

  def main(args: Array[String]): Unit = {
    try {
      Logger.getRootLogger.setLevel(Level.ERROR)

      val data = sc.textFile(input)
      val header = data.first()
      val columns = header.split(",")

      import spark.implicits._

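      // Note: this mutable var is captured in the map closure below; each task
      // gets its own copy, so the line gluing only works when a record split
      // by "\" does not straddle a partition boundary.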
      var aux = ""
      val multiline = data
          .filter(line => !line.equals(header))
          .map(line => {
            if (line.contains("\\")) { // a trailing backslash marks a continued line
              aux = line.substring(0, line.lastIndexOf("\\"))
              ""
            } else {
              val l = s"$aux $line"
              aux = ""
              l
            }
          })
          .filter(line => !line.equals(""))
          .map(line => line.split(","))
          .map(r => r.length match {
            // split(",") drops a trailing empty field, so "a,b," yields length 2
            case 2 => (r(0).trim, r(1).trim, "")
            case _ => (r(0).trim, r(1).trim, r(2).trim)
          })
          .toDF(columns(0).trim, columns(1).trim, columns(2).trim)

      multiline.show()

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped.")
      spark.stop()
      println("SparkSession stopped.")
    }
  }
}
+-----------+--------------------+-----------+
|       col1|                col2|       col3|
+-----------+--------------------+-----------+
|line1filed1|line1filed2.1 lin...|line1filed3|
|line2filed1|line2filed2.1 lin...|line2filed3|
|line3filed1|         line3filed2|line3filed3|
|line4filed1|         line4filed2|           |
|line5filed1|                    |line5filed3|
+-----------+--------------------+-----------+
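One improvement I'd consider: the mutable `aux` relies on each task processing its lines in order, so the buffering could be made explicit with `mapPartitions` instead. A sketch, under the same assumption the code above already makes (a record broken by `\` never crosses a partition boundary):

    // Glue continuation lines per partition with an explicit buffer.
    val glued = data
      .filter(line => !line.equals(header))
      .mapPartitions { iter =>
        val buffer = new StringBuilder
        iter.flatMap { line =>
          if (line.endsWith("\\")) {   // continuation: stash the prefix, emit nothing
            buffer.append(line.dropRight(1))
            None
          } else {                     // final piece: emit the reassembled record
            val full = if (buffer.isEmpty) line else s"$buffer $line"
            buffer.clear()
            Some(full)
          }
        }
      }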

Upvotes: 1
