Vassilis Moustakas
Vassilis Moustakas

Reputation: 585

How to load a csv directly into a Spark Dataset?

I have a csv file [1] which I want to load directly into a Dataset. The problem is that I always get errors like

org.apache.spark.sql.AnalysisException: Cannot up cast `probability` from string to float as it may truncate
The type path of the target object is:
- field (class: "scala.Float", name: "probability")
- root class: "TFPredictionFormat"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;

Moreover, and specifically for the phrases field (check case class [2]) it get

org.apache.spark.sql.AnalysisException: cannot resolve '`phrases`' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true);

If I define all the fields in my case class [2] as type String then everything works fine but this is not what I want. Is there a simple way to do it [3]?


References

[1] An example row

B017NX63A2,Merrell,"['merrell_for_men', 'merrell_mens_shoes', 'merrel']",merrell_shoes,0.0806054356579781

[2] My code snippet is as follows

import spark.implicits._

val INPUT_TF = "<SOME_URI>/my_file.csv"

final case class TFFormat (
    doc_id: String,
    brand: String,
    phrases: Seq[String],
    prediction: String,
    probability: Float
)

val ds = sqlContext.read
.option("header", "true")
.option("charset", "UTF8")
.csv(INPUT_TF)
.as[TFFormat]

ds.take(1).map(println)

[3] I have found ways to do it by first defining columns on a DataFrame level and the convert things to Dataset (like here or here or here) but I am almost sure this is not the way things supposed to be done. I am also pretty sure that Encoders are probably the answer but I don't have a clue how

Upvotes: 7

Views: 5051

Answers (1)

zero323
zero323

Reputation: 330093

TL;DR With csv input transforming with standard DataFrame operations is the way to go. If you want to avoid you should use input format which has is expressive (Parquet or even JSON).

In general data to be converted to statically typed dataset must be already of the correct type. The most efficient way to do it is to provide schema argument for csv reader:

val schema: StructType = ???
val ds = spark.read
  .option("header", "true")
  .schema(schema)
  .csv(path)
  .as[T]

where schema could be inferred by reflection:

import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.types.StructType

val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]

Unfortunately it won't work with your data and class because csv reader doesn't support ArrayType (but it would work for atomic types like FloatType) so you have to use the hard way. A naive solution could be expressed as below:

import org.apache.spark.sql.functions._

val df: DataFrame = ???  // Raw data

df
  .withColumn("probability", $"probability".cast("float"))
  .withColumn("phrases",
    split(regexp_replace($"phrases", "[\\['\\]]", ""), ","))
  .as[TFFormat]

but you may need something more sophisticated depending on the content of phrases.

Upvotes: 10

Related Questions