Reputation: 303
I am using DynamoDB with Vert.x, and one of my verticles is timing out with the error:
Timed out after waiting 30000(ms) for a reply. address
eventBus.send("test", testObject, x -> {
    if (x.succeeded()) {
        log.info("done successfully");
    } else {
        log.error("error while completing");
    }
});
public CompletableFuture<Void> process(Object testObject) {
    return CompletableFuture.runAsync(() -> dynamoMapper.save(testObject))
        .thenAcceptAsync(result -> {
            log.info("done successfully");
        }).exceptionally(throwable -> {
            throw new CompletionException(throwable);
        });
}
final void listen(String address) {
    eventBus.consumer("test", x -> process(x).whenCompleteAsync((result, t) -> {
        if (t == null) {
            x.reply(OK);
        } else {
            x.fail(0, errorMessage);
        }
    }));
}
But when I run that DynamoDB save asynchronously, I do not have this issue. Can somebody suggest best practices for using DynamoDB with Vert.x?
Upvotes: 0
Views: 340
Reputation: 11
I suggest using this library: https://github.com/reactiverse/aws-sdk/tree/master/docs
This worked for me, and yes, the correct way to use DynamoDB with Vert.x is asynchronously.
Using Maven:
<dependency>
    <groupId>io.reactiverse</groupId>
    <artifactId>aws-sdk</artifactId>
    <version>1.0.0</version>
</dependency>
Using Gradle:
implementation("io.reactiverse:aws-sdk:1.0.0")
How To:
DynamoDbAsyncClient dynamo = VertxSdkClient.withVertx(DynamoDbAsyncClient.builder().region(Region.EU_WEST_1), context).build();
Upvotes: 1
Reputation: 3850
In Vert.x, by default, you can't pass POJOs over the event bus (https://vertx.io/docs/vertx-core/java/#_types_of_messages).
Also, you do not need to use CompletableFuture here, as Vert.x provides most of what you need.
I would use the AWS async client, but you cannot use the mapper with it at the moment. If you still want to use the mapper, it should look something like the following:
vertx.eventBus().<String>consumer("dynamodb-example", message -> {
    try {
        CatalogItem catalogItem = Json.decodeValue(message.body(), CatalogItem.class);
        dynamoDbMapper.save(catalogItem);
        message.reply(Json.encode(catalogItem));
    } catch (Exception e) {
        message.fail(500, e.getMessage());
    }
});
Because the mapper's save call is blocking, you can wrap it in executeBlocking to avoid blocking the event loop:
vertx.eventBus().<String>consumer("dynamodb-example", message -> {
    vertx.<CatalogItem>executeBlocking(promise -> {
        CatalogItem catalogItem = Json.decodeValue(message.body(), CatalogItem.class);
        dynamoDbMapper.save(catalogItem);
        promise.complete(catalogItem);
    }, asyncResult -> {
        if (asyncResult.succeeded()) {
            message.reply(Json.encode(asyncResult.result()));
        } else {
            message.fail(500, asyncResult.cause().getMessage());
        }
    });
});
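If it helps to see the idea behind executeBlocking in isolation, here is a rough plain-JDK analogy, not Vert.x code. It assumes a hypothetical blockingSave method standing in for dynamoDbMapper.save, a thread named "event-loop", and a small pool of threads named "worker"; all of these names are made up for illustration. CompletableFuture is used only because it ships with the JDK; in Vert.x, executeBlocking plays this role for you.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Plain-JDK analogy of offloading a blocking call; none of these names are Vert.x API. */
final class BlockingOffloadSketch {

    /** Hypothetical stand-in for a blocking call such as dynamoDbMapper.save(item). */
    static String blockingSave(String item) {
        try {
            Thread.sleep(50); // simulate a slow network round trip
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return item + " saved on " + Thread.currentThread().getName();
    }

    /** Runs the blocking call on a worker thread, then builds the reply on the "event-loop" thread. */
    static String run(String item) {
        ExecutorService eventLoop =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService workers =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
        try {
            return CompletableFuture
                // The blocking part never touches the event-loop thread.
                .supplyAsync(() -> blockingSave(item), workers)
                // The result handler is dispatched back to the event-loop thread.
                .thenApplyAsync(
                    saved -> saved + ", reply sent on " + Thread.currentThread().getName(),
                    eventLoop)
                .join(); // waiting here is only for the demo; never block a real event loop
        } finally {
            workers.shutdown();
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("item-1"));
    }
}
```

The point of the split is that the event-loop thread stays free to handle other messages while the save is in flight, so replies are not delayed past the event bus timeout by a slow blocking call.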
Then you can call it:
CatalogItem catalogItem = new CatalogItem();
vertx.eventBus().<String>request("dynamodb-example", Json.encode(catalogItem), asyncResult -> {
    if (asyncResult.succeeded()) {
        CatalogItem catalogItemResult = Json.decodeValue(asyncResult.result().body(), CatalogItem.class);
        // do something
    } else {
        // handle exception
    }
});
You can also work with JsonObject instead of strings, or define your own codecs to pass POJOs over the event bus.
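To give a feel for what a custom codec has to do, here is a minimal plain-JDK sketch of the encode/decode round trip. It is not the Vert.x interface: the real one is MessageCodec (with methods such as encodeToWire and decodeFromWire), which you register on the event bus. The WireCodec interface, the CatalogItem fields (id, title), and the byte layout below are all assumptions made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class CodecSketch {

    /** Hypothetical POJO; the real CatalogItem's fields are not shown in this thread. */
    static final class CatalogItem {
        final int id;
        final String title;

        CatalogItem(int id, String title) {
            this.id = id;
            this.title = title;
        }
    }

    /** Hypothetical codec contract; NOT Vert.x's MessageCodec interface. */
    interface WireCodec<T> {
        byte[] encode(T value);
        T decode(byte[] bytes);
    }

    /** Assumed layout: 4-byte id, 4-byte title length, then the UTF-8 title bytes. */
    static final class CatalogItemCodec implements WireCodec<CatalogItem> {
        @Override
        public byte[] encode(CatalogItem item) {
            byte[] title = item.title.getBytes(StandardCharsets.UTF_8);
            return ByteBuffer.allocate(4 + 4 + title.length)
                .putInt(item.id)
                .putInt(title.length)
                .put(title)
                .array();
        }

        @Override
        public CatalogItem decode(byte[] bytes) {
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            int id = buf.getInt();
            byte[] title = new byte[buf.getInt()];
            buf.get(title);
            return new CatalogItem(id, new String(title, StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        CatalogItemCodec codec = new CatalogItemCodec();
        CatalogItem copy = codec.decode(codec.encode(new CatalogItem(7, "Book")));
        System.out.println(copy.id + " " + copy.title);
    }
}
```

A real Vert.x codec follows the same shape, writing to and reading from a Buffer instead of a byte array. If you do not need a compact wire format, JSON strings or JsonObject are usually less work.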
Upvotes: 1