Reputation: 1398
I have a stream of arrays, each element of which has an id. I need to split this into one stream per id, each of which completes when the source stream no longer carries that id.
E.g. input stream sequence with these three values
[{a:1}, {b:1}] [{a:2}, {b:2}, {c:1}] [{b:3}, {c:2}]
should return three streams
a -> 1 2 |
b -> 1 2 3
c -> 1 2
Where a has completed on the 3rd value, since its id is gone, and c has been created on the 2nd value, since its id has appeared.
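To pin down the expected result, here is a plain JavaScript sketch (no Rx involved) that replays the three sample arrays and records, per id, the values seen and whether the id has dropped out. `expectedGroups` is a hypothetical name used only for illustration, and it assumes each element is an object with exactly one key, as in the example.

```javascript
// Hypothetical helper, not part of Rx: replays a list of arrays and records,
// for each id, the values seen so far and whether the id later disappeared.
function expectedGroups(frames) {
  const groups = {};
  for (const frame of frames) {
    const present = new Set();
    for (const item of frame) {
      const id = Object.keys(item)[0]; // assumes one key per element
      present.add(id);
      // Start a fresh group for a new id, or for one that reappears
      // after its previous group completed.
      if (!groups[id] || groups[id].completed) {
        groups[id] = { values: [], completed: false };
      }
      groups[id].values.push(item[id]);
    }
    // Any known id missing from this array is marked as completed.
    for (const id of Object.keys(groups)) {
      if (!present.has(id)) groups[id].completed = true;
    }
  }
  return groups;
}

const result = expectedGroups([
  [{ a: 1 }, { b: 1 }],
  [{ a: 2 }, { b: 2 }, { c: 1 }],
  [{ b: 3 }, { c: 2 }]
]);
console.log(JSON.stringify(result));
```

Here only `a` ends up completed, because it is the only id missing from the third array.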
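I'm trying groupByUntil, a bit like this:
var input = foo.share();
var output = input.selectMany(function (s) {
    return rx.Observable.fromArray(s);
}).groupByUntil(
    // key: the single property name of each element, i.e. its id
    function (s) { return Object.keys(s)[0]; },
    null,
    // duration: end the group once an input array no longer contains its key
    function (g) {
        return input.filter(function (s) { return !findkey(s, g.key); });
    }
);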
So, group by the id, and dispose of the group when the input stream no longer has the id. This seems to work, but the two uses of input look odd to me, as if there could be a weird order dependency when a single stream controls both the input of the groupByUntil and the disposal of the groups.
Is there a better way?
update
There is, indeed, a weird timing problem here. fromArray by default uses the currentThread scheduler, which will result in events from that array being interleaved with events from input. The dispose conditions on the group are then evaluated at the wrong time (before the groups from the previous input have been processed).
A possible workaround is to do fromArray(.., rx.Scheduler.immediate), which will keep the grouped events in sync with input.
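A sketch of that workaround, for illustration only. It assumes RxJS 4, where fromArray accepts a scheduler as its second argument. The wrapper name `groupWithImmediate` is made up here, and `rx`, `foo`, and `findkey` are passed in as parameters so the snippet does not depend on how they are defined elsewhere.

```javascript
// Sketch only: `rx` is assumed to be the RxJS 4 module, `foo` the source
// observable of arrays, and `findkey(array, key)` the helper used above.
function groupWithImmediate(rx, foo, findkey) {
  var input = foo.share();
  return input
    .selectMany(function (s) {
      // The immediate scheduler runs the array's emissions synchronously,
      // so they are not queued behind later values of `input`.
      return rx.Observable.fromArray(s, rx.Scheduler.immediate);
    })
    .groupByUntil(
      // key: the single property name of each element
      function (s) { return Object.keys(s)[0]; },
      null,
      // duration: fires when an input array no longer contains the key
      function (g) {
        return input.filter(function (s) { return !findkey(s, g.key); });
      }
    );
}
```

The only change from the original attempt is the explicit scheduler argument; the grouping and disposal logic is the same.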
Upvotes: 3
Views: 1435
Reputation: 39222
Yeah, the only alternative I can think of is to manage the state yourself. I don't know that it is better, though.
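var d = Object.create(null); // key -> AsyncSubject that ends the group for that key
var output = input
    .flatMap(function (s) {
        // end the groups whose key is missing from this array
        Object
            .keys(d)
            .filter(function (k) { return !findKey(s, k); })
            .forEach(function (k) {
                // an AsyncSubject emits its last value only on completion,
                // which is what fires the group's duration observable
                d[k].onNext(1);
                d[k].onCompleted();
                delete d[k];
            });
        return Rx.Observable.fromArray(s);
    })
    .groupByUntil(
        function (s) { return Object.keys(s)[0]; },
        null,
        function (g) { return d[g.key] = new Rx.AsyncSubject(); });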
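The findKey helper is not shown anywhere in the thread. A minimal sketch, assuming it only needs to report whether any element of the array carries the given id as its own property:

```javascript
// Assumed behaviour: true if any element of `items` has `key` as an own property.
function findKey(items, key) {
  return items.some(function (item) {
    return Object.prototype.hasOwnProperty.call(item, key);
  });
}

console.log(findKey([{ b: 3 }, { c: 2 }], "a")); // false: id "a" is gone
console.log(findKey([{ b: 3 }, { c: 2 }], "b")); // true: id "b" is still present
```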
Upvotes: 1