Reputation: 5900
I have a big list of strings that need to be checked against a remote API.
Observable.from(strings) // `strings` is a List<String> with more than 5000 items
    .buffer(50) // split the strings into chunks of 50; yields Observable<List<String>> (fast)
    .flatMap(chunk -> {
        // checkPhoneNumbers is a network call using Retrofit's RxJava support (slow)
        return mSyncApi.checkPhoneNumbers(chunk);
    })
    .reduce( ... ) // aggregate all checking results
    .subscribe( ... );
The problem is that buffer() seems to emit the List<String> chunks so fast that all of the .checkPhoneNumbers() calls get executed at almost the same time. What I would like to achieve is to enqueue the .checkPhoneNumbers() calls to better support devices with slow connections.
Throttling the emitted List<String> chunks by a predefined time interval doesn't make sense, since it would be a disadvantage for devices with lightning-fast connections.
I have tried RxJava's serialize() right after the flatMap(), but it doesn't seem to make any difference (although I don't know if that's the right use of serialize()).
Any alternative approaches appreciated! Thanks.
Upvotes: 2
Views: 3018
Reputation: 1781
As @zsxwing suggested, I think the maxConcurrent overload is what you're looking for if you're trying to limit the concurrency occurring inside flatMap.
For example: https://gist.github.com/benjchristensen/a0350776a595fd6e3810#file-parallelexecution-java-L78
private static void flatMapBufferedExampleAsync() {
    final AtomicInteger total = new AtomicInteger();
    Observable.range(0, 500000000)
        .doOnNext(i -> total.incrementAndGet())
        .buffer(100)
        .doOnNext(i -> System.out.println("emit " + i))
        .flatMap(i -> {
            return Observable.from(i).subscribeOn(Schedulers.computation()).map(item -> {
                // simulate computational work
                try {
                    Thread.sleep(10);
                } catch (Exception e) {
                }
                return item + " processed " + Thread.currentThread();
            });
        }, 2 /* limit concurrency to 2 */) // <--- note argument here
        .toBlocking().forEach(System.out::println);
    System.out.println("total emitted: " + total.get());
}
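Applied to the question's code, that would presumably mean passing a second argument to flatMap, e.g. `flatMap(chunk -> mSyncApi.checkPhoneNumbers(chunk), 2)`, so that at most two chunks are being checked at any moment and the rest wait their turn. To make that behaviour concrete without needing RxJava on the classpath, here is a rough plain-Java analogy: a fixed-size thread pool plays the role of maxConcurrent, and a `Thread.sleep` stands in for the network call. The class `BoundedChunkCheck`, its `Result` type, and the `chunk`/`run` helpers are all made up for this illustration; this is not how RxJava implements the operator, only a sketch of the effect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of RxJava or Retrofit.
public class BoundedChunkCheck {

    /** Aggregated outcome of a run (illustrative fields only). */
    static final class Result {
        final int checked;         // total items that went through the fake check
        final int peakConcurrency; // highest number of checks seen in flight at once

        Result(int checked, int peakConcurrency) {
            this.checked = checked;
            this.peakConcurrency = peakConcurrency;
        }
    }

    /** Splits items into chunks of at most chunkSize, similar in spirit to buffer(chunkSize). */
    static List<List<String>> chunk(List<String> items, int chunkSize) {
        List<List<String>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }

    /** Checks every chunk, with at most maxConcurrent checks running at the same time. */
    static Result run(List<String> items, int chunkSize, int maxConcurrent) throws Exception {
        // The pool size is the analogue of flatMap's maxConcurrent argument.
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            for (List<String> c : chunk(items, chunkSize)) {
                // All chunks are queued immediately; only maxConcurrent of them run at once.
                pending.add(pool.submit(() -> {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(5); // stand-in for the slow network call
                        return c.size();
                    } finally {
                        inFlight.decrementAndGet();
                    }
                }));
            }
            // Aggregate the per-chunk results, similar in spirit to reduce(...).
            int checked = 0;
            for (Future<Integer> f : pending) {
                checked += f.get();
            }
            return new Result(checked, peak.get());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> numbers = new ArrayList<>();
        for (int i = 0; i < 230; i++) {
            numbers.add("number-" + i);
        }
        Result r = run(numbers, 50, 2);
        System.out.println("checked=" + r.checked + " peak=" + r.peakConcurrency);
    }
}
```

With 230 items and chunks of 50, all 230 items are checked across five chunks, and the recorded peak never exceeds 2. Unlike a fixed delay between chunks, this does not penalise fast connections: the next chunk starts as soon as one of the running checks finishes.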
Upvotes: 5