Reputation: 67
Is there any way to upsert a Mongo collection with the spark-mongo connector based on a certain field in the dataframe?
Upvotes: 2
Views: 5509
Reputation: 149
To replace documents based on a unique key constraint, use the replaceDocument and shardKey options. The default shardKey is {_id: 1}. See https://docs.mongodb.com/spark-connector/master/configuration/
df.write.format('com.mongodb.spark.sql') \
    .option('collection', 'target_collection') \
    .option('replaceDocument', 'true') \
    .option('shardKey', '{"date": 1, "name": 1, "resource": 1}') \
    .mode('append') \
    .save()
Setting replaceDocument=false instead merges the dataframe's fields into the existing document matched by the shardKey, rather than replacing the whole document.
Upvotes: 4
Reputation: 71
Try the replaceDocument option:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit

df.select("_id").withColumn("aaa", lit("ha"))
  .write
  .option("collection", collectionName)
  .option("replaceDocument", "false")
  .mode(SaveMode.Append)
  .format("mongo")
  .save()
I don't know why I can't find any documentation for this option in the Mongo docs.
Upvotes: 1
Reputation: 85
With some digging into mongo-spark's source, here's a simple hack that adds upsert-on-specified-fields support to the MongoSpark.save method:
// add an additional `keys` parameter naming the fields to upsert on
def save[D](dataset: Dataset[D], writeConfig: WriteConfig, keys: List[String]): Unit = {
  val mongoConnector = MongoConnector(writeConfig.asOptions)
  val dataSet = dataset.toDF()
  val mapper = rowToDocumentMapper(dataSet.schema)
  val documentRdd: RDD[BsonDocument] = dataSet.rdd.map(row => mapper(row))
  val fieldNames = dataset.schema.fieldNames.toList
  // use the caller-supplied keys; fall back to the configured shardKey
  // (default {_id: 1}) when no keys are given
  val queryKeyList =
    if (keys.isEmpty) BsonDocument.parse(writeConfig.shardKey.getOrElse("{_id: 1}")).keySet().asScala.toList
    else keys
  // the rest remains the same
  // ...
}
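For example, a hypothetical call site (the connection string, the dataframe df, and the key fields are illustrative, and this assumes the patched save above is compiled into your own build of the connector):

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig

// illustrative write config; the uri carries both database and collection
val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1/test.events"))

// upsert on the composite key (date, name) instead of the default {_id: 1}
MongoSpark.save(df, writeConfig, List("date", "name"))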
Upvotes: 0
Reputation: 18835
As of the MongoDB Connector for Spark version 1.1+ (currently version 2.2), when you execute save() as below:
df.write.mongo() // requires import com.mongodb.spark.sql._
df.write.format("com.mongodb.spark.sql").save()
If the dataframe contains an _id field, the data will be upserted: any existing documents with the same _id value are updated, and documents whose _id value is not yet in the collection are inserted.
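A minimal sketch of that behaviour (assuming an existing SparkSession named spark whose spark.mongodb.output.uri already points at the target collection; the documents are illustrative):

import spark.implicits._

// First write: both documents are inserted.
Seq((1, "Ada"), (2, "Grace")).toDF("_id", "name")
  .write.format("com.mongodb.spark.sql").mode("append").save()

// Second write: _id 1 already exists, so that document is updated;
// _id 3 does not, so it is inserted.
Seq((1, "Ada Lovelace"), (3, "Edsger")).toDF("_id", "name")
  .write.format("com.mongodb.spark.sql").mode("append").save()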
See also the MongoDB Spark SQL documentation for more information and snippets.
Upvotes: 2