Reputation: 175
I am trying to convert an RDD into a DataFrame using a case class, as follows:
1.) Fetching data from a text file whose records are "id,name,country" separated by "," but with no header:
val x = sc.textFile("file:///home/hdadmin/records.txt")
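For illustration (made-up values; only the "id,name,country" layout matters), the file holds one record per line, e.g.:
1,John,USA
2,Maria,Spain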
2.) Creating a case class "rec" whose fields define the columns:
case class rec(id:Int, name:String, country:String)
3.) Now I define the transformations:
val y = x.map(x=>x.split(",")).map(x=>rec(x(0).toInt,x(1),x(2)))
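So a sample line like 1,John,USA (made-up values) splits into Array("1", "John", "USA"), which the second map turns into rec(1, "John", "USA").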
4.) Then I imported the implicits required by toDF:
import spark.implicits._
5.) Converting the RDD to a DataFrame using the toDF method:
val z = y.toDF()
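As an optional sanity check (standard Dataset API, not one of my original steps), the schema inferred from the case class can be printed before querying:
z.printSchema()
// should list id as an integer column and name/country as string columns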
6.) Now, when I try to fetch the records with the command below:
z.select("name").show()
I get the following error:
17/05/19 12:50:14 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerSQLExecutionStart(9,show at :49,
org.apache.spark.sql.Dataset.show(Dataset.scala:495)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:49)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:54)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:56)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:58)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:60)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:62)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:64)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:66)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:68)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:70)
$line105.$read$$iw$$iw$$iw$$iw$$iw$$iw.(:72)
$line105.$read$$iw$$iw$$iw$$iw$$iw.(:74)
$line105.$read$$iw$$iw$$iw$$iw.(:76)
$line105.$read$$iw$$iw$$iw.(:78)
$line105.$read$$iw$$iw.(:80)
$line105.$read$$iw.(:82)
$line105.$read.(:84)
$line105.$read$.(:88)
$line105.$read$.(),

== Parsed Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- Project [name#91]
      +- LogicalRDD [id#90, name#91, country#92]

== Analyzed Logical Plan ==
name: string
GlobalLimit 21
+- LocalLimit 21
   +- Project [name#91]
      +- LogicalRDD [id#90, name#91, country#92]

== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- Project [name#91]
      +- LogicalRDD [id#90, name#91, country#92]

== Physical Plan ==
CollectLimit 21
+- *Project [name#91]
   +- Scan ExistingRDD[id#90,name#91,country#92],org.apache.spark.sql.execution.SparkPlanInfo@b807ee,1495223414636)

17/05/19 12:50:14 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerSQLExecutionEnd(9,1495223414734)

java.lang.IllegalStateException: SparkContext has been shutdown
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1863)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
  ... 56 elided
Where could the problem be?
Upvotes: 0
Views: 997
Reputation: 175
After trying the same code on a couple of other text files, I rectified a formatting discrepancy in the original text file itself.
The column separator in the code below is ",", and after scanning the file minutely I found it was missing in one place.
val y = x.map(x=>x.split(",")).map(x=>rec(x(0).toInt,x(1),x(2)))
After that change, the code worked fine and gave me the results in a structured table format.
Therefore it is important to note that the separator (",", "\t", "|") given inside
x.split("")
must match the one used in the source file, consistently throughout the file.
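A defensive variant can guard against this class of problem. Below is a minimal sketch (assuming the same rec case class and input RDD x; the names safeRows and zSafe are mine) that keeps only well-formed rows instead of letting one bad line fail the whole job:
val safeRows = x.map(_.split(",", -1))             // -1 keeps trailing empty fields
                .filter(_.length == 3)             // drop rows where the separator was missing
                .map(a => rec(a(0).trim.toInt, a(1), a(2)))
val zSafe = safeRows.toDF()
Note that a(0).trim.toInt would still throw on a non-numeric id, so the length filter only protects against a wrong field count.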
Upvotes: 1