boolangery

Reputation: 315

How to limit throughput on an Apache Beam pipeline in Go?

I wrote a basic pipeline in Go running on Google Dataflow.

Basically, it transforms Pub/Sub events into Elasticsearch documents and then updates those documents in Elasticsearch in bulk.

I need a way to limit the number of Bulk requests per second, because when my Pub/Sub subscription has accumulated a lot of messages and my Dataflow streaming job tries to "catch up", it's literally killing my Elasticsearch cluster.
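Because each Bulk request carries many documents, the quantity to limit is batches per second rather than individual events. A minimal stdlib-only sketch of that grouping, assuming the documents are already serialized as strings; `chunk` is a hypothetical helper for illustration, not part of Beam or any Elasticsearch client, and in a real pipeline the grouping would happen inside a pipeline step rather than on an in-memory slice:

```go
package main

import "fmt"

// chunk splits docs into consecutive batches of at most size documents;
// each batch would become the body of one Bulk request.
func chunk(docs []string, size int) [][]string {
	if size <= 0 {
		panic("chunk: size must be positive")
	}
	var batches [][]string
	for len(docs) > size {
		// The full slice expression caps each batch's capacity, so appending
		// to one batch can never overwrite the start of the next one.
		batches = append(batches, docs[:size:size])
		docs = docs[size:]
	}
	if len(docs) > 0 {
		batches = append(batches, docs)
	}
	return batches
}

func main() {
	docs := []string{"d1", "d2", "d3", "d4", "d5"}
	for i, batch := range chunk(docs, 2) {
		fmt.Println(i, batch) // one line per Bulk request
	}
}
```

This prints three batches: [d1 d2], [d3 d4] and [d5]. With batches of B documents and at most R Bulk requests per second, Elasticsearch receives at most B × R documents per second; for example, 500-document batches at 10 requests per second cap indexing at 5,000 documents per second.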

How would you do it?

Upvotes: 1

Views: 219

Answers (1)

Joevanie

Reputation: 605

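You can add a throttling step: a transform that limits the number of elements passing through it per unit of time. As far as I know, the Beam Go SDK does not ship a ready-made Throttle transform, but a DoFn that waits on a rate limiter (golang.org/x/time/rate) before emitting each element does the same job. Here is an example:

import (
	"context"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"golang.org/x/time/rate"
)

// throttleFn forwards elements unchanged, at most Rate per second per DoFn instance.
type throttleFn struct {
	Rate    float64       // exported: serialized and shipped to the workers
	limiter *rate.Limiter // unexported: rebuilt on each worker in Setup
}

func (f *throttleFn) Setup() { f.limiter = rate.NewLimiter(rate.Limit(f.Rate), 1) }
func (f *throttleFn) ProcessElement(ctx context.Context, elem []byte, emit func([]byte)) error {
	if err := f.limiter.Wait(ctx); err != nil { // block until the next slot is free
		return err
	}
	emit(elem)
	return nil
}

func init() { register.DoFn3x1[context.Context, []byte, func([]byte), error](&throttleFn{}) }
func ThrottlePipeline(s beam.Scope, input beam.PCollection, perSecond float64) beam.PCollection {
	return beam.ParDo(s, &throttleFn{Rate: perSecond}, input)
}

Adjust perSecond to change the throughput rate. Keep in mind that the limit applies to each DoFn instance separately: Dataflow runs many instances in parallel across workers and threads, so the overall rate is roughly perSecond multiplied by the number of running instances, and it grows if autoscaling adds workers. Place the step right before the one that sends the Bulk requests, ideally after the documents have been grouped so that each element is one Bulk request; the limit then counts requests rather than individual documents.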

Upvotes: 0
