st-h

Reputation: 2553

RxJava: process in parallel and combine results (without messing up the order)

I am fairly new to reactive streams and have run into a problem that I am having a hard time solving. The objective is to fetch a number of documents from a MongoDB database. For each document, we obtain metadata from the db and fetch a file from the db (not yet in the example code). Then we need to upload all that data to S3, combining all three items. However, I am stuck on combining the different publishers without messing up the order of the elements.

Publisher<Document> p = versionCollection.find();
ConnectableFlowable<Document> version = Flowable.fromPublisher(p).publish();

Observable<GridFSFile> gridFS = version
        .map(extractID())
        .flatMap(loadGridFSFile()).toObservable();

Observable<Document> c = version.toObservable()
        .zipWith(gridFS, (Document v, GridFSFile f) -> {
            // If I check here whether both messages belong together,
            // the order is sometimes messed up.
            return v;
        });
version.connect();

So, basically, I publish the events to two different paths: one path gets the metadata from GridFS, and then I try to combine both paths again (so I can access the initial document together with its metadata). However, I noticed that the events sometimes get zipped in a different order (probably because the db queries take varying amounts of time).

The path of execution for every event should look like this:

         v
         |
  /      |      \
v    query db   query db
  \      |      /
   upload aggregate
   of all 3 elements

Essentially, the problem with my approach is that I end up with results from an earlier or later query that belong to a different element v. I probably need to make sure that execution is synchronised across all 3 paths for one input element at a time, but I don't know how.
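To make the intended per-element flow concrete, here is a minimal sketch in plain Java. It deliberately uses CompletableFuture from the standard library rather than RxJava, so it only illustrates the shape of the diagram and is not a solution to the RxJava problem itself. The names PerElementFanOut, query, aggregate and processAll are hypothetical, and the strings stand in for the real documents, metadata and files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

public class PerElementFanOut {

    // Hypothetical stand-in for a db query with unpredictable latency.
    static CompletableFuture<String> query(String kind, int id) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(20));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return kind + "-" + id;
        });
    }

    // Fan out two queries for one element v and join them with v itself,
    // so results can never be paired with a different element.
    static CompletableFuture<String> aggregate(int v) {
        CompletableFuture<String> meta = query("meta", v);
        CompletableFuture<String> file = query("file", v);
        return meta.thenCombine(file, (m, f) -> v + ":" + m + ":" + f);
    }

    static List<String> processAll(List<Integer> ids) {
        // Start every aggregate first so the queries can run in parallel...
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int id : ids) {
            pending.add(aggregate(id));
        }
        // ...then collect in input order, regardless of completion order.
        return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList(1, 2, 3)));
    }
}
```

The point of the sketch is that the combination happens per element, inside aggregate, and that the output order is fixed by the order in which the pending results are collected, not by which query finishes first.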

EDIT

I finally found an approach that seems to do what is needed. However, it feels a little weird that it has to be this complicated to process things in parallel and keep them in sync.

Publisher<Document> p = versionCollection.find();

Observable<Document> version = Observable.fromPublisher(p);
version.flatMap(v -> {

    ConnectableObservable<Document> connectableObservable = Observable.just(v).replay();

    Observable<GridFSFile> o = connectableObservable
        .map(extractAudioID())
        .flatMap(loadGridFSFile(audioBucket));

    Observable<Document> o3 = connectableObservable.zipWith(o, (Document a, GridFSFile f) -> {
            // now everything seems to stay in order here
            // and we can combine both results
            return a; // placeholder: return the combined result here
    });
    o3.subscribe();
    o.subscribe();

    Disposable a = connectableObservable.connect();
    return connectableObservable;
}, 1).blockingSubscribe();


static Function<ObjectId, ObservableSource<GridFSFile>> loadGridFSFile(GridFSBucket audioBucket) {
    return id -> Observable.fromPublisher(audioBucket.find(new Document("_id", id)).first());
}

Upvotes: 2

Views: 1514

Answers (1)

st-h

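A few things that seem to clear up the issue:

Use concatMap instead of flatMap. flatMap merges the inner results as they complete, so a slow query can be overtaken by a faster one. concatMap subscribes to the inner sources one at a time and preserves the source order, at the cost of running those queries sequentially.

Use replay() instead of publish(), so that each subscriber receives every document regardless of when it subscribes relative to connect().

Zip all streams in a single Flowable.zip call. zip pairs elements purely by position, so it only works if every stream keeps the source order.

And now that piece of code looks a lot more reasonable: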

ConnectableFlowable<Document> version = Flowable.fromPublisher(p).replay();

Flowable<GridFSFile> file = version
        .map(extractID())
        .concatMap(loadGridFSFile(audioBucket));

Flowable<GridFSDownloadStream> data = version
        .map(extractID())
        .map(loadGridFSData(audioBucket));

Flowable<Document> c = Flowable.zip(version, file, data, (v, f, d) -> {
    // so far everything seems to stay in order
    return v;
});

version.connect();

c.subscribe();
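
A possible alternative, sketched here under the assumption that RxJava 2 (the io.reactivex package) is on the classpath, is to do the zip per element and let concatMapEager keep the outer order. The class OrderedZipSketch and the lookup helper are hypothetical stand-ins for the GridFS queries, and integers and strings stand in for the real Document and GridFS types.

```java
import io.reactivex.Flowable;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class OrderedZipSketch {

    // Hypothetical stand-in for an asynchronous lookup. Later ids answer
    // faster, so the lookups complete in reverse order.
    static Flowable<String> lookup(String kind, int id) {
        return Flowable.just(kind + "-" + id)
                .delay(50 - 10L * id, TimeUnit.MILLISECONDS);
    }

    static List<String> run() {
        return Flowable.range(1, 4)
                // concatMapEager subscribes to the inner sources eagerly,
                // so they run concurrently, but emits in source order.
                .concatMapEager(v -> Flowable.zip(
                        Flowable.just(v),
                        lookup("file", v),
                        lookup("data", v),
                        // All three values belong to the same element v.
                        (doc, file, data) -> doc + ":" + file + ":" + data))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because each zip only ever sees the values for its own element, nothing can be paired with the wrong document, and no replay() or connect() is needed. Whether this fits the real use case depends on how many queries may safely run against the db at once; concatMapEager also has an overload that takes a maximum concurrency.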

Upvotes: 1
