I am pretty new to working with reactive streams and have run into the following problem, which I am having a hard time solving. The objective is to get a number of documents from a MongoDB database. For each document, obtain metadata from the database and fetch a file from the database (not yet in the example code). Then all of that data needs to be uploaded to S3 (combining all three items). However, I am stuck on combining different publishers without messing up the order of the elements.
Publisher<Document> p = versionCollection.find();
ConnectableFlowable<Document> version = Flowable.fromPublisher(p).publish();
// flatMap merges the inner lookups as they complete, so the GridFS files
// are not guaranteed to come out in the same order as the documents
Observable<GridFSFile> gridFS = version
        .map(extractID())
        .flatMap(loadGridFSFile())
        .toObservable();
Observable<Document> c = version.toObservable()
        .zipWith(gridFS, (Document v, GridFSFile f) -> {
            // if I check here whether both items belong together, the order is sometimes messed up
            return v;
        });
version.connect();
So, basically, I am trying to publish the events to two different paths: one path gets the metadata from GridFS, and then I try to combine both paths again (so I can access the initial document together with its metadata). However, I noticed that the events sometimes get zipped in the wrong order, probably because the database queries sometimes take different amounts of time.
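A minimal, deterministic sketch of why the zip goes wrong, written in plain Java rather than RxJava and with no database involved. The hypothetical `Lookup` class stands for one GridFS query with a simulated latency. RxJava's `flatMap` emits inner results as they complete (modelled here by sorting on latency), while `concatMap` emits them in source order. Because `zip` pairs items purely by position, only the second ordering lines each document up with its own result.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Deterministic model of the ordering problem; plain Java, not RxJava.
class ZipOrderDemo {

    // Hypothetical stand-in for one GridFS query: which document it belongs to
    // and how long it takes to complete.
    static final class Lookup {
        final int docId;
        final int latencyMs;

        Lookup(int docId, int latencyMs) {
            this.docId = docId;
            this.latencyMs = latencyMs;
        }
    }

    // flatMap-like emission: results come out as they complete (fastest first).
    static List<Integer> completionOrder(List<Lookup> lookups) {
        List<Lookup> sorted = new ArrayList<>(lookups);
        sorted.sort(Comparator.comparingInt((Lookup l) -> l.latencyMs));
        List<Integer> ids = new ArrayList<>();
        for (Lookup l : sorted) {
            ids.add(l.docId);
        }
        return ids;
    }

    // concatMap-like emission: results come out in source (input) order.
    static List<Integer> inputOrder(List<Lookup> lookups) {
        List<Integer> ids = new ArrayList<>();
        for (Lookup l : lookups) {
            ids.add(l.docId);
        }
        return ids;
    }

    // zip pairs the i-th document with the i-th result purely by position;
    // count the pairs whose ids do not belong together.
    static int mismatches(List<Integer> docs, List<Integer> results) {
        int bad = 0;
        int n = Math.min(docs.size(), results.size());
        for (int i = 0; i < n; i++) {
            if (!docs.get(i).equals(results.get(i))) {
                bad++;
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        List<Lookup> lookups = List.of(new Lookup(1, 30), new Lookup(2, 10), new Lookup(3, 20));
        List<Integer> docs = inputOrder(lookups);
        System.out.println(mismatches(docs, completionOrder(lookups))); // prints 3
        System.out.println(mismatches(docs, inputOrder(lookups)));      // prints 0
    }
}
```

With latencies of 30, 10 and 20 ms, the completion order is 2, 3, 1, so every positional pair is wrong even though each individual lookup returned the correct file.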
The path of execution for every event should look like this:
            v
            |
      /     |       \
     v   query db   query db
      \     |       /
      upload aggregate
      of all 3 elements
Essentially, the problem is that with my approach I end up pairing an element v with the result of an earlier or later query that belongs to a different element. I probably need to make sure that, for one input element at a time, all three paths execute in sync, but I don't know how.
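One way to keep all three paths in sync for a single input element is to fan out and join per element, so that each aggregate is assembled only from that element's own lookups. The sketch below swaps in `java.util.concurrent.CompletableFuture` in place of RxJava to show the shape; `fetchMetadata` and `fetchFile` are hypothetical stand-ins for the two database queries, with artificial, uneven delays.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Per-element fan-out/fan-in sketch using CompletableFuture (not RxJava).
class PerElementJoin {

    // Hypothetical placeholder for the metadata query; the delay varies per id.
    static String fetchMetadata(int docId) {
        sleepQuietly((docId * 7) % 5);
        return "meta-" + docId;
    }

    // Hypothetical placeholder for the file query; the delay varies per id.
    static String fetchFile(int docId) {
        sleepQuietly((docId * 3) % 4);
        return "file-" + docId;
    }

    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // For each document, start both lookups, then combine the two results of
    // that same document, so an aggregate can only contain data for its own id.
    static List<String> aggregate(List<Integer> docIds, ExecutorService pool) {
        List<CompletableFuture<String>> perDoc = new ArrayList<>();
        for (int id : docIds) {
            CompletableFuture<String> meta = CompletableFuture.supplyAsync(() -> fetchMetadata(id), pool);
            CompletableFuture<String> file = CompletableFuture.supplyAsync(() -> fetchFile(id), pool);
            perDoc.add(meta.thenCombine(file, (m, f) -> "doc-" + id + "|" + m + "|" + f));
        }
        // Joining in list order yields results in input order, whatever finished first.
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> cf : perDoc) {
            out.add(cf.join());
        }
        return out;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            aggregate(List.of(1, 2, 3, 4), pool).forEach(System.out::println);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because each `thenCombine` only sees the two futures created for the same id, nothing has to be lined up by position afterwards, and lookups for different documents still run concurrently on the pool. In RxJava the same shape would mean performing both lookups and the zip inside the per-element `flatMap`/`concatMap`, rather than zipping two independent streams afterwards.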
EDIT
I finally found an approach that seems to do what is needed. However, it feels a little odd that it is this complicated to process things in parallel and keep them in sync:
Publisher<Document> p = versionCollection.find();
Observable<Document> version = Observable.fromPublisher(p);
version.flatMap(v -> {
    // replay() so that every subscriber below sees the same single element v
    ConnectableObservable<Document> connectableObservable = Observable.just(v).replay();
    Observable<GridFSFile> o = connectableObservable
            .map(extractAudioID())
            .flatMap(loadGridFSFile(audioBucket));
    Observable<Document> o3 = connectableObservable.zipWith(o, (Document a, GridFSFile f) -> {
        // now everything seems to stay in order here
        // and we can combine both results
        return a;
    });
    o3.subscribe();
    // note: this second subscription runs the GridFS lookup again, independently of o3
    o.subscribe();
    Disposable a = connectableObservable.connect();
    return connectableObservable;
}, 1).blockingSubscribe(); // maxConcurrency = 1: one document at a time
static Function<ObjectId, ObservableSource<GridFSFile>> loadGridFSFile(GridFSBucket audioBucket) {
    return id -> Observable.fromPublisher(audioBucket.find(new Document("_id", id)).first());
}
A few things that seem to clear up the issue:
And now that piece of code looks a lot more reasonable:
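// replay() buffers every document, so subscribers that arrive after connect() still see all of them
ConnectableFlowable<Document> version = Flowable.fromPublisher(p).replay();
// concatMap (unlike flatMap) emits the inner results in the order of the source elements;
// assumes a Flowable variant of loadGridFSFile that returns a Publisher<GridFSFile>
Flowable<GridFSFile> file = version
        .map(extractID())
        .concatMap(loadGridFSFile(audioBucket));
Flowable<GridFSDownloadStream> data = version
        .map(extractID())
        .map(loadGridFSData(audioBucket));
// zip pairs items by position, which lines up only because all three sources are in input order
Flowable<Document> c = Flowable.zip(version, file, data, (v, f, d) -> {
    // so far everything seems to stay in order
    return v;
});
version.connect();
c.subscribe();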