Val Entin
Val Entin

Reputation: 981

Using Spark Context in map of Spark Streaming Context to retrieve documents after Kafka Event

I'm new to Spark. What I'm trying to do is retrieving all related documents from a Couchbase View with a given Id from Spark Kafka Streaming.

When I try to get this documents form the Spark Context, I always have the error Task not serializable.

From there, I do understand that I can't use nesting RDD neither multiple Spark Context in the same JVM, but want to find a work around.

Here is my current approach:

package xxx.xxx.xxx

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.view.ViewQuery
import com.couchbase.spark._

import org.apache.spark.broadcast.Broadcast
import _root_.kafka.serializer.StringDecoder
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object Streaming {
  // Method to create a Json document from Key and Value
  def CreateJsonDocument(s: (String, String)): JsonDocument = {
    //println("- Parsing document")
    //println(s._1)
    //println(s._2)
    val return_doc = JsonDocument.create(s._1, JsonObject.fromJson(s._2))
    (return_doc)
    //(return_doc.content().getString("click"), return_doc)
  }

  def main(args: Array[String]): Unit = {
    // get arguments as key value
    val arguments = args.grouped(2).collect { case Array(k,v) => k.replaceAll("--", "") -> v }.toMap

    println("----------------------------")
    println("Arguments passed to class")
    println("----------------------------")
    println("- Arguments")
    println(arguments)
    println("----------------------------")

    // If the length of the passed arguments is less than 4
    if (arguments.get("brokers") == null || arguments.get("topics") == null) {
      // Provide system error
      System.err.println("Usage: --brokers <broker1:9092> --topics <topic1,topic2,topic3>")
    }

    // Create the Spark configuration with app name
    val conf = new SparkConf().setAppName("Streaming")
    // Create the Spark context
    val sc = new SparkContext(conf)
    // Create the Spark Streaming Context
    val ssc = new StreamingContext(sc, Seconds(2))

    // Setup the broker list
    val kafkaParams = Map("metadata.broker.list" -> arguments.getOrElse("brokers", ""))
    // Setup the topic list
    val topics = arguments.getOrElse("topics", "").split(",").toSet
    // Get the message stream from kafka
    val docs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    docs
      // Separate the key and the content
      .map({ case (key, value) => (key, value) })
      // Parse the content to transform in JSON Document
      .map(s => CreateJsonDocument(s))
      // Call the view to all related Review Application Documents
      //.map(messagedDoc => RetrieveAllReviewApplicationDocs(messagedDoc, sc))
      .map(doc => {

        sc.couchbaseView(ViewQuery.from("my-design-document", "stats").key(messagedDoc.content.getString("id"))).collect()
      })
      .foreachRDD(
          rdd => {
             //Create a report of my documents and store it in Couchbase
             rdd.foreach( println )
          }
        )

    // Start the streaming context
    ssc.start()
    // Wait for termination and catch error if there is a problem in the process
    ssc.awaitTermination()
  }
}

Upvotes: 0

Views: 1130

Answers (1)

Val Entin
Val Entin

Reputation: 981

Found the solution by using the Couchbase Client instead of the Couchbase Spark Context.

I don't know if it is the best way to go in a performance side, but I can retrieve the docs I need for computation.

package xxx.xxx.xxx

import com.couchbase.client.java.{Bucket, Cluster, CouchbaseCluster}
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.view.{ViewResult, ViewQuery}

import _root_.kafka.serializer.StringDecoder
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object Streaming {
  // Method to create a Json document from Key and Value
  def CreateJsonDocument(s: (String, String)): JsonDocument = {
    //println("- Parsing document")
    //println(s._1)
    //println(s._2)
    val return_doc = JsonDocument.create(s._1, JsonObject.fromJson(s._2))
    (return_doc)
    //(return_doc.content().getString("click"), return_doc)
  }

  // Method to retrieve related documents
  def RetrieveDocs (doc: JsonDocument, arguments: Map[String, String]): ViewResult = {
    val cbHosts = arguments.getOrElse("couchbase-hosts", "")
    val cbBucket = arguments.getOrElse("couchbase-bucket", "")
    val cbPassword = arguments.getOrElse("couchbase-password", "")

    val cluster: Cluster = CouchbaseCluster.create(cbHosts)

    val bucket: Bucket = cluster.openBucket(cbBucket, cbPassword)
    val docs : ViewResult = bucket.query(ViewQuery.from("my-design-document", "my-view").key(doc.content().getString("id")))
    cluster.disconnect()
    println(docs)

    (docs)
  }

  def main(args: Array[String]): Unit = {
    // get arguments as key value
    val arguments = args.grouped(2).collect { case Array(k,v) => k.replaceAll("--", "") -> v }.toMap

    println("----------------------------")
    println("Arguments passed to class")
    println("----------------------------")
    println("- Arguments")
    println(arguments)
    println("----------------------------")

    // If the length of the passed arguments is less than 4
    if (arguments.get("brokers") == null || arguments.get("topics") == null) {
      // Provide system error
      System.err.println("Usage: --brokers <broker1:9092> --topics <topic1,topic2,topic3>")
    }

    // Create the Spark configuration with app name
    val conf = new SparkConf().setAppName("Streaming")
    // Create the Spark context
    val sc = new SparkContext(conf)
    // Create the Spark Streaming Context
    val ssc = new StreamingContext(sc, Seconds(2))

    // Setup the broker list
    val kafkaParams = Map("metadata.broker.list" -> arguments.getOrElse("brokers", ""))
    // Setup the topic list
    val topics = arguments.getOrElse("topics", "").split(",").toSet
    // Get the message stream from kafka
    val docs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    // Get broadcast arguments
    val argsBC = sc.broadcast(arguments)

    docs
      // Separate the key and the content
      .map({ case (key, value) => (key, value) })
      // Parse the content to transform in JSON Document
      .map(s => CreateJsonDocument(s))
      // Call the view to all related Review Application Documents
      .map(doc => RetrieveDocs(doc, argsBC))
      .foreachRDD(
          rdd => {
             //Create a report of my documents and store it in Couchbase
             rdd.foreach( println )
          }
        )

    // Start the streaming context
    ssc.start()
    // Wait for termination and catch error if there is a problem in the process
    ssc.awaitTermination()
  }
}

Upvotes: 0

Related Questions