pvillela

Reputation: 593

Are calls to subscribe's consumer argument sequential in Project Reactor?

With the following code:

flux.subscribe(consumer)

the calls to consumer may take place on different threads, depending on how flux was constructed (e.g., with the use of subscribeOn or publishOn). Is there a guarantee that, even though calls to consumer may take place on different threads, the calls are sequential, i.e., each call completes before the next one starts?

A more specific example below (using Reactor-Kafka):

val resultFlux: Flux<Pair<TopicPartition, Long>> = KafkaReceiver
    .create<K, V>(receiverOptions)
    .receive()
    .groupBy { m -> m.receiverOffset().topicPartition() }
    .flatMap { partitionFlux ->
        val parallelRoFlux = partitionFlux
                .publishOn(scheduler)
                .flatMapSequential(::processRecord, parallelism)
        parallelRoFlux.map { ro ->
            acknowledge(ro)
            Pair(ro.topicPartition(), ro.offset())
        }
    }

resultFlux.doOnNext { Thread.sleep(2000); log.info("doOnNext: $it") }
        .subscribe { Thread.sleep(1000); log.info("subscribe: $it") }

produces the following output snippet:

13:44:26.401 [elastic-6] INFO  consumerSvcFlow - Message_5>>>processed
13:44:28.402 [elastic-6] INFO  consumerExecutable - doOnNext: (demo-topic-0, 15)
13:44:29.402 [elastic-6] INFO  consumerExecutable - subscribe: (demo-topic-0, 15)
13:44:29.435 [elastic-8] INFO  consumerSvcFlow - Message_8>>>processed
13:44:31.435 [elastic-8] INFO  consumerExecutable - doOnNext: (demo-topic-0, 16)
13:44:32.436 [elastic-8] INFO  consumerExecutable - subscribe: (demo-topic-0, 16)
13:44:32.461 [elastic-6] INFO  consumerSvcFlow - Message_9>>>processed
13:44:34.462 [elastic-6] INFO  consumerExecutable - doOnNext: (demo-topic-0, 17)
13:44:35.462 [elastic-6] INFO  consumerExecutable - subscribe: (demo-topic-0, 17)
13:44:35.494 [elastic-8] INFO  consumerSvcFlow - Message_15>>>processed
13:44:37.494 [elastic-8] INFO  consumerExecutable - doOnNext: (demo-topic-0, 18)
13:44:38.495 [elastic-8] INFO  consumerExecutable - subscribe: (demo-topic-0, 18)
13:44:38.497 [elastic-6] INFO  consumerSvcFlow - Message_18>>>processed
13:44:40.498 [elastic-6] INFO  consumerExecutable - doOnNext: (demo-topic-0, 19)
13:44:41.499 [elastic-6] INFO  consumerExecutable - subscribe: (demo-topic-0, 19)
13:44:41.539 [elastic-8] INFO  consumerSvcFlow - Message_19>>>processed
13:44:43.540 [elastic-8] INFO  consumerExecutable - doOnNext: (demo-topic-0, 20)
13:44:44.540 [elastic-8] INFO  consumerExecutable - subscribe: (demo-topic-0, 20)

The calls to the subscribe consumer argument are sequential, but some calls are on thread [elastic-6] and some are on thread [elastic-8].

Upvotes: 0

Views: 608

Answers (1)

Simon Baslé

Reputation: 28301

Yes there is such a guarantee, per the Reactive Streams specification.

First, the calls might happen on a different thread than the one from which you called subscribe(), and they are not guaranteed to all happen on the same thread. But they never overlap: each consumer call completes before the next one starts.

Second, the value consumer passed to subscribe(Consumer<T>) is actually invoked as the onNext signal of a Subscriber, so the spec (rule §1.3) requires that such calls be serialized with respect to one another and to the onComplete and onError signals.
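To see that rule in action, here is a minimal sketch, assuming reactor-core 3.x is on the classpath; the function name maxConcurrentConsumerCalls is hypothetical. Four inner sequences emit on Schedulers.parallel() workers, so flatMap receives values from several threads, and a counter records how many consumer calls are in progress at the same moment. Given the serialization rule, the maximum is expected to be 1.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns the highest number of subscribe(Consumer)
// calls that were observed running at the same time.
fun maxConcurrentConsumerCalls(): Int {
    val inFlight = AtomicInteger()
    val maxInFlight = AtomicInteger()
    val done = CountDownLatch(1)

    Flux.range(1, 4)
        // Each inner Flux is subscribed on a parallel worker, so flatMap
        // receives onNext signals from several threads.
        .flatMap { i -> Flux.range(i * 100, 25).subscribeOn(Schedulers.parallel()) }
        .subscribe(
            { _ ->
                val now = inFlight.incrementAndGet()
                maxInFlight.accumulateAndGet(now) { a, b -> maxOf(a, b) }
                Thread.sleep(1) // keep the call open so any overlap would be visible
                inFlight.decrementAndGet()
            },
            { done.countDown() }, // onError
            { done.countDown() }  // onComplete
        )

    done.await()
    return maxInFlight.get()
}

fun main() {
    println("max concurrent consumer calls: ${maxConcurrentConsumerCalls()}")
}
```

The sleep inside the consumer only widens the window in which two calls could overlap; it does not create the serialization, which comes from flatMap's merge.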

Edit: now that you've added a snippet, the two threads you see come from the publishOn inside flatMap. Each group from the groupBy can pick a different Worker of the Scheduler (if it has more than one), so the processing done in these inner sequences can run in parallel. The results, however, are serialized when flatMap merges them, so the subscribe(Consumer) calls are sequential.
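A stripped-down version of the question's topology can illustrate this. It is a sketch under assumptions: reactor-core 3.x on the classpath, Flux.range standing in for the Kafka records, `it % 4` standing in for the topic partition, `map` standing in for processRecord, and a hypothetical four-worker scheduler named "worker" replacing the question's `scheduler`. The names runGroupedPipeline and ConsumerReport are also hypothetical. The consumer records which threads it ran on and how many of its calls overlapped.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical summary of what the subscribe(Consumer) lambda observed.
data class ConsumerReport(val threads: Set<String>, val maxConcurrent: Int, val calls: Int)

fun runGroupedPipeline(): ConsumerReport {
    // Hypothetical stand-in for the question's `scheduler`: four workers.
    val scheduler = Schedulers.newParallel("worker", 4)
    val threads = ConcurrentHashMap.newKeySet<String>()
    val inFlight = AtomicInteger()
    val maxInFlight = AtomicInteger()
    val calls = AtomicInteger()
    val done = CountDownLatch(1)

    Flux.range(0, 40)
        .groupBy { it % 4 }             // four groups, like four partitions
        .flatMap { group ->
            group.publishOn(scheduler)  // each group may be handed to a different Worker
                .map { it * 10 }        // placeholder for processRecord
        }
        .subscribe(
            { _ ->
                threads.add(Thread.currentThread().name)
                val now = inFlight.incrementAndGet()
                maxInFlight.accumulateAndGet(now) { a, b -> maxOf(a, b) }
                Thread.sleep(1)         // keep the call open so any overlap would be visible
                calls.incrementAndGet()
                inFlight.decrementAndGet()
            },
            { done.countDown() },       // onError
            { done.countDown() }        // onComplete
        )

    done.await()
    scheduler.dispose()
    return ConsumerReport(threads.toSet(), maxInFlight.get(), calls.get())
}

fun main() {
    // `threads` may list one or several worker names depending on timing;
    // `maxConcurrent` is expected to be 1 either way.
    println(runGroupedPipeline())
}
```

Which thread delivers a given value depends on which inner sequence happens to drain flatMap's merge at that moment, which matches the alternating [elastic-6] and [elastic-8] lines in the question's log.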

Upvotes: 1
