TechnoCorner

Reputation: 5155

Issue with understanding RXJS Observable Output

I'm fairly new to rxjs and trying to learn.

I was reading through this Medium article and trying to understand this RxJS output:

import {Observable} from 'rxjs-es';
let output = Observable.interval(500)
             .map(i => [1,2,3,4,5,6][i]);

let result = output.map(num1 => num1)
    .filter(num1 => num1 > 4)
    .reduce((num1, num2) => num1 + num2);

result.subscribe(number => console.log(number));

Output: 27

I don't understand how the output turned out to be 27 or how that reduce works (and what the reducer's two arguments are).

Can someone explain what's happening? (I tried to run it on CodeSandbox, but it throws an error.)

Upvotes: 2

Views: 62

Answers (1)

BizzyBob

Reputation: 14750

Understanding Filter and Reduce Operators

As others have mentioned in the comments, the article you refer to is using an older version of rxjs that has a different syntax. For this example I'll use the newer syntax that started in version 6.

In rxjs, there are various operators available to transform the values emitted through a stream. Typically these are imported like:

import { filter, reduce } from 'rxjs/operators';

There are also many generator functions available to create a stream of values. interval is one of them: it creates a stream that emits sequential integers every n milliseconds. It is imported like this:

import { interval } from 'rxjs';

Let's create a simple stream:

numbers$ = interval(1000); // emit a number every 1 second
// output: 0, 1, 2, 3, 4, 5...

We can apply operators to this stream to transform the emissions:

Using filter is straightforward: it emits only the values that pass the given test (exactly like the Array.filter() method).

numbersLessThan4$ = numbers$.pipe(
    filter(number => number < 4)
);
// output: 0, 1, 2, 3

The reduce operator is a bit more complex and behaves just like the Array.reduce() method. A function is applied to each emitted value and is able to store a value that can be referenced when the next emission is evaluated.

reduce takes two parameters. The first is a function that receives the previous accumulated result (acc) and the current emission (cur), in that order, and returns a new accumulated value. The second is an optional initial value for acc; if it is omitted, the first emission is used as the starting acc.

Example:

sumOfNumbers$ = numbers$.pipe(
    reduce((acc, cur) => acc + cur, 0)
);

So, let's look at what reduce does when numbers$ emits the first 3 numbers:

  • 0
    • cur receives the current emission value 0
    • acc starts with the provided default 0
    • the expression acc + cur returns 0
  • 1
    • cur receives the current emission value 1
    • acc receives the previously returned value 0
    • the expression acc + cur returns 1
  • 2
    • cur receives the current emission value 2
    • acc receives the previously returned value 1
    • the expression acc + cur returns 3
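
The walkthrough above can be reproduced with a plain array, since reduce on a stream follows the same accumulator logic as Array.reduce(). This is only a sketch: the array [0, 1, 2] stands in for the first three emissions of numbers$, and emissions, steps and total are hypothetical names used to record each call.

```javascript
// Stand-in for the first three emissions of numbers$
// (assumption: a plain array, not an observable).
const emissions = [0, 1, 2];

// Hypothetical array that records what the reducer sees on each call.
const steps = [];

const total = emissions.reduce((acc, cur) => {
    const next = acc + cur;
    steps.push({ acc, cur, next });
    return next;
}, 0); // 0 is the initial value for acc

console.log(steps); // one { acc, cur, next } entry per emission
console.log(total); // 3
```

Each entry in steps lines up with one group of bullets in the list above.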

So this is cool. We can get quite a lot of logic into a simple line of code. One important thing about reduce is that it will not emit until the source observable completes. Currently, numbers$ never completes (interval() emits sequential integers indefinitely).

We can use the take() operator to complete the stream after a certain number of values are emitted.

Example:

import { take } from 'rxjs/operators';

numbers$ = interval(1000).pipe(take(5)); // completes after 5 emissions

sumOfNumbers$ = numbers$.pipe(
    // receives 5 values (0, 1, 2, 3, 4) and performs the logic described above.
    reduce((acc, cur) => acc + cur, 0) 
);
// output: 10

Multiple operators can be combined to transform the emissions. Simply pass them, in order, to pipe():

sumOfNumbersLessThan4$ = numbers$.pipe(
    filter(number => number < 4),
    reduce((acc, cur) => acc + cur, 0) 
);
// output: 6
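
To connect this back to the code in the question, here is a rough array analogue of its filter and reduce steps. It assumes that the six values [1, 2, 3, 4, 5, 6] from the question arrive once each and that the stream then completes; questionValues and sum are hypothetical names. The question's reduce has no initial value, so the first value that passes the filter becomes the starting accumulator.

```javascript
// Assumption: the six values from the question, each emitted once.
const questionValues = [1, 2, 3, 4, 5, 6];

const sum = questionValues
    .filter(num1 => num1 > 4)             // keeps 5 and 6
    .reduce((num1, num2) => num1 + num2); // no seed: num1 starts as 5, num2 is 6

console.log(sum); // 11
```

Under these assumptions the result is 11 rather than the 27 quoted in the question, so that figure presumably came from different input in the article. In the observable version, reduce would log nothing at all until the source completes, for example by adding take(6).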

Upvotes: 1
