thomas legrand

Reputation: 493

Spark Streaming: foreachRDD update my mongo RDD

I want to create a new MongoDB RDD each time I enter foreachRDD. However, I have serialization issues:

 mydstream  
   .foreachRDD(rdd => {
      val mongoClient = MongoClient("localhost", 27017)
      val db = mongoClient(mongoDatabase)
      val coll = db(mongoCollection)
      // ssc is my StreamingContext
      val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) })

This will give me an error:

object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@31133b6e)

Any idea?

Upvotes: 15

Views: 1159

Answers (2)

Markon

Reputation: 4610

You might try to use rdd.context, which returns the SparkContext the RDD was created on (calling .context on a DStream returns the StreamingContext instead):

mydstream.foreachRDD { rdd =>
  val mongoClient = MongoClient("localhost", 27017)
  val db = mongoClient(mongoDatabase)
  val coll = db(mongoCollection)
  // build the new RDD from the driver-side context attached to this rdd
  val modelsRDDRaw = rdd.context.parallelize(coll.find().toList)
}

Actually, it seems that RDD also has a .sparkContext method. I honestly don't know the difference; maybe they are aliases (?).
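As a quick check, here is a minimal non-streaming sketch (the app name and local master are just placeholders) suggesting that both accessors hand back the same SparkContext:

import org.apache.spark.{SparkConf, SparkContext}

object ContextCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("context-check").setMaster("local[*]"))
    val rdd = sc.parallelize(1 to 10)
    // rdd.context and rdd.sparkContext both expose the driver-side SparkContext
    println(rdd.context eq rdd.sparkContext) // prints true if they are the same instance
    sc.stop()
  }
}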

Upvotes: 7

Rami

Reputation: 8314

In my understanding, if you have a "not serializable" object, you need to create it inside foreachPartition, so the database connection is established on each worker node before you run your processing:

mydstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // The client is created on the worker, once per partition, so nothing
    // non-serializable has to be shipped from the driver
    val mongoClient = MongoClient("localhost", 27017)
    val db = mongoClient(mongoDatabase)
    val coll = db(mongoCollection)
    // process the records of this partition with coll here
  }
}
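A fuller sketch of that per-partition pattern, assuming the Casbah driver and reusing mydstream, mongoDatabase and mongoCollection from the question (the "value" field name is purely illustrative), could also close the client once the partition is done:

import com.mongodb.casbah.Imports._

mydstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val mongoClient = MongoClient("localhost", 27017)
    val coll = mongoClient(mongoDatabase)(mongoCollection)
    try {
      // write each record of this partition as a document
      partition.foreach(record => coll.insert(MongoDBObject("value" -> record.toString)))
    } finally {
      // release the connection opened for this partition
      mongoClient.close()
    }
  }
}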

Upvotes: 2
