Reputation: 141
I am in need of clarification about the efficiency of operator pipelines in RxJS.
My current understanding of the operator pipeline in RxJS is that each operator in the pipeline receives an observable and creates a new (possibly modified) observable, which is returned to the next operator as input. This behaviour would be similar to that of JavaScript's filter, map, and reduce, thus leaving the original observable (stream) or array untouched/pure.
This assumption is supported by the RxJS Documentation at: https://rxjs-dev.firebaseapp.com/guide/operators
A Pipeable Operator is a function that takes an Observable as its input and returns another Observable. It is a pure operation: the previous Observable stays unmodified.
Considering long operator pipelines the creation of intermediate observables seems kind of expensive to me.
In addition I am reading the book 'Reactive Programming with RxJS 5' written by Sergi Mansilla. I know that RxJS is currently at version 6.5.3 but I expect that the basic mechanism did not change since then.
In the book there is a section about the efficiency of pipeline which says that observable pipelines do not create intermediate Observables. Instead they apply all operations to each element in one go. That makes sense, since the operator take(amount) completes the observable stream after taking the first amount elements. It also explains the lazy evaluation, which traverses the source observable stream at most once, or only until the take condition is met.
import * as rxCore from 'https://dev.jspm.io/rxjs@6/_esm2015/index';
import * as rxOps from 'https://dev.jspm.io/rxjs@6/_esm2015/operators';
const numberStream = rxCore.range(0, 10);
numberStream.pipe(
rxOps.map(number => number * number), //creates Observable with [0,1,4,9,16,25,36,49,64,81]
rxOps.filter(number => number % 2 === 0), //creates Observable with [0,4,16,36,64]
rxOps.take(3) //completes after [0,4,16]
).subscribe(console.log); //logs 0, 4, 16
Are there any intermediate observables being created inside this operator pipeline? Or is only the complete pipeline creating one new observable leaving the numberStream untouched? Or what exactly is the case?
Upvotes: 3
Views: 242
Reputation: 2210
Almost every RxJS operator creates an intermediate Observable. Actually, I have not found an example of an operator that doesn't do that, so I presume that all of them do.
Example: the map() operator internally calls the lift method. What lift does is create a new Observable and return it, declaring the current Observable as the source Observable.
This means that in your example range creates one Observable, while map, filter and take create three new Observables, each of them having as its source Observable the one created before it. So the source Observable for map is the one created by range, and so on.
So, when you subscribe to any Observable, it tries to call the operator with its source Observable. In the case of map, it calls the operator's call method, which subscribes to the source Observable. In your case, it would subscribe to the one created by range. This way, calling subscribe on the source goes all the way up to the first Observable (which doesn't have a source), meaning that no operator is applied there.
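To make the source chain concrete, here is a minimal sketch of the idea in plain JavaScript. It is a toy model, not the RxJS source: ToyObservable and the simplified range, map and filter helpers are hypothetical stand-ins written for illustration, and completion, errors and unsubscription are left out. It only shows that each operator returns a new object whose source points at the previous one, and that subscribing walks that chain upwards.

```javascript
// Toy model only: a stripped-down Observable, NOT the real RxJS implementation.
class ToyObservable {
  constructor(subscribeFn) {
    this.source = null;    // upstream Observable, if any
    this.operator = null;  // wraps the downstream observer on subscribe
    if (subscribeFn) this._subscribe = subscribeFn;
  }

  // Same idea as lift: return a NEW object that remembers where it came from.
  lift(operator) {
    const next = new ToyObservable();
    next.source = this;
    next.operator = operator;
    return next;
  }

  // pipe just feeds the result of each operator into the next one.
  pipe(...operators) {
    return operators.reduce((obs, op) => op(obs), this);
  }

  subscribe(observer) {
    if (this.operator) {
      // Wrap the observer, then subscribe one level further up the chain.
      this.source.subscribe(this.operator(observer));
    } else {
      // First Observable in the chain: no source, no operator.
      this._subscribe(observer);
    }
  }
}

const range = (start, count) => new ToyObservable(observer => {
  for (let i = start; i < start + count; i++) observer.next(i);
});
const map = fn => source =>
  source.lift(observer => ({ next: v => observer.next(fn(v)) }));
const filter = pred => source =>
  source.lift(observer => ({ next: v => { if (pred(v)) observer.next(v); } }));

const numberStream = range(0, 10);
const piped = numberStream.pipe(map(n => n * n), filter(n => n % 2 === 0));

// Count the Observable objects by walking the source chain.
let chainLength = 0;
for (let o = piped; o; o = o.source) chainLength++;

const received = [];
piped.subscribe({ next: v => received.push(v) });
console.log(chainLength, received);
```

In this sketch, numberStream itself is never modified: piped is a different object whose source is the map result, whose source in turn is numberStream.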
Considering long operator pipelines the creation of intermediate observables seems kind of expensive to me.
Don't worry about this. Intermediate Observables are just JavaScript objects. They are lightweight, and you can't easily create stacks of these objects large enough to cause performance issues. Besides, Observables are lazy: they do nothing until something subscribes and values start to flow.
In the book there is a section about the efficiency of pipeline which says that observable pipelines do not create intermediate Observables. Instead they apply all operations to each element in one go.
Well, in my opinion, this is half-true. Yes, they apply the operators to each element, but not by having a single Observable; they do it by creating multiple ones. Maybe this was the case before RxJS 5, but not with today's versions.
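The two views can be reconciled with a small experiment. The following is a hedged sketch in plain JavaScript, again a toy model rather than RxJS itself: an "observable" here is just a function taking an observer, and the shared sub.closed flag is an assumption that stands in for RxJS's real unsubscription mechanism. It records the order in which the operators see values, to show that each value passes through all stages before the next one is produced, and that the source stops as soon as take has enough.

```javascript
// Toy model: an "observable" is a function (observer, sub) => void.
// sub.closed is a simplified stand-in for unsubscription.
const log = [];

const range = (start, count) => (observer, sub) => {
  // Stop producing as soon as a downstream stage closes the subscription.
  for (let i = start; i < start + count && !sub.closed; i++) observer.next(i);
};

const map = fn => source => (observer, sub) =>
  source({ next: v => { log.push(`map(${v})`); observer.next(fn(v)); } }, sub);

const filter = pred => source => (observer, sub) =>
  source({
    next: v => { log.push(`filter(${v})`); if (pred(v)) observer.next(v); }
  }, sub);

const take = n => source => (observer, sub) => {
  let seen = 0;
  source({
    next: v => {
      log.push(`take(${v})`);
      observer.next(v);
      if (++seen === n) sub.closed = true; // tell the source to stop
    }
  }, sub);
};

// Each operator call wraps the previous function in a new one.
const pipe = (source, ...ops) => ops.reduce((obs, op) => op(obs), source);

const out = [];
const piped = pipe(
  range(0, 10),
  map(n => n * n),
  filter(n => n % 2 === 0),
  take(3)
);
piped({ next: v => out.push(v) }, { closed: false });

console.log(out); // [0, 4, 16]
console.log(log.join(' '));
```

The log starts with map(0) filter(0) take(0) map(1) filter(1) and ends with take(16), so in this sketch the source is traversed only as far as the fifth element: no intermediate collection of ten squares is ever built, even though several wrapper objects exist.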
Upvotes: 0
Reputation: 11345
Consider how you create a custom operator to pass into pipe():
source => source.pipe(...)
What pipe() expects is a function that returns an observable, optionally modifying, appending, or applying more operations to the source observable. I suppose that is not an intermediate observable, since at the end of the pipe you will only have one source passed through.
Upvotes: 0