Reputation: 981
I'm new to Spark. What I'm trying to do is retrieve all related documents from a Couchbase view for a given id, from within a Spark Streaming job that reads from Kafka.
When I try to get these documents from the Spark Context, I always get the error `Task not serializable`.
From that error, I understand that I can't nest RDDs or use multiple Spark Contexts in the same JVM, but I would like to find a workaround.
Here is my current approach:
package xxx.xxx.xxx
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.view.ViewQuery
import com.couchbase.spark._
import org.apache.spark.broadcast.Broadcast
import _root_.kafka.serializer.StringDecoder
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
/**
 * Spark Streaming entry point: reads (key, value) messages from Kafka, turns each
 * message into a Couchbase `JsonDocument`, and then tries to query a Couchbase view
 * for every document through the Spark context.
 *
 * NOTE(review): this is the version that is reported to fail with
 * "Task not serializable"; see the notes on the view-query step in `main`.
 */
object Streaming {
/**
 * Creates a JSON document from a (key, value) pair.
 *
 * @param s pair whose first element is used as the document id and whose second
 *          element is parsed as the JSON content
 * @return the `JsonDocument` built from the pair
 */
def CreateJsonDocument(s: (String, String)): JsonDocument = {
//println("- Parsing document")
//println(s._1)
//println(s._2)
val return_doc = JsonDocument.create(s._1, JsonObject.fromJson(s._2))
(return_doc)
//(return_doc.content().getString("click"), return_doc)
}
/**
 * Parses `--name value` command-line pairs, builds the Spark and streaming contexts,
 * wires up the Kafka direct stream and starts the streaming job.
 *
 * @param args command-line arguments, expected as `--brokers <list> --topics <list>`
 */
def main(args: Array[String]): Unit = {
// Pair the arguments up two by two and strip "--" from each name to get a name -> value map.
// A trailing argument without a value does not match `Array(k,v)` and is dropped by `collect`.
val arguments = args.grouped(2).collect { case Array(k,v) => k.replaceAll("--", "") -> v }.toMap
println("----------------------------")
println("Arguments passed to class")
println("----------------------------")
println("- Arguments")
println(arguments)
println("----------------------------")
// Intended check: print the usage line when "brokers" or "topics" is missing.
// NOTE(review): `Map.get` returns an Option (Some/None), not null, so this condition
// does not detect a missing argument. Execution also continues after the message,
// since nothing here exits.
if (arguments.get("brokers") == null || arguments.get("topics") == null) {
// Provide system error
System.err.println("Usage: --brokers <broker1:9092> --topics <topic1,topic2,topic3>")
}
// Create the Spark configuration with app name
val conf = new SparkConf().setAppName("Streaming")
// Create the Spark context
val sc = new SparkContext(conf)
// Create the Spark Streaming Context with a 2-second batch interval
val ssc = new StreamingContext(sc, Seconds(2))
// Setup the broker list (falls back to an empty string when "brokers" is absent)
val kafkaParams = Map("metadata.broker.list" -> arguments.getOrElse("brokers", ""))
// Setup the topic list from the comma-separated "topics" argument
val topics = arguments.getOrElse("topics", "").split(",").toSet
// Get the message stream from kafka
val docs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
docs
// Separate the key and the content
// NOTE(review): this maps each pair to the same pair, i.e. it is an identity step.
.map({ case (key, value) => (key, value) })
// Parse the content to transform in JSON Document
.map(s => CreateJsonDocument(s))
// Call the view to all related Review Application Documents
//.map(messagedDoc => RetrieveAllReviewApplicationDocs(messagedDoc, sc))
// NOTE(review): the lambda parameter is named `doc`, but the body refers to
// `messagedDoc`, which is not defined anywhere in this snippet.
// NOTE(review): the closure references `sc`, the SparkContext created above in `main`.
// This is presumably the source of the reported "Task not serializable" error,
// because the function passed to `map` has to be shipped to the executors.
.map(doc => {
sc.couchbaseView(ViewQuery.from("my-design-document", "stats").key(messagedDoc.content.getString("id"))).collect()
})
.foreachRDD(
rdd => {
//Create a report of my documents and store it in Couchbase
// For now every element of the batch is only printed.
rdd.foreach( println )
}
)
// Start the streaming context
ssc.start()
// Wait for termination and catch error if there is a problem in the process
ssc.awaitTermination()
}
}
Upvotes: 0
Views: 1130
Reputation: 981
Found the solution by using the Couchbase Client instead of the Couchbase Spark Context.
I don't know if this is the best approach performance-wise, but I can retrieve the docs I need for my computation.
package xxx.xxx.xxx
import com.couchbase.client.java.{Bucket, Cluster, CouchbaseCluster}
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.view.{ViewResult, ViewQuery}
import _root_.kafka.serializer.StringDecoder
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
/**
 * Spark Streaming job that reads (key, value) messages from Kafka, parses each message
 * into a Couchbase `JsonDocument`, and looks up the related documents through a
 * Couchbase view using the plain Couchbase Java client instead of the Spark context.
 */
