Ritesh

Reputation: 333

Issue while reading a csv file through Spark

I am trying to read a CSV file through Spark. However, one of the columns contains data in the format below, and because of the commas it is being split into multiple columns. The input CSV file is comma delimited.

"[{"code": "100", "name": "CLS1", "type": "PRIMARY"}]"

Could you please help me parse this column in Spark with Scala?

I tried using option("escape", "") and option("quoteMode", "ALL"), but they didn't work as expected.
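Roughly what the read looked like (a sketch; the file name and header option are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Sketch of the attempted read; "input.csv" and the header option are
// placeholders, escape and quoteMode are the options mentioned above.
val df = spark.read
  .option("header", "true")
  .option("escape", "")
  .option("quoteMode", "ALL")
  .csv("input.csv")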

Upvotes: 1

Views: 1379

Answers (2)

Vincent Doba

Reputation: 5068

You can't read such a column directly using Spark's CSV reader, as there is no way to distinguish a comma that is part of the JSON string from a column separator, and you can't use " as the quote character for the same reason.
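For instance, a plain CSV read of a file like the example file.csv shown below cannot keep the JSON in one column, because to the parser a comma inside the JSON string looks exactly like a column separator:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().getOrCreate()

// Naive read, shown only to illustrate the problem: the commas inside the
// bracketed JSON value are indistinguishable from column separators, so the
// third column does not come back as a single value.
val naive = sparkSession.read
  .option("header", "true")
  .csv("file.csv")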

To solve this, you should read your CSV file as a text file, split the String rows into a Seq[String], and then recreate the columns from this Seq[String].

For instance, if you have a file called file.csv with the following content:

column1, column2, column3, column4
1, "value1", "[{"code": "100", "name": "CLS1", "type": "PRIMARY"}]", 1.1
2, "value2", "[{"code": "200", "name": "CLS2", "type": "SECONDARY"}]", 1.2

Your Spark program would be:

import org.apache.spark.sql.functions.col
import sparkSession.implicits._

val finalDataframe = sparkSession.read.text("file.csv")
  // drop the header line, which is read as an ordinary text row
  .filter(col("value").notEqual("column1, column2, column3, column4"))
  .as[String]
  // split every raw line into its column values (see splitRow below)
  .map(splitRow)
  // the Seq[String] column produced by map is named "value"; rebuild typed columns from it
  .withColumn("column1", col("value").getItem(0).cast("int"))
  .withColumn("column2", col("value").getItem(1).cast("string"))
  .withColumn("column3", col("value").getItem(2).cast("string"))
  .withColumn("column4", col("value").getItem(3).cast("float"))
  .drop("value")

Where the splitRow function takes a String as input and returns a Seq[String], each String in the sequence representing one column value.

An implementation of this method could be:

def splitRow(row: String): Seq[String] = row.foldLeft(Accumulator())((acc, char) => char match {
  // a comma outside the bracketed JSON value ends the current column
  case ',' if !acc.openBracket => acc.copy(result = acc.buffer +: acc.result, buffer = "", previousIsQuote = false)
  // an opening bracket right after a quote starts the JSON column
  case '[' if acc.previousIsQuote => acc.copy(buffer = "[", openBracket = true, previousIsQuote = false)
  // a quote outside the JSON column is only remembered, never copied into the buffer
  case '"' if !acc.openBracket => acc.copy(previousIsQuote = true)
  // the quote that follows the closing bracket ends the JSON column
  case '"' if acc.buffer.last == ']' => acc.copy(openBracket = false)
  // every other character is appended to the column currently being built
  case _ => acc.copy(buffer = acc.buffer + char, previousIsQuote = false)
}).flush()

With the following case class as Accumulator:

case class Accumulator(
  result: Seq[String] = Nil,        // columns parsed so far, in reverse order
  openBracket: Boolean = false,     // true while inside the bracketed JSON column
  buffer: String = "",              // characters of the column currently being built
  previousIsQuote: Boolean = false  // true if the previous character was a double quote
) {
  // append the last buffered column and restore the original column order
  def flush(): Seq[String] = (buffer +: result).reverse
}
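For example, applied to the second sample row, splitRow keeps the whole JSON value in a single entry (the leading spaces from the file are preserved):

// Quick check of splitRow on one raw line from file.csv
val sampleRow = """2, "value2", "[{"code": "200", "name": "CLS2", "type": "SECONDARY"}]", 1.2"""
splitRow(sampleRow)
// Seq("2", " value2", "[{"code": "200", "name": "CLS2", "type": "SECONDARY"}]", " 1.2")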

If we use this code, we obtain the following output when parsing file.csv:

+-------+-------+------------------------------------------------------+-------+
|column1|column2|column3                                               |column4|
+-------+-------+------------------------------------------------------+-------+
|1      | value1|[{"code": "100", "name": "CLS1", "type": "PRIMARY"}]  |1.1    |
|2      | value2|[{"code": "200", "name": "CLS2", "type": "SECONDARY"}]|1.2    |
+-------+-------+------------------------------------------------------+-------+

Upvotes: 1

Filip

Reputation: 661

I assume this whole string is the value of just one field inside the CSV file. In that case, you should use some of Spark's CSV options, quote and/or quoteMode.

I'm not 100% sure on this one, but I think quote takes the string representation of the quote character used inside the CSV, while quoteMode seems to depend on the Spark version, because the underlying CSV library was switched at some point. On 3.x it should just be true or false, and on earlier versions there should be similar options available.

Anyway, you'll have to play with it a little. It's more of a direction than an answer, but I hope it helps, because I don't have time to test it right now.
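For example, something along these lines (untested; the header option and file name are assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Untested direction: tell the reader which quote and escape characters to expect.
// Whether this copes with the quotes embedded in the JSON value needs checking.
val df = spark.read
  .option("header", "true")
  .option("quote", "\"")
  .option("escape", "\"")
  .csv("file.csv")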

Upvotes: 0
