Bheid

Reputation: 307

Vertx delayed batch process

How can I process a list of delayed jobs in Vertx (in practice, hundreds of HTTP GET requests to a rate-limited API that bans hosts that request too quickly)? I am currently using the code below, and my host gets blocked because Vertx starts all the requests at once. I would like a 5-second delay between consecutive requests.

public void getInstrumnetDailyInfo(Instrument instrument,
                                   Handler<AsyncResult<OptionInstrument>> handler) {
  webClient
    .get("/Loader")
    .addQueryParam("i", instrument.getId())
    .timeout(30000)
    .send(
      ar -> {
        if (ar.succeeded()) {
          String html = ar.result().bodyAsString();
          Integer thatData = processHTML(html);
          instrument.setThatData(thatData);
          handler.handle(Future.succeededFuture(instrument));
        } else {
          // error
          handler.handle(Future.failedFuture("error " + ar.cause()));
        }
      });
}

public void start(){
  List<Instrument> instruments = loadInstrumentsList();
  instruments.forEach(
    instrument -> {
      webClient.getInstrumnetDailyInfo(instrument,
        async -> {
          if (async.succeeded()) {
            instrumentMap.put(instrument.getId(), instrument);
          } else {
            log.warn("getInstrumnetDailyInfo: ", async.cause());
          }
        });
    });
}

Upvotes: 1

Views: 796

Answers (2)

teppic

Reputation: 7286

You could use any off-the-shelf rate limiter and adapt it for async use.

An example with the RateLimiter from Guava:

    // Make permits available at a rate of one every 5 seconds
    private RateLimiter limiter = RateLimiter.create(1 / 5.0);

    // A vert.x future that completes when it obtains a throttle permit
    public Future<Double> throttle() {
        return vertx.executeBlocking(p -> p.complete(limiter.acquire()), true);
    }

Then...

    throttle()
        .compose(d -> {
            System.out.printf("Waited %.2f before running job%n", d);
            return runJob(); // runJob returns a Future result
        });
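To apply this to the whole list from the question, one option is to chain the jobs so that each one waits for a permit before it starts. The following is only a sketch under assumptions: it uses the JDK's `CompletableFuture` rather than Vert.x futures so that it runs without extra dependencies, and `ThrottledChain`, `runAll` and its two function parameters are hypothetical names. With Vert.x futures the same shape would use `compose` in place of `thenCompose`, with `throttle()` from above as the permit source.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: runs jobs strictly one after another,
// asking the throttle for a permit before each job starts.
final class ThrottledChain {

    static <T, R> CompletableFuture<Void> runAll(
            List<T> jobs,
            Supplier<CompletableFuture<Double>> throttle,
            Function<T, CompletableFuture<R>> runJob) {

        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (T job : jobs) {
            chain = chain
                // wait for a permit only after the previous job has finished
                .thenCompose(ignored -> throttle.get())
                // then start this job
                .thenCompose(waited -> runJob.apply(job))
                // swallow the result and any failure so one bad job
                // does not stop the rest of the list
                .<Void>handle((result, error) -> null);
        }
        return chain;
    }
}
```

Because every job is appended to the end of the chain, at most one request is in flight at a time. If a failed request should stop the batch, drop the `handle` step and map the result to `null` instead.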

Upvotes: 1

Tcheutchoua Steve

Reputation: 556

Consider using a timer to fire the requests one at a time, rather than all at startup.

There are two variants in Vertx:

1. .setTimer(), which fires its handler once after a delay:

    vertx.setTimer(interval, id -> { /* runs once */ });

2. .setPeriodic(), which fires its handler every time the specified period elapses:

    vertx.setPeriodic(interval, id -> { /* runs on every tick */ });

In both cases the handler is a Handler<Long> that receives the timer id.

setPeriodic seems to be what you are looking for.
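One way to use a periodic timer for the list in the question is to put the jobs in a queue and let each tick start exactly one of them. The sketch below keeps the queue logic free of Vert.x so it can be run on its own. `PacedQueue` and `tick` are hypothetical names, and the wiring shown in the trailing comment assumes the question's `getInstrumnetDailyInfo` method and a `handler` of your choosing.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical helper: hands out queued jobs one per tick.
// A periodic timer is expected to call tick() every 5 seconds.
final class PacedQueue<T> {
    private final Queue<T> pending;
    private final Consumer<T> worker;

    PacedQueue(List<T> jobs, Consumer<T> worker) {
        this.pending = new ArrayDeque<>(jobs);
        this.worker = worker;
    }

    // Starts at most one job. Returns true while jobs remain,
    // false once the queue is empty so the caller can cancel the timer.
    boolean tick() {
        T next = pending.poll();
        if (next == null) {
            return false;
        }
        worker.accept(next);
        return !pending.isEmpty();
    }
}

// Possible wiring inside a verticle (not compiled here):
//
//   PacedQueue<Instrument> queue = new PacedQueue<>(
//       instruments, i -> getInstrumnetDailyInfo(i, handler));
//   vertx.setPeriodic(5000, id -> {
//     if (!queue.tick()) {
//       vertx.cancelTimer(id);
//     }
//   });
```

Note that a periodic timer spaces the start of each request by 5 seconds and does not wait for the previous response. If the gap must be measured from the end of one request to the start of the next, calling `vertx.setTimer` from the completion handler of each request is probably the better fit.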

You can get more info from the documentation

For more sophisticated Vertx scheduling use-cases, you can have a look at Chime or other schedulers or this module

Upvotes: 3
