Reputation: 2164
I currently have a flux that results from some async operations and produces Flux<String>. I would like to then use those strings as params in a legacy callback API that will emit multiple events per input String from the primary flux.
The following code works as expected, but I can't find a way to successfully terminate the second flux without doing seemingly hacky things like keeping counters and such. Is there a more idiomatic way to do that?
public Flux<Account> getAccounts(UUID userId) {
    var tokens = tokenRepo.findAllActiveByAccountUUID(userId);
    // One inner Flux per token; flatMap merges them into the result.
    return tokens.flatMap(p -> Flux.create(e -> {
        var r = new AccountsGetRequest().accessToken(p);
        c.accountsGet(r).enqueue(new Callback<>() {
            @Override
            public void onResponse(@NotNull Call<AccountsGetResponse> call,
                                   @NotNull Response<AccountsGetResponse> response) {
                if (response.isSuccessful() && response.body() != null) {
                    // Emit every element, then terminate this inner Flux.
                    (response.body()).forEach(e::next);
                    e.complete();
                } else {
                    log.debug(response.toString());
                    e.error(new RuntimeException("getAccounts" + response.code()));
                    e.complete(); // ignored: error() has already terminated the sink
                }
            }

            @Override
            public void onFailure(@NotNull Call<AccountsGetResponse> call, @NotNull Throwable t) {
                e.error(t);
                e.complete(); // ignored: error() has already terminated the sink
            }
        });
    }));
}
Upvotes: 0
Views: 610
Reputation: 18113
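You can simply call e.complete() after the forEach. The extra e.complete() calls after e.error(...) are not needed, since error() is already a terminal signal.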
I think it would be simpler to use Flux.fromIterable(response.body()) instead of Flux.create.
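As a rough sketch of that idea: because the callback fires exactly once per request, it can be adapted to a single-valued CompletableFuture and flattened afterwards, so there is no sink to terminate by hand. LegacyCallback and LegacyClient below are hypothetical stand-ins for the question's client, not its real API, and the example uses only the JDK. With Reactor on the classpath, the returned future could then be flattened with Mono.fromFuture(...).flatMapMany(Flux::fromIterable).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the legacy callback interface in the question.
interface LegacyCallback<T> {
    void onResponse(T body);
    void onFailure(Throwable t);
}

// Hypothetical stand-in for the legacy client: it reports its result via a callback.
interface LegacyClient {
    void accountsGet(String accessToken, LegacyCallback<List<String>> callback);
}

final class CallbackBridgeSketch {

    // Adapts one callback-based request to a CompletableFuture.
    // Exactly one terminal outcome is signalled: a value or an error.
    static CompletableFuture<List<String>> accountsFor(LegacyClient client, String token) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        client.accountsGet(token, new LegacyCallback<>() {
            @Override
            public void onResponse(List<String> body) {
                if (body != null) {
                    future.complete(body);
                } else {
                    future.completeExceptionally(new IllegalStateException("empty response body"));
                }
            }

            @Override
            public void onFailure(Throwable t) {
                future.completeExceptionally(t);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        // Fake client that answers immediately with two accounts per token.
        LegacyClient fake = (token, cb) -> cb.onResponse(List.of(token + "-checking", token + "-savings"));
        System.out.println(accountsFor(fake, "tok1").join());
    }
}
```

The future carries the whole list as one value, so completion is implicit once the list has been delivered and no counters are needed.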
Note that if c.accountsGet(r) is a blocking call, you should probably use the publishOn operator (or subscribeOn on the inner Flux) to switch to another scheduler, such as Schedulers.boundedElastic(), so that the call does not block the calling thread.
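To illustrate the general idea of moving a blocking call off the caller's thread, here is a minimal JDK-only sketch. fetchAccountsBlocking is a hypothetical placeholder for a synchronous client call, not part of the question's API. In Reactor, the usual counterpart is Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic()).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockingOffloadSketch {

    // Hypothetical blocking lookup standing in for a synchronous client call.
    static List<String> fetchAccountsBlocking(String token) {
        try {
            Thread.sleep(50); // simulate network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.of(token + "-checking");
    }

    // Runs the blocking call on the given executor; the caller's thread
    // only waits if it later chooses to join() the returned future.
    static CompletableFuture<List<String>> fetchAccountsAsync(String token, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> fetchAccountsBlocking(token), pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(fetchAccountsAsync("tok1", pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```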
Upvotes: 1