Reputation: 593
With the following code:
flux.subscribe(consumer)
the calls to consumer
may take place on different threads, depending on how flux
was constructed (e.g., with the use of subscribeOn
or publishOn
). Is there a guarantee that, even though calls to consumer
may take place on different threads, the calls are sequential, i.e., each call completes before the next one starts?
A more specific example below (using Reactor-Kafka):
val resultFlux: Flux<Pair<TopicPartition, Long>> = KafkaReceiver
.create<K, V>(receiverOptions)
.receive()
.groupBy { m -> m.receiverOffset().topicPartition() }
.flatMap { partitionFlux ->
val parallelRoFlux = partitionFlux
.publishOn(scheduler)
.flatMapSequential(::processRecord, parallelism)
parallelRoFlux.map { ro ->
acknowledge(ro)
Pair(ro.topicPartition(), ro.offset())
}
}
resultFlux.doOnNext { Thread.sleep(2000); log.info("doOnNext: $it") }
.subscribe { Thread.sleep(1000); log.info("subscribe: $it") }
produces the following output snippet:
13:44:26.401 [elastic-6] INFO consumerSvcFlow - Message_5>>>processed
13:44:28.402 [elastic-6] INFO consumerExecutable - doOnNext: (demo-topic-0, 15)
13:44:29.402 [elastic-6] INFO consumerExecutable - subscribe: (demo-topic-0, 15)
13:44:29.435 [elastic-8] INFO consumerSvcFlow - Message_8>>>processed
13:44:31.435 [elastic-8] INFO consumerExecutable - doOnNext: (demo-topic-0, 16)
13:44:32.436 [elastic-8] INFO consumerExecutable - subscribe: (demo-topic-0, 16)
13:44:32.461 [elastic-6] INFO consumerSvcFlow - Message_9>>>processed
13:44:34.462 [elastic-6] INFO consumerExecutable - doOnNext: (demo-topic-0, 17)
13:44:35.462 [elastic-6] INFO consumerExecutable - subscribe: (demo-topic-0, 17)
13:44:35.494 [elastic-8] INFO consumerSvcFlow - Message_15>>>processed
13:44:37.494 [elastic-8] INFO consumerExecutable - doOnNext: (demo-topic-0, 18)
13:44:38.495 [elastic-8] INFO consumerExecutable - subscribe: (demo-topic-0, 18)
13:44:38.497 [elastic-6] INFO consumerSvcFlow - Message_18>>>processed
13:44:40.498 [elastic-6] INFO consumerExecutable - doOnNext: (demo-topic-0, 19)
13:44:41.499 [elastic-6] INFO consumerExecutable - subscribe: (demo-topic-0, 19)
13:44:41.539 [elastic-8] INFO consumerSvcFlow - Message_19>>>processed
13:44:43.540 [elastic-8] INFO consumerExecutable - doOnNext: (demo-topic-0, 20)
13:44:44.540 [elastic-8] INFO consumerExecutable - subscribe: (demo-topic-0, 20)
The calls to the subscribe
consumer argument are sequential but some calls are on thread [elastic-6] and some are on thread [elastic-8].
Upvotes: 0
Views: 608
Reputation: 28301
Yes there is such a guarantee, per the Reactive Streams specification.
First, the calls might happen on a different thread than the one from which you called subscribe()
. But all consumer calls happen on the same thread.
Second, value consumer in the subscribe(Consumer<T>)
method is actually considered an onNext
signal in a Subscriber
, so the spec enforces that such calls are serialized with respect to one another and to onComplete
and onError
signals.
Edit: now that you've added some snippet, the fact that you have 2 threads in there comes from the publishOn
done inside flatMap
. Each group of the groupBy
could therefore pick a different Worker
of the Scheduler
(if it has many). The processing done in these inner sequences can therefore be executed in parallel. The result however, when merged by flatMap
, are serialized => the subscribe(Consumer)
are sequential.
Upvotes: 1