Reputation: 55
I followed this: Unable to send data to MongoDB using Kafka-Spark Structured Streaming to send data from Spark Structured Streaming to MongoDB, and I implemented it successfully, but there is one issue. When the process function looks like this:
override def process(record: Row): Unit = {
  val doc: Document = Document(record.prettyJson.trim)
  // lazy opening of MongoDB connection
  ensureMongoDBConnection()
  val result = collection.insertOne(doc)
  if (messageCountAccum != null)
    messageCountAccum.add(1)
}
the code executes without any problem, but no data is sent to MongoDB.
But if I add a print statement like this:
override def process(record: Row): Unit = {
  val doc: Document = Document(record.prettyJson.trim)
  // lazy opening of MongoDB connection
  ensureMongoDBConnection()
  val result = collection.insertOne(doc)
  result.foreach(println) // print statement
  if (messageCountAccum != null)
    messageCountAccum.add(1)
}
the data does get inserted into MongoDB.
I don't know why.
Upvotes: 0
Views: 44
Reputation: 2583
foreach initializes the writer sink. Without the foreach, your dataframe is never computed, so nothing is ever written.
Try this:
val df = ... // your df here
df.foreach(r => process(r)) // foreach is an action, so it forces the rows to actually be processed
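As a side note on the asker's own observation: with the MongoDB Scala driver (org.mongodb.scala), insertOne returns a lazy Observable that does nothing until something subscribes to it, and result.foreach(println) happens to be such a subscription. Here is a minimal sketch of the same process method that forces the insert explicitly, assuming that driver and the question's collection / ensureMongoDBConnection members (the 10-second timeout is an arbitrary choice for illustration):
import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.spark.sql.Row
import org.mongodb.scala.Document

override def process(record: Row): Unit = {
  val doc: Document = Document(record.prettyJson.trim)
  ensureMongoDBConnection()
  // insertOne returns a cold Observable; toFuture() subscribes to it,
  // and Await blocks until the insert has actually completed.
  Await.result(collection.insertOne(doc).toFuture(), 10.seconds)
  if (messageCountAccum != null)
    messageCountAccum.add(1)
}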
Upvotes: 1