Reputation: 355
As an example, I have two observable sequences, "data1" and "data2" that I'm looking to merge together into one observable sequence, while preserving the initial order.
When the two observable sequences have no asynchronous work—here modeled with a small delay—this is easily doable with Rx.Observable.merge. However, with this approach, any asynchronous work destroys the order.
Is it possible to merge observable sequences, passing along values if they're known and waiting for values not yet known, using the built-in operators? If not, what type of operator should I build?
// Minimal reproduction: merge two event-backed streams and print all of their
// values as one ';'-terminated string once the 'end' event has been emitted.
'use strict';
var Rx = require('rx');
var EventEmitter = require('events').EventEmitter;
var eventEmitter = new EventEmitter();
// 'end' is the stop signal: both data streams are cut off by it via takeUntil.
var end = Rx.Observable.fromEvent(eventEmitter, 'end');
var data1 = Rx.Observable.fromEvent(eventEmitter, 'data1')
.takeUntil(end);
// data2 models asynchronous work: everything it emits is delayed by 1000 ms.
var data2 = Rx.Observable.fromEvent(eventEmitter, 'data2')
.takeUntil(end)
.delay(1000);
Rx.Observable
.merge(data1, data2)
// Fold every merged value into a single string, appending ';' after each one.
.reduce(function(acc, str) {
return acc + str + ';';
}, '')
.subscribe(function(data) {
console.log(data);
});
// Events are emitted only after subscribing; the emit order below (1..5) is
// the order the merged output is expected to preserve.
eventEmitter.emit('data1', '1');
eventEmitter.emit('data2', '2');
eventEmitter.emit('data1', '3');
eventEmitter.emit('data1', '4');
eventEmitter.emit('data2', '5');
eventEmitter.emit('end');
// expected: "1;2;3;4;5;"
// actual: "1;3;4;2;5;"
// The delayed data2 values ("2" and "5") end up behind all data1 values.
Thanks.
Upvotes: 3
Views: 3154
Reputation: 39212
One option: merge the streams before doing the work so that you capture the order, then do the work (using concatMap to maintain order):
// Synchronous work: there is nothing to wait for, but the value is still
// wrapped in an Observable so that concatMap() can consume the result the
// same way it consumes the asynchronous work.
var data1Work = function (data) {
  var wrapped = Rx.Observable.of(data);
  return wrapped;
};
/**
 * Asynchronous work: returns an observable that yields `data` after a
 * 1000 ms delay.
 *
 * The returned observable is hot: the work is started immediately and its
 * result is replayed to whoever subscribes later, so the delay is already
 * running by the time concatMap() gets around to subscribing.
 *
 * Alternatives:
 *  - return the cold `work` observable instead; it will not start until
 *    concatMap() subscribes to it;
 *  - start the async operation and return a Promise that resolves when it
 *    completes. Promises are always hot and Rx knows how to consume them.
 *
 * @param {*} data - the value to process.
 * @returns {Object} a connected, replaying observable of the result.
 */
var data2Work = function (data) {
  // Cold observable (won't start until something subscribes to it).
  var work = Rx.Observable.of(data).delay(1000);

  // Declared locally: assigning to an undeclared `hotwork` leaks a global in
  // sloppy mode and throws a ReferenceError under 'use strict'.
  var hotwork = work.replay();
  hotwork.connect(); // start it now
  return hotwork;
};
// Tag every event with the stream it came from so that, after the streams
// are merged, each value can still be routed to the matching work function.
var data1 = Rx.Observable.fromEvent(eventEmitter, "data1")
  .map(function (d) { return { type: "data1", data: d }; });
// The tag must be "data2" here: with "data1" the dispatcher that checks
// `d.type === "data1"` would send every value to data1Work and never run
// the asynchronous data2Work.
var data2 = Rx.Observable.fromEvent(eventEmitter, "data2")
  .map(function (d) { return { type: "data2", data: d }; });
// Merge first so the emission order of the raw events is captured, then run
// each item's work through concatMap(), which is used here to keep the
// results in that same order.
var results = Rx.Observable
  .merge(data1, data2)
  .concatMap(function (d) {
    // Route the payload to the work function matching its source tag.
    if (d.type === "data1") {
      return data1Work(d.data);
    }
    return data2Work(d.data);
  });
Upvotes: 6