Reputation: 9937
I'm using a library in a Vert.x application which returns Project Reactor type Mono.
I have a verticle which receives this reactive type and is intended to send the content through the event bus to another verticle:
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
public class HelperVerticle extends AbstractVerticle
{
public static final String ADDRESS = "address_1";
@Override
public void start() throws Exception
{
vertx.eventBus().consumer(ADDRESS, this::consume);
}
private void consume(Message<Object> message)
{
Mono.delay(Duration.ofMillis(3000))
.thenReturn("Content of Mono.") // this would come from external library
.publishOn(Schedulers.fromExecutor(vertx.nettyEventLoopGroup())) // is this needed?
.subscribe(output ->
{
System.out.println("My verticle: " + Thread.currentThread().getName());
message.reply(output + " " + message.body());
}, error -> message.fail(1, error.getMessage()));
}
}
Is this right approach? Should I switch to Vert.x event loop thread pool before sending the message to the event bus? Is there anything I should be aware of when using these libraries together?
Upvotes: 2
Views: 2404
Reputation: 9128
The code looks good to me, except you shouldn't use the Netty event loop group as executor but rather the verticle context:
public class HelperVerticle extends AbstractVerticle
{
public static final String ADDRESS = "address_1";
private Scheduler scheduler;
@Override
public void start() throws Exception
{
scheduler = Schedulers.fromExecutor(command -> context.runOnContext(v -> command.run()));
vertx.eventBus().consumer(ADDRESS, this::consume);
}
private void consume(Message<Object> message)
{
Mono.delay(Duration.ofMillis(3000))
.thenReturn("Content of Mono.") // this would come from external library
.publishOn(scheduler)
.subscribe(output ->
{
System.out.println("My verticle: " + Thread.currentThread().getName());
message.reply(output + " " + message.body());
}, error -> message.fail(1, error.getMessage()));
}
}
With such a scheduler, you get the insurance that the verticle state will not be modified by a thread other than the event loop it's been assigned.
Upvotes: 6