yiooxir
yiooxir

Reputation: 337

rxjs buffer doesn't work with throttle as expected for me

I am trying to create buffered emissions based on a throttled emission. The following works as expected in RxJs 2:

.buffer(function() { return clickStream.throttle(250); })

full jsfiddle example here

But when I tried to use this in my local project it returns the following error:

Uncaught TypeError: You provided '() => clickStream.throttle(250)' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.

Here is my code

import Rx from 'rxjs/Rx';

const $button = document.getElementById('button');
const $label = document.getElementById('label');

const clickStream = Rx.Observable.fromEvent($button, 'click');

const doubleClickStream = clickStream
  .buffer(() => clickStream.throttle(250))
  .map(arr => arr.length)
  .filter(len => len === 2)

clickStream.subscribe(x=>console.log(x));

doubleClickStream
  .subscribe(() => $label.textContent = 'double click')
//
doubleClickStream
  .throttle(1000)
  .subscribe(() => $label.textContent = '-')

I know what my current knowledge about this library is not enough, maybe it related to the rxjs version (in working example it is 2.x and in my code above it is 5.x).

I also tried to write it as:

.buffer(clickStream.throttle(250))

And it raised another exception after clicking on the button:

Uncaught TypeError: this.durationSelector is not a function

Can you help me to understand that's wrong with it ?

Upvotes: 1

Views: 1029

Answers (2)

Samiullah Khan
Samiullah Khan

Reputation: 894

That's how I made it possible in RxJs5.5

import { buffer, throttleTime, map, filter } from "rxjs/operators";

const clickStream = Observable.fromEvent(button$, 'click');
this.subscriptions.add(clickStream
  .pipe(
      buffer(clickStream.pipe(throttleTime(500))),
      map(arr => arr.length),
      filter(len => len === 4)
    )
  .subscribe(resp => {
    console.log('clicked', resp);
}))

Upvotes: 0

Mark van Straten
Mark van Straten

Reputation: 9425

Your problem resides in the fact that RxJs5 is a complete rewrite and has a lot of breaking changes. Amongst many of those is the rename of operators or splitting of operator overloads in different operators.

Your error is raised by .throttle() which in RxJs5 takes a function to return an Observable when the buffer should close. The overload which you are looking for has been renamed in RxJs5 to .throttleTime()

It is best not to mixup different versions of Rx for exact this reason. If you plan to start using RxJs5 look at the migration guide to give you a feel of what went where.

Upvotes: 3

Related Questions