Reputation: 5155
I'm fairly new to rxjs and trying to learn.
I was reading through this medium article and trying to understand this RXJS output:
import {Observable} from 'rxjs-es';
let output = Observable.interval(500)
.map(i => [1,2,3,4,5,6][i]);
let result = output.map(num1 => num1)
.filter(num1 => num1 > 4)
.reduce((num1, num2) => num1 + num2);
result.subscribe(number => console.log(number));
Output: 27
I don't understand how the output turned out to be 27 and how that reduce is working (and what are those two arguments for the reducer).
Can someone enlighten me on what's happening? (I've tried to run it on codesandbox, but it throws an error when I run this code.)
Upvotes: 2
Views: 62
Reputation: 14750
Filter and Reduce Operators
As others have mentioned in the comments, the article you refer to is using an older version of rxjs that has a different syntax. For this example I'll use the newer syntax that started in version 6.
In rxjs, there are various operators available to transform the values emitted through a stream. Typically these are imported like:
import { filter, reduce } from 'rxjs/operators';
There are also many generator functions available to create a stream of values. interval is one of these functions; it creates a stream that emits sequential integers every n milliseconds. Imported like:
import { interval } from 'rxjs';
Let's create a simple stream:
numbers$ = interval(1000); // emit a number every 1 second
// output: 0, 1, 2, 3, 4, 5...
We can apply operators to this stream to transform the emissions:
The usage of filter is pretty simple. It emits only the values that pass the given truth test (exactly like the Array.filter() method).
numbersLessThan4$ = numbers$.pipe(
  filter(number => number < 4)
);
// output: 0, 1, 2, 3
The reduce operator is a bit more complex and behaves just like the Array.reduce() method. A function is applied to each emitted value and is able to store a value that can be referenced when the next emission is evaluated.
reduce takes two parameters. The first is a function that receives the previous accumulated result (acc) and the current emission (cur), in that order, and returns a new accumulated value. The second is an initial value for acc.
Example:
sumOfNumbers$ = numbers$.pipe(
  reduce((acc, cur) => acc + cur, 0)
);
So, let's look at what reduce does when numbers$ emits the first 3 numbers:
First emission: cur receives the current emission value 0, acc starts with the provided default 0, and acc + cur returns 0.
Second emission: cur receives the current emission value 1, acc receives the previously returned value 0, and acc + cur returns 1.
Third emission: cur receives the current emission value 2, acc receives the previously returned value 1, and acc + cur returns 3.
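The same three steps can be traced with plain Array.reduce(), which behaves the same way as described above. This is only a sketch and does not use rxjs; the names emissions, steps and sum are made up for illustration, and the array stands in for the first three values from numbers$.

```typescript
// Stand-in for the first three values emitted by numbers$ (illustrative only).
const emissions: number[] = [0, 1, 2];

// Record every call to the reducer so each step can be inspected.
const steps: string[] = [];

const sum = emissions.reduce((acc, cur) => {
  const next = acc + cur;
  steps.push(`acc=${acc}, cur=${cur} -> ${next}`);
  return next; // becomes acc on the next call
}, 0); // 0 is the initial value for acc

console.log(steps.join('\n'));
// acc=0, cur=0 -> 0
// acc=0, cur=1 -> 1
// acc=1, cur=2 -> 3
console.log(sum); // 3
```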
So this is cool. We can get quite a lot of logic into a simple line of code. One important thing about reduce is that it will not emit until the source observable completes. Currently, numbers$ never completes (interval() emits sequential integers indefinitely).
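To make the "nothing until completion" behaviour concrete, here is a rough hand-rolled model of it. It is not how rxjs implements reduce, and makeReducer, emitted and beforeComplete are hypothetical names; the point is only that values are accumulated on each next() and a single result is handed out on complete().

```typescript
// Hypothetical minimal model: accumulate on next(), emit once on complete().
function makeReducer<T, A>(
  fn: (acc: A, cur: T) => A,
  seed: A,
  emit: (value: A) => void
) {
  let acc = seed;
  return {
    next(value: T): void {
      acc = fn(acc, value); // only updates the stored value, emits nothing
    },
    complete(): void {
      emit(acc); // the single emission happens here
    },
  };
}

const emitted: number[] = [];
const reducer = makeReducer<number, number>(
  (acc, cur) => acc + cur,
  0,
  value => { emitted.push(value); }
);

reducer.next(0);
reducer.next(1);
reducer.next(2);
const beforeComplete = emitted.length;
console.log(beforeComplete); // 0, because the source has not completed

reducer.complete();
console.log(emitted); // [ 3 ]
```

If complete() is never called, as with an unbounded interval(), emitted stays empty.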
We can use the take() operator (also imported from 'rxjs/operators') to complete the stream after a certain number of values are emitted.
Example:
numbers$ = interval(1000).pipe(take(5)); // completes after 5 emissions
sumOfNumbers$ = numbers$.pipe(
  // receives 5 values (0, 1, 2, 3, 4) and performs the logic described above.
  reduce((acc, cur) => acc + cur, 0)
);
// output: 10
Multiple operators can be used to transform the emissions. Simply pass them as multiple arguments to pipe():
sumOfNumbersLessThan4$ = numbers$.pipe(
  filter(number => number < 4),
  reduce((acc, cur) => acc + cur, 0)
);
// output: 6
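For the values in the question, the same filter-then-reduce logic can be sketched with the array equivalents. This assumes the stream completes after the six array values (for example by adding take(6)), and the names values, kept and total are only illustrative. As in the question, no initial value is passed to reduce, so the first value that survives the filter becomes the starting accumulator.

```typescript
// The array from the question, standing in for the six emitted values.
const values: number[] = [1, 2, 3, 4, 5, 6];

// Same predicate as the question's filter: keep values greater than 4.
const kept = values.filter(num1 => num1 > 4); // [5, 6]

// Same reducer as the question, with no seed: num1 starts as 5, num2 is 6.
const total = kept.reduce((num1, num2) => num1 + num2);

console.log(total); // 11
```

With this particular array and predicate, the sum of the surviving values is 11.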
Upvotes: 1