John Humphreys

Reputation: 39354

Kafka + Spark Streaming - Fairness between partitions?

I have a 20-partition topic in Kafka and am reading it with Spark Streaming (8 executors, 3 cores each). I'm using the direct stream method of reading.

I'm having problems because the first 12 partitions are getting read at a faster rate than the last 8 for some reason. So, data in the last 8 is getting stale (well, staler).

Partitions 12-19 are around 90% caught up to partitions 0-11, but we're talking about billions of messages, so the staleness of the data that is 10% back in the topic partition is pretty significant.

Is this normal? Can I make sure Kafka consumes the partitions more fairly?

Upvotes: 1

Views: 1372

Answers (2)

John Humphreys

Reputation: 39354

In my particular case, it turns out that I'm hitting some sort of bug (possibly in MapR's distribution).

The bug causes the offsets of certain partitions to reset to 0, which, when observed later, just makes them look incrementally a little behind.

I found configuration parameters which mitigate the issue, and a much larger discussion on the topic is available here: https://community.mapr.com/thread/22319-spark-streaming-mapr-streams-losing-partitions

Configuration Example - On Spark Context

 .set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(Config.config.AGG_KAFKA_POLLING_MS))
 .set("spark.streaming.kafka.maxRetries", String.valueOf(10))

Edit

Confirmed that other people have had this issue as well with Spark Streaming + MapR-Streams/Kafka. This configuration seemed to lessen the chance of it happening, but it did eventually come back.

You can work around it with a safety check that detects the condition and "fixes" the offsets using a standard Kafka consumer before starting your Spark stream (the problem occurs when restarting the streaming app), but you have to store the offsets externally to do this. Compounding the problem, you can't reliably provide offsets to Spark 2.1.0 Streaming on start-up due to another bug, which is why you must manipulate the offsets with a consumer before starting the streaming; that way it starts from offsets already stored in Kafka.
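As a rough sketch of that workaround (not the exact code from the app): run something like this before starting the streaming context to commit the externally stored offsets back into Kafka with a plain consumer. The loadStoredOffsets function and the connection settings here are placeholders.

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

def repairOffsets(brokers: String, groupId: String, topic: String,
                  loadStoredOffsets: () => Map[Int, Long]): Unit = {
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("group.id", groupId)  // must match the streaming app's group id
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("enable.auto.commit", "false")

  val consumer = new KafkaConsumer[String, String](props)
  try {
    // partition -> last known good offset, loaded from external storage (placeholder)
    val offsets = loadStoredOffsets().map { case (partition, offset) =>
      new TopicPartition(topic, partition) -> new OffsetAndMetadata(offset)
    }
    consumer.assign(offsets.keys.toSeq.asJava)
    // Overwrite any offsets that were reset to 0 with the stored values;
    // the streaming app then starts from the offsets already in Kafka.
    consumer.commitSync(offsets.asJava)
  } finally {
    consumer.close()
  }
}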

Upvotes: 0

vaquar khan

Reputation: 11489

Kafka divides partition consumption over the consumer instances within a consumer group. Each consumer in the consumer group is an exclusive consumer of a “fair share” of partitions. This is how Kafka does load balancing of consumers in a consumer group. Consumer membership within a consumer group is handled dynamically by the Kafka protocol. If a new consumer joins a consumer group, it gets a share of the partitions. If a consumer dies, its partitions are split among the remaining live consumers in the consumer group. This is how Kafka does failover of consumers in a consumer group.
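For illustration, a minimal sketch of that behaviour with a plain Kafka consumer (broker address, group id, and topic name are placeholders); the first poll triggers the group join and the partition assignment:

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "broker-host:9092")  // placeholder
props.put("group.id", "example-group")              // all members of this group share the topic's partitions
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("my-topic"))
consumer.poll(5000)  // joins the group and receives a partition assignment
println(s"Assigned partitions: ${consumer.assignment().asScala.mkString(", ")}")
consumer.close()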

UnderReplicatedPartitions:

In a healthy cluster, the number of in-sync replicas (ISRs) should be exactly equal to the total number of replicas. If partition replicas fall too far behind their leaders, the follower partition is removed from the ISR pool, and you should see a corresponding increase in IsrShrinksPerSec. Since Kafka’s high-availability guarantees cannot be met without replication, investigation is certainly warranted should this metric value exceed zero for extended time periods.

IsrShrinksPerSec/IsrExpandsPerSec:

The number of in-sync replicas (ISRs) for a particular partition should remain fairly static; the only exceptions are when you are expanding your broker cluster or removing partitions. In order to maintain high availability, a healthy Kafka cluster requires a minimum number of ISRs for failover. A replica can be removed from the ISR pool for a couple of reasons: it is too far behind the leader’s offset (user-configurable by setting the replica.lag.max.messages configuration parameter), or it has not contacted the leader for some time (configurable with the replica.socket.timeout.ms parameter). No matter the reason, an increase in IsrShrinksPerSec without a corresponding increase in IsrExpandsPerSec shortly thereafter is cause for concern and requires user intervention. The Kafka documentation provides a wealth of information on the user-configurable parameters for brokers.

https://www.datadoghq.com/blog/monitoring-kafka-performance-metrics/
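If you want to spot-check these broker metrics directly rather than through a monitoring tool, here is a minimal sketch that reads them over JMX; it assumes the broker was started with remote JMX enabled (e.g. JMX_PORT set), and the host and port are placeholders.

import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

// Connect to a broker's JMX endpoint (host/port are placeholders).
val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi")
val connector = JMXConnectorFactory.connect(url)
try {
  val mbsc = connector.getMBeanServerConnection
  // UnderReplicatedPartitions is a gauge, so read its "Value" attribute.
  val underReplicated = mbsc.getAttribute(
    new ObjectName("kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions"), "Value")
  // IsrShrinksPerSec is a meter; "OneMinuteRate" gives the recent rate.
  val isrShrinks = mbsc.getAttribute(
    new ObjectName("kafka.server:type=ReplicaManager,name=IsrShrinksPerSec"), "OneMinuteRate")
  println(s"UnderReplicatedPartitions = $underReplicated, IsrShrinksPerSec (1m) = $isrShrinks")
} finally {
  connector.close()
}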

Under fair sharing, Spark assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share of cluster resources. This means that short jobs submitted while a long job is running can start receiving resources right away and still get good response times, without waiting for the long job to finish.

By default, Spark’s scheduler runs jobs in FIFO fashion: the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, and so on.

In Spark Streaming you can configure the FAIR scheduling mode, and Spark Streaming's JobScheduler should then submit the Spark jobs per topic in parallel.

To enable the fair scheduler, simply set the spark.scheduler.mode property to FAIR when configuring a SparkContext:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
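With FAIR mode enabled you can also route the jobs submitted from a given thread into a named scheduler pool; the pool name below is just an example, not a required value.

// Jobs submitted from this thread after the call go into the named pool.
sc.setLocalProperty("spark.scheduler.pool", "streaming-pool")  // example pool name
// ... run streaming jobs ...
sc.setLocalProperty("spark.scheduler.pool", null)              // revert to the default pool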

Upvotes: 2
