Kolban
Reputation: 15266

Design of a pipeline that invokes a maximum number of requests per second

My goal is to create a pipeline that invokes a back-end (Cloud hosted) service a maximum number of times per second ... how can I achieve that?

Back story: Imagine a back-end service that is invoked with a single input and returns a single output. This service has quotas associated with it that permit a maximum number of requests per second (let's say 10 requests per second). Now imagine an unbounded source PCollection where I wish to transform the elements in the input by passing them through my back-end service. I can envisage a ParDo invoking the back-end service once for each element in the input PCollection. However, this doesn't perform any kind of flow control against the back-end.

I could imagine my DoFn logic testing the response from the back-end and retrying until it succeeds, but this doesn't feel right. With 100 workers, I would be burning a lot of resources and putting unnecessary load on the back-end. What I think I want to do is throttle the calls to the back-end from the pipeline.

Upvotes: 3

Views: 674

Answers (1)

Damon
Reputation: 841

Good Day, kolban. In addition to Bruno Volpato's helpful RampupThrottlingFn example, I've seen a combination of the following. Please do not hesitate at all to let me know how I can update the example with more clarity.

  1. PeriodicImpulse - emits an Instant at a fixed specified interval.
  2. Fix the number of workers with the maxNumWorkers and numWorkers options (please see Dataflow Pipeline Options), if using the Dataflow runner.
  3. Beam Metrics API to monitor the actual resource request count over time and set alerts. When using Dataflow, the Beam Metrics API automatically connects to Cloud Monitoring as custom metrics.

The following shows abbreviated code starting from the whole pipeline followed by some details as needed to provide clarity. It assumes a target of 10 workers, using Dataflow with the arguments --maxNumWorkers=10 and --numWorkers=10 and a goal to limit the resource requests among all workers to 10 requests per second. This translates to 1 request per second per worker.
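The per-worker arithmetic above can be sketched as a small helper. This is only an illustration, assuming the quota is split evenly across a fixed number of workers; QuotaMath and perWorkerInterval are hypothetical names and not part of Beam or Dataflow. It uses java.time.Duration to stay dependency-free, whereas Beam itself takes a Joda-Time Duration.

```java
import java.time.Duration;

/** Hypothetical helper: splits a global requests-per-second quota evenly across a fixed worker count. */
final class QuotaMath {
    private QuotaMath() {}

    /** Returns how long each worker should wait between requests so the total stays within the quota. */
    static Duration perWorkerInterval(int quotaPerSecond, int numWorkers) {
        if (quotaPerSecond <= 0 || numWorkers <= 0) {
            throw new IllegalArgumentException("quota and worker count must be positive");
        }
        // Each worker may send quotaPerSecond / numWorkers requests per second,
        // so it waits numWorkers / quotaPerSecond seconds between requests.
        // Round up to the next nanosecond so the combined rate never exceeds the quota.
        long nanos = Math.floorDiv(1_000_000_000L * numWorkers + quotaPerSecond - 1, quotaPerSecond);
        return Duration.ofNanos(nanos);
    }
}
```

For the numbers in this answer, perWorkerInterval(10, 10) gives one second, which matches the interval of 1 request per second per worker. If the worker count were 100 instead, the same quota would allow each worker only one request every 10 seconds.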

PeriodicImpulse limits the Request creation to 1 per second

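import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class MyPipeline {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(/* Usually with options */);
        // Emit one Instant per second, turn each into a Request, then call the resource.
        PCollection<Response> responses = pipeline.apply(
            "PeriodicImpulse",
            PeriodicImpulse
                .create()
                .withInterval(Duration.standardSeconds(1L))
        ).apply(
            "Build Requests",
            ParDo.of(new RequestFn())
        )
        .apply(ResourceTransform.create());

        // Nothing executes until the pipeline is run.
        pipeline.run();
    }
}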

RequestFn emits one Request for each Instant emitted by PeriodicImpulse

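import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

class RequestFn extends DoFn<Instant, Request> {
    @ProcessElement
    public void process(@Element Instant instant, OutputReceiver<Request> receiver) {
        // AutoValue's build() throws if a required property is unset, so set the id.
        // Using the impulse timestamp as the id is only a placeholder; substitute a real value.
        receiver.output(
            Request.builder().setId(instant.toString()).build()
        );
    }
}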

ResourceTransform transforms Requests to Responses, incrementing a Counter

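import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

class ResourceTransform extends PTransform<PCollection<Request>, PCollection<Response>> {

    static ResourceTransform create() {
        return new ResourceTransform();
    }

    @Override
    public PCollection<Response> expand(PCollection<Request> input) {
        // ParDo.of takes only the DoFn; the step name is passed to apply() on the input.
        return input.apply("Consume Resource", ParDo.of(new ResourceFn()));
    }
}

class ResourceFn extends DoFn<Request, Response> {

    // Counts every request sent to the resource so the actual rate can be monitored.
    private final Counter counter = Metrics.counter(ResourceFn.class, "some:resource");

    // The client is not serialized with the DoFn; it is created per instance in setup().
    private transient ResourceClient client = null;

    @Setup
    public void setup() {
        client = new ResourceClient();
    }

    @ProcessElement
    public void process(@Element Request request, OutputReceiver<Response> receiver) {
        counter.inc(); // Increment the counter.
        // not showing error handling
        Response response = client.execute(request);
        receiver.output(response);
    }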
}
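
The error handling left out of ResourceFn could take many forms. One possibility, sketched here under the assumption that failures are transient and worth a small, bounded number of retries, is a helper with exponential backoff. BoundedRetry and its parameters are hypothetical names, not part of Beam. Keep in mind that every retry is another request against the quota, so the attempt limit should stay low.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retries a call a bounded number of times, doubling the wait after each failure. */
final class BoundedRetry {
    private BoundedRetry() {}

    static <T> T call(Callable<T> action, int maxAttempts, long initialBackoffMillis) throws Exception {
        if (maxAttempts < 1 || initialBackoffMillis < 0) {
            throw new IllegalArgumentException("maxAttempts must be >= 1 and backoff must be >= 0");
        }
        long backoffMillis = initialBackoffMillis;
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry when the thread is being shut down.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    // Wait before the next attempt, then double the wait.
                    Thread.sleep(backoffMillis);
                    backoffMillis *= 2;
                }
            }
        }
        // All attempts failed; surface the last failure to the caller.
        throw lastFailure;
    }
}
```

Inside ResourceFn, the call to the client might then look like BoundedRetry.call(() -> client.execute(request), 3, 100L), with the attempt count and backoff chosen to suit the back-end.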

Request and Response classes

(Aside: consider creating a Schema for the request input and response output classes. The example below uses AutoValue and AutoValueSchema.)

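import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class Request {

    // Entry point used by RequestFn; AutoValue_Request is generated by the AutoValue processor.
    static Builder builder() {
        return new AutoValue_Request.Builder();
    }

    /* abstract Getters. */

    abstract String getId();

    @AutoValue.Builder
    static abstract class Builder {

        /* abstract Setters. */

        abstract Builder setId(String value);

        abstract Request build();
    }
}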

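@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class Response {

    // Entry point for building a Response; AutoValue_Response is generated by the AutoValue processor.
    static Builder builder() {
        return new AutoValue_Response.Builder();
    }

    /* abstract Getters. */

    abstract String getId();

    @AutoValue.Builder
    static abstract class Builder {

        /* abstract Setters. */

        abstract Builder setId(String value);

        abstract Response build();
    }
}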


Upvotes: 1
