Reputation: 139
Trying out Vertx EventBus for the first time, using Vert.x and SpringBoot, I want to create a microservice that publishes the prizes of items every 2 seconds using Eventbus, I have created a verticle to publish the items and the logs show it has been deployed successfully, also the prize log show that message is published in event bus and the consumer also receives it. However when I send an HTTP request to see the response it returns the error
io.vertx.core.eventbus.ReplyException: Timed out after waiting 30000(ms) for a reply. address: __vertx.reply.1, repliedAddress: prizes
Below is what I have done
PrizeBrokerVerticle
@Component
public class PrizeBrokerVerticle extends AbstractVerticle {
public static final Logger LOGGER = Logger.getLogger(PrizeBrokerVerticle.class.getName());
private final EventBus eventBus;
private final List<String> availablePrizes = List.of("Apple", "HP", "Dell");
public PrizeBrokerVerticle(EventBus eventBus) {
this.eventBus = eventBus;
}
public void start() {
vertx.setPeriodic(2000, timerId -> {
String randomPrize = generateRandomPrize();
eventBus.publish("prizes", randomPrize);
LOGGER.info("Prize published to EventBus");
});
}
private String generateRandomPrize() {
Random random = new Random();
int index = random.nextInt(availablePrizes.size());
return availablePrizes.get(index);
}
}
PrizeController
@Controller
public class PrizeController implements Function<RoutingContext, Uni<Void>> {
public static final Logger LOGGER = Logger.getLogger(PrizeController.class.getName());
private final EventBus eventBus;
public PrizeController(Router router, EventBus eventBus) {
this.eventBus = eventBus;
router.get("/api/prizes")
.respond(this);
}
@Override
public Uni<Void> apply(RoutingContext ctx) {
eventBus.consumer("prizes", message -> {
String prize = (String) message.body();
LOGGER.info("Received prize from EventBus: " + prize);
});
return eventBus.request("prizes", "")
.flatMap(reply -> {
Object body = reply.body();
if (body != null) {
return ctx.response()
.setStatusCode(200)
.putHeader("content-type", "application/json")
.end((String) body);
} else {
LOGGER.warning("Received null response from EventBus");
return ctx.response()
.setStatusCode(500)
.end("Internal Server Error");
}
});
}
}
Logs
: Started Main in 0.546 seconds (process running for 0.801)
2023-12-19T16:56:46.291+01:00 INFO 39191 --- [ main] org.example.runner.VertxRunner : []
2023-12-19T16:56:46.306+01:00 INFO 39191 --- [ntloop-thread-3] org.example.model.ConsumerPrize : ConsumerPrize verticle started
2023-12-19T16:56:46.307+01:00 INFO 39191 --- [ntloop-thread-1] org.example.verticle.MainVerticle : PrizeBrokerVerticle deployed with ID: 0fceae1f-5106-49da-9aa9-28dc70b14907
2023-12-19T16:56:46.307+01:00 INFO 39191 --- [ntloop-thread-1] org.example.verticle.MainVerticle : ConsumerPrize deployed with ID: a34b18d6-f621-4067-8a7f-6b19d667a3b8
2023-12-19T16:56:46.347+01:00 INFO 39191 --- [ntloop-thread-1] org.example.verticle.MainVerticle : Server Running on http://localhost8081
2023-12-19T16:56:46.347+01:00 INFO 39191 --- [ntloop-thread-0] org.example.runner.VertxRunner : 3d63fe6d-3186-4598-9111-2e626b7008f0
2023-12-19T16:56:52.306+01:00 INFO 39191 --- [ntloop-thread-2] o.example.verticle.PrizeBrokerVerticle : Prize published to EventBus
2023-12-19T16:56:52.306+01:00 INFO 39191 --- [ntloop-thread-1] org.example.controller.PrizeController : Received prize from EventBus: HP
2023-12-19T16:56:54.305+01:00 INFO 39191 --- [ntloop-thread-2] o.example.verticle.PrizeBrokerVerticle : Prize published to EventBus
2023-12-19T16:56:54.305+01:00 INFO 39191 --- [ntloop-thread-1] org.example.controller.PrizeController : Received prize from EventBus: Dell
Okay I made some progress after checking the docs
The Vert.x eventBus.request("prizes", "") sends a message to a consumer using the same address and expects a reply. So if I refactor the eventBus.consumer method to
eventBus.consumer("prizes", message -> {
String prize = (String) message.body();
message.reply("Response from consumer");
});
"Response from consumer" is returned when an HTTP call is made, similarly if I return the message.body() on the message reply I get the content of the eventBus.request("prizes", ""), in this case an empty string
eventBus.consumer("prizes", message -> {
String prize = (String) message.body();
message.reply(prize);
});
But what I want to achieve is to return the radomPrize published to the eventBus when an HTTP call is made. Hope this helps someone
Upvotes: 1
Views: 140