Plus Ultra

Reputation: 139

io.vertx.core.eventbus.ReplyException: Timed out after waiting 30000(ms) for a reply. address: __vertx.reply.1, repliedAddress: prizes

I'm trying out the Vert.x EventBus for the first time, using Vert.x with Spring Boot. I want to create a microservice that publishes the prizes of items every 2 seconds on the EventBus. I have created a verticle to publish the items, and the logs show it has been deployed successfully. The logs also show that each prize is published to the event bus and that the consumer receives it. However, when I send an HTTP request to see the response, it returns this error:

io.vertx.core.eventbus.ReplyException: Timed out after waiting 30000(ms) for a reply. address: __vertx.reply.1, repliedAddress: prizes

Below is what I have done

PrizeBrokerVerticle

@Component
public class PrizeBrokerVerticle extends AbstractVerticle  {

    public static final Logger LOGGER = Logger.getLogger(PrizeBrokerVerticle.class.getName());

    private final EventBus eventBus;
    private final List<String> availablePrizes = List.of("Apple", "HP", "Dell");

    public PrizeBrokerVerticle(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    @Override
    public void start() {
        vertx.setPeriodic(2000, timerId -> {
            String randomPrize = generateRandomPrize();

            eventBus.publish("prizes", randomPrize);
            LOGGER.info("Prize published to EventBus");
        });
    }

    private String generateRandomPrize() {
        Random random = new Random();
        int index = random.nextInt(availablePrizes.size());
        return availablePrizes.get(index);
    }
}

PrizeController

@Controller
public class PrizeController implements Function<RoutingContext, Uni<Void>> {

    public static final Logger LOGGER = Logger.getLogger(PrizeController.class.getName());
    private final EventBus eventBus;
    public PrizeController(Router router, EventBus eventBus) {
        this.eventBus = eventBus;
        router.get("/api/prizes")
                .respond(this);
    }

    @Override
    public Uni<Void> apply(RoutingContext ctx) {

        eventBus.consumer("prizes", message -> {
            String prize = (String) message.body();
            LOGGER.info("Received prize from EventBus: " + prize);
        });

        return eventBus.request("prizes", "")
                .flatMap(reply -> {
                    Object body = reply.body();
                    if (body != null) {
                        return ctx.response()
                                .setStatusCode(200)
                                .putHeader("content-type", "application/json")
                                .end((String) body);
                    } else {
                        LOGGER.warning("Received null response from EventBus");
                        return ctx.response()
                                .setStatusCode(500)
                                .end("Internal Server Error");
                    }
                });
    }
}

Logs

: Started Main in 0.546 seconds (process running for 0.801)
2023-12-19T16:56:46.291+01:00  INFO 39191 --- [           main] org.example.runner.VertxRunner           : []
2023-12-19T16:56:46.306+01:00  INFO 39191 --- [ntloop-thread-3] org.example.model.ConsumerPrize          : ConsumerPrize verticle started
2023-12-19T16:56:46.307+01:00  INFO 39191 --- [ntloop-thread-1] org.example.verticle.MainVerticle        : PrizeBrokerVerticle deployed with ID: 0fceae1f-5106-49da-9aa9-28dc70b14907
2023-12-19T16:56:46.307+01:00  INFO 39191 --- [ntloop-thread-1] org.example.verticle.MainVerticle        : ConsumerPrize deployed with ID: a34b18d6-f621-4067-8a7f-6b19d667a3b8
2023-12-19T16:56:46.347+01:00  INFO 39191 --- [ntloop-thread-1] org.example.verticle.MainVerticle        : Server Running on http://localhost8081
2023-12-19T16:56:46.347+01:00  INFO 39191 --- [ntloop-thread-0] org.example.runner.VertxRunner           : 3d63fe6d-3186-4598-9111-2e626b7008f0
2023-12-19T16:56:52.306+01:00  INFO 39191 --- [ntloop-thread-2] o.example.verticle.PrizeBrokerVerticle   : Prize published to EventBus
2023-12-19T16:56:52.306+01:00  INFO 39191 --- [ntloop-thread-1] org.example.controller.PrizeController   : Received prize from EventBus: HP
2023-12-19T16:56:54.305+01:00  INFO 39191 --- [ntloop-thread-2] o.example.verticle.PrizeBrokerVerticle   : Prize published to EventBus
2023-12-19T16:56:54.305+01:00  INFO 39191 --- [ntloop-thread-1] org.example.controller.PrizeController   : Received prize from EventBus: Dell

Okay, I made some progress after checking the docs.

In Vert.x, eventBus.request("prizes", "") sends a message to a single consumer registered on that address and expects a reply. My consumer never replied, which explains the timeout. So if I refactor the eventBus.consumer call to

eventBus.consumer("prizes", message -> {
    String prize = (String) message.body();
    message.reply("Response from consumer");
});

then "Response from consumer" is returned when an HTTP call is made. Similarly, if I reply with message.body(), I get back the content that was sent by eventBus.request("prizes", ""), in this case an empty string:

eventBus.consumer("prizes", message -> {
    String prize = (String) message.body();
    message.reply(prize);
});

But what I want to achieve is to return the randomPrize published to the EventBus when an HTTP call is made. Hope this helps someone.
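
One possible direction, as a minimal and untested sketch: since publish is fire-and-forget and carries no reply, the controller could register a single consumer once at startup that remembers the most recent prize, and the HTTP handler could return that stored value instead of calling eventBus.request. The LatestPrizeHolder class and its method names below are hypothetical. The Vert.x wiring appears only in comments and assumes the same EventBus and RoutingContext types as the controller above.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical helper (not part of Vert.x): remembers the most recent
 * prize seen on the "prizes" address so an HTTP handler can return it.
 *
 * Assumed wiring, registered ONCE (e.g. in the controller constructor),
 * not inside apply():
 *
 *   eventBus.consumer("prizes", message ->
 *       holder.onPrizePublished((String) message.body()));
 *
 * Assumed use inside apply(ctx), replacing eventBus.request(...):
 *
 *   return holder.latestPrize()
 *       .map(prize -> ctx.response().setStatusCode(200).end(prize))
 *       .orElseGet(() -> ctx.response().setStatusCode(503)
 *                           .end("No prize published yet"));
 */
class LatestPrizeHolder {

    // AtomicReference because the consumer and the HTTP handler
    // may run on different event-loop threads.
    private final AtomicReference<String> latest = new AtomicReference<>();

    /** Called by the single event-bus consumer for every published prize. */
    public void onPrizePublished(String prize) {
        latest.set(prize);
    }

    /** Called by the HTTP handler; empty until the first prize arrives. */
    public Optional<String> latestPrize() {
        return Optional.ofNullable(latest.get());
    }

    public static void main(String[] args) {
        LatestPrizeHolder holder = new LatestPrizeHolder();
        System.out.println(holder.latestPrize().isPresent()); // prints false

        // Simulate two periodic publishes from PrizeBrokerVerticle.
        holder.onPrizePublished("HP");
        holder.onPrizePublished("Dell");
        System.out.println(holder.latestPrize().get()); // prints Dell
    }
}
```

With something like this, apply(ctx) would no longer register a new consumer on every request, and a request that arrives before the first publish could answer with a 503 or an empty body rather than waiting 30 seconds for a reply that never comes.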

Upvotes: 1

Views: 140
