fq zzzhao

Reputation: 11

I am using the MongoDB Spark Connector (spark-mongo) to upsert documents.

My collection's config entry (it is sharded on a hashed kfuin key):

{ "_id" : "db_1.target_collection", "lastmodEpoch" : ObjectId("6076a37e37c2cca5853da6df"), "lastmod" : ISODate("1970-02-19T17:02:47.301Z"), "dropped" : false, "key" : { "kfuin" : "hashed" }, "unique" : false, "uuid" : UUID("57c30bbe-af83-4410-a51f-c04f3c7522f4") }

I want to read from MongoDB and write the updated documents back into MongoDB:

df.write.format('com.mongodb.spark.sql') \
    .option('collection', 'target_collection') \
    .option('replaceDocument', 'false') \
    .option('shardKey', '{kfuin: 1}') \
    .mode('append') \
    .save()
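
The DataFrame itself just comes from a plain read of the same deployment, roughly like this (the connection URI and database are configured on the SparkSession; the collection name here is only a placeholder):

df = spark.read.format('com.mongodb.spark.sql') \
    .option('collection', 'source_collection') \
    .load()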

I get this exception when I try to upsert with replaceDocument set to true:

com.mongodb.MongoBulkWriteException: Bulk write operation error on server ... message='After applying the update, the (immutable) field '_id' was found to have been altered to _id: ObjectId('5f80331981f3601291e04a1c')

and this one when replaceDocument is false:

Performing an update on the path '_id' would modify the immutable field '_id''.

Any ideas?

Upvotes: 1

Views: 958

Answers (1)

fq zzzhao

Reputation: 11

Resolved

If you want to update existing documents in a sharded collection, the shardKey option needs to include _id as well:

.option('shardKey', '{kfuin: 1, _id: 1}')
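
With that option in place, the write from the question works unchanged otherwise (the database/URI are assumed to be configured elsewhere, exactly as in the original snippet):

df.write.format('com.mongodb.spark.sql') \
    .option('collection', 'target_collection') \
    .option('replaceDocument', 'false') \
    .option('shardKey', '{kfuin: 1, _id: 1}') \
    .mode('append') \
    .save()

With replaceDocument=false the connector then issues $set updates filtered on {kfuin, _id}; with true it issues replaceOne with the same filter, so neither operation can match a document whose _id differs from the one being written.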

The reason is visible in the MongoSpark.save method (connector source below): the upsert filter is built only from the shardKey fields, while the document being written still carries the _id it was read with. With shardKey set to {kfuin: 1}, the filter can match a document with a different _id, so the replace or $set appears to alter the immutable _id field. Putting _id into the shard key adds it to the filter and, on the non-replace path, removes it from the $set payload.

def save[D](dataset: Dataset[D], writeConfig: WriteConfig): Unit = {
    val mongoConnector = MongoConnector(writeConfig.asOptions)
    val dataSet = dataset.toDF()
    val mapper = rowToDocumentMapper(dataSet.schema)
    val documentRdd: RDD[BsonDocument] = dataSet.rdd.map(row => mapper(row))
    val fieldNames = dataset.schema.fieldNames.toList
    // the upsert filter keys come from the shardKey option (default {_id: 1})
    val queryKeyList = BsonDocument.parse(writeConfig.shardKey.getOrElse("{_id: 1}")).keySet().asScala.toList

    // if forceInsert is set or any shard-key field is missing from the DataFrame schema, fall back to plain inserts
    if (writeConfig.forceInsert || !queryKeyList.forall(fieldNames.contains(_))) {
      MongoSpark.save(documentRdd, writeConfig)
    } else {
      documentRdd.foreachPartition(iter => if (iter.nonEmpty) {
        mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[BsonDocument] =>
          iter.grouped(writeConfig.maxBatchSize).foreach(batch => {
            val requests = batch.map(doc =>
              if (queryKeyList.forall(doc.containsKey(_))) {
                // the filter contains only the shard-key fields copied from the document
                val queryDocument = new BsonDocument()
                queryKeyList.foreach(key => queryDocument.append(key, doc.get(key)))
                if (writeConfig.replaceDocument) {
                  new ReplaceOneModel[BsonDocument](queryDocument, doc, new ReplaceOptions().upsert(true))
                } else {
                  // non-replace path: shard-key fields are removed from the document before it becomes the $set payload
                  queryDocument.keySet().asScala.foreach(doc.remove(_))
                  new UpdateOneModel[BsonDocument](queryDocument, new BsonDocument("$set", doc), new UpdateOptions().upsert(true))
                }
              } else {
                new InsertOneModel[BsonDocument](doc)
              })
            collection.bulkWrite(requests.toList.asJava)
          })
        })
      })
    }
  }
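
For illustration only, this is roughly the per-document operation the connector ends up building once _id is part of the shard key and replaceDocument is false (a PyMongo sketch with made-up connection details and values, not connector code):

from pymongo import MongoClient, UpdateOne

client = MongoClient('mongodb://localhost:27017')  # placeholder URI
coll = client['db_1']['target_collection']

# a document as it comes out of the DataFrame row (values are made up)
doc = {'_id': 'some-id', 'kfuin': 12345, 'status': 'updated'}

# the filter is built from the shard-key fields, which now include _id ...
query = {'kfuin': doc.pop('kfuin'), '_id': doc.pop('_id')}

# ... and because they were popped out of doc, the $set never touches _id
coll.bulk_write([UpdateOne(query, {'$set': doc}, upsert=True)])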

Upvotes: 0
