user1052610

Reputation: 4719

How to correctly return a value from a RxJava Single to a Vertx Event Bus Consumer

The following code in a Verticle puts a message on the event bus:

io.vertx.reactivex.core.Vertx rxVertx = io.vertx.reactivex.core.Vertx.newInstance(vertx);
Single<Message<Integer>> reply = rxVertx.eventBus().<Integer>rxSend("address", "param");
reply.subscribe(r -> {
   // Do something with value
});

Another Verticle consumes the message:

vertx.eventBus().<Integer>consumer("address", h -> {
    Integer integer = ... // call to getValue() method
    h.reply(integer);
});

The value to be returned comes from a MySQL database using SQLClient. Currently the code to retrieve the value is in the following method:

private Single<Integer> getValue() {
    return Single.create(source -> {
        mySQLClient.getConnection(res -> {
            if (res.succeeded()) {
                SQLConnection connection = res.result();
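                connection.query("SELECT count(*) from myTable", result -> {
                    // Release the connection once the query has completed
                    connection.close();
                    if (result.succeeded()) {
                        // count(*) is the first column of the first row
                        Integer i = result.result().getResults().get(0).getInteger(0);
                        source.onSuccess(i);
                    } else {
                        // Propagate query failures so the Single always terminates
                        source.onError(result.cause());
                    }
                });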
            } else {
                source.onError(res.cause());
            }
        });
    });
}

What is the correct way to call the getValue() method from within the consumer?

The following:

vertx.eventBus().<Integer>consumer("address", h -> {
    Single<Integer> single = getValue();
    h.reply(single.subscribe(
       s -> System.out.println(s)));
});

prints out the value before h.reply() returns, but how do you pass the value from single.subscribe() on so that it becomes the argument to h.reply()?

Thanks

Upvotes: 1

Views: 671

Answers (1)

Pendula

Reputation: 780

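You need to subscribe to the Single and call reply with the result from inside the subscriber. You have the two calls inverted: subscribe() does not return the emitted value, so it cannot be passed to h.reply() directly.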

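getValue().subscribe(result -> {
    h.reply(result);
}, ex -> {
    // Handle the error, e.g. report it to the sender (failure code 500 is an arbitrary choice)
    h.fail(500, ex.getMessage());
});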
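
To see why the reply has to happen inside the callback, here is a minimal sketch that uses only the JDK. It is not Vert.x or RxJava code: CompletableFuture stands in for the Single, and FakeMessage, its reply/fail methods, and the value 42 are hypothetical names made up for illustration. The shape is the same, though: the value only exists once the asynchronous source completes, so the reply is sent from the completion callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-ins only: none of these types are Vert.x or RxJava classes.
class ReplyFromCallbackSketch {

    // Plays the role of the event bus Message: records what was sent back.
    static final class FakeMessage {
        final AtomicReference<Object> reply = new AtomicReference<>();
        final AtomicReference<String> failure = new AtomicReference<>();

        void reply(Object value) { reply.set(value); }
        void fail(String reason) { failure.set(reason); }
    }

    // Plays the role of getValue(): produces one Integer asynchronously.
    static CompletableFuture<Integer> getValue() {
        return CompletableFuture.supplyAsync(() -> 42);
    }

    // Reply from inside the completion callback, where the value is available.
    static CompletableFuture<Integer> handle(FakeMessage h, CompletableFuture<Integer> source) {
        return source.whenComplete((value, error) -> {
            if (error != null) {
                h.fail(String.valueOf(error.getMessage()));
            } else {
                h.reply(value);
            }
        });
    }

    public static void main(String[] args) {
        FakeMessage h = new FakeMessage();
        handle(h, getValue()).join(); // wait only so the demo can print the result
        System.out.println("replied with: " + h.reply.get());
    }
}
```

In a real consumer you would not block with join(); the callback fires on its own when the source completes, just as the subscriber does in the answer above.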

Upvotes: 1
