CrazySynthax
CrazySynthax

Reputation: 15008

vert.x: publish and consume messages from event bus

I wrote the following code:

public class VertxApp {

    public static void main(String[] args)  { // This is OK
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new ReceiveVerticle());  // line A
        vertx.deployVerticle(new SendVerticle());     // line B
    }
}

public class ReceiveVerticle extends AbstractVerticle{

    @Override
    public void start(Future<Void> startFuture) {
        vertx.eventBus().consumer("address", message -> {
            System.out.println("message received by receiver");
            System.out.println(message.body());
        });
    }
}

public class SendVerticle extends AbstractVerticle {

    @Override
    public void start(Future<Void> startFuture) throws InterruptedException {
        System.out.println("SendVerticle started!");
        int i = 0;

        for (i = 0; i < 5; i++) {
            System.out.println("Sender sends a message " + i );
            vertx.eventBus().publish("address", "message" + i);
        }
    }
}

This code is inconsistent. There is a race condition. If I run the code several times, sometimes all 5 messages sent are consumed, and sometimes none of them is consumed.

Can you please explain why there is race condition here and how it can be solved?

Upvotes: 1

Views: 1376

Answers (1)

tsegismont
tsegismont

Reputation: 9128

There is no race condition, deploying a verticle is an asynchronous operation and your receiver verticle may register the consumer after the sender verticle has sent the messages.

To make sure operations happen in order, use the deploy method which takes a handler argument:

Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new ReceiveVerticle(), ar -> {
    if (ar.succeeded()) {
        vertx.deployVerticle(new SendVerticle());
    } else {
        // handle the problem -> ar.cause()
    }
});

Upvotes: 6

Related Questions