object Streaming {

  /** Usage line printed when a mandatory argument is missing. */
  private val Usage = "Usage: --brokers <broker1:9092> --topics <topic1,topic2,topic3>"

  /**
   * Creates a JSON document from a (key, value) pair.
   *
   * @param s pair whose first element is used as the document id and whose second
   *          element is parsed as the JSON content
   * @return the `JsonDocument` built from the pair
   */
  def CreateJsonDocument(s: (String, String)): JsonDocument =
    JsonDocument.create(s._1, JsonObject.fromJson(s._2))

  /**
   * Queries the view `my-design-document/my-view` with the `id` field of `doc` as key.
   *
   * A cluster connection is opened for the call and is always disconnected afterwards,
   * including when opening the bucket or running the query throws.
   * NOTE(review): connecting once per document is likely expensive; consider sharing a
   * connection per partition if throughput matters.
   *
   * @param doc       document whose `id` content field is used as the view key
   * @param arguments settings read here: `couchbase-hosts`, `couchbase-bucket` and
   *                  `couchbase-password` (each defaults to an empty string)
   * @return the result of the view query
   */
  def RetrieveDocs(doc: JsonDocument, arguments: Map[String, String]): ViewResult = {
    val cbHosts = arguments.getOrElse("couchbase-hosts", "")
    val cbBucket = arguments.getOrElse("couchbase-bucket", "")
    val cbPassword = arguments.getOrElse("couchbase-password", "")

    val cluster: Cluster = CouchbaseCluster.create(cbHosts)
    try {
      val bucket: Bucket = cluster.openBucket(cbBucket, cbPassword)
      val key = doc.content().getString("id")
      val docs: ViewResult = bucket.query(ViewQuery.from("my-design-document", "my-view").key(key))
      println(docs)
      docs
    } finally {
      // Release the connection even if the bucket could not be opened or the query failed.
      cluster.disconnect()
    }
  }

  /**
   * Parses `--name value` command-line pairs, validates the mandatory ones, builds the
   * Spark and streaming contexts, wires up the Kafka direct stream and starts the job.
   *
   * @param args command-line arguments, expected as `--brokers <list> --topics <list>`
   *             plus the optional `--couchbase-*` settings read by [[RetrieveDocs]]
   */
  def main(args: Array[String]): Unit = {
    // Pair the arguments up two by two and strip "--" from each name.
    // A trailing argument without a value is dropped by `collect`.
    val arguments = args.grouped(2).collect { case Array(k, v) => k.replaceAll("--", "") -> v }.toMap
    println("----------------------------")
    println("Arguments passed to class")
    println("----------------------------")
    println("- Arguments")
    println(arguments)
    println("----------------------------")

    // `Map.get` returns an Option and never null, so test for the keys themselves,
    // and stop instead of starting a job that has no brokers or topics.
    if (!arguments.contains("brokers") || !arguments.contains("topics")) {
      System.err.println(Usage)
      sys.exit(1)
    }

    // Create the Spark configuration, the Spark context and the streaming context
    // (2-second batch interval).
    val conf = new SparkConf().setAppName("Streaming")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    // Both keys are known to be present at this point.
    val kafkaParams = Map("metadata.broker.list" -> arguments("brokers"))
    val topics = arguments("topics").split(",").toSet

    // Get the message stream from Kafka.
    val docs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // Broadcast the arguments so that the executors can read the Couchbase settings.
    val argsBC = sc.broadcast(arguments)

    docs
      // Parse each (key, value) message into a JSON document.
      .map(kv => CreateJsonDocument(kv))
      // Query the view for each document; unwrap the broadcast to get the actual map.
      .map(doc => RetrieveDocs(doc, argsBC.value))
      .foreachRDD { rdd =>
        // Create a report of my documents and store it in Couchbase.
        // For now every element of the batch is only printed.
        rdd.foreach(println)
      }

    // Start the streaming context.
    ssc.start()
    // Wait for termination; an error in the streaming job is rethrown from here.
    ssc.awaitTermination()
  }
}
Upvotes: 0