Reputation: 343
I am new to Apache Spark and I was trying to run a test application using Spark. The problem I'm facing is that when I create an RDD from the collection of data I want to process, the RDD gets created but Spark doesn't start processing it until I call the .collect method of the RDD class, so at that point I have to wait for Spark to process the RDD. Is there some way to have Spark process the collection automatically as soon as I form the RDD, so that I can call .collect later and get the processed data at any time without having to wait?
Moreover, is there any way I can use Spark to put the processed data into a database instead of returning it to me?
The code I'm using is given below:
import org.apache.spark.SparkContext

object appMain extends App {
  val spark = new SparkContext("local", "SparkTest")
  val list = List(1, 2, 3, 4, 5)
  // I want this RDD to be processed as soon as it is created
  val rdd = spark.parallelize(list, 1).map { i =>
    i % 2 // checking if the number is even or odd
  }
  // some more functionality here
  // the job above starts only when the line below is executed
  val result = rdd.collect
  result.foreach(println)
}
Upvotes: 0
Views: 578
Reputation: 2362
maasg has explained things in detail. One option is to apply count on the RDD right after creating it, which forces evaluation:
rdd.count()
Another way is to print a few lines of the RDD, like this:
rdd.take(10).foreach(println)
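Combining this with caching gives a sketch (untested) of how the question's code could force the job to run as soon as the RDD is defined, so that a later collect just returns the already-computed values:
val rdd = spark.parallelize(list, 1).map(_ % 2)
rdd.cache()  // keep the computed partitions around for reuse
rdd.count()  // action: the job runs right here
// ... later, collect returns the cached results without recomputing
val result = rdd.collect()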
Upvotes: 0
Reputation: 37435
Spark uses a lazy evaluation model in which transformations are only applied once an 'action' is called on the RDD. This model fits batch operations over large datasets well. It's possible to 'cache' parts of a computation with rdd.cache(), but even that does not force computation; it only indicates that, once the RDD has been computed, it should be kept around for reuse.
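To illustrate the point, here is a minimal, hypothetical sketch (the names are mine, not from the question): the println inside map only fires once an action runs, and cache() alone does not trigger it:
import org.apache.spark.SparkContext

object LazyDemo extends App {
  val sc = new SparkContext("local", "LazyDemo")
  // map is a transformation: this statement builds a plan and computes nothing
  val parities = sc.parallelize(1 to 5).map { i =>
    println(s"processing $i") // not printed yet
    i % 2
  }
  parities.cache()   // only marks the RDD for caching; still nothing printed
  parities.count()   // action: the "processing ..." lines appear now
  parities.collect() // served from the cache; map does not run again
}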
Further clarification from the comments indicates that the OP might be better served by a streaming model, in which incoming data is processed in 'micro-batches'.
This is an example of how an 'urgent event count' streaming job could look (not tested, for illustrative purposes only), based on the NetworkWordCount example:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UrgentEventCount {
  def main(args: Array[String]) {
    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("UrgentEventCount").setMaster(SPARK_MASTER)
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val lines = ssc.socketTextStream(ip, port, StorageLevel.MEMORY_AND_DISK_SER)
    // we assume events are tab-separated
    val events = lines.flatMap(_.split("\t"))
    val urgentEvents = events.filter(_.contains("URGENT"))
    // for every micro-batch, count the urgent events and persist the result
    urgentEvents.foreachRDD { rdd =>
      val urgentEventCount = rdd.count()
      val dbConnection = db.connect(dbHost, dbPort) // placeholder db client
      dbConnection.execute("Insert into UrgentEvents ...")
      dbConnection.close()
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
As you can see, if you need to connect to a database, all you need to do is provide the necessary driver and the code that executes the db interaction. Be sure to include the driver's dependencies in the job jar file.
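For instance, with sbt and the sbt-assembly plugin, a build sketch along these lines bundles a JDBC driver into the job jar (the PostgreSQL driver and all version numbers below are illustrative placeholders, not prescribed by the answer):
// build.sbt (sketch): mark Spark as 'provided', bundle the db driver
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.2.1" % "provided",
  "org.postgresql"   %  "postgresql"      % "9.3-1102-jdbc41" // your driver here
)
Running sbt assembly then produces a fat jar containing the driver, which you submit as the job jar.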
Upvotes: 1