Reputation: 5397
I was trying to set up Spark Streaming for multiple RabbitMQ queues. As shown below, I have set up 2 workers, and each worker is given one core and 2GB of memory. The problem is that when I keep this parameter at
conf.set("spark.cores.max","2")
streaming doesn't process any data and just keeps queuing jobs, but once I set it to
conf.set("spark.cores.max","3")
streaming starts processing. I don't understand the reason for this. Also, if I want to process the data from both queues in parallel, how should I do it? My code and config settings are below.
spark-env.sh:
SPARK_WORKER_MEMORY=2g
SPARK_WORKER_INSTANCES=1
SPARK_WORKER_CORES=1
Scala code:
val rabbitParams = Map(
  "storageLevel" -> "MEMORY_AND_DISK_SER_2", "queueName" -> config.getString("queueName"),
  "host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("exchangeName"),
  "routingKeys" -> config.getString("routingKeys"))
val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
receiverStream.start()
val predRabbitParams = Map(
  "storageLevel" -> "MEMORY_AND_DISK_SER_2", "queueName" -> config.getString("queueName1"),
  "host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("exchangeName1"),
  "routingKeys" -> config.getString("routingKeys1"))
val predReceiverStream = RabbitMQUtils.createStream(ssc, predRabbitParams)
predReceiverStream.start()
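The snippet above uses ssc and config without showing how they are created. A minimal setup sketch, assuming Typesafe Config for the queue settings and a standard StreamingContext (the app name and batch interval here are placeholders, not from the question):

import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumed setup for the `config` and `ssc` values referenced above.
val config = ConfigFactory.load()                  // queue names, host, exchange names, routing keys
val conf = new SparkConf()
  .setAppName("RabbitMQStreaming")                 // placeholder app name
  .set("spark.cores.max", "3")                     // the setting discussed in the question
val ssc = new StreamingContext(conf, Seconds(10))  // assumed 10-second batch interval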
Upvotes: 0
Views: 603
Reputation: 330063
This behavior is explained in the Spark Streaming Programming Guide. Each receiver is a long-running task that occupies a single core.
If the number of cores available to the application is less than or equal to the number of receivers, there are no resources left for processing the received data:
the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it.
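With one receiver per queue, two cores are permanently pinned, so spark.cores.max = 2 leaves nothing for batch processing, while 3 leaves one core free. The two receivers already read their queues in parallel; to handle both in a single job, the streams can be merged with union. A minimal sketch continuing from the question's code (the processing body is just a placeholder):

// Merge the two receiver streams so records from both queues flow through one pipeline.
val combined = receiverStream.union(predReceiverStream)

combined.foreachRDD { rdd =>
  // Placeholder processing; replace with the real logic.
  rdd.foreach(record => println(record))
}

// With spark.cores.max = 3, two cores host the receivers and one runs the batches.
ssc.start()
ssc.awaitTermination()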
Upvotes: 1