Reputation: 876
I want to use Flux to run a series of Mono tasks with a 2-second delay between tasks. Here's what I have tried:
val test = { x: Int ->
    Mono.just(x)
        .log()
        .doOnNext { println("Number: $it") }
}

Flux.fromIterable(listOf(1, 2, 3))
    .flatMap { number ->
        test(number)
            .delayElement(Duration.ofSeconds(2))
    }
    .collectList()
    //.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
    .block()
Here's the result
01:58:21.398 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
01:58:21.470 [Test worker] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.472 [Test worker] INFO reactor.Mono.Just.1 - | request(unbounded)
01:58:21.473 [Test worker] INFO reactor.Mono.Just.1 - | onNext(1)
Number: 1
01:58:21.477 [Test worker] INFO reactor.Mono.Just.1 - | onComplete()
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | request(unbounded)
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onNext(2)
Number: 2
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onComplete()
01:58:21.478 [Test worker] INFO reactor.Mono.Just.3 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | request(unbounded)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onNext(3)
Number: 3
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onComplete()
As you can see from the timestamps, delayElement doesn't seem to work, even though everything runs on the same thread (Test worker). What did I do wrong here?
Edit: I don't really understand how it works, but if I put delayElements before flatMap instead of adding delayElement inside it, it works fine.
Flux.fromIterable(listOf(1, 2, 3))
    .delayElements(Duration.ofSeconds(2))
    .flatMap { number ->
        test(number)
    }
    .collectList()
    //.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
    .block()
Here's the result
03:21:29.825 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
03:21:29.926 [Test worker] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
03:21:29.929 [Test worker] INFO reactor.Flux.Array.1 - | request(32)
03:21:29.930 [Test worker] INFO reactor.Flux.Array.1 - | onNext(1)
03:21:29.935 [Test worker] INFO reactor.Flux.Array.1 - | onNext(2)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onNext(3)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onComplete()
Thread[parallel-1,5,main] What is this 1 31
Thread[parallel-2,5,main] What is this 2 33
Thread[parallel-3,5,main] What is this 3 35
Upvotes: 0
Views: 840
Reputation: 1702
The problem is that in the first version you apply the delay inside flatMap. By default flatMap has a concurrency of 256 and a prefetch of 32, so it subscribes to the inner Monos eagerly and they run concurrently: every item is delayed by 2 seconds, but all the delays start at almost the same time and overlap. That is also why the log timestamps show no gap: log() sits before delayElement, so it prints as soon as each inner Mono is subscribed. If you want to keep the delay inside, you need to use concatMap with prefetch 1:
Flux.fromIterable(listOf(1, 2, 3))
    // concatMap subscribes to the next inner Mono only after the previous one completes
    .concatMap({ number ->
        test(number)
            .delayElement(Duration.ofSeconds(2))
    }, 1)
    .collectList()
    .block()
This makes the code inside wait until the previous inner Mono has finished, requesting items one by one.
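To see the difference between the two operators, here is a rough timing sketch. The helpers timed and delayed are hypothetical names made up for this illustration, and the delay is shortened to 500 ms so it runs quickly; the exact timings will vary from machine to machine.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

// Hypothetical helper: blocks until the pipeline completes and reports the elapsed time.
fun <T : Any> timed(label: String, flux: Flux<T>): Pair<List<T>, Long> {
    val start = System.nanoTime()
    val items: List<T> = flux.collectList().block() ?: emptyList()
    val elapsedMs = (System.nanoTime() - start) / 1_000_000
    println("$label -> $items in about $elapsedMs ms")
    return items to elapsedMs
}

// Hypothetical stand-in for test(number).delayElement(...) from the question.
fun delayed(x: Int): Mono<Int> =
    Mono.just(x).delayElement(Duration.ofMillis(500))

fun main() {
    val source = Flux.just(1, 2, 3)

    // flatMap subscribes to all three inner Monos right away, so the delays overlap.
    timed("flatMap", source.flatMap { delayed(it) })

    // concatMap subscribes to the next inner Mono only after the previous one
    // completes, so the delays add up.
    timed("concatMap", source.concatMap({ delayed(it) }, 1))
}
```

With flatMap the total should be close to a single delay, while with concatMap it should be roughly three delays back to back.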
But a cleaner solution is to apply the delay afterwards, outside of flatMap.
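One way to read this, matching the edit in the question, is to keep the inner Mono free of any delay and space the elements out on the outer Flux with delayElements. A minimal sketch, assuming test is declared as a plain function rather than a lambda, with a hypothetical helper named spaced:

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

// Same idea as the question's test lambda, without log().
fun test(x: Int): Mono<Int> =
    Mono.just(x).doOnNext { println("Number: $it") }

// Hypothetical helper: delays each source element by `gap` before it reaches flatMap.
fun spaced(numbers: List<Int>, gap: Duration): Flux<Int> =
    Flux.fromIterable(numbers)
        .delayElements(gap)
        .flatMap { test(it) }

fun main() {
    val result = spaced(listOf(1, 2, 3), Duration.ofSeconds(2))
        .collectList()
        .block()
    println(result)
}
```

Note that delayElements moves the downstream work onto Reactor's parallel scheduler by default, which is why the thread names change in the second log of the question.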
Upvotes: 1