Reputation: 2085
I am trying to load a file into spark. If I load a normal textFile into Spark like below:
val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile")
The outcome is:
partFile: org.apache.spark.sql.Dataset[String] = [value: string]
I can see a dataset in the output. But if I load a Json file:
val pfile = spark.read.json("hdfs://quickstart:8020/user/cloudera/pjson")
The outcome is a dataframe with a readymade schema:
pfile: org.apache.spark.sql.DataFrame = [address: struct<city: string, state: string>, age: bigint ... 1 more field]
The Json/parquet/orc files have schema. So I can understand that this is a feature from Spark version:2x, which made things easier as we directly get a DataFrame in this case and for a normal textFile you get a dataset where there is no schema which makes sense. What I'd like to know is how can I add a schema to a dataset that is a resultant of loading a textFile into spark. For an RDD, there is case class/StructType option to add the schema and convert it to a DataFrame. Could anyone let me know how can I do it ?
Upvotes: 0
Views: 12796
Reputation: 17862
When you use textFile
, each line of the file will be a string row in your Dataset. To convert to DataFrame with a schema, you can use toDF
:
val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile")
import sqlContext.implicits._
val df = partFile.toDF("string_column")
In this case, the DataFrame will have a schema of a single column of type StringType.
If your file contains a more complex schema, you can either use the csv reader (if the file is in a structured csv format):
val partFile = spark.read.option("header", "true").option("delimiter", ";").csv("hdfs://quickstart:8020/user/cloudera/partfile")
Or you can process your Dataset using map, then using toDF
to convert to DataFrame. For example, suppose you want one column to be the first character of the line (as an Int) and the other column to be the fourth character (also as an Int):
val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile")
val processedDataset: Dataset[(Int, Int)] = partFile.map {
line: String => (line(0).toInt, line(3).toInt)
}
import sqlContext.implicits._
val df = processedDataset.toDF("value0", "value3")
Also, you can define a case class, which will represent the final schema for your DataFrame:
case class MyRow(value0: Int, value3: Int)
val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile")
val processedDataset: Dataset[MyRow] = partFile.map {
line: String => MyRow(line(0).toInt, line(3).toInt)
}
import sqlContext.implicits._
val df = processedDataset.toDF
In both cases above, calling df.printSchema
would show:
root
|-- value0: integer (nullable = true)
|-- value3: integer (nullable = true)
Upvotes: 6