Mike Davies

Difference between Flux.subscribe(Consumer<? super T> consumer) and Flux.doOnNext(Consumer<? super T> onNext)

I'm just starting to learn reactive programming with Reactor, and I came across this code snippet in a tutorial, building-a-chat-application-with-angular-and-spring-reactive-websocket:

class ChatSocketHandler(val mapper: ObjectMapper) : WebSocketHandler {
    val sink = Sinks.replay<Message>(100);
    val outputMessages: Flux<Message> = sink.asFlux();
    override fun handle(session: WebSocketSession): Mono<Void> {
        println("handling WebSocketSession...")
        session.receive()
                .map { it.payloadAsText }
                .map { Message(id= UUID.randomUUID().toString(), body = it, sentAt = Instant.now()) }
                .doOnNext { println(it) }
                .subscribe(
                        { message: Message -> sink.next(message) },
                        { error: Throwable -> sink.error(error) }
                );
        return session.send(
                Mono.delay(Duration.ofMillis(100))
                        .thenMany(outputMessages.map { session.textMessage(toJson(it)) })
        )
    }
    fun toJson(message: Message): String = mapper.writeValueAsString(message)
}
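If you want to run this snippet, note that `Sinks.replay<Message>(100)` and `sink.next(...)` appear to come from a pre-release Reactor 3.4 Sinks API. In Reactor 3.4 GA and later, the equivalent replay sink is created with `Sinks.many().replay().limit(100)` and fed with `tryEmitNext`. The sketch below assumes Reactor 3.4+ on the classpath, uses `String` in place of the tutorial's `Message` class, and introduces a hypothetical `replayDemo` function. It shows the replay behaviour the handler relies on: a subscriber that arrives late still receives the buffered history.

```kotlin
import reactor.core.publisher.Sinks

// Hypothetical demo: String stands in for the tutorial's Message class.
fun replayDemo(): List<String> {
    // Reactor 3.4+ equivalent of Sinks.replay<Message>(100): keeps the last 100 elements
    val sink: Sinks.Many<String> = Sinks.many().replay().limit(100)

    // Emit before anyone subscribes; tryEmitNext returns an EmitResult instead of throwing
    sink.tryEmitNext("hello")
    sink.tryEmitNext("world")
    sink.tryEmitComplete()

    // A late subscriber still receives the buffered history, then the completion signal
    return sink.asFlux().collectList().block()!!
}
```

Calling `replayDemo()` should return `[hello, world]`, which is why the tutorial can subscribe to `outputMessages` after a short `Mono.delay` without losing earlier messages.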

I understand what it does, but not why the author passes a consumer to the subscribe method instead of chaining another doOnNext(consumer), i.e. these lines:

                .doOnNext { println(it) }
                .subscribe(
                        { message: Message -> sink.next(message) },
                        { error: Throwable -> sink.error(error) }
                )

The Reactor documentation says this about Flux.subscribe(Consumer<? super T> consumer):

Subscribe a Consumer to this Flux that will consume all the elements in the sequence. It will request an unbounded demand (Long.MAX_VALUE).

For a passive version that observe and forward incoming data see doOnNext(java.util.function.Consumer).

However, that doesn't tell me why one would choose one over the other; to me they seem functionally identical.
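One concrete difference is easy to see in code: `doOnNext` is an operator that returns a new `Flux` and triggers nothing by itself, whereas `subscribe` attaches a subscriber and starts the flow. A minimal sketch, assuming Reactor 3.x on the classpath; the function names are hypothetical:

```kotlin
import reactor.core.publisher.Flux

// doOnNext only describes a side effect: it returns a new Flux and requests nothing.
fun withDoOnNextOnly(): List<String> {
    val seen = mutableListOf<String>()
    Flux.just("a", "b", "c")
        .doOnNext { seen.add(it) } // the returned Flux is never subscribed
    return seen // still empty: without a subscriber, no data flows
}

// subscribe(consumer) attaches a subscriber, requests Long.MAX_VALUE and starts the flow.
fun withSubscribe(): List<String> {
    val seen = mutableListOf<String>()
    Flux.just("a", "b", "c")
        .subscribe { seen.add(it) } // Flux.just emits synchronously, so `seen` is filled here
    return seen
}
```

Both functions pass the same lambda; only the method receiving it differs. Chaining `doOnNext` and then calling a bare `subscribe()` would also run the lambda, which is why the two can look interchangeable once someone eventually subscribes.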

Upvotes: 5

Answers (1)

Michael Berry

The difference is more a matter of convention than of function: side effects versus a final consumer.

The doOnXXX family of methods is meant for user-defined side effects as the reactive chain executes. Logging is the most common of these, but you may also have metrics, analytics, etc. that need a view of each element as it passes through. The key point is that none of these makes much sense as a final consumer; the println() in your example is exactly this kind of side effect.
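A minimal sketch of `doOnNext` used purely for side effects, assuming Reactor 3.x; the `AtomicInteger` is a hypothetical stand-in for a real metrics counter. The side effects observe each element while the elements keep flowing to `map` and then to the final consumer:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import reactor.core.publisher.Flux

fun runPipeline(): Pair<List<Int>, Int> {
    val processed = AtomicInteger() // hypothetical stand-in for a metrics counter
    val results = mutableListOf<Int>()
    Flux.range(1, 5)
        .doOnNext { println("saw $it") }          // side effect: logging
        .doOnNext { processed.incrementAndGet() } // side effect: metrics
        .map { it * 10 }                          // elements keep flowing downstream
        .subscribe { results.add(it) }            // the final consumer
    return results to processed.get()
}
```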

By contrast, the consumers passed to subscribe() are meant to be the "final consumer", and subscribe() is usually called by your framework (such as WebFlux) rather than by user code, so this case is a bit of an exception to that rule. Here the author is actively handing each message in this reactive chain to another sink for further processing, so a "side-effect" style method doesn't make much sense: you don't want the Flux to continue beyond this point.
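A sketch of `subscribe` acting as the final consumer that hands elements to another sink, mirroring the tutorial's pattern. It assumes the Reactor 3.4+ Sinks API (`tryEmitNext`/`tryEmitError` in place of the tutorial's `sink.next`/`sink.error`) and uses `String` in place of `Message`; `forwardIntoSink` is a hypothetical name. Note that `subscribe` returns a `Disposable` rather than a `Flux`, so nothing can be chained after it:

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks

// Mirrors the tutorial's hand-off; String stands in for Message (hypothetical demo).
fun forwardIntoSink(): List<String> {
    // Reactor 3.4+ replay sink, playing the role of the tutorial's `sink`
    val sink: Sinks.Many<String> = Sinks.many().replay().limit(100)

    // subscribe(...) returns a Disposable, not a Flux: this chain ends here
    val subscription = Flux.just("hi", "there")
        .map { it.uppercase() }
        .subscribe(
            { msg: String -> sink.tryEmitNext(msg) },    // forward each element
            { err: Throwable -> sink.tryEmitError(err) } // forward an error, if any
        )

    // As in the tutorial, completion is not forwarded, so bound the read with take(2)
    val received = sink.asFlux().take(2).collectList().block()!!
    subscription.dispose() // the Disposable is the only handle subscribe() gives back
    return received
}
```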

(Addendum: As noted above, the normal approach with Reactor / WebFlux is to let WebFlux handle the subscription, which isn't what's happening here. I haven't looked in detail at whether there's a more sensible way to achieve this without a user subscription, but in my experience there usually is, which is why calling subscribe() manually is usually a bit of a code smell. You should certainly avoid it in your own code wherever you can.)
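One possible way to avoid the manual subscribe, sketched without Spring so it can run on its own: build the inbound side as a `Mono<Void>` (`doOnNext` for the hand-off, then `then()`), build the outbound side, and return a single `Mono<Void>` that combines both, so that whoever calls `handle` (WebFlux, in the real handler) does the subscribing. The `incoming` and `send` parameters are hypothetical stand-ins for `session.receive()` and `session.send(...)`, and Reactor 3.4+ is assumed. The WebFlux reference documentation shows a comparable pattern built with `Mono.zip(input, output).then()`; this sketch uses `Mono.and`, which completes once both sides have completed.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks

// Hypothetical stand-ins: `incoming` plays session.receive(), `send` plays session.send(...).
fun handle(
    incoming: Flux<String>,
    sink: Sinks.Many<String>,
    send: (Flux<String>) -> Mono<Void>
): Mono<Void> {
    // Inbound side: hand each message to the sink as a side effect; then() keeps only completion.
    // tryEmitNext can return FAIL_NON_SERIALIZED under concurrent emission, so real
    // multi-session code should check the EmitResult.
    val input: Mono<Void> = incoming
        .doOnNext { sink.tryEmitNext(it) }
        .then()

    // Outbound side: everything the sink publishes is sent back out.
    val output: Mono<Void> = send(sink.asFlux())

    // No subscribe() here: the caller (WebFlux in the real handler) subscribes to this Mono.
    return input.and(output)
}
```

In a quick test, a caller might pass `Flux.just("a", "b")` as `incoming` and a `send` that takes two elements from the sink; blocking on the returned `Mono` is the moment the whole pipeline actually runs.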

Upvotes: 10