saurabh

Reputation: 303

Vertx timing out for DynamoDB insert calls

I am using DynamoDB with Vert.x, and one of my verticles times out with the following error:

Timed out after waiting 30000(ms) for a reply. address

eventBus.send("test", testObject, x -> {
  if (x.succeeded()) {
    log.info("done successfully");
  } else {
    log.error("error while completing");
  }
});

public CompletableFuture<Void> process(Object testObject) {
    return CompletableFuture.runAsync(() -> dynamoMapper.save(testObject))
            .thenAcceptAsync(result -> {
                log.info("done successfully");
            }).exceptionally(throwable -> {
                throw new CompletionException(throwable);
            });
}

final void listen(String address) {
    eventBus.consumer("test", x -> process(x).whenCompleteAsync((result, t) -> {
        if (t == null) {
            x.reply(OK);
        } else {
            x.fail(0, errorMessage);
        }
    }));
}

However, when I run that DynamoDB save query asynchronously, I do not have this issue. Can somebody suggest best practices for using DynamoDB with Vert.x?

Upvotes: 0

Views: 340

Answers (2)

jthreja

Reputation: 11

I suggest using this library: https://github.com/reactiverse/aws-sdk/tree/master/docs

This worked for me, and yes, the correct way to use DynamoDB with Vert.x is asynchronously.

Using Maven:

<dependency>
    <groupId>io.reactiverse</groupId>
    <artifactId>aws-sdk</artifactId>
    <version>1.0.0</version>
</dependency>

Using Gradle:

implementation("io.reactiverse:aws-sdk:1.0.0")

How To:

DynamoDbAsyncClient dynamo = VertxSdkClient
    .withVertx(DynamoDbAsyncClient.builder().region(Region.EU_WEST_1), context)
    .build();

Upvotes: 1

Tom

Reputation: 3850

In Vert.x, by default, you can't pass POJOs via the event bus (https://vertx.io/docs/vertx-core/java/#_types_of_messages).

Also, you do not need to use futures, as Vert.x provides most of what you need.

I would use the AWS async client, but you cannot use the mapper with it at the moment. If you still want to use the mapper, it would look something like the following:

vertx.eventBus().<String>consumer("dynamodb-example", message -> {
  try {
    CatalogItem catalogItem = Json.decodeValue(message.body(), CatalogItem.class);
    dynamoDbMapper.save(catalogItem);
    message.reply(Json.encode(catalogItem));
  } catch (Exception e) {
    message.fail(500, e.getMessage());
  }
});

Because that SDK is blocking, you can use executeBlocking to avoid blocking the event loop:

vertx.eventBus().<String>consumer("dynamodb-example", message -> {
  vertx.<CatalogItem>executeBlocking(promise -> {
    CatalogItem catalogItem = Json.decodeValue(message.body(), CatalogItem.class);
    dynamoDbMapper.save(catalogItem);
    promise.complete(catalogItem);
  }, asyncResult -> {
    if (asyncResult.succeeded()) {
      message.reply(Json.encode(asyncResult.result()));
    } else {
      message.fail(500, asyncResult.cause().getMessage());
    }
  });
});

Then you can call it:

CatalogItem catalogItem = new CatalogItem();
vertx.eventBus().<String>request("dynamodb-example", Json.encode(catalogItem), asyncResult -> {
  if (asyncResult.succeeded()) {
    CatalogItem catalogItemResult = Json.decodeValue(asyncResult.result().body(), CatalogItem.class);
    // do something
  } else {
    // handle exception
  }
});

You can also work with JsonObject instead of strings, or define your own codecs to pass POJOs via the event bus.
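
For the codec route, a minimal sketch could look like the following. The CatalogItem fields (id, title), the codec name, and the length-prefixed JSON wire format are assumptions made for illustration; your real class will differ.

```java
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.json.JsonObject;

public class CatalogItemCodec
    implements MessageCodec<CatalogItemCodec.CatalogItem, CatalogItemCodec.CatalogItem> {

  /** Hypothetical POJO standing in for the CatalogItem used above. */
  public static class CatalogItem {
    public String id;
    public String title;

    public CatalogItem() {
    }

    public CatalogItem(String id, String title) {
      this.id = id;
      this.title = title;
    }
  }

  @Override
  public void encodeToWire(Buffer buffer, CatalogItem item) {
    // Clustered delivery: write the JSON length first, then the JSON bytes.
    Buffer json = new JsonObject()
        .put("id", item.id)
        .put("title", item.title)
        .toBuffer();
    buffer.appendInt(json.length());
    buffer.appendBuffer(json);
  }

  @Override
  public CatalogItem decodeFromWire(int pos, Buffer buffer) {
    // Read the 4-byte length prefix, then parse exactly that many bytes.
    int length = buffer.getInt(pos);
    int start = pos + 4;
    JsonObject json = new JsonObject(buffer.getBuffer(start, start + length));
    return new CatalogItem(json.getString("id"), json.getString("title"));
  }

  @Override
  public CatalogItem transform(CatalogItem item) {
    // Local delivery: hand the consumer a copy so sender and receiver
    // do not share mutable state.
    return new CatalogItem(item.id, item.title);
  }

  @Override
  public String name() {
    // Assumed name; it must be unique among the registered codecs.
    return "catalog-item-codec";
  }

  @Override
  public byte systemCodecID() {
    // User-defined codecs always return -1.
    return -1;
  }
}
```

After registering it once with vertx.eventBus().registerDefaultCodec(CatalogItemCodec.CatalogItem.class, new CatalogItemCodec()), the consumer can be declared as <CatalogItemCodec.CatalogItem>consumer(...) and the Json.encode/Json.decodeValue steps should no longer be needed for that type.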

Upvotes: 1
