Vishal Sharma

Reputation: 175

Unable to convert RDD to DataFrame using a case class

I am trying to convert an RDD into a DataFrame using a case class, as follows:

1.) Fetching data from a text file whose records are "id,name,country" separated by "," with no header:

val x = sc.textFile("file:///home/hdadmin/records.txt")
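For reference, a well-formed records.txt would contain rows like the following (the values here are made up purely for illustration):

1,Alice,India
2,Bob,USA
3,Carol,France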

2.) Creating a case class rec that defines the schema:

case class rec(id:Int, name:String, country:String)

3.) Now I define the transformations:

val y = x.map(x=>x.split(",")).map(x=>rec(x(0).toInt,x(1),x(2)))
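Note that any row that does not split into exactly three fields will make this map fail (x(2) throws ArrayIndexOutOfBoundsException, and x(0).toInt throws NumberFormatException on a non-numeric id). A more defensive sketch of the same transformation could filter malformed rows first:

val y = x.map(_.split(","))
  .filter(_.length == 3)                       // keep only rows with exactly 3 fields
  .map(a => rec(a(0).trim.toInt, a(1), a(2)))  // trim in case of stray whitespace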

4.) Then I import the implicits:

import spark.implicits._
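Here spark must be a SparkSession. In spark-shell it is predefined; in a standalone application you would build one yourself, roughly like this (the app name is arbitrary):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("RecordsToDF")   // hypothetical application name
  .getOrCreate()
import spark.implicits._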

5.) Converting the RDD to a DataFrame using the toDF method:

val z = y.toDF()
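At this point it is worth verifying the schema before querying. Given the case class above, printSchema should report something close to:

z.printSchema()
// root
//  |-- id: integer (nullable = false)
//  |-- name: string (nullable = true)
//  |-- country: string (nullable = true)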

6.) Now when I try to fetch the records with the command below:

z.select("name").show()

I get the following error:

17/05/19 12:50:14 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerSQLExecutionStart(9,show at <console>:49,org.apache.spark.sql.Dataset.show(Dataset.scala:495)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:49)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:54)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:56)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:58)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:60)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:62)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:64)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:66)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:68)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:70)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:72)
$line105.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:74)
$line105.$read$$iw$$iw$$iw$$iw.<init>(<console>:76)
$line105.$read$$iw$$iw$$iw.<init>(<console>:78)
$line105.$read$$iw$$iw.<init>(<console>:80)
$line105.$read$$iw.<init>(<console>:82)
$line105.$read.<init>(<console>:84)
$line105.$read$.<init>(<console>:88)
$line105.$read$.<clinit>(),
== Parsed Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- Project [name#91]
      +- LogicalRDD [id#90, name#91, country#92]

== Analyzed Logical Plan ==
name: string
GlobalLimit 21
+- LocalLimit 21
   +- Project [name#91]
      +- LogicalRDD [id#90, name#91, country#92]

== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- Project [name#91]
      +- LogicalRDD [id#90, name#91, country#92]

== Physical Plan ==
CollectLimit 21
+- *Project [name#91]
   +- Scan ExistingRDD[id#90,name#91,country#92]
,org.apache.spark.sql.execution.SparkPlanInfo@b807ee,1495223414636)
17/05/19 12:50:14 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerSQLExecutionEnd(9,1495223414734)
java.lang.IllegalStateException: SparkContext has been shutdown
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1863)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
  ... 56 elided

Where could the problem be?

Upvotes: 0

Views: 997

Answers (1)

Vishal Sharma

Reputation: 175

After trying the same code on a couple of other text files, I inspected the original text file for any discrepancy in its format.

The column separator in the code below is ",", and on close inspection it turned out to be missing in one place inside the text file.

val y = x.map(x=>x.split(",")).map(x=>rec(x(0).toInt,x(1),x(2)))
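A quick way to spot such rows without scanning the file by hand is to filter for lines that do not split into the expected number of fields (three in this case); this is just a diagnostic sketch:

x.filter(_.split(",").length != 3)
  .collect()
  .foreach(println)   // prints every malformed line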

After fixing that line, the code worked fine and returned results in structured table format.

Therefore, it is important to note that the separator (",", "\t", "|", etc.) given inside

x.split("")

must be the same as the one in the source file, and must be used consistently throughout the source file.
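Alternatively (assuming Spark 2.x), the built-in CSV reader can enforce the schema derived from the case class and drop rows that do not conform, which avoids this class of failure altogether; a sketch:

import org.apache.spark.sql.Encoders

val schema = Encoders.product[rec].schema   // derive the schema from the case class
val df = spark.read
  .schema(schema)
  .option("mode", "DROPMALFORMED")          // silently skip rows that don't match the schema
  .csv("file:///home/hdadmin/records.txt")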

Upvotes: 1
