locus

Reputation: 59

Vertx: Timeout in message reply

I have a sender and a consumer that exchange messages:

public class Sender extends AbstractVerticle {
    @Override
    public void start() {
        EventBus eventBus = vertx.eventBus();
        eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
            if (res.succeeded()) {
                System.out.println("Successfully sent reply");
            } else {
                System.out.println("Failed to send reply." + res.cause());
            }
        });
        eventBus.consumer(Constants.ADDRESS, msg -> System.out.println("received msg from consumer:" + msg.body()));
    }
}

public class Consumer extends AbstractVerticle {
    protected EventBus eventBus = null;

    @Override
    public void start() {
        eventBus = vertx.eventBus();

        eventBus.consumer(Constants.ADDRESS, msg -> msg.reply("Hi from consumer.", res -> {
            if (res.succeeded()) {
                System.out.println("Successfully sent reply");
            } else {
                System.out.println("Failed to send reply." + res.cause());
            }
        }));
    }
}

I expect that when the consumer replies to the message, it will be received by the sender. However, I get a timeout:

Successfully sent reply

Failed to send reply.(TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: 2, repliedAddress: 1

Deployment:

public class ServiceLauncher {

    private static Vertx vertx = Vertx.vertx();

    public static void main(String[] args) {
        vertx.deployVerticle(new Consumer(), res -> {
            if (res.succeeded()) {
                System.out.println("Verticle " + Consumer.NAME + " deployed.");
                vertx.deployVerticle(new Sender());
                System.out.println("Verticle " + Sender.NAME + " deployed.");
            } else {
                System.out.println("Verticle " + Consumer.NAME + " not deployed.");
            }
        });
    }
}

What am I doing wrong? Thanks in advance.

Update: The problem is in msg.reply(): the consumer doesn't reply to the message, but I still can't figure out why.

Upvotes: 4

Views: 8261

Answers (2)

YheCoder

Reputation: 61

I had a similar issue. In my case, I was replying with something other than a JsonObject. In my experience the reply had to be a valid JsonObject, not a JsonArray or any other type. This looks like the default behaviour, although the docs say a JsonObject is not required. The real problem in your original code snippet, though, is that you have specified a handler for the reply's reply. The Consumer replies successfully, but it then waits for a reply from the Sender, which never sends one. See the comments below.

@Override
public void start() {
    eventBus = vertx.eventBus();

    eventBus.consumer(Constants.ADDRESS, msg -> msg.reply("Hi from consumer.", res -> {
        if (res.succeeded()) { // runs only if Sender replies to this reply; it never does, so this branch never executes
            System.out.println("Successfully sent reply");
        } else { // no reply arrives, so this branch runs once the reply timeout expires (the TIMEOUT failure in the question)
            System.out.println("Failed to send reply." + res.cause());
        }
    }));
}

Upvotes: 1

xuthus

Reputation: 144

The timeout occurs in the recipient of the request, not in its sender. The handler passed to msg.reply() waits for a further reply from the sender; it does not merely confirm that the reply was sent. Likewise, the handler passed to eventBus.send() in Sender fires when the sender receives a reply.
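To make that behaviour concrete, here is a small plain-Java model of it. It is only a sketch and does not use Vert.x at all: the class name ReplyTimeoutModel, the request() helper, and the "peer" function are all hypothetical, and the 50 ms timeout stands in for the 30000 ms seen in the question. It shows that a handler which waits for an answer ends in a timeout when the other side stays silent, and succeeds only when the other side actually answers.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical stand-in for "send a message and register a reply handler".
// This models the idea only; it is not the Vert.x event bus API.
class ReplyTimeoutModel {

    // Delivers 'body' to 'peer' and waits up to timeoutMs for an answer.
    // Returns the answer, or "TIMEOUT" if the peer never answers.
    static String request(Function<String, Optional<String>> peer, String body, long timeoutMs) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        // The peer may answer (completing the future) or stay silent.
        peer.apply(body).ifPresent(reply::complete);
        try {
            return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Like msg.reply(..., handler) when the Sender never replies to the reply.
        System.out.println("silent sender -> "
                + request(msg -> Optional.empty(), "Hi from consumer.", 50));
        // Like the same call when the Sender does answer the reply.
        System.out.println("replying sender -> "
                + request(msg -> Optional.of("ack"), "Hi from consumer.", 50));
    }
}
```

In this model the first call corresponds to the Consumer in the question: its handler asks for one more answer that nobody sends.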

Remove the handler from msg.reply(), and change the handler passed to eventBus.send() in Sender so that it treats success as a received reply:

public class Sender extends AbstractVerticle {
    public static final String NAME = "SENDER";

    @Override
    public void start() {
        EventBus eventBus = vertx.eventBus();
        eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
            if (res.succeeded()) {
                System.out.println("Successfully received reply: " + res.result().body());
            } else {
                System.out.println("Failed to receive reply: " + res.cause());
            }
        });

    }
}

and

public class Consumer extends AbstractVerticle {
    public static final String NAME = "CONSUMER";

    @Override
    public void start() {
        final EventBus eventBus = vertx.eventBus();
        eventBus.consumer(Constants.ADDRESS, msg -> {
            System.out.println("Message received");
            msg.reply("Hi from consumer.");
        });

    }
}

After running it, you'll see:

Verticle CONSUMER deployed.
Verticle SENDER deployed.
Message received
Successfully received reply: Hi from consumer.

Upvotes: 3
