Save MongoDB data to parquet file format using Apache Spark

I am a newbie with Apache spark as well with Scala programming language.

What I am trying to achieve is to extract the data from my local mongoDB database for then to save it in a parquet format using Apache Spark with the hadoop-connector

This is my code so far:

package com.examples 
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.rdd.RDD 
import org.apache.hadoop.conf.Configuration 
import org.bson.BSONObject 
import com.mongodb.hadoop.{MongoInputFormat, BSONFileInputFormat} 
import org.apache.spark.sql 
import org.apache.spark.sql.SQLContext 

object DataMigrator { 

    def main(args: Array[String])
    { 
        val conf = new SparkConf().setAppName("Migration    App").setMaster("local") 
        val sc = new SparkContext(conf) 
        val sqlContext = new SQLContext(sc) 

        // Import statement to implicitly convert an RDD to a DataFrame 
        import sqlContext.implicits._ 

        val mongoConfig = new Configuration() 
        mongoConfig.set("mongo.input.uri",   "mongodb://localhost:27017/mongosails4.case") 

        val mongoRDD = sc.newAPIHadoopRDD(mongoConfig, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject]);     

        val count = countsRDD.count()

        // the count value is aprox 100,000 
        println("================ PRINTING =====================") 
        println(s"ROW COUNT IS $count") 
        println("================ PRINTING =====================") 
    } 
} 

The thing is that in order to save data to a parquet file format first its necessary to convert the mongoRDD variable to Spark DataFrame. I have tried something like this:

// convert RDD to DataFrame
val myDf = mongoRDD.toDF()  // this lines throws an error
myDF.write.save("my/path/myData.parquet")

and the error I get is this: Exception in thread "main" scala.MatchError: java.lang.Object (of class scala.reflect.internal.Types.$TypeRef$$anon$6)

do you guys have any other idea how could I convert the RDD to a DataFrame so that I can save data in parquet format?

Here's the structure of one Document in the mongoDB collection : https://gist.github.com/kingtrocko/83a94238304c2d654fe4

Upvotes: 3

Views: 3698

Answers (1)

kanielc
kanielc

Reputation: 1322

Create a Case class representing the data stored in your DBObject.
case class Data(x: Int, s: String)

Then, map the values of your rdd to instances of your case class. val dataRDD = mongoRDD.values.map { obj => Data(obj.get("x"), obj.get("s")) }

Now with your RDD[Data], you can create a DataFrame with the sqlContext

val myDF = sqlContext.createDataFrame(dataRDD)

That should get you going. I can explain more later if needed.

Upvotes: 1

Related Questions