kuldeep gupta

Reputation: 55

Unable to send data to MongoDB using Spark Structured Streaming

I followed Unable to send data to MongoDB using Kafka-Spark Structured Streaming to send data from Spark Structured Streaming to MongoDB, and I implemented it successfully, but there is one issue. With this function:

override def process(record: Row): Unit = {
    val doc: Document = Document(record.prettyJson.trim)

    // lazy opening of MongoDB connection
    ensureMongoDBConnection()

    val result = collection.insertOne(doc)
    if (messageCountAccum != null)
      messageCountAccum.add(1)
  }

the code executes without any problem, but no data is sent to MongoDB.

But if I add a print statement like this:

override def process(record: Row): Unit = {
    val doc: Document = Document(record.prettyJson.trim)

    // lazy opening of MongoDB connection
    ensureMongoDBConnection()

    val result = collection.insertOne(doc)
    result.foreach(println) // print statement
    if (messageCountAccum != null)
      messageCountAccum.add(1)
  }

the data does get inserted into MongoDB.

I don't understand why.

Upvotes: 0

Views: 44

Answers (1)

Sumeet Sharma

Reputation: 2583

The foreach forces evaluation. Without it, your result is never computed, so the insert never actually runs.

Try this :

val df = ??? // your df here
df.map(r => process(r)).count() // count() forces the lazy map to execute
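As a side note, if `collection` comes from the async `org.mongodb.scala` driver, `insertOne` returns a cold `Observable` that does nothing until something subscribes to it; `result.foreach(println)` happens to subscribe, which is why adding the print statement made the insert work. A minimal sketch (assuming the `org.mongodb.scala` driver, with `collection` and `doc` as in the question's code) of forcing the insert to complete without a print:

    import org.mongodb.scala._
    import scala.concurrent.Await
    import scala.concurrent.duration._

    // insertOne returns a cold Observable: the insert only runs once subscribed.
    // Converting to a Future and blocking on it forces the insert to complete
    // before process() returns.
    val result = collection.insertOne(doc)
    Await.result(result.toFuture(), 10.seconds)

Blocking inside `process` keeps the per-record semantics of a ForeachWriter simple; a fire-and-forget subscription would also trigger the insert, but then failures can be silently dropped.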

Upvotes: 1
