Silas
Silas

Reputation: 1150

How do I implement something like "bufferWhile" in RxJs?

I use RxJs to process data replayed from a file. Each data item contains a timereceived property. While replaying, I want to create buffers with all data items originally received within a given timespan x. In other words: I want to add all items to the current buffer while the timespan between the first buffer element received and the current element received is less than timespan x.

Example test:

it('should be able to create buffers based on time received', function() {
    // given
    let source = require('rx').Observable.from([
        {val: 1, timereceived: 10},
        {val: 2, timereceived: 20},
        {val: 3, timereceived: 100},
        {val: 4, timereceived: 110},
        {val: 5, timereceived: 120}
    ]);

    // when
    let actual = app.bufferWithTimeReceived(source, 105).toArray();

    // then
    console.log(actual);
    assert.equal(actual.length, 2); // first contains val 1-3, second val 4-5
})

If I would not replay all data from the file but just receive it in real time, I could use bufferWithTime for that and would be fine.

Update with another example

    // given
    let source = require('rxjs/Rx').Observable.from([
        {val: 1, timereceived: 10},
        {val: 2, timereceived: 20},
        {val: 3, timereceived: 100},
        {val: 4, timereceived: 110},
        {val: 5, timereceived: 120},
        {val: 6, timereceived: 9920},
        {val: 7, timereceived: 9930}
    ]);

    // when
    app.bufferWithTimeReceived(source, 30).subscribe(console.log);


    // then
    // expected output would be [val 1-2][val 3-5][val 6-7] (empty arrays in between would be ok)

Update end

Now I played around with different approaches. My last one was:

exports.bufferWithTimeReceived = (source, timespan) => {
    return Rx.Observable.defer(() => Rx.Observable.create(function (observer) {
        let currBuffer = [];
        source.subscribe(x => {
            if (currBuffer.length == 0)
                currBuffer = [x];
            else {
                if (x.timereceived-currBuffer[0].timereceived < timespan)
                    currBuffer.push(x);
                else {
                    observer.onNext(currBuffer);
                    currBuffer = [x];
                }
            }
        },
        (err)=>observer.onError(err),
        ()=>observer.onCompleted());
    }));
};

Unfortunately this only leads to oArrayObservable { source: Defer { _f: [Function] } } as an error message, which is not very helpful. I also wondered how Rx - Divide stream into segments (lists) by condition might could help me?!

Bonus question: Any hint how I could make this buffer overlapping?

Upvotes: 1

Views: 477

Answers (2)

Silas
Silas

Reputation: 1150

Thanks to martin's answer I thought again about groupBy and this seems to work for me:

exports.bufferWithTimeReceived = (source, timespan) => {
        let currTime;
        return source.groupBy(x => {
            if (!currTime)
                currTime = x.timereceived;
            if (x.timereceived-currTime > timespan)
                currTime = x.timereceived;
            return currTime;            
        })
        .map(observable => observable.toArray())
        .mergeAll()
};

But I wonder if bufferWhen could offer a more elegant solution?!

Upvotes: 0

martin
martin

Reputation: 96999

I'd do it like this but note that this is RxJS 5. However, mergeAll should be available in RxJS 4 as well (maybe uder different names).

const THRESHOLD = 105;

const data = [
  {val: 1, timereceived: 10},
  {val: 2, timereceived: 20},
  {val: 3, timereceived: 100},
  {val: 4, timereceived: 110},
  {val: 5, timereceived: 120}
];
const source = Observable.from(data)
  .groupBy(item => parseInt(item.timereceived / THRESHOLD))
  .map(observable => observable.toArray())
  .mergeAll()
  .subscribe(console.log);

The groupBy operator creates a new Observable for each key returned by parseInt(item.timereceived / THRESHOLD). Then I chain it with toArray() because I want to collect all its items before reemitting them and mergeAll() that subscribes to all Observables (in this case 2) and reemits their items which is always a single array for each source Observable.

This produces the following output:

[ { val: 1, timereceived: 10 }, { val: 2, timereceived: 20 }, { val: 3, timereceived: 100 } ]
[ { val: 4, timereceived: 110 }, { val: 5, timereceived: 120 } ]

See:

Upvotes: 1

Related Questions