My goal is to create a pipeline that invokes a back-end (cloud-hosted) service at most a fixed number of times per second. How can I achieve that?
Back story: Imagine a back-end service that is invoked with a single input and returns a single output. This service has quotas associated with it that permit a maximum number of requests per second (let's say 10 requests per second). Now imagine an unbounded source PCollection where I wish to transform the elements in the input by passing them through my back-end service. I can envisage a ParDo invoking the back-end service once for each element in the input PCollection. However, this doesn't perform any kind of flow control against the back-end.
I could imagine my DoFn logic checking the response from the back-end and retrying until it succeeds, but this doesn't feel right. If I have 100 workers, I seem to be burning a lot of resources and putting extra load on the back-end. What I think I want to do is throttle the calls to the back-end from the pipeline.
Good day, kolban. In addition to Bruno Volpato's helpful RampupThrottlingFn example, I've seen this handled with a PeriodicImpulse source combined with the maxNumWorkers and numWorkers options (please see Dataflow Pipeline Options), if using the Dataflow runner. Please do not hesitate to let me know how I can make the example clearer.
The following shows abbreviated code, starting with the whole pipeline and followed by some details as needed for clarity. It assumes a target of 10 workers, using Dataflow with the arguments --maxNumWorkers=10 and --numWorkers=10, and a goal of limiting requests to the resource to 10 requests per second across all workers. This translates to 1 request per second per worker.
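```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class MyPipeline {
  public static void main(String[] args) {
    // Parse --numWorkers, --maxNumWorkers, --runner, etc. from the command line.
    Pipeline pipeline = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

    PCollection<Response> responses = pipeline
        // Emit one element (the current Instant) every second.
        .apply("PeriodicImpulse",
            PeriodicImpulse.create().withInterval(Duration.standardSeconds(1L)))
        // Turn each tick into a request for the back-end.
        .apply("Build Requests", ParDo.of(new RequestFn()))
        // Call the back-end once per request.
        .apply(ResourceTransform.create());

    pipeline.run();
  }
}
```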
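The numbers used in this example (a 10 requests/second quota, 10 workers, 1 request/second each, a one-second impulse interval) can be computed rather than hard-coded. The sketch below uses a hypothetical helper class, ThrottleMath, which is not part of Beam. One hedged caveat: as I understand PeriodicImpulse, it emits one element per interval for the pipeline as a whole rather than once per worker, so when the impulse alone drives the requests, the interval would be derived from the total rate (100 ms for 10 requests/second) rather than from the per-worker rate.

```java
/** Hypothetical helper (not a Beam API): turns a global request quota into spacing numbers. */
final class ThrottleMath {
  private ThrottleMath() {}

  /** Requests per second each worker may send, assuming work is spread evenly across workers. */
  static double perWorkerRate(double totalRequestsPerSecond, int numWorkers) {
    if (totalRequestsPerSecond <= 0 || numWorkers <= 0) {
      throw new IllegalArgumentException("rate and worker count must be positive");
    }
    return totalRequestsPerSecond / numWorkers;
  }

  /** Minimum spacing in milliseconds between calls made at the given rate. */
  static long intervalMillis(double requestsPerSecond) {
    if (requestsPerSecond <= 0) {
      throw new IllegalArgumentException("rate must be positive");
    }
    // Round up so the spacing never undershoots the quota.
    return (long) Math.ceil(1000.0 / requestsPerSecond);
  }
}
```

For example, perWorkerRate(10, 10) is 1.0 and intervalMillis(1.0) is 1000, matching the 1 request per second per worker above. With Beam's Joda-based API, the result would be passed as withInterval(Duration.millis(ThrottleMath.intervalMillis(10.0))) if the impulse is meant to produce all 10 requests per second by itself.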
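```java
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

class RequestFn extends DoFn<Instant, Request> {
  @ProcessElement
  public void process(@Element Instant instant, OutputReceiver<Request> receiver) {
    // One request per tick; the id is required by the AutoValue builder.
    // Using the tick time as the id is only illustrative.
    receiver.output(Request.builder().setId(instant.toString()).build());
  }
}
```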
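```java
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

class ResourceTransform extends PTransform<PCollection<Request>, PCollection<Response>> {
  static ResourceTransform create() {
    return new ResourceTransform();
  }

  @Override
  public PCollection<Response> expand(PCollection<Request> input) {
    // ParDo.of takes only the DoFn; the step name goes on apply().
    return input.apply("Consume Resource", ParDo.of(new ResourceFn()));
  }
}
```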
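```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

class ResourceFn extends DoFn<Request, Response> {
  // Counts calls to the back-end; reported as a Beam metric.
  private final Counter counter = Metrics.counter(ResourceFn.class, "some:resource");

  // ResourceClient is your own client; transient because it is created in @Setup, not serialized.
  private transient ResourceClient client;

  @Setup
  public void setup() {
    client = new ResourceClient();
  }

  @ProcessElement
  public void process(@Element Request request, OutputReceiver<Response> receiver) {
    counter.inc(); // Increment the counter.
    // not showing error handling
    Response response = client.execute(request);
    receiver.output(response);
  }
}
```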
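To enforce the per-worker rate inside ResourceFn itself, rather than relying only on the impulse interval, one option is a small blocking limiter called just before client.execute(request). The sketch below is plain Java and entirely hypothetical (SimpleRateLimiter is not a Beam or Dataflow API). It simply spaces calls a fixed interval apart; because a worker may run several DoFn instances on different threads, it would be held in a static field so every instance in the JVM shares one budget.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical limiter (not a Beam API): spaces calls at least 1/permitsPerSecond apart.
 * Intended to be held in a static field so all DoFn instances in one worker JVM share it.
 */
final class SimpleRateLimiter {
  private final long intervalNanos;
  private long nextFreeNanos; // earliest System.nanoTime() at which the next call may run

  SimpleRateLimiter(double permitsPerSecond) {
    if (permitsPerSecond <= 0) {
      throw new IllegalArgumentException("permitsPerSecond must be positive");
    }
    this.intervalNanos = (long) Math.ceil(TimeUnit.SECONDS.toNanos(1) / permitsPerSecond);
    this.nextFreeNanos = System.nanoTime();
  }

  /**
   * Blocks until the caller's reserved slot arrives; returns how long it waited, in nanoseconds.
   * Each caller reserves a distinct slot under the lock, then sleeps outside it.
   */
  long acquire() {
    long waitNanos;
    synchronized (this) {
      long now = System.nanoTime();
      // Compare nanoTime values by difference to stay correct across numeric overflow.
      long slot = (nextFreeNanos - now > 0) ? nextFreeNanos : now;
      nextFreeNanos = slot + intervalNanos;
      waitNanos = slot - now;
    }
    if (waitNanos > 0) {
      try {
        TimeUnit.NANOSECONDS.sleep(waitNanos);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
        throw new IllegalStateException("Interrupted while waiting for a permit", e);
      }
    }
    return waitNanos;
  }
}
```

Inside ResourceFn this could be a field such as private static final SimpleRateLimiter LIMITER = new SimpleRateLimiter(1.0); with LIMITER.acquire(); called immediately before client.execute(request), giving roughly 1 request per second per worker JVM. Guava's RateLimiter (RateLimiter.create(permitsPerSecond) and acquire()) implements the same idea with more features, if it is already on your classpath.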
(Aside: consider creating a Schema for the request input and response output classes. The example below uses AutoValue and AutoValueSchema.)
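```java
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class Request {
  /* abstract Getters. */
  abstract String getId();

  // Factory used by RequestFn; AutoValue generates AutoValue_Request.Builder.
  static Builder builder() {
    return new AutoValue_Request.Builder();
  }

  @AutoValue.Builder
  abstract static class Builder {
    /* abstract Setters. */
    abstract Builder setId(String value);

    abstract Request build();
  }
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class Response {
  /* abstract Getters. */
  abstract String getId();

  static Builder builder() {
    return new AutoValue_Response.Builder();
  }

  @AutoValue.Builder
  abstract static class Builder {
    /* abstract Setters. */
    abstract Builder setId(String value);

    abstract Response build();
  }
}
```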