paul
paul

Reputation: 13471

Spring Reactor Merge vs Concat

I´m playing with Spring reactor, and I cannot see any differences between concat and merge operator

Here's my example

    @Test
    public void merge() {
        Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux3 = Flux.just("world");
        Flux.merge(flux1, flux2, flux3)
                .map(String::toUpperCase)
                .subscribe(System.out::println);
    }

    @Test
    public void concat() {
        Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux3 = Flux.just("world");
        Flux.concat(flux1, flux2, flux3)
                .map(String::toUpperCase)
                .subscribe(System.out::println);    
}

Both behave exactly the same. Can someone explain the difference between the two operations?

Upvotes: 12

Views: 17889

Answers (4)

jitender singh
jitender singh

Reputation: 1

1st example:  Flux<String> stringFlux= Flux.just("a","b").delayElements(Duration.ofMillis(200));
        Flux<String> stringFlux1= Flux.just("c","d").delayElements(Duration.ofMillis(300));
        Flux<String> stringFlux2=Flux.merge(stringFlux,stringFlux1);
                stringFlux2.subscribe(System.out::println);
                Thread.sleep(4000);
output: a
        c
        b
        d
2nd example: Flux<String> stringFlux= Flux.just("a","b");
        Flux<String> stringFlux1= Flux.just("c","d");
        Flux<String> stringFlux2=Flux.merge(stringFlux,stringFlux1);
                stringFlux2.subscribe(System.out::println);
                Thread.sleep(4000);
output: a a c
        c b a
        b c d 
        d d b
conclusion: if you dont give delay to the elements then publishers are subscribed eagerly and output would be random as in 2nd example case.
If you specify delay to the elements then in which publisher you provide less amount of time will subscribed first and emit element and then other publisher emit elements as in 1st example.
It totally depends upon time for example output for below would be :
c,d,a,b because there is big diff between 2 seconds and 300 milliseconds. But in 1st example there is less diff between milliseconds so output would be in **interleaving fashion**.
3rd example :
Flux<String> stringFlux= Flux.just("a","b").delayElements(Duration.ofSeconds(2));
Flux<String> stringFlux1= Flux.just("c","d").delayElements(Duration.ofMillis(300));

Upvotes: 0

Manish Maheshwari
Manish Maheshwari

Reputation: 4134

Another noteworthy difference is that all variations of concat subscribe to the second stream lazily, only after the first stream has terminated.

Whereas all variations of merge subscribe to the publishers eagerly (all publishers are subscribed together)

Running the below code highlights this aspect:

//Lazy subscription of conact
Flux.concat(Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(500)),
                Flux.just(10, 20, 30, 40).delayElements(Duration.ofMillis(500)))
                .subscribe(System.out::println, System.out::println);



//Eager subscription of the merge. Also, try mergeSequential.
Flux.merge(Flux.range(500, 3).delayElements(Duration.ofMillis(500)),
                Flux.range(-500, 3).delayElements(Duration.ofMillis(300)))
                .subscribe(System.out::println, System.out::println);

Upvotes: 3

Vikram Rawat
Vikram Rawat

Reputation: 1662

The difference is already mentioned in the API docs that while concat first reads one flux completely and then appends the second flux to that, merge operator doesn't guarantee the sequence between the two flux.

In order to see the difference, modify your merge() code as below.

e.g. sample code below

//Flux with Delay
@Test
public void merge() {
    Flux<String> flux1 = Flux.just("Hello", "Vikram");

    flux1 = Flux.interval(Duration.ofMillis(3000))
    .zipWith(flux1, (i, msg) -> msg);
    
    
    Flux<String> flux2 = Flux.just("reactive");
    flux2 = Flux.interval(Duration.ofMillis(2000))
            .zipWith(flux2, (i, msg) -> msg);
    
    Flux<String> flux3 = Flux.just("world");
    Flux.merge(flux1, flux2, flux3)
            .subscribe(System.out::println);

    try {
        Thread.sleep(8000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

As you modify the Flux.interval duration, currently set as 3000 milliseconds you will see that the output with merge() keeps changing. But with concat(), the output will be always same.

Upvotes: 13

Amriteya
Amriteya

Reputation: 1172

The essential difference between merge and concat is that in merge, both streams are live. In case of concat, first stream is terminated and then the other stream is concatenated to it.

Concat enter image description here


Merge enter image description here

Upvotes: 20

Related Questions