Avramescu Cristian

Reputation: 67

Spark Scala: upsert with the spark-mongo connector

Is there any way to upsert into a Mongo collection with the spark-mongo connector, based on a certain field in the DataFrame?

Upvotes: 2

Views: 5509

Answers (4)

Joe

Reputation: 149

To replace documents based on a unique key constraint, use the replaceDocument and shardKey options. The default shardKey is {_id: 1}. See https://docs.mongodb.com/spark-connector/master/configuration/

df.write.format('com.mongodb.spark.sql') \
  .option('collection', 'target_collection') \
  .option('replaceDocument', 'true') \
  .option('shardKey', '{"date": 1, "name": 1, "resource": 1}') \
  .mode('append') \
  .save()

With replaceDocument=false, the incoming fields are merged into the existing document matched by the shardKey instead of replacing that document outright.

https://github.com/mongodb/mongo-spark/blob/c9e1bc58cb509021d7b7d03367776b84da6db609/src/main/scala/com/mongodb/spark/MongoSpark.scala#L120-L141
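To make the replace-versus-merge difference concrete, here is a minimal plain-Python sketch. It is not the connector itself: the collection is just a list of dicts, the `upsert` and `key_filter` helpers are hypothetical names, and the sample values (including the extra `owner` field) are made up for illustration. It only models the behaviour described above, where a document is matched on the shardKey fields and then either replaced or merged.

```python
import copy

def key_filter(doc, shard_key):
    """Build the match filter from the shard-key fields of the incoming document."""
    return {field: doc[field] for field in shard_key}

def upsert(collection, doc, shard_key, replace_document=True):
    """Upsert `doc` into `collection` (a list of dicts standing in for a MongoDB collection)."""
    query = key_filter(doc, shard_key)
    for i, existing in enumerate(collection):
        if all(existing.get(k) == v for k, v in query.items()):
            if replace_document:
                # replaceDocument=true: the stored document is swapped out wholesale
                collection[i] = copy.deepcopy(doc)
            else:
                # replaceDocument=false: incoming fields are merged into the stored document
                existing.update(copy.deepcopy(doc))
            return "updated"
    # no match on the key fields: the document is inserted
    collection.append(copy.deepcopy(doc))
    return "inserted"

# Hypothetical sample data using the key fields from the example above
shard_key = ["date", "name", "resource"]
stored = {"date": "2020-01-01", "name": "a", "resource": "cpu", "owner": "ops", "value": 1}
incoming = {"date": "2020-01-01", "name": "a", "resource": "cpu", "value": 2}

replaced = [dict(stored)]
upsert(replaced, incoming, shard_key, replace_document=True)   # "owner" is dropped

merged = [dict(stored)]
upsert(merged, incoming, shard_key, replace_document=False)    # "owner" is kept
```

In the replace case the field that is missing from the incoming row disappears; in the merge case it survives and only the supplied fields change.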

Upvotes: 4

Man Nguyen Dinh

Reputation: 71

Try the replaceDocument option:

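import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit

// df and collectionName are assumed to be defined elsewhere
df.select("_id").withColumn("aaa", lit("ha"))
  .write
  .option("collection", collectionName)
  .option("replaceDocument", "false") // merge into the matched document instead of replacing it
  .mode(SaveMode.Append)
  .format("mongo")
  .save()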

I don't know why, but I could not find any documentation for this option in the MongoDB docs.

Upvotes: 1

Todd Leo

Reputation: 85

After some digging in mongo-spark's source, here's a simple hack that adds upsert on specific fields to the MongoSpark.save method:

// add additional keys parameter
def save[D](dataset: Dataset[D], writeConfig: WriteConfig, keys: List[String]): Unit = {
    val mongoConnector = MongoConnector(writeConfig.asOptions)
    val dataSet = dataset.toDF()
    val mapper = rowToDocumentMapper(dataSet.schema)
    val documentRdd: RDD[BsonDocument] = dataSet.rdd.map(row => mapper(row))
    val fieldNames = dataset.schema.fieldNames.toList
    // use the caller's keys when given; otherwise fall back to the configured shardKey
    val queryKeyList =
      if (keys.nonEmpty) keys
      else BsonDocument.parse(writeConfig.shardKey.getOrElse("{_id: 1}")).keySet().asScala.toList
    // the rest remains the same
    // ...
}
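The key-selection step in this hack can be sketched on its own in plain Python. This is only an illustration of the fallback logic, under a couple of assumptions: `choose_query_keys` and `DEFAULT_SHARD_KEY` are hypothetical names, and the shardKey string must be strict JSON here because `json.loads` is stricter than the BSON parser used in the Scala code.

```python
import json

# Assumption: strict-JSON spelling of the default shardKey {_id: 1}
DEFAULT_SHARD_KEY = '{"_id": 1}'

def choose_query_keys(keys, shard_key_json=None):
    """Return the caller-supplied keys, or fall back to the shardKey field names."""
    if keys:
        # explicit keys win
        return list(keys)
    # no keys given: use the field names of the shardKey document, in order
    return list(json.loads(shard_key_json or DEFAULT_SHARD_KEY).keys())

explicit = choose_query_keys(["name"])
fallback_default = choose_query_keys([])
fallback_configured = choose_query_keys([], '{"date": 1, "name": 1}')
```

Passing an empty key list therefore behaves like the unmodified save method, which keeps the change backward compatible.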

Upvotes: 0

Wan B.

Reputation: 18835

As of MongoDB Connector for Spark version 1.1 and later (currently version 2.2), when you execute save() as below:

dataFrameWriter.write.mongo()
dataFrameWriter.write.format("com.mongodb.spark.sql").save()

If the DataFrame contains an _id field, the data will be upserted: any existing documents with the same _id value will be updated, and documents whose _id value does not yet exist in the collection will be inserted.
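As a rough illustration of that rule, the plain-Python sketch below classifies what would happen to each row. It does not talk to MongoDB; `classify_row`, the set of existing ids, and the sample rows are all hypothetical, and treating a row without _id as a plain insert is an assumption on my part.

```python
def classify_row(existing_ids, row):
    """Describe the effect of saving one row under _id-based upsert."""
    if "_id" not in row:
        # assumption: with no _id to match on, the row is simply inserted
        return "insert"
    if row["_id"] in existing_ids:
        # same _id already in the collection: the stored document is updated
        return "update"
    # _id supplied but not present yet: the upsert inserts a new document
    return "upsert-insert"

# Hypothetical collection state and incoming rows
existing_ids = {1, 2}
rows = [{"_id": 1, "v": "a"}, {"_id": 3, "v": "b"}, {"v": "c"}]
plan = [classify_row(existing_ids, r) for r in rows]
```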

See also MongoDB Spark SQL for more information and snippets.

Upvotes: 2
