Reputation: 59
I have a sender and a consumer that exchange messages:
public class Sender extends AbstractVerticle {
@Override
public void start() {
EventBus eventBus = vertx.eventBus();
eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
if (res.succeeded()) {
System.out.println("Successfully sent reply");
} else {
System.out.println("Failed to send reply." + res.cause());
}
});
eventBus.consumer(Constants.ADDRESS, msg -> System.out.println("received msg from consumer:" + msg.body()));
}
}
public class Consumer extends AbstractVerticle{
protected EventBus eventBus = null;
@Override
public void start() {
eventBus = vertx.eventBus();
eventBus.consumer(Constants.ADDRESS, msg -> msg.reply("Hi from consumer.", res -> {
if (res.succeeded()) {
System.out.println("Successfully sent reply");
} else {
System.out.println("Failed to send reply." + res.cause());
}
}));
}
}
I expect that when the consumer replies to the message, it will be received by the sender. However, I get a timeout:
Successfully sent reply
Failed to send reply.(TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: 2, repliedAddress: 1
Deployment:
public class ServiceLauncher {
private static Vertx vertx = Vertx.vertx();
public static void main(String[] args) {
vertx.deployVerticle(new Consumer(), res -> {
if (res.succeeded()) {
System.out.println("Verticle " + Consumer.NAME + " deployed.");
vertx.deployVerticle(new Sender());
System.out.println("Verticle " + Sender.NAME + " deployed.");
} else {
System.out.println("Verticle " + Consumer.NAME + " not deployed.");
}
});
}
}
What am I doing wrong? Thanks in advance.
Update: The problem is in msg.reply(): the consumer doesn't seem to reply to the message, but I still can't figure out why.
Upvotes: 4
Views: 8261
Reputation: 61
I had a similar issue. In my case, I was sending a reply that was not a JsonObject. The reply had to be a valid JsonObject, not a JsonArray or any other type. This looks like the default behaviour, although the docs say a JsonObject is not required. But the real problem in your original code snippet is that you have specified a handler for the reply's reply. The Consumer replies successfully, but it then never gets a reply back from the Sender. See the comments below.
@Override
public void start() {
eventBus = vertx.eventBus();
eventBus.consumer(Constants.ADDRESS, msg -> msg.reply("Hi from consumer.", res -> {
if (res.succeeded()) { // runs only if Sender replies to this reply, which it never does
System.out.println("Successfully sent reply");
} else { // so this branch runs instead, once the wait for that reply times out
System.out.println("Failed to send reply." + res.cause());
}
}));
}
Upvotes: 1
Reputation: 144
The timeout occurs in the recipient of the request, not in its sender.
The handler passed to msg.reply() waits for a further reply from the sender. It does not merely confirm that the reply was sent.
Likewise, the handler passed to eventBus.send() in Sender fires when the sender receives a reply.
Just remove the handler from msg.reply() and adjust the handler of eventBus.send() in Sender accordingly:
public class Sender extends AbstractVerticle {
public static final String NAME = "SENDER";
@Override
public void start() {
EventBus eventBus = vertx.eventBus();
eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
if (res.succeeded()) {
System.out.println("Successfully received reply: " + res.result().body());
} else {
System.out.println("Failed to receive reply: " + res.cause());
}
});
}
}
and
public class Consumer extends AbstractVerticle {
public static final String NAME = "CONSUMER";
@Override
public void start() {
final EventBus eventBus = vertx.eventBus();
eventBus.consumer(Constants.ADDRESS, msg -> {
System.out.println("Message received");
msg.reply("Hi from consumer.");
});
}
}
After running it, you'll see:
Verticle CONSUMER deployed.
Verticle SENDER deployed.
Message received
Successfully received reply: Hi from consumer.
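To make the handler semantics easier to see, here is a toy model in plain Java. It does not use Vert.x at all: ToyMessage, outcome and run are made-up names for illustration, and the 100 ms timeout stands in for the 30000 ms seen in the question's output. It only sketches the idea that a reply handler completes when the other side replies, and times out otherwise.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReplyHandlerModel {

    // Toy stand-in for an event bus message; NOT a Vert.x class.
    static final class ToyMessage {
        final String body;
        // Completed only when the other side replies to this message.
        final CompletableFuture<ToyMessage> reply = new CompletableFuture<>();

        ToyMessage(String body) {
            this.body = body;
        }
    }

    // Plays the role of a reply handler: succeeds if a reply arrives
    // within the timeout, otherwise reports a timeout.
    static String outcome(CompletableFuture<ToyMessage> reply, long timeoutMs) {
        try {
            return "succeeded: " + reply.get(timeoutMs, TimeUnit.MILLISECONDS).body;
        } catch (TimeoutException e) {
            return "failed: timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "failed: interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        }
    }

    // Returns { sender handler outcome, consumer handler outcome }.
    static String[] run(boolean consumerRegistersHandler) {
        ToyMessage request = new ToyMessage("Hello from sender");

        // The consumer replies to the request.
        ToyMessage reply = new ToyMessage("Hi from consumer.");
        request.reply.complete(reply);

        // The sender's handler fires because a reply to its request arrived.
        String sender = outcome(request.reply, 100);

        // A handler attached to the consumer's reply waits for a reply
        // to that reply, which the sender in this model never sends.
        String consumer = consumerRegistersHandler
                ? outcome(reply.reply, 100)
                : "no handler registered";

        return new String[] {sender, consumer};
    }

    public static void main(String[] args) {
        String[] withHandler = run(true);
        System.out.println("sender:   " + withHandler[0]);
        System.out.println("consumer: " + withHandler[1]);
    }
}
```

With run(true) the sender side succeeds while the consumer side times out, which mirrors the two lines of output in the question. With run(false) nothing waits on the consumer side, so nothing can time out there.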
Upvotes: 3