Dinosaurius

Reputation: 8628

Execution of three parallel Spark Streaming jobs

I am running 3 Spark Streaming jobs at the same time on an Amazon EMR cluster. The problem is that one of these three jobs does its processing via toLocalIterator:

dstreamdata.foreachRDD(entry => {
  // toLocalIterator pulls the RDD back to the driver, one partition at a time
  entry.toLocalIterator.foreach(record => println(record)) // stand-in for the per-record work, all of which runs on the driver
})

I have noticed that this job gets stuck (it looks as if it lacks resources), but it does not return any error; it simply stops processing data.

I use the following spark-submit parameters for each job:

spark-submit --deploy-mode cluster --executor-cores 6 --executor-memory 10g --num-executors 2 --conf spark.yarn.submit.waitAppCompletion=false --queue queue_name_of_spark_job

Any idea how to resolve this issue without changing the code?

Upvotes: 0

Views: 967

Answers (1)

ImDarrenG

Reputation: 2345

1.1) If you are using Kinesis as your source, ensure that you have twice as many executor cores as Kinesis shards. This may also apply to Kafka; I forget exactly how the Kafka connector works. The reason is that the connector occupies one executor core per shard as a long-running receiver, so you must leave enough executor cores free for actually processing the data.

In the past I've used one executor per Kinesis shard, each executor having 2 or more cores, which worked well in my use cases.
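For illustration, here is a minimal sketch of that receiver-per-shard setup, assuming a hypothetical 4-shard Kinesis stream (the app name, stream name, endpoint, region, and batch interval are all placeholders):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("my-app"), Seconds(10))

// One receiver per shard; each receiver permanently occupies one executor core,
// so a 4-shard stream pins 4 cores for receiving before any processing happens.
val numShards = 4 // hypothetical shard count
val streams = (1 to numShards).map { _ =>
  KinesisUtils.createStream(
    ssc, "my-app", "my-stream", "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1", InitialPositionInStream.LATEST, Seconds(10),
    StorageLevel.MEMORY_AND_DISK_2)
}

// Union into a single DStream; the remaining executor cores do the actual work
val dstreamdata = ssc.union(streams)

With --executor-cores 6 and --num-executors 2 as in your submit command, 4 of your 12 cores would be pinned to receivers, leaving 8 for processing.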

1.2) At the moment your code fetches all of the data back to the driver as an iterator. If you have a lot of data, you may need to allocate more resources to the driver so that it has the capacity to process everything in the RDD. This kinda feels wrong, though: if all the data fits on one instance, you don't really need the complexity of Spark!

The Spark 2.0.x configuration documentation lists the settings available to you.

I recommend starting with spark.driver.cores and/or spark.driver.memory. I suspect you need more driver cores, but you will need to experiment.
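For example, your existing submit command with explicit driver resources added (--driver-cores only takes effect in cluster mode, which you are already using; the values here are illustrative starting points, not recommendations):

spark-submit --deploy-mode cluster --driver-cores 4 --driver-memory 8g --executor-cores 6 --executor-memory 10g --num-executors 2 --conf spark.yarn.submit.waitAppCompletion=false --queue queue_name_of_spark_job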

2) I appreciate that you don't want to change the code, but... if it is possible, you could make use of entry.foreachPartition() instead.

This approach avoids the performance problem of processing all the data in the driver process. Depending on your exact use case, foreachPartition (or some variation of that logic) should help you solve your problem.

Here is some sample code with a link for more info:

dstream.foreachRDD { rdd =>
  // code here is executed by the driver
  rdd.foreachPartition { partitionOfRecords =>
    // code here is executed by the workers per partition
  }
}

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
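And a slightly fuller sketch in the style of that guide, where createNewConnection() and send() are hypothetical placeholders for whatever per-record work your job does:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // this runs on a worker, once per partition
    val connection = createNewConnection() // hypothetical: e.g. a DB or HTTP client
    partitionOfRecords.foreach(record => connection.send(record)) // hypothetical send()
    connection.close()
  }
}

Creating the connection inside foreachPartition (rather than once per record, or on the driver) is the key design point: the connection object is not serializable, and one connection per partition amortizes the setup cost.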

Upvotes: 1
