Josef.B
Josef.B

Reputation: 952

How to add action function to Disposable returned from subscribe on an Observable?

You have an Observable, then you subscribe to it. The return value is a Disposable. How do you add a function to be invoked when that Disposable instance is disposed?

clickStream.subscribe(....).dispose();

When dispose() is invoked I want to call a debug function, for example, console.log('Disposing ....'). In a real app, I would want to do some non-rxjs clean-up and UI notification actions. Probably simple thing, but I don't see it in the API. Thx.

Upvotes: 3

Views: 2068

Answers (2)

user3743222
user3743222

Reputation: 18665

I wonder if finally would work for your use case (documentation). According to the documentation, Invokes a specified action after the source observable sequence terminates gracefully or exceptionally. Note that this is not exactly the same as a callback being called when dispose is called but I thought that might be enough for your needs, and you can use it for clean-up actions.

UPDATE

Also close to what you want, but not quite so, you could use the using operator (documentation). It allows to create an object with a dispose method which would be called when the subscription to the associated observable is disposed of.

UPDATE 2

Looking at the source code, finally actually executes the action when the subscription to its observable is disposed. This includes termination or manually calling dispose() on the subscription. That seems very close to what you want to achieve.

Best is to test. Please keep us updated of the results.

Upvotes: 2

Peter Hall
Peter Hall

Reputation: 58795

Disposables are tied to event sources. The principle is that an event source is interacting with the non-Rx world, and might need to do some clean-up when no one is subscribed any more.

If you could put a hook into the Disposable that is returned from a call to subscribe then it wouldn't be of much use: you know when you called dispose() so you could just handle the clean-up logic there, and if some other code called dispose() it might be on a Disposable further down the chain, which may not affect your subscription.

It sounds like you are trying to make use of side-effects, which isn't necessarily in the spirit of the RxJS API. If you need to close down some network connection or something like that, then maybe you should be using a custom Observable, which creates a Disposable to clean itself up.

A simple example of this (simplified implementation of Observable.fromEvent) might look like this:

function fromEvent(obj, evt){
     var subject = new Rx.Subject();
     function listener(e){
        subject.onNext( e );
     }
     return Rx.Observable.create( function( observer ){
        var disp = subject.subscribe( observer );
          obj.addEventListener( evt, listener );
          return Rx.Disposable.create(function(){
             // All the clean-up code goes here
             obj.removeEventListener( evt, listener );
             disp.dispose();
          })
     });     
}


var events$ = fromEvent( button, 'click');
var count = 0;
var unsub = events$.subscribe( function(){
    console.log('click');
    count++;
    if( count > 5){
        unsub.dispose();
    }
})

Upvotes: 2

Related Questions