Rahul

Reputation: 2374

How to create a DataFrame from a text file in Spark

I have a text file on HDFS and I want to convert it to a DataFrame in Spark.

I am using the SparkContext to load the file and then try to generate individual columns from it.

val myFile = sc.textFile("file.txt")
val myFile1 = myFile.map(x=>x.split(";"))

After doing this, I am trying the following operation.

myFile1.toDF()

I am getting an issue, since the elements in the myFile1 RDD are now of Array[String] type.

How can I solve this issue?

Upvotes: 24

Views: 190219

Answers (9)

Gaurav Shah

Reputation: 5279

sqlContext.read.text("file.txt")
  .where("lower(value) like '%just_in_personalization_stat%'")
  .show(100, false)

This will also work, and it avoids having to deal with CSV parsing, headers, etc.
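
If you then need named columns, a minimal sketch (assuming the semicolon delimiter and the id/name layout from the question) is to split the single value column that read.text produces:

import org.apache.spark.sql.functions.{col, split}

// split the single "value" column into named columns;
// the ";" delimiter and the column names are assumptions from the question
val raw = sqlContext.read.text("file.txt")
val parts = split(col("value"), ";")
val df = raw.select(parts.getItem(0).as("id"), parts.getItem(1).as("name"))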

Upvotes: 0

Mohseen Mulla

Reputation: 592

A pipe (|) delimited text file can be read as:

df = spark.read.option("sep", "|").option("header", "true").csv("s3://bucket_name/folder_path/file_name.txt")
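
For reference, a Scala equivalent of the same read (a sketch, assuming the same path and a SparkSession named spark) would be:

// Scala equivalent of the PySpark snippet above
val df = spark.read
  .option("sep", "|")
  .option("header", "true")
  .csv("s3://bucket_name/folder_path/file_name.txt")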

Upvotes: 1

Abhijit

Reputation: 353

You will not be able to convert it into a DataFrame until you import the implicit conversions:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(new SparkContext())

import sqlContext.implicits._

Only after this can you convert it to a DataFrame:

case class Test(id: String, field2: String)

val myFile = sc.textFile("file.txt")

val df = myFile.map(x => x.split(";")).map(x => Test(x(0), x(1))).toDF()

Upvotes: 3

user9406937

Reputation:

You can read a file into an RDD and then assign a schema to it. The two common ways of creating a schema are using a case class or a schema (StructType) object [my preferred one]. Below are quick snippets of code that you may use.

Case Class approach

case class Test(id: String, name: String)
val myFile = sc.textFile("file.txt")
val df = myFile.map(x => x.split(";")).map(x => Test(x(0), x(1))).toDF()

Schema Approach

import org.apache.spark.sql.types._
val schemaString = "id name"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable=true))
val schema = StructType(fields)

val dfWithSchema = sparkSess.read.option("header", "false").schema(schema).csv("file.txt") // sparkSess: an existing SparkSession
dfWithSchema.show()

The second one is my preferred approach, since a case class is limited to a maximum of 22 fields (in Scala 2.10 and earlier), and this will be a problem if your file has more than 22 fields!
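
For illustration, a schema wider than 22 fields can be generated programmatically; this is a hypothetical sketch, with made-up column names:

import org.apache.spark.sql.types._

// build a 50-column schema, which a case class in Scala 2.10 could not express
val wideFields = (1 to 50).map(i => StructField(s"col$i", StringType, nullable = true))
val wideSchema = StructType(wideFields)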

Upvotes: 0

Vishal

Reputation: 11

import spark.implicits._

val df = spark.read.textFile("abc.txt") // Dataset[String], one line per record

case class Abc(amount: Int, types: String, id: Int) // columns and data types

// split each line (the ";" delimiter is assumed from the question) and map to the case class
val df2 = df.map(line => line.split(";")).map(rec => Abc(rec(0).toInt, rec(1), rec(2).toInt))
df2.printSchema

root
 |-- amount: integer (nullable = true)
 |-- types: string (nullable = true)
 |-- id: integer (nullable = true)

Upvotes: 1

Vikas Singh

Reputation: 419

Here are different ways to create a DataFrame from a text file:

val conf = new SparkConf().setAppName(appName).setMaster("local")
val sc = new SparkContext(conf)

raw text file

val file = sc.textFile("C:\\vikas\\spark\\Interview\\text.txt")
val fileToDf = file.map(_.split(","))
  .map { case Array(a, b, c) => (a, b.toInt, c) }
  .toDF("name", "age", "city") // .toDF on an RDD needs import spark.implicits._ in scope
fileToDf.foreach(println(_))

spark session without schema

import org.apache.spark.sql.SparkSession
val sparkSess = SparkSession.builder()
  .appName("SparkSessionZipsExample")
  .config(conf)
  .getOrCreate()

val df = sparkSess.read.option("header", "false")
  .csv("C:\\vikas\\spark\\Interview\\text.txt")
df.show()

spark session with schema

import org.apache.spark.sql.types._
val schemaString = "name age city"
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

val dfWithSchema = sparkSess.read.option("header", "false")
  .schema(schema)
  .csv("C:\\vikas\\spark\\Interview\\text.txt")
dfWithSchema.show()

using sql context

import org.apache.spark.sql.SQLContext

val sqlCtx = new SQLContext(sc)
val fileRdd = sc.textFile("C:\\vikas\\spark\\Interview\\text.txt")
  .map(_.split(","))
  .map(x => org.apache.spark.sql.Row(x: _*))
val sqlDf = sqlCtx.createDataFrame(fileRdd, schema)
sqlDf.show()

Upvotes: 11

Tzach Zohar

Reputation: 37822

Update - as of Spark 2.0, you can simply use the built-in csv data source:

val spark: SparkSession = SparkSession.builder().getOrCreate() // create the Spark Session
val df = spark.read.csv("file.txt")

You can also use various options to control the CSV parsing, e.g.:

val df = spark.read.option("header", "false").csv("file.txt")
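
Since the file in the question is semicolon-delimited, you can also pass the separator option (a sketch; sep is the built-in csv source's delimiter option, and the column names here are assumptions):

val df = spark.read
  .option("header", "false")
  .option("sep", ";")
  .csv("file.txt")
  .toDF("id", "name") // assumed column names, for a two-field file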

For Spark versions before 2.0, the easiest way is to use spark-csv: include it in your dependencies and follow the README. It allows setting a custom delimiter (;), can read CSV headers (if you have them), and can infer the schema types (at the cost of an extra scan of the data).
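
As a rough sketch of the spark-csv usage (assuming an existing sqlContext; the options follow the package's README):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", ";")
  .option("header", "false")
  .option("inferSchema", "true")
  .load("file.txt")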

Alternatively, if you know the schema, you can create a case class that represents it and map your RDD elements into instances of this class before transforming into a DataFrame, e.g.:

case class Record(id: Int, name: String)

val myFile1 = myFile.map(x=>x.split(";")).map {
  case Array(id, name) => Record(id.toInt, name)
} 

myFile1.toDF() // DataFrame will have columns "id" and "name"

Upvotes: 27

Ankita

Reputation: 480

I know I am quite late to answer this, but I have come up with a different answer:

val rdd = sc.textFile("/home/training/mydata/file.txt")

val text = rdd.map(lines => lines.split(","))
  .map(arrays => (arrays(0), arrays(1)))
  .toDF("id", "name")
text.show

Upvotes: 0

mgaido

Reputation: 3055

If you want to use the toDF method, you have to convert your RDD of Array[String] into an RDD of a case class. For example, you have to do:

case class Test(id: String, field2: String)
val myFile = sc.textFile("file.txt")
val df = myFile.map(x => x.split(";")).map(x => Test(x(0), x(1))).toDF()

Upvotes: 8
