Nilesh
Nilesh

Reputation: 2180

Store Spark RDD in MongoDB

How can I store Spark Streaming data in MongoDB?

In Java this is done like this:

// Registers a callback on the stream that writes every record of each batch
// into the "fb" collection of the "mongodb" database on localhost:27017.
data.foreachRDD(
    new Function<JavaRDD<String>, Void>() {
        // Initialised once, when this callback object is constructed, and then
        // used by every invocation of call().
        Mongo mongo = new Mongo("localhost", 27017);
        DB db = mongo.getDB("mongodb");
        DBCollection collection = db.getCollection("fb");

        /**
         * Prints each JSON string of the given batch and inserts it into the
         * collection as a parsed document.
         *
         * @param data the batch to persist; a null batch is only reported
         * @return always null (the Void type has no other value)
         */
        public Void call(JavaRDD<String> data) throws Exception {
            // Guard clause: nothing to store for this window.
            if (data == null) {
                System.out.println("Got no data in this window");
                return null;
            }

            // collect() returns the batch as a local list, so the inserts below
            // run wherever this callback itself runs.
            for (String record : data.collect()) {
                System.out.println(record);
                collection.insert((DBObject) JSON.parse(record.toString()));
            }
            System.out.println("Inserted Data Done");
            return null;
        }
    }
);

I want to do the same thing, storing the data in MongoDB, but in Scala. The above code is in Java.

Upvotes: 0

Views: 638

Answers (1)

Anupam Jain
Anupam Jain

Reputation: 476

// Implicit Java-to-Scala collection conversions; remove this import if not needed

import scala.collection.JavaConversions._

// Registers a callback on the stream that writes every record of each batch
// into the "fb" collection of the "mongodb" database on localhost:27017.
data.foreachRDD(new Function[JavaRDD[String], Void]() {

      // Initialised once, when this callback object is constructed, and never
      // reassigned afterwards, hence `val` rather than `var`.
      val mongo: Mongo = new Mongo("localhost", 27017)

      val db: DB = mongo.getDB("mongodb")

      val collection: DBCollection = db.getCollection("fb")

      /** Prints each JSON string of the given batch and inserts it into the
        * collection as a parsed document.
        *
        * @param data the batch to persist; a null batch is only reported
        * @return always null, because the Java `Void` type has no other value
        */
      def call(data: JavaRDD[String]): Void = {
        if (data != null) {
          // `data.rdd` is the underlying Scala RDD, so collect() yields a native
          // Array[String]; no implicit Java-to-Scala collection conversion is
          // needed to iterate over it.
          data.rdd.collect().foreach { record =>
            println(record)
            // `record` is already a String, so it is parsed directly.
            val dbObject = JSON.parse(record).asInstanceOf[DBObject]
            collection.insert(dbObject)
          }
          println("Inserted Data Done")
        } else {
          println("Got no data in this window")
        }
        null
      }
})

Upvotes: 1

Related Questions