Divya

Reputation: 13

How to avoid sudden spikes in batch size in Spark streaming?

I am streaming data from Kafka and trying to limit each batch to 10 events. After 10-15 batches have been processed, there is a sudden spike in the batch size. Below are my settings:

spark.streaming.kafka.maxRatePerPartition=1

spark.streaming.backpressure.enabled=true

spark.streaming.backpressure.pid.minRate=1

spark.streaming.receiver.maxRate=2

Please check this image for the streaming behavior

Upvotes: 0

Views: 1002

Answers (1)

ptaku_cc

Reputation: 11

This is a bug in Spark; please refer to: https://issues.apache.org/jira/browse/SPARK-18371

The pull request isn't merged yet, but you may pick it up and build Spark yourself.

To summarize the issue:

If spark.streaming.backpressure.pid.minRate is set to a number <= the partition count, an effective rate of 0 is calculated:

val totalLag = lagPerPartition.values.sum
...
val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
...

(The second line calculates the rate per partition, where rate is the rate coming from the PID estimator; it defaults to minRate whenever the PID computes a smaller value.) See the DirectKafkaInputDStream code.
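To make the rounding concrete, here is a minimal standalone sketch of that step. It is not the Spark source: the object `BackpressureRounding`, the helper `ratePerPartition`, the use of `Int` partition ids, and the 10-partition example values are all assumptions made for illustration. Only the `Math.round(lag / totalLag.toFloat * rate)` expression is taken from the snippet above.

```scala
object BackpressureRounding {
  // Hypothetical helper: spreads `rate` across partitions in proportion to lag,
  // using the same rounding expression as the snippet quoted above.
  def ratePerPartition(lagPerPartition: Map[Int, Long], rate: Long): Map[Int, Long] = {
    val totalLag = lagPerPartition.values.sum
    lagPerPartition.map { case (partition, lag) =>
      partition -> Math.round(lag / totalLag.toFloat * rate).toLong
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumed example: 10 partitions, each lagging by 1000 messages.
    val lag = (0 until 10).map(p => p -> 1000L).toMap

    // With rate = 1 (e.g. minRate = 1), each share is 0.1 and rounds down to 0.
    val limits = ratePerPartition(lag, rate = 1L)
    println(limits.values.sum) // prints 0
  }
}
```

With evenly spread lag, every partition gets a share of 0.1 messages per second, which rounds to 0, so the sum over all partitions is 0 as well.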

When this results in 0 for every partition, no limit is applied and the batch falls back to the (unreasonable) head of each partition:

    ...
    if (effectiveRateLimitPerPartition.values.sum > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some(effectiveRateLimitPerPartition.map {
        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
      })
    } else {
      None
    }

    ...

maxMessagesPerPartition(offsets).map { mmp =>
  mmp.map { case (tp, messages) =>
    val lo = leaderOffsets(tp)
    tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
  }
}.getOrElse(leaderOffsets)

This is from DirectKafkaInputDStream#clamp.

As a result, backpressure effectively stops working when your actual and minimum receive rate (messages per second) is smaller than or roughly equal to the partition count and you experience significant lag (e.g. messages arrive in spikes while your processing capacity is constant).
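The fallback described above can be reproduced with a simplified stand-in for the two quoted snippets. This is only a sketch under stated assumptions: the object `ClampFallback`, the plain `Map[Int, Long]` offsets (instead of Spark's topic-partition and leader-offset types), and the example numbers are hypothetical. The `sum > 0` check and the `getOrElse` fallback follow the quoted code.

```scala
object ClampFallback {
  // Hypothetical simplification: returns None when all per-partition limits sum to 0,
  // mirroring the `effectiveRateLimitPerPartition.values.sum > 0` check quoted above.
  def maxMessages(limits: Map[Int, Long], secsPerBatch: Double): Option[Map[Int, Long]] =
    if (limits.values.sum > 0) {
      Some(limits.map { case (tp, limit) => tp -> (secsPerBatch * limit).toLong })
    } else {
      None
    }

  // Hypothetical simplification of clamp: with no limit (None), the batch runs
  // all the way to the latest offsets, however large the lag is.
  def clamp(
      currentOffsets: Map[Int, Long],
      latestOffsets: Map[Int, Long],
      limit: Option[Map[Int, Long]]): Map[Int, Long] =
    limit.map { mmp =>
      mmp.map { case (tp, messages) =>
        tp -> Math.min(currentOffsets(tp) + messages, latestOffsets(tp))
      }
    }.getOrElse(latestOffsets)

  // Number of messages in the batch that ends at `untilOffsets`.
  def batchSize(currentOffsets: Map[Int, Long], untilOffsets: Map[Int, Long]): Long =
    untilOffsets.map { case (tp, until) => until - currentOffsets(tp) }.sum
}
```

For example, with two partitions at offset 100, latest offsets of 10100, and a 10-second batch, a limit of 1 message per second per partition yields a batch of 20 messages, while limits of 0 yield `None` and a batch of 20000 messages. That jump is the kind of spike the question describes.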

Upvotes: 1
