Reputation: 1255
Is it possible to configure Spark with the spark-streaming-kafka-0-10 library to read multiple Kafka partitions or an entire Kafka topic with a single task instead of creating a different Spark task for every Kafka partition available?
Please excuse my rough understanding of these technologies; I'm still fairly new to Spark and Kafka. The architecture and settings are mostly just me experimenting to see how these technologies work together.
I have four virtual hosts, one with a Spark master and each with a Spark worker. One of the hosts is also running a Kafka broker, based on Spotify's Docker image. Each host has four cores and about 8 GB of unused RAM.
The Kafka broker has 206 topics, and each topic has 10 partitions. So there are a total of 2,060 partitions for applications to read from.
I'm using the spark-streaming-kafka-0-10 library (currently experimental) to subscribe to topics in Kafka from a Spark Streaming job. I am using the SubscribePattern class to subscribe to all 206 topics from Spark:
import java.util.regex.Pattern
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.SubscribePattern
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  SubscribePattern[String, String](Pattern.compile("(pid\\.)\\d+"), kafkaParams)
)
When I submit this job to the Spark master, it looks like 16 executors are started, one for each core in the cluster. It also looks like each Kafka partition gets its own task, for a total of 2,060 tasks. I think my cluster of 16 executors is having trouble churning through so many tasks because the job keeps failing at different points between 1500 and 1800 tasks completed.
I found a tutorial by Michael Noll from 2014 which addresses using the spark-streaming-kafka-0-8 library to control the number of consumer threads for each topic:
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
val consumerThreadsPerInputDstream = 3
val topics = Map("zerg.hydra" -> consumerThreadsPerInputDstream)
val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
Upvotes: 1
Views: 2873
Reputation: 377
The second approach will be the best for your requirements. You only have to set consumerThreadsPerInputDstream = 1, so only one consumer thread will be created per read operation and hence a single machine in the cluster will do the consuming.
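A minimal sketch of that setting against the receiver-based 0-8 API (the ZooKeeper address, topic name, and storage level below are placeholder choices, not values from the question):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// One consumer thread per input DStream: a single receiver on one executor
// consumes all partitions of the topic.
val kafkaParams = Map("zookeeper.connect" -> "zk-host:2181", "group.id" -> "terran")
val topics = Map("zerg.hydra" -> 1) // topic -> number of consumer threads
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER)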
Upvotes: 1
Reputation: 149636
Is it possible to configure Spark with the spark-streaming-kafka-0-10 library to read multiple Kafka partitions or an entire Kafka topic with a single task instead of creating a different Spark task for every Kafka partition available?
You could alter the number of generated partitions by calling repartition on the stream, but then you lose the 1:1 correspondence between Kafka partitions and RDD partitions.
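For example, something along these lines; the target of 16 partitions is just an illustrative value (note that ConsumerRecord isn't serializable, so the values are extracted before the shuffle):

// Extract the values first, then shuffle the 2,060 Kafka-backed partitions
// down to 16 Spark partitions. After this, the RDD partitions no longer map
// 1:1 to Kafka partitions, so offset ranges are no longer accessible.
val repartitioned = stream.map(record => record.value).repartition(16)
repartitioned.foreachRDD { rdd =>
  // rdd now has 16 partitions instead of 2,060
}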
The number of tasks generated from the Kafka partitions isn't related to the fact that you have 16 executors. The number of executors depends on your settings and on the resource manager you're using.
There is a 1:1 mapping between Kafka partitions and RDD partitions with the direct streaming API. Each executor gets a subset of these partitions to consume from Kafka and process, and each partition is independent and can be computed on its own. This is unlike the receiver-based API, which creates a single receiver on an arbitrary executor and consumes the data itself via threads on the node.
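A quick way to see that mapping with the 0-10 direct stream is to print the offset range behind each RDD partition, roughly like this:

import org.apache.spark.streaming.kafka010.HasOffsetRanges

stream.foreachRDD { rdd =>
  // Each RDD partition i is backed by exactly one Kafka topic-partition.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.zipWithIndex.foreach { case (range, i) =>
    println(s"RDD partition $i <- ${range.topic}-${range.partition} " +
      s"offsets ${range.fromOffset} to ${range.untilOffset}")
  }
}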
If you have 206 topics with 10 partitions each, you'd better have a decent-sized cluster that can handle the load of the generated tasks. You can control the maximum number of messages generated per partition, but you can't alter the number of partitions unless you're willing to incur the shuffling effect of the repartition transformation.
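The per-partition cap is set through Spark configuration; a rough sketch with placeholder values for the rate and batch interval:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Cap each Kafka partition at 100 records/second. With 2,060 partitions and a
// 10-second batch interval, one batch is bounded by 2,060 * 100 * 10 records.
val conf = new SparkConf()
  .setAppName("kafka-direct-example")
  .set("spark.streaming.kafka.maxRatePerPartition", "100")
val ssc = new StreamingContext(conf, Seconds(10))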
Upvotes: 3