Feras

Reputation: 2164

Reactor Flux<String> one to many Flux<T>

I currently have a flux which results from some async operations and produces Flux<String>. I would like to then use those strings as params in a legacy callback API that will result in multiple events emitted per input String from the primary flux.

The following code works as expected, but I can't find a way to successfully terminate the second flux without seemingly hacky workarounds such as keeping counters. Is there a more idiomatic way to do that?

    public Flux<Account> getAccounts(UUID userId) {
        var tokens = tokenRepo.findAllActiveByAccountUUID(userId);

        // One inner Flux per token; flatMap merges them into a single Flux<Account>.
        return tokens.flatMap(p -> Flux.create(e -> {
            var r = new AccountsGetRequest().accessToken(p);
            c.accountsGet(r).enqueue(new Callback<>() {
                @Override
                public void onResponse(@NotNull Call<AccountsGetResponse> call,
                                       @NotNull Response<AccountsGetResponse> response) {
                    if (response.isSuccessful() && response.body() != null) {
                        response.body().forEach(e::next);
                        e.complete(); // ends only this token's inner Flux
                    } else {
                        log.debug(response.toString());
                        // error() is a terminal signal, so no complete() follows it
                        e.error(new RuntimeException("getAccounts: " + response.code()));
                    }
                }

                @Override
                public void onFailure(@NotNull Call<AccountsGetResponse> call, @NotNull Throwable t) {
                    e.error(t); // terminal signal; a later complete() would be ignored
                }
            });
        }));
    }

Upvotes: 0

Views: 610

Answers (1)

Olivier Boissé

Reputation: 18113

You can simply call e.complete() after the forEach.

I think it would be simpler to use Flux.fromIterable(response.body()) instead of Flux.create.
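One way to apply the Flux.fromIterable suggestion while still bridging a callback is to wrap the single callback result in a Mono and expand it afterwards. The sketch below is only an illustration: LegacyCallback and fetchAccounts are hypothetical stand-ins for the real client in the question, and plain strings replace Account.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class CallbackBridge {

    // Hypothetical stand-in for the legacy callback interface (not the real client API).
    interface LegacyCallback<T> {
        void onResponse(T body);
        void onFailure(Throwable t);
    }

    // Hypothetical call: reports one list of account names per token via the callback.
    static void fetchAccounts(String token, LegacyCallback<List<String>> cb) {
        if (token.isEmpty()) {
            cb.onFailure(new IllegalArgumentException("empty token"));
        } else {
            cb.onResponse(List.of(token + "-checking", token + "-savings"));
        }
    }

    // The callback fires once with one result, so a Mono models it directly:
    // success(body) emits and completes in one step, error(t) terminates with a failure.
    static Mono<List<String>> accountsFor(String token) {
        return Mono.create(sink -> fetchAccounts(token, new LegacyCallback<>() {
            @Override
            public void onResponse(List<String> body) {
                sink.success(body);
            }

            @Override
            public void onFailure(Throwable t) {
                sink.error(t);
            }
        }));
    }

    // Expand each per-token list into individual elements; flatMap merges the inner
    // publishers and completes once the token Flux and every inner publisher are done.
    static Flux<String> getAccounts(Flux<String> tokens) {
        return tokens.flatMap(t -> accountsFor(t).flatMapMany(Flux::fromIterable));
    }

    public static void main(String[] args) {
        getAccounts(Flux.just("a", "b")).subscribe(System.out::println);
    }
}
```

With this shape there is no per-element next() call and no separate complete() to remember, which is why no counters are needed to finish the outer Flux.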

Note that if c.accountsGet(r) is a blocking call, you should probably use the publishOn operator to switch to another scheduler to prevent blocking the main thread.
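As a rough sketch of that scheduler switch, assuming a synchronous lookup (blockingFetch here is hypothetical and only simulates a blocking client call), the first variant uses publishOn as described above and the second uses the Mono.fromCallable plus subscribeOn pattern that is also commonly used for wrapping blocking calls:

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class BlockingOffload {

    // Hypothetical blocking lookup standing in for a synchronous client call.
    static List<String> blockingFetch(String token) {
        try {
            Thread.sleep(50); // simulate blocking I/O
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return List.of(token + "-checking", token + "-savings");
    }

    // Variant 1: publishOn moves everything downstream of it, including the
    // flatMap lambda that makes the blocking call, onto a boundedElastic worker.
    static Flux<String> viaPublishOn(Flux<String> tokens) {
        return tokens
                .publishOn(Schedulers.boundedElastic())
                .flatMap(t -> Flux.fromIterable(blockingFetch(t)));
    }

    // Variant 2: defer each blocking call with fromCallable and subscribe to it
    // on boundedElastic, so calls for different tokens may run concurrently.
    static Flux<String> viaSubscribeOn(Flux<String> tokens) {
        return tokens.flatMap(t -> Mono.fromCallable(() -> blockingFetch(t))
                .subscribeOn(Schedulers.boundedElastic())
                .flatMapMany(Flux::fromIterable));
    }

    public static void main(String[] args) {
        viaPublishOn(Flux.just("a", "b")).doOnNext(System.out::println).blockLast();
    }
}
```

Variant 1 handles the tokens one after another on a single worker, while variant 2 may interleave results from different tokens, so the choice depends on whether ordering or throughput matters more.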

Upvotes: 1
