Reputation: 13471
I´m playing with Spring reactor, and I cannot see any differences between concat
and merge
operator
Here's my example
@Test
public void merge() {
Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Flux<String> flux3 = Flux.just("world");
Flux.merge(flux1, flux2, flux3)
.map(String::toUpperCase)
.subscribe(System.out::println);
}
@Test
public void concat() {
Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Flux<String> flux3 = Flux.just("world");
Flux.concat(flux1, flux2, flux3)
.map(String::toUpperCase)
.subscribe(System.out::println);
}
Both behave exactly the same. Can someone explain the difference between the two operations?
Upvotes: 12
Views: 17889
Reputation: 1
1st example: Flux<String> stringFlux= Flux.just("a","b").delayElements(Duration.ofMillis(200));
Flux<String> stringFlux1= Flux.just("c","d").delayElements(Duration.ofMillis(300));
Flux<String> stringFlux2=Flux.merge(stringFlux,stringFlux1);
stringFlux2.subscribe(System.out::println);
Thread.sleep(4000);
output: a
c
b
d
2nd example: Flux<String> stringFlux= Flux.just("a","b");
Flux<String> stringFlux1= Flux.just("c","d");
Flux<String> stringFlux2=Flux.merge(stringFlux,stringFlux1);
stringFlux2.subscribe(System.out::println);
Thread.sleep(4000);
output: a a c
c b a
b c d
d d b
conclusion: if you dont give delay to the elements then publishers are subscribed eagerly and output would be random as in 2nd example case.
If you specify delay to the elements then in which publisher you provide less amount of time will subscribed first and emit element and then other publisher emit elements as in 1st example.
It totally depends upon time for example output for below would be :
c,d,a,b because there is big diff between 2 seconds and 300 milliseconds. But in 1st example there is less diff between milliseconds so output would be in **interleaving fashion**.
3rd example :
Flux<String> stringFlux= Flux.just("a","b").delayElements(Duration.ofSeconds(2));
Flux<String> stringFlux1= Flux.just("c","d").delayElements(Duration.ofMillis(300));
Upvotes: 0
Reputation: 4134
Another noteworthy difference is that all variations of concat subscribe to the second stream lazily, only after the first stream has terminated.
Whereas all variations of merge subscribe to the publishers eagerly (all publishers are subscribed together)
Running the below code highlights this aspect:
//Lazy subscription of conact
Flux.concat(Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(500)),
Flux.just(10, 20, 30, 40).delayElements(Duration.ofMillis(500)))
.subscribe(System.out::println, System.out::println);
//Eager subscription of the merge. Also, try mergeSequential.
Flux.merge(Flux.range(500, 3).delayElements(Duration.ofMillis(500)),
Flux.range(-500, 3).delayElements(Duration.ofMillis(300)))
.subscribe(System.out::println, System.out::println);
Upvotes: 3
Reputation: 1662
The difference is already mentioned in the API docs that while concat first reads one flux completely and then appends the second flux to that, merge operator doesn't guarantee the sequence between the two flux.
In order to see the difference, modify your merge() code as below.
e.g. sample code below
//Flux with Delay
@Test
public void merge() {
Flux<String> flux1 = Flux.just("Hello", "Vikram");
flux1 = Flux.interval(Duration.ofMillis(3000))
.zipWith(flux1, (i, msg) -> msg);
Flux<String> flux2 = Flux.just("reactive");
flux2 = Flux.interval(Duration.ofMillis(2000))
.zipWith(flux2, (i, msg) -> msg);
Flux<String> flux3 = Flux.just("world");
Flux.merge(flux1, flux2, flux3)
.subscribe(System.out::println);
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
As you modify the Flux.interval duration, currently set as 3000 milliseconds you will see that the output with merge() keeps changing. But with concat(), the output will be always same.
Upvotes: 13