Reputation: 315
I wrote a basic pipeline in Go running on Google Dataflow.
Basically, it transforms Pub/Sub events into Elastic documents and then updates the Elastic documents in bulk.
I need to find a way to limit the number of bulk requests per second, because when my Pub/Sub subscription has accumulated a lot of messages and my Dataflow streaming job tries to "catch up", it's literally killing my Elastic cluster.
How would you do it?
Upvotes: 1
Views: 219
Reputation: 605
You can throttle the elements before they reach the bulk writer: assign the stream to fixed windows, funnel each window through a single key, and re-emit the elements with a pause between them, so only a bounded number per second continue downstream. As far as I know the Go SDK does not ship a ready-made Throttle transform, but the pattern is easy to build yourself. Here is a sketch:
import "github.com/apache/beam/sdks/go/pkg/beam"
func ThrottlePipeline(s beam.Scope, input beam.PCollection) beam.PCollection
{
return beam.ParDo(s, func(_ []byte, emit func(interface{})) {
emit(1)
}, input).
Apply(s, beam.WindowInto(window.NewFixedWindows(time.Second))).
Apply(s, beam.CombinePerKey(sum.SumInt64)).
Apply(s, beam.ParDo(func(_ []byte, count int64) {
time.Sleep(time.Duration(count) * time.Second)
}, beam.Impulse(s)))
}
You can tune maxPerSec (and the window size) to control how quickly the backlog is drained; because every window is funneled through a single key, the pacing applies to the whole stream rather than per worker.
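If you would rather cap the bulk requests themselves instead of the element flow, another option is to put a client-side rate limiter (for example golang.org/x/time/rate) inside the DoFn that performs the bulk call. This is only a sketch: rateLimitedWriteFn and sendBulkRequest are hypothetical names standing in for your own write step.
import (
    "context"
    "reflect"

    "github.com/apache/beam/sdks/go/pkg/beam"
    "golang.org/x/time/rate"
)

func init() { beam.RegisterType(reflect.TypeOf((*rateLimitedWriteFn)(nil)).Elem()) }

// rateLimitedWriteFn issues the Elasticsearch bulk call, but never more than
// RequestsPerSec requests per second on this worker.
type rateLimitedWriteFn struct {
    RequestsPerSec float64
    limiter        *rate.Limiter
}

func (f *rateLimitedWriteFn) Setup() {
    f.limiter = rate.NewLimiter(rate.Limit(f.RequestsPerSec), 1)
}

func (f *rateLimitedWriteFn) ProcessElement(ctx context.Context, body string) error {
    // Wait blocks until the limiter allows the next request.
    if err := f.limiter.Wait(ctx); err != nil {
        return err
    }
    return sendBulkRequest(ctx, body)
}

// sendBulkRequest stands in for your existing Elasticsearch bulk call (hypothetical).
func sendBulkRequest(ctx context.Context, body string) error {
    _ = ctx
    _ = body
    return nil
}
You would wire it in with beam.ParDo0(s, &rateLimitedWriteFn{RequestsPerSec: 5}, bulkBodies). Note that the limit applies per worker, so the cluster-wide rate is roughly RequestsPerSec times the number of Dataflow workers; cap the maximum number of workers or lower the value accordingly.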
Upvotes: 0