Reputation: 337
I am trying to buffer emissions, closing each buffer based on a throttled version of the same stream. The following works as expected in RxJS 2:
.buffer(function() { return clickStream.throttle(250); })
But when I tried to use this in my local project, it threw the following error:
Uncaught TypeError: You provided '() => clickStream.throttle(250)' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
Here is my code:
import Rx from 'rxjs/Rx';
const $button = document.getElementById('button');
const $label = document.getElementById('label');
const clickStream = Rx.Observable.fromEvent($button, 'click');
const doubleClickStream = clickStream
.buffer(() => clickStream.throttle(250))
.map(arr => arr.length)
.filter(len => len === 2)
clickStream.subscribe(x=>console.log(x));
doubleClickStream
.subscribe(() => $label.textContent = 'double click')
//
doubleClickStream
.throttle(1000)
.subscribe(() => $label.textContent = '-')
I know that my current knowledge of this library is not enough; maybe this is related to the RxJS version (the working example uses 2.x and my code above uses 5.x).
I also tried to write it as:
.buffer(clickStream.throttle(250))
And it raised another exception after clicking on the button:
Uncaught TypeError: this.durationSelector is not a function
Can you help me understand what's wrong with it?
Upvotes: 1
Views: 1029
Reputation: 894
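This is how I got it working in RxJS 5.5:
import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/fromEvent"; // patches Observable.fromEvent
import { buffer, throttleTime, map, filter } from "rxjs/operators";
// button$ is the DOM element to listen on
const clickStream = Observable.fromEvent(button$, 'click');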
this.subscriptions.add(clickStream
.pipe(
buffer(clickStream.pipe(throttleTime(500))),
map(arr => arr.length),
filter(len => len === 4)
)
.subscribe(resp => {
console.log('clicked', resp);
}))
Upvotes: 0
Reputation: 9425
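Your problem stems from the fact that RxJS 5 is a complete rewrite with many breaking changes. Among them, some operators were renamed and some operator overloads were split into separate operators.
Your second error (this.durationSelector is not a function) is raised by .throttle(), which in RxJS 5 takes a function that returns an Observable (or Promise) marking how long to stay silent after each emitted value. The time-based overload you are looking for has been renamed in RxJS 5 to .throttleTime(). The first error comes from .buffer(), which in RxJS 5 expects the closing Observable itself rather than a function that returns one.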
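To make the time-based behaviour concrete, here is a minimal, dependency-free sketch of what a leading-edge throttle with a fixed duration does to a series of click timestamps. It is only a model for illustration: throttleTimeModel is a hypothetical helper, not an RxJS function, and it assumes the timestamps arrive in ascending order.

```javascript
// Dependency-free model of a leading-edge, time-based throttle.
// `throttleTimeModel` is a hypothetical helper, NOT part of RxJS.
// In RxJS 5 the corresponding call would be clickStream.throttleTime(250).
// Input: ascending event timestamps in ms. Output: timestamps that pass.
function throttleTimeModel(timestamps, duration) {
  const emitted = [];
  let silentUntil = -Infinity; // end of the current silence window
  for (const t of timestamps) {
    if (t >= silentUntil) {
      emitted.push(t);            // first event outside the window passes
      silentUntil = t + duration; // later events are dropped until then
    }
  }
  return emitted;
}

// Clicks at 0, 100 and 200 ms fall in one window; 400 ms opens a new one.
const passed = throttleTimeModel([0, 100, 200, 400, 450], 250);
console.log(passed); // [ 0, 400 ]
```

The duration is a plain number here, which matches the .throttleTime() overload; .throttle() in RxJS 5 instead expects a function, which is why passing 250 to it fails.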
It is best not to mix up different versions of Rx for exactly this reason. If you plan to start using RxJS 5, look at the migration guide to get a feel for what went where.
Upvotes: 3