Ogod
Ogod

Reputation: 948

Flux.zip with repeated empty Mono leads to endless Flux instead of empty Flux

What is the reason that the following code produces an endless Flux instead of an empty Flux? If I replace .repeat() on the empty Mono with the code in the comments it works as expected (a Flux which does not emit any value).

void endlessFluxZipBecauseOfEmptyMonoRepeat() {
  Flux.zip(
    Flux.just("A", "B", "C", "D", "E"),
    Mono.empty().repeat() // .flatMapMany(value -> Mono.just(value).repeat())
  ).subscribe(System.out::println);
  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
}

Update 20230609:

This example with an non empty Mono will terminate as expected when the Flux completes.

void finiteFluxZipBecauseOfNonEmptyMonoRepeat() {
  Flux.zip(
    Flux.just("A", "B", "C", "D", "E"),
    Mono.just(1).repeat()
  ).subscribe(x -> System.out.println("value: " + x));
  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
}

The result looks like this:

| Flux | Mono |
| -----| ---- |
| A    | 1    |
| B    | 1    |
| C    | 1    |
| D    | 1    |
| E    | 1    |

So why does the first examle runs endlessly while the second example terminates just fine? The Mono completes in both cases, the only difference is that in the non empty case it also emits a value (onNext signal).

From the doc of Flux.zip: Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2. The operator will continue doing so until any of the sources completes. From my understanding the empty Mono completes directly and I would expect that this should end the Flux.zip immediately resulting in an empty Flux. Or the Flux.zip should end when the inner Flux has emitted all it's values (A, B, C, D, E) but the resulting Flux must still be empty as there is no value to zip with in the Mono.

Upvotes: 0

Views: 423

Answers (1)

ishimwe
ishimwe

Reputation: 1216

Repeat does not emit the response, it keeps subscription active which makes flux endless. To achieve what you described, you can change your code to be like this:

void emptyFluxZip() {
  Flux.zip(
    Flux.just("A", "B", "C", "D", "E"),
    Mono.empty().flatMapMany(value -> Mono.just(value).repeat())
  ).subscribe(System.out::println);
  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
}

Upvotes: 0

Related Questions