David Moles

Reputation: 51113

Parallelize map() operation on single Observable and receive results out of order

Given an Observable<Input> and a mapping function Function<Input, Output> that is expensive but takes variable time, is there a way to call the mapping function in parallel on multiple inputs, and receive the outputs in the order they're produced?

I've tried using observeOn() with a multi-threaded Scheduler:

PublishSubject<Input> inputs = PublishSubject.create();
Function<Input, Output> mf = ...
Observer<Output> myObserver1 = ...
Observer<Output> myObserver2 = ...

// Note: same results with newFixedThreadPool(2)
Executor exec = Executors.newWorkStealingPool();

// Use ConnectableObservable to make sure mf is called only once
// no matter how many downstream observers
ConnectableObservable<Output> outputs = inputs
    .observeOn(Schedulers.from(exec))
    .map(mf)
    .publish();
outputs.subscribe(myObserver1);
outputs.subscribe(myObserver2);
outputs.connect();

inputs.onNext(slowInput); // `mf.apply()` takes a long time to complete on this input
inputs.onNext(fastInput); // `mf.apply()` takes a short time to complete on this input

but in testing, mf.apply(fastInput) is never called till after mf.apply(slowInput) completes.

If I play some tricks in my test with CountDownLatch to ensure mf.apply(slowInput) can't complete until after mf.apply(fastInput), the program deadlocks.

Is there some simple operator I should be using here, or is getting Observables out of order just against the grain of RxJava, and I should be using a different technology?


ETA: I looked at using ParallelFlowable (converting it back to a plain Flowable with .sequential() before subscribing myObserver1/2, or rather mySubscriber1/2), but then I get extra mf.apply() calls, one per input per Subscriber. There's ConnectableFlowable, but I'm not having much luck figuring out how to mix it with .parallel().

Upvotes: 1

Views: 274

Answers (2)

MinseongPark

Reputation: 342

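I guess the observeOn operator does not support concurrent execution on its own: it hands every item of a sequence to a single worker, so map(mf) still processes the inputs one at a time. So how about using flatMap, which gives each input its own inner Observable? Assume the mf function takes a long time.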

    ConnectableObservable<Output> outputs = inputs
        .flatMap(it -> Observable.just(it)
            .observeOn(Schedulers.from(exec))
            .map(mf))
        .publish();

or

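    ConnectableObservable<Output> outputs = inputs
        .flatMap(it -> Observable.just(it)
            .map(mf)
            // subscribeOn must sit inside the flatMap lambda so that each
            // inner Observable is subscribed on the executor
            .subscribeOn(Schedulers.from(exec)))
        .publish();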

Edit 2019-12-30

If you want to run the tasks concurrently but still need the results in input order, use the concatMapEager operator instead of flatMap.

    ConnectableObservable<Output> outputs = inputs
        .concatMapEager(it -> Observable.just(it) // here
            .observeOn(Schedulers.from(exec))
            .map(mf))
        .publish();
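
To compare the two operators side by side, here is a minimal sketch. It assumes RxJava 2 (the `io.reactivex` packages) is on the classpath. The class name, `slowMap`, and `mapOn` are hypothetical stand-ins for illustration: `slowMap` plays the role of `mf` by sleeping for the given number of milliseconds. The ordering relies on sleep-based timing, so treat it as a demonstration rather than a guarantee.

```java
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class EagerVsFlatMapDemo {
    // Counts invocations of the stand-in mapping function (hypothetical helper).
    static final AtomicInteger calls = new AtomicInteger();

    // Stand-in for the expensive mf: sleeps, then returns a label.
    static String slowMap(int millis) throws InterruptedException {
        calls.incrementAndGet();
        Thread.sleep(millis);
        return "done-" + millis;
    }

    // Wraps one input in its own inner Observable and maps it on the executor.
    static Observable<String> mapOn(ExecutorService exec, int millis) {
        return Observable.just(millis)
            .observeOn(Schedulers.from(exec))
            .map(EagerVsFlatMapDemo::slowMap);
    }

    // Feeds a slow input, then a fast one; returns what the first observer saw.
    static List<String> run(boolean keepInputOrder) {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        PublishSubject<Integer> inputs = PublishSubject.create();
        List<String> seen1 = new CopyOnWriteArrayList<>();
        List<String> seen2 = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(2);

        Observable<String> mapped;
        if (keepInputOrder) {
            mapped = inputs.concatMapEager(it -> mapOn(exec, it));
        } else {
            mapped = inputs.flatMap(it -> mapOn(exec, it));
        }
        // publish() shares one upstream subscription between both observers.
        ConnectableObservable<String> outputs = mapped.publish();
        outputs.subscribe(seen1::add, Throwable::printStackTrace, finished::countDown);
        outputs.subscribe(seen2::add, Throwable::printStackTrace, finished::countDown);
        outputs.connect();

        inputs.onNext(300); // slow input
        inputs.onNext(10);  // fast input
        inputs.onComplete();
        try {
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdown();
        }
        return seen1;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // flatMap
        System.out.println(run(true));  // concatMapEager
    }
}
```

With these sleep times, the flatMap run should deliver the fast result first, and the concatMapEager run should deliver the results in input order. In both runs `calls` goes up by two per run, not four, because publish() keeps the mapping from being repeated per observer.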

Upvotes: 1

Prashant Pandey

Reputation: 4642

Doesn't sound possible to me, unless Rx has some very specialised operator to do so. If you're using flatMap to do the mapping, then the elements will arrive out-of-order. Or you could use concatMap but then you'll lose the parallel mapping that you want.

Edit: As mentioned by another poster, concatMapEager should work for this. Parallel subscription and in-order results.
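
For anyone weighing the "different technology" option from the question, the same two orderings can be sketched with the plain JDK. This is not RxJava, only an analogy: `ExecutorCompletionService` hands back results in completion order (roughly what flatMap gives), while walking a list of `Future`s gives input order (roughly what concatMapEager gives). The class and method names, and the sleeping `slowMap` stand-in for `mf`, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OrderingSketch {
    // Stand-in for the expensive mapping function: sleeps, then returns a label.
    static String slowMap(int millis) throws InterruptedException {
        Thread.sleep(millis);
        return "done-" + millis;
    }

    // Maps all inputs in parallel and collects results as they complete.
    static List<String> completionOrder(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, inputs.size()));
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);
            for (int input : inputs) {
                Callable<String> task = () -> slowMap(input);
                service.submit(task);
            }
            List<String> out = new ArrayList<>();
            for (int i = 0; i < inputs.size(); i++) {
                out.add(service.take().get()); // take() yields the next finished task
            }
            return out;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Maps all inputs in parallel but collects results in submission order.
    static List<String> inputOrder(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, inputs.size()));
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int input : inputs) {
                Callable<String> task = () -> slowMap(input);
                futures.add(pool.submit(task));
            }
            List<String> out = new ArrayList<>();
            for (Future<String> future : futures) {
                out.add(future.get()); // blocks until this particular task is done
            }
            return out;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling `completionOrder(Arrays.asList(300, 10))` should return the fast result first, while `inputOrder` with the same list should return the slow one first. The sketch gives up the push-based, multi-subscriber side of Rx, so it only fits if the inputs are known up front.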

Upvotes: 1
