Reputation: 11
my collection config
{ "_id" : "db_1.target_collection", "lastmodEpoch" : ObjectId("6076a37e37c2cca5853da6df"), "lastmod" : ISODate("1970-02-19T17:02:47.301Z"), "dropped" : false, "key" : { "kfuin" : "hashed" }, "unique" : false, "uuid" : UUID("57c30bbe-af83-4410-a51f-c04f3c7522f4") }
I want to read from MongoDB and write updates back into MongoDB.
# Append the DataFrame to the sharded 'target_collection' via the MongoDB
# Spark connector. 'replaceDocument' = 'false' requests a partial ($set-style)
# update rather than a full document replacement; 'shardKey' tells the
# connector which field(s) to use as the upsert query key.
# NOTE(review): no comments can go inside the chain itself — a '#' after a
# backslash line-continuation is a syntax error.
df.write.format('com.mongodb.spark.sql') \
.option('collection', 'target_collection') \
.option('replaceDocument', 'false') \
.option('shardKey', '{kfuin: 1}') \
.mode('append') \
.save()
I get this exception when I try to upsert with replaceDocument set to true:
com.mongodb.MongoBulkWriteException: Bulk write operation error on server ... message='After applying the update, the (immutable) field '_id' was found to have been altered to _id: ObjectId('5f80331981f3601291e04a1c')
and this one when replaceDocument is false:
Performing an update on the path '_id' would modify the immutable field '_id'.
Any ideas?
Upvotes: 1
Views: 958
Reputation: 11
Resolved
If you want to update existing documents, the shardKey option needs to include "_id":
.option('shardKey', '{kfuin: 1,_id: 1}')
MongoSpark.save method:
// Saves a Dataset to MongoDB. Documents are either bulk-inserted or upserted,
// depending on whether the configured shard-key fields are present in the
// Dataset's schema. (Quoted from the MongoDB Spark connector source.)
//
// @param dataset     the Dataset to persist
// @param writeConfig connector write configuration (collection, shardKey,
//                    replaceDocument, forceInsert, maxBatchSize, ...)
def save[D](dataset: Dataset[D], writeConfig: WriteConfig): Unit = {
val mongoConnector = MongoConnector(writeConfig.asOptions)
val dataSet = dataset.toDF()
val mapper = rowToDocumentMapper(dataSet.schema)
val documentRdd: RDD[BsonDocument] = dataSet.rdd.map(row => mapper(row))
val fieldNames = dataset.schema.fieldNames.toList
// The upsert query is built from the shardKey fields; if none is configured,
// "_id" is used — this is why a custom shardKey without "_id" changes which
// fields the update filters on.
val queryKeyList = BsonDocument.parse(writeConfig.shardKey.getOrElse("{_id: 1}")).keySet().asScala.toList
// Plain insert path: forced inserts, or the schema lacks a shard-key column,
// so no meaningful upsert query can be built.
if (writeConfig.forceInsert || !queryKeyList.forall(fieldNames.contains(_))) {
MongoSpark.save(documentRdd, writeConfig)
} else {
documentRdd.foreachPartition(iter => if (iter.nonEmpty) {
mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[BsonDocument] =>
// Batched bulk writes, capped at maxBatchSize per request.
iter.grouped(writeConfig.maxBatchSize).foreach(batch => {
val requests = batch.map(doc =>
if (queryKeyList.forall(doc.containsKey(_))) {
// Build the upsert filter from the shard-key values of this document.
val queryDocument = new BsonDocument()
queryKeyList.foreach(key => queryDocument.append(key, doc.get(key)))
if (writeConfig.replaceDocument) {
// Full replacement: the new doc (including any "_id" it carries)
// replaces whatever the filter matches. NOTE(review): if "_id" is not
// part of the shard key, a matched document with a different _id
// triggers MongoDB's immutable-field error — hence the fix of adding
// _id to shardKey.
new ReplaceOneModel[BsonDocument](queryDocument, doc, new ReplaceOptions().upsert(true))
} else {
// Partial update: drop the filter fields from the payload, then $set
// the remaining fields. Fields not in queryKeyList (such as "_id")
// stay in the $set payload, which can also hit the immutable-field
// error on existing documents.
queryDocument.keySet().asScala.foreach(doc.remove(_))
new UpdateOneModel[BsonDocument](queryDocument, new BsonDocument("$set", doc), new UpdateOptions().upsert(true))
}
} else {
// Document is missing a shard-key field: fall back to a plain insert.
new InsertOneModel[BsonDocument](doc)
})
collection.bulkWrite(requests.toList.asJava)
})
})
})
}
}
Upvotes: 0