ashwinsakthi

Reputation: 1956

Unable to convert an RDD with zipWithIndex to a DataFrame in Spark

I am unable to convert an RDD produced by zipWithIndex to a DataFrame.

I have read data from a file, and I need to skip the first 3 records and then limit the records to row number 10. For this, I used rdd.zipWithIndex.

But afterwards, when I try to save the remaining 7 records, I am not able to do so.

import org.apache.spark.sql.Row

val skipValue = 3
val limitValue = 10
val delimValue = ","

val df = spark.read.format("com.databricks.spark.csv")
                   .option("delimiter", delimValue)
                   .option("header", "false")
                   .load("/user/ashwin/data1/datafile.txt")

val df1 = df.rdd.zipWithIndex()
                .filter(x => x._2 > skipValue && x._2 <= limitValue)
                .map(f => Row(f._1))

df1.foreach(f2 => print(f2.toString))
[[113,3Bapi,Ghosh,86589579]][[114,4Bapi,Ghosh,86589579]]
[[115,5Bapi,Ghosh,86589579]][[116,6Bapi,Ghosh,86589579]]
[[117,7Bapi,Ghosh,86589579]][[118,8Bapi,Ghosh,86589579]]
[[119,9Bapi,Ghosh,86589579]]



scala> val df = spark.read.format("com.databricks.spark.csv").option("delimiter", delimValue).option("header", "true").load("/user/bigframe/ashwin/data1/datafile.txt")
df: org.apache.spark.sql.DataFrame = [empid: string, fname: string ... 2 more fields]

scala> val df1 = df.rdd.zipWithIndex().filter(x => { x._2 > skipValue && x._2 <= limitValue;}).map(f => Row(f._1))
df1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[885] at map at <console>:38

scala> import spark.implicits._
import spark.implicits._

scala> df1.

++             count                 flatMap                 groupBy           mapPartitionsWithIndex   reduce             takeAsync         union
aggregate      countApprox           fold                    id                max                      repartition        takeOrdered       unpersist
cache          countApproxDistinct   foreach                 intersection      min                      sample             takeSample        zip
cartesian      countAsync            foreachAsync            isCheckpointed    name                     saveAsObjectFile   toDebugString     zipPartitions
checkpoint     countByValue          foreachPartition        isEmpty           partitioner              saveAsTextFile     toJavaRDD         zipWithIndex
coalesce       countByValueApprox    foreachPartitionAsync   iterator          partitions               setName            toLocalIterator   zipWithUniqueId
collect        dependencies          getCheckpointFile       keyBy             persist                  sortBy             toString
collectAsync   distinct              getNumPartitions        localCheckpoint   pipe                     sparkContext       top
compute        filter                getStorageLevel         map               preferredLocations       subtract           treeAggregate
context        first                 glom                    mapPartitions     randomSplit              take               treeReduce

scala> df1.toDF
<console>:44: error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
       df1.toDF
       ^

Upvotes: 2

Views: 1780

Answers (2)

koiralo

Reputation: 23099

When you call .rdd on a DataFrame you get an RDD[Row], so to convert it back to a DataFrame you need to create one with sqlContext.createDataFrame().

A schema is also required to create the DataFrame; in this case you can reuse the schema of the original df.

val df1 = df.rdd.zipWithIndex()
  .filter(x => x._2 > 3 && x._2 <= 10)
  .map(_._1)

val result = spark.sqlContext.createDataFrame(df1, df.schema)
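
For example, a quick sanity check on the result (a sketch reusing the result value from the snippet above; the count of 7 comes from the sample data in the question):

result.printSchema()    // same schema as the original df
result.show()           // should display the 7 remaining records
println(result.count()) // 7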

Hope this helps!

Upvotes: 3

Dominic Egger

Reputation: 1016

Your df1 is probably of type RDD[Row] right now. Have you tried using the toDF function? You'll have to import spark.implicits._ as well.
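
Note that toDF needs an implicit Encoder for the RDD's element type, and Spark does not provide one for Row, which is exactly the error shown in the question. A minimal sketch of one way toDF can be used here, assuming the rows are first mapped to plain tuples (the column names lname and phone are guesses based on the sample data; only empid and fname appear in the question's schema):

import spark.implicits._

// Hypothetical: extract the fields of each Row into a tuple so an Encoder exists,
// then toDF becomes available (skipValue and limitValue as defined in the question).
val typed = df.rdd.zipWithIndex()
  .filter { case (_, idx) => idx > skipValue && idx <= limitValue }
  .map { case (row, _) => (row.getString(0), row.getString(1), row.getString(2), row.getString(3)) }

val df2 = typed.toDF("empid", "fname", "lname", "phone")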

Upvotes: 0
