dunlu_98k

Reputation: 309

Spark Streaming: handling skewed Kafka partitions

Scenario:
Kafka -> Spark Streaming

Logic in each Spark Streaming microbatch (30 seconds):
Read JSON -> Parse JSON -> Send to Kafka

My streaming job reads from around 1,000 Kafka topics with around 10K Kafka partitions; the throughput is around 5 million events/s.

The issue comes from the uneven traffic load across Kafka partitions: some partitions have about 50 times the throughput of the smaller ones. This results in skewed RDD partitions (since KafkaUtils creates a 1:1 mapping from Kafka partitions to Spark partitions) and really hurts overall performance, because in each microbatch most executors end up waiting for the one with the largest load to finish. I can see this in the Spark UI: at some point in each microbatch only a few executors have "ACTIVE" tasks while all the others have finished their tasks and are waiting, and the task time distribution shows a MAX of 2.5 minutes but a MEDIAN of only 20 seconds.

Notes:

  1. Spark Streaming, not Structured Streaming
  2. I am aware of this post Spark - repartition() vs coalesce(); I'm not asking about the difference between repartition() and coalesce(). The load is consistent, so autoscaling and dynamic allocation are not relevant either.

What I tried:

  1. Coalesce() helps a little but does not remove the skew, and sometimes even makes it worse; it also comes with a higher risk of OOM on the executors.
  2. Repartition() does remove the skew, but a full shuffle is simply too expensive at this scale; the penalty does not pay back in execution time for each batch. Increasing the batch duration does not help either, because a longer batch means more load per microbatch and therefore more data to shuffle. (A sketch of where both were applied follows this list.)
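For reference, a minimal sketch of where these were applied, assuming the spark-streaming-kafka-0-10 DStream API and placeholder topic names, broker address, and partition counts:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Placeholder topics and consumer settings
val topics = Seq("topic-a", "topic-b")
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "skew-test")

val ssc = new StreamingContext(new SparkConf().setAppName("skew-demo"), Seconds(30))
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

stream.foreachRDD { rdd =>
  // Attempt 1: coalesce merges partitions without a shuffle, so the large Kafka
  // partitions are never split and the skew can remain or even get worse.
  // val balanced = rdd.coalesce(200)

  // Attempt 2: repartition evens out partition sizes but forces a full shuffle,
  // which is what proved too expensive at this scale.
  val balanced = rdd.repartition(200)

  balanced.foreachPartition { records =>
    // parse the JSON and produce to the output Kafka topic here
  }
}
ssc.start()
ssc.awaitTermination()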

How do I make the workload more evenly distributed across Spark executors, so that resources are used more efficiently and performance improves?

Upvotes: 1

Views: 1936

Answers (2)

Grigoriev Nick

Reputation: 1129

I have the same issue. You can try the minPartitions parameter, available from Spark 2.4.7.

A few things are important to highlight.

  • By default, one Kafka partition is mapped to one Spark partition, or a few Spark partitions to one Kafka partition.
  • The Kafka DataFrame has start and end offset boundaries per partition.
  • maxOffsetsPerTrigger defines the number of messages read from Kafka per trigger.
  • Spark 2.4.7 also supports the minPartitions parameter, which can bind one Kafka partition to multiple Spark partitions based on offset ranges. By default, it makes a best effort to split each Kafka partition (offset range) evenly.

So using minPartitions and maxOffsetsPerTrigger you can pre-calculate a good number of partitions.

.option("minPartitions", partitionsNumberLoadedFromKafkaAdminAPI * splitPartitionFactor)
.option("maxOffsetsPerTrigger", maxEventsPerPartition * partitionsNumber)

maxEventsPerPartition and splitPartitionFactor are defined in config.
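A minimal sketch of how those two options fit into a Structured Streaming Kafka reader (which is where they live), assuming a hypothetical broker address, topic pattern, and config values:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-min-partitions").getOrCreate()

// Hypothetical values; in practice the partition count comes from the Kafka admin API
// and the two factors come from config, as described above.
val partitionsNumberLoadedFromKafkaAdminAPI = 10000
val partitionsNumber = partitionsNumberLoadedFromKafkaAdminAPI
val splitPartitionFactor = 4
val maxEventsPerPartition = 15000L

val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribePattern", "topic-.*")
  .option("minPartitions", (partitionsNumberLoadedFromKafkaAdminAPI * splitPartitionFactor).toString)
  .option("maxOffsetsPerTrigger", (maxEventsPerPartition * partitionsNumber).toString)
  .load()

Each trigger then reads at most maxOffsetsPerTrigger records, and Spark splits the covered offset ranges into at least minPartitions tasks.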

In my case, I sometimes have data spikes and my message sizes can vary a lot. So I implemented my own streaming source that can split Kafka partitions by exact record size and even coalesce a few Kafka partitions into one Spark partition.

Upvotes: 2

Ged

Reputation: 18108

Actually, you have provided your own answer.

Do not have one Streaming job reading from 1,000 topics. Put the topics with the biggest load into separate Streaming job(s). Reconfigure; it's that simple. Load balancing, queuing theory.
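A minimal sketch of such a split, with placeholder topic names; each list would be subscribed by its own spark-submitted Streaming application, sized for its load:

// Placeholder topic names; in practice the split comes from observed per-topic traffic.
val allTopics = Seq("topic-heavy-1", "topic-heavy-2", "topic-small-1", "topic-small-2")
val hotTopics = Seq("topic-heavy-1", "topic-heavy-2")
val coldTopics = allTopics.filterNot(hotTopics.toSet)

// Heavy job (its own executors, sized for the hot partitions):
//   KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](hotTopics, kafkaParams))
// Light job (handles the remaining low-traffic topics):
//   KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](coldTopics, kafkaParams))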

Stragglers are an issue in Spark, although a straggler takes on a slightly different trait in Spark.

Upvotes: 0
