HamidOvic

Reputation: 133

How to handle multiline records without quotes while reading a CSV file in Spark

Variants of this have been asked before, but in my case there are no quoted strings around my multi-line values.

I have a file like this:

column1|column2
1|test1 test1 
test1
2|test2

I want a result like this (2 rows):

+-----------+------------------+
|column1    |column2           |
+-----------+------------------+
|          1|test1 test1
             test1            |
|          2|test2             |
+-----------+------------------+

I tried this:

Dataset<Row> dsTest = session.read()
        .option("header", "true")
        .option("delimiter", "|")
        .option("quote", "")
        .option("multiLine", "true")
        .csv("test.csv");

and I got this (3 rows):

+-----------+------------------+
|column1    |column2           |
+-----------+------------------+
|          1| test1 test1      |
|      test1| null             |
|          2|test2             |
+-----------+------------------+

Can someone guide me to resolve this?

Upvotes: 2

Views: 620

Answers (1)

Chema

Reputation: 2828

I've tried something like this; maybe it can give you some clues.

The code is in Scala, but I think it's all clear, and it will be more or less the same in Java.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

import scala.util.{Failure, Success, Try}

object Multiline {

  val spark = SparkSession
    .builder()
    .appName("Multiline")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "Multiline")  // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  val input = "/home/cloudera/files/tests/multiline.csv"

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    import spark.implicits._

    try {

      // parse an Int, returning -1 when the string is not a number
      def makeInt(s: String): Int = Try(s.toInt) match {
        case Success(n) => n
        case Failure(_) => -1
      }

      val data = sc.textFile(input) // read the file as plain text lines
      val head = data.first() // header

      val multiline = data
          .filter(line => line != head) // remove the header
          .map(line => line.split('|'))
          .map(arr => {
            val sInt: Int = makeInt(arr(0))
            // a continuation line has no numeric id before the delimiter,
            // so makeInt fails and the row is flagged with column1 = -1
            if (sInt < 0) (sInt.toString, arr(0))
            else (arr(0), arr(1))
          })
          .toDF("column1", "column2")

      multiline.show()
      
      /*
      +-------+------------+
      |column1|     column2|
      +-------+------------+
      |      1|test1 test1 |
      |     -1|       test1|
      |      2|       test2|
      +-------+------------+
       */

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped.")
      spark.stop()
      println("SparkSession stopped.")
    }
  }
}
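
If you actually need the continuation line merged back into the previous record (the 2-row result you show), here is a minimal standalone sketch of one way to do it. It is not part of the code above: the object name MultilineMerged is just for illustration, and it assumes the delimiter '|' never appears inside a value and that the file is small enough to collect and fold on the driver.

import org.apache.spark.sql.SparkSession

object MultilineMerged {

  val spark = SparkSession
    .builder()
    .appName("MultilineMerged")
    .master("local[*]")
    .getOrCreate()

  def main(args: Array[String]): Unit = {

    import spark.implicits._

    val sc = spark.sparkContext
    val data = sc.textFile("/home/cloudera/files/tests/multiline.csv")
    val head = data.first() // header

    // Fold the lines on the driver: a line containing the delimiter starts
    // a new record; any other line is a continuation and gets appended to
    // the previous record. collect() is only reasonable for small files.
    val merged = data
      .filter(line => line != head)
      .collect()
      .foldLeft(List.empty[String]) { (acc, line) =>
        if (line.contains('|')) line :: acc
        else acc match {
          case prev :: rest => (prev + "\n" + line) :: rest
          case Nil          => List(line)
        }
      }
      .reverse
      .map(_.split('|'))
      .map(arr => (arr(0).trim, arr(1)))
      .toDF("column1", "column2")

    merged.show(false)

    spark.stop()
  }
}

This keeps the embedded newline inside column2, so show(false) prints the two records with the continuation line attached, much like the output you expect.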

Upvotes: 1
