Ashish

Reputation: 349

How to update objects coming from a Flux<Object> with a value from a method emitting Mono<Items> in a non-blocking way?

I am trying to update the objects received from a Flux with data received from a Mono, where the Flux of objects and the Mono of items come from two different API calls. The problem is that I don't have control over the threads, and the items received from the Mono are never assigned to my object unless I intentionally block() that thread. Is there a non-blocking way to handle this scenario?

I have also looked into Schedulers, subscribeOn, and publishOn, but was unable to figure out the pipeline.

public Flux<Object> test() {
    return method1().map(obj -> {
        if (obj.getTotalItems() > 20) {
            // block() waits on the calling thread until method2 emits
            obj.setItems(method2(obj).block());
        }
        return obj;
    });
}

Here method1 emits a Flux of objects received from an API call.

And method2 emits a list of items fetched from another API call.

How can I make this whole flow non-blocking?

Upvotes: 1

Views: 1384

Answers (1)

Oleh Dokuka

Reputation: 12194

Try flatMap or concatMap

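Using the flatMap operator, you can flatten the substream in a non-blocking way:

public Flux<Object> test() {
    return method1().flatMap(obj -> {
        if (obj.getTotalItems() > 20) {
            // method2 returns a Mono; map runs once its value arrives
            return method2(obj)
                     .map(result -> {
                        obj.setItems(result);
                        return obj;
                     });
        }
        // nothing to fetch: wrap the object so both branches return a Mono
        return Mono.just(obj);
    });
}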

flatMap allows you to flatten several streams at a time, so with long-running operations it can process elements more efficiently.

One downside of flatMap is that it does not preserve the order of elements: if the upstream sequence is [1, 2, 3, 4], there is a chance that the order will change because of the asynchronous nature of the substreams.

To preserve order, you can use concatMap, which flattens only one stream at a time, so the order of the flattened elements is guaranteed to be preserved:

public Flux<Object> test() {
    return method1().concatMap(obj -> {
        if (obj.getTotalItems() > 20) {
            // the next obj is not processed until this Mono completes
            return method2(obj)
                     .map(result -> {
                        obj.setItems(result);
                        return obj;
                     });
        }
        return Mono.just(obj);
    });
}

Note

Mutating objects in this way is not the best idea; I would prefer to use the immutable object pattern in reactive programming.
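
As a rough illustration of the immutable object pattern mentioned in the note, here is a minimal plain-Java sketch. The class name Order, its fields, and the withItems method are all hypothetical (the question does not show the real type); the idea is only that "setting" the items returns a new instance and leaves the original untouched.

```java
import java.util.List;

// Hypothetical stand-in for the question's object type; names and fields are assumptions.
final class Order {
    private final int totalItems;
    private final List<String> items;

    Order(int totalItems, List<String> items) {
        this.totalItems = totalItems;
        this.items = List.copyOf(items); // unmodifiable copy, so callers cannot change it later
    }

    int getTotalItems() { return totalItems; }

    List<String> getItems() { return items; }

    // Returns a new Order carrying the given items; this instance is not modified.
    Order withItems(List<String> newItems) {
        return new Order(totalItems, newItems);
    }
}

class ImmutableOrderDemo {
    public static void main(String[] args) {
        Order original = new Order(25, List.of());
        Order enriched = original.withItems(List.of("a", "b"));

        System.out.println(original.getItems()); // prints []
        System.out.println(enriched.getItems()); // prints [a, b]
    }
}
```

With a type like this, the inner lambda of the flatMap or concatMap version could probably shrink to method2(obj).map(obj::withItems), assuming method2 emits the same list type that withItems accepts.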

Upvotes: 3
