Metadata

Reputation: 2085

How to add a schema to a Dataset in Spark?

I am trying to load a file into Spark. If I load a normal text file into Spark like below:

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile")

The outcome is:

partFile: org.apache.spark.sql.Dataset[String] = [value: string]

I can see a Dataset in the output. But if I load a JSON file:

val pfile = spark.read.json("hdfs://quickstart:8020/user/cloudera/pjson")

The outcome is a DataFrame with a ready-made schema:

pfile: org.apache.spark.sql.DataFrame = [address: struct<city: string, state: string>, age: bigint ... 1 more field]

JSON/Parquet/ORC files carry their own schema, so I can understand that this is a feature of Spark 2.x: it makes things easier because we directly get a DataFrame in this case, while for a normal text file we get a Dataset with no schema, which makes sense. What I'd like to know is how I can add a schema to a Dataset that results from loading a text file into Spark. For an RDD, there is the case class/StructType option to add a schema and convert it to a DataFrame. Could anyone let me know how I can do it?
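For context, the RDD approach I am referring to looks roughly like this (the column names and the comma split are just for illustration):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

val rdd = spark.sparkContext.textFile("hdfs://quickstart:8020/user/cloudera/partfile")

// Case class option: the case class itself defines the schema
case class Person(name: String, city: String)
import spark.implicits._
val df1 = rdd.map(_.split(",")).map(a => Person(a(0), a(1))).toDF()

// StructType option: build the schema explicitly and apply it to an RDD of Rows
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("city", StringType, nullable = true)
))
val rowRdd = rdd.map(_.split(",")).map(a => Row(a(0), a(1)))
val df2 = spark.createDataFrame(rowRdd, schema)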

Upvotes: 0

Views: 12796

Answers (1)

Daniel de Paula

Reputation: 17862

When you use textFile, each line of the file becomes one string row in your Dataset. To convert it to a DataFrame with a schema, you can use toDF:

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile")

import spark.implicits._
val df = partFile.toDF("string_column")

In this case, the DataFrame will have a schema with a single column of type StringType.
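For reference, calling df.printSchema on that DataFrame should print something like:

root
 |-- string_column: string (nullable = true)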

If your file has a more complex structure, you can either use the csv reader (if the file is in a structured CSV format):

val partFile = spark.read.option("header", "true").option("delimiter", ";").csv("hdfs://quickstart:8020/user/cloudera/partfile")
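If you already know the structure of the file, you can also give the csv reader an explicit schema instead of relying on inference; the column names and types below are only an example:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val mySchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

val partFile = spark.read
  .option("delimiter", ";")
  .schema(mySchema)
  .csv("hdfs://quickstart:8020/user/cloudera/partfile")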

Or you can process your Dataset using map and then use toDF to convert it to a DataFrame. For example, suppose you want one column to be the first character of the line (as an Int) and the other column to be the fourth character (also as an Int):

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile")

import org.apache.spark.sql.Dataset
import spark.implicits._  // provides the Encoder[(Int, Int)] required by map

// Note: line(0) and line(3) are Chars, so toInt yields their character code
// (use asDigit instead if you want the numeric value of a digit character)
val processedDataset: Dataset[(Int, Int)] = partFile.map {
  line: String => (line(0).toInt, line(3).toInt)
}

val df = processedDataset.toDF("value0", "value3")

Also, you can define a case class, which will represent the final schema for your DataFrame:

case class MyRow(value0: Int, value3: Int)

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile")

import spark.implicits._  // provides the Encoder[MyRow] required by map

val processedDataset: Dataset[MyRow] = partFile.map {
  line: String => MyRow(line(0).toInt, line(3).toInt)
}

val df = processedDataset.toDF

In both cases above, calling df.printSchema would show:

root
 |-- value0: integer (nullable = true)
 |-- value3: integer (nullable = true)
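If you prefer a programmatic schema instead of a case class, the StructType route mentioned in the question also works here: map the Dataset's underlying RDD to Rows and apply the schema with createDataFrame. A rough sketch with the same two integer columns:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}

val mySchema = StructType(Seq(
  StructField("value0", IntegerType, nullable = true),
  StructField("value3", IntegerType, nullable = true)
))

// partFile is the Dataset[String] loaded with spark.read.textFile above
val rowRdd = partFile.rdd.map(line => Row(line(0).toInt, line(3).toInt))
val df = spark.createDataFrame(rowRdd, mySchema)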

Upvotes: 6
