user1009908

Reputation: 1398

rx: unfold array to multiple streams

I have a stream whose values are arrays, and each element of an array has an id. I need to split this into one stream per id, each of which completes when the source stream no longer carries that id.

E.g. input stream sequence with these three values

[{a:1}, {b:1}]    [{a:2}, {b:2}, {c:1}]     [{b:3}, {c:2}]

should return three streams

a -> 1 2 |
b -> 1 2 3
c ->   1 2

Where a has completed on the 3rd value, since its id is gone, and c has been created on the 2nd value, since its id has appeared.
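
To pin down the expected result, here is a plain-JavaScript model of the example with no Rx involved. The function name `unfold` and its return shape are made up for illustration, and the sketch assumes each element is a single-key object whose key is the id, and that an id does not reappear once it has gone.

```javascript
// Model of the desired output: one record per id, marked completed
// as soon as an input array no longer contains that id.
// `unfold` is a hypothetical name, not part of any library.
function unfold(frames) {
  var streams = {}; // id -> { values: [...], completed: boolean }
  frames.forEach(function (frame) {
    var present = {};
    frame.forEach(function (item) {
      var id = Object.keys(item)[0]; // assumption: single-key objects
      present[id] = true;
      if (!streams[id]) {
        streams[id] = { values: [], completed: false };
      }
      streams[id].values.push(item[id]);
    });
    // complete every open stream whose id is missing from this array
    Object.keys(streams).forEach(function (id) {
      if (!present[id]) {
        streams[id].completed = true;
      }
    });
  });
  return streams;
}

var result = unfold([
  [{ a: 1 }, { b: 1 }],
  [{ a: 2 }, { b: 2 }, { c: 1 }],
  [{ b: 3 }, { c: 2 }]
]);
// result.a -> { values: [1, 2],    completed: true  }
// result.b -> { values: [1, 2, 3], completed: false }
// result.c -> { values: [1, 2],    completed: false }
```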

I'm trying groupByUntil, a bit like this:

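 var input = foo.share();
 var output = input.selectMany(function (s) {
                         // flatten each array into its elements
                         return rx.Observable.fromArray(s);
                 }).groupByUntil(
                         // each element is a single-key object; that key is the id
                         function (s) { return Object.keys(s)[0]; },
                         null,
                         // end the group once an input array no longer contains its id
                         function (g) { return input.filter(
                                 function (s) { return !findkey(s, g.key); }
                         ); }
                 );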

So, group by the id, and dispose of the group when the input stream no longer has the id. This seems to work, but the two uses of input look odd to me, as if there could be a weird order dependency when a single stream controls both the input of the groupByUntil and the disposal of the groups.
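
The snippet calls a `findkey` helper that is not shown. A minimal sketch of what it might look like, assuming each element is a single-key object whose key is the id (as in the example input); the body below is a guess for illustration, not the original helper.

```javascript
// Hypothetical helper: true when some element of `frame` has `key` as its id.
function findkey(frame, key) {
  return frame.some(function (item) {
    return Object.keys(item)[0] === key;
  });
}

var third = [{ b: 3 }, { c: 2 }];
var hasA = findkey(third, 'a'); // false, so group "a" should end here
var hasB = findkey(third, 'b'); // true, so group "b" stays open
```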

Is there a better way?

update

There is, indeed, a weird timing problem here. fromArray by default uses the currentThread scheduler, which will result in events from that array being interleaved with events from input. The dispose conditions on the group are then evaluated at the wrong time (before the groups from the previous input have been processed).

A possible workaround is to pass the immediate scheduler, fromArray(.., rx.Scheduler.immediate), which keeps the grouped events in sync with input.
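
The ordering difference can be illustrated with a toy model that uses no Rx at all. Everything here (`makeTrampoline`, `immediate`, `emitArray`, `run`) is a made-up name, and the queue only loosely imitates a current-thread scheduler: work scheduled while another action is running is deferred until that action returns. The sketch also assumes the outer input is emitted from that same queue.

```javascript
// Toy trampoline: actions scheduled during a running action are queued
// and run afterwards. Not Rx's real scheduler, just an approximation.
function makeTrampoline() {
  var queue = [];
  var running = false;
  return function schedule(action) {
    queue.push(action);
    if (running) { return; }
    running = true;
    while (queue.length > 0) { queue.shift()(); }
    running = false;
  };
}

// Toy immediate scheduler: run the action right where it is scheduled.
function immediate(action) { action(); }

// Emit items one per scheduled action, like a recursively scheduled array source.
function emitArray(items, schedule, onNext) {
  function step(i) {
    if (i >= items.length) { return; }
    schedule(function () {
      onNext(items[i]);
      step(i + 1);
    });
  }
  step(0);
}

// Outer frames always go through the trampoline; the inner scheduler varies.
function run(pickInner) {
  var log = [];
  var outer = makeTrampoline();
  var inner = pickInner(outer);
  emitArray([['a1', 'b1'], ['a2', 'b2']], outer, function (frame) {
    log.push('frame');
    emitArray(frame, inner, function (x) { log.push(x); });
  });
  return log;
}

var interleaved = run(function (outer) { return outer; });
// -> ['frame', 'a1', 'frame', 'b1', 'a2', 'b2']
var inSync = run(function () { return immediate; });
// -> ['frame', 'a1', 'b1', 'frame', 'a2', 'b2']
```

In the first run the second frame arrives before `b1` has been delivered, which is the kind of interleaving that makes the dispose condition fire at the wrong time. With the immediate inner scheduler each frame's elements are delivered before the next frame.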

Upvotes: 3

Views: 1435

Answers (1)

Brandon

Reputation: 39222

Yeah, the only alternative I can think of is to manage the state yourself. I don't know that it is better, though.

var d = Object.create(null);
var output = input
    .flatMap(function (s) {
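        // end groups whose id is missing from this array
        Object
            .keys(d)
            .filter(function (k) { return !findKey(s, k); })
            .forEach(function (k) {
                // an AsyncSubject emits only on completion, so this pair
                // fires the group's duration observable and closes the group
                d[k].onNext(1);
                d[k].onCompleted();
                delete d[k];
            });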
        return Rx.Observable.fromArray(s);
    })
    .groupByUntil(
        function (s) { return Object.keys(s)[0]; },
        null,
        function (g) { return d[g.key] = new Rx.AsyncSubject(); });

Upvotes: 1
