Naga

Reputation: 1253

Updating/Replacing Mongo Documents using Apache Spark

Here is a common problem when working with Spark and MongoDB through the MongoSpark connector. The connector is designed to insert/update documents in MongoDB in batches. There are three ways to insert/update documents using Spark:

  1. RDD[Document]
  2. DataFrame[CaseClass]
  3. DataSet[CaseClass]

Both Dataset and DataFrame support inserting/updating documents via the MongoSpark.save() method, whereas RDD[Document] only supports insert. So with Mongo Spark there is no built-in way to update an RDD[Document].
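For context, the supported Dataset path looks roughly like this (a minimal sketch; the Person case class, the sample values, and the database/collection URI are illustrative assumptions, not part of the question):

    import com.mongodb.spark.MongoSpark
    import org.apache.spark.sql.SparkSession

    case class Person(_id: Int, name: String, age: Int, place: String)

    val spark = SparkSession.builder()
      .appName("Dataset Save Sketch")
      .master("local[*]")
      .config("spark.mongodb.output.uri", "mongodb://localhost:27017/info.people")
      .getOrCreate()
    import spark.implicits._

    val peopleDS = Seq(Person(100, "Naga", 30, "Bangalore")).toDS()

    // For a Dataset/DataFrame, save() upserts by _id, which is exactly
    // the behaviour that is missing for RDD[Document].
    MongoSpark.save(peopleDS)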

Is there any solution to update/replace an RDD[Document] in MongoDB using Spark?

Upvotes: 3

Views: 3681

Answers (1)

Naga

Reputation: 1253

Currently the Mongo Spark Connector doesn't support updating/replacing an RDD[Document]. However, there is a workaround: you can update/replace an RDD[Document] of Mongo documents using Apache Spark with the help of the connector's MongoConnector.

Here is the sample code for update/replace with example data:

    db.people.find()
    { "_id" : 100, "name" : "Naga", "age" : 30, "place" : "Bangalore" }
    { "_id" : 101, "name" : "Ravi", "age" : 33, "place" : "Bangalore" }
    { "_id" : 102, "name" : "Hari", "age" : 23, "place" : "Mysore" }


    import java.util.HashMap

    import com.mongodb.client.MongoCollection
    import com.mongodb.spark.{MongoConnector, MongoSpark}
    import com.mongodb.spark.config.{ReadConfig, WriteConfig}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.bson.Document

    val conf = new SparkConf().setAppName("Spark Mongo").setMaster("local[*]")

    // Read configuration pointing at the info.people collection
    val readOverrides = new HashMap[String, String]()
    readOverrides.put("spark.mongodb.input.uri", "mongodb://localhost:27017/info.people")
    val readConfig = ReadConfig.create(conf, readOverrides)

    val sc = new SparkContext(conf)

    // Load the collection as an RDD[Document] and add a new field to every document
    val peopleRDD = MongoSpark.load(sc, readConfig)
    val updateRDD = peopleRDD.map(document => document.append("state", "karnataka"))

    // Write configuration for the same collection
    val writeOverrides = new HashMap[String, String]()
    writeOverrides.put("spark.mongodb.output.uri", "mongodb://localhost:27017/info.people")
    writeOverrides.put("replaceDocument", "false")
    val writeConfig = WriteConfig.create(conf, writeOverrides)

    save(updateRDD, writeConfig)

    // Replaces each document in the collection with the matching document
    // from the RDD, using _id as the match key
    def save(rdd: RDD[Document], writeConfig: WriteConfig): Unit = {
      val mongoConnector = MongoConnector(writeConfig.asOptions)
      rdd.foreachPartition { partition =>
        if (partition.nonEmpty) {
          mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[Document] =>
            partition.foreach { document =>
              // Match on _id as it is stored, whatever its type
              val searchDocument = new Document("_id", document.get("_id"))
              collection.replaceOne(searchDocument, document)
            }
          })
        }
      }
    }

{ "_id" : 100, "name" : "Naga", "age" : 30, "place" : "Bangalore", "state" : "karnataka" }

{ "_id" : 101, "name" : "Ravi", "age" : 33, "place" : "Bangalore", "state" : "karnataka" }

{ "_id" : 102, "name" : "Hari", "age" : 23, "place" : "Mysore", "state" : "karnataka" }

This solution works.
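If you only need to add or modify specific fields rather than replace whole documents, the same loop can use the Java driver's updateOne with a $set document instead of replaceOne. This is a sketch under the same assumptions as above (the update helper name is mine, not part of the original answer):

    // Sets only the "state" field on each matching document instead of replacing it
    def update(rdd: RDD[Document], writeConfig: WriteConfig): Unit = {
      val mongoConnector = MongoConnector(writeConfig.asOptions)
      rdd.foreachPartition { partition =>
        if (partition.nonEmpty) {
          mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[Document] =>
            partition.foreach { document =>
              val filter = new Document("_id", document.get("_id"))
              val setFields = new Document("$set", new Document("state", document.get("state")))
              collection.updateOne(filter, setFields)
            }
          })
        }
      }
    }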

Upvotes: 3
