Reputation: 13
I am streaming data from kafka and trying to limit the number of events per batch to 10 events. After processing for 10-15 batches, there is a sudden spike in the batch size. Below are my settings:
spark.streaming.kafka.maxRatePerPartition=1
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.pid.minRate=1
spark.streaming.receiver.maxRate=2
Please check this image for the streaming behavior
Upvotes: 0
Views: 1002
Reputation: 11
This is the bug in spark, please reffer to: https://issues.apache.org/jira/browse/SPARK-18371
The pull request isn't merged yet, but you may pick it up and build spark on your own.
To summarize the issue:
If you have the spark.streaming.backpressure.pid.minRate
set to a number <= partition count, then an effective rate of 0 is calculated:
val totalLag = lagPerPartition.values.sum
...
val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
...
(the second line calculates rate per partition where rate
is rate comming from PID and defaults to minRate, when PID calculates it shall be smaller)
As here: DirectKafkaInputDStream code
This resulting to 0 causes the fallback to (unreasonable) head of partitions:
...
if (effectiveRateLimitPerPartition.values.sum > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Some(effectiveRateLimitPerPartition.map {
case (tp, limit) => tp -> (secsPerBatch * limit).toLong
})
} else {
None
}
...
maxMessagesPerPartition(offsets).map { mmp =>
mmp.map { case (tp, messages) =>
val lo = leaderOffsets(tp)
tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
}
}.getOrElse(leaderOffsets)
As in DirectKafkaInputDStream#clamp
This makes the backpressure basically not working when your actual and minimal receive rate/msg/ partitions is smaller ~ equal to partitions count and you experience significant lag (e.g. messages come in spikes and you have constant processing powers).
Upvotes: 1