Ashwin Balamohan

Reputation: 3332

Stream from RxJS promise

I'm attempting to run a reduce on a stream that I'm creating from a promise.

In the example below, myApi.getItemsAsync() returns an array.

I'd expect the reduce callback to be called with each individual item in the array. Instead, it's being called with the entire array.

Rx.Observable.fromPromise(
  myApi.getItemsAsync()
)
.reduce(function (acc, item) {
  // expecting `item` to be a single item
  // instead, seeing the entire array
}, [])
.subscribe(function (result) { 
  console.log(result);
});

If myApi.getItemsAsync() were a synchronous function returning an array, the reduce would work as expected, calling the callback with each item in the array.

How would I get this to work with promises?

Upvotes: 1

Views: 1051

Answers (1)

Bergi

Reputation: 665584

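reduce operates on the whole stream, not on the array that is emitted as a single value within it. A promise resolves exactly once, so the stream created from it emits one item: the entire array.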

If you were looking to use the array method, then it should be

Rx.Observable.fromPromise(myApi.getItemsAsync())
.map(function(array) {
  return array.reduce(function (acc, item) {
    return …;
  }, []);
})
…
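
To illustrate the same idea without Rx, here is a minimal sketch using only a native Promise. It is not the asker's code: getItemsAsync is a hypothetical stand-in for myApi.getItemsAsync(), and summing numbers is an assumed reduction chosen only for the example. The point it shows is that the promise delivers the array as one value, so the per-item reduction has to run on the array itself.

```javascript
// Hypothetical stand-in for myApi.getItemsAsync(): resolves once with the whole array.
function getItemsAsync() {
  return Promise.resolve([1, 2, 3, 4]);
}

// The promise delivers a single value (the array), so the per-item
// reduction is done with Array.prototype.reduce inside the callback.
function sumItems() {
  return getItemsAsync().then(function (array) {
    return array.reduce(function (acc, item) {
      return acc + item; // assumed reduction, for illustration only
    }, 0);
  });
}

sumItems().then(function (total) {
  console.log(total);
});
```

The .map(...) call in the Rx version plays the role that .then(...) plays here: both receive the whole array once and return the reduced value.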

If you were looking to use the stream method, then you'll first need to inject the array items into the stream. I guess your synchronous version used fromArray. Together with a promise, you'd do

Rx.Observable.fromPromise(myApi.getItemsAsync())
.flatMap(function(array) {
  return Rx.Observable.fromArray(array);
}).reduce(function (acc, item) {
  return …;
}, [])
…

Upvotes: 2
