Reputation: 4719
The following code in a Verticle puts a message on the event bus
io.vertx.reactivex.core.Vertx rxVertx = io.vertx.reactivex.core.Vertx.newInstance(vertx);
Single<Message<Integer>> reply = rxVertx.eventBus().<Integer>rxSend("address", "param");
reply.subscribe(r -> {
// Do something with value
});
Another Verticle consumes the message:
vertx.eventBus().<Integer>consumer("address", h -> {
Integer integer = ... // call to getValue() method
h.reply(integer);
});
The value to be returned comes from a mySql database using SQLClient. Currently the code to retrieve the value is in the following method:
private Single<Integer> getValue() {
return Single.create(source -> {
mySQLClient.getConnection(res -> {
if (res.succeeded()) {
SQLConnection connection = res.result();
connection.query("SELECT count(*) from myTable", result -> {
if (result.succeeded()) {
Integer i = result.result().getRows().get(0));
source.onSuccess(i);
}
});
} else {
source.onError(res.cause());
}
});
});
}
What is the correct way to call the getValue()
method from within the consumer?
The following:
vertx.eventBus().<Integer>consumer("address", h -> {
Single<integer> single = getValue();
h.reply(single.subscribe(
s -> System.out.println(s));
});
prints out the value before h.reply
returns, but how do you return the value from single.subscribe()
so that it becomes a parameter to h.reply()
?
Thanks
Upvotes: 1
Views: 671
Reputation: 780
You need to subscribe result and pass result to reply. You have inverted the syntax.
getvalue().subscribe(result -> {
h.reply(result);
}, ex -> {
//Handle error
});
Upvotes: 1