Reputation: 1956
I am unable to convert an RDD produced with zipWithIndex back to a DataFrame.
I have read from a file and need to skip the first 3 records and then limit the records to row number 10. For this, I used rdd.zipWithIndex.
But afterwards, when I try to save the 7 resulting records, I am not able to do so.
val skipValue = 3
val limitValue = 10
val delimValue = ","

val df = spark.read.format("com.databricks.spark.csv")
  .option("delimiter", delimValue)
  .option("header", "false")
  .load("/user/ashwin/data1/datafile.txt")

val df1 = df.rdd.zipWithIndex()
  .filter(x => x._2 > skipValue && x._2 <= limitValue)
  .map(f => Row(f._1))

df1.foreach(f2 => print(f2.toString))
[[113,3Bapi,Ghosh,86589579]][[114,4Bapi,Ghosh,86589579]]
[[115,5Bapi,Ghosh,86589579]][[116,6Bapi,Ghosh,86589579]]
[[117,7Bapi,Ghosh,86589579]][[118,8Bapi,Ghosh,86589579]]
[[119,9Bapi,Ghosh,86589579]]
scala> val df = spark.read.format("com.databricks.spark.csv").option("delimiter", delimValue).option("header", "true").load("/user/bigframe/ashwin/data1/datafile.txt")
df: org.apache.spark.sql.DataFrame = [empid: string, fname: string ... 2 more fields]
scala> val df1 = df.rdd.zipWithIndex().filter(x => { x._2 > skipValue && x._2 <= limitValue;}).map(f => Row(f._1))
df1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[885] at map at <console>:38
scala> import spark.implicits._
import spark.implicits._
scala> df1.
++ count flatMap groupBy mapPartitionsWithIndex reduce takeAsync union
aggregate countApprox fold id max repartition takeOrdered unpersist
cache countApproxDistinct foreach intersection min sample takeSample zip
cartesian countAsync foreachAsync isCheckpointed name saveAsObjectFile toDebugString zipPartitions
checkpoint countByValue foreachPartition isEmpty partitioner saveAsTextFile toJavaRDD zipWithIndex
coalesce countByValueApprox foreachPartitionAsync iterator partitions setName toLocalIterator zipWithUniqueId
collect dependencies getCheckpointFile keyBy persist sortBy toString
collectAsync distinct getNumPartitions localCheckpoint pipe sparkContext top
compute filter getStorageLevel map preferredLocations subtract treeAggregate
context first glom mapPartitions randomSplit take treeReduce
scala> df1.toDF
<console>:44: error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
df1.toDF
^
Upvotes: 2
Views: 1780
Reputation: 23099
You get an RDD[Row] once you convert a DataFrame to an RDD, so to convert it back to a DataFrame you need to create one with sqlContext.createDataFrame(). A schema is also required to create the DataFrame; in this case you can reuse the schema that was already inferred for df:
// attach a zero-based index, keep rows with index 4..10 (7 rows), then drop the index
val df1 = df.rdd.zipWithIndex()
  .filter(x => x._2 > 3 && x._2 <= 10)
  .map(_._1)

// rebuild a DataFrame from the RDD[Row], reusing the original schema
val result = spark.sqlContext.createDataFrame(df1, df.schema)
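If it helps, a quick sanity check in the same session (the output path below is just an illustrative placeholder):

// should show the 7 selected rows with the original columns
result.show(false)
println(result.count())

// write the result back out as CSV (hypothetical path)
result.write.format("com.databricks.spark.csv")
  .option("delimiter", delimValue)
  .save("/user/ashwin/data1/output")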
Hope this helps!
Upvotes: 3
Reputation: 1016
This is probably of type RDD[Row]
right now. have you tried using the toDF
function? You'll have to import spark.implicits._
as well.
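For reference, a minimal sketch of how toDF becomes available via spark.implicits._, assuming an RDD of tuples (the implicits supply encoders for tuples and case classes, but not for Row, which is why it does not show up on df1 above); the column names here are only illustrative:

import spark.implicits._

// toDF works on RDDs of products such as tuples or case classes
val tupleRdd = spark.sparkContext.parallelize(Seq((113, "Bapi", "Ghosh", "86589579")))
val tupleDf = tupleRdd.toDF("empid", "fname", "lname", "phone")
tupleDf.show()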
Upvotes: 0